Skip to content
Snippets Groups Projects
Commit 240596d5 authored by Baptiste Bauvin's avatar Baptiste Bauvin
Browse files

formatted late fusion clfs that will keep for sure

parent c6783173
Branches
Tags
No related merge requests found
from ...multiview import analyze_results
# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype" # Production, Development, Prototype
def execute(classifier, trainLabels,
            testLabels, DATASET,
            classificationKWARGS, classificationIndices,
            LABELS_DICTIONARY, views, nbCores, times,
            name, KFolds,
            hyperParamSearch, nIter, metrics,
            viewsIndices, randomState, labels, classifierModule):
    """Delegate the result analysis to ``analyze_results.execute``.

    Every argument is forwarded positionally, unchanged and in the same
    order as received; the return value of ``analyze_results.execute`` is
    returned as is.
    """
    forwarded = (
        classifier, trainLabels,
        testLabels, DATASET,
        classificationKWARGS, classificationIndices,
        LABELS_DICTIONARY, views, nbCores, times,
        name, KFolds,
        hyperParamSearch, nIter, metrics,
        viewsIndices, randomState, labels, classifierModule,
    )
    return analyze_results.execute(*forwarded)
import logging
import pkgutil
import numpy as np
# from Methods import *
try:
from . import Methods
except ValueError:
import pdb;pdb.set_trace()
from ... import monoview_classifiers
from ...utils.dataset import get_v
# Author-Info
__author__ = "Baptiste Bauvin"
__status__ = "Prototype" # Production, Development, Prototype
def genName(config):
    """Build the short display name of a fusion classifier.

    Parameters
    ----------
    config : dict
        Must contain ``"fusionType"`` and ``"fusionMethod"``. For
        ``"EarlyFusion"`` it must also contain ``"classifiersNames"``, used
        as the name of both a module of ``monoview_classifiers`` and the
        class inside that module.

    Returns
    -------
    str or None
        ``"Late-<4 first chars of the method>"`` for late fusion,
        ``"Early-<4 first chars of the method>-<monoview short name>"`` for
        early fusion, ``None`` for any other fusion type.
    """
    fusion_type = config["fusionType"]
    if fusion_type == "LateFusion":
        # The shortened monoview classifier names are not part of the name,
        # so they are no longer computed (and "classifiersNames" is no longer
        # required for late fusion).
        return "Late-" + str(config["fusionMethod"][:4])
    if fusion_type == "EarlyFusion":
        classifier_name = config["classifiersNames"]
        classifier_module = getattr(monoview_classifiers, classifier_name)
        # The classifier class carries the same name as its module.
        monoview_short_name = getattr(classifier_module,
                                      classifier_name)().get_name_for_fusion()
        return "Early-" + config["fusionMethod"][:4] + "-" + monoview_short_name
    # Unknown fusion types yield no name, as before.
    return None
def getBenchmark(benchmark, args=None):
    """Fill ``benchmark["multiview"]["fusion"]`` with the fusion algorithms.

    Module names are discovered with ``pkgutil.iter_modules`` on paths
    relative to the current working directory. ``benchmark`` is modified in
    place and also returned.
    """
    # Placeholder: the received ``args`` is discarded on purpose to avoid
    # problems with the new args, as Multiview will be reworked. As a
    # consequence the ``args``-driven part below is currently never reached.
    args = None

    def plain_modules(path):
        # Names of the modules found in ``path`` that are not packages.
        found = []
        for _, module_name, is_package in pkgutil.iter_modules([path]):
            if not is_package:
                found.append(module_name)
        return found

    fusion_types = plain_modules(
        './mono_multi_view_classifiers/multiview_classifiers/fusion/Methods')
    methods_by_type = {}
    for fusion_type in fusion_types:
        methods_by_type[fusion_type] = plain_modules(
            "./mono_multi_view_classifiers/multiview_classifiers/fusion/Methods/"
            + fusion_type + "Package")

    if args is None:
        # No restriction: every discovered method and monoview classifier.
        monoview_algos = plain_modules(
            './mono_multi_view_classifiers/monoview_classifiers')
        benchmark["multiview"]["fusion"] = {"Methods": methods_by_type,
                                            "Classifiers": monoview_algos}
        return benchmark

    fusion_config = {}
    benchmark["multiview"]["fusion"] = fusion_config
    if args.FU_types != [""]:
        fusion_config["Methods"] = {requested_type: []
                                    for requested_type in args.FU_types}
    else:
        fusion_config["Methods"] = {fusion_type: "_"
                                    for fusion_type in fusion_types}

    # (fusion type, name of the args option, package searched when the
    # option is left to its [""] default)
    per_type_options = (
        ("LateFusion", "FU_late_methods",
         "./mono_multi_view_classifiers/multiview_classifiers/fusion/Methods/LateFusionPackage"),
        ("EarlyFusion", "FU_early_methods",
         "./mono_multi_view_classifiers/multiview_classifiers/fusion/Methods/EarlyFusionPackage"),
    )
    for fusion_type, option_name, package_path in per_type_options:
        if fusion_type not in fusion_config["Methods"]:
            continue
        requested_methods = getattr(args, option_name)
        if requested_methods == [""]:
            fusion_config["Methods"][fusion_type] = plain_modules(package_path)
        else:
            fusion_config["Methods"][fusion_type] = requested_methods

    if args.CL_algos_monoview == ['']:
        fusion_config["Classifiers"] = [
            algo_name for algo_name in plain_modules(
                './mono_multi_view_classifiers/monoview_classifiers')
            if algo_name != "SGD" and algo_name[:3] != "SVM"
            and algo_name != "SCM"]
    else:
        fusion_config["Classifiers"] = args.CL_algos_monoview
    return benchmark
def getArgs(args, benchmark, views, viewsIndices, randomState, directory, resultsMonoview, classificationIndices):
    """Collect the argument sets of every fusion method of the benchmark.

    When ``benchmark`` has no ``"monoview"`` entry and
    ``args.FU_L_select_monoview`` is neither ``"randomClf"`` nor
    ``"Determined"``, that option is overwritten with ``"randomClf"``.
    Each fusion method module is then asked for its own list of arguments
    through its ``getArgs`` function and the lists are chained together.
    """
    allowed_selections = ["randomClf", "Determined"]
    if "monoview" not in benchmark and args.FU_L_select_monoview not in allowed_selections:
        args.FU_L_select_monoview = "randomClf"

    collected = []
    methods_by_type = benchmark["multiview"]["fusion"]["Methods"]
    for fusion_type in methods_by_type:
        type_package = getattr(Methods, fusion_type + "Package")
        for method_name in methods_by_type[fusion_type]:
            method_module = getattr(type_package, method_name)
            collected.extend(
                method_module.getArgs(benchmark, args, views, viewsIndices,
                                      directory, resultsMonoview,
                                      classificationIndices))
    return collected
def makeMonoviewData_hdf5(DATASET, weights=None, usedIndices=None, viewsIndices=None):
    """Concatenate the weighted views into one big monoview dataset.

    Parameters
    ----------
    DATASET
        Object whose ``get("Metadata").attrs`` provides ``"nbView"`` and
        ``"datasetLength"``; it is handed to ``get_v`` to read each view.
    weights : sequence of float, optional
        One weight per view. Defaults to uniform weights; weights that do
        not sum to 1 are divided by their sum.
    usedIndices : sequence of int, optional
        Examples to keep. ``None`` or an empty sequence selects every
        example of the dataset.
    viewsIndices : sequence of int, optional
        Views to concatenate. ``None`` selects every view.

    Returns
    -------
    numpy.ndarray
        The selected views, each multiplied by its weight, concatenated
        along axis 1.
    """
    if viewsIndices is None:
        viewsIndices = np.arange(DATASET.get("Metadata").attrs["nbView"])
    # ``not usedIndices`` raised on numpy arrays holding several indices and
    # mistook ``array([0])`` for "no indices"; test the length instead while
    # keeping the historical "empty means everything" behaviour.
    if usedIndices is None or len(usedIndices) == 0:
        usedIndices = range(DATASET.get("Metadata").attrs["datasetLength"])
    nb_view = len(viewsIndices)
    if weights is None:
        weights = np.array([1 / nb_view for _ in range(nb_view)])
    else:
        # Accept plain lists/tuples: the normalisation below needs an array.
        weights = np.asarray(weights)
    weights_sum = sum(weights)
    if weights_sum != 1:
        weights = weights / weights_sum
    monoviewData = np.concatenate(
        [weights[index] * get_v(DATASET, viewIndex, usedIndices)
         for index, viewIndex in enumerate(viewsIndices)], axis=1)
    return monoviewData
def genParamsSets(classificationKWARGS, randomState, nIter=1):
    """Ask the configured fusion method for random hyper-parameter sets.

    The module is looked up as
    ``Methods.<fusionType>Package.<fusionMethod>`` from the
    ``"fusionType"`` and ``"fusionMethod"`` entries of
    ``classificationKWARGS``; the result of its own ``genParamsSets`` is
    returned unchanged.
    """
    type_package = getattr(Methods, classificationKWARGS["fusionType"] + "Package")
    method_module = getattr(type_package, classificationKWARGS["fusionMethod"])
    return method_module.genParamsSets(classificationKWARGS, randomState, nIter=nIter)
# def gridSearch_hdf5(DATASET, viewsIndices, classificationKWARGS, learningIndices, metric=None, nIter=30):
# if type(viewsIndices) == type(None):
# viewsIndices = np.arange(DATASET.get("Metadata").attrs["nbView"])
# fusionTypeName = classificationKWARGS["fusionType"]
# fusionTypePackage = globals()[fusionTypeName + "Package"]
# fusionMethodModuleName = classificationKWARGS["fusionMethod"]
# fusionMethodModule = getattr(fusionTypePackage, fusionMethodModuleName)
# classifiersNames = classificationKWARGS["classifiersNames"]
# bestSettings = []
# for classifierIndex, classifierName in enumerate(classifiersNames):
# logging.debug("\tStart:\t Random search for " + classifierName + " with " + str(nIter) + " iterations")
# classifierModule = getattr(monoview_classifiers, classifierName)
# classifierMethod = getattr(classifierModule, "hyperParamSearch")
# if fusionTypeName == "LateFusion":
# bestSettings.append(classifierMethod(get_v(DATASET, viewsIndices[classifierIndex], learningIndices),
# DATASET.get("Labels")[learningIndices], metric=metric,
# nIter=nIter))
# else:
# bestSettings.append(
# classifierMethod(makeMonoviewData_hdf5(DATASET, usedIndices=learningIndices, viewsIndices=viewsIndices),
# DATASET.get("Labels")[learningIndices], metric=metric,
# nIter=nIter))
# logging.debug("\tDone:\t Random search for " + classifierName)
# classificationKWARGS["classifiersConfigs"] = bestSettings
# logging.debug("\tStart:\t Random search for " + fusionMethodModuleName)
# fusionMethodConfig = fusionMethodModule.gridSearch(DATASET, classificationKWARGS, learningIndices, nIter=nIter,
# viewsIndices=viewsIndices)
# logging.debug("\tDone:\t Random search for " + fusionMethodModuleName)
# return bestSettings, fusionMethodConfig
class FusionClass:
    """The global representant of fusion.

    Thin facade around the concrete fusion classifier found at
    ``Methods.<fusionType>Package.<fusionMethod>.<fusionMethod>``; the
    wrapped instance is stored in ``self.classifier`` and every method
    delegates to it.
    """

    def __init__(self, randomState, NB_CORES=1, **kwargs):
        """Instantiate the fusion classifier described by ``kwargs``.

        ``kwargs`` must contain ``'fusionType'`` and ``'fusionMethod'``;
        every other entry is forwarded to the classifier constructor.
        """
        fusionType = kwargs['fusionType']
        fusionMethod = kwargs['fusionMethod']
        fusionTypePackage = getattr(Methods, fusionType + "Package")
        fusionMethodModule = getattr(fusionTypePackage, fusionMethod)
        # The class carries the same name as the module that defines it.
        fusionMethodClass = getattr(fusionMethodModule, fusionMethod)
        classifierKWARGS = {key: value for key, value in kwargs.items()
                            if key not in ('fusionType', 'fusionMethod')}
        self.classifier = fusionMethodClass(randomState, NB_CORES=NB_CORES,
                                            **classifierKWARGS)

    def setParams(self, paramsSet):
        """Forward ``paramsSet`` to the wrapped classifier."""
        self.classifier.setParams(paramsSet)

    def fit_hdf5(self, DATASET, labels, trainIndices=None, viewsIndices=None, metric=["f1_score", None]):
        """Fit the wrapped classifier.

        ``metric`` is accepted for interface compatibility but is neither
        read nor modified here.
        """
        self.classifier.fit_hdf5(DATASET, labels, trainIndices=trainIndices,
                                 viewsIndices=viewsIndices)

    def predict_hdf5(self, DATASET, usedIndices=None, viewsIndices=None):
        """Return the labels predicted by the wrapped classifier.

        ``usedIndices`` defaults to every example and ``viewsIndices`` to
        every view, both read from ``DATASET.get("Metadata").attrs``.
        """
        if usedIndices is None:
            usedIndices = range(DATASET.get("Metadata").attrs["datasetLength"])
        if viewsIndices is None:
            viewsIndices = np.arange(DATASET.get("Metadata").attrs["nbView"])
        predictedLabels = self.classifier.predict_hdf5(DATASET,
                                                       usedIndices=usedIndices,
                                                       viewsIndices=viewsIndices)
        return predictedLabels

    def predict_probas_hdf5(self, DATASET, usedIndices=None):
        """Return the wrapped classifier's ``predict_probas_hdf5`` output.

        ``usedIndices`` defaults to every example of ``DATASET``. An empty
        selection returns ``[]`` without calling the classifier.
        """
        if usedIndices is None:
            usedIndices = range(DATASET.get("Metadata").attrs["datasetLength"])
        # Test the length rather than the truth value: ``if usedIndices:``
        # raised on numpy arrays with several indices and treated
        # ``array([0])`` as empty.
        if len(usedIndices) == 0:
            return []
        return self.classifier.predict_probas_hdf5(DATASET,
                                                   usedIndices=usedIndices)

    def getConfigString(self, classificationKWARGS):
        """Return the wrapped classifier's ``getConfig`` output.

        Reads ``"classifiersNames"``, ``"classifiersConfigs"`` and
        ``"fusionMethodConfig"`` from ``classificationKWARGS``.
        """
        monoviewClassifiersNames = classificationKWARGS["classifiersNames"]
        monoviewClassifiersConfigs = classificationKWARGS["classifiersConfigs"]
        fusionMethodConfig = classificationKWARGS["fusionMethodConfig"]
        return self.classifier.getConfig(fusionMethodConfig,
                                         monoviewClassifiersNames,
                                         monoviewClassifiersConfigs)

    def getSpecificAnalysis(self, classificationKWARGS):
        """Return ``Methods.LateFusion.getScores(self)`` for late fusion.

        Any other ``"fusionType"`` yields an empty string.
        """
        if classificationKWARGS["fusionType"] == "LateFusion":
            return Methods.LateFusion.getScores(self)
        return ''
import numpy as np
from ..multiview_classifiers.additions.late_fusion_utils import LateFusionClassifier
from ..multiview.multiview_utils import get_examples_views_indices
classifier_class_name = "MajorityVoting"
class VotingIndecision(Exception):
    """Error signalling that a vote could not settle on a single class."""
class MajorityVoting(LateFusionClassifier):
    """Late fusion by weighted majority vote of the monoview classifiers."""

    def __init__(self, random_state, classifier_names=None,
                 classifier_configs=None, nb_view=None, nb_cores=1):
        """Forward every argument to ``LateFusionClassifier.__init__``."""
        super(MajorityVoting, self).__init__(random_state=random_state,
                                             classifier_names=classifier_names,
                                             classifier_configs=classifier_configs,
                                             nb_cores=nb_cores,
                                             nb_view=nb_view)

    def predict(self, X, example_indices=None, views_indices=None):
        """Predict one label per example by weighted vote.

        Each monoview classifier votes for the class it predicts, with the
        weight ``self.weights[view position]``; the class holding the
        highest total is returned for every example.

        Raises
        ------
        VotingIndecision
            When several views take part in the vote and, for some example,
            the number of classes tied for the highest score equals the
            number of views.
        """
        examples_indices, views_indices = get_examples_views_indices(
            X, example_indices, views_indices)
        n_examples = len(examples_indices)
        # Use the resolved indices here too, so the class count is computed
        # on the same examples as the predictions.
        votes = np.zeros((n_examples, X.get_nb_class(examples_indices)),
                         dtype=float)
        monoview_decisions = np.zeros((n_examples, self.nb_view), dtype=int)
        # NOTE(review): a sibling late-fusion classifier reads
        # ``self.monoview_estimators``; confirm which name the base class
        # actually defines.
        for index, view_index in enumerate(views_indices):
            monoview_decisions[:, index] = self.monoviewClassifiers[index].predict(
                X.get_v(view_index, examples_indices))
        for example_index in range(n_examples):
            example_votes = votes[example_index]
            for view_index, voted_class in enumerate(
                    monoview_decisions[example_index, :]):
                example_votes[voted_class] += self.weights[view_index]
            nb_maximum = np.count_nonzero(example_votes == example_votes.max())
            # With a single view there is always exactly one maximum, which
            # used to be reported as an indecision; only a real tie between
            # several views is one.
            if self.nb_view > 1 and nb_maximum == self.nb_view:
                raise VotingIndecision("Majority voting can't decide, each classifier has voted for a different class")
        # Can be upgraded by restarting a new classification process if
        # there are multiple maximums: relearn with only the classes that
        # have a maximum number of votes.
        predicted_labels = np.argmax(votes, axis=1)
        return predicted_labels
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.externals.six import iteritems
import itertools
from pyscm.scm import SetCoveringMachineClassifier as scm
from ..multiview_classifiers.additions.late_fusion_utils import \
LateFusionClassifier
from ..multiview.multiview_utils import get_examples_views_indices
from ..monoview.monoview_utils import CustomRandint, CustomUniform
classifier_class_name = "SCMLateFusionClassifier"
class DecisionStumpSCMNew(BaseEstimator, ClassifierMixin):
    """docstring for SCM
    A hands on class of SCM using decision stump, built with sklearn format in order to use sklearn function on SCM like
    CV, gridsearch, and so on ..."""

    def __init__(self, model_type='conjunction', p=0.1, max_rules=10, random_state=42):
        """Store the hyper-parameters; the SCM itself is built in ``fit``."""
        super(DecisionStumpSCMNew, self).__init__()
        self.model_type = model_type
        self.p = p
        self.max_rules = max_rules
        self.random_state = random_state

    def fit(self, X, y):
        """Build a fresh SCM from the stored hyper-parameters and fit it.

        Returns
        -------
        self
            Returned so that calls can be chained, as scikit-learn
            estimators are expected to allow.
        """
        self.clf = scm(model_type=self.model_type, max_rules=self.max_rules,
                       p=self.p, random_state=self.random_state)
        self.clf.fit(X=X, y=y)
        return self

    def predict(self, X):
        """Return the predictions of the fitted SCM for ``X``."""
        return self.clf.predict(X)

    def set_params(self, **params):
        """Update ``p``, ``model_type`` and/or ``max_rules``.

        Any other key is silently ignored, as before.

        Returns
        -------
        self
            scikit-learn utilities use the value returned by ``set_params``
            as the configured estimator, so ``None`` must not be returned.
        """
        for key, value in params.items():
            if key in ('p', 'model_type', 'max_rules'):
                setattr(self, key, value)
        return self

    def get_stats(self):
        """Return the rules of the fitted SCM model in a one-entry dict."""
        return {"Binary_attributes": self.clf.model_.rules}
class SCMLateFusionClassifier(LateFusionClassifier):
    """Late fusion that learns an SCM on top of the monoview decisions."""

    def __init__(self, random_state=None, classifier_names=None,
                 classifier_configs=None, nb_cores=1, nb_view=1,
                 p=1, max_attributes=5, order=1, model_type="conjunction"):
        """Initialise the base late-fusion classifier and the SCM settings."""
        super(SCMLateFusionClassifier, self).__init__(random_state=random_state,
                                                      classifier_names=classifier_names,
                                                      classifier_configs=classifier_configs,
                                                      nb_cores=nb_cores,
                                                      nb_view=nb_view)
        self.scm_classifier = None
        self.p = p
        self.max_attributes = max_attributes
        self.order = order
        self.model_type = model_type
        # NOTE(review): "max_rules" is listed here while the matching
        # attribute and constructor argument are named ``max_attributes``;
        # confirm how ``param_names`` is consumed.
        self.param_names += ["model_type", "max_rules", "p", "order"]
        self.distribs += [["conjunction", "disjunction"],
                          CustomRandint(low=1, high=15),
                          CustomUniform(loc=0, state=1), [1, 2, 3]]

    def _get_monoview_estimators(self):
        """Return the fitted monoview classifiers, whatever their attribute name.

        ``predict`` used ``self.monoview_estimators`` while
        ``scm_fusion_fit`` used ``self.monoviewClassifiers``; the base class
        is not visible from here, so both spellings are supported and both
        code paths now read the same list.
        """
        estimators = getattr(self, "monoview_estimators", None)
        if estimators is None:
            estimators = self.monoviewClassifiers
        return estimators

    def _monoview_decisions(self, X, example_indices, view_indices):
        """Return an (examples x ``self.nb_view``) int array of monoview predictions."""
        estimators = self._get_monoview_estimators()
        decisions = np.zeros((len(example_indices), self.nb_view), dtype=int)
        for index, view_index in enumerate(view_indices):
            decisions[:, index] = estimators[index].predict(
                X.get_v(view_index, example_indices))
        return decisions

    def fit(self, X, y, train_indices=None, view_indices=None):
        """Fit the monoview classifiers through the base class, then the SCM.

        Returns ``self``.
        """
        super(SCMLateFusionClassifier, self).fit(X, y,
                                                 train_indices=train_indices,
                                                 views_indices=view_indices)
        self.scm_fusion_fit(X, y, train_indices=train_indices,
                            view_indices=view_indices)
        return self

    def predict(self, X, example_indices=None, view_indices=None):
        """Predict labels by feeding the monoview decisions to the fitted SCM."""
        example_indices, view_indices = get_examples_views_indices(
            X, example_indices, view_indices)
        monoview_decisions = self._monoview_decisions(X, example_indices,
                                                      view_indices)
        features = self.generate_interactions(monoview_decisions)
        predicted_labels = self.scm_classifier.predict(features)
        return predicted_labels

    def scm_fusion_fit(self, X, y, train_indices=None, view_indices=None):
        """Fit ``self.scm_classifier`` on the monoview decisions of the training examples."""
        train_indices, view_indices = get_examples_views_indices(
            X, train_indices, view_indices)
        # NOTE(review): the constructor receives ``random_state`` but
        # ``self.randomState`` is read here; confirm the attribute name set
        # by the base class.
        self.scm_classifier = DecisionStumpSCMNew(p=self.p,
                                                  max_rules=self.max_attributes,
                                                  model_type=self.model_type,
                                                  random_state=self.randomState)
        monoview_decisions = self._monoview_decisions(X, train_indices,
                                                      view_indices)
        # Copy into a fresh array (replaces the element-by-element rebuild).
        features = np.array(self.generate_interactions(monoview_decisions))
        self.scm_classifier.fit(features, y[train_indices].astype(int))

    def generate_interactions(self, monoview_decisions):
        """Extend the monoview decisions with combinations of several views.

        With ``self.order == 1`` the input is returned untouched. Otherwise,
        for every combination of 2 to ``self.order`` columns, one extra
        column is appended: the logical AND of the combined columns when
        ``self.model_type`` is ``"disjunction"``, their logical OR otherwise.
        ``self.order`` set to ``None`` is replaced (and stored) by the number
        of columns.
        """
        nb_columns = monoview_decisions.shape[1]
        if self.order is None:
            self.order = nb_columns
        if self.order == 1:
            return monoview_decisions
        if self.model_type == "disjunction":
            combine = np.logical_and
        else:
            combine = np.logical_or
        generated_interactions = [monoview_decisions[:, column]
                                  for column in range(nb_columns)]
        for combination_size in range(2, self.order + 1):
            for combin in itertools.combinations(range(nb_columns),
                                                 combination_size):
                generated_decision = monoview_decisions[:, combin[0]]
                for column in combin[1:]:
                    generated_decision = combine(generated_decision,
                                                 monoview_decisions[:, column])
                generated_interactions.append(generated_decision)
        return np.transpose(np.array(generated_interactions))
import numpy as np
from ..multiview_classifiers.additions.late_fusion_utils import LateFusionClassifier
from ..multiview.multiview_utils import get_examples_views_indices
classifier_class_name = "WeightedLinearLateFusion"
class WeightedLinearLateFusion(LateFusionClassifier):
    """Late fusion by weighted sum of the monoview ``predict_proba`` outputs."""

    def __init__(self, random_state, classifier_names=None,
                 classifier_configs=None, nb_view=None, nb_cores=1):
        """Hand every argument over to the ``LateFusionClassifier`` constructor."""
        base_arguments = dict(random_state=random_state,
                              classifier_names=classifier_names,
                              classifier_configs=classifier_configs,
                              nb_cores=nb_cores,
                              nb_view=nb_view)
        super(WeightedLinearLateFusion, self).__init__(**base_arguments)

    def predict(self, X, example_indices=None, views_indices=None):
        """Return, per example, the index of the highest summed weighted score.

        Each view's ``predict_proba`` output is multiplied by
        ``self.weights`` at the view's position, the results are summed over
        the views and the argmax along axis 1 is returned.
        """
        example_indices, views_indices = get_examples_views_indices(
            X, example_indices, views_indices)
        weighted_scores = np.array([
            np.array(self.monoviewClassifiers[position].predict_proba(
                X.get_v(view, example_indices))) * self.weights[position]
            for position, view in enumerate(views_indices)
        ])
        summed_scores = np.sum(weighted_scores, axis=0)
        return np.argmax(summed_scores, axis=1)
......@@ -18,7 +18,7 @@ class FakeDataset():
def get_nb_class(self, example_indices):
return np.unique(self.labels[example_indices])
class FakeDivCoupleClf(diversity_utils.CoupleDiversityFusion):
class FakeDivCoupleClf(diversity_utils.CoupleDiversityFusionClassifier):
def __init__(self, rs, classifier_names=None,
classifiers_config=None, monoview_estimators=None):
......@@ -32,7 +32,7 @@ class FakeDivCoupleClf(diversity_utils.CoupleDiversityFusion):
return self.rs.randint(0,100)
class FakeDivGlobalClf(diversity_utils.GlobalDiversityFusion):
class FakeDivGlobalClf(diversity_utils.GlobalDiversityFusionClassifier):
def __init__(self, rs, classifier_names=None,
classifiers_config=None, monoview_estimators=None):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment