Skip to content
Snippets Groups Projects
Commit 276fe7a7 authored by Akrem Sellami's avatar Akrem Sellami
Browse files

Upload New File

parent 9109a372
No related branches found
No related tags found
No related merge requests found
"""""""""""""""
Monomodal Autoenconder code
"""""""""""""""
import keras
from keras.layers import Input, Dense, concatenate
from keras.models import Model
from keras.layers import Dropout
from keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt
plt.switch_backend('agg')
import sys as os
import os
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.metrics import mean_squared_error
from math import sqrt
from keras.optimizers import SGD, Adadelta, Adam
import numpy as np
from sklearn.model_selection import KFold
from keras.callbacks import ModelCheckpoint
from keras.models import load_model
# loading multimodal fMRI data
def load_data(sub, view):
# Import Task fMRI data
if view == 1:
view_tfmri = np.load(os.path.join(path, "tfmri/{}/gii_matrix_fsaverage5.npy".format(sub)))
return view_tfmri
# Import Resting-State fMRI data
if view == 2:
view_rsfmri = np.load(os.path.join(path, "rsfmri/{}/correlation_matrix_fsaverage5.npy".format(sub)))
return view_rsfmri
# Import concatenated fMRI data
if view ==3:
view_rsfmri = np.load(os.path.join(path, "rsfmri/{}/correlation_matrix_fsaverage5.npy".format(sub)))
view_tfmri = np.load(os.path.join(path, "tfmri/{}/gii_matrix_fsaverage5.npy".format(sub)))
fmri_data =np.concatenate([view_tfmri, view_rsfmri], axis=1)
return fmri_data
# Path
path = "/home/asellami/data_fsaverage5"
print('View 1: task-fMRI')
print('View 2: resting-state fMRI')
print('View=3: concatenated views (task-fMRI + rest-fMRI)')
# view =1: tfmri, view =2: rsfmri, view=3: concatenated views (task-fMRI + rest-fMRI)
view=1
# activation functions
hidden_layer='linear'
output_layer='linear'
# MSE
mse_train = []
mse_test = []
# RMSE
rmse_train = []
rmse_test = []
#
# Standard deviation MSE
std_mse_train = []
std_mse_test = []
# Standard deviation RMSE
std_rmse_train = []
std_rmse_test = []
# missing data
missing_data=[36]
index_subjects=np.arange(3,43)
index_subjects = np.delete(index_subjects, np.argwhere(index_subjects == missing_data))
#ar = np.arange(75,101)
#dimensions = ar[ar%2==0]
dimensions=[2, 6, 10, 16, 20, 26, 30, 36, 40, 46, 50, 56, 60, 66, 70, 76, 80, 86, 90, 96, 100]
batch_1=dimensions[0:6]
batch_2=dimensions[6:12]
batch_3=dimensions[12:17]
batch_4=dimensions[17:21]
for dim in batch_1:
# create directory
directory = '{}'.format(dim)
if not os.path.exists(directory):
os.makedirs(directory)
# Cross Validation
kf = KFold(n_splits=10)
print(kf.get_n_splits(index_subjects))
print("number of splits:", kf)
print("number of features:", dimensions)
cvscores_mse_test = []
cvscores_rmse_test = []
cvscores_mse_train = []
cvscores_rmse_train = []
fold = 0
for train_index, test_index in kf.split(index_subjects):
fold += 1
# create directory
directory = '{}/fold_{}'.format(dim, fold)
if not os.path.exists(directory):
os.makedirs(directory)
print(f"Fold #{fold}")
print("TRAIN:", index_subjects[train_index], "TEST:", index_subjects[test_index])
# load training and testing data
print('Load training data... (view {})'.format(view))
train_data = np.concatenate([load_data(sub, view) for sub in index_subjects[train_index]])
print("Shape of the training data:", train_data.shape)
print('Load testdata... (view {})'.format(view))
test_data = np.concatenate([load_data(sub, view) for sub in index_subjects[test_index]])
print("Shape of the test data:", test_data.shape)
# Data normalization to range [-1, 1]
print("Data normalization to range [0, 1]")
scaler = MinMaxScaler()
normalized_train_data = scaler.fit_transform(train_data)
normalized_test_data = scaler.fit_transform(test_data)
# Apply linear autoencoder
# Inputs Shape
input_train_data = Input(shape=(normalized_train_data[0].shape))
# Create linear AE
encoded = Dense(110, activation=hidden_layer)(input_train_data)
encoded = Dense(dim, activation=hidden_layer)(encoded)
decoded = Dense(110, activation=hidden_layer)(encoded)
decoded = Dense(normalized_train_data[0].shape[0], activation=output_layer)(decoded)
# This model maps an input to its reconstruction
autoencoder = Model(input_train_data, decoded)
adam= Adam(lr=0.001, beta_1=0.9, beta_2=0.999, amsgrad=False)
autoencoder.compile(optimizer=adam, loss='mse')
print(autoencoder.summary())
# fit Autoencoder on training set
history = autoencoder.fit(normalized_train_data, normalized_train_data,
epochs=70,
batch_size=1000,
shuffle=True,
validation_data=(normalized_test_data, normalized_test_data), verbose=1)
# list all data in history
print(history.history.keys())
# use our encoded layer to encode the training input
encoder = Model(input_train_data, encoded)
# create a placeholder for an encoded (32-dimensional) input
encoded_input = Input(shape=(dim,))
# retrieve the last layer of the autoencoder model
decoder_layer_1 = autoencoder.layers[-2]
decoder_layer_2 = autoencoder.layers[-1]
# create the decoder model
decoder = Model(encoded_input, decoder_layer_2(decoder_layer_1(encoded_input)))
# save models
autoencoder.save('{}/fold_{}/autoencoder.h5'.format(dim, fold))
encoder.save('{}/fold_{}/encoder.h5'.format(dim, fold))
decoder.save('{}/fold_{}/decoder.h5'.format(dim, fold))
# plot our loss
plt.plot(history.history['loss'], label='loss_fold_{}'.format(fold))
plt.plot(history.history['val_loss'], label='val_loss_fold_{}'.format(fold))
print("vector of val_loss", history.history['val_loss'])
plt.title('model train vs validation loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend()
plt.savefig('{}/fold_{}/loss.png'.format(dim, fold))
plt.savefig('{}/fold_{}/loss.pdf'.format(dim, fold))
plt.close()
# Apply the mapping (transform) to both the training set and the test set
X_train_AE = encoder.predict(normalized_train_data)
X_test_AE = encoder.predict(normalized_test_data)
print("Original shape: ", normalized_train_data.shape)
print("Transformed shape:", X_train_AE.shape)
# Reconstruction of training data
print("Reconstruction of training data... ")
X_train_new = autoencoder.predict(normalized_train_data)
print("Max value of predicted training data", np.max(X_train_new))
print("Min value of predicted training data", np.min(X_train_new))
print("Reconstructed matrix shape:", X_train_new.shape)
val_mse_train = mean_squared_error(normalized_train_data, X_train_new)
print('Value of MSE (train) : ', val_mse_train)
cvscores_mse_train.append(val_mse_train)
val_rmse = sqrt(val_mse_train)
print('Value of RMSE (train) : ', val_rmse)
cvscores_rmse_train.append(val_rmse)
# Reconstruction of test data
print("Reconstruction of test data... ")
X_test_new = autoencoder.predict(normalized_test_data)
print("Max value of predicted test data", np.max(X_test_new))
print("Min value of predicted test data", np.min(X_test_new))
print("Reconstructed matrix shape:", X_test_new.shape)
val_mse = mean_squared_error(normalized_test_data, X_test_new)
cvscores_mse_test.append(val_mse)
print('Value of MSE (test) : ', val_mse)
val_rmse = sqrt(val_mse)
print('Value of MSE (test) : ', val_rmse)
cvscores_rmse_test.append(val_rmse)
# transform and save latent representation
print("shape of mse vector (train):", np.array([cvscores_mse_train]).shape)
print(cvscores_mse_train)
np.save('{}/cvscores_mse_train.npy'.format(dim), np.array([cvscores_mse_train]))
print("shape of mse vector(test):", np.array([cvscores_mse_test]).shape)
print(cvscores_mse_test)
np.save('{}/cvscores_mse_test.npy'.format(dim), np.array([cvscores_mse_test]))
print("shape of rmse vector (train):", np.array([cvscores_rmse_train]).shape)
print(cvscores_rmse_train)
np.save('{}/cvscores_rmse_train.npy'.format(dim), np.array([cvscores_rmse_train]))
print("shape of rmse vector (test):", np.array([cvscores_rmse_test]).shape)
print(cvscores_rmse_test)
np.save('{}/cvscores_rmse_test.npy'.format(dim), np.array([cvscores_rmse_test]))
print("%.3f%% (+/- %.5f%%)" % (np.mean(cvscores_mse_test), np.std(cvscores_mse_test)))
mse_train.append(np.mean(cvscores_mse_train))
std_mse_train.append(np.std(cvscores_mse_train))
mse_test.append(np.mean(cvscores_mse_test))
std_mse_test.append(np.std(cvscores_mse_test))
rmse_train.append(np.mean(cvscores_rmse_train))
std_rmse_train.append(np.std(cvscores_rmse_train))
rmse_test.append(np.mean(cvscores_rmse_test))
std_rmse_test.append(np.std(cvscores_rmse_test))
# save MSE, RMSE, and STD vectors for training and test sets
np.save('mse_train_mean.npy', np.array([mse_train]))
np.save('rmse_train_mean.npy', np.array([rmse_train]))
np.save('std_mse_train_mean.npy', np.array([std_mse_train]))
np.save('std_rmse_train_mean.npy', np.array([std_rmse_train]))
np.save('mse_test_mean.npy', np.array([mse_test]))
np.save('rmse_test_mean.npy', np.array([rmse_test]))
np.save('std_mse_test_mean.npy', np.array([std_mse_test]))
np.save('std_rmse_test_mean.npy', np.array([std_rmse_test]))
# plotting the mse train
# setting x and y axis range
# plotting the mse train
plt.plot(dimensions, mse_train, label="mse_train")
plt.plot(dimensions, mse_test, label="mse_test")
plt.xlabel('Encoding dimension')
plt.ylabel('Reconstruction error (MSE)')
# showing legend
plt.legend()
plt.savefig('reconstruction_error_mse.pdf')
plt.savefig('reconstruction_error_mse.png')
plt.close()
# plotting the rmse train
# setting x and y axis range
plt.plot(dimensions, rmse_train, label="rmse_train")
plt.plot(dimensions, rmse_test, label="rmse_test")
plt.xlabel('Encoding dimension')
plt.ylabel('Reconstruction error (RMSE)')
# showing legend
plt.legend()
plt.savefig('reconstruction_error_rmse.pdf')
plt.savefig('reconstruction_error_rmse.png')
plt.close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment