import logging
import os
import select
import sys
import errno
import h5py
import numpy as np
from scipy import sparse
# from . import get_multiview_db as DB
class Dataset():
"""
    Dataset class.
    This class is used to encapsulate the multiview dataset.
Parameters
----------
views : list of numpy arrays or None
The list containing each view of the dataset as a numpy array of shape
(nb examples, nb features).
labels : numpy array or None
The labels for the multiview dataset, of shape (nb examples, ).
    are_sparse : list of bool, or None
        The list of booleans telling whether each view is sparse.
file_name : str, or None
The name of the hdf5 file that will be created to store the multiview
dataset.
view_names : list of str, or None
The name of each view.
path : str, or None
The path where the hdf5 dataset file will be stored
hdf5_file : h5py.File object, or None
If not None, the dataset will be imported directly from this file.
labels_names : list of str, or None
The name for each unique value of the labels given in labels.
is_temp : bool
Used if a temporary dataset has to be used by the benchmark.
Attributes
----------
dataset : h5py.File object
        The h5py file object that points to the hdf5 dataset on the disk.
nb_view : int
The number of views in the dataset.
view_dict : dict
        The dictionary mapping the name of each view to its index.
"""
# The following methods use hdf5
def __init__(self, views=None, labels=None, are_sparse=False,
file_name="dataset.hdf5", view_names=None, path="",
hdf5_file=None, labels_names=None, is_temp=False):
        self.is_temp = is_temp
if hdf5_file is not None:
            self.dataset = hdf5_file
self.init_attrs()
else:
if not os.path.exists(os.path.dirname(os.path.join(path, file_name))):
try:
os.makedirs(os.path.dirname(os.path.join(path, file_name)))
except OSError as exc:
if exc.errno != errno.EEXIST:
raise
dataset_file = h5py.File(os.path.join(path, file_name), "w")
if view_names is None:
view_names = ["View"+str(index) for index in range(len(views))]
if isinstance(are_sparse, bool):
are_sparse = [are_sparse for _ in views]
for view_index, (view_name, view, is_sparse) in enumerate(zip(view_names, views, are_sparse)):
view_dataset = dataset_file.create_dataset("View" + str(view_index),
view.shape,
data=view)
view_dataset.attrs["name"] = view_name
view_dataset.attrs["sparse"] = is_sparse
labels_dataset = dataset_file.create_dataset("Labels",
shape=labels.shape,
data=labels)
if labels_names is None:
labels_names = [str(index) for index in np.unique(labels)]
labels_dataset.attrs["names"] = [label_name.encode()
if not isinstance(label_name, bytes)
else label_name
for label_name in labels_names]
meta_data_grp = dataset_file.create_group("Metadata")
meta_data_grp.attrs["nbView"] = len(views)
meta_data_grp.attrs["nbClass"] = len(np.unique(labels))
meta_data_grp.attrs["datasetLength"] = len(labels)
dataset_file.close()
self.update_hdf5_dataset(os.path.join(path, file_name))
def rm(self):
"""
Method used to delete the dataset file on the disk if the dataset is
temporary.
Returns
-------
"""
filename = self.dataset.filename
self.dataset.close()
if self.is_temp:
os.remove(filename)
def get_view_name(self, view_idx):
"""
        Method to get a view's name from its index.
Parameters
----------
view_idx : int
The index of the view in the dataset
Returns
-------
The view's name.
"""
return self.dataset["View"+str(view_idx)].attrs["name"]
def init_attrs(self):
"""
Used to init the two attributes that are modified when self.dataset
changes
Returns
-------
"""
self.nb_view = self.dataset["Metadata"].attrs["nbView"]
self.view_dict = self.get_view_dict()
def get_nb_examples(self):
"""
Used to get the number of examples available
Returns
-------
"""
return self.dataset["Metadata"].attrs["datasetLength"]
def get_view_dict(self):
view_dict = {}
for view_index in range(self.nb_view):
view_dict[self.dataset["View" + str(view_index)].attrs["name"]] = view_index
return view_dict
def get_label_names(self, decode=True, example_indices=None):
example_indices = self.init_example_indces(example_indices)
selected_labels = self.get_labels(example_indices)
if decode:
return [label_name.decode("utf-8")
for label, label_name in enumerate(self.dataset["Labels"].attrs["names"])
if label in selected_labels]
else:
return [label_name
for label, label_name in enumerate(self.dataset["Labels"].attrs["names"])
if label in selected_labels]
def init_example_indces(self, example_indices=None):
if example_indices is None:
return range(self.get_nb_examples())
else:
return example_indices
def get_v(self, view_index, example_indices=None):
example_indices = self.init_example_indces(example_indices)
if type(example_indices) is int:
return self.dataset["View" + str(view_index)][example_indices, :]
else:
example_indices = np.array(example_indices)
sorted_indices = np.argsort(example_indices)
example_indices = example_indices[sorted_indices]
if not self.dataset["View" + str(view_index)].attrs["sparse"]:
return self.dataset["View" + str(view_index)][example_indices, :][
np.argsort(sorted_indices), :]
else:
sparse_mat = sparse.csr_matrix(
(self.dataset["View" + str(view_index)]["data"][()],
self.dataset["View" + str(view_index)]["indices"][()],
self.dataset["View" + str(view_index)]["indptr"][()]),
shape=self.dataset["View" + str(view_index)].attrs["shape"])[
example_indices, :][
np.argsort(sorted_indices), :]
return sparse_mat
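    # Behaviour sketch for get_v (indices are illustrative): the rows come back
    # in the requested order even though HDF5 fancy indexing needs sorted
    # indices internally.
    #
    #   rows = dataset.get_v(0, example_indices=[4, 0, 2])
    #   # rows[0] is example 4, rows[1] is example 0, rows[2] is example 2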
def get_shape(self, example_indices=None):
return self.get_v(0,example_indices=example_indices).shape
def get_nb_class(self, example_indices=None):
example_indices = self.init_example_indces(example_indices)
return len(np.unique(self.dataset["Labels"][example_indices]))
def get_labels(self, example_indices=None):
example_indices = self.init_example_indces(example_indices)
return self.dataset["Labels"][example_indices]
def copy_view(self, target_dataset=None, source_view_name=None,
target_view_index=None, example_indices=None):
example_indices = self.init_example_indces(example_indices)
new_d_set = target_dataset.create_dataset("View"+str(target_view_index),
data=self.get_v(self.view_dict[source_view_name],
example_indices=example_indices))
for key, value in self.dataset["View"+str(self.view_dict[source_view_name])].attrs.items():
new_d_set.attrs[key] = value
def init_view_names(self, view_names=None):
if view_names is None:
return [key for key in self.get_view_dict().keys()]
else:
return view_names
def update_hdf5_dataset(self, path):
if hasattr(self, 'dataset'):
self.dataset.close()
self.dataset = h5py.File(path, 'r')
self.is_temp = True
self.init_attrs()
def filter(self, labels, label_names, example_indices, view_names, path):
dataset_file_path = os.path.join(path,self.get_name()+"_temp_filter.hdf5")
new_dataset_file = h5py.File(dataset_file_path,"w")
self.dataset.copy("Metadata", new_dataset_file)
new_dataset_file["Metadata"].attrs["datasetLength"] = len(example_indices)
new_dataset_file["Metadata"].attrs["nbClass"] = np.unique(labels)
new_dataset_file.create_dataset("Labels", data=labels)
new_dataset_file["Labels"].attrs["names"] = [label_name.encode()
if not isinstance(label_name, bytes)
else label_name
for label_name in label_names]
view_names = self.init_view_names(view_names)
new_dataset_file["Metadata"].attrs["nbView"] = len(view_names)
for new_index, view_name in enumerate(view_names):
self.copy_view(target_dataset=new_dataset_file,
source_view_name=view_name,
target_view_index=new_index,
example_indices=example_indices)
new_dataset_file.close()
self.update_hdf5_dataset(dataset_file_path)
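    # Usage sketch for filter (all arguments are illustrative and must match
    # views and labels that actually exist in the file): keep three examples,
    # two classes and a single view, writing "<name>_temp_filter.hdf5" in /tmp.
    #
    #   dataset.filter(labels=np.array([0, 1, 0]), label_names=["neg", "pos"],
    #                  example_indices=[0, 3, 4], view_names=["view_a"],
    #                  path="/tmp/")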
def add_gaussian_noise(self, random_state, path,
noise_std=0.15):
"""In this function, we add a guaussian noise centered in 0 with specified
std to each view, according to it's range (the noise will be
mutliplied by this range) and we crop the noisy signal according to the
view's attributes limits.
This is done by creating a new dataset, to keep clean data."""
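        # Noise model sketch (values are illustrative): for a feature whose
        # limits are [0, 10] and noise_std=0.15, the noise is drawn from
        # N(0, 0.15) and multiplied by the range 10, i.e. a std of 1.5 in data
        # units, and the noisy values are then clipped back into [0, 10].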
noisy_dataset = h5py.File(path + self.get_name() + "_noised.hdf5", "w")
self.dataset.copy("Metadata", noisy_dataset)
self.dataset.copy("Labels", noisy_dataset)
for view_index in range(self.nb_view):
self.copy_view(target_dataset=noisy_dataset,
source_view_name=self.get_view_name(view_index),
target_view_index=view_index)
for view_index in range(noisy_dataset["Metadata"].attrs["nbView"]):
view_key = "View" + str(view_index)
            view_dset = noisy_dataset[view_key]
            view_limits = self.dataset[
                "Metadata/View" + str(view_index) + "_limits"][()]
view_ranges = view_limits[:, 1] - view_limits[:, 0]
normal_dist = random_state.normal(0, noise_std, view_dset[()].shape)
noise = normal_dist * view_ranges
noised_data = view_dset[()] + noise
noised_data = np.where(noised_data < view_limits[:, 0],
view_limits[:, 0], noised_data)
noised_data = np.where(noised_data > view_limits[:, 1],
view_limits[:, 1], noised_data)
noisy_dataset[view_key][...] = noised_data
noisy_dataset_path = noisy_dataset.filename
noisy_dataset.close()
self.update_hdf5_dataset(noisy_dataset_path)
# The following methods are hdf5 free
def to_numpy_array(self, example_indices=None, view_indices=None):
"""
        To concatenate the needed views in one big numpy array while saving the
        limits of each view in a list, to be able to retrieve them later.
Parameters
----------
example_indices : array like,
The indices of the examples to extract from the dataset
        view_indices : array like,
            The indices of the views to concatenate in the numpy array.
Returns
-------
concat_views : numpy array,
The numpy array containing all the needed views.
view_limits : list of int
The limits of each slice used to extract the views.
"""
view_limits = [0]
for view_index in view_indices:
view_data = self.get_v(view_index, example_indices=example_indices)
nb_features = view_data.shape[1]
view_limits.append(view_limits[-1]+nb_features)
concat_views = np.concatenate([self.get_v(view_index,
example_indices=example_indices)
for view_index in view_indices], axis=1)
return concat_views, view_limits
def select_views_and_labels(self, nb_labels=None,
selected_label_names=None, random_state=None,
view_names = None, path_for_new="../data/"):
if view_names is None and selected_label_names is None and nb_labels is None:
pass
else:
selected_label_names = self.check_selected_label_names(nb_labels,
selected_label_names,
random_state)
labels, label_names, example_indices = self.select_labels(selected_label_names)
self.filter(labels, label_names, example_indices, view_names, path_for_new)
labels_dictionary = dict(
(labelIndex, labelName) for labelIndex, labelName in
enumerate(self.get_label_names()))
return labels_dictionary
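    # Usage sketch (the returned names depend on the file, the arguments are
    # assumptions): restrict the dataset to two randomly chosen classes and get
    # back a dictionary such as {0: "class_a", 1: "class_b"}.
    #
    #   labels_dict = dataset.select_views_and_labels(
    #       nb_labels=2, random_state=np.random.RandomState(42),
    #       path_for_new="/tmp/")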
def get_name(self):
"""Ony works if there are not multiple dots in the files name"""
return self.dataset.filename.split('/')[-1].split('.')[0]
def select_labels(self, selected_label_names):
selected_labels = [self.get_label_names().index(label_name.decode())
if isinstance(label_name, bytes)
else self.get_label_names().index(label_name)
for label_name in selected_label_names]
selected_indices = np.array([index
for index, label in enumerate(self.get_labels())
if label in selected_labels])
labels = np.array([selected_labels.index(self.get_labels()[idx])
for idx in selected_indices])
return labels, selected_label_names, selected_indices
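    # Behaviour sketch for select_labels (label names are illustrative): with
    # dataset classes ["cat", "dog", "bird"] and
    # selected_label_names=["dog", "bird"], only the examples of those two
    # classes are kept and their labels are re-indexed to 0 ("dog") and
    # 1 ("bird").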
def check_selected_label_names(self, nb_labels=None,
selected_label_names=None,
random_state=np.random.RandomState(42)):
if selected_label_names is None or nb_labels is None or len(selected_label_names) < nb_labels:
if selected_label_names is None:
nb_labels_to_add = nb_labels
selected_label_names = []
elif nb_labels is not None:
nb_labels_to_add = nb_labels - len(selected_label_names)
else:
nb_labels_to_add=0
labels_names_to_choose = [available_label_name
for available_label_name
in self.get_label_names()
if available_label_name
not in selected_label_names]
added_labels_names = random_state.choice(labels_names_to_choose,
nb_labels_to_add,
replace=False)
selected_label_names = list(selected_label_names) + list(
added_labels_names)
elif len(selected_label_names) > nb_labels:
selected_label_names = list(
random_state.choice(selected_label_names, nb_labels,
replace=False))
return selected_label_names
def datasets_already_exist(pathF, name, nbCores):
    """Used to check if it's necessary to copy datasets"""
    allDatasetExist = True
    for coreIndex in range(nbCores):
        allDatasetExist = allDatasetExist and os.path.isfile(
            pathF + name + str(coreIndex) + ".hdf5")
    return allDatasetExist
def get_shape(dataset, view_index):
"""Used to get the dataset shape even if it's sparse"""
if not dataset.get("View" + str(view_index)).attrs["sparse"]:
return dataset.get("View" + str(view_index)).shape
else:
return dataset.get("View" + str(view_index)).attrs["shape"]
def get_value(dataset):
"""Used to get the value of a view in the HDF5 dataset even if it sparse"""
if not dataset.attrs["sparse"]:
return dataset[()]
else:
sparse_mat = sparse.csr_matrix((dataset.get("data")[()],
dataset.get("indices")[()],
dataset.get("indptr")[()]),
shape=dataset.attrs["shape"])
return sparse_mat
def extract_subset(matrix, used_indices):
"""Used to extract a subset of a matrix even if it's sparse"""
if sparse.issparse(matrix):
new_indptr = np.zeros(len(used_indices) + 1, dtype=int)
oldindptr = matrix.indptr
for exampleIndexIndex, exampleIndex in enumerate(used_indices):
new_indptr[exampleIndexIndex + 1] = new_indptr[exampleIndexIndex] + (
oldindptr[exampleIndex + 1] - oldindptr[exampleIndex])
new_data = np.ones(new_indptr[-1], dtype=bool)
new_indices = np.zeros(new_indptr[-1], dtype=int)
old_indices = matrix.indices
for exampleIndexIndex, exampleIndex in enumerate(used_indices):
new_indices[new_indptr[exampleIndexIndex]:new_indptr[
exampleIndexIndex + 1]] = old_indices[
oldindptr[exampleIndex]:
oldindptr[exampleIndex + 1]]
return sparse.csr_matrix((new_data, new_indices, new_indptr),
shape=(len(used_indices), matrix.shape[1]))
else:
return matrix[used_indices]
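# Usage sketch (the matrices below are assumptions): the sparse branch rebuilds
# the data as all-True booleans, so only the sparsity pattern is kept.
#
#   mat = sparse.csr_matrix(np.eye(4, dtype=bool))
#   sub = extract_subset(mat, [2, 0])             # sparse, rows 2 and 0, shape (2, 4)
#   dense_sub = extract_subset(np.eye(4), [2, 0])  # plain fancy indexing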
def init_multiple_datasets(path_f, name, nb_cores):
r"""Used to create copies of the dataset if multicore computation is used.
This is a temporary solution to fix the sharing memory issue with HDF5 datasets.
Parameters
----------
path_f : string
Path to the original dataset directory
name : string
Name of the dataset
nb_cores : int
The number of threads that the benchmark can use
Returns
-------
    dataset_files : None
        The function only creates the dataset copies on disk; copy_hdf5 returns
        nothing, so None is always returned.
"""
if nb_cores > 1:
if datasets_already_exist(path_f, name, nb_cores):
logging.debug(
"Info:\t Enough copies of the dataset are already available")
pass
else:
logging.debug("Start:\t Creating " + str(
nb_cores) + " temporary datasets for multiprocessing")
            logging.warning(
                " WARNING : /!\\ This may use a lot of HDD storage space : " +
                str(os.path.getsize(path_f + name + ".hdf5") * nb_cores / float(
                    1024) / 1024 / 1024) + " Gbytes /!\\ ")
confirmation = confirm()
if not confirmation:
sys.exit(0)
else:
dataset_files = copy_hdf5(path_f, name, nb_cores)
logging.debug("Start:\t Creating datasets for multiprocessing")
return dataset_files
def copy_hdf5(pathF, name, nbCores):
"""Used to copy a HDF5 database in case of multicore computing"""
datasetFile = h5py.File(pathF + name + ".hdf5", "r")
for coreIndex in range(nbCores):
newDataSet = h5py.File(pathF + name + str(coreIndex) + ".hdf5", "w")
        for dataset in datasetFile:
            datasetFile.copy("/" + dataset, newDataSet["/"])
        newDataSet.close()
    datasetFile.close()
def delete_HDF5(benchmarkArgumentsDictionaries, nbCores, dataset):
"""Used to delete temporary copies at the end of the benchmark"""
if nbCores > 1:
logging.debug("Start:\t Deleting " + str(
nbCores) + " temporary datasets for multiprocessing")
args = benchmarkArgumentsDictionaries[0]["args"]
logging.debug("Start:\t Deleting datasets for multiprocessing")
for coreIndex in range(nbCores):
os.remove(args["Base"]["pathf"] + args["Base"]["name"] + str(coreIndex) + ".hdf5")
if dataset.is_temp:
dataset.rm()
def confirm(resp=True, timeout=15):
    """Used to process answer"""
    while True:
        ans = input_(timeout)
        if not ans:
            return resp
        if ans in ['y', 'Y']:
            return True
        if ans in ['n', 'N']:
            return False
        print('please enter y or n.')
def input_(timeout=15):
"""used as a UI to stop if too much HDD space will be used"""
logging.warning("You have " + str(
timeout) + " seconds to stop the dataset copy by typing n")
i, o, e = select.select([sys.stdin], [], [], timeout)
if i:
return sys.stdin.readline().strip()
else:
return "y"
def get_monoview_shared(path, name, view_name, labels_names, classification_indices):
"""ATM is not used with shared memory, but soon :)"""
    hdf5_dataset_file = h5py.File(path + name + ".hdf5", "r")
    X = hdf5_dataset_file.get(view_name)[()]
    y = hdf5_dataset_file.get("Labels")[()]
    hdf5_dataset_file.close()
    return X, y