Share your experiences with worker-saturation config to reduce memory usage
#7128
Replies: 15 comments 25 replies
-
I think there is a typo in the code blocks you suggested, I had to use:
or else I get a type error.
-
Epic stuff 👏🏾 👏🏾 👏🏾 @gjoseph92
See this repo for performance reports, a CSV of memory usage timeseries, and code.
Anecdotally, it seemed like for
TEM
I found one ridiculous improvement (8x?! TEM for "transformed eulerian mean"):
import dask.array
import pandas as pd
import xarray as xr
from distributed import performance_report

# `val`, `repeats`, `ms`, and `client` are defined elsewhere in the benchmark script (not shown).
dims = ("time", "level", "lat", "lon")
# 1 is number of years, adjust to make bigger,
# full dataset is 60-ish years.
shape = (1 * 365 * 24, 37, 72, 1440)
chunks = (24, 15, -1, -1)
ds = xr.Dataset(
    {
        "U": (dims, dask.array.random.random(shape, chunks=chunks)),
        "V": (dims, dask.array.random.random(shape, chunks=chunks)),
        "W": (dims, dask.array.random.random(shape, chunks=chunks)),
        "T": (dims, dask.array.random.random(shape, chunks=chunks)),
    },
    coords={"time": pd.date_range("2001-01-01", periods=shape[0], freq="H")},
)
# Zonal means and anomalies, then the eddy covariance terms
zonal_means = ds.mean("lon")
anomaly = ds - zonal_means
anomaly['uv'] = anomaly.U*anomaly.V
anomaly['vt'] = anomaly.V*anomaly.T
anomaly['uw'] = anomaly.U*anomaly.W
temdiags = zonal_means.merge(anomaly[['uv','vt','uw']].mean("lon"))
temdiags = temdiags.groupby('time.dayofyear').mean()
temdiags = temdiags.rename({'dayofyear':'time'})
with performance_report(f"tem-saturation-{val}.html"):
    for repeat in range(repeats):
        with ms.sample(f"{val}_{repeat}"):
            temdiags.compute()
        client.restart()
uvmean
This one had no difference (uvmean from this xarray issue; this one I ran two repeats for each value):
import dask.array as da  # other imports as in the TEM example

ds = xr.Dataset(
    dict(
        anom_u=(["time", "face", "j", "i"], da.random.random((5000, 1, 987, 1920), chunks=(10, 1, -1, -1))),
        anom_v=(["time", "face", "j", "i"], da.random.random((5000, 1, 987, 1920), chunks=(10, 1, -1, -1))),
    )
)
mean = ds**2
mean["uv"] = ds.anom_u * ds.anom_v
mean = mean.mean("time")
with performance_report(f"uvmean-saturation-{val}.html"):
    for repeat in range(repeats):
        with ms.sample(f"{val}_{repeat}"):
            mean.compute()
        client.restart()
-
This works fine, it just kinda stalls the scheduler if the tasks take too long you have the
-
well the
It only happens before the first task has finished, on a KubeCluster. I will look out for it and report further if this keeps happening.
-
@gjoseph92 I finally tried this out on my vorticity problem (on the LEAP hub now that we have Notes:
Overall this looks like a huge improvement! 🥳 Next step is for me to try it out on the larger dataset. (I will also give it more workers and more memory per worker in that case though.)
-
I believe there may be a typo in the gateway/pangeo codeblock:
Versions:
??? Should the statement instead be ???:
-
Hi @gjoseph92, thank you soooo much for this improvement. I was chatting with @dcherian yesterday and we coincidentally stumbled over an example for an improvement here. The code to calculate this trend is a simple xarray one-liner, trend = da.fillna(-1e10).polyfit('time', 1, skipna=False), but computing this in the past would ultimately lead to spilling and eventually killed workers. Setting the worker saturation fixes this without the need for rechunking the array first. This is a great improvement for this very common workflow in geosciences. I made a little demo notebook (can be reproduced on the pangeo 2i2c hub - I used a Huge deployment) in case that is helpful! I ran the computation for a range of saturation values, but interestingly
I'll check this out on another real-world example soon and report back.
-
I have not properly tested this yet, but it seems to me that each time I start a cluster with worker-saturation set to 1.0, the startup takes quite a bit longer. Is this something others here have experienced?
-
Thanks so much for this - I've seen big improvements in reliability with this when using dask.array.tensordot to aggregate gridded data to regions. My workflow was to load gridded data from cloud storage (around 20 GiB), aggregate using tensordot, and save the resulting averaged data back to cloud storage. Without worker saturation, if my cluster was not sufficiently large (24 workers, 30 GiB each), workers would become overwhelmed with data and crash, taking the whole cluster down with them. With this new parameter, I've been able to reduce the number of workers by a factor of 3 and cut worker memory in half, and it now runs without crashing. I had previously tried all sorts of different chunking schemes and other cluster settings (worker memory spill/target/terminate, malloc trim threshold, etc.), but none made much of a difference.
-
This solves a lot of our memory-bound problems! I just tried it with our workflow (structured binary serialization of Zarr arrays) and it keeps memory in check. In the past, we have been spilling to disk and having trouble with memory usage. Should a reasonable number like 1.1 or 1.25 be the default instead of
-
Thanks to @gjoseph92 for the worker saturation upgrade and the support for our use case! We've been struggling with high memory usage (spills, killed workers, timeouts...) along with inconsistent and unbalanced processing for our process that resulted in long runtime and underutilization of cloud resources. With worker saturation, we have immediately seen 30% reduction in runtime and thus cost, 60% reduction in peak memory usage, and significantly more reasonable and consistent task assignment and task processing among Dask workers!
-
🎉
On Tue, Nov 15, 2022, 6:08 PM Gabe Joseph wrote:
FYI, 1.1 is now the default in the latest release! (2022.11.0)
-
It seems that setting saturation to 0.0 does not have the effect I wished for. Still 256 tasks for 128 single-threaded workers :-/
-
Thanks for the solution! It solved a problem we had where we kept using a larger EC2 instance but the job would fail to finish. Eventually it dawned on us that it probably wasn't a memory issue but how we were handling it.
-
If you have a workload that struggles with running out of memory, please try it with this new setting and report back!
(and if you don't, please try it anyway and report back too, whether it goes well or poorly! we want to hear as much feedback as possible.)
distributed>=2022.9.2 includes a new configuration option: distributed.scheduler.worker-saturation. This setting controls how many extra initial data-loading tasks workers will run. Full documentation is here.
This setting is now on by default in version 2022.11.0 and later! In earlier versions it is set to inf by default, to be consistent with previous behavior, but setting it to 1.0-1.5 can enormously reduce memory usage in many cases.
How do I set it?
Update: This setting is now on by default in version 2022.11.0 and later. You no longer need to follow these instructions to set it; just upgrade to the latest version.
Instructions
Set it however you normally set dask configuration. Different deployment systems have different ways to do this.
The easiest is often to set an environment variable on the cluster (note this must be set before the scheduler process starts):
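As a rough sketch of what that variable looks like: dask conventionally maps a config key to an environment variable by adding a DASK_ prefix, upper-casing the key, and using a double underscore between nesting levels. The helper name config_key_to_env_var below is hypothetical and only illustrates the naming; it is not part of dask.

```python
def config_key_to_env_var(dotted_key: str) -> str:
    """Illustrative only: build the environment variable name for a dask config key."""
    # "." separates nesting levels and becomes "__"; "-" becomes "_"; then upper-case.
    return "DASK_" + dotted_key.replace(".", "__").replace("-", "_").upper()


ENV_VAR = config_key_to_env_var("distributed.scheduler.worker-saturation")

# Environment variable values are strings, so "1.0" is quoted here.
environment = {ENV_VAR: "1.0"}

print(environment)
```

A dict like environment is the kind of thing many deployment tools accept for the cluster's environment. Because the values end up as environment variables, it is usually safest to pass the string "1.0" rather than the float 1.0.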
dask-gateway / Pangeo
Note that different dask-gateway systems may use an option name besides environment (environment_vars, for example), or may not offer the option at all.
Coiled
Local client
dask-kubernetes
dask-cloudprovider
YAML config file
See https://docs.dask.org/en/latest/configuration.html#yaml-files
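For the YAML route, the dotted key distributed.scheduler.worker-saturation corresponds to nested YAML mappings. The sketch below just renders that fragment as text; render_dask_yaml is a made-up helper for illustration, and where you save the result depends on your setup (the linked page describes the locations dask searches).

```python
def render_dask_yaml(dotted_key: str, value) -> str:
    """Illustrative only: render a dotted dask config key as a nested YAML fragment."""
    parts = dotted_key.split(".")
    lines = []
    # Every level except the last becomes a parent mapping, indented two spaces per level.
    for depth, part in enumerate(parts[:-1]):
        lines.append("  " * depth + f"{part}:")
    # The last level carries the value.
    lines.append("  " * (len(parts) - 1) + f"{parts[-1]}: {value}")
    return "\n".join(lines) + "\n"


fragment = render_dask_yaml("distributed.scheduler.worker-saturation", 1.1)
print(fragment)
# distributed:
#   scheduler:
#     worker-saturation: 1.1
```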
Last-resort option / modify a live cluster without restarting
If you have no other option for setting config, or want to change the value on a live cluster without re-creating it, you can do this. Only run this while the scheduler is idle (no tasks):
Remember that in all cases, you must be using the latest version of distributed (>=2022.9.2), otherwise nothing will happen!
What does this magic actually do?
It prevents root task overproduction, a phenomenon where workers are too quick to load initial data and then start to run out of memory.
With worker-saturation: 1.0, each worker will never have more than nthreads input chunks in memory at once. So in many cases, you'll not only see lower memory usage, but more constant memory usage (see the flatter blue lines compared to more "peak-y" yellow lines above).
When can I expect it to work?
When you think, "this feels like it should be embarrassingly parallel / dask shouldn't need to load much data to compute each output, so why is it?"
When can't I expect it to work?
When your workload inherently requires having all data in memory at once, this won't help much. Those are generally computations that need to reorganize a dataset, like DataFrame.set_index, DataFrame.merge, and some instances of Array.rechunk.
What are the downsides?
In a number of cases, it's slower. (In every case we've seen that's been significantly slower, though, it's also reduced memory use a lot.)
If your workload is currently spilling a lot to disk (or just crashing), this will probably make things (much) faster.
If your workload is currently fast and happy, this may make it slower. We'd love to hear about that! We hope to make this setting the default soon, so all feedback is very helpful.
This also loses co-assignment (#4892), so you may see more data transfer than before. However, benchmarks that we thought were dependent on co-assignment seem to work okay with worker-saturation: 1.0 (they still work and don't use much memory, they're just slower).
What value should I set the magic parameter to? What does it actually mean?
Set it to 1.1. If it's still using too much memory, then set it to 1.0, and please let us know.
Specifically, ceil(worker_nthreads * worker_saturation) controls how many input chunks a worker will have in memory at once.
For the old scheduling behavior, set it to "inf". This turns off the feature flag and also re-enables co-assignment.
We haven't seen larger values reliably make things much faster (but they do seem to reliably increase memory use). Feel free to play around with it though, and please share what you find.
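The formula above can be sketched in a few lines. This only mirrors the ceil(worker_nthreads * worker_saturation) expression from this post; the function name max_input_chunks is made up for illustration, and this is not the scheduler's actual code.

```python
import math


def max_input_chunks(worker_nthreads: int, worker_saturation: float) -> float:
    """Illustrative only: input chunks a worker may hold at once, per the formula in this post."""
    if math.isinf(worker_saturation):
        # "inf" means the old behavior: no limit on initial data-loading tasks.
        return math.inf
    return math.ceil(worker_nthreads * worker_saturation)


for nthreads, saturation in [(4, 1.0), (4, 1.1), (1, 1.1), (8, 1.5), (4, float("inf"))]:
    print(f"nthreads={nthreads}, worker-saturation={saturation}: "
          f"{max_input_chunks(nthreads, saturation)} input chunks")
```

With 4 threads per worker, 1.0 gives 4 input chunks and 1.1 rounds up to 5, so 1.1 leaves each worker one extra data-loading task beyond its thread count. On single-threaded workers, 1.1 rounds up to 2.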
What should I do if I read this far and have tried it out?
Please reply to this discussion (in a new thread) and share your experience! Please include the size of your cluster (# workers, worker CPU/memory), size of your data (total size, # chunks), and what worker-saturation value you used. And of course, how it went! Dashboard screenshots or performance reports are also welcome.