This repository has been archived by the owner on Nov 1, 2024. It is now read-only.

Dask on Dataproc GCP #85

Closed
anudeepbablu opened this issue Apr 13, 2021 · 21 comments

@anudeepbablu

anudeepbablu commented Apr 13, 2021

Will contribute notebooks to show workflow to run Dask on GCP Dataproc

@zronaghi
Contributor

Sounds great, thanks @anudeepbablu!

@skirui-source
Contributor

Seeing this error:

from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster(worker_class="dask_cuda.CUDAWorker",
    worker_gpus=1, worker_vcores=4, worker_memory='24GB',
    worker_env={"CONDA_PREFIX": "/opt/conda/default/"})
cluster.scale(4)

client = Client(cluster)
client

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In [3], line 1
----> 1 client = Client(cluster)
      2 client

File /opt/conda/miniconda3/lib/python3.8/site-packages/distributed/client.py:834, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    831 elif isinstance(getattr(address, "scheduler_address", None), str):
    832     # It's a LocalCluster or LocalCluster-compatible object
    833     self.cluster = address
--> 834     status = getattr(self.cluster, "status")
    835     if status and status in [Status.closed, Status.closing]:
    836         raise RuntimeError(
    837             f"Trying to connect to an already closed or closing Cluster {self.cluster}."
    838         )

AttributeError: 'YarnCluster' object has no attribute 'status'

@jacobtomlinson
Member

Looks related to dask/dask-yarn#158, for which there is a workaround.

@jacobtomlinson
Member

Once you have things working could you write this up as a documentation page in rapidsai/deployment?

@skirui-source
Contributor

> Once you have things working could you write this up as a documentation page in rapidsai/deployment?

I have opened PR rapidsai/deployment#99 to update the Dataproc instructions

@skirui-source
Contributor

skirui-source commented Jan 13, 2023

Update: the PR is on hold awaiting resolution of this issue -- the Google team needs to upgrade the dask-rapids installation to v22.12.

@skirui-source
Contributor

skirui-source commented Feb 2, 2023

@jacobtomlinson Blocked by this error: the notebook fails to load the parquet dataset, perhaps related to apache/arrow#31812. (A quick magic-bytes sanity check is sketched after the output below.)

OUTPUT

> File already exists. Ready to load at /rapids_hpo/data/airlines.parquet
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
Cell In[7], line 1
----> 1 df = prepare_dataset(use_full_dataset=True)

Cell In[6], line 34, in prepare_dataset(use_full_dataset)
28 print(f" > Download complete {file_name}")
30 input_cols = ["Year", "Month", "DayofMonth", "DayofWeek", "CRSDepTime", "CRSArrTime",
31 "UniqueCarrier", "FlightNum", "ActualElapsedTime", "Origin", "Dest",
32 "Distance", "Diverted"]
---> 34 dataset = cudf.read_parquet(parquet_name)
36 # encode categoricals as numeric
37 for col in dataset.select_dtypes(["object"]).columns:

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/contextlib.py:79, in ContextDecorator.__call__.<locals>.inner(*args, **kwds)
76 @wraps(func)
77 def inner(*args, **kwds):
78 with self._recreate_cm():
---> 79 return func(*args, **kwds)

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/cudf/io/parquet.py:420, in read_parquet(filepath_or_buffer, engine, columns, filters, row_groups, strings_to_categorical, use_pandas_metadata, use_python_file_object, categorical_partitions, open_file_options, *args, **kwargs)
413 partition_categories = {}
414 if fs and paths:
415 (
416 paths,
417 row_groups,
418 partition_keys,
419 partition_categories,
--> 420 ) = _process_dataset(
421 paths,
422 fs,
423 filters=filters,
424 row_groups=row_groups,
425 categorical_partitions=categorical_partitions,
426 )
427 elif filters is not None:
428 raise ValueError("cudf cannot apply filters to open file objects.")

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/contextlib.py:79, in ContextDecorator.__call__.<locals>.inner(*args, **kwds)
76 @wraps(func)
77 def inner(*args, **kwds):
78 with self._recreate_cm():
---> 79 return func(*args, **kwds)

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/cudf/io/parquet.py:243, in _process_dataset(paths, fs, filters, row_groups, categorical_partitions)
238 filters = pq._filters_to_expression(filters)
240 # Initialize ds.FilesystemDataset
241 # TODO: Remove the if len(paths) workaround after following bug is fixed:
242 # https://issues.apache.org/jira/browse/ARROW-16438
--> 243 dataset = ds.dataset(
244 source=paths[0] if len(paths) == 1 else paths,
245 filesystem=fs,
246 format="parquet",
247 partitioning="hive",
248 )
250 file_list = dataset.files
251 if len(file_list) == 0:

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/dataset.py:749, in dataset(source, schema, format, filesystem, partitioning, partition_base_dir, exclude_invalid_files, ignore_prefixes)
738 kwargs = dict(
739 schema=schema,
740 filesystem=filesystem,
(...)
745 selector_ignore_prefixes=ignore_prefixes
746 )
748 if _is_path_like(source):
--> 749 return _filesystem_dataset(source, **kwargs)
750 elif isinstance(source, (tuple, list)):
751 if all(_is_path_like(elem) for elem in source):

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/dataset.py:451, in _filesystem_dataset(source, schema, filesystem, partitioning, format, partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
443 options = FileSystemFactoryOptions(
444 partitioning=partitioning,
445 partition_base_dir=partition_base_dir,
446 exclude_invalid_files=exclude_invalid_files,
447 selector_ignore_prefixes=selector_ignore_prefixes
448 )
449 factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options)
--> 451 return factory.finish(schema)

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/_dataset.pyx:1885, in pyarrow._dataset.DatasetFactory.finish()

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/error.pxi:144, in pyarrow.lib.pyarrow_internal_check_status()

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/pyarrow/error.pxi:100, in pyarrow.lib.check_status()

ArrowInvalid: Error creating dataset. Could not read schema from '/rapids_hpo/data/airlines.parquet': Could not open Parquet input source '/rapids_hpo/data/airlines.parquet': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.. Is this a 'parquet' file?
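
One quick way to tell whether the file on disk is really Parquet (rather than, say, a truncated download or an error page saved under that name) is to check for the PAR1 magic bytes at both ends of the file. A minimal sketch, assuming the same local path as in the traceback:

# Sanity check: a valid Parquet file begins and ends with the 4-byte magic "PAR1".
# Anything else means the file on disk is not Parquet at all.
path = "/rapids_hpo/data/airlines.parquet"

with open(path, "rb") as f:
    head = f.read(4)
    f.seek(-4, 2)   # 4 bytes before end of file
    tail = f.read(4)

print("header:", head, "footer:", tail)  # both should be b'PAR1'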

@jacobtomlinson
Member

Looks like there is something wrong with the dataset. Could you share a link to the full notebook?

@skirui-source
Contributor

skirui-source commented Feb 6, 2023

I am testing this notebook in a GCP Dataproc cluster using the airline parquet dataset located in my GCS bucket (I think you need GCP access for this).
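
cudf can also read the object directly from GCS through fsspec/gcsfs, which would rule out a bad local copy. A rough sketch (the bucket path is a placeholder, and it assumes gcsfs is installed in the environment):

import cudf

# "gs://my-bucket/airline_small.parquet" is a placeholder -- substitute the
# actual GCS path. Requires gcsfs so that fsspec can open gs:// URLs.
df = cudf.read_parquet("gs://my-bucket/airline_small.parquet")
print(df.shape)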

@jacobtomlinson
Member

In that case my guess from the error would be that either the dataset in GCS is corrupted, or the client/workers can't access the file correctly.

It would help if you could share a complete example of what you ran to get the error. Happy to sync up if that's easier.

Side-note: As you're using the HPO_Demo.ipynb notebook for testing and therefore are the most familiar with it, I've assigned #208 to you.

@skirui-source
Contributor

skirui-source commented Feb 13, 2023

I loaded the data locally and it works, so this seems to be an issue on the Dataproc side. I was considering loading the parquet dataset into a BigQuery table, as this notebook example does (a minimal load sketch follows the output below). But I think we should still find time to sync.

OUTPUT
(dataproc) skirui@skirui-HP-Z8-G4-Workstation:~/Desktop$ parquet-tools inspect airline_small.parquet

############ file meta data ############
created_by:
num_columns: 14
num_rows: 200000
num_row_groups: 1
format_version: 1.0
serialized_size: 3102


############ Columns ############
ArrDelayBinary
Year
Month
DayofMonth
DayofWeek
CRSDepTime
CRSArrTime
UniqueCarrier
FlightNum
ActualElapsedTime
Origin
Dest
Distance
Diverted

############ Column(ArrDelayBinary) ############
name: ArrDelayBinary
path: ArrDelayBinary
max_definition_level: 0
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 2%)

############ Column(Year) ############
name: Year
path: Year
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)

############ Column(Month) ############
name: Month
path: Month
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)

############ Column(DayofMonth) ############
name: DayofMonth
path: DayofMonth
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)

############ Column(DayofWeek) ############
name: DayofWeek
path: DayofWeek
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)

############ Column(CRSDepTime) ############
name: CRSDepTime
path: CRSDepTime
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 36%)

############ Column(CRSArrTime) ############
name: CRSArrTime
path: CRSArrTime
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 7%)

############ Column(UniqueCarrier) ############
name: UniqueCarrier
path: UniqueCarrier
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 9%)

############ Column(FlightNum) ############
name: FlightNum
path: FlightNum
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 1%)

############ Column(ActualElapsedTime) ############
name: ActualElapsedTime
path: ActualElapsedTime
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)

############ Column(Origin) ############
name: Origin
path: Origin
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 18%)

############ Column(Dest) ############
name: Dest
path: Dest
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 16%)

############ Column(Distance) ############
name: Distance
path: Distance
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: 1%)

############ Column(Diverted) ############
name: Diverted
path: Diverted
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE
compression: UNCOMPRESSED (space_saved: 0%)
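
For the BigQuery route mentioned above, loading a local Parquet file into a table with the google-cloud-bigquery client looks roughly like the sketch below (project, dataset, and table names are placeholders):

from google.cloud import bigquery

client = bigquery.Client(project="my-project")        # placeholder project
table_id = "my-project.my_dataset.airline_small"      # placeholder table

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
)

with open("airline_small.parquet", "rb") as f:
    load_job = client.load_table_from_file(f, table_id, job_config=job_config)

load_job.result()  # wait for the load job to finish
print(client.get_table(table_id).num_rows, "rows loaded")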

@skirui-source
Contributor

skirui-source commented Feb 16, 2023

Update:

For this issue, I will be testing the bigquery_dataproc_dask_xgboost.ipynb notebook, which shows how to use Dask to process a dataset from BigQuery by leveraging the Dask-BigQuery connector on a Dataproc cluster.

The hpo_demo.ipynb notebook will instead be tested on an EC2 instance, as tracked by PR #243.
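
For reference, the core of that notebook's read path with the Dask-BigQuery connector is roughly the following (project, dataset, and table names here are placeholders):

import dask_bigquery

ddf = dask_bigquery.read_gbq(
    project_id="my-project",      # placeholder
    dataset_id="my_dataset",      # placeholder
    table_id="my_table",          # placeholder
)
ddf.head()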

@skirui-source
Contributor

@jacobtomlinson I am seeing this error when starting a YarnCluster:

from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster(worker_class="dask_cuda.CUDAWorker",
    worker_gpus=1, worker_vcores=4, worker_memory='24GB',
    worker_env={"CONDA_PREFIX": "/opt/conda/default/"})
cluster.scale(4)
23/02/22 05:17:09 INFO client.RMProxy: Connecting to ResourceManager at test-dask-rapids-2212-m/10.138.0.4:8032
23/02/22 05:17:10 INFO client.AHSProxy: Connecting to Application History server at test-dask-rapids-2212-m/10.138.0.4:10200
23/02/22 05:17:10 INFO skein.Driver: Driver started, listening on 40903
23/02/22 05:17:10 INFO conf.Configuration: found resource resource-types.xml at file:/etc/hadoop/conf.empty/resource-types.xml
23/02/22 05:17:10 INFO resource.ResourceUtils: Adding resource type - name = yarn.io/gpu, units = , type = COUNTABLE
23/02/22 05:17:10 INFO skein.Driver: Uploading application resources to hdfs://test-dask-rapids-2212-m/user/root/.skein/application_1677037421032_0005
23/02/22 05:17:11 INFO skein.Driver: Submitting application...
23/02/22 05:17:11 INFO impl.YarnClientImpl: Submitted application application_1677037421032_0005
/opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/dask_yarn/core.py:687: RuntimeWarning: coroutine 'rpc.close_rpc' was never awaited
  self.scheduler_comm.close_rpc()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
client = Client(cluster)
client
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[8], line 1
----> 1 client = Client(cluster)
      2 client

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/distributed/client.py:884, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    881 elif isinstance(getattr(address, "scheduler_address", None), str):
    882     # It's a LocalCluster or LocalCluster-compatible object
    883     self.cluster = address
--> 884     status = self.cluster.status
    885     if status in (Status.closed, Status.closing):
    886         raise RuntimeError(
    887             f"Trying to connect to an already closed or closing Cluster {self.cluster}."
    888         )

AttributeError: 'YarnCluster' object has no attribute 'status'

@jacobtomlinson
Member

Looks related to this issue: dask/dask-yarn#155

@jacobtomlinson
Member

As a workaround, can you try client = Client(cluster.scheduler_address)?
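
A minimal sketch of that workaround, picking up the cluster object created above:

from dask.distributed import Client

# Connect by scheduler address rather than passing the YarnCluster object,
# which sidesteps the missing `status` attribute on YarnCluster.
client = Client(cluster.scheduler_address)
client.wait_for_workers(4)  # optional: block until the scaled workers register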

@skirui-source
Contributor

@jacobtomlinson Blocked by this when attempting to read from a BigQuery table. I have tried following the instructions to enable authentication with a service account, but no success; we might need to pair on this? (See the credential check sketched after the traceback.)

{
  "error": {
    "code": 401,
    "message": "Request is missing required authentication credential. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.",
    "errors": [
      {
        "message": "Login Required.",
        "domain": "global",
        "reason": "required",
        "location": "Authorization",
        "locationType": "header"
      }
    ],
    "status": "UNAUTHENTICATED",
    "details": [
      {
        "@type": "type.googleapis.com/google.rpc.ErrorInfo",
        "reason": "CREDENTIALS_MISSING",
        "domain": "googleapis.com",
        "metadata": {
          "method": "google.cloud.bigquery.v2.TableService.GetTable",
          "service": "bigquery.googleapis.com"
        }
      }
    ]
  }
}
TRACEBACK
Forbidden                                 Traceback (most recent call last)
Cell In[5], line 7
      4 n_workers = len(workers)
      5 print('Number of GPU workers: ', n_workers)
----> 7 ddf = dask_bigquery.read_gbq(
      8     project_id="k80-exploration",
      9     dataset_id="spark_rapids",
     10     table_id="nyc_taxi_0",
     11 )
     13 ddf.head()

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/dask_bigquery/core.py:123, in read_gbq(project_id, dataset_id, table_id, row_filter, columns, max_stream_count, read_kwargs)
    121 read_kwargs = read_kwargs or {}
    122 with bigquery_clients(project_id) as (bq_client, bqs_client):
--> 123     table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
    124     if table_ref.table_type == "VIEW":
    125         raise TypeError("Table type VIEW not supported")

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/cloud/bigquery/client.py:1011, in Client.get_table(self, table, retry, timeout)
   1009 path = table_ref.path
   1010 span_attributes = {"path": path}
-> 1011 api_response = self._call_api(
   1012     retry,
   1013     span_name="BigQuery.getTable",
   1014     span_attributes=span_attributes,
   1015     method="GET",
   1016     path=path,
   1017     timeout=timeout,
   1018 )
   1019 return Table.from_api_repr(api_response)

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/cloud/bigquery/client.py:759, in Client._call_api(self, retry, span_name, span_attributes, job_ref, headers, **kwargs)
    755 if span_name is not None:
    756     with create_span(
    757         name=span_name, attributes=span_attributes, client=self, job_ref=job_ref
    758     ):
--> 759         return call()
    761 return call()

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/api_core/retry.py:349, in Retry.__call__.<locals>.retry_wrapped_func(*args, **kwargs)
    345 target = functools.partial(func, *args, **kwargs)
    346 sleep_generator = exponential_sleep_generator(
    347     self._initial, self._maximum, multiplier=self._multiplier
    348 )
--> 349 return retry_target(
    350     target,
    351     self._predicate,
    352     sleep_generator,
    353     self._timeout,
    354     on_error=on_error,
    355 )

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/api_core/retry.py:191, in retry_target(target, predicate, sleep_generator, timeout, on_error, **kwargs)
    189 for sleep in sleep_generator:
    190     try:
--> 191         return target()
    193     # pylint: disable=broad-except
    194     # This function explicitly must deal with broad exceptions.
    195     except Exception as exc:

File /opt/conda/miniconda3/envs/dask-rapids/lib/python3.9/site-packages/google/cloud/_http/__init__.py:494, in JSONConnection.api_request(self, method, path, query_params, data, content_type, headers, api_base_url, api_version, expect_json, _target_object, timeout, extra_api_info)
    482 response = self._make_request(
    483     method=method,
    484     url=url,
   (...)
    490     extra_api_info=extra_api_info,
    491 )
    493 if not 200 <= response.status_code < 300:
--> 494     raise exceptions.from_http_response(response)
    496 if expect_json and response.content:
    497     return response.json()

Forbidden: 403 GET https://bigquery.googleapis.com/bigquery/v2/projects/k80-exploration/datasets/spark_rapids/tables/nyc_taxi_0?prettyPrint=false: Access Denied: Table k80-exploration:spark_rapids.nyc_taxi_0: Permission bigquery.tables.get denied on table k80-exploration:spark_rapids.nyc_taxi_0 (or it may not exist).
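
The 401 (CREDENTIALS_MISSING) followed by the 403 on the table suggests checking two things: which credentials the client process is actually picking up, and whether that principal has bigquery.tables.get on the dataset. A minimal client-side check, reusing the project/dataset/table names from the traceback:

import google.auth
from google.cloud import bigquery

# Which application-default credentials does the process resolve?
creds, project = google.auth.default()
print("default project:", project)
print("credentials type:", type(creds).__name__)

# Can that principal read the table's metadata? Raises 403/404 if not.
bq = bigquery.Client(project="k80-exploration")
table = bq.get_table("spark_rapids.nyc_taxi_0")
print(table.num_rows, "rows")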

@jacobtomlinson
Member

The error message also says "or it may not exist", so I had a poke through the k80-exploration project in the Google Cloud Console and couldn't find a dataset called spark_rapids or anything related to nyc_taxi data. Perhaps this dataset has been deleted?

@skirui-source skirui-source removed their assignment Aug 23, 2023
@ebaker-gh

> Client(cluster.scheduler_address)

Are there any plans for a release that doesn't require a workaround like this?
In my case, the workaround did not help: using Client(cluster.scheduler_address) instead of Client(cluster) results in the cluster never connecting.

python==3.9.16
dask==2023.9.2
dask-yarn==0.8.1  # (also tried 0.9)
distributed==2023.9.2

@skirui-source skirui-source self-assigned this Feb 2, 2024
@jacobtomlinson
Member

Closing in favour of GoogleCloudDataproc/initialization-actions#1137
