Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Freemem #160

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open

Freemem #160

wants to merge 7 commits into from

Conversation

perib
Copy link
Collaborator

@perib perib commented Nov 1, 2024

[please review the Contribution Guidelines prior to submitting your pull request. go ahead and delete this line if you've already reviewed said guidelines.]

What does this PR do?

adds a few lines of code to call future.release when we are done with a future. Also adds client.run(gc.collect) after each evaluation to clear up a potential memory leak.

Any background context you want to provide?

I noticed that running graph pipelines with large datasets caused an issue where training effectively stopped early. TPOT did not crash, but after a number of generations, all subsequent pipelines evaluated as "INVALID". My theory is that a memory leak ate up all the available RAM, causing all subsequent pipelines to fail.

I was able to reproduce a memory leak by running graph pipelines with several polynomial transformers (as this transformation exponentially increases data size). By watching the dask dashboard, a memory leak seems to happen when a future starts using a large amount of RAM. Normally dask is able to clean the data when the pipeline is done evaluating. But in some cases with particularly large data/transformations, it is not able to free the ram. My theory is that this happens when a transformation inside the future goes beyond system memory and crashes.

I was able to free this memory by calling client.run(gc.collect) and future.release() . I added these to the parallel evaluation function in hopes that it helps TPOT to free memory more often and prevent this issue.

While I think this does help the issue, it looks like with large datasets and graph search spaces that training still sometimes terminates early in the same way as described, so there is more to look into.

Here's a short script I was able to run in a jupyter notebook that could cause a potential memory leak with dask. The unmanaged memory was able to be freed with client.run(gc.collect)


import numpy as np
import sklearn
import tpot2
from dask.distributed import Client
from dask.distributed import LocalCluster

cluster = LocalCluster(n_workers=3, #if no client is passed in and no global client exists, create our own
        threads_per_worker=1,
        processes=True,
        silence_logs=False,
        memory_limit="2GB")
client = Client(cluster)

from tpot2.search_spaces.nodes import *
from tpot2.search_spaces.pipelines import *

sp = GraphSearchPipeline(
    root_search_space=tpot2.config.get_search_space("DecisionTreeClassifier"),
    inner_search_space=tpot2.config.get_search_space("PolynomialFeatures"),
    max_size=7,
)

individual_list = [sp.generate() for i in range(30)]
for ind in individual_list:
    for i in range(0,np.random.randint(1,5)):
        ind.mutate()
        
individual_list[0].export_pipeline().plot()


import sklearn.datasets
import tpot2
X, y = sklearn.datasets.load_breast_cancer(return_X_y=True)

x_scatter = client.scatter(X)
y_scatter = client.scatter(y)

objective_kwargs = {"X": x_scatter, "y": y_scatter}


def my_objective_function(ind, X, y):
    pipeline = ind.export_pipeline()
    pipeline.fit(X, y)
    return pipeline.score(X, y)


objective_list = [my_objective_function]


for i in range(100):

    results = tpot2.utils.eval_utils.parallel_eval_objective_list(individual_list,
                                    objective_list,
                                    verbose=0,
                                    max_eval_time_mins=999,
                                    n_expected_columns=1,
                                    client=client,
                                    scheduled_timeout_time=None,
                                    **objective_kwargs)

@nickotto
Copy link
Contributor

nickotto commented Nov 8, 2024

can you check why we get this error in the unit test:
Exception: No individuals could be evaluated in the initial population as the max_eval_mins time limit was reached before any individuals could be evaluated.

Seems like the gc might be running too prematurely or have issues with the eval time.

@perib
Copy link
Collaborator Author

perib commented Nov 8, 2024

The tests passed on my machine. My guess is that "client.run(gc.collect)" makes the code run slower/less efficiently.

I commented out "client.run(gc.collect)" to see if it allows the github test server to pass the tests. It looks like that chance does allow the server to pass the test. Alternatively we could increase the max_time_mins / max_eval_time_mins for the tests.

I haven't had the chance to benchmark the time cost of client.run(gc.collect), didn't seem to make a large difference in my tests, but may have reduced memory usage some. Something worth exploring.

Another option is to only call it when a pipeline fails due to a future being canceled. (This is the condition that prints the warning "Cancelled future (likely memory related)") This condition is generally only called when a pipeline goes over the memory limit and is cancelled. I made this edit just now to see if it works now.

Might be useful to put a call in the "Exception in future, but not caught by dask" condition as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants