Skip to content

Commit

Permalink
Support orc and text IO with dask-expr using legacy conversion (#15439)
Browse files Browse the repository at this point in the history
Related to orc and text support in #15027

Follow-up work can enable predicate pushdown and column projection with ORC, but the goal of this PR is basic functionality (and parity with the legacy API).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #15439
  • Loading branch information
rjzamora authored Apr 9, 2024
1 parent a2f625a commit 72b2759
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 11 deletions.
3 changes: 2 additions & 1 deletion python/dask_cudf/dask_cudf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ def inner_func(*args, **kwargs):
from .expr._collection import DataFrame, Index, Series

groupby_agg = raise_not_implemented_error("groupby_agg")
read_text = raise_not_implemented_error("read_text")
read_text = DataFrame.read_text
to_orc = raise_not_implemented_error("to_orc")

else:
from .core import DataFrame, Index, Series
from .groupby import groupby_agg
Expand Down
9 changes: 9 additions & 0 deletions python/dask_cudf/dask_cudf/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,15 @@ def read_json(*args, engine="auto", **kwargs):
**kwargs,
)

@staticmethod
def read_orc(*args, **kwargs):
    """Read ORC data via the legacy reader and wrap it for dask-expr.

    All positional and keyword arguments are forwarded unchanged to
    ``dask_cudf.io.orc.read_orc``. The collection it returns is then
    passed through ``dask_expr.from_legacy_dataframe`` and that result
    is returned.
    """
    # Imports stay local to the call, exactly as in the original.
    from dask_expr import from_legacy_dataframe

    from dask_cudf.io.orc import read_orc as legacy_read_orc

    return from_legacy_dataframe(legacy_read_orc(*args, **kwargs))


# Import/register cudf-specific classes for dask-expr
try:
Expand Down
12 changes: 12 additions & 0 deletions python/dask_cudf/dask_cudf/expr/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ def groupby(
**kwargs,
)

def to_orc(self, *args, **kwargs):
    """Write this collection to ORC through the legacy collection API.

    The collection is first converted with ``to_legacy_dataframe()``;
    all arguments are then forwarded unchanged to that object's
    ``to_orc`` method, whose return value is passed back to the caller.
    """
    legacy_collection = self.to_legacy_dataframe()
    return legacy_collection.to_orc(*args, **kwargs)

@staticmethod
def read_text(*args, **kwargs):
    """Read text data via the legacy reader and wrap it for dask-expr.

    All positional and keyword arguments are forwarded unchanged to
    ``dask_cudf.io.text.read_text``. The collection it returns is then
    passed through ``dask_expr.from_legacy_dataframe`` and that result
    is returned.
    """
    # Imports stay local to the call, exactly as in the original.
    from dask_expr import from_legacy_dataframe

    from dask_cudf.io.text import read_text as legacy_read_text

    return from_legacy_dataframe(legacy_read_text(*args, **kwargs))


class Series(VarMixin, DXSeries):
def groupby(self, by, **kwargs):
Expand Down
4 changes: 2 additions & 2 deletions python/dask_cudf/dask_cudf/io/orc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from io import BufferedWriter, IOBase

Expand Down Expand Up @@ -100,7 +100,7 @@ def read_orc(path, columns=None, filters=None, storage_options=None, **kwargs):
**kwargs,
)

name = "read-orc-" + tokenize(fs_token, path, columns, **kwargs)
name = "read-orc-" + tokenize(fs_token, path, columns, filters, **kwargs)
dsk = {}
N = 0
for path, n in zip(paths, nstripes_per_file):
Expand Down
4 changes: 2 additions & 2 deletions python/dask_cudf/dask_cudf/io/tests/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import dask_cudf
from dask_cudf.tests.utils import skip_dask_expr

# No dask-expr support for dask_expr<=1.0.5
pytestmark = skip_dask_expr(lt_version="1.0.5+a")
# No dask-expr support for dask_expr<1.0.6
pytestmark = skip_dask_expr(lt_version="1.0.6")


def test_read_json_backend_dispatch(tmp_path):
Expand Down
4 changes: 2 additions & 2 deletions python/dask_cudf/dask_cudf/io/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import dask_cudf
from dask_cudf.tests.utils import skip_dask_expr

# No dask-expr support
pytestmark = skip_dask_expr()
# No dask-expr support for dask_expr<1.0.6
pytestmark = skip_dask_expr(lt_version="1.0.6")

cur_dir = os.path.dirname(__file__)
sample_orc = os.path.join(cur_dir, "data/orc/sample.orc")
Expand Down
5 changes: 3 additions & 2 deletions python/dask_cudf/dask_cudf/io/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,16 @@ def test_dask_timeseries_from_dask(tmpdir, index, divisions):
)


@xfail_dask_expr("Categorical column support")
@pytest.mark.parametrize("index", [False, None])
@pytest.mark.parametrize("divisions", [False, True])
def test_dask_timeseries_from_daskcudf(tmpdir, index, divisions):
fn = str(tmpdir)
ddf2 = dask_cudf.from_cudf(
cudf.datasets.timeseries(freq="D"), npartitions=4
)
ddf2.name = ddf2.name.astype("object")
# Use assign in lieu of `ddf2.name = ...`
# See: https://github.com/dask/dask-expr/issues/1010
ddf2 = ddf2.assign(name=ddf2.name.astype("object"))
ddf2.to_parquet(fn, write_index=index)
read_df = dask_cudf.read_parquet(
fn, index=index, calculate_divisions=divisions
Expand Down
4 changes: 2 additions & 2 deletions python/dask_cudf/dask_cudf/io/tests/test_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import dask_cudf
from dask_cudf.tests.utils import skip_dask_expr

# No dask-expr support
pytestmark = skip_dask_expr()
# No dask-expr support for dask_expr<1.0.6
pytestmark = skip_dask_expr(lt_version="1.0.6")

cur_dir = os.path.dirname(__file__)
text_file = os.path.join(cur_dir, "data/text/sample.pgn")
Expand Down

0 comments on commit 72b2759

Please sign in to comment.