Commit

Merge pull request #48 from perib/dev
Dev
perib authored Sep 8, 2023
2 parents c18ad4c + 492394c commit 984ca42
Showing 9 changed files with 207 additions and 72 deletions.
10 changes: 7 additions & 3 deletions Tutorial/3_Genetic_Feature_Set_Selectors.ipynb
@@ -12,7 +12,8 @@
" If X is a dataframe, items in sel_subset list must correspond to column names\n",
" If X is a numpy array, items in sel_subset list must correspond to column indexes\n",
" int: index of a single column\n",
"```\n"
"```\n",
"\n"
]
},
{
@@ -1113,8 +1114,11 @@
}
],
"metadata": {
"interpreter": {
"hash": "57aedbec84c390a3287b44649e400696ed2b6dcd408c8519583e8e995dbe6e9b"
},
"kernelspec": {
"display_name": "tpot_dev",
"display_name": "Python 3.10.12 ('tpot2env2')",
"language": "python",
"name": "python3"
},
@@ -1128,7 +1132,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
"version": "3.10.12"
},
"orig_nbformat": 4,
"vscode": {
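The sel_subset documentation quoted in the context lines above distinguishes DataFrame input (column names) from numpy input (column indexes). A minimal sketch of that distinction, assuming the selector is importable as tpot2.builtin_modules.FeatureSetSelector (the class name and import path are assumptions, not shown in this diff):

```python
# Sketch only: FeatureSetSelector and its import path are assumed for illustration.
import numpy as np
import pandas as pd
from tpot2.builtin_modules import FeatureSetSelector  # assumed location

X_np = np.random.rand(20, 4)
y = np.random.randint(0, 2, 20)
X_df = pd.DataFrame(X_np, columns=["a", "b", "c", "d"])

# numpy input: sel_subset entries are column indexes
fss_idx = FeatureSetSelector(sel_subset=[0, 2])
print(fss_idx.fit_transform(X_np, y).shape)   # two columns kept

# DataFrame input: sel_subset entries are column names
fss_name = FeatureSetSelector(sel_subset=["a", "c"])
print(fss_name.fit_transform(X_df, y).shape)  # two columns kept

# int: a single column
fss_one = FeatureSetSelector(sel_subset=0)
print(fss_one.fit_transform(X_np, y).shape)
```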
106 changes: 62 additions & 44 deletions Tutorial/7_dask_parallelization.ipynb
@@ -7,21 +7,21 @@
"source": [
"# Parallelization\n",
"\n",
"TPOT2 uses the Dask package for parallelization either locally (dask.destributed.LocalCluster) or multi-node via a job schedule (dask-jobqueue). \n",
"This tutorial covers advanced setups for parallelizing TPOT2 with Dask. If you just want to parallelize TPOT2 within a single computer with multiple processes, set the n_jobs parameter to the number of threads you want to use and skip this tutorial. \n",
"\n",
"To parallelize TPOT2 all you need to do is set the n_jobs parameter to the number of cores you want to use. Alternatively, users can create a custom Dask client and pass it in to TPOT2.\n",
"\n",
"This is supported the same in all of the different estimators (TPOTEstimator, TPOTEstimatorSteadyState, TPOTClassifier, or TPOTRegressor)"
"TPOT2 uses Dask for parallelization and defaults to using a dask.distributed.LocalCluster for local parallelization. A user can pass in a custom Dask client or cluster for advanced usage. For example, a multi-node parallelization is possible using the dask-jobqueue package."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### Best Practices\n",
"### TPOT2 with Python Scripts\n",
"\n",
"When running tpot from an .py script, it is important to protect code with `if __name__==\"__main__\":`\n",
"\n",
"When running tpot from an .py script, it is important to protect code with `if __name__==\"__main__\":`\n"
"This is due to how parallelization is handled in Python. In short, when Python spawns new processes, each new process reimports code from the relevant .py files, including rerunning code. The context under `if __name__==\"__main__\":` ensures the code under it only executed by the main process and only once. More info [here](https://docs.dask.org/en/stable/scheduling.html#standalone-python-scripts)."
]
},
{
@@ -33,14 +33,14 @@
"name": "stderr",
"output_type": "stream",
"text": [
"Evaluations: : 232it [02:01, 1.90it/s]\n"
"Evaluations: : 242it [02:01, 1.99it/s]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.9998431179414371\n"
"0.9995194086144522\n"
]
}
],
@@ -85,14 +85,14 @@
"name": "stderr",
"output_type": "stream",
"text": [
"Evaluations: : 231it [02:00, 1.92it/s]\n"
"Evaluations: : 224it [02:00, 1.86it/s]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.9998143035770981\n"
"0.9996005895289903\n"
]
}
],
@@ -193,22 +193,23 @@
"name": "stderr",
"output_type": "stream",
"text": [
"Evaluations: : 142it [02:00, 1.18it/s]\n"
"Evaluations: : 119it [02:01, 1.02s/it]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.999735780838626\n"
"0.9988827327847432\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-08-16 16:10:04,735 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing\n",
"2023-08-16 16:10:04,735 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing\n"
"2023-08-23 13:49:06,747 - distributed.nanny - WARNING - Worker process still alive after 3.1999992370605472 seconds, killing\n",
"2023-08-23 13:49:06,748 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing\n",
"2023-08-23 13:49:06,748 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing\n"
]
}
],
@@ -243,23 +244,22 @@
"name": "stderr",
"output_type": "stream",
"text": [
"Evaluations: : 131it [02:02, 1.07it/s]\n"
"Evaluations: : 132it [02:00, 1.10it/s]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.9999114413297068\n"
"0.999973663151898\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-08-16 16:12:11,659 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing\n",
"2023-08-16 16:12:11,659 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing\n",
"2023-08-16 16:12:11,660 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing\n"
"2023-08-23 13:51:14,527 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing\n",
"2023-08-23 13:51:14,528 - distributed.nanny - WARNING - Worker process still alive after 3.19999984741211 seconds, killing\n"
]
}
],
@@ -293,53 +293,71 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dask multi node parallelization\n",
"## Dask multi node parallelization on HPC\n",
"\n",
"Dask can parallelize across multiple nodes via job queueing systems. This is done using the Dask-Jobqueue package. More information can be found in the official [documentation here.]( https://jobqueue.dask.org/en/latest/)\n",
"\n",
"To parallelize TPOT2 with Dask-Jobqueue, simply pass in a client based on a Jobqueue cluster with desired settings into the client parameter. Each job will evaluate a single pipeline.\n",
"\n",
"Dask can parallelize across multiple nodes via job queueing systems. This is done using the dask-jobqueue package. More information can be found in the official [documentation here.]( https://jobqueue.dask.org/en/latest/)\n",
"Note that TPOT will ignore n_jobs and memory_limit as these should be set inside the Dask cluster. \n",
"\n",
"To parallelize TPOT2 with dask-jobqueue, simply pass in a client based on a jobqueue cluster with desired settings into the client parameter. Each job will evaluate a single pipeline.\n",
"\n",
"Note that TPOT will ignore n_jobs and memory_limit as these should be set inside the dask cluster. "
"The following example is specific to the Sun Grid Engine. Other supported clusters can be found in the [Dask-Jobqueue documentation here](https://jobqueue.dask.org/en/latest/examples.html)"
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 1,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Sun Grid Engine is not installed. This example requires Sun Grid Engine to be installed.\n"
]
}
],
"source": [
"from dask.distributed import Client, LocalCluster\n",
"import sklearn\n",
"import sklearn.datasets\n",
"import sklearn.metrics\n",
"import sklearn.model_selection\n",
"import tpot2\n",
"\n",
"from dask_jobqueue import SGECluster # or SLURMCluster, PBSCluster, etc. Replace SGE with your scheduler.\n",
"cluster = SGECluster(\n",
" queue='all.q',\n",
" cores=2,\n",
" memory=\"50 GB\"\n",
"import os\n",
"\n",
")\n",
"if os.system(\"which qsub\") != 0:\n",
" print(\"Sun Grid Engine is not installed. This example requires Sun Grid Engine to be installed.\")\n",
"else:\n",
" print(\"Sun Grid Engine is installed.\")\n",
"\n",
"cluster.adapt(minimum_jobs=10, maximum_jobs=100) # auto-scale between 10 and 100 jobs\n",
" \n",
" cluster = SGECluster(\n",
" queue='all.q',\n",
" cores=2,\n",
" memory=\"50 GB\"\n",
"\n",
"client = Client(cluster)\n",
" )\n",
"\n",
"scorer = sklearn.metrics.get_scorer('roc_auc_ovr')\n",
"X, y = sklearn.datasets.load_digits(return_X_y=True)\n",
"X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)\n",
" cluster.adapt(minimum_jobs=10, maximum_jobs=100) # auto-scale between 10 and 100 jobs\n",
"\n",
"est = tpot2.TPOTEstimatorSteadyState( client=client, classification=True, max_eval_time_seconds=60, max_time_seconds=120, scorers=['roc_auc_ovr'], scorers_weights=[1], verbose=1)\n",
"# this is equivalent to: \n",
"# est = tpot2.TPOTClassifier(population_size= 8, generations=5, n_jobs=4, memory_limit=\"4GB\", verbose=1)\n",
"est.fit(X_train, y_train)\n",
"print(scorer(est, X_test, y_test))\n",
" client = Client(cluster)\n",
"\n",
"#It is good to close the client and cluster when you are done with them\n",
"client.close()\n",
"cluster.close()"
" scorer = sklearn.metrics.get_scorer('roc_auc_ovr')\n",
" X, y = sklearn.datasets.load_digits(return_X_y=True)\n",
" X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)\n",
"\n",
" est = tpot2.TPOTEstimatorSteadyState( client=client, classification=True, max_eval_time_seconds=60, max_time_seconds=120, scorers=['roc_auc_ovr'], scorers_weights=[1], verbose=1)\n",
" # this is equivalent to: \n",
" # est = tpot2.TPOTClassifier(population_size= 8, generations=5, n_jobs=4, memory_limit=\"4GB\", verbose=1)\n",
" est.fit(X_train, y_train)\n",
" print(scorer(est, X_test, y_test))\n",
"\n",
" #It is good to close the client and cluster when you are done with them\n",
" client.close()\n",
" cluster.close()"
]
}
],
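The tutorial edits above add up to a single pattern: build a Dask cluster yourself, wrap it in a Client, pass that client to the estimator, and guard everything with `if __name__=="__main__":` when running as a .py script. A minimal sketch assembled from the snippets shown in this diff (the LocalCluster sizing is an arbitrary choice for illustration):

```python
# Sketch of the local-parallelization setup described in the tutorial above.
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import tpot2
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":  # prevents worker processes from re-running this block on import
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)  # sizing chosen arbitrarily
    client = Client(cluster)

    scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
    X, y = sklearn.datasets.load_digits(return_X_y=True)
    X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
        X, y, train_size=0.75, test_size=0.25)

    # When a client is passed in, resource settings live on the cluster
    # (see the note about n_jobs and memory_limit in the tutorial text above).
    est = tpot2.TPOTEstimatorSteadyState(
        client=client, classification=True,
        max_eval_time_seconds=60, max_time_seconds=120,
        scorers=['roc_auc_ovr'], scorers_weights=[1], verbose=1)
    est.fit(X_train, y_train)
    print(scorer(est, X_test, y_test))

    # Close the client and cluster when done with them.
    client.close()
    cluster.close()
```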
2 changes: 1 addition & 1 deletion tpot2/config/classifiers.py
@@ -183,7 +183,7 @@ def params_SGDClassifier(trial, name=None):
'penalty': 'elasticnet',
'alpha': trial.suggest_float(f'alpha_{name}', 1e-5, 0.01, log=True),
'learning_rate': trial.suggest_categorical(f'learning_rate_{name}', ['invscaling', 'constant']),
'fit_intercept': trial.suggest_categorical(f'fit_intercept_{name}', [True, False]),
'fit_intercept': True,
'l1_ratio': trial.suggest_float(f'l1_ratio_{name}', 0.0, 1.0),
'eta0': trial.suggest_float(f'eta0_{name}', 0.01, 1.0),
'power_t': trial.suggest_float(f'power_t_{name}', 1e-5, 100.0, log=True),
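The classifiers.py change above pins fit_intercept to True instead of letting the trial sample it. These params_* helpers build a keyword dictionary from an Optuna trial; one way to inspect what they produce is to drive them with Optuna's ask interface directly. How TPOT2 invokes them internally is not shown in this diff, so the driver below is an assumption for illustration, and the function body is abridged to the lines visible above:

```python
# Hedged sketch: reproduce the (abridged) params_SGDClassifier from this commit,
# draw one Optuna trial, and build the scikit-learn estimator from the result.
import optuna
from sklearn.linear_model import SGDClassifier

def params_SGDClassifier(trial, name=None):
    return {
        'penalty': 'elasticnet',
        'alpha': trial.suggest_float(f'alpha_{name}', 1e-5, 0.01, log=True),
        'learning_rate': trial.suggest_categorical(f'learning_rate_{name}', ['invscaling', 'constant']),
        'fit_intercept': True,  # fixed by this commit instead of being sampled
        'l1_ratio': trial.suggest_float(f'l1_ratio_{name}', 0.0, 1.0),
        'eta0': trial.suggest_float(f'eta0_{name}', 0.01, 1.0),
        'power_t': trial.suggest_float(f'power_t_{name}', 1e-5, 100.0, log=True),
    }

study = optuna.create_study()
trial = study.ask()  # one hyperparameter configuration
clf = SGDClassifier(**params_SGDClassifier(trial, name='sgd'))
print(clf)
```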
32 changes: 16 additions & 16 deletions tpot2/config/regressors.py
@@ -42,11 +42,11 @@ def params_RandomForestRegressor(trial, name=None):
# SGDRegressor parameters
def params_SGDRegressor(trial, name=None):
params = {
'loss': trial.suggest_categorical(f'loss_{name}', ['squared_loss', 'huber', 'epsilon_insensitive']),
'loss': trial.suggest_categorical(f'loss_{name}', ['huber', 'squared_error', 'epsilon_insensitive', 'squared_epsilon_insensitive']),
'penalty': 'elasticnet',
'alpha': trial.suggest_float(f'alpha_{name}', 1e-5, 0.01, log=True),
'learning_rate': trial.suggest_categorical(f'learning_rate_{name}', ['invscaling', 'constant']),
'fit_intercept': trial.suggest_categorical(f'fit_intercept_{name}', [True, False]),
'fit_intercept':True,
'l1_ratio': trial.suggest_float(f'l1_ratio_{name}', 0.0, 1.0),
'eta0': trial.suggest_float(f'eta0_{name}', 0.01, 1.0),
'power_t': trial.suggest_float(f'power_t_{name}', 1e-5, 100.0, log=True)
@@ -58,7 +58,7 @@ def params_SGDRegressor(trial, name=None):
def params_Ridge(trial, name=None):
params = {
'alpha': trial.suggest_float(f'alpha_{name}', 0.0, 1.0),
'fit_intercept': trial.suggest_categorical(f'fit_intercept_{name}', [True, False]),
'fit_intercept': True,


#'max_iter': trial.suggest_int(f'max_iter_{name}', 100, 1000),
@@ -72,7 +72,7 @@ def params_Ridge(trial, name=None):
def params_Lasso(trial, name=None):
params = {
'alpha': trial.suggest_float(f'alpha_{name}', 0.0, 1.0),
'fit_intercept': trial.suggest_categorical(f'fit_intercept_{name}', [True, False]),
'fit_intercept': True,
# 'normalize': trial.suggest_categorical(f'normalize_{name}', [True, False]),
'precompute': trial.suggest_categorical(f'precompute_{name}', [True, False, 'auto']),

@@ -95,7 +95,7 @@ def params_ElasticNet(trial, name=None):
# Lars parameters
def params_Lars(trial, name=None):
params = {
'fit_intercept': trial.suggest_categorical(f'fit_intercept_{name}', [True, False]),
'fit_intercept': True,
'verbose': trial.suggest_categorical(f'verbose_{name}', [True, False]),
'normalize': trial.suggest_categorical(f'normalize_{name}', [True, False]),

@@ -113,7 +113,7 @@ def params_OrthogonalMatchingPursuit(trial, name=None):
params = {
'n_nonzero_coefs': trial.suggest_int(f'n_nonzero_coefs_{name}', 1, 100),
'tol': trial.suggest_float(f'tol_{name}', 1e-5, 1e-1, log=True),
'fit_intercept': trial.suggest_categorical(f'fit_intercept_{name}', [True, False]),
'fit_intercept': True,
'normalize': trial.suggest_categorical(f'normalize_{name}', [True, False]),
'precompute': trial.suggest_categorical(f'precompute_{name}', ['auto', True, False]),
}
@@ -129,7 +129,7 @@ def params_BayesianRidge(trial, name=None):
'lambda_1': trial.suggest_float(f'lambda_1_{name}', 1e-6, 1e-1, log=True),
'lambda_2': trial.suggest_float(f'lambda_2_{name}', 1e-6, 1e-1, log=True),
'compute_score': trial.suggest_categorical(f'compute_score_{name}', [True, False]),
'fit_intercept': trial.suggest_categorical(f'fit_intercept_{name}', [True, False]),
'fit_intercept': True,
'normalize': trial.suggest_categorical(f'normalize_{name}', [True, False]),
'copy_X': trial.suggest_categorical(f'copy_X_{name}', [True, False]),
}
@@ -139,7 +139,7 @@ def params_LassoLars(trial, name=None):
def params_LassoLars(trial, name=None):
params = {
'alpha': trial.suggest_float(f'alpha_{name}', 0.0, 1.0),
# 'fit_intercept': trial.suggest_categorical(f'fit_intercept_{name}', [True, False]),
# 'fit_intercept': True,
# 'normalize': trial.suggest_categorical(f'normalize_{name}', [True, False]),
# 'precompute': trial.suggest_categorical(f'precompute_{name}', ['auto_{name}', True, False]),
#'max_iter': trial.suggest_int(f'max_iter_{name}', 100, 1000),
@@ -178,7 +178,7 @@ def params_ARDRegression(trial, name=None):
'lambda_2': trial.suggest_float(f'lambda_2_{name}', 1e-6, 1e-1, log=True),
'compute_score': trial.suggest_categorical(f'compute_score_{name}', [True, False]),
'threshold_lambda': trial.suggest_int(f'threshold_lambda_{name}', 100, 1000),
'fit_intercept': trial.suggest_categorical(f'fit_intercept_{name}', [True, False]),
'fit_intercept': True,
'normalize': trial.suggest_categorical(f'normalize_{name}', [True, False]),
'copy_X': trial.suggest_categorical(f'copy_X_{name}', [True, False]),
}
@@ -191,7 +191,7 @@ def params_TheilSenRegressor(trial, name=None):
params = {
'n_subsamples': trial.suggest_int(f'n_subsamples_{name}', 10, 100),
'max_subpopulation': trial.suggest_int(f'max_subpopulation_{name}', 100, 1000),
'fit_intercept': trial.suggest_categorical(f'fit_intercept_{name}', [True, False]),
'fit_intercept': True,
'copy_X': trial.suggest_categorical(f'copy_X_{name}', [True, False]),
'verbose': trial.suggest_categorical(f'verbose_{name}', [True, False]),
}
@@ -215,7 +215,7 @@ def params_Perceptron(trial, name=None):
'penalty': trial.suggest_categorical(f'penalty_{name}', [None, 'l2', 'l1', 'elasticnet']),
'alpha': trial.suggest_float(f'alpha_{name}', 1e-5, 1e-1, log=True),
'l1_ratio': trial.suggest_float(f'l1_ratio_{name}', 0.0, 1.0),
'fit_intercept': trial.suggest_categorical(f'fit_intercept_{name}', [True, False]),
'fit_intercept': True,
#'max_iter': trial.suggest_int(f'max_iter_{name}', 100, 1000),
'tol': trial.suggest_float(f'tol_{name}', 1e-5, 1e-1, log=True),
'shuffle': trial.suggest_categorical(f'shuffle_{name}', [True, False]),
@@ -244,10 +244,6 @@ def params_MLPRegressor(trial, name=None):
def params_GradientBoostingRegressor(trial, name=None):
loss = trial.suggest_categorical(f'loss_{name}', ['ls', 'lad', 'huber', 'quantile'])

if loss == 'quantile' or loss == 'huber':
alpha = trial.suggest_float(f'alpha_{name}', 0.05, 0.95)
else:
alpha = None
params = {

'n_estimators': 100,
@@ -258,9 +254,13 @@ def params_GradientBoostingRegressor(trial, name=None):
'min_samples_leaf': trial.suggest_int(f'min_samples_leaf_{name}', 1, 21),
'subsample': 1-trial.suggest_float(f'subsample_{name}', 0.05, 1.00, log=True),
'max_features': 1-trial.suggest_float(f'max_features_{name}', 0.05, 1.00, log=True),
'alpha': alpha,

}

if loss == 'quantile' or loss == 'huber':
alpha = trial.suggest_float(f'alpha_{name}', 0.05, 0.95)
params['alpha'] = alpha

return params


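The GradientBoostingRegressor change above moves the alpha suggestion to after the params dictionary is built, so alpha is only added when the chosen loss actually uses it ('huber' or 'quantile') rather than always being present, possibly as None. A sketch of the resulting pattern, abridged to the hyperparameters visible in this diff and driven with Optuna's ask interface purely for illustration:

```python
# Hedged sketch: a conditional hyperparameter, only included when the chosen loss uses it.
import optuna
from sklearn.ensemble import GradientBoostingRegressor

def params_GradientBoostingRegressor(trial, name=None):
    # Loss list abridged to values valid in current scikit-learn releases.
    loss = trial.suggest_categorical(f'loss_{name}', ['huber', 'quantile'])
    params = {
        'n_estimators': 100,
        'loss': loss,
        'min_samples_leaf': trial.suggest_int(f'min_samples_leaf_{name}', 1, 21),
        'subsample': 1 - trial.suggest_float(f'subsample_{name}', 0.05, 1.00, log=True),
        'max_features': 1 - trial.suggest_float(f'max_features_{name}', 0.05, 1.00, log=True),
    }
    if loss == 'quantile' or loss == 'huber':
        params['alpha'] = trial.suggest_float(f'alpha_{name}', 0.05, 0.95)
    return params

trial = optuna.create_study().ask()
reg = GradientBoostingRegressor(**params_GradientBoostingRegressor(trial, name='gbr'))
print(reg)
```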