From 72b2759b3987baa8fd3b07fab2ef5c7942d057aa Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Tue, 9 Apr 2024 10:58:21 -0500 Subject: [PATCH] Support orc and text IO with dask-expr using legacy conversion (#15439) Related to orc and text support in https://github.com/rapidsai/cudf/issues/15027 Follow-up work can enable predicate pushdown and column projection with ORC, but the goal of this PR is basic functionality (and parity with the legacy API). Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/15439 --- python/dask_cudf/dask_cudf/__init__.py | 3 ++- python/dask_cudf/dask_cudf/backends.py | 9 +++++++++ python/dask_cudf/dask_cudf/expr/_collection.py | 12 ++++++++++++ python/dask_cudf/dask_cudf/io/orc.py | 4 ++-- python/dask_cudf/dask_cudf/io/tests/test_json.py | 4 ++-- python/dask_cudf/dask_cudf/io/tests/test_orc.py | 4 ++-- python/dask_cudf/dask_cudf/io/tests/test_parquet.py | 5 +++-- python/dask_cudf/dask_cudf/io/tests/test_text.py | 4 ++-- 8 files changed, 34 insertions(+), 11 deletions(-) diff --git a/python/dask_cudf/dask_cudf/__init__.py b/python/dask_cudf/dask_cudf/__init__.py index c66e85ed2af..04c2ad65b99 100644 --- a/python/dask_cudf/dask_cudf/__init__.py +++ b/python/dask_cudf/dask_cudf/__init__.py @@ -51,8 +51,9 @@ def inner_func(*args, **kwargs): from .expr._collection import DataFrame, Index, Series groupby_agg = raise_not_implemented_error("groupby_agg") - read_text = raise_not_implemented_error("read_text") + read_text = DataFrame.read_text to_orc = raise_not_implemented_error("to_orc") + else: from .core import DataFrame, Index, Series from .groupby import groupby_agg diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index d05be30602e..5401bcd3767 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -699,6 +699,15 @@ def 
read_json(*args, engine="auto", **kwargs): **kwargs, ) + @staticmethod + def read_orc(*args, **kwargs): + from dask_expr import from_legacy_dataframe + + from dask_cudf.io.orc import read_orc as legacy_read_orc + + ddf = legacy_read_orc(*args, **kwargs) + return from_legacy_dataframe(ddf) + # Import/register cudf-specific classes for dask-expr try: diff --git a/python/dask_cudf/dask_cudf/expr/_collection.py b/python/dask_cudf/dask_cudf/expr/_collection.py index 799e6eddab3..516e35a4335 100644 --- a/python/dask_cudf/dask_cudf/expr/_collection.py +++ b/python/dask_cudf/dask_cudf/expr/_collection.py @@ -81,6 +81,18 @@ def groupby( **kwargs, ) + def to_orc(self, *args, **kwargs): + return self.to_legacy_dataframe().to_orc(*args, **kwargs) + + @staticmethod + def read_text(*args, **kwargs): + from dask_expr import from_legacy_dataframe + + from dask_cudf.io.text import read_text as legacy_read_text + + ddf = legacy_read_text(*args, **kwargs) + return from_legacy_dataframe(ddf) + class Series(VarMixin, DXSeries): def groupby(self, by, **kwargs): diff --git a/python/dask_cudf/dask_cudf/io/orc.py b/python/dask_cudf/dask_cudf/io/orc.py index 49fea0d7602..bed69f038b0 100644 --- a/python/dask_cudf/dask_cudf/io/orc.py +++ b/python/dask_cudf/dask_cudf/io/orc.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. 
from io import BufferedWriter, IOBase @@ -100,7 +100,7 @@ def read_orc(path, columns=None, filters=None, storage_options=None, **kwargs): **kwargs, ) - name = "read-orc-" + tokenize(fs_token, path, columns, **kwargs) + name = "read-orc-" + tokenize(fs_token, path, columns, filters, **kwargs) dsk = {} N = 0 for path, n in zip(paths, nstripes_per_file): diff --git a/python/dask_cudf/dask_cudf/io/tests/test_json.py b/python/dask_cudf/dask_cudf/io/tests/test_json.py index 8dcf3f05e89..a09dfbff188 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_json.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_json.py @@ -12,8 +12,8 @@ import dask_cudf from dask_cudf.tests.utils import skip_dask_expr -# No dask-expr support for dask_expr<=1.0.5 -pytestmark = skip_dask_expr(lt_version="1.0.5+a") +# No dask-expr support for dask_expr<1.0.6 +pytestmark = skip_dask_expr(lt_version="1.0.6") def test_read_json_backend_dispatch(tmp_path): diff --git a/python/dask_cudf/dask_cudf/io/tests/test_orc.py b/python/dask_cudf/dask_cudf/io/tests/test_orc.py index 8ccb7a7bfe7..7be6c712511 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_orc.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_orc.py @@ -14,8 +14,8 @@ import dask_cudf from dask_cudf.tests.utils import skip_dask_expr -# No dask-expr support -pytestmark = skip_dask_expr() +# No dask-expr support for dask_expr<1.0.6 +pytestmark = skip_dask_expr(lt_version="1.0.6") cur_dir = os.path.dirname(__file__) sample_orc = os.path.join(cur_dir, "data/orc/sample.orc") diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index df41ef77b7c..68460653119 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -185,7 +185,6 @@ def test_dask_timeseries_from_dask(tmpdir, index, divisions): ) -@xfail_dask_expr("Categorical column support") @pytest.mark.parametrize("index", [False, None]) 
@pytest.mark.parametrize("divisions", [False, True]) def test_dask_timeseries_from_daskcudf(tmpdir, index, divisions): @@ -193,7 +192,9 @@ def test_dask_timeseries_from_daskcudf(tmpdir, index, divisions): ddf2 = dask_cudf.from_cudf( cudf.datasets.timeseries(freq="D"), npartitions=4 ) - ddf2.name = ddf2.name.astype("object") + # Use assign in lieu of `ddf2.name = ...` + # See: https://github.com/dask/dask-expr/issues/1010 + ddf2 = ddf2.assign(name=ddf2.name.astype("object")) ddf2.to_parquet(fn, write_index=index) read_df = dask_cudf.read_parquet( fn, index=index, calculate_divisions=divisions diff --git a/python/dask_cudf/dask_cudf/io/tests/test_text.py b/python/dask_cudf/dask_cudf/io/tests/test_text.py index d3dcd386d0d..e3a9d380857 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_text.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_text.py @@ -11,8 +11,8 @@ import dask_cudf from dask_cudf.tests.utils import skip_dask_expr -# No dask-expr support -pytestmark = skip_dask_expr() +# No dask-expr support for dask_expr<1.0.6 +pytestmark = skip_dask_expr(lt_version="1.0.6") cur_dir = os.path.dirname(__file__) text_file = os.path.join(cur_dir, "data/text/sample.pgn")