update() broken with Future argument (newer dask version) #12

Open
robin-cls opened this issue Oct 25, 2024 · 1 comment · May be fixed by #15

robin-cls commented Oct 25, 2024

Hi,

I recently bumped dask in my conda environment, and zcollection.Collection.update now raises a RuntimeError when the callback tries to access a Future object. The error appears with dask=2024.9.0 but not with older versions.


Below is the code to reproduce the problem. It uses a local cluster and an in-memory zcollection.

from __future__ import annotations

from typing import Iterator

import dask.distributed
import fsspec
import numpy

import zcollection
import zcollection.tests.data

cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)

def create_dataset() -> zcollection.Dataset:
    """Create a dataset to record."""
    generator: Iterator[zcollection.Dataset] = \
        zcollection.tests.data.create_test_dataset_with_fillvalue()
    return next(generator)

zds = create_dataset()

fs = fsspec.filesystem('memory')
path = '/my_collection'

partition_handler = zcollection.partitioning.Date(('time', ), resolution='M')
collection: zcollection.Collection = zcollection.create_collection(
    'time', zds, partition_handler, path, filesystem=fs)

collection.insert(zds)
scattered = client.scatter(numpy.ones(2), broadcast=True)

def callback(zds, arg_future):
    # Accessing the Future inside the callback is what raises the RuntimeError.
    arg_future.result()
    return {'var1': zds['var1'].values * 2}

collection.update(callback, scattered)

My preliminary analysis is that the underlying wrapper for the update() function stores the *args and **kwargs arguments itself. This may cause trouble for dask when it serializes and deserializes them, because the Future contained in *args is not submitted directly by the client.

@Thomas-Z Thomas-Z self-assigned this Nov 19, 2024
@Thomas-Z
Collaborator

Hello,

Your analysis is correct.

The problem can be reduced to the following:

import distributed as dist
import numpy

if __name__ == '__main__':
    cluster = dist.LocalCluster(processes=False)
    client = dist.Client(cluster)

    scattered = client.scatter(numpy.ones(2), broadcast=True)

    def callback(arg_future):
        print(arg_future)

    def wrap_update_func(func, *args, **kwargs):

        def _wrapped_function() -> None:
            func(*args, **kwargs)

        return _wrapped_function

    def do_something(func, *args):
        client = dist.get_client()
        local_func = wrap_update_func(func, *args)
        futures = client.submit(local_func)

        client.compute(futures, sync=True)

    do_something(callback, scattered)

The wrapped function captures the parameters in its closure, so dask loses track of them.
This problem appeared with the 2024.2 release, and I think it is similar to what was discussed in this issue.
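To make the difference between the two wrapper styles concrete, here is a minimal sketch that does not need dask at all. FakeFuture and futures_in_closure are hypothetical names used only for illustration; the sketch shows where the argument ends up in each pattern, not how dask itself inspects tasks.

```python
class FakeFuture:
    """Hypothetical stand-in for distributed.Future (illustration only)."""

    def __init__(self, key):
        self.key = key


def wrap_capturing(func, *args, **kwargs):
    # Capturing pattern: the arguments live in the closure of the
    # returned function, so they travel hidden inside the function object.
    def _wrapped_function():
        return func(*args, **kwargs)

    return _wrapped_function


def wrap_forwarding(func):
    # Forwarding pattern: the arguments are supplied at call time, so they
    # can be handed to submit() explicitly.
    def _wrapped_function(*args, **kwargs):
        return func(*args, **kwargs)

    return _wrapped_function


def futures_in_closure(func):
    """Collect FakeFuture objects captured in the closure cells of func."""
    found = []
    for cell in func.__closure__ or ():
        value = cell.cell_contents
        candidates = value if isinstance(value, tuple) else (value,)
        found.extend(c for c in candidates if isinstance(c, FakeFuture))
    return found


future = FakeFuture("scattered-data")
capturing = wrap_capturing(print, future)
forwarding = wrap_forwarding(print)

print(len(futures_in_closure(capturing)))   # prints 1: future hidden in closure
print(len(futures_in_closure(forwarding)))  # prints 0: nothing hidden
```

In the capturing case, the only thing passed to submit() is the function, so the future is reachable only through its closure. In the forwarding case, the future would be passed as an explicit argument next to the function.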

I'm pushing a change that should fix it with almost no side effects.

It follows this approach:

import distributed as dist
import numpy

if __name__ == '__main__':
    cluster = dist.LocalCluster(processes=False)
    client = dist.Client(cluster)

    scattered = client.scatter(numpy.ones(2), broadcast=True, hash=False)

    def callback(arg_not_a_future):
        print(arg_not_a_future)

    def wrap_update_func(func):

        def _wrapped_function(*args, **kwargs) -> None:
            func(*args, **kwargs)

        return _wrapped_function

    def do_something(func, *args):
        client = dist.get_client()
        local_func = wrap_update_func(func)
        futures = client.submit(local_func, *args)

        client.compute(futures, sync=True)

    do_something(callback, scattered)

For your use case, you will have to adapt your callback function.
It will no longer receive a Future, so you do not need to call .result() on its parameters.
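As a rough sketch of that adaptation, the callback from the original report could look like the following. The resolve helper is hypothetical (it is not part of zcollection or dask) and is only there so the same callback also tolerates an object with a .result() method; with the fix as described, the argument is assumed to arrive as the scattered array itself.

```python
def resolve(arg):
    """Return arg.result() if arg looks like a future, else arg itself.

    Hypothetical helper for illustration, not part of zcollection or dask.
    """
    result = getattr(arg, "result", None)
    return result() if callable(result) else arg


def callback(zds, arg):
    # With the fix, `arg` is assumed to be the scattered numpy array, so
    # calling .result() directly on it would fail; resolve() handles both.
    data = resolve(arg)  # use the array here as needed
    return {'var1': zds['var1'].values * 2}
```

The call site stays the same as in the report: collection.update(callback, scattered).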
