Skip to content
Snippets Groups Projects
test_mumbo.py 36 KiB
Newer Older
  • Learn to ignore specific revisions
  • Dominique Benielli's avatar
    Dominique Benielli committed
    # -*- coding: utf-8 -*-
    """Testing for the mumbo module."""
    
    # Université d'Aix Marseille (AMU) -
    # Centre National de la Recherche Scientifique (CNRS) -
    # Université de Toulon (UTLN).
    # Copyright © 2017-2018 AMU, CNRS, UTLN
    #
    # This file is part of multimodalboost.
    #
    # multimodalboost is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Lesser General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # multimodalboost is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Lesser General Public License for more details.
    #
    # You should have received a copy of the GNU Lesser General Public License
    # along with multimodalboost. If not, see <http://www.gnu.org/licenses/>.
    #
    # Author: Florent JAILLET - Laboratoire d'Informatique et Systèmes - UMR 7020
    
    import pickle
    import unittest
    import numpy as np
    from scipy.sparse import csc_matrix, csr_matrix, coo_matrix, dok_matrix
    from scipy.sparse import lil_matrix
    from sklearn.model_selection import GridSearchCV
    from sklearn.svm import SVC
    from sklearn.utils.estimator_checks import check_estimator
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.cluster import KMeans
    from sklearn.tree import DecisionTreeClassifier
    from sklearn import datasets
    
    from multimodal.boosting.mumbo import MumboClassifier
    
    Dominique Benielli's avatar
    Dominique Benielli committed
    
    
    class TestMuCumboClassifier(unittest.TestCase):
    
        @classmethod
        def setUpClass(cls):
            """Load the iris dataset once and share it with every test method.

            The dataset object returned by ``datasets.load_iris()`` gets an extra
            ``views_ind`` attribute set to ``[0, 2, 4]`` and is stored on the test
            class as ``iris``; tests read it through ``self.iris``.
            """
            iris = datasets.load_iris()
            # Passed as the ``views_ind`` argument of MumboClassifier.fit by the
            # tests that train on iris.
            iris.views_ind = np.array([0, 2, 4])
            cls.iris = iris
    
    
        def test_sparse(self):
            """Check that fit() accepts a CSR sparse matrix without raising."""
            rng = np.random.RandomState(0)
            X = rng.rand(40, 10)
            # Zero every entry below 0.8 so that most of the matrix is empty.
            X[X < .8] = 0
            X_csr = csr_matrix(X)
            clf = MumboClassifier()
            # ``np.int`` has been removed from NumPy; the builtin ``int`` is the
            # type it used to alias, so the resulting labels are unchanged.
            y = (4 * rng.rand(40)).astype(int)
            clf.fit(X_csr, y)
    
    
    Dominique Benielli's avatar
    Dominique Benielli committed
        def test_init_var(self):
            """Check the five arrays returned by _init_var for 3 views, 3 classes."""
            n_views = 3
            n_classes = 3
            y = np.array([0, 2, 1, 2])
            n_samples = y.shape[0]

            # One row per sample: -2 in the column of the sample's label, 1
            # in the other columns.
            per_view_cost = [[-2, 1, 1], [1, 1, -2], [1, -2, 1], [1, 1, -2]]
            want_cost_glob = np.array(per_view_cost, dtype=np.float64)
            # The per-view cost is that same matrix repeated for each view.
            want_cost = np.array([per_view_cost] * n_views, dtype=np.float64)
            want_label_score = np.zeros((n_views, n_samples, n_classes))
            want_label_score_glob = np.zeros((n_samples, n_classes))
            want_predicted_shape = (n_views, n_samples)

            clf = MumboClassifier()
            clf.n_classes_ = n_classes
            outputs = clf._init_var(n_views, y)
            (cost, cost_glob, label_score, label_score_glob,
             predicted_classes) = outputs

            np.testing.assert_equal(cost, want_cost)
            np.testing.assert_equal(cost_glob, want_cost_glob)
            np.testing.assert_equal(label_score, want_label_score)
            np.testing.assert_equal(label_score_glob, want_label_score_glob)
            self.assertEqual(predicted_classes.shape, want_predicted_shape)
    
    
        def test_compute_edge_global(self):
            """Check _compute_edge_global on a hand-made 3-view, 4-sample case."""
            y = np.array([0, 2, 1, 2])
            # One row of predicted labels per view.
            view_predictions = np.array(
                [[0, 0, 1, 1],
                 [0, 1, 0, 2],
                 [2, 2, 0, 0]])
            global_cost = np.array(
                [[-2, 1, 1], [1, 1, -2], [1, -2, 1], [1, 1, -2]],
                dtype=np.float64)
            want_edges = np.array([0.25, 0.25, -0.125])

            clf = MumboClassifier()
            got_edges = clf._compute_edge_global(global_cost, view_predictions, y)

            np.testing.assert_equal(got_edges, want_edges)
    
    
        def test_compute_dist(self):
            """Check _compute_dist, and that only true-class costs matter to it."""
            y = np.array([0, 2, 1, 2])
            want_dist = np.array(
                [[0.25, 0.25, 0.25, 0.25],
                 [0.5, 0.5, -2., 2.],
                 [-0.5, 0.5, -1., 2.]])
            clf = MumboClassifier()

            full_cost = np.array(
                [[[-2, 1, 1], [-1, -1, -2], [1, -2, 1], [1, 1, -2]],
                 [[-1, 2, 2], [2, 2, -1], [-2, 4, -2], [2, 2, -4]],
                 [[1, 4, -4], [-1, 3, -1], [-2, 2, 4], [4, 4, -4]]],
                dtype=np.float64)
            np.testing.assert_equal(clf._compute_dist(full_cost, y), want_dist)

            # This matrix keeps the entries of ``full_cost`` located at each
            # sample's true class and zeroes all the others.  The distribution
            # is expected to depend only on the true-class costs, so the result
            # must be the same as above.
            masked_cost = np.array(
                [[[-2, 0, 0], [0, 0, -2], [0, -2, 0], [0, 0, -2]],
                 [[-1, 0, 0], [0, 0, -1], [0, 4, 0], [0, 0, -4]],
                 [[1, 0, 0], [0, 0, -1], [0, 2, 0], [0, 0, -4]]],
                dtype=np.float64)
            np.testing.assert_equal(clf._compute_dist(masked_cost, y), want_dist)
    
        def test_compute_coop_coef(self):
            """Check _compute_coop_coef on a hand-made 3-view, 4-sample case."""
            y = np.array([0, 1, 2, 0])
            # One row of predicted labels per view.
            view_predictions = np.array(
                [[0, 0, 1, 1],
                 [0, 1, 0, 2],
                 [2, 2, 0, 0]])
            want_coef = np.array(
                [[1, 0, 1, 0],
                 [1, 1, 1, 0],
                 [0, 0, 1, 1]],
                dtype=np.float64)

            got_coef = MumboClassifier()._compute_coop_coef(view_predictions, y)

            np.testing.assert_equal(got_coef, want_coef)
    
        def test_compute_edges(self):
            """Check _compute_edges on a hand-made 3-view, 4-sample case."""
            y = np.array([0, 2, 1, 2])
            # One row of predicted labels per view.
            view_predictions = np.array(
                [[0, 2, 1, 1],
                 [0, 1, 0, 2],
                 [2, 2, 0, 1]])
            view_costs = np.array(
                [[[-2, 1, 1], [-1, -1, -2], [1, -2, 1], [1, 1, -2]],
                 [[-2, 2, 2], [2, 2, -4], [-2, -4, -2], [2, 2, -4]],
                 [[1, 4, -4], [-1, 3, -1], [-2, 4, 4], [4, 4, -1]]],
                dtype=np.float64)
            want_edges = np.array([1.25, 0.75, 0.25])

            clf = MumboClassifier()
            got_edges = clf._compute_edges(view_costs, view_predictions, y)

            np.testing.assert_equal(got_edges, want_edges)
    
        def test_compute_alphas(self):
            """Check _compute_alphas on a scalar edge and on an array of edges."""
            decimal = 12
            clf = MumboClassifier()

            # Scalar input: an edge of (e - 1) / (e + 1) must give alpha = 0.5.
            scalar_edge = (np.e-1.) / (np.e+1.)
            np.testing.assert_almost_equal(
                clf._compute_alphas(scalar_edge), 0.5, decimal)

            # Array input: edges of the form (t - 1) / (t + 1) for t = e,
            # e**2 and e**4 must give alphas 0.5, 1 and 2.
            powers_of_e = np.array([np.e, np.e**2, np.e**4])
            array_edges = (powers_of_e-1.) / (powers_of_e+1.)
            want_alphas = np.array([0.5, 1., 2.])
            np.testing.assert_almost_equal(
                clf._compute_alphas(array_edges), want_alphas, decimal)
    
        def test_compute_cost_global(self):
            """Check the global label scores and costs from _compute_cost_global.

            Three cases share the same predictions and labels: arbitrary scores
            with alpha = 0.5 (scores only), all-zero scores with alpha = 0, and
            logarithmic scores with alpha = log(2) (scores and costs).
            """
            decimal = 12
            label_score_glob = np.array(
                [[-1, -2, 4], [-8, 1, 4], [2, 8, -4], [2, -1, 4]],
                dtype=np.float64)
            best_pred_classes = np.array([0, 1, 0, 2])
            y = np.array([0, 2, 1, 2])
            alpha = 0.5
            # Each row differs from the input by +alpha in the predicted column.
            expected_label_score_glob = np.array(
                [[-0.5, -2, 4], [-8, 1.5, 4], [2.5, 8, -4], [2, -1, 4.5]],
                dtype=np.float64)

            clf = MumboClassifier()
            cost_glob, label_score_glob = clf._compute_cost_global(
                label_score_glob, best_pred_classes, y, alpha)
            np.testing.assert_almost_equal(label_score_glob, expected_label_score_glob,
                                           decimal)

            label_score_glob = np.zeros((4, 3), dtype=np.float64)
            alpha = 0.
            expected_label_score_glob = np.zeros((4, 3), dtype=np.float64)
            expected_cost_glob = np.array(
                [[-2, 1, 1], [1, 1, -2], [1, -2, 1], [1, 1, -2]],
                dtype=np.float64)

            cost_glob, label_score_glob = clf._compute_cost_global(
                label_score_glob, best_pred_classes, y, alpha)
            np.testing.assert_equal(label_score_glob, expected_label_score_glob)
            # assert_equal takes no ``decimal`` argument: its third positional
            # parameter is ``err_msg``, so only the two arrays are passed here.
            np.testing.assert_equal(cost_glob, expected_cost_glob)

            label_score_glob = np.array(
                [[0, 0, np.log(4)], [np.log(8), 0, 0], [0, 0, 0], [0, 0, 0]],
                dtype=np.float64)
            alpha = np.log(2)
            expected_label_score_glob = np.array(
                [[alpha, 0, np.log(4)],
                 [np.log(8), alpha, 0],
                 [alpha, 0, 0],
                 [0, 0, alpha]],
                dtype=np.float64)
            expected_cost_glob = np.array(
                [[-2.5, 0.5, 2.], [8., 2., -10.], [2., -3., 1.], [0.5, 0.5, -1.]],
                dtype=np.float64)

            cost_glob, label_score_glob = clf._compute_cost_global(
                label_score_glob, best_pred_classes, y, alpha)

            np.testing.assert_almost_equal(label_score_glob, expected_label_score_glob,
                                           decimal)
            np.testing.assert_almost_equal(cost_glob, expected_cost_glob, decimal)
    
        def test_compute_cost(self):
            """Check _compute_cost with and without the cooperation coefficients.

            Two sets of starting scores are used, each once with
            ``use_coop_coef=False`` and once with ``use_coop_coef=True``.  Only
            the label scores are checked for the first set; scores and costs are
            both checked for the second.
            """
            decimal = 12
            pred_classes = np.array([[0, 2, 1, 1], [0, 1, 0, 0], [2, 2, 0, 1]])
            y = np.array([0, 2, 1, 2])

            def integer_scores():
                # A fresh array on every call, so no case sees another's result.
                return np.array(
                    [[[-1, -2, 4], [-8, 1, 4], [2, 8, -4], [2, -1, 4]],
                     [[2, -2, 1], [4, -1, 2], [1, 2, 4], [-2, 8, -1]],
                     [[8, 2, -4], [2, 4, -2], [4, 1, -2], [8, 2, 1]]],
                    dtype=np.float64)

            def log_scores():
                # A fresh array on every call, so no case sees another's result.
                return np.array(
                    [[[0, 0, np.log(4)], [np.log(8), 0, 0], [0, 0, 0], [0, 0, 0]],
                     [[0, np.log(2), 0], [0, 0, 0], [0, 0, 0], [0, np.log(4), 0]],
                     [[0, 0, 0], [np.log(8), 0, 0], [0, np.log(2), 0], [0, 0, 0]]],
                    dtype=np.float64)

            # --- Integer scores, without cooperation coefficients ---
            alphas = np.array([0.25, 0.5, 2.])
            want_score = np.array(
                [[[-0.75, -2, 4], [-8, 1, 4.25], [2, 8.25, -4], [2, -0.75, 4]],
                 [[2.5, -2, 1], [4, -0.5, 2], [1.5, 2, 4], [-1.5, 8, -1]],
                 [[8, 2, -2.], [2, 4, 0.], [6., 1, -2], [8, 4., 1]]],
                dtype=np.float64)

            clf = MumboClassifier()
            cost, got_score = clf._compute_cost(
                integer_scores(), pred_classes, y, alphas, use_coop_coef=False)

            np.testing.assert_almost_equal(got_score, want_score, decimal)

            # --- Integer scores, with cooperation coefficients ---
            want_score = np.array(
                [[[-0.75, -2, 4], [-8, 1, 4.25], [2, 8.25, -4], [2, -0.75, 4]],
                 [[2.5, -2, 1], [4, -1, 2], [1, 2, 4], [-1.5, 8, -1]],
                 [[8, 2, -4], [2, 4, 0.], [4, 1, -2], [8, 4., 1]]],
                dtype=np.float64)

            clf = MumboClassifier()
            cost, got_score = clf._compute_cost(
                integer_scores(), pred_classes, y, alphas, use_coop_coef=True)

            np.testing.assert_almost_equal(got_score, want_score, decimal)

            # --- Logarithmic scores, without cooperation coefficients ---
            alphas = np.array([np.log(2), np.log(4), np.log(8)])
            want_score = np.array(
                [[[np.log(2), 0, np.log(4)],
                  [np.log(8), 0, np.log(2)],
                  [0, np.log(2), 0],
                  [0, np.log(2), 0]],
                 [[np.log(4), np.log(2), 0],
                  [0, np.log(4), 0],
                  [np.log(4), 0, 0],
                  [np.log(4), np.log(4), 0]],
                 [[0, 0, np.log(8)],
                  [np.log(8), 0, np.log(8)],
                  [np.log(8), np.log(2), 0],
                  [0, np.log(8), 0]]],
                dtype=np.float64)
            want_cost = np.array(
                [[[-2.5, 0.5, 2.], [4., 0.5, -4.5], [0.5, -1., 0.5], [1., 2., -3.]],
                 [[-0.75, 0.5, 0.25], [1., 4., -5.], [4., -5., 1.], [4., 4., -8.]],
                 [[-9., 1., 8.], [1., 0.125, -1.125], [4., -4.5, 0.5], [1., 8., -9.]]],
                dtype=np.float64)

            clf = MumboClassifier()
            got_cost, got_score = clf._compute_cost(
                log_scores(), pred_classes, y, alphas, use_coop_coef=False)

            np.testing.assert_almost_equal(got_score, want_score, decimal)
            np.testing.assert_almost_equal(got_cost, want_cost, decimal)

            # --- Logarithmic scores, with cooperation coefficients ---
            alphas = np.array([np.log(2), np.log(4), np.log(8)])
            want_score = np.array(
                [[[np.log(2), 0, np.log(4)],
                  [np.log(8), 0, np.log(2)],
                  [0, np.log(2), 0],
                  [0, np.log(2), 0]],
                 [[np.log(4), np.log(2), 0],
                  [0, 0, 0],
                  [0, 0, 0],
                  [np.log(4), np.log(4), 0]],
                 [[0, 0, 0],
                  [np.log(8), 0, np.log(8)],
                  [0, np.log(2), 0],
                  [0, np.log(8), 0]]],
                dtype=np.float64)
            want_cost = np.array(
                [[[-2.5, 0.5, 2.], [4., 0.5, -4.5], [0.5, -1., 0.5], [1., 2., -3.]],
                 [[-0.75, 0.5, 0.25], [1., 1., -2.], [1., -2., 1.], [4., 4., -8.]],
                 [[-2., 1., 1.], [1., 0.125, -1.125], [0.5, -1., 0.5], [1., 8., -9.]]],
                dtype=np.float64)

            clf = MumboClassifier()
            got_cost, got_score = clf._compute_cost(
                log_scores(), pred_classes, y, alphas, use_coop_coef=True)

            np.testing.assert_almost_equal(got_score, want_score, decimal)
            np.testing.assert_almost_equal(got_cost, want_cost, decimal)
    
        def test_algo_options(self):
            """Check the 'edge' and 'error' view modes and rejection of others.

            Both modes must reach a training score above 0.95 on iris with 10
            estimators.  An unknown ``best_view_mode`` must raise ValueError both
            when given to the constructor and when set on the instance before
            calling fit().
            """
            seed = 7
            np.random.seed(seed)

            n_estimators = 10

            for mode in ('edge', 'error'):
                clf = MumboClassifier(n_estimators=n_estimators, best_view_mode=mode)
                clf.fit(self.iris.data, self.iris.target, self.iris.views_ind)
                score = clf.score(self.iris.data, self.iris.target)
                self.assertGreater(score, 0.95, "Failed with score = {}".format(score))

            self.assertRaises(ValueError, MumboClassifier, best_view_mode='test')

            # Setting the attribute afterwards bypasses the constructor check,
            # so fit() itself has to reject the value.
            clf = MumboClassifier()
            clf.best_view_mode = 'test'
            self.assertRaises(ValueError, clf.fit, self.iris.data,
                              self.iris.target, self.iris.views_ind)
    
        def test_fit_arg(self):
            """Check how fit() handles its arguments.

            Covers the default ``views_ind``, the accepted kinds of class labels
            and label containers, and the ValueError expected for malformed or
            inconsistent ``X``, ``y`` and ``views_ind``.
            """
            seed = 7
            np.random.seed(seed)

            # Check that using the default value for views_ind corresponds to using 2
            # views
            X = np.array([[1., 1., 1.], [-1., -1., -1.]])
            y = np.array([0, 1])
            expected_views_ind = np.array([0, 1, 3])
            clf = MumboClassifier()
            clf.fit(X, y)

            np.testing.assert_equal(clf.X_.views_ind, expected_views_ind)

            # Check that classes labels can be integers or strings and can be stored
            # into any kind of sequence
            views_ind = np.array([0, 1, 3])
            y = np.array([3, 1])
            clf = MumboClassifier()
            clf.fit(X, y, views_ind)
            np.testing.assert_equal(clf.predict(X), y)

            y = np.array(["class_1", "class_2"])
            clf = MumboClassifier()
            clf.fit(X, y)
            np.testing.assert_equal(clf.predict(X), y)

            y = [1, 0]
            clf = MumboClassifier()
            clf.fit(X, y, views_ind)
            np.testing.assert_equal(clf.predict(X), y)

            y = (2, 1)
            clf = MumboClassifier()
            clf.fit(X, y, views_ind)
            np.testing.assert_equal(clf.predict(X), y)

            # Check that malformed or inconsistent inputs raise exceptions
            # (these two calls reuse the classifier fitted just above).
            X = np.zeros((5, 4, 2))
            y = np.array([0, 1])
            self.assertRaises(ValueError, clf.fit, X, y, views_ind)

            X = ["str1", "str2"]
            self.assertRaises(ValueError, clf.fit, X, y, views_ind)

            # Labels whose length or dtype does not fit the two samples of X.
            X = np.array([[1., 1., 1.], [-1., -1., -1.]])
            views_ind = np.array([0, 1, 3])
            invalid_labels = (
                np.array([1]),
                np.array([1, 0, 0, 1]),
                np.array([3.2, 1.1]),
            )
            for y in invalid_labels:
                clf = MumboClassifier()
                self.assertRaises(ValueError, clf.fit, X, y, views_ind)

            # Invalid views_ind values.  The ragged nested lists need an explicit
            # ``dtype=object``: NumPy otherwise raises ValueError while building
            # the array, before fit() is even called.
            y = np.array([0, 1])
            invalid_views_ind = (
                np.array([0, 3, 1]),
                np.array([-1, 1, 3]),
                np.array([0, 1, 4]),
                np.array([0.5, 1, 3]),
                np.array("test"),
                np.zeros((3, 2, 4)),
                np.array([[-1], [1, 2]], dtype=object),
                np.array([[3], [1, 2]], dtype=object),
                np.array([[0.5], [1, 2]], dtype=object),
                np.array([[-1, 0], [1, 2]]),
                np.array([[0, 3], [1, 2]]),
                np.array([[0.5], [1], [2]]),
            )
            for views_ind in invalid_views_ind:
                clf = MumboClassifier()
                self.assertRaises(ValueError, clf.fit, X, y, views_ind)
    
        def test_decision_function_arg(self):
            """Check that decision_function() raises ValueError on deficient input."""
            np.random.seed(7)

            clf = MumboClassifier()
            clf.fit(self.iris.data, self.iris.target, self.iris.views_ind)

            # Two 2-D arrays with 3 and 5 columns, a 3-D array, and a list of
            # strings.
            deficient_inputs = (
                np.zeros((4, 3)),
                np.zeros((4, 5)),
                np.zeros((5, 4, 2)),
                ["str1", "str2"],
            )
            for X in deficient_inputs:
                self.assertRaises(ValueError, clf.decision_function, X)
    
        def test_limit_cases(self):
            """Check fit() on empty data, a single sample, and a single class."""
            seed = 7
            np.random.seed(seed)

            # Check that using empty data raises an exception
            X = np.array([[]])
            y = np.array([])
            clf = MumboClassifier()
            self.assertRaises(ValueError, clf.fit, X, y)

            # Check that fit() works for the smallest possible dataset
            X = np.array([[0.]])
            y = np.array([0])
            clf = MumboClassifier()
            clf.fit(X, y)
            np.testing.assert_equal(clf.predict(X), y)
            np.testing.assert_equal(clf.predict(np.array([[1.]])), np.array([0]))

            # Check that fit() works with samples from a single class, with
            # views given first as boundaries and then as lists of columns.
            X = np.array([[0., 0.5, 0.7], [1., 1.5, 1.7], [2., 2.5, 2.7]])
            y = np.array([1, 1, 1])
            views_ind = np.array([0, 1, 3])
            clf = MumboClassifier()
            clf.fit(X, y, views_ind)
            np.testing.assert_equal(clf.predict(X), y)
            np.testing.assert_equal(clf.predict(np.array([[-1., 0., 1.]])), np.array([1]))

            X = np.array([[0., 0.5, 0.7], [1., 1.5, 1.7], [2., 2.5, 2.7]])
            y = np.array([1, 1, 1])
            # The two views have different sizes; NumPy only builds such a
            # ragged array when ``dtype=object`` is given explicitly.
            views_ind = np.array([[0, 2], [1]], dtype=object)
            clf = MumboClassifier()
            clf.fit(X, y, views_ind)
            np.testing.assert_equal(clf.predict(X), y)
            np.testing.assert_equal(clf.predict(np.array([[-1., 0., 1.]])), np.array([1]))
    
        def test_simple_examples(self):
            """Check fit/predict/decision_function on small hand-made datasets.

            Each dataset is fitted twice, once with ``views_ind`` given as view
            boundaries and once as explicit lists of column indices.
            """
            seed = 7
            np.random.seed(seed)

            def fit_and_check(X, y, views_ind, new_data, new_labels, decision_shape):
                # Fit a fresh classifier, then check the training predictions,
                # the predictions on unseen points and the decision shape.
                clf = MumboClassifier()
                clf.fit(X, y, views_ind)
                np.testing.assert_equal(clf.predict(X), y)
                np.testing.assert_equal(clf.predict(new_data), new_labels)
                self.assertEqual(clf.decision_function(X).shape, decision_shape)

            # Simple example with 2 classes and 1 view
            X = np.array(
                [[1.1, 2.1],
                 [2.1, 0.2],
                 [0.7, 1.2],
                 [-0.9, -1.8],
                 [-1.1, -2.2],
                 [-0.3, -1.3]])
            y = np.array([0, 0, 0, 1, 1, 1])
            views_ind = np.array([0, 2])
            clf = MumboClassifier()
            clf.fit(X, y, views_ind)
            np.testing.assert_equal(clf.predict(X), y)
            np.testing.assert_equal(clf.predict(np.array([[1., 1.], [-1., -1.]])),
                               np.array([0, 1]))

            # From here on X is the transformed object; it is also the input of
            # the next fit, as in the original sequence of calls.
            X = clf._global_X_transform(X, clf.X_.views_ind)
            self.assertEqual(clf.decision_function(X).shape, y.shape)

            fit_and_check(X, y, np.array([[1, 0]]),
                          np.array([[1., 1.], [-1., -1.]]), np.array([0, 1]),
                          y.shape)

            # Simple example with 2 classes and 2 views
            X = np.array(
                [[1.1, 2.1, 0.5],
                 [2.1, 0.2, 1.2],
                 [0.7, 1.2, 2.1],
                 [-0.9, -1.8, -0.3],
                 [-1.1, -2.2, -0.9],
                 [-0.3, -1.3, -1.4]])
            y = np.array([0, 0, 0, 1, 1, 1])
            data = np.array([[1., 1., 1.], [-1., -1., -1.]])
            fit_and_check(X, y, np.array([0, 2, 3]),
                          data, np.array([0, 1]), y.shape)
            # Views of different sizes form a ragged array, which NumPy only
            # builds when ``dtype=object`` is given explicitly.
            fit_and_check(X, y, np.array([[2, 0], [1]], dtype=object),
                          data, np.array([0, 1]), y.shape)

            # Simple example with 2 classes and 3 views
            X = np.array(
                [[1.1, 2.1, 0.5, 1.2, 1.7],
                 [2.1, 0.2, 1.2, 0.6, 1.3],
                 [0.7, 1.2, 2.1, 1.1, 0.9],
                 [-0.9, -1.8, -0.3, -2.1, -1.1],
                 [-1.1, -2.2, -0.9, -1.5, -1.2],
                 [-0.3, -1.3, -1.4, -0.6, -0.7]])
            y = np.array([0, 0, 0, 1, 1, 1])
            data = np.array([[1., 1., 1., 1., 1.], [-1., -1., -1., -1., -1.]])
            fit_and_check(X, y, np.array([0, 2, 3, 5]),
                          data, np.array([0, 1]), y.shape)
            fit_and_check(X, y, np.array([[2, 0], [1], [3, 4]], dtype=object),
                          data, np.array([0, 1]), y.shape)

            # Simple example with 3 classes and 3 views
            X = np.array(
                [[1.1, -1.2, 0.5, 1.2, -1.7],
                 [2.1, -0.2, 0.9, 0.6, -1.3],
                 [0.7, 1.2, 2.1, 1.1, 0.9],
                 [0.9, 1.8, 2.2, 2.1, 1.1],
                 [-1.1, -2.2, -0.9, -1.5, -1.2],
                 [-0.3, -1.3, -1.4, -0.6, -0.7]])
            y = np.array([0, 0, 1, 1, 2, 2])
            data = np.array(
                [[1., -1., 1., 1., -1.],
                 [1., 1., 1., 1., 1.],
                 [-1., -1., -1., -1., -1.]])
            # With 3 classes the decision function has one column per class.
            fit_and_check(X, y, np.array([0, 2, 3, 5]),
                          data, np.array([0, 1, 2]), (X.shape[0], 3))
            fit_and_check(X, y, np.array([[1, 0], [2], [3, 4]], dtype=object),
                          data, np.array([0, 1, 2]), (X.shape[0], 3))
    
        def test_generated_examples(self):
            """Check that randomly generated multi-view data is fitted perfectly.

            Four datasets are drawn with the same seed; each must be fitted
            with a training score of exactly 1.
            """
            seed = 7
            n_samples = 100

            def draw_box(count, limits):
                # ``count`` points drawn with np.random.random and rescaled to
                # the box given by ``limits`` (one [low, high] pair per
                # feature).
                limits = np.array(limits)
                n_features = limits.shape[0]
                data = np.random.random((count, n_features))
                return (limits[:, 1]-limits[:, 0]) * data + limits[:, 0]

            def draw_view(boxes):
                # One block of ``n_samples`` rows per box, drawn and stacked
                # in the order given.
                return np.concatenate(
                    [draw_box(n_samples, limits) for limits in boxes])

            def assert_perfect_fit(X, y, views_ind, n_estimators):
                clf = MumboClassifier(n_estimators=n_estimators)
                clf.fit(X, y, views_ind)
                self.assertEqual(clf.score(X, y), 1.)

            # The boxes used below, as [low, high] limits per feature.
            unit = [[0., 1.], [0., 1.]]
            right = [[1., 2.], [0., 1.]]
            upper = [[0., 1.], [1., 2.]]
            wide = [[0., 2.], [0., 1.]]

            # 2 classes, 2 views, 1 estimator.
            np.random.seed(seed)
            view_0 = draw_view([unit, right])
            view_1 = draw_box(2*n_samples, unit)
            X = np.concatenate((view_0, view_1), axis=1)
            y = np.zeros(2*n_samples, dtype=np.int64)
            y[n_samples:] = 1
            assert_perfect_fit(X, y, np.array([0, 2, 4]), 1)

            # 2 classes, 2 views, 3 estimators.
            np.random.seed(seed)
            view_0 = draw_view([unit, right, unit, upper])
            view_1 = draw_view([right, unit, upper, unit])
            X = np.concatenate((view_0, view_1), axis=1)
            y = np.zeros(4*n_samples, dtype=np.int64)
            y[2*n_samples:] = 1
            assert_perfect_fit(X, y, np.array([0, 2, 4]), 3)

            # 3 classes, 3 views, 3 estimators.
            np.random.seed(seed)
            view_0 = draw_view([unit, right, unit])
            view_1 = draw_view([right, unit, unit])
            view_2 = draw_view([unit, unit, right])
            X = np.concatenate((view_0, view_1, view_2), axis=1)
            y = np.zeros(3*n_samples, dtype=np.int64)
            y[n_samples:2*n_samples] = 1
            y[2*n_samples:] = 2
            assert_perfect_fit(X, y, np.array([0, 2, 4, 6]), 3)

            # 3 classes, 3 views, 4 estimators, with one wider box per view.
            np.random.seed(seed)
            view_0 = draw_view([unit, right, wide])
            view_1 = draw_view([right, wide, unit])
            view_2 = draw_view([wide, unit, right])
            X = np.concatenate((view_0, view_1, view_2), axis=1)
            y = np.zeros(3*n_samples, dtype=np.int64)
            y[n_samples:2*n_samples] = 1
            y[2*n_samples:] = 2
            assert_perfect_fit(X, y, np.array([0, 2, 4, 6]), 4)
    
    
        def test_classifier(self):
            """Run scikit-learn's generic estimator checks on MumboClassifier.

            ``check_estimator`` signals a failed check by raising, so no explicit
            assertion is needed and nothing is returned from this test.
            """
            # Disabled manual check of the zero-feature input case:
            # X_zero_features = np.empty(0).reshape(3, 0)
            # y = np.array([1, 0, 1])
            #
            # e = MumboClassifier()
            # e.fit(X_zero_features, y)
            # print(e.predict(X_zero_features))

            # Do not return the result: unittest deprecates test methods that
            # return a value other than None.
            # NOTE(review): recent scikit-learn releases expect an estimator
            # instance (``MumboClassifier()``) rather than the class here --
            # confirm against the scikit-learn version this project supports.
            check_estimator(MumboClassifier)
    
        def test_iris(self):
            """Check the fitted attributes of MumboClassifier on the iris data.

            The classifier is fitted once with the fixture's ``views_ind`` and
            once with the explicit index array ``[[0, 2], [1, 3]]``; the same
            set of assertions is applied to both fits.
            """
            np.random.seed(7)
            n_estimators = 5
            X, y = self.iris.data, self.iris.target
            expected_classes = np.unique(y)
            view_specs = [self.iris.views_ind, np.array([[0, 2], [1, 3]])]

            for views_ind in view_specs:
                clf = MumboClassifier(n_estimators=n_estimators)
                clf.fit(X, y, views_ind)

                # Estimator errors must lie in [0, 1] and strictly decrease
                # from one boosting iteration to the next.
                errors = clf.estimator_errors_
                within_unit_interval = (0. <= errors) & (errors <= 1.)
                self.assertTrue(np.all(within_unit_interval))
                self.assertTrue(np.all(np.diff(errors) < 0.))

                # The learned classes match the labels, and the decision
                # function has one column per class.
                np.testing.assert_equal(expected_classes, clf.classes_)
                n_columns = clf.decision_function(X).shape[1]
                self.assertEqual(n_columns, len(expected_classes))

                score = clf.score(X, y)
                self.assertGreater(score, 0.95,
                                   "Failed with score = {}".format(score))

                self.assertEqual(len(clf.estimators_), n_estimators)

                # Every fitted base estimator must have its own random state.
                random_states = {est.random_state for est in clf.estimators_}
                self.assertEqual(len(random_states), len(clf.estimators_))
    
        def test_staged_methods(self):
            """Check staged_* outputs against classifiers fitted with fewer rounds.

            For each (data, target, views) case, stage ``k`` of
            ``staged_decision_function``, ``staged_predict`` and ``staged_score``
            must equal the output of a classifier fitted with ``k + 1`` estimators
            and the same ``random_state``.
            """
            n_estimators = 10
            seed = 7

            # Two-class variant of the target: first half 0, second half 1.
            target_two_classes = np.zeros(self.iris.target.shape, dtype=np.int64)
            midpoint = target_two_classes.shape[0] // 2
            target_two_classes[midpoint:] = 1

            # Each target is combined with both view specifications.
            cases = [
                (self.iris.data, target, views_ind)
                for target in (self.iris.target, target_two_classes)
                for views_ind in (self.iris.views_ind,
                                  np.array([[0, 2], [1, 3]]))
            ]

            for X, y, views_ind in cases:
                full_clf = MumboClassifier(n_estimators=n_estimators,
                                           random_state=seed)
                full_clf.fit(X, y, views_ind)
                staged_dec_func = list(full_clf.staged_decision_function(X))
                staged_predict = list(full_clf.staged_predict(X))
                staged_score = list(full_clf.staged_score(X, y))

                self.assertEqual(len(staged_dec_func), n_estimators)
                self.assertEqual(len(staged_predict), n_estimators)
                self.assertEqual(len(staged_score), n_estimators)

                stages = zip(staged_dec_func, staged_predict, staged_score)
                for ind, (exp_dec_func, exp_predict, exp_score) in enumerate(stages):
                    # Refit from scratch with only the first ind + 1 estimators.
                    partial_clf = MumboClassifier(n_estimators=ind + 1,
                                                  random_state=seed)
                    partial_clf.fit(X, y, views_ind)
                    dec_func = partial_clf.decision_function(X)
                    predict = partial_clf.predict(X)
                    score = partial_clf.score(X, y)
                    np.testing.assert_equal(dec_func, exp_dec_func)
                    np.testing.assert_equal(predict, exp_predict)
                    self.assertEqual(score, exp_score)
    
        def test_gridsearch(self):
            """Check that MumboClassifier with tree base estimators can be grid-searched.

            The test only requires that ``GridSearchCV.fit`` completes; nothing
            is asserted about the search results.
            """
            np.random.seed(7)

            param_grid = {
                'n_estimators': (1, 2),
                'base_estimator__max_depth': (1, 2),
            }
            search = GridSearchCV(
                MumboClassifier(base_estimator=DecisionTreeClassifier()),
                param_grid)
            # views_ind is forwarded to MumboClassifier.fit as a fit parameter.
            search.fit(self.iris.data, self.iris.target,
                       views_ind=self.iris.views_ind)

        def test_pickle(self):
            """Check that a fitted MumboClassifier survives a pickle round trip.

            The unpickled object must have the same class as the original and
            give the same score on the training data.
            """
            np.random.seed(7)
            X, y = self.iris.data, self.iris.target

            clf = MumboClassifier()
            clf.fit(X, y, self.iris.views_ind)
            score = clf.score(X, y)

            # Serialize and immediately restore the fitted classifier.
            clf_loaded = pickle.loads(pickle.dumps(clf))

            self.assertEqual(type(clf_loaded), clf.__class__)
            self.assertEqual(score, clf_loaded.score(X, y))
    
        def test_base_estimator(self):
            """Check MumboClassifier with several kinds of base estimator.

            RandomForestClassifier and SVC must reach a training score above
            0.95 on iris; KMeans must be rejected at fit time with ValueError.
            """
            np.random.seed(7)
            n_estimators = 5
            X, y = self.iris.data, self.iris.target
            views_ind = self.iris.views_ind

            # Base estimators that are expected to work.
            for base_estimator_class in (RandomForestClassifier, SVC):
                clf = MumboClassifier(base_estimator_class(),
                                      n_estimators=n_estimators)
                clf.fit(X, y, views_ind)
                score = clf.score(X, y)
                self.assertGreater(score, 0.95,
                                   "Failed with score = {}".format(score))

            # A base estimator that doesn't support sample_weight must make
            # fit raise an error.
            clf = MumboClassifier(KMeans())
            with self.assertRaises(ValueError):
                clf.fit(X, y, views_ind)
    
    
        def test_sparse_classification(self):
            """Check that sparse and dense inputs give identical results.

            For CSC and CSR input and for two view specifications, a classifier
            fitted on the sparse matrix is compared with one fitted on the dense
            array: decision function, predictions, score and all staged outputs
            must be equal. The base estimators must also have been trained on
            data of the same sparse type as the input.
            """
            seed = 7
            np.random.seed(seed)

            class CustomSVC(SVC):
                """SVC that remembers the container type of its training data."""

                def fit(self, X, y, sample_weight=None):
                    """Fit like SVC, then store ``type(X)`` in ``data_type_``."""
                    super(CustomSVC, self).fit(X, y, sample_weight=sample_weight)
                    self.data_type_ = type(X)
                    return self

            n_estimators = 5
            X_dense = self.iris.data
            y = self.iris.target

            def fit_classifier(X, views_ind):
                # Same configuration for the sparse and the dense fit, so any
                # difference in results comes from the input container only.
                clf = MumboClassifier(
                    base_estimator=CustomSVC(),
                    random_state=seed,
                    n_estimators=n_estimators)
                clf.fit(X, y, views_ind)
                return clf

            # Only CSC and CSR are exercised; lil_matrix, coo_matrix and
            # dok_matrix were disabled in this loop.
            for sparse_format in (csc_matrix, csr_matrix):
                for views_ind in (self.iris.views_ind, np.array([[0, 2], [1, 3]])):
                    X_sparse = sparse_format(X_dense)
                    clf_sparse = fit_classifier(X_sparse, views_ind)
                    clf_dense = fit_classifier(X_dense, views_ind)

                    np.testing.assert_equal(
                        clf_sparse.decision_function(X_sparse),
                        clf_dense.decision_function(X_dense))

                    np.testing.assert_equal(
                        clf_sparse.predict(X_sparse),
                        clf_dense.predict(X_dense))

                    self.assertEqual(
                        clf_sparse.score(X_sparse, y),
                        clf_dense.score(X_dense, y))

                    # Staged outputs are compared stage by stage, in the order
                    # decision function, predictions, score.
                    staged_pairs = (
                        (clf_sparse.staged_decision_function(X_sparse),
                         clf_dense.staged_decision_function(X_dense)),
                        (clf_sparse.staged_predict(X_sparse),
                         clf_dense.staged_predict(X_dense)),
                        (clf_sparse.staged_score(X_sparse, y),
                         clf_dense.staged_score(X_dense, y)),
                    )
                    for staged_sparse, staged_dense in staged_pairs:
                        for res_sparse, res_dense in zip(staged_sparse,
                                                         staged_dense):
                            np.testing.assert_equal(res_sparse, res_dense)

                    # Check that sparsity of data is maintained during training
                    types = [est.data_type_ for est in clf_sparse.estimators_]
                    if sparse_format == csc_matrix:
                        expected_type = csc_matrix
                    else:
                        expected_type = csr_matrix
                    self.assertTrue(
                        all([issubclass(type_, expected_type) for type_ in types]))
    
    
    # Run the test suite when this module is executed directly as a script.
    if __name__ == '__main__':
        unittest.main()