From 5950be9ccb0abbcdfdae9d80d0a743eb765b8758 Mon Sep 17 00:00:00 2001 From: Baptiste Bauvin <baptiste.bauvin@lis-lab.fr> Date: Mon, 25 Jan 2021 10:54:41 -0500 Subject: [PATCH] Removed gaussian_classed --- multiview_generator/gaussian_classes.py | 320 ------------------------ 1 file changed, 320 deletions(-) delete mode 100644 multiview_generator/gaussian_classes.py diff --git a/multiview_generator/gaussian_classes.py b/multiview_generator/gaussian_classes.py deleted file mode 100644 index eb8a03e..0000000 --- a/multiview_generator/gaussian_classes.py +++ /dev/null @@ -1,320 +0,0 @@ - -import numpy as np -import itertools -import math -from scipy.special import erfinv - -from .utils import format_array, get_config_from_file, \ - init_random_state, init_error_matrix, init_list -from .base_strs import * -from .base import MultiViewSubProblemsGenerator -from multiview_generator import sub_problems - - -class MultiViewGaussianSubProblemsGenerator(MultiViewSubProblemsGenerator): - - def __init__(self, random_state=42, n_samples=100, n_classes=4, n_views=4, - error_matrix=None, latent_size_multiplicator=2, n_features=3, - class_weights=1.0, redundancy=0.1, complementarity=0.1, - complementarity_level=3, - mutual_error=0.2, name="generated_dataset", config_file=None, - sub_problem_type="base", sub_problem_configurations=None, - sub_problem_generators="StumpsGenerator", random_vertices=False - , **kwargs): - """ - - :param random_state: - :param n_samples: - :param n_classes: - :param n_views: - :param error_matrix: - :param latent_size_multiplicator: - :param n_features: - :param class_weights: - :param redundancy: - :param complementarity: - :param complementarity_level: - :param mutual_error: - :param name: - :param config_file: - :param sub_problem_type: - :param sub_problem_configurations: - :param kwargs: - """ - - MultiViewSubProblemsGenerator.__init__(self, random_state=random_state, - n_samples=n_samples, - n_classes=n_classes, - n_views=n_views, - error_matrix=error_matrix, - latent_size_multiplicator=latent_size_multiplicator, - n_features=n_features, - class_weights=class_weights, - redundancy=redundancy, - complementarity=complementarity, - complementarity_level=complementarity_level, - mutual_error=mutual_error, - name=name, - config_file=config_file, - sub_problem_type=sub_problem_type, - F=sub_problem_configurations, - **kwargs) - self.random_vertices = format_array(random_vertices, n_views, bool) - self.sub_problem_generators = format_array(sub_problem_generators, n_views, str) - - - def generate_multi_view_dataset(self, ): - """ - This is the main method. It will generate a multiview dataset according - to the configuration. - To do so, - - * it generates the labels of the multiview dataset, - * then it generates all the subsets of samples (redundant, ...) - * finally, for each view it generates a monview dataset according to the configuration - - - :return: view_data a list contianing the views np.ndarrays and y, the label array. - """ - - # Generate the labels - self.error_2D = np.ones((self.n_samples, self.n_views)) - # Generate the sample descriptions according to the error matrix - self._sub_problem_generators = [_ for _ in range(self.n_views)] - for view_index in range(self.n_views): - sub_problem_generator = getattr(sub_problems, - self.sub_problem_generators[view_index])( - n_classes=self.n_classes, - n_features=self.n_features[view_index], - random_vertices=self.random_vertices[view_index], - errors=self.error_matrix[:,view_index], - random_state=self.rs, - n_samples_per_class=self.n_samples_per_class, - **self.sub_problem_configurations[view_index]) - vec = sub_problem_generator.gen_data() - self._sub_problem_generators[view_index] = sub_problem_generator - self.view_names[view_index] = "view_{}_{}".format(view_index, sub_problem_generator.view_name) - self.bayes_error[view_index, :] = sub_problem_generator.bayes_error - self.generated_data[view_index, :, :,:self.n_features[view_index]] = vec - self.selected_vertices[view_index] = sub_problem_generator.selected_vertices - self.descriptions[view_index, :,:] = sub_problem_generator.descriptions - - self.y = [] - for ind, n_samples_ in enumerate(self.n_samples_per_class): - self.y += [ind for _ in range(n_samples_)] - self.y = np.array(self.y, dtype=int) - - - self.sample_ids = ["{}_l_{}".format(ind, self.y[ind]) for ind in - range(self.n_samples)] - - self.dataset = [np.zeros((self.n_total_samples, - self.n_features[view_index])) - for view_index in range(self.n_views)] - - self.assign_mutual_error() - self.assign_complementarity() - self.assign_redundancy() - - self.get_distance() - return self.dataset, self.y - - def assign_mutual_error(self): - for class_ind in range(self.n_classes): - mutual_start = np.sum(self.n_samples_per_class[:class_ind]) - mutual_end = np.sum(self.n_samples_per_class[:class_ind])+self.mutual_error_per_class[class_ind] - for view_index in range(self.n_views): - mis_described_random_ind = self.rs.choice(np.where(self.descriptions[view_index, class_ind, :]==-1)[0], self.mutual_error_per_class[class_ind], replace=False) - self.dataset[view_index][mutual_start:mutual_end, :] = self.generated_data[view_index, class_ind, mis_described_random_ind, :self.n_features[view_index]] - self.error_2D[mutual_start:mutual_end, view_index] = 0 - self.descriptions[view_index, class_ind, mis_described_random_ind] = 0 - for sample_ind in np.arange(start=mutual_start, stop=mutual_end): - self.sample_ids[sample_ind] = self.sample_ids[sample_ind]+"_m" - - def assign_complementarity(self): - self.complementarity_ratio = 0 - for class_ind in range(self.n_classes): - complem_level = int(self.complementarity_level[class_ind]) - complem_start = np.sum(self.n_samples_per_class[:class_ind])+self.mutual_error_per_class[class_ind] - complem_ind = 0 - while complem_level != 0: - avail_errors = np.array([len(np.where(self.descriptions[view_index, class_ind, :] ==-1)[0]) for view_index in range(self.n_views)]) - avail_success = np.array([len(np.where(self.descriptions[view_index, class_ind, :] == 1)[0]) for view_index in range(self.n_views)]) - - cond=True - - while cond: - if np.sum(avail_errors) == 0 or np.sum(avail_success) < self.n_views - complem_level: - cond = False - break - elif len(np.where(avail_errors > 0)[0]) < complem_level: - cond = False - break - self.sample_ids[complem_start+complem_ind] += "_c" - self.complementarity_ratio += 1/self.n_samples - sorted_inds = np.argsort(-avail_errors) - selected_failed_views = sorted_inds[:complem_level] - sorted_inds = np.array([i for i in np.argsort(-avail_success) if - i not in selected_failed_views]) - selected_succeeded_views = sorted_inds[ - :self.n_views - complem_level] - for view_index in range(self.n_views): - if view_index in selected_failed_views: - self.error_2D[complem_start+complem_ind, view_index] = 0 - chosen_ind = int(self.rs.choice(np.where(self.descriptions[view_index, class_ind, :]==-1)[0],size=1, replace=False)) - self.dataset[view_index][complem_start+complem_ind, :] = self.generated_data[view_index, class_ind, chosen_ind, :self.n_features[view_index]] - self.descriptions[view_index, class_ind, chosen_ind] = 0 - self.sample_ids[complem_start+complem_ind] += "_{}".format(view_index) - avail_errors[view_index]-=1 - elif view_index in selected_succeeded_views: - chosen_ind = int(self.rs.choice(np.where(self.descriptions[view_index, class_ind, :]==1)[0],size=1, replace=False)) - self.dataset[view_index][complem_start + complem_ind,:] = self.generated_data[view_index, class_ind, chosen_ind, :self.n_features[view_index]] - self.descriptions[view_index, class_ind, chosen_ind] = 0 - avail_success[view_index] -= 1 - complem_ind += 1 - complem_level -= 1 - self.n_complem[class_ind] = complem_ind - - def assign_redundancy(self): - self.real_redundancy_level=0 - for class_ind in range(self.n_classes): - redun_start = int(np.sum(self.n_samples_per_class[:class_ind])+self.mutual_error_per_class[class_ind]+self.n_complem[class_ind]) - redun_end = np.sum(self.n_samples_per_class[:class_ind+1]) - for view_index in range(self.n_views): - if len(np.where(self.descriptions[view_index, class_ind, :] == 1)[0]) < redun_end - redun_start and len(np.where(self.descriptions[view_index, class_ind, :] == -1)[0])>0: - raise ValueError("For class {}, view {}, reduce the error " - "(now: {}), or increase the complemetarity " - "level (now: {}), there is not enough good " - "descriptions with the current " - "configuration".format(class_ind, - view_index, - self.error_matrix[class_ind, - view_index], - self.complementarity_level[class_ind])) - remaining_good_desc = np.where(self.descriptions[view_index, class_ind, :] == 1)[0] - self.dataset[view_index][redun_start:redun_end,:] = self.generated_data[view_index, class_ind,remaining_good_desc, :self.n_features[view_index]] - self.descriptions[view_index, class_ind, remaining_good_desc] = 0 - for sample_ind in np.arange(start=redun_start, stop=redun_end): - self.sample_ids[sample_ind] = self.sample_ids[sample_ind] + "_r" - self.real_redundancy_level+=1/self.n_samples - - def get_distance(self): - self.distances = np.zeros((self.n_views, self.n_samples)) - for view_index, view_data in enumerate(self.dataset): - for sample_ind, data in enumerate(view_data): - # The closest dimension to the limit - dist = np.min(np.abs(data)) - # dist = np.linalg.norm(data-self.selected_vertices[view_index][self.y[sample_ind]]) - self.sample_ids[sample_ind] += "-{}_{}".format(view_index, round(dist, 2)) - self.distances[view_index,sample_ind] = dist - - # def _gen_data(self, view_index): - # """ - # Generates the samples according to gaussian distributions with scales - # computed with the given error and class separation - # - # :param view_index: - # :return: - # """ - # class_sep = self.sub_problem_configurations[view_index]["class_sep"] - # vertices = np.array( - # [np.array([coord for coord in coords]) for coords in - # itertools.product( - # *zip([-1 for _ in range(self.n_features[view_index])], - # [1 for _ in range(self.n_features[view_index])]))]) - # if self.random_vertices == True: - # selected_vertices = self.rs.choice(np.arange(len(vertices)), - # self.n_classes, - # replace=False) - # else: - # selected_vertices = np.arange(self.n_classes) - # self.selected_vertices[view_index] = vertices[selected_vertices, :] * class_sep - # for class_ind, center_coord in enumerate(self.selected_vertices[view_index]): - # - # error = self.error_matrix[class_ind, view_index] - # - # scale = (class_sep / math.sqrt(2)) * (1 / ( - # erfinv(2 * (1 - error)**(1/self.n_features[view_index]) - 1))) - # cov = np.identity(self.n_features[view_index]) * scale**2 - # vec = self.rs.multivariate_normal(center_coord, cov, - # self.n_samples_per_class[ - # class_ind]) - # mis_described = np.unique(np.where( np.multiply(vec, center_coord) < 0 )[0]) - # well_described = np.array([ind for ind - # in range(self.n_samples_per_class[class_ind]) - # if ind not in mis_described]) - # self.bayes_error[view_index, class_ind] = mis_described.shape[0] - # self.generated_data[view_index, class_ind, :, :self.n_features[view_index]] = vec - # self.descriptions[view_index, class_ind, mis_described] = -1 - # self.descriptions[view_index, class_ind, well_described] = 1 - - def _get_generator_report(self, view_index, doc_type=".md"): - return "home made gaussian generator" - - def _init_sub_problem_config(self, sub_problem_configs, sub_problem_type): - """ - - :param sub_problem_configs: - :param sub_problem_type: - :return: - """ - - if sub_problem_configs is None: - self.sub_problem_configurations = [ - {"n_clusters_per_class": 1, - "class_sep": 1.0, } - for _ in range(self.n_views)] - else: - self.sub_problem_configurations = init_list(sub_problem_configs, - size=self.n_views, - type_needed=dict) - - def _init_base_arguments(self): - self.n_samples_per_class = ( - self.class_weights * self.n_samples).astype(int) - self.n_max_samples = np.max(self.n_samples_per_class) - self.n_samples = np.sum(self.n_samples_per_class) - self.n_complem =np.zeros(self.n_classes) - self.n_max_features = np.max(self.n_features) - self.generated_data = self.rs.uniform(low=-self.latent_size_mult, high=self.latent_size_mult, size=(self.n_views, self.n_classes, self.n_max_samples, self.n_max_features)) - self.descriptions = np.zeros((self.n_views, self.n_classes, self.n_max_samples,)) - self.n_total_samples = np.sum(self.n_samples_per_class) - sample_indices = np.arange(int(np.sum(self.n_samples_per_class))) - self.rs.shuffle(sample_indices) - self.class_sample_indices = [ - sample_indices[sum(self.n_samples_per_class[:ind]): - sum(self.n_samples_per_class[:ind + 1])] - for ind in range(self.n_classes)] - self.well_described = [[_ for _ in range(self.n_views)] for _ in - range(self.n_classes)] - self.misdescribed = [[_ for _ in range(self.n_views)] for _ in - range(self.n_classes)] - self.redundancy_indices = [_ for _ in range(self.n_classes)] - self.mutual_error_indices = [_ for _ in range(self.n_classes)] - self.complementarity_samples = [_ for _ in range(self.n_classes)] - self.good_views_indices = [_ for _ in range(self.n_classes)] - self.bad_views_indices = [_ for _ in range(self.n_classes)] - self.available_init_indices = self.class_sample_indices.copy() - self.sample_ids = ["sample_{}".format(ind) - for ind - in range(int(np.sum(self.n_samples_per_class)))] - self.bayes_error = np.zeros((self.n_views, self.n_classes)) - self.sub_problems = [[] for _ in range(self.n_views)] - self.mutual_error_per_class = np.array( - [int(float(self.mutual_error[class_ind]) * n_sample_) for class_ind, n_sample_ in - enumerate(self.n_samples_per_class)]) - self.redundancy_per_class = np.array( - [int(self.redundancy[class_ind] * n_sample_) for class_ind, n_sample_ in enumerate(self.n_samples_per_class)]) - self.view_data = [np.zeros((self.n_samples, self.n_features[view_ind])) for view_ind in range(self.n_views)] - self.all_mis_described = [[] for _ in range(self.n_views)] - self.all_well_described = [[] for _ in range(self.n_views)] - self.selected_vertices = [_ for _ in range(self.n_views)] - self.avail_well_described = [[] for _ in range(self.n_views)] - self.avail_mis_described = [[] for _ in range(self.n_views)] - self.mutual_error_indices = [[] for _ in range(self.n_views)] - self.redundancy_indices = [[] for _ in range(self.n_views)] - self.complementarity_indices = [[[] for _ in range(self.n_classes)] for _ - in - range(self.n_views)] - self.complem_names = [[] for _ in range(self.n_classes)] - self.complem_error = [[] for _ in range(self.n_classes)] \ No newline at end of file -- GitLab