Commit 04dcbef5 authored by Baptiste Bauvin's avatar Baptiste Bauvin

Conflict resolution

parents 048f2311 240596d5
Pipeline #3389 failed
Showing 172 additions and 613 deletions
@@ -6,17 +6,19 @@ import os
import numpy as np
from ...multiview.multiview_utils import ConfigGenerator, \
-    get_monoview_classifier, get_examples_views_indices, \
-    get_available_monoview_classifiers, BaseMultiviewClassifier
+    get_examples_views_indices, get_available_monoview_classifiers, \
+    BaseMultiviewClassifier
from .fusion_utils import BaseLateFusionClassifier
-class DiversityFusion(BaseMultiviewClassifier):
+class DiversityFusionClassifier(BaseMultiviewClassifier,
+                                BaseLateFusionClassifier):
"""This is the base class for all the diversity fusion based classifiers."""
def __init__(self, random_state=None, classifier_names=None,
monoview_estimators=None, classifiers_configs=None):
"""Used to init the instances"""
-super(DiversityFusion, self).__init__(random_state)
+super(DiversityFusionClassifier, self).__init__(random_state)
if classifier_names is None:
classifier_names = get_available_monoview_classifiers()
self.classifier_names = classifier_names
@@ -33,18 +35,7 @@ class DiversityFusion(BaseMultiviewClassifier):
self.estimator_pool = []
for classifier_idx, classifier_name in enumerate(self.classifier_names):
self.estimator_pool.append([])
-if self.classifiers_configs is not None and classifier_name in self.classifiers_configs:
-if 'random_state' in inspect.getfullargspec(get_monoview_classifier(classifier_name).__init__).args:
-estimator = get_monoview_classifier(classifier_name)(random_state=self.random_state,
-**self.classifiers_configs[classifier_name])
-else:
-estimator = get_monoview_classifier(classifier_name)(
-**self.classifiers_configs[classifier_name])
-else:
-if 'random_state' in inspect.getfullargspec(get_monoview_classifier(classifier_name).__init__).args:
-estimator = get_monoview_classifier(classifier_name)(random_state=self.random_state)
-else:
-estimator = get_monoview_classifier(classifier_name)()
+estimator = self.init_monoview_estimator(classifier_name)
for idx, view_idx in enumerate(views_indices):
estimator.fit(X.get_v(view_idx, train_indices), y[train_indices])
self.estimator_pool[classifier_idx].append(estimator)
@@ -96,7 +87,7 @@ class DiversityFusion(BaseMultiviewClassifier):
return combinations, combis, div_measure, classifiers_decisions, nb_views
-class GlobalDiversityFusion(DiversityFusion):
+class GlobalDiversityFusionClassifier(DiversityFusionClassifier):
def choose_combination(self, X, y, examples_indices, view_indices):
combinations, combis, div_measure, classifiers_decisions, nb_views = self.init_combinations(
@@ -114,7 +105,7 @@ class GlobalDiversityFusion(DiversityFusion):
in enumerate(best_combination)]
-class CoupleDiversityFusion(DiversityFusion):
+class CoupleDiversityFusionClassifier(DiversityFusionClassifier):
def choose_combination(self, X, y, examples_indices, view_indices):
combinations, combis, div_measure, classifiers_decisions, nb_views = self.init_combinations(
......
import inspect
from ...multiview.multiview_utils import BaseMultiviewClassifier, get_monoview_classifier
class BaseLateFusionClassifier(BaseMultiviewClassifier):
def init_monoview_estimator(self, classifier_name, classifier_index=None):
if classifier_index is not None:
classifier_configs = self.classifier_configs[classifier_index]
else:
classifier_configs = self.classifier_configs
if classifier_configs is not None and classifier_name in classifier_configs:
if 'random_state' in inspect.getfullargspec(
get_monoview_classifier(classifier_name).__init__).args:
estimator = get_monoview_classifier(classifier_name)(
random_state=self.random_state,
**classifier_configs[classifier_name])
else:
estimator = get_monoview_classifier(classifier_name)(
**classifier_configs[classifier_name])
else:
if 'random_state' in inspect.getfullargspec(
get_monoview_classifier(classifier_name).__init__).args:
estimator = get_monoview_classifier(classifier_name)(
random_state=self.random_state)
else:
estimator = get_monoview_classifier(classifier_name)()
return estimator
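The helper above centralizes the constructor logic that the diff removes from DiversityFusionClassifier: random_state is only forwarded when the monoview classifier's __init__ actually declares it. A minimal standalone sketch of the same inspect-based pattern, using two hypothetical stand-in classes rather than the platform's monoview classifiers:

import inspect

class WithSeed:
    def __init__(self, random_state=None, max_depth=None):
        self.random_state, self.max_depth = random_state, max_depth

class WithoutSeed:
    def __init__(self, alpha=1.0):
        self.alpha = alpha

def build_estimator(estimator_class, random_state=None, **config):
    # Forward random_state only when the constructor declares it.
    if 'random_state' in inspect.getfullargspec(estimator_class.__init__).args:
        return estimator_class(random_state=random_state, **config)
    return estimator_class(**config)

print(vars(build_estimator(WithSeed, random_state=42, max_depth=3)))
print(vars(build_estimator(WithoutSeed, alpha=0.5)))  # no random_state argument passed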
import numpy as np
import warnings
from scipy.stats import uniform
from ...multiview.multiview_utils import BaseMultiviewClassifier, get_available_monoview_classifiers, get_monoview_classifier, get_examples_views_indices, ConfigGenerator
from .fusion_utils import BaseLateFusionClassifier
class ClassifierCombinator:
def __init__(self, nb_view):
self.nb_view = nb_view
self.available_classifiers = get_available_monoview_classifiers()
def rvs(self, random_state=None):
classifier_names = random_state.choice(self.available_classifiers,
size=self.nb_view, replace=True)
return classifier_names
class MultipleConfigGenerator(ConfigGenerator):
def __init__(self, nb_view):
super(MultipleConfigGenerator, self).__init__(get_available_monoview_classifiers())
self.nb_view = nb_view
self.multiple_distribs = [self.distribs for _ in range(nb_view)]
def rvs(self, random_state=None):
config_samples = [super(MultipleConfigGenerator, self).rvs(random_state)
for _ in range(self.nb_view)]
return config_samples
class WeightsGenerator:
def __init__(self, nb_view):
self.nb_view=nb_view
self.uniform = uniform(loc=0, scale=1)
def rvs(self, random_state=None):
return np.array([uniform.rvs(random_state=random_state)
for _ in range(self.nb_view)])
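The three generator classes above expose the scipy-style rvs interface so a randomized hyper-parameter search can draw, per candidate, one classifier name, one configuration and one weight per view. A toy sketch of what a single draw might look like (assumed names and a hypothetical classifier pool, not the platform's):

import numpy as np

rng = np.random.RandomState(42)
available = ["decision_tree", "random_forest", "svm_rbf"]  # hypothetical pool
nb_view = 3

# One classifier name per view, drawn with replacement (as in ClassifierCombinator.rvs).
names = rng.choice(available, size=nb_view, replace=True)

# One uniform weight per view; WeightsGenerator.rvs leaves normalisation to the consumer.
weights = np.array([rng.uniform(0, 1) for _ in range(nb_view)])
weights /= weights.sum()

print(names, weights)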
class LateFusionClassifier(BaseMultiviewClassifier, BaseLateFusionClassifier):
def __init__(self, random_state=None, classifier_names=None,
classifier_configs=None, nb_cores=1, nb_view=None, weights=None):
super(LateFusionClassifier, self).__init__(random_state)
self.verif_clf_views(classifier_names, nb_view)
self.nb_view = len(self.classifiers_names)
self.classifier_configs = classifier_configs
self.monoview_estimators = [self.init_monoview_estimator(classifier_name, classifier_index)
for classifier_index, classifier_name in enumerate(self.classifiers_names)]
self.nb_cores = nb_cores
self.accuracies = np.zeros(len(self.classifiers_names))
self.needProbas = False
if weights is None:
self.weights = np.ones(nb_view)/nb_view
else:
self.weights = weights
self.param_names = ["classifier_names", "classifier_configs", "weights"]
self.distribs =[ClassifierCombinator(self.nb_view),
MultipleConfigGenerator(self.nb_view),
WeightsGenerator(nb_view)]
def fit(self, X, y, train_indices=None, views_indices=None):
train_indices, views_indices = get_examples_views_indices(X,
train_indices,
views_indices)
self.monoview_estimators = [monoview_estimator.fit(X.get_v(view_index, train_indices), y[train_indices]) for view_index, monoview_estimator in zip(views_indices, self.monoview_estimators)]
return self
def verif_clf_views(self, classifier_names, nb_view):
if classifier_names is None:
if nb_view is None:
raise AttributeError(self.__class__.__name__+" must have either classifier_names or nb_views provided.")
else:
self.classifiers_names = self.get_classifiers(get_available_monoview_classifiers(), nb_view)
else:
if nb_view is None:
self.classifiers_names = classifier_names
else:
if len(classifier_names)==nb_view:
self.classifiers_names = classifier_names
else:
warnings.warn("nb_view and classifier_names not matching, choosing nb_view random classifiers in classifier_names.", UserWarning)
self.classifiers_names = self.get_classifiers(classifier_names, nb_view)
def get_classifiers(self, classifiers_names, nb_choices):
return self.random_state.choice(classifiers_names, size=nb_choices, replace=True)
import numpy as np
from ..multiview_classifiers.additions.late_fusion_utils import \
LateFusionClassifier
from ..multiview.multiview_utils import get_examples_views_indices
classifier_class_name = "BayesianInferenceClassifier"
class BayesianInferenceClassifier(LateFusionClassifier):
def __init__(self, random_state, classifier_names=None,
classifier_configs=None, nb_view=None, nb_cores=1):
super(BayesianInferenceClassifier, self).__init__(random_state=random_state,
classifier_names=classifier_names,
classifier_configs=classifier_configs,
nb_cores=nb_cores,
nb_view=nb_view)
def predict(self, X, example_indices=None, views_indices=None):
example_indices, views_indices = get_examples_views_indices(X, example_indices, views_indices)
if sum(self.weights) != 1.0:
self.weights = self.weights / sum(self.weights)
view_scores = []
for index, view_index in enumerate(views_indices):
view_scores.append(np.power(
self.monoview_estimators[index].predict_proba(X.get_v(view_index,
example_indices)),
self.weights[index]))
view_scores = np.array(view_scores)
predicted_labels = np.argmax(np.prod(view_scores, axis=0), axis=1)
return predicted_labels
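predict() above implements a weighted product rule: each view's class probabilities are raised to that view's (normalised) weight and the per-class products are compared. A toy numeric check of that rule:

import numpy as np

view_probas = np.array([
    [[0.7, 0.3], [0.4, 0.6]],   # view 0: probabilities for 2 examples, 2 classes
    [[0.6, 0.4], [0.2, 0.8]],   # view 1
])
weights = np.array([0.5, 0.5])  # normalised so they sum to 1

weighted = np.power(view_probas, weights[:, None, None])
predicted_labels = np.argmax(np.prod(weighted, axis=0), axis=1)
print(predicted_labels)  # -> [0 1]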
import numpy as np
-from multiview_platform.mono_multi_view_classifiers.multiview_classifiers.additions.diversity_utils import GlobalDiversityFusion
+from multiview_platform.mono_multi_view_classifiers.multiview_classifiers.additions.diversity_utils import GlobalDiversityFusionClassifier
classifier_class_name = "DifficultyFusion"
-class DifficultyFusion(GlobalDiversityFusion):
+class DifficultyFusion(GlobalDiversityFusionClassifier):
def diversity_measure(self, classifiers_decisions, combination, y):
_, nb_view, nb_examples = classifiers_decisions.shape
......
from ...multiview import analyze_results
# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype" # Production, Development, Prototype
def execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classification_indices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, random_state, labels, classifierModule):
return analyze_results.execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classification_indices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, random_state, labels, classifierModule)
\ No newline at end of file
import numpy as np
from ...multiview.additions import diversity_utils
def genName(config):
return "difficulty_fusion"
def getBenchmark(benchmark, args=None):
benchmark["multiview"]["difficulty_fusion"] = ["take_everything"]
return benchmark
def difficulty(classifiersDecisions, combination, foldsGroudTruth, foldsLen):
nbView, _, nbFolds, nbExamples = classifiersDecisions.shape
scores = np.zeros((nbView, nbFolds, nbExamples), dtype=int)
for viewIndex, classifierIndex in enumerate(combination):
scores[viewIndex] = np.logical_not(
np.logical_xor(classifiersDecisions[viewIndex, classifierIndex],
foldsGroudTruth)
)
difficulty_scores = np.sum(scores, axis=0)
difficulty_score = np.mean(
np.var(
np.array([
np.sum((difficulty_scores==viewIndex), axis=1)/float(nbView)
for viewIndex in range(len(combination)+1)])
, axis=0)
)
return difficulty_score
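The function above is a fold-wise variant of the classical "difficulty" diversity measure: how unevenly the ensemble's correctness is spread over the examples. A conceptual toy illustration (not the exact fold-based code above), where difficulty is simply the variance of the per-example fraction of correct classifiers:

import numpy as np

# decisions[c, e] = 1 if classifier c predicts example e correctly
decisions = np.array([[1, 1, 0, 1],
                      [1, 0, 0, 1],
                      [1, 1, 1, 0]])
fraction_correct = decisions.mean(axis=0)   # per-example difficulty profile
difficulty = np.var(fraction_correct)       # low variance = examples uniformly easy/hard
print(fraction_correct, difficulty)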
def getArgs(args, benchmark, views, views_indices, random_state, directory, resultsMonoview, classificationIndices):
return diversity_utils.getArgs(args, benchmark, views,
views_indices, random_state, directory,
resultsMonoview, classificationIndices,
difficulty, "difficulty_fusion")
def genParamsSets(classificationKWARGS, random_state, nIter=1):
return diversity_utils.genParamsSets(classificationKWARGS, random_state, nIter=nIter)
class DifficultyFusionClass(diversity_utils.DiversityFusionClass):
def __init__(self, random_state, NB_CORES=1, **kwargs):
diversity_utils.DiversityFusionClass.__init__(self, random_state, NB_CORES=1, **kwargs)
def getSpecificAnalysis(self, classificationKWARGS):
stringAnalysis = "Classifiers used for each view : "+ ', '.join(self.classifiers_names)+\
', with a difficulty of '+str(self.div_measure)
return stringAnalysis
\ No newline at end of file
import numpy as np
-from multiview_platform.mono_multi_view_classifiers.multiview_classifiers.additions.diversity_utils import CoupleDiversityFusion
+from multiview_platform.mono_multi_view_classifiers.multiview_classifiers.additions.diversity_utils import CoupleDiversityFusionClassifier
classifier_class_name = "DisagreeFusion"
-class DisagreeFusion(CoupleDiversityFusion):
+class DisagreeFusion(CoupleDiversityFusionClassifier):
def diversity_measure(self, first_classifier_decision, second_classifier_decision, _):
return np.logical_xor(first_classifier_decision, second_classifier_decision)
from ...multiview import analyze_results
# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype" # Production, Development, Prototype
def execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classificationIndices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, randomState, labels, classifierModule):
return analyze_results.execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classificationIndices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, randomState, labels, classifierModule)
\ No newline at end of file
import numpy as np
from ...multiview.additions import diversity_utils
def genName(config):
return "disagree_fusion"
def getBenchmark(benchmark, args=None):
benchmark["multiview"]["disagree_fusion"] = ["take_everything"]
return benchmark
def disagree(classifierDecision1, classifierDecision2, ground_truth):
return np.logical_xor(classifierDecision1, classifierDecision2)
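A toy check of the pairwise disagreement measure; averaging over examples and over classifier pairs is assumed to happen in the caller (CoupleDiversityFusion in the refactored code):

import numpy as np

decisions_a = np.array([1, 0, 1, 1])
decisions_b = np.array([1, 1, 0, 1])
disagreement = np.logical_xor(decisions_a, decisions_b)
print(disagreement.mean())  # 0.5 -> the pair disagrees on half of the examples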
def getArgs(args, benchmark, views, views_indices, randomState, directory, resultsMonoview, classificationIndices):
return diversity_utils.getArgs(args, benchmark, views, views_indices,
randomState, directory, resultsMonoview,
classificationIndices, disagree, "disagree_fusion")
def genParamsSets(classificationKWARGS, randomState, nIter=1):
return diversity_utils.genParamsSets(classificationKWARGS, randomState, nIter=nIter)
# """Used to generate parameters sets for the random hyper parameters optimization function"""
# weights = [randomState.random_sample(len(classificationKWARGS["classifiers_names"])) for _ in range(nIter)]
# nomralizedWeights = [[weightVector/np.sum(weightVector)] for weightVector in weights]
# return nomralizedWeights
class DisagreeFusionClass(diversity_utils.DiversityFusionClass):
def __init__(self, randomState, NB_CORES=1, **kwargs):
diversity_utils.DiversityFusionClass.__init__(self, randomState, NB_CORES=1, **kwargs)
def getSpecificAnalysis(self, classificationKWARGS):
stringAnalysis = "Classifiers used for each view : "+ ', '.join(self.classifiers_names)+\
', with a disagreement of '+str(self.div_measure)
return stringAnalysis
import numpy as np
from multiview_platform.mono_multi_view_classifiers.multiview_classifiers.additions.diversity_utils import \
-CoupleDiversityFusion
+CoupleDiversityFusionClassifier
classifier_class_name = "DoubleFaultFusion"
-class DoubleFaultFusion(CoupleDiversityFusion):
+class DoubleFaultFusion(CoupleDiversityFusionClassifier):
def diversity_measure(self, first_classifier_decision,
second_classifier_decision, y):
......
from ...multiview import analyze_results
# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype" # Production, Development, Prototype
def execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classification_indices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, random_state, labels, classifierModule):
return analyze_results.execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classification_indices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, random_state, labels, classifierModule)
\ No newline at end of file
import numpy as np
from ...multiview.additions import diversity_utils
def genName(config):
return "double_fault_fusion"
def getBenchmark(benchmark, args=None):
benchmark["multiview"]["double_fault_fusion"] = ["take_everything"]
return benchmark
def doubleFault(classifierDecision1, classifierDecision2, ground_truth):
return np.logical_and(np.logical_xor(classifierDecision1, ground_truth),
np.logical_xor(classifierDecision2, ground_truth))
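A toy check of the double-fault measure: an example contributes only when both classifiers of the pair are wrong with respect to the ground truth:

import numpy as np

ground_truth = np.array([1, 1, 0, 0])
decisions_a  = np.array([1, 0, 1, 0])
decisions_b  = np.array([0, 0, 1, 0])
double_fault = np.logical_and(np.logical_xor(decisions_a, ground_truth),
                              np.logical_xor(decisions_b, ground_truth))
print(double_fault.mean())  # 0.5 -> both classifiers wrong on examples 1 and 2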
def getArgs(args, benchmark, views, views_indices, random_state, directory, resultsMonoview, classificationIndices):
return diversity_utils.getArgs(args, benchmark, views,
views_indices, random_state, directory,
resultsMonoview, classificationIndices,
doubleFault, "double_fault_fusion")
def genParamsSets(classificationKWARGS, random_state, nIter=1):
return diversity_utils.genParamsSets(classificationKWARGS, random_state, nIter=nIter)
class DoubleFaultFusionClass(diversity_utils.DiversityFusionClass):
def __init__(self, random_state, NB_CORES=1, **kwargs):
diversity_utils.DiversityFusionClass.__init__(self, random_state, NB_CORES=1, **kwargs)
def getSpecificAnalysis(self, classificationKWARGS):
stringAnalysis = "Classifiers used for each view : "+ ', '.join(self.classifiers_names)+\
', with a double fault ratio of '+str(self.div_measure)
return stringAnalysis
\ No newline at end of file
import numpy as np
-from multiview_platform.mono_multi_view_classifiers.multiview_classifiers.additions.diversity_utils import GlobalDiversityFusion
+from multiview_platform.mono_multi_view_classifiers.multiview_classifiers.additions.diversity_utils import GlobalDiversityFusionClassifier
classifier_class_name = "EntropyFusion"
-class EntropyFusion(GlobalDiversityFusion):
+class EntropyFusion(GlobalDiversityFusionClassifier):
def diversity_measure(self, classifiers_decisions, combination, y):
_, nb_view, nb_examples = classifiers_decisions.shape
......
from ...multiview import analyze_results
# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype" # Production, Development, Prototype
def execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classification_indices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, randomState, labels, classifierModule):
return analyze_results.execute(classifier, trainLabels,
testLabels, DATASET,
classificationKWARGS, classification_indices,
labels_dictionary, views, nbCores, times,
name, KFolds,
hyper_param_search, nIter, metrics,
views_indices, randomState, labels, classifierModule)
\ No newline at end of file
import numpy as np
from ...multiview.additions import diversity_utils
def genName(config):
return "entropy_fusion"
def getBenchmark(benchmark, args=None):
benchmark["multiview"]["entropy_fusion"] = ["take_everything"]
return benchmark
def entropy(classifiersDecisions, combination, foldsGroudTruth, foldsLen):
nbView, _, nbFolds, nbExamples = classifiersDecisions.shape
scores = np.zeros((nbView, nbFolds, nbExamples), dtype=int)
for viewIndex, classifierIndex in enumerate(combination):
scores[viewIndex] = np.logical_not(
np.logical_xor(classifiersDecisions[viewIndex, classifierIndex],
foldsGroudTruth)
)
entropy_scores = np.sum(scores, axis=0)
nbViewMatrix = np.zeros((nbFolds, nbExamples), dtype=int)+nbView-entropy_scores
entropy_score = np.mean(np.mean(np.minimum(entropy_scores, nbViewMatrix).astype(float)/(nbView - int(nbView/2)), axis=1))
return entropy_score
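A conceptual toy illustration of the entropy measure computed above: per example, min(#correct, #incorrect) is scaled by (L - floor(L/2)) and averaged, so the score grows when the ensemble splits evenly on many examples (folds omitted for brevity):

import numpy as np

# correctness[c, e] = 1 if classifier c is right on example e (L = 3 classifiers)
correctness = np.array([[1, 1, 0, 1],
                        [1, 0, 0, 1],
                        [0, 1, 0, 1]])
L = correctness.shape[0]
n_correct = correctness.sum(axis=0)
entropy_score = np.mean(np.minimum(n_correct, L - n_correct) / (L - L // 2))
print(entropy_score)  # 0.25 for this toy correctness matrix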
def getArgs(args, benchmark, views, views_indices, randomState, directory, resultsMonoview, classificationIndices):
return diversity_utils.getArgs(args, benchmark, views,
views_indices, randomState, directory,
resultsMonoview, classificationIndices,
entropy, "entropy_fusion")
def genParamsSets(classificationKWARGS, randomState, nIter=1):
return diversity_utils.genParamsSets(classificationKWARGS, randomState, nIter=nIter)
class EntropyFusionClass(diversity_utils.DiversityFusionClass):
def __init__(self, randomState, NB_CORES=1, **kwargs):
diversity_utils.DiversityFusionClass.__init__(self, randomState, NB_CORES=1, **kwargs)
def getSpecificAnalysis(self, classificationKWARGS):
stringAnalysis = "Classifiers used for each view : "+ ', '.join(self.classifiers_names)+\
', with an entropy of '+str(self.div_measure)
return stringAnalysis
\ No newline at end of file
#!/usr/bin/env python
# -*- encoding: utf-8
import numpy as np
from ....utils.dataset import get_v
from .... import monoview_classifiers
class EarlyFusionClassifier(object):
def __init__(self, randomState, monoviewClassifierName, monoviewClassifierConfig, NB_CORES=1):
self.monoviewClassifierName = monoviewClassifierName
if type(monoviewClassifierConfig) == dict:
pass
elif monoviewClassifierConfig is None:
pass
else:
monoviewClassifierConfig = dict((str(configIndex), config[0]) for configIndex, config in
enumerate(monoviewClassifierConfig
))
monoviewClassifierModule = getattr(monoview_classifiers, self.monoviewClassifierName)
if monoviewClassifierConfig is not None:
self.monoviewClassifier = getattr(monoviewClassifierModule, self.monoviewClassifierName)(
random_state=randomState, **monoviewClassifierConfig)
else:
self.monoviewClassifier = getattr(monoviewClassifierModule, self.monoviewClassifierName)(
random_state=randomState)
self.monoviewClassifiersConfig = monoviewClassifierConfig
self.nbCores = NB_CORES
self.monoviewData = None
self.randomState = randomState
def makeMonoviewData_hdf5(self, DATASET, weights=None, usedIndices=None, views_indices=None):
if type(views_indices) == type(None):
views_indices = np.arange(DATASET.get("Metadata").attrs["nbView"])
nbView = len(views_indices)
if usedIndices is None:
usedIndices = range(DATASET.get("Metadata").attrs["datasetLength"])
if type(weights) == type(None):
weights = np.array([1 / nbView for i in range(nbView)])
if sum(weights) != 1:
weights = weights / sum(weights)
self.monoviewData = np.concatenate([get_v(DATASET, viewIndex, usedIndices)
for index, viewIndex in enumerate(views_indices)], axis=1)
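makeMonoviewData_hdf5 above reduces the multiview problem to a monoview one by concatenating the selected views feature-wise into a single matrix; a minimal sketch with toy arrays:

import numpy as np

view_0 = np.array([[0.1, 0.2], [0.3, 0.4]])   # 2 examples, 2 features
view_1 = np.array([[1.0], [2.0]])             # 2 examples, 1 feature
monoview_data = np.concatenate([view_0, view_1], axis=1)
print(monoview_data.shape)  # (2, 3): one row per example, all views side by side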
import numpy as np
from ...Methods.EarlyFusion import EarlyFusionClassifier
from ..... import monoview_classifiers
def genParamsSets(classificationKWARGS, randomState, nIter=1):
nbView = classificationKWARGS["nbView"]
if classificationKWARGS["classifiersConfigs"] is None:
monoviewClassifierModule = getattr(monoview_classifiers, classificationKWARGS["classifiers_names"])
paramsMonoview = monoviewClassifierModule.paramsToSet(nIter, randomState)
paramsSets = []
for iterIndex in range(nIter):
randomWeightsArray = randomState.random_sample(nbView)
normalizedArray = randomWeightsArray / np.sum(randomWeightsArray)
paramsSets.append([normalizedArray, paramsMonoview[iterIndex]])
return paramsSets
def getArgs(benchmark, args, views, viewsIndices, directory, resultsMonoview, classificationIndices):
argumentsList = []
if args.FU_E_cl_names != ['']:
pass
else:
monoviewClassifierModulesNames = benchmark["monoview"]
args.FU_E_cl_names = monoviewClassifierModulesNames
args.FU_E_cl_config = [None for _ in monoviewClassifierModulesNames]
for classifierName, classifierConfig in zip(args.FU_E_cl_names, args.FU_E_cl_config):
monoviewClassifierModule = getattr(monoview_classifiers, classifierName)
if classifierConfig is not None:
arguments = {"CL_type": "fusion",
"views": views,
"NB_VIEW": len(views),
"viewsIndices": viewsIndices,
"NB_CLASS": len(args.CL_classes),
"LABELS_NAMES": args.CL_classes,
"FusionKWARGS": {"fusionType": "EarlyFusion",
"fusionMethod": "WeightedLinear",
"classifiers_names": classifierName,
"classifiersConfigs": monoviewClassifierModule.getKWARGS([arg.split(":")
for arg in
classifierConfig.split(
",")]),
'fusionMethodConfig': args.FU_E_method_configs,
"nbView": (len(viewsIndices))}}
else:
arguments = {"CL_type": "fusion",
"views": views,
"NB_VIEW": len(views),
"viewsIndices": viewsIndices,
"NB_CLASS": len(args.CL_classes),
"LABELS_NAMES": args.CL_classes,
"FusionKWARGS": {"fusionType": "EarlyFusion",
"fusionMethod": "WeightedLinear",
"classifiers_names": classifierName,
"classifiersConfigs": None,
'fusionMethodConfig': args.FU_E_method_configs,
"nbView": (len(viewsIndices))}}
argumentsList.append(arguments)
return argumentsList
class WeightedLinear(EarlyFusionClassifier):
def __init__(self, randomState, NB_CORES=1, **kwargs):
EarlyFusionClassifier.__init__(self, randomState, kwargs['classifiers_names'], kwargs['classifiersConfigs'],
NB_CORES=NB_CORES)
if kwargs['fusionMethodConfig'] is None:
self.weights = np.ones(len(kwargs["classifiers_names"]), dtype=float)
elif kwargs['fusionMethodConfig'] == ['']:
self.weights = np.ones(len(kwargs["classifiers_names"]), dtype=float)
else:
self.weights = np.array(map(float, kwargs['fusionMethodConfig']))
self.weights /= float(max(self.weights))
def fit_hdf5(self, DATASET, labels, trainIndices=None, viewsIndices=None):
if type(viewsIndices) == type(None):
viewsIndices = np.arange(DATASET.get("Metadata").attrs["nbView"])
if trainIndices is None:
trainIndices = range(DATASET.get("Metadata").attrs["datasetLength"])
self.weights /= float(max(self.weights))
self.makeMonoviewData_hdf5(DATASET, weights=self.weights, usedIndices=trainIndices, viewsIndices=viewsIndices)
# monoviewClassifierModule = getattr(monoview_classifiers, self.monoviewClassifierName)
self.monoviewClassifier.fit(self.monoviewData, labels[trainIndices])
# self.randomState,
# NB_CORES=self.nbCores,
# **self.monoviewClassifiersConfig)
def setParams(self, paramsSet):
self.weights = paramsSet[0]
self.monoviewClassifiersConfig = paramsSet[1]
def predict_hdf5(self, DATASET, usedIndices=None, viewsIndices=None):
if type(viewsIndices) == type(None):
viewsIndices = np.arange(DATASET.get("Metadata").attrs["nbView"])
self.weights /= float(np.sum(self.weights))
if usedIndices is None:
usedIndices = range(DATASET.get("Metadata").attrs["datasetLength"])
self.makeMonoviewData_hdf5(DATASET, weights=self.weights, usedIndices=usedIndices, viewsIndices=viewsIndices)
predictedLabels = self.monoviewClassifier.predict(self.monoviewData)
return predictedLabels
def predict_proba_hdf5(self, DATASET, usedIndices=None):
if usedIndices is None:
usedIndices = range(DATASET.get("Metadata").attrs["datasetLength"])
self.makeMonoviewData_hdf5(DATASET, weights=self.weights, usedIndices=usedIndices)
predictedLabels = self.monoviewClassifier.predict_proba(self.monoviewData)
return predictedLabels
def getConfig(self, fusionMethodConfig, monoviewClassifiersNames, monoviewClassifiersConfigs):
configString = "with weighted concatenation, using weights : " + ", ".join(map(str, self.weights)) + \
" with monoview classifier : "
# monoviewClassifierModule = getattr(monoview_classifiers, monoviewClassifiersNames)
configString += self.monoviewClassifier.getConfig()
return configString
def gridSearch(self, classificationKWARGS):
return
import os
for module in os.listdir(os.path.dirname(os.path.realpath(__file__))):
if module == '__init__.py' or module[-3:] != '.py':
continue
__import__(module[:-3], locals(), globals(), [], 1)
del module
del os
#!/usr/bin/env python
# -*- encoding: utf-8
import numpy as np
import itertools
from joblib import Parallel, delayed
import sys
import math
from .... import monoview_classifiers
from .... import metrics
from ....utils.dataset import get_v
# def canProbasClassifier(classifierConfig):
# try:
# _ = getattr(classifierConfig, "predict_proba")
# return True
# except AttributeError:
# return False
def fitMonoviewClassifier(monoviewClassifier, data, labels, needProbas, randomState, nbCores=1):
if needProbas and not monoviewClassifier.canProbas():
DTConfig = {"max_depth": 300, "criterion": "entropy", "splitter": "random"}
monoviewClassifier = getattr(monoview_classifiers, "DecisionTree")(random_state=randomState, **DTConfig)
classifier = monoviewClassifier.fit(data, labels)
return classifier
else:
# if type(classifierConfig) is dict:
# pass
# else:
# classifierConfig = dict((str(configIndex), config)
# for configIndex, config in enumerate(classifierConfig))
classifier = monoviewClassifier.fit(data, labels)
return classifier
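fitMonoviewClassifier above falls back to a deep, randomised decision tree when probability estimates are required but the chosen classifier cannot provide them (canProbas() is platform-specific). A standalone sketch of that fallback, using hasattr(..., 'predict_proba') as an assumed stand-in for canProbas():

import numpy as np
from sklearn.svm import LinearSVC
from sklearn.tree import DecisionTreeClassifier

def fit_with_probas(estimator, X, y, need_probas):
    # Swap in a probability-capable model when the caller needs predict_proba.
    if need_probas and not hasattr(estimator, "predict_proba"):
        estimator = DecisionTreeClassifier(max_depth=300, criterion="entropy",
                                           splitter="random")
    return estimator.fit(X, y)

X = np.array([[0.0], [1.0], [2.0], [3.0]])
y = np.array([0, 0, 1, 1])
clf = fit_with_probas(LinearSVC(), X, y, need_probas=True)
print(type(clf).__name__, clf.predict_proba(X)[:1])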
def getScores(LateFusionClassifiers):
return ""
def intersect(allClassifersNames, directory, viewsIndices, resultsMonoview, classificationIndices):
wrongSets = [[] for _ in viewsIndices]
# wrongSets = [0 for _ in allClassifersNames]
classifiers_names = [[] for _ in viewsIndices]
nbViews = len(viewsIndices)
trainLabels = np.genfromtxt(directory + "train_labels.csv", delimiter=",").astype(np.int16)
length = len(trainLabels)
for resultMonoview in resultsMonoview:
if resultMonoview.classifier_name in classifiers_names[viewsIndices.index(resultMonoview.view_index)]:
classifierIndex = classifiers_names.index(resultMonoview.classifier_name)
wrongSets[resultMonoview.view_index][classifierIndex] = np.where(
trainLabels + resultMonoview.full_labels_pred[classificationIndices[0]] == 1)[0]
else:
classifiers_names[viewsIndices.index(resultMonoview.view_index)].append(resultMonoview.classifier_name)
wrongSets[viewsIndices.index(resultMonoview.view_index)].append(
np.where(trainLabels + resultMonoview.full_labels_pred[classificationIndices[0]] == 1)[0])
combinations = itertools.combinations_with_replacement(range(len(classifiers_names[0])), nbViews)
bestLen = length
bestCombination = None
for combination in combinations:
intersect = np.arange(length, dtype=np.int16)
for viewIndex, classifierIndex in enumerate(combination):
intersect = np.intersect1d(intersect, wrongSets[viewIndex][classifierIndex])
if len(intersect) < bestLen:
bestLen = len(intersect)
bestCombination = combination
return [classifiers_names[viewIndex][index] for viewIndex, index in enumerate(bestCombination)]
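intersect() above picks, per view, the classifier combination whose sets of misclassified training examples overlap the least; a toy illustration of that selection rule with two views and two classifiers each (hypothetical index sets):

import numpy as np

wrong_view0 = [np.array([0, 3, 5]), np.array([1, 2])]      # misclassified examples, per classifier
wrong_view1 = [np.array([0, 1, 5]), np.array([3, 4, 5])]
best = min(
    ((i, j) for i in range(2) for j in range(2)),
    key=lambda c: len(np.intersect1d(wrong_view0[c[0]], wrong_view1[c[1]])))
print(best)  # (1, 1): these two classifiers make no common mistakes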
def bestScore(allClassifersNames, directory, viewsIndices, resultsMonoview, classificationIndices):
nbViews = len(viewsIndices)
nbClassifiers = len(allClassifersNames)
scores = np.zeros((nbViews, nbClassifiers))
classifiers_names = [[] for _ in viewsIndices]
metricName = resultsMonoview[0].metrics_scores.keys()[0]
metricModule = getattr(metrics, metricName)
if metricModule.getConfig()[-14] == "h":
betterHigh = True
else:
betterHigh = False
for resultMonoview in resultsMonoview:
if resultMonoview.classifier_name not in classifiers_names[resultMonoview.view_index]:
classifiers_names[resultMonoview.view_index].append(resultMonoview.classifier_name)
classifierIndex = classifiers_names[resultMonoview.view_index].index(resultMonoview.classifier_name)
scores[resultMonoview.view_index, classifierIndex] = resultMonoview.metrics_scores.values()[0][0]
if betterHigh:
classifierIndices = np.argmax(scores, axis=1)
else:
classifierIndices = np.argmin(scores, axis=1)
return [classifiers_names[viewIndex][index] for viewIndex, index in enumerate(classifierIndices)]
def getClassifiers(selectionMethodName, allClassifiersNames, directory, viewsIndices, resultsMonoview,
classificationIndices):
thismodule = sys.modules[__name__]
selectionMethod = getattr(thismodule, selectionMethodName)
classifiers_names = selectionMethod(allClassifiersNames, directory, viewsIndices, resultsMonoview,
classificationIndices)
return classifiers_names
def getConfig(classifiers_names, resultsMonoview, viewsIndices):
classifiersConfigs = [0 for _ in range(len(classifiers_names))]
for classifierIndex, classifierName in enumerate(classifiers_names):
for resultMonoview in resultsMonoview:
if resultMonoview.view_index == viewsIndices[classifierIndex] and resultMonoview.classifier_name == classifierName:
classifiersConfigs[classifierIndex] = resultMonoview.classifier_config
return classifiersConfigs
class LateFusionClassifier(object):
def __init__(self, randomState, monoviewClassifiersNames, monoviewClassifiersConfigs, monoviewSelection,
NB_CORES=1):
self.monoviewClassifiersNames = monoviewClassifiersNames
monoviewClassifiersModules = [getattr(monoview_classifiers, classifierName)
for classifierName in self.monoviewClassifiersNames]
if type(monoviewClassifiersConfigs[0]) == dict:
self.monoviewClassifiers = [
getattr(monoviewClassifiersModule, classifierName)(random_state=randomState, **config)
for monoviewClassifiersModule, config, classifierName
in zip(monoviewClassifiersModules, monoviewClassifiersConfigs, monoviewClassifiersNames)]
else:
self.monoviewClassifiers = [
getattr(monoviewClassifiersModule, classifierName)(random_state=randomState,)
for monoviewClassifiersModule, config, classifierName
in zip(monoviewClassifiersModules, monoviewClassifiersConfigs, monoviewClassifiersNames)]
self.nbCores = NB_CORES
self.accuracies = np.zeros(len(monoviewClassifiersNames))
self.needProbas = False
self.monoviewSelection = monoviewSelection
self.randomState = randomState
def fit_hdf5(self, DATASET, labels, trainIndices=None, viewsIndices=None):
if type(viewsIndices) == type(None):
viewsIndices = np.arange(DATASET.get("Metadata").attrs["nbView"])
if trainIndices is None:
trainIndices = range(DATASET.get("Metadata").attrs["datasetLength"])
self.monoviewClassifiers = Parallel(n_jobs=self.nbCores)(
delayed(fitMonoviewClassifier)(self.monoviewClassifiers[index],
get_v(DATASET, viewIndex, trainIndices),
labels[trainIndices],
self.needProbas, self.randomState)
for index, viewIndex in enumerate(viewsIndices))
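fit_hdf5 above fits one monoview classifier per view in parallel with joblib; a minimal sketch of the same pattern with toy data (DecisionTreeClassifier stands in for the platform's monoview classifiers):

import numpy as np
from joblib import Parallel, delayed
from sklearn.tree import DecisionTreeClassifier

def fit_one(view_data, labels):
    # One estimator per view, trained on that view's feature matrix only.
    return DecisionTreeClassifier(max_depth=3).fit(view_data, labels)

views = [np.random.rand(20, 4), np.random.rand(20, 6)]
labels = np.random.randint(0, 2, 20)
fitted = Parallel(n_jobs=2)(delayed(fit_one)(view, labels) for view in views)
print([type(clf).__name__ for clf in fitted])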