# dataset.py
import logging
import os
import select
import sys
import errno
import h5py
import numpy as np
from scipy import sparse
# from . import get_multiview_db as DB

class Dataset:
    def __init__(self, views=None, labels=None, are_sparse=False,
                 file_name="dataset.hdf5", view_names=None, path="",
                 hdf5_file=None, labels_names=None):
        if hdf5_file is not None:
            # Wrap an already opened HDF5 file.
            self.dataset = hdf5_file
        else:
            # Create the target directory if needed, tolerating a
            # concurrent-creation race.
            if not os.path.exists(os.path.dirname(os.path.join(path, file_name))):
                try:
                    os.makedirs(os.path.dirname(os.path.join(path, file_name)))
                except OSError as exc:
                    if exc.errno != errno.EEXIST:
                        raise
            dataset_file = h5py.File(os.path.join(path, file_name), "w")
            if view_names is None:
                view_names = ["View" + str(index) for index in range(len(views))]
            if isinstance(are_sparse, bool):
                are_sparse = [are_sparse for _ in views]
            # Store each view as a dataset named "View<i>", keeping its
            # display name and sparsity flag as HDF5 attributes.
            for view_index, (view_name, view, is_sparse) in enumerate(
                    zip(view_names, views, are_sparse)):
                view_dataset = dataset_file.create_dataset(
                    "View" + str(view_index), view.shape, data=view)
                view_dataset.attrs["name"] = view_name
                view_dataset.attrs["sparse"] = is_sparse
            labels_dataset = dataset_file.create_dataset("Labels",
                                                         shape=labels.shape,
                                                         data=labels)
            if labels_names is None:
                labels_names = [str(index) for index in np.unique(labels)]
            # HDF5 attributes need bytes, so encode any str label names.
            labels_dataset.attrs["names"] = [label_name.encode()
                                             if not isinstance(label_name, bytes)
                                             else label_name
                                             for label_name in labels_names]
            meta_data_grp = dataset_file.create_group("Metadata")
            meta_data_grp.attrs["nbView"] = len(views)
            meta_data_grp.attrs["nbClass"] = len(np.unique(labels))
            meta_data_grp.attrs["datasetLength"] = len(labels)
            # Reopen read-only so later accesses cannot modify the file.
            dataset_file.close()
            dataset_file = h5py.File(os.path.join(path, file_name), "r")
            self.dataset = dataset_file
        self.nb_view = self.dataset.get("Metadata").attrs["nbView"]
        self.view_dict = self.get_view_dict()

    def get_view_dict(self):
        view_dict = {}
        for view_index in range(self.nb_view):
            view_dict[self.dataset.get(
                "View" + str(view_index)).attrs["name"]] = view_index
        return view_dict

    def get_label_names(self, decode=True):
        if decode:
            return [label_name.decode("utf-8")
                    for label_name in self.dataset.get("Labels").attrs["names"]]
        else:
            return self.dataset.get("Labels").attrs["names"]

    def init_example_indices(self, example_indices=None):
        # Default to every example in the dataset.
        if example_indices is None:
            return range(self.dataset.get("Metadata").attrs["datasetLength"])
        else:
            return example_indices

    def get_v(self, view_index, example_indices=None):
        example_indices = self.init_example_indices(example_indices)
        if type(example_indices) is int:
            return self.dataset.get("View" + str(view_index))[example_indices, :]
        else:
            # h5py requires fancy indices to be sorted, so sort them, slice,
            # then restore the caller's ordering with a second argsort.
            example_indices = np.array(example_indices)
            sorted_indices = np.argsort(example_indices)
            example_indices = example_indices[sorted_indices]
            if not self.dataset.get("View" + str(view_index)).attrs["sparse"]:
                return self.dataset.get("View" + str(view_index))[
                    example_indices, :][np.argsort(sorted_indices), :]
            else:
                # Rebuild the CSR matrix from its three stored arrays, then
                # slice the requested rows.
                sparse_mat = sparse.csr_matrix(
                    (self.dataset.get("View" + str(view_index)).get("data").value,
                     self.dataset.get("View" + str(view_index)).get("indices").value,
                     self.dataset.get("View" + str(view_index)).get("indptr").value),
                    shape=self.dataset.get("View" + str(view_index)).attrs["shape"])[
                        example_indices, :][np.argsort(sorted_indices), :]
                return sparse_mat

    # def copy(self, examples_indices, views_indices, target_dataset):
    #     new_dataset = Dataset(views=,
    #                           labels=,
    #                           are_sparse=,
    #                           file_name=,
    #                           view_names=,
    #                           path=,
    #                           labels_names=)
    #     return self.dataset.copy(part_name, target_dataset)

    def get_nb_class(self, example_indices=None):
        example_indices = self.init_example_indices(example_indices)
        return len(np.unique(self.dataset.get("Labels").value[example_indices]))

    def get_labels(self, example_indices=None):
        example_indices = self.init_example_indices(example_indices)
        return self.dataset.get("Labels").value[example_indices]

    def copy_view(self, target_dataset=None, source_view_name=None,
                  target_view_name=None, example_indices=None):
        example_indices = self.init_example_indices(example_indices)
        new_d_set = target_dataset.create_dataset(
            target_view_name,
            data=self.get_v(self.view_dict[source_view_name],
                            example_indices=example_indices))
        # Copy the view's HDF5 attributes (name, sparsity, ...) as well; the
        # attributes live on the HDF5 dataset, not on the array get_v returns.
        for key, value in self.dataset.get(
                "View" + str(self.view_dict[source_view_name])).attrs.items():
            new_d_set.attrs[key] = value
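
# A minimal usage sketch for the Dataset class (hedged: the random views,
# their names, and the target path are made up for illustration):
#
#     views = [np.random.rand(10, 3), np.random.rand(10, 5)]
#     labels = np.array([0] * 5 + [1] * 5)
#     data = Dataset(views=views, labels=labels,
#                    view_names=["audio", "video"],
#                    file_name="demo.hdf5", path="demo_dir/")
#     data.get_v(0, example_indices=[4, 0, 2])  # rows in the requested order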

def datasets_already_exist(pathF, name, nbCores):
    """Used to check if it's necessary to copy the dataset for each core"""
    allDatasetExist = True
    for coreIndex in range(nbCores):
        allDatasetExist = allDatasetExist and os.path.isfile(
            pathF + name + str(coreIndex) + ".hdf5")
    return allDatasetExist

def get_v(dataset, view_index, used_indices=None):
    """Used to extract a view as a numpy array or a sparse mat from the HDF5 dataset"""
    if used_indices is None:
        used_indices = range(dataset.get("Metadata").attrs["datasetLength"])
    if type(used_indices) is int:
        return dataset.get("View" + str(view_index))[used_indices, :]
    else:
        used_indices = np.array(used_indices)
        sorted_indices = np.argsort(used_indices)
        used_indices = used_indices[sorted_indices]
        if not dataset.get("View" + str(view_index)).attrs["sparse"]:
            return dataset.get("View" + str(view_index))[used_indices, :][
                np.argsort(sorted_indices), :]
        else:
            sparse_mat = sparse.csr_matrix(
                (dataset.get("View" + str(view_index)).get("data").value,
                 dataset.get("View" + str(view_index)).get("indices").value,
                 dataset.get("View" + str(view_index)).get("indptr").value),
                shape=dataset.get("View" + str(view_index)).attrs["shape"])[
                    used_indices, :][np.argsort(sorted_indices), :]
            return sparse_mat
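
# A short illustration of get_v's index handling (hedged sketch;
# `dataset_file` stands for an open h5py.File with the layout above, and the
# indices are arbitrary): h5py only accepts sorted fancy indices, so get_v
# sorts them, slices, and undoes the sort, returning rows in caller order.
#
#     view = get_v(dataset_file, 0, used_indices=[4, 0, 2])
#     # view[0] is example 4, view[1] is example 0, view[2] is example 2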

def get_shape(dataset, view_index):
    """Used to get the dataset shape even if it's sparse"""
    if not dataset.get("View" + str(view_index)).attrs["sparse"]:
        return dataset.get("View" + str(view_index)).shape
    else:
        return dataset.get("View" + str(view_index)).attrs["shape"]

def get_value(dataset):
    """Used to get the value of a view in the HDF5 dataset even if it is sparse"""
    if not dataset.attrs["sparse"]:
        return dataset.value
    else:
        sparse_mat = sparse.csr_matrix((dataset.get("data").value,
                                        dataset.get("indices").value,
                                        dataset.get("indptr").value),
                                       shape=dataset.attrs["shape"])
        return sparse_mat
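
# Hedged sketch of get_value on a sparse view: the functions here expect a
# sparse view to be stored as a group holding "data", "indices" and "indptr"
# datasets plus "sparse" and "shape" attributes, so:
#
#     mat = get_value(dataset_file.get("View1"))  # -> scipy CSR matrix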

def extract_subset(matrix, used_indices):
    """Used to extract a subset of a matrix even if it's sparse"""
    if sparse.issparse(matrix):
        # Rebuild the CSR structure row by row. Note that new_data is all
        # ones, so this assumes a binary (boolean) matrix.
        new_indptr = np.zeros(len(used_indices) + 1, dtype=int)
        old_indptr = matrix.indptr
        for example_index_index, example_index in enumerate(used_indices):
            new_indptr[example_index_index + 1] = new_indptr[example_index_index] + (
                old_indptr[example_index + 1] - old_indptr[example_index])
        new_data = np.ones(new_indptr[-1], dtype=bool)
        new_indices = np.zeros(new_indptr[-1], dtype=int)
        old_indices = matrix.indices
        for example_index_index, example_index in enumerate(used_indices):
            new_indices[new_indptr[example_index_index]:new_indptr[
                example_index_index + 1]] = old_indices[
                    old_indptr[example_index]:old_indptr[example_index + 1]]
        return sparse.csr_matrix((new_data, new_indices, new_indptr),
                                 shape=(len(used_indices), matrix.shape[1]))
    else:
        return matrix[used_indices]
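
# A minimal sketch of extract_subset on a small boolean CSR matrix (the
# matrix and row choice are made up for illustration):
#
#     mat = sparse.csr_matrix(np.array([[1, 0], [0, 1], [1, 1]], dtype=bool))
#     sub = extract_subset(mat, [2, 0])
#     # sub.toarray() -> [[ True,  True], [ True, False]]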

def init_multiple_datasets(path_f, name, nb_cores):
    r"""Used to create copies of the dataset if multicore computation is used.

    This is a temporary solution to fix the shared-memory issue with HDF5
    datasets.

    Parameters
    ----------
    path_f : string
        Path to the original dataset directory
    name : string
        Name of the dataset
    nb_cores : int
        The number of threads that the benchmark can use

    Returns
    -------
    dataset_files : None
        The value returned by copy_hdf5, which currently returns nothing.
    """
    if nb_cores > 1:
        if datasets_already_exist(path_f, name, nb_cores):
            logging.debug(
                "Info:\t Enough copies of the dataset are already available")
            pass
        else:
            logging.debug("Start:\t Creating " + str(
                nb_cores) + " temporary datasets for multiprocessing")
            logging.warning(
                " WARNING : /!\\ This may use a lot of HDD storage space : " +
                str(os.path.getsize(path_f + name + ".hdf5") * nb_cores / float(
                    1024) / 1024 / 1024) + " Gbytes /!\\ ")
            confirmation = confirm()
            if not confirmation:
                sys.exit(0)
            else:
                dataset_files = copy_hdf5(path_f, name, nb_cores)
                logging.debug("Start:\t Creating datasets for multiprocessing")
                return dataset_files
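
# A hedged usage sketch (the path and dataset name below are made up for
# illustration; the file <path><name>.hdf5 is assumed to exist):
#
#     init_multiple_datasets("/tmp/data/", "digits", nb_cores=4)
#     # -> prompts for confirmation, then writes /tmp/data/digits0.hdf5
#     #    through /tmp/data/digits3.hdf5 if they are not already there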

def copy_hdf5(pathF, name, nbCores):
    """Used to copy a HDF5 database in case of multicore computing"""
    datasetFile = h5py.File(pathF + name + ".hdf5", "r")
    for coreIndex in range(nbCores):
        # One full copy of every group/dataset per core.
        newDataSet = h5py.File(pathF + name + str(coreIndex) + ".hdf5", "w")
        for dataset in datasetFile:
            datasetFile.copy("/" + dataset, newDataSet["/"])
        newDataSet.close()

def delete_HDF5(benchmarkArgumentsDictionaries, nbCores, DATASET):
    """Used to delete temporary copies at the end of the benchmark"""
    if nbCores > 1:
        logging.debug("Start:\t Deleting " + str(
            nbCores) + " temporary datasets for multiprocessing")
        args = benchmarkArgumentsDictionaries[0]["args"]
        logging.debug("Start:\t Deleting datasets for multiprocessing")
        for coreIndex in range(nbCores):
            os.remove(args["Base"]["pathf"] + args["Base"]["name"] + str(
                coreIndex) + ".hdf5")
    filename = DATASET.filename
    DATASET.close()
    if "_temp_" in filename:
        os.remove(filename)

def confirm(resp=True, timeout=15):
    """Used to process the user's answer"""
    while True:
        ans = input_(timeout)
        if not ans:
            return resp
        if ans not in ['y', 'Y', 'n', 'N']:
            # Invalid input previously fell through and returned None;
            # re-prompt instead.
            print('please enter y or n.')
            continue
        if ans == 'y' or ans == 'Y':
            return True
        if ans == 'n' or ans == 'N':
            return False

def input_(timeout=15):
    """Used as a UI to stop if too much HDD space will be used"""
    logging.warning("You have " + str(
        timeout) + " seconds to stop the dataset copy by typing n")
    i, o, e = select.select([sys.stdin], [], [], timeout)
    if i:
        return sys.stdin.readline().strip()
    else:
        return "y"

def get_monoview_shared(path, name, view_name, labels_names, classification_indices):
    """ATM is not used with shared memory, but soon :)"""
    # Open read-only; opening with "w" would truncate the dataset file.
    hdf5_dataset_file = h5py.File(path + name + ".hdf5", "r")
    X = hdf5_dataset_file.get(view_name).value
    y = hdf5_dataset_file.get("Labels").value
    return X, y