
distributed 2022.3.0 no longer compatible with dask-yarn because of missing "status" attribute in YarnCluster #155

Open
NHanser opened this issue Mar 30, 2022 · 7 comments

Comments

@NHanser

NHanser commented Mar 30, 2022

What happened: an error is raised while creating the Client object.

What you expected to happen: the Client initializes correctly.

Minimal Complete Verifiable Example:

from dask_yarn import YarnCluster
from dask.distributed import Client
# Create a cluster
cluster = YarnCluster()
# Connect to the cluster
client = Client(cluster)

Anything else we need to know?:

Environment:

  • Dask version: 2022.3.0
  • Python version: 3.8.X
  • Operating System: Linux
  • Install method (conda, pip, source): pip
@jcrist
Member

jcrist commented Mar 30, 2022

Can you provide the traceback of the error you saw?

@NHanser
Author

NHanser commented Mar 30, 2022

AttributeError                            Traceback (most recent call last)
Input In [2], in <cell line: 7>()
      3 from dask import dataframe as dd
      6 cluster = YarnCluster()
----> 7 client = Client(cluster)
      8 cluster

File ~/dask_tests/daskenv/lib64/python3.8/site-packages/distributed/client.py:834, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    831 elif isinstance(getattr(address, "scheduler_address", None), str):
    832     # It's a LocalCluster or LocalCluster-compatible object
    833     self.cluster = address
--> 834     status = getattr(self.cluster, "status")
    835     if status and status in [Status.closed, Status.closing]:
    836         raise RuntimeError(
    837             f"Trying to connect to an already closed or closing Cluster {self.cluster}."
    838         )

AttributeError: 'YarnCluster' object has no attribute 'status'
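The failing line is `status = getattr(self.cluster, "status")`. Because `getattr` is called without a default, any object that has a string `scheduler_address` but no `status` attribute raises `AttributeError` before the closed/closing check even runs. The sketch below reproduces that check in isolation using hypothetical stand-in classes (`Status`, `OldStyleCluster`, `NewStyleCluster`, `check_cluster`); it is not the real distributed or dask-yarn code, and the address is made up.

```python
from enum import Enum


class Status(Enum):
    # Hypothetical stand-in for distributed's Status enum (only the members used here).
    running = "running"
    closing = "closing"
    closed = "closed"


class OldStyleCluster:
    # Stand-in for a cluster object that exposes scheduler_address but no status.
    scheduler_address = "tcp://10.0.0.1:8786"  # hypothetical address


class NewStyleCluster(OldStyleCluster):
    # Stand-in for a cluster object that also tracks its lifecycle status.
    status = Status.running


def check_cluster(cluster):
    # Mirrors the traced check: getattr without a default raises AttributeError
    # when the attribute is missing, instead of returning None.
    status = getattr(cluster, "status")
    if status and status in [Status.closed, Status.closing]:
        raise RuntimeError(
            f"Trying to connect to an already closed or closing Cluster {cluster}."
        )
    return cluster.scheduler_address


try:
    check_cluster(OldStyleCluster())
except AttributeError as exc:
    print("old-style cluster:", exc)

print("new-style cluster:", check_cluster(NewStyleCluster()))
```

Classes built on distributed's `Cluster` base class (which includes `SpecCluster`) set a `status` attribute on initialization, which is why a `SpecCluster`-based YarnCluster should pass this check.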

@jacobtomlinson
Member

I've noticed this error popping up in other places too.

@yup111

yup111 commented May 9, 2022

I have the same issue.

@jtrive84

Same issue using dask/distributed 2022.6.1 with dask-yarn 0.9.

@santosh-d3vpl3x

santosh-d3vpl3x commented Aug 30, 2022

With newer versions of distributed, support for resource managers like YARN needs to be implemented by extending SpecCluster.

Minimal example I hacked together:

import skein
from distributed import Client, SpecCluster
from distributed.deploy import ProcessInterface


class YarnProcess(ProcessInterface):
    def __init__(self, **kwargs):
        super().__init__()
        self.service_name = None
        # from_current() only works inside a container of a running skein application
        self.cli: skein.ApplicationClient = skein.ApplicationClient.from_current()
        self.container = None
        _ = kwargs

    async def close(self):
        # Only kill the container if one was actually started
        if self.container is not None:
            self.cli.kill_container(self.container)
        await super().close()


class DaskYarnScheduler(YarnProcess):
    def __init__(self, **kwargs):
        super().__init__()
        self.service_name: str = "dask.scheduler"
        self.container = None
        _ = kwargs

    async def start(self):
        self.cli.scale(self.service_name, count=1)
        self.address = self.cli.kv.wait("dask.scheduler").decode()
        self.container = self.cli.get_containers(services=[self.service_name])[0].id
        await super().start()


class DaskYarnWorker(YarnProcess):
    def __init__(self, address, **kwargs):
        super().__init__()
        self.service_name: str = "dask.worker"
        _ = kwargs, address

    async def start(self):
        self.container = self.cli.add_container(self.service_name).id
        await super().start()


class YarnCluster(SpecCluster):
    def __init__(self, security=None):
        super().__init__(
            scheduler={"cls": DaskYarnScheduler, "options": {}},
            worker={"cls": DaskYarnWorker, "options": {}},
            security=security,
        )
        self.spec = skein.ApplicationClient.from_current().get_specification()

cluster = YarnCluster()
client = Client(cluster.scheduler_address)
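In the example above, `SpecCluster` drives these objects: it builds the scheduler from its `options` and awaits its `start()`, then builds each worker with the scheduler address as the first positional argument and the worker `name` as a keyword option (which is why `DaskYarnWorker.__init__` takes `address` and `**kwargs`), and later calls `close()` on everything. The sketch below models that sequence with plain `asyncio` and hypothetical stand-in classes (`FakeProcess`, `FakeScheduler`, `FakeWorker`, `run_spec`) so it runs without YARN, skein, or distributed; it is a simplified approximation of the lifecycle, not `SpecCluster`'s actual implementation.

```python
import asyncio


class FakeProcess:
    """Hypothetical stand-in for distributed.deploy.ProcessInterface."""

    def __init__(self):
        self.address = None
        self.status = "created"

    async def start(self):
        self.status = "running"

    async def close(self):
        self.status = "closed"


class FakeScheduler(FakeProcess):
    async def start(self):
        # A real YARN scheduler would wait for its address to be published
        # (e.g. via skein's key-value store); here it is hard-coded and hypothetical.
        self.address = "tcp://scheduler:8786"
        await super().start()


class FakeWorker(FakeProcess):
    def __init__(self, scheduler_address, name=None, **kwargs):
        super().__init__()
        # The scheduler address arrives positionally; the worker name arrives as an option.
        self.scheduler_address = scheduler_address
        self.name = name


async def run_spec(n_workers):
    # 1. Build the scheduler from its options and start it.
    scheduler = FakeScheduler()
    await scheduler.start()
    # 2. Build each worker with the scheduler's address, then start them concurrently.
    workers = [FakeWorker(scheduler.address, name=i) for i in range(n_workers)]
    await asyncio.gather(*(w.start() for w in workers))
    # 3. Shut down roughly in reverse order: workers first, then the scheduler.
    await asyncio.gather(*(w.close() for w in workers))
    await scheduler.close()
    return scheduler, workers


scheduler, workers = asyncio.run(run_spec(2))
print(scheduler.status, [(w.name, w.scheduler_address, w.status) for w in workers])
```

Note that the example YarnCluster passes only a `worker` template and no initial `workers`, so no worker containers are requested until something like `cluster.scale(n)` is called.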

@cjac

cjac commented Aug 10, 2024

@santosh-d3vpl3x - can you please review #162 to make sure that I'm implementing this correctly?

@bradmiro - I could use some help running the tests and sanity-checking these changes.
