Skip to content
Snippets Groups Projects
Commit b1d518e5 authored by Luc Giffon's avatar Luc Giffon
Browse files

implementation of NNompforestRegressor and NNompforestbinaryclassifier

parent e207ec6f
No related branches found
No related tags found
1 merge request!24Resolve "non negative omp"
......@@ -6,12 +6,13 @@ import os
class ModelParameters(object):
def __init__(self, extracted_forest_size, normalize_D, subsets_used,
normalize_weights, seed, hyperparameters, extraction_strategy):
normalize_weights, seed, hyperparameters, extraction_strategy, non_negative=False, intermediate_solutions_sizes=None):
"""Init of ModelParameters.
Args:
extracted_forest_size (list): list of all the extracted forest
size.
extracted_forest_size (int): extracted forest size
intermediate_solutions_sizes (list): list of all intermediate solutions sizes
non_negative (bool): tell to use the non negative version of OMP, must be used in combination with intermediate solutions size
normalize_D (bool): true normalize the distribution, false no
subsets_used (list): which dataset use for randomForest and for OMP
'train', 'dev' or 'train+dev' and combination of two of this.
......@@ -29,6 +30,19 @@ class ModelParameters(object):
self._hyperparameters = hyperparameters
self._extraction_strategy = extraction_strategy
if non_negative and intermediate_solutions_sizes is None:
raise ValueError("Intermediate solutions must be set if non negative option is on.")
self._non_negative = non_negative
self._intermediate_solutions_sizes = intermediate_solutions_sizes
@property
def non_negative(self):
return self._non_negative
@property
def intermediate_solutions_sizes(self):
return self._intermediate_solutions_sizes
@property
def extracted_forest_size(self):
return self._extracted_forest_size
......
from copy import deepcopy
from scipy.optimize import nnls
import numpy as np
from sklearn.linear_model.base import _preprocess_data
from bolsonaro import LOG_PATH
from bolsonaro.error_handling.logger_factory import LoggerFactory
class NonNegativeOrthogonalMatchingPursuit:
"""
Input needs to be normalized
"""
def __init__(self, max_iter, intermediate_solutions_sizes, fill_with_final_solution=True):
assert all(type(elm) == int for elm in intermediate_solutions_sizes), "All intermediate solution must be size specified as integers."
self.max_iter = max_iter
self.requested_intermediate_solutions_sizes = intermediate_solutions_sizes
self.fill_with_final_solution = fill_with_final_solution
self._logger = LoggerFactory.create(LOG_PATH, __name__)
self.lst_intermediate_solutions = list()
self.lst_intercept = list()
def _set_intercept(self, X_offset, y_offset, X_scale):
"""Set the intercept_
"""
for sol in self.lst_intermediate_solutions:
sol /= X_scale
intercept = y_offset - np.dot(X_offset, sol.T)
self.lst_intercept.append(intercept)
# self.coef_ = self.coef_ / X_scale
# self.intercept_ = y_offset - np.dot(X_offset, self.coef_.T)
def fit(self, T, y):
"""
Ref: Sparse Non-Negative Solution of a Linear System of Equations is Unique
T: (N x L)
y: (N x 1)
max_iter: the max number of iteration. If requested_intermediate_solutions_sizes is None. Return the max_iter-sparse solution.
requested_intermediate_solutions_sizes: a list of the other returned intermediate solutions than with max_iter (they are returned in a list with same indexes)
Return the list of intermediate solutions. If the perfect solution is found before the end, the list may not be full.
"""
# this is copied from sklearn preprocessing hope this works fine but I am a believer
T, y, T_offset, y_offset, T_scale = _preprocess_data( T, y, fit_intercept=True, normalize=False, copy=False, return_mean=True, check_input=True)
iter_intermediate_solutions_sizes = iter(self.requested_intermediate_solutions_sizes)
lst_intermediate_solutions = []
bool_arr_selected_indexes = np.zeros(T.shape[1], dtype=bool)
residual = y
i = 0
next_solution = next(iter_intermediate_solutions_sizes, None)
while i < self.max_iter and next_solution != None and not np.isclose(np.linalg.norm(residual), 0):
# if logger is not None: logger.debug("iter {}".format(i))
# compute all correlations between atoms and residual
dot_products = T.T @ residual
idx_max_dot_product = np.argmax(dot_products)
# only positively correlated results can be taken
if dot_products[idx_max_dot_product] <= 0:
self._logger.warning("No other atoms is positively correlated with the residual. End prematurely with {} atoms.".format(i + 1))
break
# selection of atom with max correlation with residual
bool_arr_selected_indexes[idx_max_dot_product] = True
tmp_T = T[:, bool_arr_selected_indexes]
sol = nnls(tmp_T, y)[0] # non negative least square
residual = y - tmp_T @ sol
if i + 1 == next_solution:
final_vec = np.zeros(T.shape[1])
final_vec[bool_arr_selected_indexes] = sol # solution is full of zero but on selected indices
lst_intermediate_solutions.append(final_vec)
next_solution = next(iter_intermediate_solutions_sizes, None)
i += 1
nb_missing_solutions = len(self.requested_intermediate_solutions_sizes) - len(lst_intermediate_solutions)
if nb_missing_solutions > 0:
if self.fill_with_final_solution:
self._logger.warning("nn_omp ended prematurely and found less solution than expected: "
"expected {}. found {}".format(len(self.requested_intermediate_solutions_sizes), len(lst_intermediate_solutions)))
lst_intermediate_solutions.extend([deepcopy(lst_intermediate_solutions[-1]) for _ in range(nb_missing_solutions)])
else:
self._logger.warning("nn_omp ended prematurely and found less solution than expected: "
"expected {}. found {}. But fill with the last solution".format(len(self.requested_intermediate_solutions_sizes), len(lst_intermediate_solutions)))
self.lst_intermediate_solutions = lst_intermediate_solutions
self._set_intercept(T_offset, y_offset, T_scale)
def predict(self, X, idx_prediction=None):
if idx_prediction is not None:
return X @ self.lst_intermediate_solutions[idx_prediction] + self.lst_intercept[idx_prediction]
else:
predictions = []
for idx_sol, sol in enumerate(self.lst_intermediate_solutions):
predictions.append(X @ sol + self.lst_intercept[idx_sol])
return predictions
def nn_omp(T, y, max_iter, intermediate_solutions_sizes=None, force_return_all_solutions=True, logger=None):
"""
Ref: Sparse Non-Negative Solution of a Linear System of Equations is Unique
T: (N x L)
y: (N x 1)
max_iter: the max number of iteration. If requested_intermediate_solutions_sizes is None. Return the max_iter-sparse solution.
requested_intermediate_solutions_sizes: a list of the other returned intermediate solutions than with max_iter (they are returned in a list with same indexes)
Return the list of intermediate solutions. If the perfect solution is found before the end, the list may not be full.
"""
if intermediate_solutions_sizes is None:
intermediate_solutions_sizes = [max_iter]
assert all(type(elm) == int for elm in intermediate_solutions_sizes), "All intermediate solution must be size specified as integers."
iter_intermediate_solutions_sizes = iter(intermediate_solutions_sizes)
lst_intermediate_solutions = []
bool_arr_selected_indexes = np.zeros(T.shape[1], dtype=bool)
residual = y
i = 0
next_solution = next(iter_intermediate_solutions_sizes, None)
while i < max_iter and next_solution != None and not np.isclose(np.linalg.norm(residual), 0):
# if logger is not None: logger.debug("iter {}".format(i))
# compute all correlations between atoms and residual
dot_products = T.T @ residual
idx_max_dot_product = np.argmax(dot_products)
# only positively correlated results can be taken
if dot_products[idx_max_dot_product] <= 0:
logger.warning("No other atoms is positively correlated with the residual. End prematurely with {} atoms.".format(i+1))
break
# selection of atom with max correlation with residual
bool_arr_selected_indexes[idx_max_dot_product] = True
tmp_T = T[:, bool_arr_selected_indexes]
sol = nnls(tmp_T, y)[0] # non negative least square
residual = y - tmp_T @ sol
if i+1 == next_solution:
final_vec = np.zeros(T.shape[1])
final_vec[bool_arr_selected_indexes] = sol # solution is full of zero but on selected indices
lst_intermediate_solutions.append(final_vec)
next_solution = next(iter_intermediate_solutions_sizes, None)
i+=1
nb_missing_solutions = len(intermediate_solutions_sizes) - len(lst_intermediate_solutions)
if len(lst_intermediate_solutions) == 1:
return lst_intermediate_solutions[-1]
if nb_missing_solutions > 0:
if force_return_all_solutions:
logger.warning("nn_omp ended prematurely and found less solution than expected: "
"expected {}. found {}".format(len(intermediate_solutions_sizes), len(lst_intermediate_solutions)))
return lst_intermediate_solutions.extend([deepcopy(lst_intermediate_solutions[-1]) for _ in range(len(intermediate_solutions_sizes) - len(lst_intermediate_solutions))])
else:
return lst_intermediate_solutions
else:
return lst_intermediate_solutions
if __name__ == "__main__":
N = 1000
L = 100
K = 10
T = np.random.rand(N, L)
w_star = np.abs(np.random.rand(L))
T /= np.linalg.norm(T, axis=0)
y = T @ w_star
requested_solutions = list(range(1, L, 10))
solutions = nn_omp(T, y, L, requested_solutions)
for idx_sol, w in enumerate(solutions):
solution = T @ w
non_zero = w.astype(bool)
print(requested_solutions[idx_sol], np.sum(non_zero), np.linalg.norm(solution - y)/np.linalg.norm(y))
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from bolsonaro.models.model_parameters import ModelParameters
from bolsonaro.models.omp_forest import OmpForest, SingleOmpForest
from bolsonaro.models.omp_forest_classifier import OmpForestBinaryClassifier
from bolsonaro.utils import binarize_class_data, omp_premature_warning
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import OrthogonalMatchingPursuit
import warnings
class NonNegativeOmpForestBinaryClassifier(OmpForestBinaryClassifier):
def predict(self, X, idx_prediction=None):
"""
Make prediction.
If idx_prediction is None return the list of predictions of all intermediate solutions
:param X:
:return:
"""
forest_predictions = self._base_estimator_predictions(X)
if self._models_parameters.normalize_D:
forest_predictions /= self._forest_norms
return self._omp.predict(forest_predictions, idx_prediction)
def predict_no_weights(self, X, idx_prediction=None):
"""
Make a prediction of the selected trees but without weight.
If idx_prediction is None return the list of unweighted predictions of all intermediate solutions.
:param X: some data to apply the forest to
:return: a np.array of the predictions of the trees selected by OMP without applying the weight
"""
forest_predictions = np.array([tree.predict(X) for tree in self._base_forest_estimator.estimators_])
if idx_prediction is not None:
weights = self._omp.lst_intermediate_solutions[idx_prediction]
select_trees = np.mean(forest_predictions[weights != 0], axis=0)
return select_trees
else:
lst_predictions = []
for sol in self._omp.lst_intermediate_solutions:
lst_predictions.append(np.mean(forest_predictions[sol != 0], axis=0))
return lst_predictions
def score(self, X, y, idx_prediction=None):
"""
Evaluate OMPForestClassifer on (`X`, `y`).
if Idx_prediction is None return the score of all sub forest.`
:param X:
:param y:
:return:
"""
# raise NotImplementedError("Function not verified")
if idx_prediction is not None:
predictions = self.predict(X, idx_prediction)
# not sure predictions are -1/+1 so might be zero percent accuracy
return np.sum(predictions != y) / len(y)
else:
predictions = self.predict(X)
lst_scores = []
for pred in predictions:
lst_scores.append(np.sum(pred != y) / len(y))
return lst_scores
if __name__ == "__main__":
# X, y = load_boston(return_X_y=True)
# X, y = fetch_california_housing(return_X_y=True)
X, y = load_breast_cancer(return_X_y=True)
y = (y-0.5)*2
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size = 0.33, random_state = 42)
# intermediate_solutions = [100, 200, 300, 400, 500, 1000]
intermediate_solutions = [10, 20, 30, 40, 50, 100]
nnmodel_params = ModelParameters(extracted_forest_size=50,
normalize_D=True,
subsets_used=["train", "dev"],
normalize_weights=False,
seed=3,
hyperparameters={
"max_depth": 20,
"min_samples_leaf": 1,
"n_estimators": 1000,
"max_features": "log2"
},
extraction_strategy="omp",
non_negative=True,
intermediate_solutions_sizes=intermediate_solutions)
extracted_size = 50
nn_ompforest = NonNegativeOmpForestBinaryClassifier(nnmodel_params)
nn_ompforest.fit(X_train, y_train, X_train, y_train)
model_params = ModelParameters(extracted_forest_size=extracted_size,
normalize_D=True,
subsets_used=["train", "dev"],
normalize_weights=False,
seed=3,
hyperparameters={
"max_depth": 20,
"min_samples_leaf": 1,
"n_estimators": 1000,
"max_features": "log2"
},
extraction_strategy="omp")
omp_forest = OmpForestBinaryClassifier(model_params)
omp_forest.fit(X_train, y_train, X_train, y_train)
print("Breast Cancer")
print("Score full forest on train", nn_ompforest.score_base_estimator(X_train, y_train))
print("Score full forest on test", nn_ompforest.score_base_estimator(X_test, y_test))
print("Size full forest", nnmodel_params.hyperparameters["n_estimators"])
print("Size extracted forests", intermediate_solutions)
print("Score non negative omp on train", nn_ompforest.score(X_train, y_train))
print("Score non negative omp on test", nn_ompforest.score(X_test, y_test))
print("Size extracted omp", extracted_size)
print("Score omp on train", omp_forest.score(X_train, y_train))
print("Score omp on test", omp_forest.score(X_test, y_test))
from copy import deepcopy
from sklearn.model_selection import train_test_split
from bolsonaro.models.model_parameters import ModelParameters
from bolsonaro.models.omp_forest import SingleOmpForest
from sklearn.datasets import load_boston, fetch_california_housing
from sklearn.ensemble import RandomForestRegressor
import numpy as np
from bolsonaro.models.omp_forest_regressor import OmpForestRegressor
class NonNegativeOmpForestRegressor(OmpForestRegressor):
def predict(self, X, idx_prediction=None):
"""
Make prediction.
If idx_prediction is None return the list of predictions of all intermediate solutions
:param X:
:return:
"""
forest_predictions = self._base_estimator_predictions(X)
if self._models_parameters.normalize_D:
forest_predictions /= self._forest_norms
return self._omp.predict(forest_predictions, idx_prediction)
def predict_no_weights(self, X, idx_prediction=None):
"""
Make a prediction of the selected trees but without weight.
If idx_prediction is None return the list of unweighted predictions of all intermediate solutions.
:param X: some data to apply the forest to
:return: a np.array of the predictions of the trees selected by OMP without applying the weight
"""
forest_predictions = np.array([tree.predict(X) for tree in self._base_forest_estimator.estimators_])
if idx_prediction is not None:
weights = self._omp.lst_intermediate_solutions[idx_prediction]
select_trees = np.mean(forest_predictions[weights != 0], axis=0)
return select_trees
else:
lst_predictions = []
for sol in self._omp.lst_intermediate_solutions:
lst_predictions.append(np.mean(forest_predictions[sol != 0], axis=0))
return lst_predictions
def score(self, X, y, idx_prediction=None):
"""
Evaluate OMPForestClassifer on (`X`, `y`).
if Idx_prediction is None return the score of all sub forest.`
:param X:
:param y:
:return:
"""
# raise NotImplementedError("Function not verified")
if idx_prediction is not None:
predictions = self.predict(X, idx_prediction)
# not sure predictions are -1/+1 so might be zero percent accuracy
return np.mean(np.square(predictions - y))
else:
predictions = self.predict(X)
lst_scores = []
for pred in predictions:
lst_scores.append(np.mean(np.square(pred - y)))
return lst_scores
if __name__ == "__main__":
# X, y = load_boston(return_X_y=True)
X, y = fetch_california_housing(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size = 0.33, random_state = 42)
intermediate_solutions = [100, 200, 300, 400, 500, 1000]
nnmodel_params = ModelParameters(extracted_forest_size=600,
normalize_D=True,
subsets_used=["train", "dev"],
normalize_weights=False,
seed=3,
hyperparameters={
"max_features": "auto",
"min_samples_leaf": 1,
"max_depth": 20,
"n_estimators": 1000,
},
extraction_strategy="omp",
non_negative=True,
intermediate_solutions_sizes=intermediate_solutions)
nn_ompforest = NonNegativeOmpForestRegressor(nnmodel_params)
nn_ompforest.fit(X_train, y_train, X_train, y_train)
model_params = ModelParameters(extracted_forest_size=50,
normalize_D=True,
subsets_used=["train", "dev"],
normalize_weights=False,
seed=3,
hyperparameters={
"max_features": "auto",
"min_samples_leaf": 1,
"max_depth": 20,
"n_estimators": 1000,
},
extraction_strategy="omp")
omp_forest = OmpForestRegressor(model_params)
omp_forest.fit(X_train, y_train, X_train, y_train)
print("Boston")
print("Score full forest on train", nn_ompforest.score_base_estimator(X_train, y_train))
print("Score full forest on test", nn_ompforest.score_base_estimator(X_test, y_test))
print("Size full forest", nnmodel_params.hyperparameters["n_estimators"])
print("Size extracted forests", intermediate_solutions)
print("Score non negative omp on train", nn_ompforest.score(X_train, y_train))
print("Score non negative omp on test", nn_ompforest.score(X_test, y_test))
print("Score omp on train", omp_forest.score(X_train, y_train))
print("Score omp on test", omp_forest.score(X_test, y_test))
from bolsonaro import LOG_PATH
from bolsonaro.error_handling.logger_factory import LoggerFactory
from bolsonaro.models.nn_omp import NonNegativeOrthogonalMatchingPursuit
from bolsonaro.utils import omp_premature_warning
from abc import abstractmethod, ABCMeta
......@@ -24,8 +25,6 @@ class OmpForest(BaseEstimator, metaclass=ABCMeta):
def predict_base_estimator(self, X):
return self._base_forest_estimator.predict(X)
def score_base_estimator(self, X, y):
return self._base_forest_estimator.score(X, y)
def _base_estimator_predictions(self, X):
base_predictions = np.array([tree.predict(X) for tree in self._base_forest_estimator.estimators_]).T
......@@ -72,6 +71,7 @@ class OmpForest(BaseEstimator, metaclass=ABCMeta):
@staticmethod
def _make_omp_weighted_prediction(base_predictions, omp_obj, normalize_weights=False):
if normalize_weights:
raise ValueError("Normalize weights is deprecated")
# we can normalize weights (by their sum) so that they sum to 1
# and they can be interpreted as impact percentages for interpretability.
# this necessits to remove the (-) in weights, e.g. move it to the predictions (use unsigned_coef) --> I don't see why
......@@ -105,11 +105,18 @@ class OmpForest(BaseEstimator, metaclass=ABCMeta):
class SingleOmpForest(OmpForest):
def __init__(self, models_parameters, base_forest_estimator):
# fit_intercept shouldn't be set to False as the data isn't necessarily centered here
# normalization is handled outsite OMP
self._omp = OrthogonalMatchingPursuit(
n_nonzero_coefs=models_parameters.extracted_forest_size,
fit_intercept=True, normalize=False)
if models_parameters.non_negative:
self._omp = NonNegativeOrthogonalMatchingPursuit(
max_iter=models_parameters.extracted_forest_size,
intermediate_solutions_sizes=models_parameters.intermediate_solutions_sizes,
fill_with_final_solution=True
)
else:
# fit_intercept shouldn't be set to False as the data isn't necessarily centered here
# normalization is handled outsite OMP
self._omp = OrthogonalMatchingPursuit(
n_nonzero_coefs=models_parameters.extracted_forest_size,
fit_intercept=True, normalize=False)
super().__init__(models_parameters, base_forest_estimator)
......
......@@ -30,6 +30,11 @@ class OmpForestBinaryClassifier(SingleOmpForest):
predictions = (predictions_0_1 - 0.5) * 2
return predictions
def score_base_estimator(self, X, y):
predictions = self._base_estimator_predictions(X)
evaluation = np.sum(np.sign(np.mean(predictions, axis=1)) == y) / len(y)
return evaluation
def predict_no_weights(self, X):
"""
Apply the SingleOmpForest to X without using the weights.
......
......@@ -14,6 +14,12 @@ class OmpForestRegressor(SingleOmpForest):
super().__init__(models_parameters, estimator)
def score_base_estimator(self, X, y):
predictions = self._base_estimator_predictions(X)
evaluation = np.mean(np.square(np.mean(predictions, axis=1) - y))
return evaluation
def score(self, X, y, metric=DEFAULT_SCORE_METRIC):
"""
Evaluate OMPForestRegressor on (`X`, `y`) using `metric`
......
from scipy.optimize import nnls
import numpy as np
def nn_omp(T, y, max_iter, intermediate_solutions_sizes=None):
"""
Ref: Sparse Non-Negative Solution of a
Linear System of Equations is Unique
T: (N x L)
y: (N x 1)
max_iter: the max number of iteration. If intermediate_solutions_sizes is None. Return the max_iter-sparse solution.
intermediate_solutions_sizes: a list of the other returned intermediate solutions than with max_iter (they are returned in a list with same indexes)
"""
if intermediate_solutions_sizes is None:
intermediate_solutions_sizes = [max_iter]
# elif max_iter not in intermediate_solutions_sizes:
# intermediate_solutions_sizes.append(max_iter)
assert all(type(elm) == int for elm in intermediate_solutions_sizes), "All intermediate solution must be size specified as integers."
iter_intermediate_solutions_sizes = iter(intermediate_solutions_sizes)
lst_intermediate_solutions = []
bool_arr_selected_indexes = np.zeros(T.shape[1], dtype=bool)
residual = y
i = 0
next_solution = next(iter_intermediate_solutions_sizes, None)
while i < max_iter and next_solution != None:
print("iter {}".format(i))
dot_products = T.T @ residual
idx_max_dot_product = np.argmax(dot_products)
if dot_products[idx_max_dot_product] <= 0:
print("No other atoms is positively correlated with the residual. End prematurely with {} atoms.".format(i+1))
break
bool_arr_selected_indexes[idx_max_dot_product] = True
tmp_T = T[:, bool_arr_selected_indexes]
sol = nnls(tmp_T, y)[0]
residual = y - tmp_T @ sol
if i+1 == next_solution:
final_vec = np.zeros(T.shape[1])
final_vec[bool_arr_selected_indexes] = sol
lst_intermediate_solutions.append(final_vec)
next_solution = next(iter_intermediate_solutions_sizes, None)
i+=1
if len(lst_intermediate_solutions) == 1:
return lst_intermediate_solutions[-1]
else:
return lst_intermediate_solutions
if __name__ == "__main__":
N = 1000
L = 100
K = 10
T = np.random.rand(N, L)
w_star = np.abs(np.random.rand(L))
T /= np.linalg.norm(T, axis=0)
y = T @ w_star
requested_solutions = list(range(1, L, 10))
solutions = nn_omp(T, y, L, requested_solutions)
for idx_sol, w in enumerate(solutions):
solution = T @ w
non_zero = w.astype(bool)
print(requested_solutions[idx_sol], np.sum(non_zero), np.linalg.norm(solution - y)/np.linalg.norm(y))
from bolsonaro.models.nn_omp import NonNegativeOrthogonalMatchingPursuit
import numpy as np
def test_binary_classif_omp():
N = 1000
L = 100
T = np.random.rand(N, L)
w_star = np.zeros(L)
w_star[:L//2] = np.abs(np.random.rand(L//2))
T /= np.linalg.norm(T, axis=0)
y = T @ w_star
requested_solutions = list(range(10, L, 10))
print()
print(len(requested_solutions))
print(L//2)
nn_omp = NonNegativeOrthogonalMatchingPursuit(max_iter=L, intermediate_solutions_sizes=requested_solutions, fill_with_final_solution=False)
nn_omp.fit(T, y)
lst_predict = nn_omp.predict(T)
print(len(lst_predict))
# solutions = nn_omp(T, y, L, requested_solutions)
#
# for idx_sol, w in enumerate(solutions):
# solution = T @ w
# non_zero = w.astype(bool)
# print(requested_solutions[idx_sol], np.sum(non_zero), np.linalg.norm(solution - y)/np.linalg.norm(y))
# assert isinstance(results, np.ndarray)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment