Commit d1a72db4 authored by Baptiste Bauvin

Monoview passing

parent 69b80a6e
Showing 20 additions and 819 deletions
@@ -669,3 +669,5 @@ def execClassif(arguments):
                                      NB_CLASS, DATASET)
+            noise_results.append([noise_std, results_mean_stds])
+        plot_results_noise(directory, noise_results, metrics[0][0], name)
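Each `[noise_std, results_mean_stds]` row records one noise level's aggregated scores, and `plot_results_noise` turns the list into a noise-robustness curve. The repo's implementation is not shown in this diff; a minimal, hypothetical sketch consistent with this call (the row layout is an assumption) would be:

import os
import numpy as np
import matplotlib.pyplot as plt

def plot_results_noise(directory, noise_results, metric_name, name):
    # Assumed layout: each row is [noise_std, (metric_mean, metric_std)].
    noise_levels = np.array([row[0] for row in noise_results])
    means = np.array([row[1][0] for row in noise_results])
    stds = np.array([row[1][1] for row in noise_results])
    plt.errorbar(noise_levels, means, yerr=stds, marker="o")
    plt.xlabel("noise std")
    plt.ylabel(metric_name)
    plt.title(name)
    plt.savefig(os.path.join(directory, name + "_noise.png"))
    plt.close()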
@@ -17,7 +17,7 @@ from ... import Metrics

 # Used for QarBoost and CGreed
 class ColumnGenerationClassifierQar(BaseEstimator, ClassifierMixin, BaseBoost):
-    def __init__(self, n_max_iterations=None, estimators_generator=None,
+    def __init__(self, n_max_iterations=None, estimators_generator="Stumps",
                  random_state=42, self_complemented=True, twice_the_same=False,
                  c_bound_choice=True, random_start=True,
                  n_stumps=1, use_r=True, c_bound_sol=True,
…
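This hunk and the next replace the `None` default for `estimators_generator` with the string sentinel "Stumps". Keeping constructor arguments as plain, comparable values is what sklearn's `get_params`/`clone` machinery expects now that these classes derive from `BaseEstimator`. A hedged sketch of how such a sentinel could be resolved at fit time (helper name assumed, not taken from the repo):

def _resolve_generator(self):
    # Hypothetical resolution step: map the string sentinel to a concrete
    # generator object only when fitting, so the constructor stays clone-safe.
    if self.estimators_generator == "Stumps":
        return StumpsClassifiersGenerator(
            n_stumps_per_attribute=self.n_stumps, self_complemented=True)
    return self.estimators_generator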
@@ -17,7 +17,7 @@ from ... import Metrics

 class ColumnGenerationClassifier(BaseEstimator, ClassifierMixin, BaseBoost):
     def __init__(self, mu=0.01, epsilon=1e-06, n_max_iterations=100,
-                 estimators_generator=None, dual_constraint_rhs=0,
+                 estimators_generator="Stumps", dual_constraint_rhs=0,
                  save_iteration_as_hyperparameter_each=None, random_state=None):
         super(ColumnGenerationClassifier, self).__init__()
         self.epsilon = epsilon
@@ -78,6 +78,7 @@ class ColumnGenerationClassifier(BaseEstimator, ClassifierMixin, BaseBoost):
         h_values = ma.array(
             np.squeeze(np.array((alpha).T.dot(y_kernel_matrix).T)),
             fill_value=-np.inf)
+        h_values[self.chosen_columns_] = ma.masked
         worst_h_index = ma.argmax(h_values)
…
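The added line masks every already-chosen column, so `ma.argmax` can only ever select a new voter. The pattern in isolation:

import numpy as np
import numpy.ma as ma

h_values = ma.array([0.3, 0.9, 0.5], fill_value=-np.inf)
h_values[[1]] = ma.masked   # column 1 was already chosen
print(ma.argmax(h_values))  # -> 2, the best among the remaining columns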
@@ -4,6 +4,7 @@ import matplotlib.pyplot as plt
 import numpy as np
 from matplotlib.ticker import FuncFormatter
 from scipy.stats import uniform, randint
+from sklearn.base import BaseEstimator, ClassifierMixin
 from sklearn.model_selection import RandomizedSearchCV

 from .. import Metrics
@@ -134,7 +135,7 @@ class CustomUniform:
         return unif


-class BaseMonoviewClassifier(object):
+class BaseMonoviewClassifier(BaseEstimator, ClassifierMixin):
     def genBestParams(self, detector):
         return dict(
…
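Deriving `BaseMonoviewClassifier` from `BaseEstimator` and `ClassifierMixin` gives every monoview classifier sklearn's parameter introspection, which `RandomizedSearchCV` relies on. A quick check of the behavior this enables (toy subclass, assumed for illustration):

from sklearn.base import BaseEstimator, ClassifierMixin, clone

class ToyMonoview(BaseEstimator, ClassifierMixin):
    def __init__(self, mu=0.01, epsilon=1e-06):
        self.mu = mu
        self.epsilon = epsilon

est = ToyMonoview(mu=0.05)
print(est.get_params())  # {'epsilon': 1e-06, 'mu': 0.05}
print(clone(est).mu)     # 0.05 -- clone() rebuilds the estimator from get_params()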
import numpy as np

from ..Monoview.Additions.BoostUtils import getInterpretBase
from ..Monoview.Additions.CQBoostUtils import ColumnGenerationClassifier
from ..Monoview.MonoviewUtils import CustomRandint, CustomUniform, \
    BaseMonoviewClassifier


class ColumnGenerationClassifierv2(ColumnGenerationClassifier):
    def __init__(self, mu=0.01, epsilon=1e-06, random_state=None):
        super(ColumnGenerationClassifierv2, self).__init__(
            mu=mu, epsilon=epsilon, random_state=random_state)

    def initialize(self):
        self.weights_ = []
        self.edge_scores = []
        self.alphas = []

    def update_values(self, h_values=None, worst_h_index=None, alpha=None,
                      w=None):
        self.edge_scores.append(h_values[worst_h_index])
        self.alphas.append(alpha)
        self.weights_.append(w[-1])

    def get_margins(self, w=None):
        # Telescoping vote weights: each voter's weight is damped by the
        # (1 - w) factors of all voters added after it.
        self.weights = np.array(self.weights_)
        self.final_vote_weights = np.array(
            [np.prod(1 - self.weights[t + 1:]) * self.weights[t]
             if t < self.weights.shape[0] - 1 else self.weights[t]
             for t in range(self.weights.shape[0])])
        margins = np.squeeze(np.asarray(
            np.matmul(self.classification_matrix[:, self.chosen_columns_],
                      self.final_vote_weights)))
        return margins

    def compute_weights_(self, w=None):
        self.weights_ = np.array(self.weights_)
        self.final_vote_weights = np.array(
            [np.prod(1 - self.weights_[t + 1:]) * self.weights_[t]
             if t < self.weights_.shape[0] - 1 else self.weights_[t]
             for t in range(self.weights_.shape[0])])
        self.weights_ = self.final_vote_weights

    def get_matrix_to_optimize(self, y_kernel_matrix, w=None):
        m = self.n_total_examples
        if w is not None:
            matrix_to_optimize = np.concatenate(
                (np.matmul(self.matrix_to_optimize, w).reshape((m, 1)),
                 y_kernel_matrix[:, self.chosen_columns_[-1]].reshape((m, 1))),
                axis=1)
        else:
            matrix_to_optimize = y_kernel_matrix[
                                 :, self.chosen_columns_[-1]].reshape((m, 1))
        return matrix_to_optimize
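The `final_vote_weights` expression above implements a telescoping combination: each voter's final weight is its own weight times the product of `(1 - w)` over all later voters, so voters appended later progressively shrink the earlier ones. A standalone check:

import numpy as np

weights = np.array([0.6, 0.5, 0.8])
final = np.array(
    [np.prod(1 - weights[t + 1:]) * weights[t]
     if t < len(weights) - 1 else weights[t]
     for t in range(len(weights))])
print(final)  # [0.06, 0.1, 0.8] == [0.6*0.5*0.2, 0.5*0.2, 0.8]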
class CQBoostv2(ColumnGenerationClassifierv2, BaseMonoviewClassifier):
    def __init__(self, random_state=None, mu=0.01, epsilon=1e-06, **kwargs):
        super(CQBoostv2, self).__init__(
            random_state=random_state,
            mu=mu,
            epsilon=epsilon
        )
        self.param_names = ["mu", "epsilon"]
        self.distribs = [CustomUniform(loc=0.5, state=1.0, multiplier="e-"),
                         CustomRandint(low=1, high=15, multiplier="e-")]
        self.classed_params = []
        self.weird_strings = {}

    def canProbas(self):
        """Used to know if the classifier can return label probabilities"""
        return True

    def getInterpret(self, directory, y_test):
        return getInterpretBase(self, directory, "CQBoostv2", self.weights_)

    def get_name_for_fusion(self):
        return "CQB2"


def formatCmdArgs(args):
    """Used to format kwargs for the parsed args"""
    kwargsDict = {"mu": args.CQB_mu,
                  "epsilon": args.CQB_epsilon}
    return kwargsDict


def paramsToSet(nIter, randomState):
    """Used for weighted linear early fusion to generate random search sets"""
    paramsSet = []
    for _ in range(nIter):
        paramsSet.append({"mu": 10 ** -randomState.uniform(0.5, 1.5),
                          "epsilon": 10 ** -randomState.randint(1, 15)})
    return paramsSet
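The `distribs` above mirror what `paramsToSet` samples by hand: `mu` log-uniform between 10**-1.5 and 10**-0.5, and `epsilon` as 10**-k for an integer k. Assuming `CustomUniform(loc, state, multiplier="e-")` draws u ~ U(loc, loc + state) and returns 10**-u (inferred; the class body is elided above), the equivalent manual sampling is:

import numpy as np

rng = np.random.RandomState(42)
mu = 10 ** -rng.uniform(0.5, 1.5)    # log-uniform in [10**-1.5, 10**-0.5]
epsilon = 10 ** -rng.randint(1, 15)  # 10**-k with k an integer in [1, 14]
print(mu, epsilon)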
# class CQBoostv2(CqBoostClassifierv2):
#
#     def __init__(self, random_state, **kwargs):
#         super(CQBoostv2, self).__init__(
#             mu=kwargs['mu'],
#             epsilon=kwargs['epsilon'],
#             n_max_iterations=kwargs['n_max_iterations'],
#         )
#
#     def canProbas(self):
#         """Used to know if the classifier can return label probabilities"""
#         return False
#
#     def paramsToSrt(self, nIter=1):
#         """Used for weighted linear early fusion to generate random search sets"""
#         paramsSet = []
#         for _ in range(nIter):
#             paramsSet.append({"mu": 0.001,
#                               "epsilon": 1e-08,
#                               "n_max_iterations": None})
#         return paramsSet
#
#     def getKWARGS(self, args):
#         """Used to format kwargs for the parsed args"""
#         kwargsDict = {}
#         kwargsDict['mu'] = 0.001
#         kwargsDict['epsilon'] = 1e-08
#         kwargsDict['n_max_iterations'] = None
#         return kwargsDict
#
#     def genPipeline(self):
#         return Pipeline([('classifier', CqBoostClassifierv2())])
#
#     def genParamsDict(self, randomState):
#         return {"classifier__mu": [0.001],
#                 "classifier__epsilon": [1e-08],
#                 "classifier__n_max_iterations": [None]}
#
#     def genBestParams(self, detector):
#         return {"mu": detector.best_params_["classifier__mu"],
#                 "epsilon": detector.best_params_["classifier__epsilon"],
#                 "n_max_iterations": detector.best_params_["classifier__n_max_iterations"]}
#
#     def genParamsFromDetector(self, detector):
#         nIter = len(detector.cv_results_['param_classifier__mu'])
#         return [("mu", np.array([0.001 for _ in range(nIter)])),
#                 ("epsilon", np.array(detector.cv_results_['param_classifier__epsilon'])),
#                 ("n_max_iterations", np.array(detector.cv_results_['param_classifier__n_max_iterations']))]
#
#     def getConfig(self, config):
#         if type(config) is not dict:  # Used in late fusion when config is a classifier
#             return "\n\t\t- CQBoost with mu : " + str(config.mu) + ", epsilon : " + str(
#                 config.epsilon) + ", n_max_iterations : " + str(config.n_max_iterations)
#         else:
#             return "\n\t\t- CQBoost with mu : " + str(config["mu"]) + ", epsilon : " + str(
#                 config["epsilon"]) + ", n_max_iterations : " + str(config["n_max_iterations"])
#
#
# def getInterpret(classifier, directory):
#     interpretString = ""
#     return interpretString
#
#
# def canProbas():
#     return False
#
#
# def fit(DATASET, CLASS_LABELS, randomState, NB_CORES=1, **kwargs):
#     """Used to fit the monoview classifier with the args stored in kwargs"""
#     start = time.time()
#     classifier = CqBoostClassifierv2(mu=kwargs['mu'],
#                                      epsilon=kwargs['epsilon'],
#                                      n_max_iterations=kwargs["n_max_iterations"],)
#     # random_state=randomState)
#     classifier.fit(DATASET, CLASS_LABELS)
#     end = time.time()
#     classifier.train_time = end - start
#     return classifier
#
#
# def paramsToSet(nIter, randomState):
#     """Used for weighted linear early fusion to generate random search sets"""
#     paramsSet = []
#     for _ in range(nIter):
#         paramsSet.append({"mu": randomState.uniform(1e-02, 10**(-0.5)),
#                           "epsilon": 10**-randomState.randint(1, 15),
#                           "n_max_iterations": None})
#     return paramsSet
#
#
# def getKWARGS(args):
#     """Used to format kwargs for the parsed args"""
#     kwargsDict = {}
#     kwargsDict['mu'] = args.CQB2_mu
#     kwargsDict['epsilon'] = args.CQB2_epsilon
#     kwargsDict['n_max_iterations'] = None
#     return kwargsDict
#
#
# def genPipeline():
#     return Pipeline([('classifier', CqBoostClassifierv2())])
#
#
# def genParamsDict(randomState):
#     return {"classifier__mu": CustomUniform(loc=.5, state=2, multiplier='e-'),
#             "classifier__epsilon": CustomRandint(low=1, high=15, multiplier='e-'),
#             "classifier__n_max_iterations": [None]}
#
#
# def genBestParams(detector):
#     return {"mu": detector.best_params_["classifier__mu"],
#             "epsilon": detector.best_params_["classifier__epsilon"],
#             "n_max_iterations": detector.best_params_["classifier__n_max_iterations"]}
#
#
# def genParamsFromDetector(detector):
#     nIter = len(detector.cv_results_['param_classifier__mu'])
#     return [("mu", np.array([0.001 for _ in range(nIter)])),
#             ("epsilon", np.array(detector.cv_results_['param_classifier__epsilon'])),
#             ("n_max_iterations", np.array(detector.cv_results_['param_classifier__n_max_iterations']))]
#
#
# def getConfig(config):
#     if type(config) is not dict:  # Used in late fusion when config is a classifier
#         return "\n\t\t- CQBoostv2 with mu : " + str(config.mu) + ", epsilon : " + str(
#             config.epsilon) + ", n_max_iterations : " + str(config.n_max_iterations)
#     else:
#         return "\n\t\t- CQBoostv2 with mu : " + str(config["mu"]) + ", epsilon : " + str(
#             config["epsilon"]) + ", n_max_iterations : " + str(config["n_max_iterations"])
#
#
# def getInterpret(classifier, directory):
#     return getInterpretBase(classifier, directory, "CQBoostv2", classifier.final_vote_weights)
import logging
import time
from collections import defaultdict

import numpy as np
import numpy.ma as ma
import scipy
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.metrics import accuracy_score
from sklearn.utils.validation import check_is_fitted

from ..Monoview.Additions.BoostUtils import StumpsClassifiersGenerator, sign, \
    getInterpretBase, BaseBoost
from ..Monoview.MonoviewUtils import CustomUniform, CustomRandint, \
    BaseMonoviewClassifier


class ColumnGenerationClassifierv21(BaseEstimator, ClassifierMixin, BaseBoost):
    def __init__(self, epsilon=1e-06, n_max_iterations=None,
                 estimators_generator=None, dual_constraint_rhs=0,
                 save_iteration_as_hyperparameter_each=None, random_state=42):
        super(ColumnGenerationClassifierv21, self).__init__()
        self.epsilon = epsilon
        self.n_max_iterations = n_max_iterations
        self.estimators_generator = estimators_generator
        self.dual_constraint_rhs = dual_constraint_rhs
        self.save_iteration_as_hyperparameter_each = save_iteration_as_hyperparameter_each
        self.random_state = random_state
    def fit(self, X, y):
        if scipy.sparse.issparse(X):
            logging.info('Converting to dense matrix.')
            X = np.array(X.todense())

        if self.estimators_generator is None:
            self.estimators_generator = StumpsClassifiersGenerator(
                n_stumps_per_attribute=self.n_stumps, self_complemented=True)

        y[y == 0] = -1

        self.estimators_generator.fit(X, y)
        self.classification_matrix = self._binary_classification_matrix(X)

        self.weights_ = []
        self.infos_per_iteration_ = defaultdict(list)

        m, n = self.classification_matrix.shape
        y_kernel_matrix = np.multiply(y.reshape((len(y), 1)),
                                      self.classification_matrix)

        # Initialization
        w = None
        self.collected_weight_vectors_ = {}
        self.collected_dual_constraint_violations_ = {}

        example_weights = self._initialize_alphas(m).reshape((m, 1))

        self.chosen_columns_ = []
        self.forbidden_columns = []
        self.edge_scores = []
        self.example_weights_ = [example_weights]
        self.train_accuracies = []
        self.previous_votes = []

        self.n_total_hypotheses_ = n
        self.n_total_examples = m

        for k in range(min(n,
                           self.n_max_iterations if self.n_max_iterations is not None else np.inf)):
            # Find the worst weak hypothesis given the current example weights.
            new_voter_index, criterion = self._find_new_voter(example_weights,
                                                              y_kernel_matrix,
                                                              "pseudo_h")

            # Append the weak hypothesis.
            self.chosen_columns_.append(new_voter_index)
            self.forbidden_columns.append(new_voter_index)
            new_voter_margin = y_kernel_matrix[
                               :, self.chosen_columns_[-1]].reshape((m, 1))
            self.edge_scores.append(criterion)

            if w is None:
                self.previous_vote = new_voter_margin
                w = 1
                self.weights_.append(w)
                example_weights = self._update_example_weights(example_weights,
                                                               y_kernel_matrix,
                                                               m)
                self.example_weights_.append(example_weights)
                self.train_accuracies.append(
                    accuracy_score(y, np.sign(self.previous_vote)))
                continue

            # Solve the two-voter problem analytically.
            w = self._solve_two_weights_min_c(new_voter_margin, example_weights)
            if w[0] == "break":
                self.chosen_columns_.pop()
                self.break_cause = w[1]
                break
            self.previous_vote = np.matmul(
                np.concatenate((self.previous_vote, new_voter_margin), axis=1),
                w).reshape((m, 1))

            # Collect iteration information for later evaluation.
            self.weights_.append(w[-1])

            self.weights = np.array(self.weights_)
            self.final_vote_weights = np.array(
                [np.prod(1 - self.weights[t + 1:]) * self.weights[t]
                 if t < self.weights.shape[0] - 1 else self.weights[t]
                 for t in range(self.weights.shape[0])])
            margins = np.squeeze(np.asarray(
                np.matmul(self.classification_matrix[:, self.chosen_columns_],
                          self.final_vote_weights)))
            signs_array = np.array([int(x) for x in sign(margins)])
            self.train_accuracies.append(accuracy_score(y, signs_array))

            # Update the edge.
            example_weights = self._update_example_weights(example_weights,
                                                           y_kernel_matrix, m)
            self.example_weights_.append(example_weights)

        self.nb_opposed_voters = self.check_opposed_voters()
        self.estimators_generator.estimators_ = \
            self.estimators_generator.estimators_[self.chosen_columns_]
        y[y == -1] = 0

        return self
    def predict(self, X):
        start = time.time()
        check_is_fitted(self, 'weights_')
        if scipy.sparse.issparse(X):
            logging.warning('Converting sparse matrix to dense matrix.')
            X = np.array(X.todense())
        classification_matrix = self._binary_classification_matrix(X)
        self.weights_ = np.array(self.weights_)
        self.final_vote_weights = np.array(
            [np.prod(1 - self.weights_[t + 1:]) * self.weights_[t]
             if t < self.weights_.shape[0] - 1 else self.weights_[t]
             for t in range(self.weights_.shape[0])])
        margins = np.squeeze(np.asarray(
            np.matmul(classification_matrix, self.final_vote_weights)))
        signs_array = np.array([int(x) for x in sign(margins)])
        signs_array[signs_array == -1] = 0
        end = time.time()
        self.predict_time = end - start
        return signs_array
    def _find_new_voter(self, example_weights, y_kernel_matrix,
                        strategy="pseudo_h"):
        if strategy == "pseudo_h":
            pseudo_h_values = ma.array(
                np.squeeze(np.array(example_weights.T.dot(y_kernel_matrix).T)),
                fill_value=-np.inf)
            # Mask already-chosen voters so argmax only picks new columns.
            pseudo_h_values[self.forbidden_columns] = ma.masked
            worst_h_index = ma.argmax(pseudo_h_values)
            return worst_h_index, pseudo_h_values[worst_h_index]
        elif strategy == "random":
            new_index = self.random_state.choice(
                np.arange(self.n_total_hypotheses_))
            while new_index in self.forbidden_columns:
                new_index = self.random_state.choice(
                    np.arange(self.n_total_hypotheses_))
            return new_index, 100
    def _update_example_weights(self, example_weights, y_kernel_matrix, m):
        if len(self.weights_) == 1:
            example_weights[self.previous_vote == -1] *= 2
            example_weights[self.previous_vote == 1] /= 2
        else:
            weights = np.array(self.weights_)
            current_vote_weights = np.array(
                [np.prod(1 - weights[t + 1:]) * weights[t]
                 if t < weights.shape[0] - 1 else weights[t]
                 for t in range(weights.shape[0])]).reshape(
                (weights.shape[0], 1))
            weighted_margin = np.matmul(
                y_kernel_matrix[:, self.chosen_columns_], current_vote_weights)
            # Note: as written, the ratio below cancels to exp(0) == 1, so this
            # branch leaves the example weights unchanged.
            example_weights = np.multiply(
                example_weights,
                np.exp((1 - np.sum(weighted_margin, axis=1) /
                        np.sum(weighted_margin, axis=1))).reshape((m, 1)))
        return example_weights
    def _solve_two_weights_min_c(self, next_column, example_weights):
        m = next_column.shape[0]
        zero_diag = np.ones((m, m)) - np.identity(m)

        weighted_previous_vote = self.previous_vote.reshape((m, 1))
        weighted_next_column = next_column.reshape((m, 1))

        mat_prev = np.repeat(weighted_previous_vote, m, axis=1) * zero_diag
        mat_next = np.repeat(weighted_next_column, m, axis=1) * zero_diag

        self.B2 = np.sum((weighted_previous_vote - weighted_next_column) ** 2)
        self.B1 = np.sum(2 * weighted_next_column * (
            weighted_previous_vote - 2 * weighted_next_column * weighted_next_column))
        self.B0 = np.sum(weighted_next_column * weighted_next_column)

        self.A2 = self.B2 + np.sum(
            (mat_prev - mat_next) * np.transpose(mat_prev - mat_next))
        self.A1 = self.B1 + np.sum(
            mat_prev * np.transpose(mat_next) - mat_next * np.transpose(
                mat_prev) - 2 * mat_next * np.transpose(mat_next))
        self.A0 = self.B0 + np.sum(mat_next * np.transpose(mat_next))

        C2 = (self.A1 * self.B2 - self.A2 * self.B1)
        C1 = 2 * (self.A0 * self.B2 - self.A2 * self.B0)
        C0 = self.A0 * self.B1 - self.A1 * self.B0

        if C2 == 0:
            # Degenerate stationarity condition: the derivative's numerator is
            # affine (C1 != 0) or constant (C1 == 0) instead of quadratic.
            if C1 == 0:
                return np.array([0.5, 0.5])
            return np.array([0., 1.])

        sols = np.roots(np.array([C2, C1, C0]))

        is_acceptable, sol = self._analyze_solutions(sols)
        if is_acceptable:
            return np.array([sol, 1 - sol])
        return ["break", sol]
    def _analyze_solutions(self, sols):
        if sols.shape[0] == 1:
            if self._cborn(sols[0]) < self._cborn(sols[0] + 1):
                best_sol = sols[0]
            else:
                return False, " the only solution was a maximum."
        elif sols.shape[0] == 2:
            best_sol = self._best_sol(sols)
        else:
            return False, " no solution was found."

        if 0 < best_sol < 1:
            return True, self._best_sol(sols)
        elif best_sol <= 0:
            return False, " the minimum was below 0."
        else:
            return False, " the minimum was over 1."

    def _cborn(self, sol):
        return 1 - (self.A2 * sol ** 2 + self.A1 * sol + self.A0) / (
            self.B2 * sol ** 2 + self.B1 * sol + self.B0)

    def _best_sol(self, sols):
        values = np.array([self._cborn(sol) for sol in sols])
        return sols[np.argmin(values)]

    def _restricted_master_problem(self, y_kernel_matrix):
        raise NotImplementedError("Restricted master problem not implemented.")
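`_solve_two_weights_min_c` minimizes the C-bound `1 - A(x)/B(x)` over the mixing weight x: differentiating gives a numerator `C2*x**2 + C1*x + C0` with exactly the coefficient combinations computed above, so `np.roots` returns the stationary points. A numeric sanity check with arbitrary coefficients:

import numpy as np

A2, A1, A0 = 1.3, -0.7, 0.9  # arbitrary test coefficients
B2, B1, B0 = 2.1, 0.4, 1.5

C2 = A1 * B2 - A2 * B1
C1 = 2 * (A0 * B2 - A2 * B0)
C0 = A0 * B1 - A1 * B0

def cborn(x):
    return 1 - (A2 * x ** 2 + A1 * x + A0) / (B2 * x ** 2 + B1 * x + B0)

# The derivative of cborn vanishes at each real root of C2*x**2 + C1*x + C0.
for root in np.roots([C2, C1, C0]):
    if np.isreal(root):
        x = float(np.real(root))
        eps = 1e-6
        print(x, (cborn(x + eps) - cborn(x - eps)) / (2 * eps))  # ~0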
class CqBoostClassifierv21(ColumnGenerationClassifierv21):
    def __init__(self, mu=0.001, epsilon=1e-08, n_max_iterations=None,
                 estimators_generator=None,
                 save_iteration_as_hyperparameter_each=None, random_state=42):
        super(CqBoostClassifierv21, self).__init__(
            epsilon, n_max_iterations, estimators_generator,
            dual_constraint_rhs=0,
            save_iteration_as_hyperparameter_each=save_iteration_as_hyperparameter_each,
            random_state=random_state)
        self.train_time = 0
        self.mu = mu

    def _initialize_alphas(self, n_examples):
        return 1.0 / n_examples * np.ones((n_examples,))


class CQBoostv21(CqBoostClassifierv21, BaseMonoviewClassifier):
    def __init__(self, random_state=None, mu=0.01, epsilon=1e-06, **kwargs):
        super(CQBoostv21, self).__init__(
            random_state=random_state,
            mu=mu,
            epsilon=epsilon
        )
        self.param_names = ["mu", "epsilon"]
        self.distribs = [CustomUniform(loc=0.5, state=1.0, multiplier="e-"),
                         CustomRandint(low=1, high=15, multiplier="e-")]
        self.classed_params = []
        self.weird_strings = {}

    def canProbas(self):
        """Used to know if the classifier can return label probabilities"""
        return True

    def getInterpret(self, directory, y_test):
        return getInterpretBase(self, directory, "CQBoostv21", self.weights_,
                                self.break_cause)

    def get_name_for_fusion(self):
        return "CQ21"


def formatCmdArgs(args):
    """Used to format kwargs for the parsed args"""
    kwargsDict = {"mu": args.CQB2_mu,
                  "epsilon": args.CQB2_epsilon}
    return kwargsDict


def paramsToSet(nIter, randomState):
    """Used for weighted linear early fusion to generate random search sets"""
    paramsSet = []
    for _ in range(nIter):
        paramsSet.append({"mu": 10 ** -randomState.uniform(0.5, 1.5),
                          "epsilon": 10 ** -randomState.randint(1, 15)})
    return paramsSet
@@ -133,7 +133,6 @@ class MinCqLearner(BaseEstimator, ClassifierMixin):
         logging.info("MinCq training started...")
         logging.info("Training dataset shape: {}".format(str(np.shape(X))))
         logging.info("Number of voters: {}".format(len(voters)))
         self.majority_vote = MajorityVote(voters)
         n_base_voters = len(self.majority_vote.weights)
…
import os
import time

import numpy as np
from pyscm.scm import SetCoveringMachineClassifier as scm

from ..Metrics import zero_one_loss
from ..Monoview.Additions.PregenUtils import PregenClassifier
from ..Monoview.MonoviewUtils import CustomRandint, CustomUniform, \
    BaseMonoviewClassifier

# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype"  # Production, Development, Prototype


class SCMSparsity(BaseMonoviewClassifier, PregenClassifier):
    def __init__(self, random_state=None, model_type="disjunction",
                 max_rules=10, p=0.1, n_stumps=1, self_complemented=True,
                 **kwargs):
        # One SCM per rule budget, from 1 rule up to max_rules.
        self.scm_estimators = [scm(
            random_state=random_state,
            model_type=model_type,
            max_rules=max_rule + 1,
            p=p
        ) for max_rule in range(max_rules)]
        self.model_type = model_type
        self.self_complemented = self_complemented
        self.n_stumps = n_stumps
        self.p = p
        self.random_state = random_state
        self.max_rules = max_rules
        self.param_names = ["model_type", "max_rules", "p", "random_state",
                            "n_stumps"]
        self.distribs = [["conjunction", "disjunction"],
                         CustomRandint(low=1, high=15),
                         CustomUniform(loc=0, state=1), [random_state],
                         [n_stumps]]
        self.classed_params = []
        self.weird_strings = {}

    def get_params(self, deep=True):
        return {"model_type": self.model_type, "p": self.p,
                "max_rules": self.max_rules, "random_state": self.random_state,
                "n_stumps": self.n_stumps}

    def fit(self, X, y, tiebreaker=None, iteration_callback=None,
            **fit_params):
        pregen_X, _ = self.pregen_voters(X, y)
        # Round-trip the pregenerated votes through a uniquely named CSV file
        # to hand pyscm a clean, contiguous float array.
        list_files = os.listdir(".")
        a = int(self.random_state.randint(0, 10000))
        file_name = "pregen_x" + str(a) + ".csv"
        while file_name in list_files:
            a = int(np.random.randint(0, 10000))
            file_name = "pregen_x" + str(a) + ".csv"
        np.savetxt(file_name, pregen_X, delimiter=',')
        place_holder = np.genfromtxt(file_name, delimiter=',')
        os.remove(file_name)
        for scm_estimator in self.scm_estimators:
            beg = time.time()
            scm_estimator.fit(place_holder, y, tiebreaker=None,
                              iteration_callback=None, **fit_params)
            end = time.time()
        self.times = np.array([end - beg, 0])
        self.train_metrics = [
            zero_one_loss.score(y, scm_estimator.predict(place_holder))
            for scm_estimator in self.scm_estimators]
        return self.scm_estimators[-1]
    def predict(self, X):
        pregen_X, _ = self.pregen_voters(X)
        # Same CSV round-trip as in fit().
        list_files = os.listdir(".")
        a = int(self.random_state.randint(0, 10000))
        file_name = "pregen_x" + str(a) + ".csv"
        while file_name in list_files:
            a = int(np.random.randint(0, 10000))
            file_name = "pregen_x" + str(a) + ".csv"
        np.savetxt(file_name, pregen_X, delimiter=',')
        place_holder = np.genfromtxt(file_name, delimiter=',')
        os.remove(file_name)
        self.preds = [scm_estimator.predict(place_holder)
                      for scm_estimator in self.scm_estimators]
        return self.preds[-1]

    def canProbas(self):
        """Used to know if the classifier can return label probabilities"""
        return True

    def getInterpret(self, directory, y_test):
        interpretString = ""
        np.savetxt(directory + "test_metrics.csv", np.array(
            [zero_one_loss.score(y_test, pred) for pred in self.preds]))
        np.savetxt(directory + "times.csv", self.times)
        np.savetxt(directory + "train_metrics.csv", self.train_metrics)
        return interpretString


def formatCmdArgs(args):
    """Used to format kwargs for the parsed args"""
    kwargsDict = {"model_type": args.SCS_model_type,
                  "p": args.SCS_p,
                  "max_rules": args.SCS_max_rules,
                  "n_stumps": args.SCS_stumps}
    return kwargsDict


def paramsToSet(nIter, randomState):
    paramsSet = []
    for _ in range(nIter):
        paramsSet.append(
            {"model_type": randomState.choice(["conjunction", "disjunction"]),
             "max_rules": randomState.randint(1, 15),
             "p": randomState.random_sample()})
    return paramsSet
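A hedged usage sketch for `SCMSparsity` (toy data; `pregen_voters` is inherited from `PregenClassifier`): `fit` trains one SCM per rule budget, `predict` returns the largest model's predictions, and `train_metrics` holds the whole sparsity path:

import numpy as np

X = np.random.RandomState(0).randint(0, 2, size=(50, 8))
y = np.random.RandomState(1).randint(0, 2, size=50)

clf = SCMSparsity(random_state=np.random.RandomState(42), max_rules=5)
clf.fit(X, y)
preds = clf.predict(X)    # predictions of the 5-rule model
print(clf.train_metrics)  # zero-one loss for each rule budget 1..5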
import os
import time

import numpy as np
from pyscm.scm import SetCoveringMachineClassifier as scm

from ..Metrics import zero_one_loss
from ..Monoview.Additions.PregenUtils import PregenClassifier
from ..Monoview.MonoviewUtils import CustomRandint, CustomUniform, \
    BaseMonoviewClassifier

# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype"  # Production, Development, Prototype


class SCMSparsityTree(BaseMonoviewClassifier, PregenClassifier):
    def __init__(self, random_state=None, model_type="conjunction",
                 max_rules=10, p=0.1, n_stumps=1, max_depth=2, **kwargs):
        # One SCM per rule budget, from 1 rule up to max_rules.
        self.scm_estimators = [scm(
            random_state=random_state,
            model_type=model_type,
            max_rules=max_rule + 1,
            p=p
        ) for max_rule in range(max_rules)]
        self.model_type = model_type
        self.max_depth = max_depth
        self.p = p
        self.n_stumps = n_stumps
        self.random_state = random_state
        self.max_rules = max_rules
        self.param_names = ["model_type", "max_rules", "p", "random_state",
                            "max_depth"]
        self.distribs = [["conjunction", "disjunction"],
                         CustomRandint(low=1, high=15),
                         CustomUniform(loc=0, state=1), [random_state],
                         [max_depth]]
        self.classed_params = []
        self.weird_strings = {}

    def get_params(self, deep=True):
        return {"model_type": self.model_type, "p": self.p,
                "max_rules": self.max_rules, "random_state": self.random_state,
                "max_depth": self.max_depth, "n_stumps": self.n_stumps}

    def fit(self, X, y, tiebreaker=None, iteration_callback=None,
            **fit_params):
        pregen_X, _ = self.pregen_voters(X, y, generator="Trees")
        # Round-trip the pregenerated votes through a uniquely named CSV file
        # to hand pyscm a clean, contiguous float array.
        list_files = os.listdir(".")
        a = int(self.random_state.randint(0, 10000))
        file_name = "pregen_x" + str(a) + ".csv"
        while file_name in list_files:
            a = int(np.random.randint(0, 10000))
            file_name = "pregen_x" + str(a) + ".csv"
        np.savetxt(file_name, pregen_X, delimiter=',')
        place_holder = np.genfromtxt(file_name, delimiter=',')
        os.remove(file_name)
        for scm_estimator in self.scm_estimators:
            beg = time.time()
            scm_estimator.fit(place_holder, y, tiebreaker=None,
                              iteration_callback=None, **fit_params)
            end = time.time()
        self.times = np.array([end - beg, 0])
        self.train_metrics = [
            zero_one_loss.score(y, scm_estimator.predict(place_holder))
            for scm_estimator in self.scm_estimators]
        return self.scm_estimators[-1]
    def predict(self, X):
        pregen_X, _ = self.pregen_voters(X, generator="Trees")
        # Same CSV round-trip as in fit().
        list_files = os.listdir(".")
        a = int(self.random_state.randint(0, 10000))
        file_name = "pregen_x" + str(a) + ".csv"
        while file_name in list_files:
            a = int(np.random.randint(0, 10000))
            file_name = "pregen_x" + str(a) + ".csv"
        np.savetxt(file_name, pregen_X, delimiter=',')
        place_holder = np.genfromtxt(file_name, delimiter=',')
        os.remove(file_name)
        self.preds = [scm_estimator.predict(place_holder)
                      for scm_estimator in self.scm_estimators]
        return self.preds[-1]

    def canProbas(self):
        """Used to know if the classifier can return label probabilities"""
        return True

    def getInterpret(self, directory, y_test):
        interpretString = ""
        np.savetxt(directory + "test_metrics.csv", np.array(
            [zero_one_loss.score(y_test, pred) for pred in self.preds]))
        np.savetxt(directory + "times.csv", self.times)
        np.savetxt(directory + "train_metrics.csv", self.train_metrics)
        return interpretString


def formatCmdArgs(args):
    """Used to format kwargs for the parsed args"""
    kwargsDict = {"model_type": args.SCST_model_type,
                  "p": args.SCST_p,
                  "max_rules": args.SCST_max_rules,
                  "n_stumps": args.SCST_trees,
                  "max_depth": args.SCST_max_depth}
    return kwargsDict


def paramsToSet(nIter, randomState):
    paramsSet = []
    for _ in range(nIter):
        paramsSet.append(
            {"model_type": randomState.choice(["conjunction", "disjunction"]),
             "max_rules": randomState.randint(1, 15),
             "p": randomState.random_sample()})
    return paramsSet
@@ -75,7 +75,7 @@ def deleteHDF5(benchmarkArgumentsDictionaries, nbCores, DATASET):
         os.remove(filename)


-def makeMeNoisy(viewData, randomState, percentage=15):
+def makeMeNoisy(viewData, randomState, percentage=5):
     """used to introduce some noise in the generated data"""
     viewData = viewData.astype(bool)
     nbNoisyCoord = int(
@@ -93,11 +93,9 @@ def makeMeNoisy(viewData, randomState, percentage=15):

 def getPlausibleDBhdf5(features, pathF, name, NB_CLASS=3, LABELS_NAME="",
                        randomState=None, full=True, add_noise=False,
                        noise_std=0.15, nbView=3,
-                       nbClass=2, datasetLength=1000, randomStateInt=None):
+                       nbClass=2, datasetLength=100, randomStateInt=42, nbFeatures=5):
     """Used to generate a plausible dataset to test the algorithms"""
-    randomStateInt = 42
     randomState = np.random.RandomState(randomStateInt)
-    nbFeatures = 10
     if not os.path.exists(os.path.dirname(pathF + "Plausible.hdf5")):
         try:
             os.makedirs(os.path.dirname(pathF + "Plausible.hdf5"))
@@ -383,7 +381,7 @@ def copyhdf5Dataset(sourceDataFile, destinationDataFile, sourceDatasetName,

 def getClassicDBhdf5(views, pathF, nameDB, NB_CLASS, askedLabelsNames,
-                     randomState, full=False, add_noise=False, noise_std=0.15):
+                     randomState, full=False, add_noise=False, noise_std=0.15,):
     """Used to load a hdf5 database"""
     if full:
         datasetFile = h5py.File(pathF + nameDB + ".hdf5", "r")
@@ -422,7 +420,9 @@ def getClassicDBhdf5(views, pathF, nameDB, NB_CLASS, askedLabelsNames,
     labelsDictionary = dict(
         (labelIndex, labelName.decode("utf-8")) for labelIndex, labelName in
         enumerate(datasetFile.get("Labels").attrs["names"]))
+    datasetFile.close()
+    datasetFile = h5py.File(pathF + nameDB + "_temp_view_label_select.hdf5",
+                            "r")
     if add_noise:
         datasetFile, dataset_name = add_gaussian_noise(datasetFile, randomState,
                                                        pathF, dataset_name,
@@ -460,6 +460,8 @@ def add_gaussian_noise(dataset_file, random_state, path_f, dataset_name,
         noisy_dataset[view_name][...] = noised_data
     original_dataset_filename = dataset_file.filename
     dataset_file.close()
+    noisy_dataset.close()
+    noisy_dataset = h5py.File(path_f + dataset_name + "_noised.hdf5", "r")
     if "_temp_" in original_dataset_filename:
         os.remove(original_dataset_filename)
     return noisy_dataset, dataset_name + "_noised"
…
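Closing the freshly written HDF5 file and reopening it read-only forces h5py to flush the noised data to disk before anything downstream reads it, and prevents accidental writes afterwards. The pattern in isolation (toy file name, assumed for illustration):

import h5py
import numpy as np

f = h5py.File("toy_noised.hdf5", "w")
f.create_dataset("View0", data=np.zeros((4, 3)))
f.close()  # flush everything to disk

f = h5py.File("toy_noised.hdf5", "r")  # reopen read-only
print(f["View0"].shape)  # (4, 3)
f.close()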
@@ -85,7 +85,7 @@ def parseTheArgs(arguments):
                             default=0.2)
     groupClass.add_argument('--CL_nbFolds', metavar='INT', action='store',
                             help='Number of folds in cross validation',
-                            type=int, default=5)
+                            type=int, default=2)
     groupClass.add_argument('--CL_nbClass', metavar='INT', action='store',
                             help='Number of classes, -1 for all', type=int,
                             default=2)
…