Skip to content
Snippets Groups Projects
Commit 7f9e38b6 authored by Baptiste Bauvin's avatar Baptiste Bauvin
Browse files

Before refactoring

parent bcc721b1
No related branches found
No related tags found
No related merge requests found
This diff is collapsed.
import os
import h5py
import numpy as np
import yaml
from sklearn.datasets import make_classification, make_gaussian_quantiles
from tabulate import tabulate
import pandas as pd
import inspect
from datetime import datetime
import plotly
import math
from sklearn.metrics import zero_one_loss
from sklearn.model_selection import cross_val_predict
from sklearn.decomposition import PCA
from sklearn.tree import DecisionTreeClassifier
from .utils import format_array, get_config_from_file, \
init_random_state, init_error_matrix, init_list
from .base_strs import *
class MultiViewSubProblemsGenerator:
    r"""
    This engine generates one monoview sub-problem for each view with independent data.
    It then switches descriptions between the samples to create error and difficulty in the dataset.

    This class holds the shared configuration, the reporting and the export
    helpers. ``__init__`` calls ``self._init_sub_problem_config`` and the
    reporting/export methods read attributes (``dataset``, ``y``,
    ``bayes_error``, ``error_2D``, ``_sub_problem_generators``,
    ``real_redundancy_level``, ``complementarity_ratio``) that are not defined
    here, so it is meant to be used through a subclass providing them.

    :param random_state: The random state or seed.
    :param n_samples: The number of samples that the dataset will contain
    :param n_classes: The number of classes in which the samples will be labelled
    :param n_views: The number of views describing the samples
    :param error_matrix: The error matrix giving in row i column j the error of the Bayes classifier on Class i for View j
    :param latent_size_multiplicator: Stored as ``latent_size_mult``; not used by this class itself.
    :param n_features: The number of features describing the samples for each view (can specify an int or array-like of length ``n_views``)
    :param class_weights: The proportion of the dataset that will be labelled in each class. Must specify an array-like of size n_classes ([0.1,0.45,0.45] will output a dataset with 10% of the samples in the first class and 45% in the two others.)
    :param redundancy: The proportion of the samples that will be well-described by all the views.
    :param complementarity: The proportion of samples that will be well-described only by some views
    :param complementarity_level: The number of views that will have a bad description of the complementary samples
    :param mutual_error: The proportion of samples that will be mis-described by all the views
    :param name: The name of the dataset (will be used to name the file)
    :param config_file: The path to the yaml config file. If provided, the config file entries will overwrite the ones passed as arguments.
    :param sub_problem_type: Forwarded to ``_init_sub_problem_config``.
    :param sub_problem_configurations: Forwarded to ``_init_sub_problem_config``.
    :type random_state: int or np.random.RandomState
    :type n_samples: int
    :type n_classes: int
    :type n_views: int
    :type error_matrix: np.ndarray
    :type latent_size_multiplicator: float
    :type n_features: int or array-like
    :type class_weights: float or array-like
    :type redundancy: float
    :type complementarity: float
    :type complementarity_level: float
    :type mutual_error: float
    :type name: str
    :type config_file: str
    :type sub_problem_type: str or list
    :type sub_problem_configurations: None, dict or list
    """

    def __init__(self, random_state=42, n_samples=100, n_classes=4, n_views=4,
                 error_matrix=None, latent_size_multiplicator=2, n_features=3,
                 class_weights=1.0, redundancy=0.0, complementarity=0.0,
                 complementarity_level=3,
                 mutual_error=0.0, name="generated_dataset", config_file=None,
                 sub_problem_type="base", sub_problem_configurations=None,
                 **kwargs):
        if config_file is not None:
            # The whole configuration comes from the file: re-run the
            # initialisation with its entries instead of the given arguments.
            args = get_config_from_file(config_file)
            self.__init__(**args)
        else:
            self.view_names = ["generated_view_{}".format(view_index)
                               for view_index in range(n_views)]
            self.rs = init_random_state(random_state)
            self.n_samples = n_samples
            self.n_classes = n_classes
            self.n_views = n_views
            self.name = name
            self.n_features = format_array(n_features, n_views,
                                           type_needed=int)
            # Per-class proportions are kept as (n_classes, 1) columns.
            self.redundancy = format_array(
                redundancy, n_classes,
                type_needed=float).reshape((n_classes, 1))
            self.mutual_error = format_array(
                mutual_error, n_classes,
                type_needed=float).reshape((n_classes, 1))
            self.complementarity = format_array(
                complementarity, n_classes,
                type_needed=float).reshape((n_classes, 1))
            self.complementarity_level = format_array(
                complementarity_level, n_classes,
                type_needed=int).reshape((n_classes, 1))
            self.latent_size_mult = latent_size_multiplicator
            # Not defined in this class: provided by the subclasses.
            self._init_sub_problem_config(sub_problem_configurations,
                                          sub_problem_type)
            self.error_matrix = init_error_matrix(error_matrix, n_classes,
                                                  n_views)
            self.classes = np.arange(self.n_classes)
            self.class_weights = format_array(class_weights, n_classes,
                                              type_needed=float)
            # Normalise so that the weights sum to one.
            self.class_weights /= np.sum(self.class_weights)
            self._init_base_arguments()

    def to_hdf5_mc(self, saving_path="."):
        """
        This is used to save the dataset in an HDF5 file, compatible with
        :summit:`SuMMIT <>`

        The report is regenerated (``gen_report(save=False)``) and stored in
        the ``description`` attribute of the ``Metadata`` group.

        :param saving_path: where to save the dataset, the file will be named after the self.name attribute.
        :type saving_path: str
        :return: None
        """
        file_path = os.path.join(saving_path, self.name + ".hdf5")
        # The context manager closes the file even if one of the writes fails.
        with h5py.File(file_path, 'w') as dataset_file:
            labels_dataset = dataset_file.create_dataset("Labels",
                                                         shape=self.y.shape,
                                                         data=self.y)
            labels_names = ["label_" + str(i + 1)
                            for i in range(self.n_classes)]
            labels_dataset.attrs["names"] = [
                label_name.encode() if not isinstance(label_name, bytes)
                else label_name for label_name in labels_names]
            for view_index, data in enumerate(self.dataset):
                df_dataset = dataset_file.create_dataset(
                    "View" + str(view_index),
                    shape=data.shape,
                    data=data)
                df_dataset.attrs["sparse"] = False
                df_dataset.attrs["name"] = self.view_names[view_index]
            meta_data_grp = dataset_file.create_group("Metadata")
            meta_data_grp.attrs["nbView"] = self.n_views
            # NOTE(review): this stores the array of distinct labels, not a
            # count -- confirm that this is what the reader expects.
            meta_data_grp.attrs["nbClass"] = np.unique(self.y)
            meta_data_grp.attrs["datasetLength"] = self.dataset[0].shape[0]
            self.gen_report(save=False)
            meta_data_grp.attrs["description"] = self.report
            meta_data_grp.create_dataset("sample_ids", data=np.array(
                self.sample_ids).astype(
                np.dtype("S100")), dtype=np.dtype("S100"))

    def gen_report(self, output_path='.', file_type="md", save=True, n_cv=5):
        """
        Generates a markdown report based on the configuration.
        If ``save`` is True, it will be saved in ``output_path`` as report_<self.name>.<``file_type``> .

        Two HTML heatmaps (``report_bayesian_error_2D.html`` and
        ``report_dt_error_2D.html``) are written in ``output_path`` whatever
        the value of ``save``. The report is also stored in ``self.report``.

        :param output_path: path to store the text report.
        :type output_path: str
        :param file_type: Type of file in which the report is saved (currently supported : "md" or "txt")
        :type file_type: str
        :param save: Whether to save the string in a file or not.
        :type save: bool
        :param n_cv: Number of folds passed to the cross-validated prediction of each view.
        :type n_cv: int
        :return: The report string
        """
        class_labels = ["Class " + str(i + 1) for i in range(self.n_classes)]
        view_labels = ['View ' + str(i + 1) for i in range(self.n_views)]

        report_string = "# Generated dataset description\n\n"
        report_string += "The dataset named `{}` has been generated by [{}]({}) " \
                         "and is comprised of \n\n* {} samples, splitted in " \
                         "\n* {} classes, described by \n* {} views.\n\n".format(
                             self.name, GENE, LINK, self.n_samples,
                             self.n_classes, self.n_views)

        error_df = pd.DataFrame(self.error_matrix,
                                index=class_labels,
                                columns=view_labels)
        report_string += "The input error matrix is \n \n" + tabulate(
            error_df, headers='keys', tablefmt='github')

        report_string += "\n\n The classes are balanced as : \n\n* Class "
        report_string += '\n* Class '.join(
            ["{} : {} samples ({}% of the dataset)".format(i + 1, n_ex,
                                                            int(ratio * 100))
             for i, (n_ex, ratio)
             in enumerate(zip(self.n_samples_per_class, self.class_weights))])

        report_string += "\n\n The views have \n\n* {}% redundancy, \n* {}% mutual error" \
                         " and \n* {}% complementarity with a level of {}.\n\n".format(
                             round(self.real_redundancy_level * 100, 2),
                             self.mutual_error[0, 0] * 100,
                             round(self.complementarity_ratio * 100, 2),
                             self.complementarity_level)

        report_string += "## Views description"
        for view_index in range(self.n_views):
            report_string += self.gen_view_report(view_index)

        report_string += "\n\n## Statistical analysis"
        # bayes_error holds counts per (view, class): turn them into rates.
        bayes_error = pd.DataFrame(self.bayes_error / self.n_samples_per_class,
                                   columns=class_labels,
                                   index=view_labels)
        report_string += "\n\nBayes error matrix : \n\n" + tabulate(
            bayes_error, headers='keys', tablefmt='github')

        report_string += "\n\n The error, as computed by the 'empirical bayes' classifier of each view : \n\n"
        self._gen_dt_error_mat(n_cv)
        # dt_error is (class, view); the table is displayed as (view, class).
        dt_error = pd.DataFrame(np.transpose(self.dt_error),
                                columns=class_labels,
                                index=view_labels)
        report_string += tabulate(dt_error, headers='keys', tablefmt='github')

        self._plot_2d_error(output_path, error=self.error_2D,
                            name="report_bayesian_error_2D.html")
        self._plot_2d_error(output_path, error=self.error_2D_dt,
                            name="report_dt_error_2D.html")

        report_string += "\n\nThis report has been automatically generated on {}".format(
            datetime.now().strftime("%B %d, %Y at %H:%M:%S"))
        if save:
            report_path = os.path.join(output_path,
                                       "report_" + self.name + "." + file_type)
            with open(report_path, "w") as output:
                output.write(report_string)
        self.report = report_string
        return report_string

    def _plot_2d_error(self, output_path, error=None, name=""):
        """
        Write an HTML heatmap of a (sample, view) matrix, with the samples
        grouped by label.

        :param output_path: directory in which the HTML file is written.
        :param error: matrix indexed as ``error[sample, view]``; it is indexed
            directly, so it must be provided despite the ``None`` default.
        :param name: name of the HTML file.
        """
        # Sample indices, concatenated label after label.
        label_index_list = np.concatenate(
            [np.where(self.y == i)[0] for i in np.unique(self.y)])
        # One hover text per (sample, view) cell; it is the same for all the
        # views of a sample.
        hover_text = [[self.sample_ids[sample_index] + " labelled " + str(
            self.y[sample_index])
                       for view_index in range(self.n_views)]
                      for sample_index in range(self.n_samples)]
        fig = plotly.graph_objs.Figure()
        fig.add_trace(plotly.graph_objs.Heatmap(
            x=["View {}".format(view_index)
               for view_index in range(self.n_views)],
            y=[self.sample_ids[label_ind] for label_ind in label_index_list],
            z=error[label_index_list, :],
            text=[hover_text[label_ind] for label_ind in label_index_list],
            hoverinfo=["y", "x", "text"],
            colorscale="Greys",
            colorbar=dict(tickvals=[0, 1],
                          ticktext=["Misdescribed", "Well described"]),
            reversescale=True), )
        fig.update_yaxes(title_text="Samples", showticklabels=True)
        fig.update_layout(paper_bgcolor='rgba(0,0,0,0)',
                          plot_bgcolor='rgba(0,0,0,0)')
        fig.update_xaxes(showticklabels=True, )
        plotly.offline.plot(fig, filename=os.path.join(output_path, name),
                            auto_open=False)

    def _gen_dt_error_mat(self, n_cv=10):
        """
        Fill ``self.dt_error`` (class x view zero-one loss) and
        ``self.error_2D_dt`` (sample x view, 1 when the prediction equals the
        label) from cross-validated predictions of each view's classifier.

        :param n_cv: value passed as ``cv`` to ``cross_val_predict``.
        """
        # TODO : Seems to rely on random state, but unsure
        self.dt_error = np.zeros((self.n_classes, self.n_views))
        self.error_2D_dt = np.zeros((self.n_samples, self.n_views,))
        classifiers = [generator.get_bayes_classifier()
                       for generator in self._sub_problem_generators]
        # The labels do not depend on the view: compute the indices once.
        label_indices = [np.where(self.y == i)[0]
                         for i in range(self.n_classes)]
        for view_index, view_data in enumerate(self.dataset):
            pred = cross_val_predict(classifiers[view_index], view_data,
                                     self.y, cv=n_cv, )
            self.error_2D_dt[:, view_index] = np.equal(self.y,
                                                       pred).astype(int)
            loss = [zero_one_loss(pred[label_indice], self.y[label_indice])
                    for label_indice in label_indices]
            self.dt_error[:, view_index] = np.array(loss)

    def _find_rows_cols(self):
        """
        Compute the subplot grid used to display one plot per view.

        Four views get a 2x2 grid; otherwise the grid is grown one cell line
        at a time, adding columns while ``cols < 4 * rows`` and rows after.

        :return: the tuple (rows, cols)
        """
        rows = 1
        cols = 1
        if self.n_views == 4:
            rows = 2
            cols = 2
        if self.n_views > 1:
            for i in range(self.n_views):
                if rows * cols < i + 1:
                    if cols < 4 * rows:
                        cols += 1
                    else:
                        rows += 1
        return rows, cols

    def _get_pca(self, n_components=2, output_path='.'):
        """
        Write ``<self.name>_fig_pca.html``, a scatter plot of each view.

        Views with more than ``n_components`` features are projected with a
        PCA, single-feature views are plotted against themselves and the
        other views are plotted as they are. Only the two first columns of
        the result are displayed.

        :param n_components: number of components of the PCA.
        :param output_path: directory in which the HTML file is written.
        """
        import plotly.graph_objects as go
        from plotly.subplots import make_subplots

        pca = PCA(n_components=n_components)
        rows, cols = self._find_rows_cols()
        fig = make_subplots(rows=rows, cols=cols,
                            subplot_titles=["View{}".format(view_index)
                                            for view_index
                                            in range(self.n_views)],
                            specs=[[{'type': 'scatter'} for _ in range(cols)]
                                   for _ in range(rows)])
        row = 1
        col = 1
        for view_index, view_data in enumerate(self.dataset):
            if self.n_features[view_index] > n_components:
                pca.fit(view_data)
                reducted_data = pca.transform(view_data)
            elif self.n_features[view_index] == 1:
                # Duplicate the only column to get two coordinates.
                reducted_data = np.transpose(
                    np.array([view_data, view_data]))[0, :, :]
            else:
                reducted_data = view_data
            fig.add_trace(
                go.Scatter(
                    x=reducted_data[:, 0],
                    y=reducted_data[:, 1],
                    text=self.sample_ids,
                    mode='markers', marker=dict(
                        size=3,  # set color to an array/list of desired values
                        color=self.y,
                        colorscale=["red", "blue", "black", "green", "orange",
                                    "purple"],
                        opacity=0.8
                    ), ),
                row=row, col=col)
            # Fill the grid row by row.
            col += 1
            if col > cols:
                col = 1
                row += 1
        fig.update_shapes(dict(xref='x', yref='y'))
        plotly.offline.plot(
            fig,
            filename=os.path.join(output_path, self.name + "_fig_pca.html"),
            auto_open=False)

    def gen_view_report(self, view_index):
        """
        Build the report section of one view: a title followed by the report
        of the view's sub-problem generator.

        :param view_index: index of the view.
        :return: The view's report string
        """
        view_string = "\n\n### View " + str(view_index + 1)
        view_string += self._sub_problem_generators[view_index].gen_report()
        return view_string

    def _get_generator_report(self, view_index, doc_type=".md"):
        """
        Return a markdown link to the documentation of the scikit-learn
        generator matching ``self.sub_problem_types[view_index]``.

        ``None`` is returned implicitly for any other type. ``doc_type`` is
        not used.
        """
        if self.sub_problem_types[view_index] in ["make_classification", "base"]:
            return "[`make_classification`](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.make_classification.html)"
        elif self.sub_problem_types[view_index] in ["gaussian", "make_gaussian_quantiles"]:
            return "[`make_gaussian_quantiles`](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.make_gaussian_quantiles.html#sklearn.datasets.make_gaussian_quantiles)"

    def _init_base_arguments(self):
        """
        Derive the per-class sample counts from the class weights and
        initialise the bookkeeping attributes.

        ``self.n_samples`` is replaced by the sum of the (truncated)
        per-class counts, so it may be lower than the requested value.
        """
        self.n_samples_per_class = (
                self.class_weights * self.n_samples).astype(int)
        # For each class, the number of well-described samples of each view.
        self.n_well_described = [
            (self.n_samples_per_class[class_index] * (1 - confusion)).astype(
                int)
            for class_index, confusion in enumerate(self.error_matrix)]
        self.n_misdescribed = [(self.n_samples_per_class[class_index] -
                                self.n_well_described[class_index])
                               for class_index in range(self.n_classes)]
        self.n_samples = np.sum(self.n_samples_per_class)
        sample_indices = np.arange(int(np.sum(self.n_samples_per_class)))
        self.rs.shuffle(sample_indices)
        # Consecutive slices of the shuffled indices, one per class.
        self.class_sample_indices = [
            sample_indices[sum(self.n_samples_per_class[:ind]):
                           sum(self.n_samples_per_class[:ind + 1])]
            for ind in range(self.n_classes)]
        # The lists below are placeholders of the right length.
        self.well_described = [[_ for _ in range(self.n_views)] for _ in
                               range(self.n_classes)]
        self.misdescribed = [[_ for _ in range(self.n_views)] for _ in
                             range(self.n_classes)]
        self.redundancy_indices = [_ for _ in range(self.n_classes)]
        self.mutual_error_indices = [_ for _ in range(self.n_classes)]
        self.complementarity_samples = [_ for _ in range(self.n_classes)]
        self.good_views_indices = [_ for _ in range(self.n_classes)]
        self.bad_views_indices = [_ for _ in range(self.n_classes)]
        self.available_init_indices = self.class_sample_indices.copy()
        self.sample_ids = ["sample_{}".format(ind)
                           for ind
                           in range(int(np.sum(self.n_samples_per_class)))]
\ No newline at end of file
import numpy as np
import itertools
import math
from scipy.special import erfinv
from .utils import format_array, get_config_from_file, \
init_random_state, init_error_matrix, init_list
from .base_strs import *
from .base import MultiViewSubProblemsGenerator
from multiview_generator import sub_problems
class MultiViewGaussianSubProblemsGenerator(MultiViewSubProblemsGenerator):
    """
    Multiview generator building each view with a generator class taken from
    the ``sub_problems`` module, then assembling the samples in three groups
    per class: mutual error, complementarity and redundancy.
    """

    def __init__(self, random_state=42, n_samples=100, n_classes=4, n_views=4,
                 error_matrix=None, latent_size_multiplicator=2, n_features=3,
                 class_weights=1.0, redundancy=0.1, complementarity=0.1,
                 complementarity_level=3,
                 mutual_error=0.2, name="generated_dataset", config_file=None,
                 sub_problem_type="base", sub_problem_configurations=None,
                 sub_problem_generators="StumpsGenerator", random_vertices=False
                 , **kwargs):
        """
        See :class:`MultiViewSubProblemsGenerator` for the shared parameters.

        :param sub_problem_configurations: None for the default configuration,
            or the configuration(s) given to the sub-problem generators.
        :param sub_problem_generators: name (or one name per view) of the
            class of the ``sub_problems`` module used to generate each view.
        :param random_vertices: value (or one value per view) forwarded as
            ``random_vertices`` to the sub-problem generators.
        :param kwargs: additional entries, forwarded to the base initialiser.
        """
        if config_file is not None:
            # The file's entries replace every argument. Returning here keeps
            # random_vertices and sub_problem_generators from being reset
            # with the caller-side values after the config-driven run.
            self.__init__(**get_config_from_file(config_file))
            return
        MultiViewSubProblemsGenerator.__init__(
            self, random_state=random_state,
            n_samples=n_samples,
            n_classes=n_classes,
            n_views=n_views,
            error_matrix=error_matrix,
            latent_size_multiplicator=latent_size_multiplicator,
            n_features=n_features,
            class_weights=class_weights,
            redundancy=redundancy,
            complementarity=complementarity,
            complementarity_level=complementarity_level,
            mutual_error=mutual_error,
            name=name,
            config_file=config_file,
            sub_problem_type=sub_problem_type,
            # Was passed as ``F=``, which silently dropped the configurations.
            sub_problem_configurations=sub_problem_configurations,
            **kwargs)
        self.random_vertices = format_array(random_vertices, n_views, bool)
        self.sub_problem_generators = format_array(sub_problem_generators,
                                                   n_views, str)

    def generate_multi_view_dataset(self, ):
        """
        This is the main method. It will generate a multiview dataset according
        to the configuration.
        To do so,

        * for each view, it generates a monoview dataset with the view's
          sub-problem generator,
        * it generates the labels of the multiview dataset,
        * finally it fills the subsets of samples (mutual error,
          complementarity, redundancy) with the generated descriptions.

        :return: view_data a list containing the views np.ndarrays and y, the label array.
        """
        # 1 means "well described"; the assign_* methods set the errors to 0.
        self.error_2D = np.ones((self.n_samples, self.n_views))
        # Generate the sample descriptions according to the error matrix
        self._sub_problem_generators = [_ for _ in range(self.n_views)]
        for view_index in range(self.n_views):
            generator_class = getattr(sub_problems,
                                      self.sub_problem_generators[view_index])
            sub_problem_generator = generator_class(
                n_classes=self.n_classes,
                n_features=self.n_features[view_index],
                random_vertices=self.random_vertices[view_index],
                errors=self.error_matrix[:, view_index],
                random_state=self.rs,
                n_samples_per_class=self.n_samples_per_class,
                **self.sub_problem_configurations[view_index])
            vec = sub_problem_generator.gen_data()
            self._sub_problem_generators[view_index] = sub_problem_generator
            self.view_names[view_index] = "view_{}_{}".format(
                view_index, sub_problem_generator.view_name)
            self.bayes_error[view_index, :] = sub_problem_generator.bayes_error
            self.generated_data[view_index, :, :,
                                :self.n_features[view_index]] = vec
            self.selected_vertices[view_index] = \
                sub_problem_generator.selected_vertices
            self.descriptions[view_index, :, :] = \
                sub_problem_generator.descriptions

        # Generate the labels: the samples are sorted by class.
        self.y = []
        for ind, n_samples_ in enumerate(self.n_samples_per_class):
            self.y += [ind for _ in range(n_samples_)]
        self.y = np.array(self.y, dtype=int)
        self.sample_ids = ["{}_l_{}".format(ind, self.y[ind]) for ind in
                           range(self.n_samples)]

        self.dataset = [np.zeros((self.n_total_samples,
                                  self.n_features[view_index]))
                        for view_index in range(self.n_views)]

        self.assign_mutual_error()
        self.assign_complementarity()
        self.assign_redundancy()
        self.get_distance()
        return self.dataset, self.y

    def assign_mutual_error(self):
        """
        Fill the first samples of each class with descriptions flagged as
        mis-described (-1) in every view, and suffix their ids with ``_m``.

        The descriptions that have been used are flagged with 0.
        """
        for class_ind in range(self.n_classes):
            mutual_start = np.sum(self.n_samples_per_class[:class_ind])
            mutual_end = np.sum(self.n_samples_per_class[:class_ind]) + \
                self.mutual_error_per_class[class_ind]
            for view_index in range(self.n_views):
                mis_described_random_ind = self.rs.choice(
                    np.where(
                        self.descriptions[view_index, class_ind, :] == -1)[0],
                    self.mutual_error_per_class[class_ind], replace=False)
                self.dataset[view_index][mutual_start:mutual_end, :] = \
                    self.generated_data[view_index, class_ind,
                                        mis_described_random_ind,
                                        :self.n_features[view_index]]
                self.error_2D[mutual_start:mutual_end, view_index] = 0
                self.descriptions[
                    view_index, class_ind, mis_described_random_ind] = 0
            for sample_ind in np.arange(start=mutual_start, stop=mutual_end):
                self.sample_ids[sample_ind] = \
                    self.sample_ids[sample_ind] + "_m"

    def assign_complementarity(self):
        """
        Build the complementary samples of each class, placed right after the
        mutual error ones.

        For a level going from the class's complementarity level down to 1,
        samples are created as long as descriptions are available: ``level``
        views receive a mis-described description and up to
        ``n_views - level`` others a well-described one. The ids get the
        ``_c`` suffix followed by ``_<view index>`` for each failing view.
        ``self.complementarity_ratio`` and ``self.n_complem`` are updated.
        """
        self.complementarity_ratio = 0
        for class_ind in range(self.n_classes):
            complem_level = int(self.complementarity_level[class_ind, 0]) \
                if np.ndim(self.complementarity_level) == 2 \
                else int(self.complementarity_level[class_ind])
            complem_start = np.sum(self.n_samples_per_class[:class_ind]) + \
                self.mutual_error_per_class[class_ind]
            complem_ind = 0
            while complem_level != 0:
                # Number of unused mis-/well-described descriptions per view.
                avail_errors = np.array([len(np.where(
                    self.descriptions[view_index, class_ind, :] == -1)[0])
                    for view_index in range(self.n_views)])
                avail_success = np.array([len(np.where(
                    self.descriptions[view_index, class_ind, :] == 1)[0])
                    for view_index in range(self.n_views)])
                cond = True
                while cond:
                    # Stop this level when there is not enough material left.
                    if np.sum(avail_errors) == 0 or np.sum(
                            avail_success) < self.n_views - complem_level:
                        cond = False
                        break
                    elif len(np.where(avail_errors > 0)[0]) < complem_level:
                        cond = False
                        break
                    self.sample_ids[complem_start + complem_ind] += "_c"
                    self.complementarity_ratio += 1 / self.n_samples
                    # The views with the most remaining errors fail ...
                    sorted_inds = np.argsort(-avail_errors)
                    selected_failed_views = sorted_inds[:complem_level]
                    # ... and the others, richest in successes first, succeed.
                    sorted_inds = np.array(
                        [i for i in np.argsort(-avail_success) if
                         i not in selected_failed_views])
                    selected_succeeded_views = sorted_inds[
                        :self.n_views - complem_level]
                    for view_index in range(self.n_views):
                        if view_index in selected_failed_views:
                            self.error_2D[
                                complem_start + complem_ind, view_index] = 0
                            # [0] extracts the scalar without changing the
                            # random draw (int() on an array is deprecated).
                            chosen_ind = int(self.rs.choice(np.where(
                                self.descriptions[
                                    view_index, class_ind, :] == -1)[0],
                                size=1, replace=False)[0])
                            self.dataset[view_index][
                                complem_start + complem_ind, :] = \
                                self.generated_data[
                                    view_index, class_ind, chosen_ind,
                                    :self.n_features[view_index]]
                            self.descriptions[
                                view_index, class_ind, chosen_ind] = 0
                            self.sample_ids[
                                complem_start + complem_ind] += "_{}".format(
                                view_index)
                            avail_errors[view_index] -= 1
                        elif view_index in selected_succeeded_views:
                            chosen_ind = int(self.rs.choice(np.where(
                                self.descriptions[
                                    view_index, class_ind, :] == 1)[0],
                                size=1, replace=False)[0])
                            self.dataset[view_index][
                                complem_start + complem_ind, :] = \
                                self.generated_data[
                                    view_index, class_ind, chosen_ind,
                                    :self.n_features[view_index]]
                            self.descriptions[
                                view_index, class_ind, chosen_ind] = 0
                            avail_success[view_index] -= 1
                    complem_ind += 1
                complem_level -= 1
            self.n_complem[class_ind] = complem_ind

    def assign_redundancy(self):
        """
        Fill the remaining samples of each class (after the mutual error and
        complementary ones) with the remaining well-described descriptions of
        every view, and suffix their ids with ``_r``.

        ``self.real_redundancy_level`` accumulates the proportion of
        redundant samples.

        :raises ValueError: if a view has fewer well-described descriptions
            left than samples to fill while mis-described ones remain.
        """
        self.real_redundancy_level = 0
        for class_ind in range(self.n_classes):
            redun_start = int(
                np.sum(self.n_samples_per_class[:class_ind])
                + self.mutual_error_per_class[class_ind]
                + self.n_complem[class_ind])
            redun_end = np.sum(self.n_samples_per_class[:class_ind + 1])
            for view_index in range(self.n_views):
                if len(np.where(self.descriptions[view_index, class_ind, :] == 1)[0]) < redun_end - redun_start and len(np.where(self.descriptions[view_index, class_ind, :] == -1)[0]) > 0:
                    raise ValueError("For class {}, view {}, reduce the error "
                                     "(now: {}), or increase the complemetarity "
                                     "level (now: {}), there is not enough good "
                                     "descriptions with the current "
                                     "configuration".format(
                                         class_ind,
                                         view_index,
                                         self.error_matrix[class_ind,
                                                           view_index],
                                         self.complementarity_level[class_ind]))
                remaining_good_desc = np.where(
                    self.descriptions[view_index, class_ind, :] == 1)[0]
                self.dataset[view_index][redun_start:redun_end, :] = \
                    self.generated_data[view_index, class_ind,
                                        remaining_good_desc,
                                        :self.n_features[view_index]]
                self.descriptions[
                    view_index, class_ind, remaining_good_desc] = 0
            for sample_ind in np.arange(start=redun_start, stop=redun_end):
                self.sample_ids[sample_ind] = \
                    self.sample_ids[sample_ind] + "_r"
                self.real_redundancy_level += 1 / self.n_samples

    def get_distance(self):
        """
        Store in ``self.distances[view, sample]`` the smallest absolute
        coordinate of each sample in each view, and append it (rounded to two
        decimals) to the sample ids as ``-<view index>_<distance>``.
        """
        self.distances = np.zeros((self.n_views, self.n_samples))
        for view_index, view_data in enumerate(self.dataset):
            for sample_ind, data in enumerate(view_data):
                # The closest dimension to the limit
                dist = np.min(np.abs(data))
                self.sample_ids[sample_ind] += "-{}_{}".format(
                    view_index, round(dist, 2))
                self.distances[view_index, sample_ind] = dist

    def _get_generator_report(self, view_index, doc_type=".md"):
        """Return the description of the generator; both arguments are unused."""
        return "home made gaussian generator"

    def _init_sub_problem_config(self, sub_problem_configs, sub_problem_type):
        """
        Set ``self.sub_problem_configurations``, one configuration per view.

        :param sub_problem_configs: None to use the default configuration for
            every view, or the configuration(s) handed to ``init_list``.
        :param sub_problem_type: unused.
        :return: None
        """
        if sub_problem_configs is None:
            self.sub_problem_configurations = [
                {"n_clusters_per_class": 1,
                 "class_sep": 1.0, }
                for _ in range(self.n_views)]
        else:
            self.sub_problem_configurations = init_list(sub_problem_configs,
                                                        size=self.n_views,
                                                        type_needed=dict)

    def _init_base_arguments(self):
        """
        Derive the per-class sample counts and allocate the buffers used by
        the generation.

        ``self.n_samples`` is replaced by the sum of the (truncated)
        per-class counts. The order of the random draws (uniform data, then
        index shuffle) is kept so that a given seed gives the same dataset.
        """
        self.n_samples_per_class = (
                self.class_weights * self.n_samples).astype(int)
        self.n_max_samples = np.max(self.n_samples_per_class)
        self.n_samples = np.sum(self.n_samples_per_class)
        self.n_complem = np.zeros(self.n_classes)
        self.n_max_features = np.max(self.n_features)
        # Indexed as [view, class, sample, feature]; the part used by each
        # view is overwritten with the data of its sub-problem generator.
        self.generated_data = self.rs.uniform(
            low=-self.latent_size_mult, high=self.latent_size_mult,
            size=(self.n_views, self.n_classes, self.n_max_samples,
                  self.n_max_features))
        # Indexed as [view, class, sample]: 1 for an available well-described
        # description, -1 for a mis-described one, 0 otherwise.
        self.descriptions = np.zeros(
            (self.n_views, self.n_classes, self.n_max_samples,))
        self.n_total_samples = np.sum(self.n_samples_per_class)
        sample_indices = np.arange(int(np.sum(self.n_samples_per_class)))
        self.rs.shuffle(sample_indices)
        self.class_sample_indices = [
            sample_indices[sum(self.n_samples_per_class[:ind]):
                           sum(self.n_samples_per_class[:ind + 1])]
            for ind in range(self.n_classes)]
        # The lists below are placeholders of the right length.
        self.well_described = [[_ for _ in range(self.n_views)] for _ in
                               range(self.n_classes)]
        self.misdescribed = [[_ for _ in range(self.n_views)] for _ in
                             range(self.n_classes)]
        self.complementarity_samples = [_ for _ in range(self.n_classes)]
        self.good_views_indices = [_ for _ in range(self.n_classes)]
        self.bad_views_indices = [_ for _ in range(self.n_classes)]
        self.available_init_indices = self.class_sample_indices.copy()
        self.sample_ids = ["sample_{}".format(ind)
                           for ind
                           in range(int(np.sum(self.n_samples_per_class)))]
        self.bayes_error = np.zeros((self.n_views, self.n_classes))
        self.sub_problems = [[] for _ in range(self.n_views)]
        # mutual_error and redundancy are (n_classes, 1) columns: take the
        # scalar explicitly instead of converting a 1-element array.
        self.mutual_error_per_class = np.array(
            [int(float(self.mutual_error[class_ind, 0]) * n_sample_)
             for class_ind, n_sample_ in enumerate(self.n_samples_per_class)])
        self.redundancy_per_class = np.array(
            [int(self.redundancy[class_ind, 0] * n_sample_)
             for class_ind, n_sample_ in enumerate(self.n_samples_per_class)])
        self.view_data = [
            np.zeros((self.n_samples, self.n_features[view_ind]))
            for view_ind in range(self.n_views)]
        self.all_mis_described = [[] for _ in range(self.n_views)]
        self.all_well_described = [[] for _ in range(self.n_views)]
        self.selected_vertices = [_ for _ in range(self.n_views)]
        self.avail_well_described = [[] for _ in range(self.n_views)]
        self.avail_mis_described = [[] for _ in range(self.n_views)]
        self.mutual_error_indices = [[] for _ in range(self.n_views)]
        self.redundancy_indices = [[] for _ in range(self.n_views)]
        self.complementarity_indices = [
            [[] for _ in range(self.n_classes)]
            for _ in range(self.n_views)]
        self.complem_names = [[] for _ in range(self.n_classes)]
        self.complem_error = [[] for _ in range(self.n_classes)]
\ No newline at end of file
import numpy as np
import itertools
import math
from scipy.special import erfinv
import yaml
class BaseSubProblem():
    """
    Base class of the monoview sub-problem generators: it stores the
    configuration and allocates the result buffers.

    :param n_classes: number of classes.
    :param n_features: number of features of the view.
    :param random_vertices: stored as is, for the subclasses.
    :param errors: one error value per class; defaults to ``[0.5, 0.5]``.
    :param random_state: the ``np.random.RandomState`` used for the draws;
        defaults to a new ``RandomState(42)`` for each instance.
    :param n_samples_per_class: number of samples of each class; defaults to
        ``[100, 100]``.
    :param configuration: any additional keyword argument, stored in
        ``self.config``.
    """

    def __init__(self, n_classes=2, n_features=2, random_vertices=True,
                 errors=None, random_state=None, n_samples_per_class=None,
                 **configuration):
        # The defaults are built here and not in the signature: objects
        # created in the signature are shared by every instance, so the
        # default random state used to be consumed across instances.
        if errors is None:
            errors = np.array([0.5, 0.5])
        if random_state is None:
            random_state = np.random.RandomState(42)
        if n_samples_per_class is None:
            n_samples_per_class = np.array([100, 100])
        self.n_classes = n_classes
        self.random_vertices = random_vertices
        self.errors = errors
        self.n_features = n_features
        self.rs = random_state
        self.n_samples_per_class = n_samples_per_class
        self.bayes_error = np.zeros(self.n_classes)
        # One row per class, as long as the biggest class.
        self.descriptions = np.zeros(
            (self.n_classes, np.max(self.n_samples_per_class)))
        self.config = configuration
        self.view_name = "generated"

    def gen_report(self):
        """
        Describe the view in markdown: the name of the generator class
        followed by a yaml block holding the configuration and the number of
        features.

        :return: The report string
        """
        view_string = "\n\nThis view is generated with {}, with the following configuration : \n```yaml\n".format(
            self.__class__.__name__)
        view_string += yaml.dump(self.config,
                                 line_break="\n", default_flow_style=False)
        view_string += "n_features: {}\n".format(self.n_features)
        view_string += "```"
        return view_string
class StumpsGenerator(BaseSubProblem):
    """
    Sub-problem in which each class is a gaussian blob centred on a vertex
    of the {-class_sep, class_sep}^k hypercube, with k = ceil(log2(n_classes)).
    """

    def gen_data(self):
        """
        Generates the samples according to gaussian distributions with scales
        computed with the given error and class separation

        A sample is flagged as mis-described (-1 in ``self.descriptions``)
        when at least one of its relevant coordinates has not the sign of its
        class centre, and as well-described (1) otherwise.
        ``self.bayes_error`` receives the number of mis-described samples of
        each class. The features after the relevant ones are uniform noise.

        :return: an array of shape (n_classes, max(n_samples_per_class),
            n_features); the rows after a class's own sample count hold zeros
            in the relevant features.
        :raises ValueError: if ``n_features`` is lower than the number of
            relevant features.
        """
        self.n_relevant_features = math.ceil(math.log2(self.n_classes))
        if self.n_features < self.n_relevant_features:
            raise ValueError(
                "n_features ({}) must be at least {} to separate {} "
                "classes".format(self.n_features, self.n_relevant_features,
                                 self.n_classes))
        self.view_name = "stumps"
        class_sep = self.config["class_sep"]
        # Every vertex of the {-1, 1}^k hypercube.
        vertices = np.array(list(
            itertools.product((-1, 1), repeat=self.n_relevant_features)))
        if self.random_vertices:
            selected_vertices = self.rs.choice(np.arange(len(vertices)),
                                               self.n_classes,
                                               replace=False)
        else:
            selected_vertices = np.arange(self.n_classes)
        self.selected_vertices = vertices[selected_vertices, :] * class_sep
        n_max_samples = max(self.n_samples_per_class)
        vec = np.zeros((self.n_classes, n_max_samples,
                        self.n_relevant_features))
        for class_ind, center_coord in enumerate(self.selected_vertices):
            n_class_samples = int(self.n_samples_per_class[class_ind])
            error = self.errors[class_ind]
            # Standard deviation such that a sample keeps the sign of the
            # centre on every relevant feature with probability 1 - error.
            scale = (class_sep / math.sqrt(2)) * (1 / (
                erfinv(2 * (1 - error) ** (
                        1 / self.n_relevant_features) - 1)))
            cov = np.identity(self.n_relevant_features) * scale ** 2
            samples = self.rs.multivariate_normal(center_coord, cov,
                                                  n_class_samples)
            # Only the class's own rows are filled: classes may be smaller
            # than the biggest one.
            vec[class_ind, :n_class_samples, :] = samples
            mis_described = np.unique(
                np.where(np.multiply(samples, center_coord) < 0)[0])
            # setdiff1d keeps an integer dtype even when the result is empty,
            # so it is always a valid index.
            well_described = np.setdiff1d(np.arange(n_class_samples),
                                          mis_described)
            self.bayes_error[class_ind] = mis_described.shape[0]
            self.descriptions[class_ind, mis_described] = -1
            self.descriptions[class_ind, well_described] = 1
        # Noise features share the range of the relevant ones.
        data = self.rs.uniform(low=np.min(vec), high=np.max(vec),
                               size=(self.n_classes, n_max_samples,
                                     self.n_features))
        data[:, :, :self.n_relevant_features] = vec
        return data

    def gen_report(self):
        """
        Complete the base report with the number of relevant features and
        the type of the view's classifier.

        :return: The report string
        """
        base_str = BaseSubProblem.gen_report(self)
        base_str += "\n\nThis view has {} features, among which {} are relevant for classification (they are the {} first columns of the view) the other are filled with uniform noise.".format(
            self.n_features, self.n_relevant_features, self.n_relevant_features)
        base_str += "\n\n Its empirical bayesian classifier is a decision stump"
        return base_str

    def get_bayes_classifier(self):
        """Return a new depth-1 ``DecisionTreeClassifier``."""
        from sklearn.tree import DecisionTreeClassifier
        return DecisionTreeClassifier(max_depth=1)
class TreesGenerator(BaseSubProblem):
    """
    Sub-problem in which every class is a mixture of gaussian blobs.

    Each class owns one central blob located on a vertex of the hypercube
    ``{-class_sep, class_sep}^n_relevant_features`` and one additional blob
    per relevant dimension. We stay with depth 2 trees for the moment: the
    empirical Bayes classifier is a decision tree with ``max_depth=2``.
    """

    def gen_data(self):
        """
        Generate the samples of every class as a mixture of gaussian blobs.

        For each class, a central blob is drawn around its hypercube vertex,
        then one extra blob is drawn for each relevant dimension around a
        copy of the vertex whose coordinate on that dimension is multiplied
        by -3. The gaussian scales are computed from the requested error of
        the class and from ``class_sep``.

        A sample is flagged as mis-described (``-1`` in
        ``self.descriptions``, counted in ``self.bayes_error``) when the
        blob centre closest to it belongs to another class; otherwise it is
        flagged as well described (``+1``).

        Side effects: sets ``n_relevant_features``, ``view_name``,
        ``selected_vertices`` and ``covs``; updates ``bayes_error`` and
        ``descriptions``.

        :return: array of shape
            ``(n_classes, max(n_samples_per_class), n_features)`` whose
            first ``n_relevant_features`` columns hold the blobs, the other
            columns being uniform noise drawn in [-1, 1)
        """
        self.n_relevant_features = math.ceil(math.log2(self.n_classes))
        self.view_name = "tree_depth_2"
        class_sep = self.config["class_sep"]
        n_relevant = self.n_relevant_features

        # All the vertices of the {-1, 1}^n_relevant hypercube.
        vertices = np.array(
            list(itertools.product((-1, 1), repeat=n_relevant)))
        if self.random_vertices == True:
            selected_vertices = self.rs.choice(np.arange(len(vertices)),
                                               self.n_classes,
                                               replace=False)
        else:
            selected_vertices = np.arange(self.n_classes)
        self.selected_vertices = vertices[selected_vertices, :] * class_sep

        max_samples = max(self.n_samples_per_class)
        self.covs = np.zeros((self.n_classes, n_relevant, n_relevant))
        vec = np.zeros((self.n_classes, max_samples, n_relevant))
        # blob_centers[c, 0] is the central blob of class c, the following
        # rows are its per-dimension blobs.
        blob_centers = np.zeros((self.n_classes, n_relevant + 1, n_relevant))

        for class_ind, center_coord in enumerate(self.selected_vertices):
            n_class_samples = self.n_samples_per_class[class_ind]
            # The error of the class is shared between its blobs.
            error = self.errors[class_ind] / (n_relevant + 1)
            blob_centers[class_ind, 0, :] = center_coord

            # Central blob.
            internal_error_percentage = n_relevant * 2 / (
                    n_relevant * 2 + n_relevant ** 2)
            internal_scale = (class_sep / math.sqrt(2)) * (1 / (
                erfinv(2 * (1 - error / internal_error_percentage) ** (
                        1 / (2 * n_relevant)) - 1)))
            cov = np.identity(n_relevant) * internal_scale ** 2
            self.covs[class_ind] = cov
            n_samples_per_blob = int(n_class_samples / (n_relevant + 1))
            # The central blob receives the samples left over by the
            # integer division.
            n_samples = n_class_samples - n_samples_per_blob * n_relevant
            vec[class_ind, :n_samples, :] = self.rs.multivariate_normal(
                center_coord, cov, n_samples)

            # Per-dimension blobs, all sharing the same covariance.
            external_error_percentage = n_relevant / (
                    n_relevant * 2 + n_relevant ** 2)
            external_scale = (class_sep / math.sqrt(2)) * (1 / (
                erfinv(2 * (1 - error / external_error_percentage) ** (
                        1 / n_relevant) - 1)))
            cov = np.identity(n_relevant) * external_scale ** 2
            for dim_index, update_coord in enumerate(center_coord):
                beg = n_samples + dim_index * n_samples_per_blob
                end = beg + n_samples_per_blob
                new_center = center_coord.copy()
                # Mirror the coordinate and move it three times further
                # from the origin.
                new_center[dim_index] = update_coord - 4 * update_coord
                blob_centers[class_ind, dim_index + 1, :] = new_center
                vec[class_ind, beg:end, :] = self.rs.multivariate_normal(
                    new_center, cov, n_samples_per_blob)

        # Label every sample according to the class owning the closest
        # blob centre.
        for class_ind in range(self.n_classes):
            for sample_ind in range(self.n_samples_per_class[class_ind]):
                distances = np.linalg.norm(
                    vec[class_ind, sample_ind, :] - blob_centers, axis=2)
                if np.argmin(np.min(distances, axis=1)) != class_ind:
                    self.bayes_error[class_ind] += 1
                    self.descriptions[class_ind, sample_ind] = -1
                else:
                    self.descriptions[class_ind, sample_ind] = +1

        data = self.rs.uniform(low=-1, high=1, size=(
            self.n_classes, max_samples, self.n_features))
        data[:, :, :n_relevant] = vec
        return data

    def gen_report(self):
        """
        Build the textual report describing this view.

        :return: the report of ``BaseSubProblem.gen_report`` followed by the
            feature layout of the view and its empirical Bayes classifier
        """
        base_str = BaseSubProblem.gen_report(self)
        base_str += "\n\nThis view has {} features, among which {} are relevant for classification (they are the {} first columns of the view).".format(self.n_features, self.n_relevant_features, self.n_relevant_features)
        # Kept in sync with get_bayes_classifier (max_depth=2).
        base_str += "\n\n Its empirical bayesian classifier is a decision tree of depth 2"
        return base_str

    def get_bayes_classifier(self):
        """
        Build the classifier used as empirical Bayes classifier for this view.

        :return: an unfitted scikit-learn decision tree limited to depth 2
        """
        from sklearn.tree import DecisionTreeClassifier
        return DecisionTreeClassifier(max_depth=2)
class RingsGenerator(BaseSubProblem):
    """
    Sub-problem in which every class is a ring (a hyperspherical shell
    when the view has more than two features) centred on the origin.
    """

    def gen_data(self):
        """
        Generate the samples of every class on concentric rings.

        The available ring radii are ``(k + 2) * class_sep`` for ``k`` in
        ``range(n_classes)``. For each class, the radius of every sample is
        drawn from a gaussian centred on the ring of the class, with a scale
        computed from the requested error and ``class_sep``; its direction
        is drawn from uniformly distributed angles (passed through
        ``arccos`` for all the angles except the last one) and converted
        with ``to_cartesian``.

        A sample is flagged as mis-described (``-1`` in
        ``self.descriptions``, counted in ``self.bayes_error``) when its
        radius lies beyond the mid-point between the ring of its class and
        a neighbouring ring; otherwise it is flagged as well described
        (``1``).

        Side effects: sets ``view_name`` and ``selected_vertices``; updates
        ``bayes_error`` and ``descriptions``.

        :return: array of shape
            ``(n_classes, max(n_samples_per_class), n_features)``
        :raises ValueError: if the view has fewer than 2 features
        """
        if self.n_features < 2:
            raise ValueError("n_features for view {} must be at least 2, (now: {})".format(1, self.n_features))
        self.view_name = "rings"
        max_samples = max(self.n_samples_per_class)
        data = np.zeros((self.n_classes, max_samples, self.n_features))
        class_sep = self.config["class_sep"]
        vertices = (np.arange(self.n_classes) + 2) * class_sep
        if self.random_vertices == True:
            selected_vertices = self.rs.choice(np.arange(len(vertices)),
                                               self.n_classes,
                                               replace=False)
        else:
            selected_vertices = np.arange(self.n_classes)
        self.selected_vertices = vertices[selected_vertices]
        radii = np.zeros((self.n_classes, max_samples))
        for class_ind, center_coord in enumerate(self.selected_vertices):
            n_class_samples = self.n_samples_per_class[class_ind]
            error = self.errors[class_ind]
            scale = ((class_sep / 2) / math.sqrt(2)) * (
                    1 / erfinv(1 - 2 * error))
            # Only the first n_class_samples rows are filled, so classes
            # may have different numbers of samples.
            radii[class_ind, :n_class_samples] = self.rs.normal(
                center_coord, scale, n_class_samples)
            first_angle = self.rs.uniform(low=0, high=2 * math.pi,
                                          size=(n_class_samples, 1))
            if self.n_features > 2:
                other_angles = self.rs.uniform(
                    low=0, high=1,
                    size=(n_class_samples, self.n_features - 2))
                other_angles = np.arccos(1 - 2 * other_angles)
                angles = np.concatenate((other_angles, first_angle), axis=1)
            else:
                angles = first_angle
            cartesian = np.array(
                [to_cartesian(r, angle) for r, angle
                 in zip(radii[class_ind, :n_class_samples], angles)])
            data[class_ind, :n_class_samples, :] = cartesian
            back_to_radii = np.sqrt(np.sum(np.square(cartesian), axis=1))

            # The bounds are those of the ring actually assigned to the
            # class, which differs from class_ind when random_vertices
            # shuffles the rings. The innermost ring has no inner bound and
            # the outermost ring has no outer bound.
            ring_ind = selected_vertices[class_ind]
            out_of_band = np.zeros(back_to_radii.shape, dtype=bool)
            if ring_ind > 0:
                inner_bound = vertices[ring_ind] - (
                        vertices[ring_ind] - vertices[ring_ind - 1]) / 2
                out_of_band |= back_to_radii < inner_bound
            if ring_ind < self.n_classes - 1:
                outer_bound = vertices[ring_ind] + (
                        vertices[ring_ind + 1] - vertices[ring_ind]) / 2
                out_of_band |= back_to_radii > outer_bound

            mis_described = np.flatnonzero(out_of_band)
            well_described = np.flatnonzero(~out_of_band)
            self.bayes_error[class_ind] = mis_described.shape[0]
            self.descriptions[class_ind, mis_described] = -1
            self.descriptions[class_ind, well_described] = 1
        return data

    def gen_report(self):
        """
        Build the textual report describing this view.

        :return: the report of ``BaseSubProblem.gen_report`` followed by the
            feature layout of the view and its empirical Bayes classifier
        """
        base_str = BaseSubProblem.gen_report(self)
        base_str += "\n\nThis view has {} features, all of them are relevant for classification.".format(
            self.n_features)
        base_str += "\n\n Its empirical bayesian classifier is any algorithm used with an RBF kernel."
        return base_str

    def get_bayes_classifier(self):
        """
        Build the classifier used as empirical Bayes classifier for this view.

        :return: an unfitted scikit-learn ``SVC`` with an RBF kernel
        """
        from sklearn.svm import SVC
        return SVC(kernel='rbf', gamma=0.1, C=0.001)
def to_cartesian(radius, angles):
    """
    Convert hyperspherical coordinates to cartesian coordinates.

    With ``angles = [t_1, ..., t_k]`` the returned point has ``k + 1``
    coordinates: coordinate ``i`` is
    ``radius * sin(t_1) * ... * sin(t_{i-1}) * cos(t_i)`` and the last one
    is ``radius * sin(t_1) * ... * sin(t_k)``.

    :param radius: the radial coordinate
    :param angles: 1-D array-like of the ``k`` angular coordinates
    :return: ``numpy.ndarray`` of length ``k + 1``
    """
    # A leading 2*pi is prepended so that, once rotated to the end, its
    # cosine closes the product for the last coordinate.
    padded = np.concatenate((np.array([2 * np.pi]), angles))
    # Running product of sines, starting from a neutral factor of 1.
    sine_factors = np.concatenate((np.ones(1), np.sin(padded[1:])))
    sine_products = np.cumprod(sine_factors)
    cosines = np.cos(np.roll(padded, -1))
    return sine_products * cosines * radius
\ No newline at end of file
import unittest
from ..gaussian_classes import MultiViewGaussianSubProblemsGenerator
class Test_MultiViewGaussianSubProblemsGenerator(unittest.TestCase):
    # Smoke tests for MultiViewGaussianSubProblemsGenerator.

    @classmethod
    def setUpClass(cls):
        """No class-level fixture is set up."""

    @classmethod
    def tearDownClass(cls):
        """No class-level fixture is torn down."""

    def test_simple(self):
        # Only checks that the generator can be built without arguments.
        MultiViewGaussianSubProblemsGenerator()
import unittest
import numpy as np
from multiview_generator.multiple_sub_problems import MultiViewSubProblemsGenerator
from ..multiple_sub_problems import MultiViewSubProblemsGenerator
class Test_MultiViewSubProblemsGenerator(unittest.TestCase):
......
......@@ -14,6 +14,9 @@ def format_array(input, size, type_needed=int):
:return: a ``numpy.ndarray`` of shape (``size``, )
"""
if isinstance(input, type_needed):
if type_needed==str:
return [input for _ in range(size)]
else:
return np.zeros(size, dtype=type_needed) + input
elif isinstance(input, list) and isinstance(input[0], type_needed):
if size == len(input):
......@@ -54,7 +57,7 @@ def init_sub_problem_config(sub_problem_configs, n_views):
def init_error_matrix(error_matrix, n_classes, n_views):
if error_matrix is None:
error_matrix = np.zeros((n_classes, n_views)) + 0.5
error_matrix = np.zeros((n_classes, n_views)) + 0.3
elif isinstance(error_matrix, np.ndarray):
if error_matrix.shape != (n_classes, n_views):
raise ValueError("Confusion matrix must be of shape "
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment