From 29a11860ea1fbdc3b1860d92e62a4a53bbdccfc8 Mon Sep 17 00:00:00 2001
From: Charly Lamothe <charly.lamothe@univ-amu.fr>
Date: Fri, 6 Mar 2020 05:37:38 +0100
Subject: [PATCH] Speedup similarity forest regressor and add parallelization
 at the extracted forest size level in the training

---
 .../models/similarity_forest_regressor.py | 58 +++++------
 code/train.py                             | 95 ++++++++++---------
 2 files changed, 82 insertions(+), 71 deletions(-)

diff --git a/code/bolsonaro/models/similarity_forest_regressor.py b/code/bolsonaro/models/similarity_forest_regressor.py
index bbdb147..8f02daf 100644
--- a/code/bolsonaro/models/similarity_forest_regressor.py
+++ b/code/bolsonaro/models/similarity_forest_regressor.py
@@ -1,11 +1,8 @@
-from bolsonaro.utils import tqdm_joblib
-
 from sklearn.ensemble import RandomForestRegressor
 from sklearn.metrics import mean_squared_error
 from sklearn.base import BaseEstimator
 from abc import abstractmethod, ABCMeta
 import numpy as np
-from joblib import Parallel, delayed
 from tqdm import tqdm
 
 
@@ -34,35 +31,40 @@ class SimilarityForestRegressor(BaseEstimator, metaclass=ABCMeta):
         selected_trees = list()
         tree_list = list(self._estimator.estimators_)
 
-        for _ in range(self._extracted_forest_size):
-            best_similarity = 100000
-            found_index = 0
-            for i in range(len(tree_list)):
-                lonely_tree = tree_list[i]
-                del tree_list[i]
-                begin_time = time.time()
-                with tqdm_joblib(tqdm(total=len(tree_list), disable=True)) as job_pb:
-                    val_list = Parallel(n_jobs=-1)(delayed(self._tree_predict_job)(
-                        job_pb, tree_list[i], X_val)
-                        for i in range(len(tree_list)))
-                val_list = np.array(val_list)
-                val_mean = np.mean(val_list, axis=0)
-                val_score = self._score_metric(val_mean, y_val)
-                temp_similarity = abs(forest_pred - val_score)
-                if (temp_similarity < best_similarity):
-                    found_index = i
-                    best_similarity = temp_similarity
-                tree_list.insert(i, lonely_tree)
-            selected_trees.append(tree_list[found_index])
-            del tree_list[found_index]
+        val_scores = list()
+        with tqdm(tree_list) as tree_pred_bar:
+            tree_pred_bar.set_description('[Initial tree predictions]')
+            for tree in tree_pred_bar:
+                val_scores.append(tree.predict(X_val))
+                tree_pred_bar.update(1)
+
+        with tqdm(range(self._extracted_forest_size), disable=False) as pruning_forest_bar:
+            pruning_forest_bar.set_description(f'[Pruning forest s={self._extracted_forest_size}]')
+            for i in pruning_forest_bar:
+                best_similarity = 100000
+                found_index = 0
+                with tqdm(range(len(tree_list)), disable=False) as tree_list_bar:
+                    tree_list_bar.set_description(f'[Tree selection s={self._extracted_forest_size} #{i}]')
+                    for j in tree_list_bar:
+                        lonely_tree = tree_list[j]
+                        del tree_list[j]
+                        val_mean = np.mean(np.asarray(val_scores), axis=0)
+                        val_score = self._score_metric(val_mean, y_val)
+                        temp_similarity = abs(forest_pred - val_score)
+                        if (temp_similarity < best_similarity):
+                            found_index = j
+                            best_similarity = temp_similarity
+                        tree_list.insert(j, lonely_tree)
+                        val_scores.insert(j, lonely_tree.predict(X_val))
+                        tree_list_bar.update(1)
+                selected_trees.append(tree_list[found_index])
+                del tree_list[found_index]
+                del val_scores[found_index]
+                pruning_forest_bar.update(1)
 
         pruned_forest = list(set(forest) - set(selected_trees))
         self._estimator.estimators_ = pruned_forest
 
-    def _tree_predict_job(self, job_pb, tree, X_val):
-        val_pred = tree.predict(X_val)
-        return val_pred
-
     def score(self, X, y):
         test_list = list()
         for mod in self._estimator.estimators_:
diff --git a/code/train.py b/code/train.py
index 66761ab..0e438cf 100644
--- a/code/train.py
+++ b/code/train.py
@@ -21,7 +21,7 @@ import numpy as np
 import shutil
 
 
-def process_job(seed, parameters, experiment_id, hyperparameters):
+def seed_job(seed_job_pb, seed, parameters, experiment_id, hyperparameters, verbose):
     """
     Experiment function.
 
@@ -34,7 +34,6 @@ def process_job(seed, parameters, experiment_id, hyperparameters):
     """
     logger = LoggerFactory.create(LOG_PATH, 'training_seed{}_ti{}'.format(
         seed, threading.get_ident()))
-    logger.info('seed={}'.format(seed))
 
     seed_str = str(seed)
     experiment_id_str = str(experiment_id)
@@ -55,42 +54,10 @@ def process_job(seed, parameters, experiment_id, hyperparameters):
     trainer = Trainer(dataset)
 
     if parameters['extraction_strategy'] != 'none':
-        for extracted_forest_size in parameters['extracted_forest_size']:
-            logger.info('extracted_forest_size={}'.format(extracted_forest_size))
-            sub_models_dir = models_dir + os.sep + 'extracted_forest_sizes' + os.sep + str(extracted_forest_size)
-
-            # Check if the result file already exists
-            already_exists = False
-            if os.path.isdir(sub_models_dir):
-                sub_models_dir_files = os.listdir(sub_models_dir)
-                for file_name in sub_models_dir_files:
-                    if '.pickle' != os.path.splitext(file_name)[1]:
-                        continue
-                    else:
-                        already_exists = os.path.getsize(os.path.join(sub_models_dir, file_name)) > 0
-                        break
-            if already_exists:
-                logger.info(f'Extracted forest {extracted_forest_size} result already exists. Skipping...')
-                continue
-
-            pathlib.Path(sub_models_dir).mkdir(parents=True, exist_ok=True)
-
-            model_parameters = ModelParameters(
-                extracted_forest_size=extracted_forest_size,
-                normalize_D=parameters['normalize_D'],
-                subsets_used=parameters['subsets_used'],
-                normalize_weights=parameters['normalize_weights'],
-                seed=seed,
-                hyperparameters=hyperparameters,
-                extraction_strategy=parameters['extraction_strategy']
-            )
-            model_parameters.save(sub_models_dir, experiment_id)
-
-            model = ModelFactory.build(dataset.task, model_parameters)
-
-            trainer.init(model, subsets_used=parameters['subsets_used'])
-            trainer.train(model)
-            trainer.compute_results(model, sub_models_dir)
+        with tqdm_joblib(tqdm(total=len(parameters['extracted_forest_size']), disable=not verbose)) as extracted_forest_size_job_pb:
+            Parallel(n_jobs=-1)(delayed(extracted_forest_size_job)(extracted_forest_size_job_pb, parameters['extracted_forest_size'][i],
+                models_dir, seed, parameters, dataset, hyperparameters, experiment_id, trainer)
+                for i in range(len(parameters['extracted_forest_size'])))
     else:
         forest_size = hyperparameters['n_estimators']
         logger.info('Base forest training with fixed forest size of {}'.format(forest_size))
@@ -109,7 +76,6 @@ def process_job(seed, parameters, experiment_id, hyperparameters):
         if already_exists:
             logger.info('Base forest result already exists. Skipping...')
         else:
-            pass
             pathlib.Path(sub_models_dir).mkdir(parents=True, exist_ok=True)
             model_parameters = ModelParameters(
                 extracted_forest_size=forest_size,
@@ -127,7 +93,50 @@ def process_job(seed, parameters, experiment_id, hyperparameters):
             trainer.init(model, subsets_used=parameters['subsets_used'])
             trainer.train(model)
             trainer.compute_results(model, sub_models_dir)
-    logger.info('Training done')
+    logger.info(f'Training done for seed {seed_str}')
+    seed_job_pb.update(1)
+
+def extracted_forest_size_job(extracted_forest_size_job_pb, extracted_forest_size, models_dir,
+    seed, parameters, dataset, hyperparameters, experiment_id, trainer):
+
+    logger = LoggerFactory.create(LOG_PATH, 'training_seed{}_extracted_forest_size{}_ti{}'.format(
+        seed, extracted_forest_size, threading.get_ident()))
+    logger.info('extracted_forest_size={}'.format(extracted_forest_size))
+
+    sub_models_dir = models_dir + os.sep + 'extracted_forest_sizes' + os.sep + str(extracted_forest_size)
+
+    # Check if the result file already exists
+    already_exists = False
+    if os.path.isdir(sub_models_dir):
+        sub_models_dir_files = os.listdir(sub_models_dir)
+        for file_name in sub_models_dir_files:
+            if '.pickle' != os.path.splitext(file_name)[1]:
+                return
+            else:
+                already_exists = os.path.getsize(os.path.join(sub_models_dir, file_name)) > 0
+                break
+    if already_exists:
+        logger.info(f'Extracted forest {extracted_forest_size} result already exists. Skipping...')
+        return
+
+    pathlib.Path(sub_models_dir).mkdir(parents=True, exist_ok=True)
+
+    model_parameters = ModelParameters(
+        extracted_forest_size=extracted_forest_size,
+        normalize_D=parameters['normalize_D'],
+        subsets_used=parameters['subsets_used'],
+        normalize_weights=parameters['normalize_weights'],
+        seed=seed,
+        hyperparameters=hyperparameters,
+        extraction_strategy=parameters['extraction_strategy']
+    )
+    model_parameters.save(sub_models_dir, experiment_id)
+
+    model = ModelFactory.build(dataset.task, model_parameters)
+
+    trainer.init(model, subsets_used=parameters['subsets_used'])
+    trainer.train(model)
+    trainer.compute_results(model, sub_models_dir)
 
 """
 Command lines example for stage 1:
@@ -287,6 +296,6 @@ if __name__ == "__main__":
     )
 
     # Run as much job as there are seeds
-    with tqdm_joblib(tqdm(total=len(seeds), disable=not args.verbose)) as progress_bar:
-        Parallel(n_jobs=args.job_number)(delayed(process_job)(seeds[i],
-            parameters, experiment_id, hyperparameters) for i in range(len(seeds)))
+    with tqdm_joblib(tqdm(total=len(seeds), disable=not args.verbose)) as seed_job_pb:
+        Parallel(n_jobs=args.job_number)(delayed(seed_job)(seed_job_pb, seeds[i],
+            parameters, experiment_id, hyperparameters, args.verbose) for i in range(len(seeds)))
-- 
GitLab
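
Note on the helper the parallelized train.py relies on: tqdm_joblib is imported from bolsonaro.utils, but its implementation is not part of this patch. Below is a minimal sketch of the common joblib/tqdm recipe such a helper typically follows, so the seed-level call in the last hunk can be read end to end; the body shown here is an assumption, not the repository's actual code.

# Minimal sketch of a tqdm_joblib context manager (assumed implementation;
# the real bolsonaro.utils helper may differ). It temporarily replaces
# joblib's batch-completion callback so every finished batch advances the
# given tqdm bar, then restores the original callback and closes the bar.
import contextlib

import joblib
from joblib import Parallel, delayed
from tqdm import tqdm


@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
        def __call__(self, *args, **kwargs):
            # One batch of jobs finished: advance the bar by the batch size.
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    old_callback = joblib.parallel.BatchCompletionCallBack
    joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        joblib.parallel.BatchCompletionCallBack = old_callback
        tqdm_object.close()


if __name__ == '__main__':
    # Usage mirroring the seed-level loop in train.py: one parallel job per
    # seed, with the bar ticking as each job completes. The squaring task is
    # only a placeholder standing in for seed_job.
    seeds = list(range(8))
    with tqdm_joblib(tqdm(total=len(seeds))) as seed_job_pb:
        results = Parallel(n_jobs=2)(delayed(pow)(seed, 2) for seed in seeds)

Under this recipe the bar is advanced by a callback running in the parent process each time a batch completes, so it can be shared with Parallel(n_jobs=-1) workers without any inter-process coordination.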