Commit 29a11860 authored by Charly Lamothe

Speedup similarity forest regressor and add parallelization at the extracted forest size level in the training
parent 0a97ff64
Merge request !12: Resolve "integration-sota"
-from bolsonaro.utils import tqdm_joblib
 from sklearn.ensemble import RandomForestRegressor
 from sklearn.metrics import mean_squared_error
 from sklearn.base import BaseEstimator
 from abc import abstractmethod, ABCMeta
 import numpy as np
-from joblib import Parallel, delayed
 from tqdm import tqdm
@@ -34,35 +31,40 @@ class SimilarityForestRegressor(BaseEstimator, metaclass=ABCMeta):
         selected_trees = list()
         tree_list = list(self._estimator.estimators_)

-        for _ in range(self._extracted_forest_size):
-            best_similarity = 100000
-            found_index = 0
-            for i in range(len(tree_list)):
-                lonely_tree = tree_list[i]
-                del tree_list[i]
-                begin_time = time.time()
-                with tqdm_joblib(tqdm(total=len(tree_list), disable=True)) as job_pb:
-                    val_list = Parallel(n_jobs=-1)(delayed(self._tree_predict_job)(
-                        job_pb, tree_list[i], X_val)
-                        for i in range(len(tree_list)))
-                val_list = np.array(val_list)
-                val_mean = np.mean(val_list, axis=0)
-                val_score = self._score_metric(val_mean, y_val)
-                temp_similarity = abs(forest_pred - val_score)
-                if (temp_similarity < best_similarity):
-                    found_index = i
-                    best_similarity = temp_similarity
-                tree_list.insert(i, lonely_tree)
-            selected_trees.append(tree_list[found_index])
-            del tree_list[found_index]
+        # Compute every tree's validation predictions once, up front.
+        val_scores = list()
+        with tqdm(tree_list) as tree_pred_bar:
+            tree_pred_bar.set_description('[Initial tree predictions]')
+            for tree in tree_pred_bar:
+                val_scores.append(tree.predict(X_val))
+        with tqdm(range(self._extracted_forest_size), disable=False) as pruning_forest_bar:
+            pruning_forest_bar.set_description(f'[Pruning forest s={self._extracted_forest_size}]')
+            for i in pruning_forest_bar:
+                best_similarity = 100000
+                found_index = 0
+                with tqdm(range(len(tree_list)), disable=False) as tree_list_bar:
+                    tree_list_bar.set_description(f'[Tree selection s={self._extracted_forest_size} #{i}]')
+                    for j in tree_list_bar:
+                        # Temporarily remove tree j and its cached predictions,
+                        # then score the remaining forest on the validation set.
+                        lonely_tree = tree_list[j]
+                        lonely_score = val_scores[j]
+                        del tree_list[j]
+                        del val_scores[j]
+                        val_mean = np.mean(np.asarray(val_scores), axis=0)
+                        val_score = self._score_metric(val_mean, y_val)
+                        temp_similarity = abs(forest_pred - val_score)
+                        if temp_similarity < best_similarity:
+                            found_index = j
+                            best_similarity = temp_similarity
+                        # Put the tree and its cached predictions back.
+                        tree_list.insert(j, lonely_tree)
+                        val_scores.insert(j, lonely_score)
+                selected_trees.append(tree_list[found_index])
+                del tree_list[found_index]
+                del val_scores[found_index]

         pruned_forest = list(set(forest) - set(selected_trees))
         self._estimator.estimators_ = pruned_forest

-    def _tree_predict_job(self, job_pb, tree, X_val):
-        val_pred = tree.predict(X_val)
-        return val_pred
-
     def score(self, X, y):
         test_list = list()
         for mod in self._estimator.estimators_:
...
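The speedup in the regressor comes from computing each tree's predictions on the validation set once, before the selection loops, and then re-averaging the cached scores whenever a candidate tree is temporarily removed; the previous version re-ran every remaining tree's predict inside a joblib Parallel call at every step. For the cache to stay consistent, the removed tree's predictions must be deleted and re-inserted in lockstep with the tree itself. A minimal standalone sketch of that leave-one-out step (tree_list and X_val play the same roles as in the diff above; the other names are illustrative):

    import numpy as np

    # Cache each tree's validation predictions once, outside the selection loop.
    scores = np.asarray([tree.predict(X_val) for tree in tree_list])  # (n_trees, n_val)
    total = scores.sum(axis=0)

    for j in range(len(tree_list)):
        # Mean prediction of the forest with tree j left out, from cached
        # predictions only; equivalent to np.delete(scores, j, axis=0).mean(axis=0)
        # but computed in O(n_val) per tree via the running sum.
        loo_mean = (total - scores[j]) / (len(tree_list) - 1)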
@@ -21,7 +21,7 @@ import numpy as np
 import shutil

-def process_job(seed, parameters, experiment_id, hyperparameters):
+def seed_job(seed_job_pb, seed, parameters, experiment_id, hyperparameters, verbose):
     """
     Experiment function.
@@ -34,7 +34,6 @@ def process_job(seed, parameters, experiment_id, hyperparameters):
     """
     logger = LoggerFactory.create(LOG_PATH, 'training_seed{}_ti{}'.format(
         seed, threading.get_ident()))
-    logger.info('seed={}'.format(seed))
     seed_str = str(seed)
     experiment_id_str = str(experiment_id)
@@ -55,9 +54,14 @@ def process_job(seed, parameters, experiment_id, hyperparameters):
     trainer = Trainer(dataset)
     if parameters['extraction_strategy'] != 'none':
-        for extracted_forest_size in parameters['extracted_forest_size']:
-            logger.info('extracted_forest_size={}'.format(extracted_forest_size))
-            sub_models_dir = models_dir + os.sep + 'extracted_forest_sizes' + os.sep + str(extracted_forest_size)
+        with tqdm_joblib(tqdm(total=len(parameters['extracted_forest_size']), disable=not verbose)) as extracted_forest_size_job_pb:
+            Parallel(n_jobs=-1)(delayed(extracted_forest_size_job)(extracted_forest_size_job_pb,
+                parameters['extracted_forest_size'][i], models_dir, seed, parameters, dataset,
+                hyperparameters, experiment_id, trainer)
+                for i in range(len(parameters['extracted_forest_size'])))
+    else:
+        forest_size = hyperparameters['n_estimators']
+        logger.info('Base forest training with fixed forest size of {}'.format(forest_size))
+        sub_models_dir = models_dir + os.sep + 'forest_size' + os.sep + str(forest_size)
        # Check if the result file already exists
        already_exists = False
@@ -70,13 +74,11 @@ def process_job(seed, parameters, experiment_id, hyperparameters):
                already_exists = os.path.getsize(os.path.join(sub_models_dir, file_name)) > 0
                break
        if already_exists:
-            logger.info(f'Extracted forest {extracted_forest_size} result already exists. Skipping...')
-            continue
+            logger.info('Base forest result already exists. Skipping...')
+        else:
            pathlib.Path(sub_models_dir).mkdir(parents=True, exist_ok=True)
            model_parameters = ModelParameters(
-                extracted_forest_size=extracted_forest_size,
+                extracted_forest_size=forest_size,
                normalize_D=parameters['normalize_D'],
                subsets_used=parameters['subsets_used'],
                normalize_weights=parameters['normalize_weights'],
@@ -91,10 +93,17 @@ def process_job(seed, parameters, experiment_id, hyperparameters):
            trainer.init(model, subsets_used=parameters['subsets_used'])
            trainer.train(model)
            trainer.compute_results(model, sub_models_dir)
-    else:
-        forest_size = hyperparameters['n_estimators']
-        logger.info('Base forest training with fixed forest size of {}'.format(forest_size))
-        sub_models_dir = models_dir + os.sep + 'forest_size' + os.sep + str(forest_size)
+    logger.info(f'Training done for seed {seed_str}')
+    seed_job_pb.update(1)
+
+def extracted_forest_size_job(extracted_forest_size_job_pb, extracted_forest_size, models_dir,
+        seed, parameters, dataset, hyperparameters, experiment_id, trainer):
+    logger = LoggerFactory.create(LOG_PATH, 'training_seed{}_extracted_forest_size{}_ti{}'.format(
+        seed, extracted_forest_size, threading.get_ident()))
+    logger.info('extracted_forest_size={}'.format(extracted_forest_size))
+    sub_models_dir = models_dir + os.sep + 'extracted_forest_sizes' + os.sep + str(extracted_forest_size)
     # Check if the result file already exists
     already_exists = False
@@ -102,17 +111,18 @@ def process_job(seed, parameters, experiment_id, hyperparameters):
     sub_models_dir_files = os.listdir(sub_models_dir)
     for file_name in sub_models_dir_files:
         if '.pickle' != os.path.splitext(file_name)[1]:
             continue
         else:
             already_exists = os.path.getsize(os.path.join(sub_models_dir, file_name)) > 0
             break
     if already_exists:
-        logger.info('Base forest result already exists. Skipping...')
-    else:
-        pass
+        logger.info(f'Extracted forest {extracted_forest_size} result already exists. Skipping...')
+        return
     pathlib.Path(sub_models_dir).mkdir(parents=True, exist_ok=True)
     model_parameters = ModelParameters(
-        extracted_forest_size=forest_size,
+        extracted_forest_size=extracted_forest_size,
         normalize_D=parameters['normalize_D'],
         subsets_used=parameters['subsets_used'],
         normalize_weights=parameters['normalize_weights'],
@@ -127,7 +137,6 @@ def process_job(seed, parameters, experiment_id, hyperparameters):
     trainer.init(model, subsets_used=parameters['subsets_used'])
     trainer.train(model)
     trainer.compute_results(model, sub_models_dir)
-    logger.info('Training done')

 """
 Command lines example for stage 1:
@@ -287,6 +296,6 @@ if __name__ == "__main__":
     )
     # Run as many jobs as there are seeds
-    with tqdm_joblib(tqdm(total=len(seeds), disable=not args.verbose)) as progress_bar:
-        Parallel(n_jobs=args.job_number)(delayed(process_job)(seeds[i],
-            parameters, experiment_id, hyperparameters) for i in range(len(seeds)))
+    with tqdm_joblib(tqdm(total=len(seeds), disable=not args.verbose)) as seed_job_pb:
+        Parallel(n_jobs=args.job_number)(delayed(seed_job)(seed_job_pb, seeds[i],
+            parameters, experiment_id, hyperparameters, args.verbose) for i in range(len(seeds)))
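The tqdm_joblib helper used in both loops is imported from bolsonaro.utils and is not part of this diff. A common way to implement such a helper is the standard recipe that temporarily swaps joblib's batch-completion callback so that finished parallel tasks advance a tqdm bar; the sketch below assumes that recipe and is not the project's actual code:

    import contextlib
    import joblib

    @contextlib.contextmanager
    def tqdm_joblib(tqdm_object):
        # Patch joblib so each completed batch advances the given tqdm bar.
        class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
            def __call__(self, *args, **kwargs):
                tqdm_object.update(n=self.batch_size)
                return super().__call__(*args, **kwargs)

        old_callback = joblib.parallel.BatchCompletionCallBack
        joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
        try:
            yield tqdm_object
        finally:
            # Restore the original callback and close the bar.
            joblib.parallel.BatchCompletionCallBack = old_callback
            tqdm_object.close()

Two points worth noting about the new structure: with a process-based joblib backend, the progress-bar objects passed into seed_job and extracted_forest_size_job are pickled copies, so updates made inside the workers do not reach the parent process's bar (the patched callback above is what actually drives it); and the per-size Parallel(n_jobs=-1) call is nested inside the per-seed Parallel(n_jobs=args.job_number), a situation where joblib typically limits the inner level of parallelism to avoid oversubscribing cores.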