Commit 38625746 authored by Baptiste Bauvin

Removed private algos

parent 01670fc0
Pipeline #3531 passed
Showing with 0 additions and 2114 deletions
import logging
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_is_fitted
from ..metrics import zero_one_loss
from ..monoview.additions.BoostUtils import StumpsClassifiersGenerator, \
BaseBoost
from ..monoview.monoview_utils import CustomRandint, \
BaseMonoviewClassifier, change_label_to_minus, change_label_to_zero
classifier_class_name = "AdaboostGraalpy"
class AdaBoostGP(BaseEstimator, ClassifierMixin, BaseBoost):
"""Scikit-Learn compatible AdaBoost classifier. Original code by Pascal Germain, adapted by Jean-Francis Roy.
Parameters
----------
n_iterations : int, optional
The number of iterations of the algorithm. Defaults to 200.
iterations_to_collect_as_hyperparameters : list
Iteration numbers to collect while learning, that will be converted as hyperparameter values at evaluation time.
Defaults to None.
classifiers_generator : Transformer, optional
A transformer to convert input samples in voters' outputs. Default: Decision stumps transformer, with 10 stumps
per attributes.
callback_function : function, optional
A function to call at each iteration that is supplied learning information. Defaults to None.
n_stumps : int ( default : 10)
self_complemented : boolean (default : True
Attributes
----------
n_iterations : int, optional
The number of iterations of the algorithm. Defaults to 200.
iterations_to_collect_as_hyperparameters : list
Iteration numbers to collect while learning, that will be converted as hyperparameter values at evaluation time.
Defaults to None.
classifiers_generator : Transformer, optional
A transformer to convert input samples in voters' outputs. Default: Decision stumps transformer, with 10 stumps
per attributes.
callback_function : function, optional
A function to call at each iteration that is supplied learning information. Defaults to None.
"""
def __init__(self, n_iterations=200,
iterations_to_collect_as_hyperparameters=True,
classifiers_generator=None, callback_function=None,
n_stumps=10, self_complemented=True):
self.n_iterations = n_iterations
self.n_stumps = n_stumps
self.iterations_to_collect_as_hyperparameters = iterations_to_collect_as_hyperparameters
self.estimators_generator = classifiers_generator
self.callback_function = callback_function
self.self_complemented = self_complemented
def fit(self, X, y):
"""Fits the algorithm on training data.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
The input data.
y : ndarray of shape (n_samples, )
The input labels.
Returns
-------
self
"""
y_neg = change_label_to_minus(y)
if self.estimators_generator is None:
self.estimators_generator = StumpsClassifiersGenerator(
n_stumps_per_attribute=self.n_stumps,
self_complemented=self.self_complemented)
# Step 1: We fit the classifiers generator and get its classification matrix.
self.estimators_generator.fit(X, y_neg)
# hint: this is equivalent to constructing a new X
classification_matrix = self._binary_classification_matrix(X)
n_samples, n_voters = classification_matrix.shape
# logging.debug("n_voters = {}".format(n_voters))
# Step 2: We initialize the weights on the samples and the weak classifiers.
sample_weights = np.ones(n_samples) / n_samples
alpha_weights = np.zeros(n_voters)
self.losses = []
# Step 3: We loop for each iteration.
self.collected_weight_vectors_ = []
for t in range(self.n_iterations):
# Step 4: We find the classifier that maximizes the success,
# weighted by the sample weights.
classifier_successes = np.dot(classification_matrix.T,
sample_weights * y_neg)
best_voter_index = np.argmax(classifier_successes)
success = classifier_successes[best_voter_index]
if success >= 1.0:
logging.info("AdaBoost stopped : perfect classifier found!")
self.weights_ = np.zeros(n_voters)
self.weights_[best_voter_index] = 1.0
return self
# Step 5: We calculate the alpha_t parameter and update the alpha weights.
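# Here `success` is the weighted edge of the chosen voter, in (-1, 1);
# alpha is the usual AdaBoost voter weight 0.5 * ln((1 + edge) / (1 - edge)).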
alpha = 0.5 * np.log((1.0 + success) / (1.0 - success))
alpha_weights[best_voter_index] += alpha
# logging.debug("{} : {}".format(t, str(alpha)))
# Step 6: We update the sample weights.
sample_weights *= np.exp(
-1 * alpha * y_neg * classification_matrix[:, best_voter_index])
normalization_constant = sample_weights.sum()
sample_weights = sample_weights / normalization_constant
# We collect iteration information for later evaluation.
if self.iterations_to_collect_as_hyperparameters:
weights = alpha_weights / np.sum(alpha_weights)
self.collected_weight_vectors_.append(weights.copy())
loss = zero_one_loss.score(y_neg, np.sign(np.sum(
np.multiply(classification_matrix,
alpha_weights / np.sum(alpha_weights)), axis=1)))
self.losses.append(loss)
if self.callback_function is not None:
# self.weights_ is only set once the loop ends, so we pass the current
# normalized weights instead of the not-yet-defined attribute.
self.callback_function(t, alpha_weights, normalization_constant,
self.estimators_generator,
alpha_weights / np.sum(alpha_weights))
self.weights_ = alpha_weights / np.sum(alpha_weights)
self.losses = np.array(self.losses)
self.learner_info_ = {
'n_nonzero_weights': np.sum(self.weights_ > 1e-12)}
return self
def predict(self, X):
"""Predict inputs using the fit classifier.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
The data to classify.
Returns
-------
predictions : ndarray of shape (n_samples, )
The estimated labels.
"""
check_is_fitted(self, 'weights_')
classification_matrix = self._binary_classification_matrix(X)
if self.iterations_to_collect_as_hyperparameters:
self.test_preds = []
for weight_vector in self.collected_weight_vectors_:
preds = np.sum(np.multiply(classification_matrix,
weight_vector), axis=1)
self.test_preds.append(change_label_to_zero(np.sign(preds)))
self.test_preds = np.array(self.test_preds)
margins = np.squeeze(
np.asarray(np.dot(classification_matrix, self.weights_)))
return change_label_to_zero(
np.array([int(x) for x in np.sign(margins)]))
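# --- Illustrative usage sketch (not part of the original file). It assumes the
# module is run as part of the package so the relative imports resolve; the
# data below is synthetic and the hyper-parameter values are arbitrary.
if __name__ == "__main__":
    rng = np.random.RandomState(42)
    X_demo = rng.rand(60, 4)        # 60 samples, 4 features
    y_demo = rng.randint(0, 2, 60)  # binary labels in {0, 1}
    gp = AdaBoostGP(n_iterations=20, n_stumps=5)
    gp.fit(X_demo, y_demo)
    print("non-zero voter weights:", gp.learner_info_['n_nonzero_weights'])
    print("train error:", np.mean(gp.predict(X_demo) != y_demo))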
class AdaboostGraalpy(AdaBoostGP, BaseMonoviewClassifier):
"""AdaboostGraalpy
Parameters
----------
random_state : int seed, RandomState instance, or None (default=None)
The seed of the pseudo random number generator to use when
shuffling the data.
n_iterations : in number of iterations (default : 200)
n_stumps : int (default 1)
kwargs : others arguments
Attributes
----------
param_names :
distribs :
weird_strings :
n_stumps :
nbCores :
"""
def __init__(self, random_state=None, n_iterations=200, n_stumps=1,
**kwargs):
super(AdaboostGraalpy, self).__init__(
n_iterations=n_iterations,
n_stumps=n_stumps
)
self.param_names = ["n_iterations", "n_stumps", "random_state"]
self.distribs = [CustomRandint(low=1, high=500), [n_stumps],
[random_state]]
self.classed_params = []
self.weird_strings = {}
self.n_stumps = n_stumps
if "nbCores" not in kwargs:
self.nbCores = 1
else:
self.nbCores = kwargs["nbCores"]
def getInterpret(self, directory, y_test):
"""
Parameters
----------
directory :
y_test :
Returns
-------
retur string of interpret
"""
np.savetxt(directory + "train_metrics.csv", self.losses, delimiter=',')
np.savetxt(directory + "y_test_step.csv", self.test_preds,
delimiter=',')
step_metrics = []
for step_index in range(self.test_preds.shape[0] - 1):
step_metrics.append(zero_one_loss.score(y_test,
self.test_preds[step_index,
:]))
step_metrics = np.array(step_metrics)
np.savetxt(directory + "step_test_metrics.csv", step_metrics,
delimiter=',')
return ""
def paramsToSet(nIter, random_state):
"""Used for weighted linear early fusion to generate random search sets"""
paramsSet = []
for _ in range(nIter):
paramsSet.append({"n_iterations": random_state.randint(1, 500), })
return paramsSet
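# Example (illustrative): drawing three random-search parameter sets with a
# seeded numpy RandomState.
# >>> import numpy as np
# >>> paramsToSet(3, np.random.RandomState(0))
# [{'n_iterations': ...}, {'n_iterations': ...}, {'n_iterations': ...}]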
import numpy as np
from ..monoview.additions.BoostUtils import getInterpretBase
from ..monoview.additions.CQBoostUtils import ColumnGenerationClassifier
from ..monoview.monoview_utils import CustomUniform, CustomRandint, \
BaseMonoviewClassifier
classifier_class_name = "CQBoost"
class CQBoost(ColumnGenerationClassifier, BaseMonoviewClassifier):
def __init__(self, random_state=None, mu=0.01, epsilon=1e-06, n_stumps=1,
n_max_iterations=None, estimators_generator="Stumps",
max_depth=1, **kwargs):
super(CQBoost, self).__init__(
random_state=random_state,
mu=mu,
epsilon=epsilon,
estimators_generator=estimators_generator,
n_max_iterations=n_max_iterations,
max_depth=max_depth
)
self.param_names = ["mu", "epsilon", "n_stumps", "random_state",
"n_max_iterations", "estimators_generator",
"max_depth"]
self.distribs = [CustomUniform(loc=0.5, state=1.0, multiplier="e-"),
CustomRandint(low=1, high=15, multiplier="e-"),
[n_stumps], [random_state], [n_max_iterations],
["Stumps", "Trees"], CustomRandint(low=1, high=5)]
self.classed_params = []
self.weird_strings = {}
self.n_stumps = n_stumps
if "nbCores" not in kwargs:
self.nbCores = 1
else:
self.nbCores = kwargs["nbCores"]
def getInterpret(self, directory, y_test):
np.savetxt(directory + "train_metrics.csv", self.train_metrics,
delimiter=',')
np.savetxt(directory + "c_bounds.csv", self.c_bounds,
delimiter=',')
np.savetxt(directory + "y_test_step.csv", self.step_decisions,
delimiter=',')
step_metrics = []
for step_index in range(self.step_decisions.shape[1] - 1):
step_metrics.append(self.plotted_metric.score(y_test,
self.step_decisions[:,
step_index]))
step_metrics = np.array(step_metrics)
np.savetxt(directory + "step_test_metrics.csv", step_metrics,
delimiter=',')
return getInterpretBase(self, directory, "CQBoost", self.weights_,
y_test)
def paramsToSet(nIter, randomState):
"""Used for weighted linear early fusion to generate random search sets"""
paramsSet = []
for _ in range(nIter):
paramsSet.append({"mu": 10 ** -randomState.uniform(0.5, 1.5),
"epsilon": 10 ** -randomState.randint(1, 15)})
return paramsSet
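# Note (illustrative): both draws above are log-uniform samples; e.g.
# 10 ** -randomState.uniform(0.5, 1.5) always lies in [10 ** -1.5, 10 ** -0.5],
# mirroring the CustomUniform(..., multiplier="e-") distribution declared in
# CQBoost.distribs.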
import numpy as np
from ..monoview.additions.BoostUtils import StumpsClassifiersGenerator
from ..monoview.additions.MinCQUtils import RegularizedBinaryMinCqClassifier
from ..monoview.monoview_utils import BaseMonoviewClassifier, CustomUniform
classifier_class_name = "MinCQGraalpy"
class MinCQGraalpy(RegularizedBinaryMinCqClassifier, BaseMonoviewClassifier):
"""
MinCQGraalpy extend of ``RegularizedBinaryMinCqClassifier ``
Parameters
----------
random_state : int seed, RandomState instance, or None (default=None)
The seed of the pseudo random number generator to use when
shuffling the data.
mu : float, (default: 0.01)
self_complemented : bool (default : True)
n_stumps_per_attribute : (default: =1
kwargs : others arguments
Attributes
----------
param_names
distribs
n_stumps_per_attribute
classed_params
weird_strings
nbCores : number of cores
"""
def __init__(self, random_state=None, mu=0.01, self_complemented=True,
n_stumps_per_attribute=1, **kwargs):
super(MinCQGraalpy, self).__init__(mu=mu,
estimators_generator=StumpsClassifiersGenerator(
n_stumps_per_attribute=n_stumps_per_attribute,
self_complemented=self_complemented),
)
self.param_names = ["mu", "n_stumps_per_attribute", "random_state"]
self.distribs = [CustomUniform(loc=0.05, state=2.0, multiplier="e-"),
[n_stumps_per_attribute], [random_state]]
self.n_stumps_per_attribute = n_stumps_per_attribute
self.classed_params = []
self.weird_strings = {}
self.random_state = random_state
if "nbCores" not in kwargs:
self.nbCores = 1
else:
self.nbCores = kwargs["nbCores"]
def set_params(self, **params):
"""
set parameter 'self.mu', 'self.random_state
'self.n_stumps_per_attribute
Parameters
----------
params
Returns
-------
self : object
Returns self.
"""
self.mu = params["mu"]
self.random_state = params["random_state"]
self.n_stumps_per_attribute = params["n_stumps_per_attribute"]
return self
def get_params(self, deep=True):
"""
Parameters
----------
deep : bool (default : true) not used
Returns
-------
dictianary with "random_state", "mu", "n_stumps_per_attribute"
"""
return {"random_state": self.random_state, "mu": self.mu,
"n_stumps_per_attribute": self.n_stumps_per_attribute}
def getInterpret(self, directory, y_test):
"""
Parameters
----------
directory
y_test
Returns
-------
string of interpret_string
"""
interpret_string = "Cbound on train :" + str(self.train_cbound)
np.savetxt(directory + "times.csv", np.array([self.train_time, 0]))
return interpret_string
def get_name_for_fusion(self):
return "MCG"
def paramsToSet(nIter, random_state):
"""Used for weighted linear early fusion to generate random search sets"""
paramsSet = []
for _ in range(nIter):
paramsSet.append({})
return paramsSet
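# Example (illustrative): sklearn-style parameter round-trip, assuming the
# package imports resolve.
# clf = MinCQGraalpy(mu=0.05)
# clf.set_params(**clf.get_params())  # the two methods are symmetric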
import numpy as np
from ..monoview.additions.BoostUtils import TreeClassifiersGenerator
from ..monoview.additions.MinCQUtils import RegularizedBinaryMinCqClassifier
from ..monoview.monoview_utils import BaseMonoviewClassifier, CustomUniform
classifier_class_name = "MinCQGraalpyTree"
class MinCQGraalpyTree(RegularizedBinaryMinCqClassifier,
BaseMonoviewClassifier):
"""
Parameters
----------
random_state :
mu : (default : 0.01)
self_complemented : ( default : True)
n_stumps_per_attribute : int ( default : 1)
max_depth :
kwargs : others parameters
Attributes
----------
param_name :
distribs :
classed_params :
n_stumps_per_attribute : int
weird_strings :
max_depth :
random_state :
nbCores :
"""
def __init__(self, random_state=None, mu=0.01, self_complemented=True,
n_stumps_per_attribute=1, max_depth=2, **kwargs):
super(MinCQGraalpyTree, self).__init__(mu=mu,
estimators_generator=TreeClassifiersGenerator(
n_trees=n_stumps_per_attribute,
max_depth=max_depth,
self_complemented=self_complemented),
)
self.param_names = ["mu", "n_stumps_per_attribute", "random_state",
"max_depth"]
self.distribs = [CustomUniform(loc=0.05, state=2.0, multiplier="e-"),
[n_stumps_per_attribute], [random_state], [max_depth]]
self.n_stumps_per_attribute = n_stumps_per_attribute
self.classed_params = []
self.weird_strings = {}
self.max_depth = max_depth
self.random_state = random_state
if "nbCores" not in kwargs:
self.nbCores = 1
else:
self.nbCores = kwargs["nbCores"]
def set_params(self, **params):
"""
set parameter in the input dictionary
Parameters
----------
params : dict parameter to set
Returns
-------
self : object
Returns self.
"""
self.mu = params["mu"]
self.random_state = params["random_state"]
self.n_stumps_per_attribute = params["n_stumps_per_attribute"]
self.max_depth = params["max_depth"]
return self
def get_params(self, deep=True):
"""
get parameter
Parameters
----------
deep : (boolean (default : True) not used
Returns
-------
dictionary of parameter as key and its values
"""
return {"random_state": self.random_state, "mu": self.mu,
"n_stumps_per_attribute": self.n_stumps_per_attribute,
"max_depth": self.max_depth}
def getInterpret(self, directory, y_test):
"""
Parameters
----------
directory :
y_test :
Returns
-------
string for interpretation interpret_string
"""
interpret_string = "Cbound on train :" + str(self.train_cbound)
np.savetxt(directory + "times.csv", np.array([self.train_time, 0]))
return interpret_string
def get_name_for_fusion(self):
return "MCG"
def paramsToSet(nIter, randomState):
"""Used for weighted linear early fusion to generate random search sets"""
paramsSet = []
for _ in range(nIter):
paramsSet.append({})
return paramsSet
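# Note: MinCQGraalpyTree (above) differs from MinCQGraalpy only in its voter
# generator: depth-bounded decision trees instead of stumps, hence the extra
# max_depth hyper-parameter.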
from pyscm.scm import SetCoveringMachineClassifier as scm
from ..monoview.monoview_utils import CustomRandint, CustomUniform, \
BaseMonoviewClassifier
# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype" # Production, Development, Prototype
classifier_class_name = "SCM"
class SCM(scm, BaseMonoviewClassifier):
"""
SCM Classifier
Parameters
----------
random_state (default : None)
model_type : string (default: "conjunction")
max_rules : int number maximum of rules (default : 10)
p : float value(default : 0.1 )
kwarg : others arguments
Attributes
----------
param_names
distribs
classed_params
weird_strings
"""
def __init__(self, random_state=None, model_type="conjunction",
max_rules=10, p=0.1, **kwargs):
"""
Parameters
----------
random_state
model_type
max_rules
p
kwargs
"""
super(SCM, self).__init__(
random_state=random_state,
model_type=model_type,
max_rules=max_rules,
p=p
)
self.param_names = ["model_type", "max_rules", "p", "random_state"]
self.distribs = [["conjunction", "disjunction"],
CustomRandint(low=1, high=15),
CustomUniform(loc=0, state=1), [random_state]]
self.classed_params = []
self.weird_strings = {}
def getInterpret(self, directory, y_test):
interpretString = "Model used : " + str(self.model_)
return interpretString
def paramsToSet(nIter, random_state):
paramsSet = []
for _ in range(nIter):
paramsSet.append(
{"model_type": random_state.choice(["conjunction", "disjunction"]),
"max_rules": random_state.randint(1, 15),
"p": random_state.random_sample()})
return paramsSet
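# --- Illustrative usage sketch (not part of the original file); the binary
# data below is synthetic and the hyper-parameter values are arbitrary.
if __name__ == "__main__":
    import numpy as np
    rng = np.random.RandomState(42)
    X_demo = rng.randint(0, 2, (50, 8))  # 50 samples, 8 binary attributes
    y_demo = rng.randint(0, 2, 50)
    clf = SCM(model_type="conjunction", max_rules=5, p=0.5)
    clf.fit(X_demo, y_demo)
    print("learned model:", clf.model_)
    print("train error:", np.mean(clf.predict(X_demo) != y_demo))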
import os
import numpy as np
from pyscm.scm import SetCoveringMachineClassifier as scm
from ..monoview.additions.PregenUtils import PregenClassifier
from ..monoview.monoview_utils import CustomRandint, CustomUniform, \
BaseMonoviewClassifier
# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype" # Production, Development, Prototype
classifier_class_name = "SCMPregen"
class SCMPregen(BaseMonoviewClassifier, PregenClassifier, scm):
"""
Parameters
----------
random_state : int seed, RandomState instance, or None (default=None)
The seed of the pseudo random number generator to use when
shuffling the data.
model_type : string (default : "conjunction")
max_rules : int, maximum number of rules (default : 10)
p : float (default : 0.1)
n_stumps : int, number of stumps per attribute (default : 10)
self_complemented : bool (default : True)
estimators_generator : string, "Stumps" or "Trees" (default : "Stumps")
max_depth : int, maximum tree depth (default : 1)
kwargs : other arguments
Attributes
----------
param_names
distribs
classed_params
weird_strings
self_complemented
n_stumps
estimators_generator
max_depth
"""
def __init__(self, random_state=None, model_type="conjunction",
max_rules=10, p=0.1, n_stumps=10, self_complemented=True,
estimators_generator="Stumps", max_depth=1, **kwargs):
super(SCMPregen, self).__init__(
random_state=random_state,
model_type=model_type,
max_rules=max_rules,
p=p
)
self.param_names = ["model_type", "max_rules", "p", "n_stumps",
"random_state", "estimators_generator", "max_depth"]
self.distribs = [["conjunction", "disjunction"],
CustomRandint(low=1, high=15),
CustomUniform(loc=0, state=1), [n_stumps],
[random_state], ["Stumps", "Tree"],
CustomRandint(low=1, high=5)]
self.classed_params = []
self.weird_strings = {}
self.self_complemented = self_complemented
self.n_stumps = n_stumps
self.estimators_generator = estimators_generator
self.max_depth = max_depth
def get_params(self, deep=True):
"""
Parameters
----------
deep : boolean (default : True), not used
Returns
-------
parameters dictionary
"""
params = super(SCMPregen, self).get_params(deep)
params["estimators_generator"] = self.estimators_generator
params["max_depth"] = self.max_depth
params["n_stumps"] = self.n_stumps
return params
def fit(self, X, y, tiebreaker=None, iteration_callback=None,
**fit_params):
"""
Fit function.
Parameters
----------
X : {array-like, sparse matrix}, shape (n_samples, n_features)
For kernel="precomputed", the expected shape of X is
(n_samples_test, n_samples_train).
y : array-like, shape (n_samples,)
Target values (class labels in classification).
tiebreaker
iteration_callback : (default : None)
fit_params : other parameters
Returns
-------
self : object
Returns self.
"""
pregen_X, _ = self.pregen_voters(X, y)
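# Note: the voter matrix is round-tripped through a temporary CSV below,
# apparently to hand pyscm a plain, freshly-parsed float ndarray; the
# original code does not document the reason.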
list_files = os.listdir(".")
a = int(self.random_state.randint(0, 10000))
if "pregen_x" + str(a) + ".csv" in list_files:
a = int(np.random.randint(0, 10000))
file_name = "pregen_x" + str(a) + ".csv"
while file_name in list_files:
a = int(np.random.randint(0, 10000))
file_name = "pregen_x" + str(a) + ".csv"
else:
file_name = "pregen_x" + str(a) + ".csv"
np.savetxt(file_name, pregen_X, delimiter=',')
place_holder = np.genfromtxt(file_name, delimiter=',')
os.remove(file_name)
super(SCMPregen, self).fit(place_holder, y, tiebreaker=tiebreaker,
iteration_callback=iteration_callback,
**fit_params)
return self
def predict(self, X):
"""
Parameters
----------
X : {array-like, sparse matrix}, shape (n_samples, n_features)
Input vectors, where n_samples is the number of samples
and n_features is the number of features.
For kernel="precomputed", the expected shape of X is
(n_samples, n_samples).
Returns
-------
y_pred : array, shape (n_samples,)
"""
pregen_X, _ = self.pregen_voters(X)
list_files = os.listdir(".")
a = int(self.random_state.randint(0, 10000))
if "pregen_x" + str(a) + ".csv" in list_files:
a = int(np.random.randint(0, 10000))
file_name = "pregen_x" + str(a) + ".csv"
while file_name in list_files:
a = int(np.random.randint(0, 10000))
file_name = "pregen_x" + str(a) + ".csv"
else:
file_name = "pregen_x" + str(a) + ".csv"
np.savetxt(file_name, pregen_X, delimiter=',')
place_holder = np.genfromtxt(file_name, delimiter=',')
os.remove(file_name)
return self.classes_[self.model_.predict(place_holder)]
def getInterpret(self, directory, y_test):
"""
Parameters
----------
directory
y_test
Returns
-------
interpret_string : string of interpretation
"""
interpret_string = "Model used : " + str(self.model_)
return interpret_string
def paramsToSet(nIter, randomState):
paramsSet = []
for _ in range(nIter):
paramsSet.append(
{"model_type": randomState.choice(["conjunction", "disjunction"]),
"max_rules": randomState.randint(1, 15),
"p": randomState.random_sample()})
return paramsSet
from . import fat_late_fusion, analyze_results
from ...multiview import analyze_results
# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype" # Production, Development, Prototype
def execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classification_indices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, randomState, labels, classifierModule):
return analyze_results.execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classification_indices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, randomState, labels, classifierModule)
import numpy as np
from ...utils.multiclass import isBiclass, genMulticlassMonoviewDecision
def genName(config):
return "fat_late_fusion"
def getBenchmark(benchmark, args=None):
benchmark["multiview"]["fat_late_fusion"] = ["take_everything"]
return benchmark
def getArgs(args, benchmark, views, views_indices, randomState, directory, resultsMonoview, classificationIndices):
argumentsList = []
multiclass_preds = [monoviewResult.y_test_multiclass_pred for monoviewResult in resultsMonoview]
if isBiclass(multiclass_preds):
monoviewDecisions = np.array([monoviewResult.full_labels_pred for monoviewResult in resultsMonoview])
else:
monoviewDecisions = np.array([genMulticlassMonoviewDecision(monoviewResult, classificationIndices) for monoviewResult in resultsMonoview])
if len(args.FLF_weights) == 0:
weights = [1.0 for _ in range(monoviewDecisions.shape[0])]
else:
weights = args.FLF_weights
arguments = {"CL_type": "fat_late_fusion",
"views": views,
"NB_VIEW": len(resultsMonoview),
"views_indices": range(len(resultsMonoview)),
"NB_CLASS": len(args.CL_classes),
"LABELS_NAMES": args.CL_classes,
"FatLateFusionKWARGS": {
"monoviewDecisions": monoviewDecisions,
"weights": weights
}
}
argumentsList.append(arguments)
return argumentsList
def genParamsSets(classificationKWARGS, randomState, nIter=1):
"""Used to generate parameters sets for the random hyper parameters optimization function"""
nbMonoviewClassifiers = len(classificationKWARGS["monoviewDecisions"])
weights = [randomState.random_sample(nbMonoviewClassifiers) for _ in range(nIter)]
normalizedWeights = [[weightVector / np.sum(weightVector)] for weightVector in weights]
return normalizedWeights
class FatLateFusionClass:
def __init__(self, randomState, NB_CORES=1, **kwargs):
if kwargs["weights"] == []:
self.weights = [1.0 / len(kwargs["monoviewDecisions"]) for _ in range(len(kwargs["monoviewDecisions"]))]
else:
self.weights = np.array(kwargs["weights"])/np.sum(np.array(kwargs["weights"]))
self.monoviewDecisions = kwargs["monoviewDecisions"]
def setParams(self, paramsSet):
self.weights = paramsSet[0]
def fit_hdf5(self, DATASET, labels, trainIndices=None, views_indices=None, metric=["f1_score", None]):
pass
def predict_hdf5(self, DATASET, usedIndices=None, views_indices=None):
if usedIndices is None:
usedIndices = range(DATASET.get("Metadata").attrs["datasetLength"])
votes = np.zeros((len(usedIndices), DATASET.get("Metadata").attrs["nbClass"]), dtype=float)
for usedIndex, exampleIndex in enumerate(usedIndices):
for monoviewDecisionIndex, monoviewDecision in enumerate(self.monoviewDecisions):
votes[usedIndex, monoviewDecision[exampleIndex]] += self.weights[monoviewDecisionIndex]
predictedLabels = np.argmax(votes, axis=1)
return predictedLabels
def predict_probas_hdf5(self, DATASET, usedIndices=None):
pass
def getConfigString(self, classificationKWARGS):
return "weights : "+", ".join(map(str, list(self.weights)))
def getSpecificAnalysis(self, classificationKWARGS):
stringAnalysis = ''
return stringAnalysis
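# --- Illustrative sketch (not part of the original file) of the weighted-vote
# rule used in predict_hdf5, on synthetic decisions: two monoview classifiers,
# three examples, two classes.
if __name__ == "__main__":
    decisions = np.array([[0, 1, 1], [1, 1, 0]])  # one row per classifier
    weights = np.array([0.7, 0.3])
    votes = np.zeros((3, 2))
    for clf_index, clf_decisions in enumerate(decisions):
        for example_index, label in enumerate(clf_decisions):
            votes[example_index, label] += weights[clf_index]
    print(np.argmax(votes, axis=1))  # -> [0 1 1]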
from . import fat_scm_late_fusion, analyze_results
from ...multiview import analyze_results
# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype" # Production, Development, Prototype
def execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classification_indices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, random_state, labels, classifierModule):
return analyze_results.execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classification_indices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, random_state, labels, classifierModule)
import numpy as np
from pyscm.scm import SetCoveringMachineClassifier as scm
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.externals.six import iteritems
from ...utils.multiclass import isBiclass, genMulticlassMonoviewDecision
def genName(config):
return "fat_scm_late_fusion"
def getBenchmark(benchmark, args=None):
benchmark["multiview"]["fat_scm_late_fusion"] = ["take_everything"]
return benchmark
def getArgs(args, benchmark, views, views_indices, random_state, directory, resultsMonoview, classificationIndices):
argumentsList = []
multiclass_preds = [monoviewResult.y_test_multiclass_pred for monoviewResult in resultsMonoview]
if isBiclass(multiclass_preds):
monoviewDecisions = np.array([monoviewResult.full_labels_pred for monoviewResult in resultsMonoview])
else:
monoviewDecisions = np.array([genMulticlassMonoviewDecision(monoviewResult, classificationIndices) for monoviewResult in resultsMonoview])
monoviewDecisions = np.transpose(monoviewDecisions)
arguments = {"CL_type": "fat_scm_late_fusion",
"views": ["all"],
"NB_VIEW": len(resultsMonoview),
"views_indices": range(len(resultsMonoview)),
"NB_CLASS": len(args.CL_classes),
"LABELS_NAMES": args.CL_classes,
"FatSCMLateFusionKWARGS": {
"monoviewDecisions": monoviewDecisions,
"p": args.FSCMLF_p,
"max_attributes": args.FSCMLF_max_attributes,
"model":args.FSCMLF_model,
}
}
argumentsList.append(arguments)
return argumentsList
def genParamsSets(classificationKWARGS, random_state, nIter=1):
"""Used to generate parameters sets for the random hyper parameters optimization function"""
paramsSets = []
for _ in range(nIter):
max_attributes = random_state.randint(1, 20)
p = random_state.random_sample()
model = random_state.choice(["conjunction", "disjunction"])
paramsSets.append([p, max_attributes, model])
return paramsSets
class FatSCMLateFusionClass:
def __init__(self, random_state, NB_CORES=1, **kwargs):
if kwargs["p"]:
self.p = kwargs["p"]
else:
self.p = 0.5
if kwargs["max_attributes"]:
self.max_attributes = kwargs["max_attributes"]
else:
self.max_attributes = 5
if kwargs["model"]:
self.model = kwargs["model"]
else:
self.model = "conjunction"
self.monoviewDecisions = kwargs["monoviewDecisions"]
self.random_state = random_state
def setParams(self, paramsSet):
self.p = paramsSet[0]
self.max_attributes = paramsSet[1]
self.model = paramsSet[2]
def fit_hdf5(self, DATASET, labels, trainIndices=None, views_indices=None, metric=["f1_score", None]):
features = self.monoviewDecisions[trainIndices]
self.SCMClassifier = DecisionStumpSCMNew(p=self.p, max_rules=self.max_attributes, model_type=self.model,
random_state=self.random_state)
self.SCMClassifier.fit(features, labels[trainIndices].astype(int))
def predict_hdf5(self, DATASET, usedIndices=None, views_indices=None):
if usedIndices is None:
usedIndices = range(DATASET.get("Metadata").attrs["datasetLength"])
predictedLabels = self.SCMClassifier.predict(self.monoviewDecisions[usedIndices])
return predictedLabels
def predict_probas_hdf5(self, DATASET, usedIndices=None):
pass
def getConfigString(self, classificationKWARGS):
return "p : "+str(self.p)+", max_aributes : "+str(self.max_attributes)+", model : "+self.model
def getSpecificAnalysis(self, classificationKWARGS):
stringAnalysis = 'Rules used : ' + str(self.SCMClassifier.clf.model_)
return stringAnalysis
class DecisionStumpSCMNew(BaseEstimator, ClassifierMixin):
"""docstring for SCM
A hands on class of SCM using decision stump, built with sklearn format in order to use sklearn function on SCM like
CV, gridsearch, and so on ..."""
def __init__(self, model_type='conjunction', p=0.1, max_rules=10, random_state=42):
super(DecisionStumpSCMNew, self).__init__()
self.model_type = model_type
self.p = p
self.max_rules = max_rules
self.random_state = random_state
def fit(self, X, y):
self.clf = scm(model_type=self.model_type, max_rules=self.max_rules, p=self.p, random_state=self.random_state)
self.clf.fit(X=X, y=y)
return self
def predict(self, X):
return self.clf.predict(X)
def set_params(self, **params):
for key, value in iteritems(params):
if key == 'p':
self.p = value
if key == 'model_type':
self.model_type = value
if key == 'max_rules':
self.max_rules = value
def get_stats(self):
return {"Binary_attributes": self.clf.model_.rules}
from sklearn.tree import DecisionTreeClassifier
from multimodalboost.mumbo import MumboClassifier
from ..multiview.multiview_utils import BaseMultiviewClassifier, \
get_examples_views_indices
from ..utils.hyper_parameter_search import CustomRandint
classifier_class_name = "Mumbo"
class Mumbo(BaseMultiviewClassifier, MumboClassifier):
def __init__(self, base_estimator=None,
n_estimators=50,
random_state=None,
best_view_mode="edge"):
super().__init__(random_state)
super(BaseMultiviewClassifier, self).__init__(base_estimator=base_estimator,
n_estimators=n_estimators,
random_state=random_state,
best_view_mode=best_view_mode)
self.param_names = ["base_estimator", "n_estimators", "random_state", "best_view_mode"]
self.distribs = [[DecisionTreeClassifier(max_depth=1)],
CustomRandint(5,200), [random_state], ["edge", "error"]]
def fit(self, X, y, train_indices=None, view_indices=None):
train_indices, view_indices = get_examples_views_indices(X,
train_indices,
view_indices)
numpy_X, view_limits = X.to_numpy_array(example_indices=train_indices,
view_indices=view_indices)
return super(Mumbo, self).fit(numpy_X, y[train_indices],
view_limits)
def predict(self, X, example_indices=None, view_indices=None):
example_indices, view_indices = get_examples_views_indices(X,
example_indices,
view_indices)
numpy_X, view_limits = X.to_numpy_array(example_indices=example_indices,
view_indices=view_indices)
return super(Mumbo, self).predict(numpy_X)
from . import analyze_results, pseudo_cq_fusion
from ...multiview import analyze_results
# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype" # Production, Development, Prototype
def execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classificationIndices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, randomState, labels, classifierModule):
return analyze_results.execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classificationIndices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, randomState, labels, classifierModule)
from multiview_platform.mono_multi_view_classifiers.multiview_classifiers.additions import \
diversity_utils
from multiview_platform.mono_multi_view_classifiers.multiview_classifiers.difficulty_fusion_old import difficulty
from multiview_platform.mono_multi_view_classifiers.multiview_classifiers.double_fault_fusion_old import doubleFault
def genName(config):
return "pseudo_cq_fusion"
def getBenchmark(benchmark, args=None):
benchmark["multiview"]["pseudo_cq_fusion"] = ["take_everything"]
return benchmark
def pseudoCQ(difficulty, doubleFault):
return difficulty / float(doubleFault)
def getArgs(args, benchmark, views, views_indices, randomState, directory, resultsMonoview, classificationIndices):
return diversity_utils.getArgs(args, benchmark, views,
views_indices, randomState, directory,
resultsMonoview, classificationIndices,
[doubleFault, difficulty], "pseudo_cq_fusion")
def genParamsSets(classificationKWARGS, randomState, nIter=1):
return diversity_utils.genParamsSets(classificationKWARGS, randomState, nIter=nIter)
class PseudoCQFusionClass(diversity_utils.DiversityFusionClass):
def __init__(self, randomState, NB_CORES=1, **kwargs):
diversity_utils.DiversityFusionClass.__init__(self, randomState, NB_CORES=1, **kwargs)
def getSpecificAnalysis(self, classificationKWARGS):
stringAnalysis = "Classifiers used for each view : " + ', '.join(self.classifiers_names) +\
', with a pseudo CQ of ' + str(self.div_measure)
return stringAnalysis
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.externals.six import iteritems
import itertools
from pyscm.scm import SetCoveringMachineClassifier as scm
from ..multiview_classifiers.additions.late_fusion_utils import \
LateFusionClassifier
from ..multiview.multiview_utils import get_examples_views_indices
from ..monoview.monoview_utils import CustomRandint, CustomUniform
classifier_class_name = "SCMLateFusionClassifier"
class DecisionStumpSCMNew(BaseEstimator, ClassifierMixin):
"""docstring for SCM
A hands on class of SCM using decision stump, built with sklearn format in order to use sklearn function on SCM like
CV, gridsearch, and so on ..."""
def __init__(self, model_type='conjunction', p=0.1, max_rules=10, random_state=42):
super(DecisionStumpSCMNew, self).__init__()
self.model_type = model_type
self.p = p
self.max_rules = max_rules
self.random_state = random_state
def fit(self, X, y):
self.clf = scm(model_type=self.model_type, max_rules=self.max_rules, p=self.p, random_state=self.random_state)
self.clf.fit(X=X, y=y)
return self
def predict(self, X):
return self.clf.predict(X)
def set_params(self, **params):
for key, value in iteritems(params):
if key == 'p':
self.p = value
if key == 'model_type':
self.model_type = value
if key == 'max_rules':
self.max_rules = value
def get_stats(self):
return {"Binary_attributes": self.clf.model_.rules}
class SCMLateFusionClassifier(LateFusionClassifier):
def __init__(self, random_state=None, classifier_names=None,
classifier_configs=None, nb_cores=1,
p=1, max_rules=5, order=1, model_type="conjunction", weights=None):
self.need_probas = False
super(SCMLateFusionClassifier, self).__init__(random_state=random_state,
classifier_names=classifier_names,
classifier_configs=classifier_configs,
nb_cores=nb_cores
)
self.scm_classifier = None
self.p = p
self.max_rules = max_rules
self.order = order
self.model_type = model_type
self.param_names += ["model_type", "max_rules", "p", "order"]
self.distribs += [["conjunction", "disjunction"],
CustomRandint(low=1, high=15),
CustomUniform(loc=0, state=1), [1, 2, 3]]
def fit(self, X, y, train_indices=None, view_indices=None):
super(SCMLateFusionClassifier, self).fit(X, y,
train_indices=train_indices,
view_indices=view_indices)
self.scm_fusion_fit(X, y, train_indices=train_indices, view_indices=view_indices)
return self
def predict(self, X, example_indices=None, view_indices=None):
example_indices, view_indices = get_examples_views_indices(X,
example_indices,
view_indices)
monoview_decisions = np.zeros((len(example_indices), X.nb_view),
dtype=int)
for index, view_index in enumerate(view_indices):
monoview_decision = self.monoview_estimators[index].predict(
X.get_v(view_index, example_indices))
monoview_decisions[:, index] = monoview_decision
features = self.generate_interactions(monoview_decisions)
predicted_labels = self.scm_classifier.predict(features)
return predicted_labels
def scm_fusion_fit(self, X, y, train_indices=None, view_indices=None):
train_indices, view_indices = get_examples_views_indices(X, train_indices, view_indices)
self.scm_classifier = DecisionStumpSCMNew(p=self.p, max_rules=self.max_rules, model_type=self.model_type,
random_state=self.random_state)
monoview_decisions = np.zeros((len(train_indices), X.nb_view), dtype=int)
for index, view_index in enumerate(view_indices):
monoview_decisions[:, index] = self.monoview_estimators[index].predict(
X.get_v(view_index, train_indices))
features = self.generate_interactions(monoview_decisions)
features = np.array([np.array([feat for feat in feature])
for feature in features])
self.scm_classifier.fit(features, y[train_indices].astype(int))
def generate_interactions(self, monoview_decisions):
if self.order is None:
self.order = monoview_decisions.shape[1]
if self.order == 1:
return monoview_decisions
else:
generated_interactions = [monoview_decisions[:, i]
for i in range(monoview_decisions.shape[1])]
for order_index in range(self.order - 1):
combins = itertools.combinations(range(monoview_decisions.shape[1]),
order_index + 2)
for combin in combins:
generated_decision = monoview_decisions[:, combin[0]]
for index in range(len(combin) - 1):
if self.model_type == "disjunction":
generated_decision = np.logical_and(generated_decision,
monoview_decisions[:, combin[index + 1]])
else:
generated_decision = np.logical_or(generated_decision,
monoview_decisions[:, combin[index + 1]])
generated_interactions.append(generated_decision)
return np.transpose(np.array(generated_interactions))
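# Illustrative: with order=2 and three views, the generated features are the
# three original decision columns plus one combined column per pair of views
# (via logical and/or, depending on model_type).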