BoostUtils.py
    import numpy as np
    from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
    from sklearn.preprocessing import LabelEncoder
    from sklearn.utils.validation import check_is_fitted
    import sys
    import matplotlib.pyplot as plt
    import datetime
    
    
    class DecisionStumpClassifier(BaseEstimator, ClassifierMixin):
        """Generic Attribute Threshold Binary Classifier
    
        Attributes
        ----------
        attribute_index : int
            The attribute to consider for the classification.
        threshold : float
            The threshold value for classification rule.
        direction : int, optional
            A multiplicative constant (1 or -1) to choose the "direction" of the stump. Defaults to 1. If -1, the stump
            will predict the "negative" class (generally -1 or 0), and if 1, the stump will predict the second class (generally 1).
    
        """
        def __init__(self, attribute_index, threshold, direction=1):
            super(DecisionStumpClassifier, self).__init__()
            self.attribute_index = attribute_index
            self.threshold = threshold
            self.direction = direction
    
        def fit(self, X, y):
            # Only verify that we are in the binary classification setting, with support for transductive learning.
            if isinstance(y, np.ma.MaskedArray):
                self.classes_ = np.unique(y[np.logical_not(y.mask)])
            else:
                self.classes_ = np.unique(y)
    
            # This label encoder is there for the predict function to be able to return any two classes that were used
            # when fitting, for example {-1, 1} or {0, 1}.
            self.le_ = LabelEncoder()
            self.le_.fit(self.classes_)
            self.classes_ = self.le_.classes_
    
            assert len(self.classes_) == 2, "DecisionStumpClassifier only supports binary classification"
            return self
    
        def predict(self, X):
            """Returns the output of the classifier, on a sample X.
    
            Parameters
            ----------
            X : array-like, shape = [n_samples, n_features]
                Samples to classify, where n_samples is the number of samples and
                n_features is the number of features.
    
            Returns
            -------
            predictions : array-like, shape = [n_samples]
                Predicted class labels.
    
            """
            check_is_fitted(self, 'classes_')
            return self.le_.inverse_transform(np.argmax(self.predict_proba(X), axis=1))
    
        def predict_proba(self, X):
            """Compute probabilities of possible outcomes for samples in X.
    
            Parameters
            ----------
            X : array-like, shape = [n_samples, n_features]
                Samples to classify, where n_samples is the number of samples and
                n_features is the number of features.
    
            Returns
            -------
            probas : array-like, shape = [n_samples, n_classes]
                Hard class membership indicators (0.0 or 1.0) for each sample.
    
            """
            check_is_fitted(self, 'classes_')
            X = np.asarray(X)
            probas = np.zeros((X.shape[0], 2))
            positive_class = np.argwhere(X[:, self.attribute_index] > self.threshold)
            negative_class = np.setdiff1d(range(X.shape[0]), positive_class)
            probas[positive_class, 1] = 1.0
            probas[negative_class, 0] = 1.0
    
            if self.direction == -1:
                probas = 1 - probas
    
            return probas
    
        def reverse_decision(self):
            """Inverts the stump's polarity by flipping its direction."""
            self.direction *= -1
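

    # --- Illustrative usage sketch (the _example_decision_stump helper below is
    # a hypothetical addition for documentation, not part of the original
    # module). It assumes a tiny toy dataset and shows how a single
    # DecisionStumpClassifier thresholds one attribute and how
    # reverse_decision() flips its polarity.
    def _example_decision_stump():
        X = np.array([[0.5, 3.0],
                      [2.0, 1.0],
                      [1.5, 0.5]])
        y = np.array([-1, 1, 1])
        # Samples with X[:, 0] > 1.0 are assigned the second class (here 1),
        # the others the first class (here -1).
        stump = DecisionStumpClassifier(attribute_index=0, threshold=1.0).fit(X, y)
        print(stump.predict(X))    # expected: [-1  1  1]
        stump.reverse_decision()   # flip the stump's polarity
        print(stump.predict(X))    # expected: [ 1 -1 -1]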
    
    
    class ClassifiersGenerator(BaseEstimator, TransformerMixin):
        """Base class to create a set of voters using training samples, and then transform a set of examples in
        the voters' output space.
    
        Attributes
        ----------
        self_complemented : bool, optional
            Whether or not a binary complement voter must be generated for each voter. Defaults to False.
        voters : ndarray of voter functions
            Once fit, contains the voter functions.
    
        """
        def __init__(self, self_complemented=False):
            super(ClassifiersGenerator, self).__init__()
            self.self_complemented = self_complemented
    
        def fit(self, X, y=None):
            """Generates the voters using training samples.
    
            Parameters
            ----------
            X : ndarray of shape (n_samples, n_features)
                Input data on which to base the voters.
            y : ndarray of shape (n_labeled_samples,), optional
                Input labels, usually determines the decision polarity of each voter.
    
            Returns
            -------
            self
    
            """
            raise NotImplementedError
    
        def transform(self, X):
            """Transforms the input points in a matrix of classification, using previously learned voters.
    
            Parameters
            ----------
            X : ndarray of shape (n_samples, n_features)
                Input data to classify.
    
            Returns
            -------
            ndarray of shape (n_samples, n_voters)
                The voters' decision on each example.
    
            """
            check_is_fitted(self, 'estimators_')
            return np.array([voter.predict(X) for voter in self.estimators_]).T
    
    class StumpsClassifiersGenerator(ClassifiersGenerator):
        """Decision Stump Voters transformer.
    
        Parameters
        ----------
        n_stumps_per_attribute : int, optional
            Determines how many decision stumps will be created for each attribute. Defaults to 10.
            No stumps will be created for attributes with only one possible value.
        self_complemented : bool, optional
            Whether or not a binary complement voter must be generated for each voter. Defaults to False.
    
        """
        def __init__(self, n_stumps_per_attribute=10, self_complemented=False):
            super(StumpsClassifiersGenerator, self).__init__(self_complemented)
            self.n_stumps_per_attribute = n_stumps_per_attribute
    
        def fit(self, X, y):
            """Fits Decision Stump voters on a training set.
    
            Parameters
            ----------
            X : ndarray of shape (n_samples, n_features)
                Input data on which to base the voters.
            y : ndarray of shape (n_labeled_samples,), optional
                Only used to ensure that we are in the binary classification setting.
    
            Returns
            -------
            self
    
            """
            minimums = np.min(X, axis=0)
            maximums = np.max(X, axis=0)
            ranges = (maximums - minimums) / (self.n_stumps_per_attribute + 1)
    
            self.estimators_ = [DecisionStumpClassifier(i, minimums[i] + ranges[i] * stump_number, 1).fit(X, y)
                                for i in range(X.shape[1]) for stump_number in range(1, self.n_stumps_per_attribute + 1)
                                if ranges[i] != 0]
    
            if self.self_complemented:
                self.estimators_ += [DecisionStumpClassifier(i, minimums[i] + ranges[i] * stump_number, -1).fit(X, y)
                                     for i in range(X.shape[1]) for stump_number in range(1, self.n_stumps_per_attribute + 1)
                                     if ranges[i] != 0]
    
            self.estimators_ = np.asarray(self.estimators_)
            return self
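

    # --- Illustrative usage sketch (the _example_stumps_generator helper below
    # is a hypothetical addition for documentation, not part of the original
    # module). It shows how StumpsClassifiersGenerator builds a pool of
    # decision-stump voters and projects data into their output space.
    def _example_stumps_generator():
        rng = np.random.RandomState(0)
        X = rng.rand(6, 3)
        y = np.array([-1, -1, -1, 1, 1, 1])
        generator = StumpsClassifiersGenerator(n_stumps_per_attribute=4,
                                               self_complemented=True).fit(X, y)
        votes = generator.transform(X)
        # 4 stumps per attribute, 3 attributes, doubled by the complemented
        # voters -> 24 columns, one per voter.
        print(votes.shape)         # expected: (6, 24)
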
    
    def _as_matrix(element):
        """ Utility function to convert "anything" to a Numpy matrix.
        """
        # If a scalar, return a 1x1 matrix.
        if len(np.shape(element)) == 0:
            return np.matrix([[element]], dtype=float)
    
        # If a nd-array vector, return a column matrix.
        elif len(np.shape(element)) == 1:
            matrix = np.matrix(element, dtype=float)
            if np.shape(matrix)[1] != 1:
                matrix = matrix.T
            return matrix
    
        return np.matrix(element, dtype=float)
    
    
    def _as_column_matrix(array_like):
        """ Utility function to convert any array to a column Numpy matrix.
        """
        matrix = _as_matrix(array_like)
        if 1 not in np.shape(matrix):
            raise ValueError("_as_column_matrix: input must be a vector")
    
        if np.shape(matrix)[0] == 1:
            matrix = matrix.T
    
        return matrix
    
    
    def _as_line_matrix(array_like):
        """ Utility function to convert any array to a line Numpy matrix.
        """
        matrix = _as_matrix(array_like)
        if 1 not in np.shape(matrix):
            raise ValueError("_as_line_matrix: input must be a vector")
    
        if np.shape(matrix)[1] == 1:
            matrix = matrix.T
    
        return matrix
    
    
    def sign(array):
        """Computes the elementwise sign of all elements of an array. The sign function returns -1 if x <=0 and 1 if x > 0.
        Note that numpy's sign function can return 0, which is not desirable in most cases in Machine Learning algorithms.
    
        Parameters
        ----------
        array : array-like
            Input values.
    
        Returns
        -------
        ndarray
            An array with the signs of input elements.
    
        """
        signs = np.sign(array)
    
        signs[array == 0] = -1
        return signs
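

    # --- Illustrative sketch (the _example_sign helper is a hypothetical
    # addition for documentation): unlike numpy's sign, sign() never returns 0,
    # which keeps every vote in {-1, +1}.
    def _example_sign():
        votes = np.array([-2.0, 0.0, 3.0])
        print(np.sign(votes))      # expected: [-1.  0.  1.]
        print(sign(votes))         # expected: [-1. -1.  1.]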
    
    
    class ConvexProgram(object):
        """
        Encapsulates a quadratic program of the following form:
    
        minimize    (1/2)*x'*P*x + q'*x
        subject to  G*x <= h
                    A*x = b.
    
    
        or a linear program of the following form:
    
        minimize    c'*x
        subject to  G*x <= h
                    A*x = b
        """
        def __init__(self):
            self._quadratic_func = None
            self._linear_func = None
            self._inequality_constraints_matrix = None
            self._inequality_constraints_values = None
            self._equality_constraints_matrix = None
            self._equality_constraints_values = None
            self._lower_bound_values = None
            self._upper_bound_values = None
            self._n_variables = None
    
        @property
        def n_variables(self):
            return self._n_variables
    
        @property
        def quadratic_func(self):
            return self._quadratic_func
    
        @quadratic_func.setter
        def quadratic_func(self, quad_matrix):
            quad_matrix = _as_matrix(quad_matrix)
            n_lines, n_columns = np.shape(quad_matrix)
            assert(n_lines == n_columns)
    
            if self._linear_func is not None:
                assert(np.shape(quad_matrix)[0] == self._n_variables)
            else:
                self._n_variables = n_lines
    
            self._quadratic_func = quad_matrix
    
        @property
        def linear_func(self):
            return self._linear_func
    
        @linear_func.setter
        def linear_func(self, lin_vector):
            if lin_vector is not None:
                lin_vector = _as_column_matrix(lin_vector)
    
                if self._quadratic_func is not None:
                    assert(np.shape(lin_vector)[0] == self._n_variables)
    
                else:
                    self._n_variables = np.shape(lin_vector)[0]
    
                self._linear_func = lin_vector
    
        def add_inequality_constraints(self, inequality_matrix, inequality_values):
            if inequality_matrix is None:
                return
    
            self._assert_objective_function_is_set()
    
            if 1 in np.shape(inequality_matrix) or len(np.shape(inequality_matrix)) == 1:
                inequality_matrix = _as_line_matrix(inequality_matrix)
            else:
                inequality_matrix = _as_matrix(inequality_matrix)
    
            inequality_values = _as_column_matrix(inequality_values)
            assert np.shape(inequality_matrix)[1] == self._n_variables
            assert np.shape(inequality_values)[1] == 1
    
            if self._inequality_constraints_matrix is None:
                self._inequality_constraints_matrix = inequality_matrix
            else:
                self._inequality_constraints_matrix = np.append(self._inequality_constraints_matrix,
                                                                inequality_matrix, axis=0)
    
            if self._inequality_constraints_values is None:
                self._inequality_constraints_values = inequality_values
            else:
                self._inequality_constraints_values = np.append(self._inequality_constraints_values,
                                                                inequality_values, axis=0)
    
        def add_equality_constraints(self, equality_matrix, equality_values):
            if equality_matrix is None:
                return
    
            self._assert_objective_function_is_set()
    
            if 1 in np.shape(equality_matrix) or len(np.shape(equality_matrix)) == 1:
                equality_matrix = _as_line_matrix(equality_matrix)
            else:
                equality_matrix = _as_matrix(equality_matrix)
    
            equality_values = _as_matrix(equality_values)
            assert np.shape(equality_matrix)[1] == self._n_variables
            assert np.shape(equality_values)[1] == 1
    
            if self._equality_constraints_matrix is None:
                self._equality_constraints_matrix = equality_matrix
            else:
                self._equality_constraints_matrix = np.append(self._equality_constraints_matrix,
                                                              equality_matrix, axis=0)
    
            if self._equality_constraints_values is None:
                self._equality_constraints_values = equality_values
            else:
                self._equality_constraints_values = np.append(self._equality_constraints_values,
                                                              equality_values, axis=0)
    
        def add_lower_bound(self, lower_bound):
            if lower_bound is not None:
                self._assert_objective_function_is_set()
                self._lower_bound_values = np.array([lower_bound] * self._n_variables)
    
        def add_upper_bound(self, upper_bound):
            if upper_bound is not None:
                self._assert_objective_function_is_set()
                self._upper_bound_values = np.array([upper_bound] * self._n_variables)
    
        def _convert_bounds_to_inequality_constraints(self):
            self._assert_objective_function_is_set()
    
            if self._lower_bound_values is not None:
                c_matrix = []
                for i in range(self._n_variables):
                    c_line = [0] * self._n_variables
                    c_line[i] = -1.0
                    c_matrix.append(c_line)

                # x >= lower_bound is encoded as -x <= -lower_bound, so the
                # right-hand side must be negated.
                c_vector = _as_column_matrix(-np.asarray(self._lower_bound_values, dtype=float))
                self._lower_bound_values = None
                self.add_inequality_constraints(np.matrix(c_matrix).T, c_vector)
    
            if self._upper_bound_values is not None:
                c_matrix = []
                for i in range(self._n_variables):
                    c_line = [0] * self._n_variables
                    c_line[i] = 1.0
                    c_matrix.append(c_line)
    
                c_vector = _as_column_matrix(self._upper_bound_values)
                self._upper_bound_values = None
                self.add_inequality_constraints(np.matrix(c_matrix).T, c_vector)
    
        def _convert_to_cvxopt_matrices(self):
            from cvxopt import matrix as cvxopt_matrix
    
            if self._quadratic_func is not None:
                self._quadratic_func = cvxopt_matrix(self._quadratic_func)
    
            if self._linear_func is not None:
                self._linear_func = cvxopt_matrix(self._linear_func)
            else:
                # CVXOPT needs this vector to be set even if it is not used, so we put zeros in it!
                self._linear_func = cvxopt_matrix(np.zeros((self._n_variables, 1)))
    
            if self._inequality_constraints_matrix is not None:
                self._inequality_constraints_matrix = cvxopt_matrix(self._inequality_constraints_matrix)
    
            if self._inequality_constraints_values is not None:
                self._inequality_constraints_values = cvxopt_matrix(self._inequality_constraints_values)
    
            if self._equality_constraints_matrix is not None:
                self._equality_constraints_matrix = cvxopt_matrix(self._equality_constraints_matrix)
    
            if self._equality_constraints_values is not None:
                self._equality_constraints_values = cvxopt_matrix(self._equality_constraints_values)
    
        def _assert_objective_function_is_set(self):
            assert self._n_variables is not None
    
        def solve(self, solver="cvxopt", feastol=1e-7, abstol=1e-7, reltol=1e-6, return_all_information=False):
    
            # Some solvers are very verbose, and we don't want them to pollute STDOUT or STDERR.
            original_stdout = sys.stdout
            original_stderr = sys.stderr
    
            ret = None
    
            # TODO: Repair
            # if solver == "cvxopt":
            #     stdout_logger = logging.getLogger('CVXOPT')
            #     sl = StreamToLogger(stdout_logger, logging.DEBUG)
            #     sys.stdout = sl
    
            #     stderr_logger = logging.getLogger('CVXOPT')
            #     sl = StreamToLogger(stderr_logger, logging.WARNING)
            #     sys.stderr = sl
    
            try:
                if solver == "cvxopt":
                    from cvxopt.solvers import qp, lp, options
                    options['feastol'] = feastol
                    options['abstol'] = abstol
                    options['reltol'] = reltol
                    options['show_progress'] = False
    
                    self._convert_bounds_to_inequality_constraints()
                    self._convert_to_cvxopt_matrices()
    
                    if self._quadratic_func is not None:
                        ret = qp(self.quadratic_func, self.linear_func, self._inequality_constraints_matrix,
                                 self._inequality_constraints_values, self._equality_constraints_matrix,
                                 self._equality_constraints_values)
    
                    else:
                        ret = lp(self.linear_func,
                                 G=self._inequality_constraints_matrix,
                                 h=self._inequality_constraints_values,
                                 A=self._equality_constraints_matrix,
                                 b=self._equality_constraints_values)
    
                    #logging.info("Primal objective value  = {}".format(ret['primal objective']))
                    #logging.info("Dual objective value  = {}".format(ret['dual objective']))
    
                    if not return_all_information:
                        ret = np.asarray(np.array(ret['x']).T[0])
    
                elif solver == "cplex":
                    import cplex
                    p = cplex.Cplex()
                    p.objective.set_sense(p.objective.sense.minimize)
    
                    # This is ugly. CPLEX wants a list of lists of lists. First dimension represents the lines of the QP
                    # matrix. Second dimension contains a pair of two elements: the indices of the variables in play (all of
                    # them...), and the values (columns of the QP matrix).
                    names = [str(x) for x in range(self._n_variables)]
                    p.variables.add(names=names)
    
                    if self.quadratic_func is not None:
                        p_matrix = []
                        for line in self._quadratic_func:
                            p_matrix.append([names, line.tolist()[0]])
    
                        p.objective.set_quadratic(p_matrix)
    
                    if self.linear_func is not None:
                        p.objective.set_linear(zip(names,
                                                   np.asarray(self.linear_func.T).reshape(self.n_variables,).tolist()))
    
                    if self._inequality_constraints_matrix is not None:
                        inequality_linear = []
                        for line in self._inequality_constraints_matrix:
                            inequality_linear.append([names, line.tolist()[0]])
                        p.linear_constraints.add(lin_expr=inequality_linear,
                                                 rhs=np.asarray(self._inequality_constraints_values.T).tolist()[0],
                                                 senses="L"*len(self._inequality_constraints_values))
    
                    if self._equality_constraints_matrix is not None:
                        equality_linear = []
                        for line in self._equality_constraints_matrix:
                            equality_linear.append([names, line.tolist()[0]])
                        p.linear_constraints.add(lin_expr=equality_linear,
                                                 rhs=np.asarray(self._equality_constraints_values.T).tolist()[0],
                                                 senses="E"*len(self._equality_constraints_values))
    
                    if self._lower_bound_values is not None:
                        p.variables.set_lower_bounds(zip(names, self._lower_bound_values))
    
                    if self._upper_bound_values is not None:
                        p.variables.set_upper_bounds(zip(names, self._upper_bound_values))
    
                    p.solve()
    
                    if not return_all_information:
                        ret = np.array(p.solution.get_values())
                    else:
                        ret = {'primal': np.array(p.solution.get_values()),
                               'dual': np.array(p.solution.get_dual_values())}
    
                elif solver == "pycpx":
                    # This shows how easy it is to use pycpx. However, it is much slower (as it is more versatile!).
    
                    import pycpx
                    model = pycpx.CPlexModel(verbosity=2)
                    q = model.new(self.n_variables)
    
                    if self._inequality_constraints_matrix is not None:
                        model.constrain(self._inequality_constraints_matrix * q <= self._inequality_constraints_values)
                    if self._equality_constraints_matrix is not None:
                        model.constrain(self._equality_constraints_matrix * q == self._equality_constraints_values)
                    if self._lower_bound_values is not None:
                        model.constrain(q >= self._lower_bound_values)
                    if self._upper_bound_values is not None:
                        model.constrain(q <= self._upper_bound_values)
    
                    value = model.minimize(0.5 * q.T * self._quadratic_func * q + self.linear_func.T * q)
    
                    if not return_all_information:
                        ret = np.array(model[q])
                    else:
                        ret = model
    
            except:
                raise
    
            finally:
                sys.stdout = original_stdout
                sys.stderr = original_stderr
    
            return ret
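

    # --- Illustrative usage sketch (the _example_convex_program helper is a
    # hypothetical addition for documentation, not part of the original
    # module). Assuming cvxopt is installed, it solves the tiny quadratic
    # program
    #     minimize (1/2) * (x0^2 + x1^2)   subject to   x0 + x1 = 1,  x >= 0,
    # whose optimum is x = (0.5, 0.5).
    def _example_convex_program():
        program = ConvexProgram()
        program.quadratic_func = np.eye(2)
        program.add_equality_constraints(np.array([[1.0, 1.0]]), np.array([1.0]))
        program.add_lower_bound(0.0)
        solution = program.solve(solver="cvxopt")
        print(solution)            # expected: approximately [0.5 0.5]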
    
    
    def get_accuracy_graph(train_accuracies, classifier_name, file_name):
        """Plots the training accuracies of classifier_name against the iteration number and saves the figure to file_name."""
        f, ax = plt.subplots(nrows=1, ncols=1)
        ax.set_title("Accuracies during train for " + classifier_name)
        x = np.arange(len(train_accuracies))
        scat = ax.scatter(x, np.array(train_accuracies))
        ax.legend((scat,), ("Accuracies",))
        plt.tight_layout()
        f.savefig(file_name)
        plt.close()
    
    class BaseBoost(object):
    
        def __init__(self):
            self.n_stumps = 10
    
        def _collect_probas(self, X):
            return np.asarray([clf.predict_proba(X) for clf in self.estimators_generator.estimators_])
    
        def _binary_classification_matrix(self, X):
            probas = self._collect_probas(X)
            predicted_labels = np.argmax(probas, axis=2)
            predicted_labels[predicted_labels == 0] = -1
            values = np.max(probas, axis=2)
            return (predicted_labels * values).T
    
        def _initialize_alphas(self, n_examples):
            raise NotImplementedError("Alpha weights initialization function is not implemented.")
    
        def check_opposed_voters(self):
            """Counts the pairs of chosen voters whose outputs are exact opposites of each other."""
            nb_opposed = 0
            for column in self.classification_matrix[:, self.chosen_columns_].transpose():
                for chosen_col in self.chosen_columns_:
                    if (-column.reshape((self.n_total_examples, 1)) == self.classification_matrix[:, chosen_col]).all():
                        nb_opposed += 1
            return int(nb_opposed / 2)
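

    # --- Illustrative usage sketch (the _example_classification_matrix helper
    # is a hypothetical addition for documentation). BaseBoost expects its
    # subclasses to set `estimators_generator`; here it is set by hand to show
    # how the {-1, +1} classification matrix is built from the stump voters.
    def _example_classification_matrix():
        rng = np.random.RandomState(42)
        X = rng.rand(5, 2)
        y = np.array([-1, -1, 1, 1, 1])
        booster = BaseBoost()
        booster.estimators_generator = StumpsClassifiersGenerator(
            n_stumps_per_attribute=3).fit(X, y)
        matrix = booster._binary_classification_matrix(X)
        print(matrix.shape)        # expected: (5, 6)
        print(np.unique(matrix))   # expected: [-1.  1.]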
    
    
    def getInterpretBase(classifier, directory, classifier_name, weights,
                         break_cause=" the dual constraint was not violated"):
        interpretString = "\t " + classifier_name + " performed classification with weights : \n"
        weights_sort = np.argsort(-weights)
        interpretString += np.array2string(weights[weights_sort], precision=4, separator=',', suppress_small=True)
        interpretString += "\n \t It used {} iterations to converge, and selected {} couple(s) of opposed voters".format(
            len(weights_sort), classifier.nb_opposed_voters)
        if len(weights_sort) == classifier.n_max_iterations or len(weights) == classifier.n_total_hypotheses_:
            if len(weights) == classifier.n_max_iterations:
                interpretString += ", and used all available iterations, "
            else:
                interpretString += "."
            if len(weights) == classifier.n_total_hypotheses_:
                interpretString += ", and all the voters have been used."
            else:
                interpretString += "."
        else:
            interpretString += ", and the loop was broken because"+break_cause
        interpretString += "\n\t Selected voters : \n"
        interpretString += np.array2string(np.array(classifier.chosen_columns_)[weights_sort])
        interpretString += "\n\t Trained in "+str(datetime.timedelta(seconds=classifier.train_time))+" and predicted in "+str(datetime.timedelta(seconds=classifier.predict_time))+"."
        interpretString += "\n\t Selected columns : \n"
        interpretString += np.array2string(classifier.classification_matrix[:, classifier.chosen_columns_], precision=4,
                                           separator=',', suppress_small=True)
        np.savetxt(directory + "voters.csv", classifier.classification_matrix[:, classifier.chosen_columns_], delimiter=',')
        np.savetxt(directory + "weights.csv", classifier.weights_, delimiter=',')
        get_accuracy_graph(classifier.train_accuracies, classifier_name, directory + 'accuracies.png')
        return interpretString