From be70681457013ea822b28b80071ec8e86a81db34 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 11 Jun 2024 15:55:00 -0700 Subject: [PATCH 01/20] wrapper implementation --- src/nested_dask/__init__.py | 1 + src/nested_dask/utils/__init__.py | 1 + src/nested_dask/utils/utils.py | 54 +++++++++++++++++++++++++++++++ tests/nested_dask/test_utils.py | 17 ++++++++++ 4 files changed, 73 insertions(+) create mode 100644 src/nested_dask/utils/__init__.py create mode 100644 src/nested_dask/utils/utils.py create mode 100644 tests/nested_dask/test_utils.py diff --git a/src/nested_dask/__init__.py b/src/nested_dask/__init__.py index 4b83f63..c84f36d 100644 --- a/src/nested_dask/__init__.py +++ b/src/nested_dask/__init__.py @@ -2,4 +2,5 @@ from .core import NestedFrame # noqa from .io import read_parquet # noqa from .datasets import generate_data # noqa +from .utils import count_nested # noqa from ._version import __version__ # noqa diff --git a/src/nested_dask/utils/__init__.py b/src/nested_dask/utils/__init__.py new file mode 100644 index 0000000..ed5d0c5 --- /dev/null +++ b/src/nested_dask/utils/__init__.py @@ -0,0 +1 @@ +from .utils import * # noqa diff --git a/src/nested_dask/utils/utils.py b/src/nested_dask/utils/utils.py new file mode 100644 index 0000000..f115cac --- /dev/null +++ b/src/nested_dask/utils/utils.py @@ -0,0 +1,54 @@ +import nested_pandas as npd +import pandas as pd +from nested_pandas import utils as npd_utils + +from ..core import NestedFrame + + +def count_nested(df, nested, by=None, join=True) -> NestedFrame: + """Counts the number of rows of a nested dataframe. + + Wraps Nested-Pandas count_nested. + + Parameters + ---------- + df: NestedFrame + A NestedFrame that contains the desired `nested` series + to count. + nested: 'str' + The label of the nested series to count. + by: 'str', optional + Specifies a column within nested to count by, returning + a count for each unique value in `by`. + join: bool, optional + Join the output count columns to df and return df, otherwise + just return a NestedFrame containing only the count columns. + + Returns + ------- + NestedFrame + """ + + # The meta varies depending on the parameters + + # first depending on by + if by is not None: + # will have one column per unique value of the specified column + # requires some computation to determine these values + # TODO: Requires modification of nested-pandas to always produce + # sorted output columns for meta + by_cols = sorted(df[nested].nest.to_flat()[by].unique()) + out_cols = [f"n_{nested}_{col}" for col in by_cols] + else: + # otherwise just have a single column output + out_cols = [f"n_{nested}"] + + # add dtypes + meta = npd.NestedFrame({col: 0 for col in out_cols}, index=[]) + + # and second depending on join + if join: + # adds the meta onto the existing meta + meta = pd.concat([df.head(0), meta]) + + return df.map_partitions(lambda x: npd_utils.count_nested(x, nested, by=by, join=join), meta=meta) diff --git a/tests/nested_dask/test_utils.py b/tests/nested_dask/test_utils.py new file mode 100644 index 0000000..d23bc8f --- /dev/null +++ b/tests/nested_dask/test_utils.py @@ -0,0 +1,17 @@ +import nested_dask as nd +import pytest +from nested_pandas.utils import count_nested + + +@pytest.mark.parametrize("join", [True, False]) +@pytest.mark.parametrize("by", [None, "band"]) +def test_count_nested(test_dataset, join, by): + """test the count_nested wrapper""" + + # count_nested functionality is tested on the nested-pandas side + # let's just make sure the behavior here is identical. + + result_dsk = nd.utils.count_nested(test_dataset, "nested", join=join, by=by).compute() + result_pd = count_nested(test_dataset.compute(), "nested", join=join, by=by) + + assert result_dsk.equals(result_pd) From de80e9ed0e2ee6d3e21d03ded2de9eadd270bad5 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 1 Jul 2024 10:20:32 -0700 Subject: [PATCH 02/20] add testing of seed stability --- tests/nested_dask/test_datasets.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/nested_dask/test_datasets.py b/tests/nested_dask/test_datasets.py index 26f9ad7..7a2e66a 100644 --- a/tests/nested_dask/test_datasets.py +++ b/tests/nested_dask/test_datasets.py @@ -1,4 +1,5 @@ import nested_dask as nd +import pytest def test_generate_data(): @@ -18,3 +19,8 @@ def test_generate_data(): # test the length assert len(generate_1) == 10 assert len(generate_1.nested.nest.to_flat()) == 1000 + + # test seed stability + assert pytest.approx(generate_1.compute().loc[0]["a"], 0.1) == 0.417 + assert pytest.approx(generate_1.compute().loc[0]["b"], 0.1) == 0.838 + assert pytest.approx(generate_1.nested.nest.to_flat().compute().iloc[0]["t"], 0.1) == 16.015 From 9b8cf7c9fa374022e3865fa585faa58ebb6980ac Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 1 Jul 2024 10:30:43 -0700 Subject: [PATCH 03/20] try just iloc --- tests/nested_dask/test_accessor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/nested_dask/test_accessor.py b/tests/nested_dask/test_accessor.py index 8cc7603..48c97b3 100644 --- a/tests/nested_dask/test_accessor.py +++ b/tests/nested_dask/test_accessor.py @@ -31,9 +31,9 @@ def test_to_flat(test_dataset): # Make sure we retain all rows assert len(flat_ztf.loc[1]) == 500 - one_row = flat_ztf.loc[1].compute().iloc[1] - assert pytest.approx(one_row["t"], 0.01) == 5.4584 - assert pytest.approx(one_row["flux"], 0.01) == 84.1573 + one_row = flat_ztf.compute().iloc[0] + assert pytest.approx(one_row["t"], 0.01) == 6.5329 + assert pytest.approx(one_row["flux"], 0.01) == 19.0794 assert one_row["band"] == "r" From 9603e5175014cc4a4c75fbc1f4331f9d72392b02 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 1 Jul 2024 10:42:08 -0700 Subject: [PATCH 04/20] try from generate_data --- tests/nested_dask/test_accessor.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/nested_dask/test_accessor.py b/tests/nested_dask/test_accessor.py index 48c97b3..23ff703 100644 --- a/tests/nested_dask/test_accessor.py +++ b/tests/nested_dask/test_accessor.py @@ -1,6 +1,7 @@ import pandas as pd import pyarrow as pa import pytest +import nested_dask as nd def test_nest_accessor(test_dataset): @@ -19,6 +20,28 @@ def test_fields(test_dataset): assert test_dataset.nested.nest.fields == ["t", "flux", "band"] +def test_to_flat_gen(): + nf = nd.datasets.generate_data(10, 100, npartitions=2, seed=1) + + flat_nf = nf.nested.nest.to_flat() + + # check dtypes + assert flat_nf.dtypes["t"] == pd.ArrowDtype(pa.float64()) + assert flat_nf.dtypes["flux"] == pd.ArrowDtype(pa.float64()) + assert flat_nf.dtypes["band"] == pd.ArrowDtype(pa.string()) + + # Make sure we retain all rows + assert len(flat_nf.loc[1]) == 100 + + one_row = flat_nf.compute().iloc[0] + + assert pytest.approx(one_row["t"], 0.01) == 16.0149 + assert pytest.approx(one_row["flux"], 0.01) == 51.2061 + assert one_row["band"] == "r" + + + + def test_to_flat(test_dataset): """test the to_flat function""" flat_ztf = test_dataset.nested.nest.to_flat() From d0fa21c1c146d34f26e4114311e7c854a8a3bd9a Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 1 Jul 2024 10:57:22 -0700 Subject: [PATCH 05/20] use generate_data for accessor tests --- tests/nested_dask/test_accessor.py | 85 ++++++++++++++---------------- 1 file changed, 39 insertions(+), 46 deletions(-) diff --git a/tests/nested_dask/test_accessor.py b/tests/nested_dask/test_accessor.py index 23ff703..2d89adf 100644 --- a/tests/nested_dask/test_accessor.py +++ b/tests/nested_dask/test_accessor.py @@ -1,7 +1,7 @@ +import nested_dask as nd import pandas as pd import pyarrow as pa import pytest -import nested_dask as nd def test_nest_accessor(test_dataset): @@ -20,7 +20,8 @@ def test_fields(test_dataset): assert test_dataset.nested.nest.fields == ["t", "flux", "band"] -def test_to_flat_gen(): +def test_to_flat(): + """test the to_flat function""" nf = nd.datasets.generate_data(10, 100, npartitions=2, seed=1) flat_nf = nf.nested.nest.to_flat() @@ -40,77 +41,69 @@ def test_to_flat_gen(): assert one_row["band"] == "r" - - -def test_to_flat(test_dataset): +def test_to_flat_with_fields(): """test the to_flat function""" - flat_ztf = test_dataset.nested.nest.to_flat() - - # check dtypes - assert flat_ztf.dtypes["t"] == pd.ArrowDtype(pa.float64()) - assert flat_ztf.dtypes["flux"] == pd.ArrowDtype(pa.float64()) - assert flat_ztf.dtypes["band"] == pd.ArrowDtype(pa.large_string()) - - # Make sure we retain all rows - assert len(flat_ztf.loc[1]) == 500 - - one_row = flat_ztf.compute().iloc[0] - assert pytest.approx(one_row["t"], 0.01) == 6.5329 - assert pytest.approx(one_row["flux"], 0.01) == 19.0794 - assert one_row["band"] == "r" + nf = nd.datasets.generate_data(10, 100, npartitions=2, seed=1) + flat_nf = nf.nested.nest.to_flat(fields=["t", "flux"]) -def test_to_flat_with_fields(test_dataset): - """test the to_flat function""" - flat_ztf = test_dataset.nested.nest.to_flat(fields=["t", "flux"]) + assert "band" not in flat_nf.columns # check dtypes - assert flat_ztf.dtypes["t"] == pd.ArrowDtype(pa.float64()) - assert flat_ztf.dtypes["flux"] == pd.ArrowDtype(pa.float64()) + assert flat_nf.dtypes["t"] == pd.ArrowDtype(pa.float64()) + assert flat_nf.dtypes["flux"] == pd.ArrowDtype(pa.float64()) # Make sure we retain all rows - assert len(flat_ztf.loc[1]) == 500 + assert len(flat_nf.loc[1]) == 100 + + one_row = flat_nf.compute().iloc[0] - one_row = flat_ztf.loc[1].compute().iloc[1] - assert pytest.approx(one_row["t"], 0.01) == 5.4584 - assert pytest.approx(one_row["flux"], 0.01) == 84.1573 + assert pytest.approx(one_row["t"], 0.01) == 16.0149 + assert pytest.approx(one_row["flux"], 0.01) == 51.2061 -def test_to_lists(test_dataset): +def test_to_lists(): """test the to_lists function""" - list_ztf = test_dataset.nested.nest.to_lists() + + nf = nd.datasets.generate_data(10, 100, npartitions=2, seed=1) + list_nf = nf.nested.nest.to_lists() # check dtypes - assert list_ztf.dtypes["t"] == pd.ArrowDtype(pa.list_(pa.float64())) - assert list_ztf.dtypes["flux"] == pd.ArrowDtype(pa.list_(pa.float64())) - assert list_ztf.dtypes["band"] == pd.ArrowDtype(pa.list_(pa.large_string())) + assert list_nf.dtypes["t"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_nf.dtypes["flux"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_nf.dtypes["band"] == pd.ArrowDtype(pa.list_(pa.string())) # Make sure we have a single row for an id - assert len(list_ztf.loc[1]) == 1 + assert len(list_nf.loc[1]) == 1 # Make sure we retain all rows -- double loc for speed and pandas get_item - assert len(list_ztf.loc[1].compute().loc[1]["t"]) == 500 + assert len(list_nf.loc[1].compute().loc[1]["t"]) == 100 + one_row = list_nf.compute().iloc[1] # spot-check values - assert pytest.approx(list_ztf.loc[1].compute().loc[1]["t"][0], 0.01) == 7.5690279 - assert pytest.approx(list_ztf.loc[1].compute().loc[1]["flux"][0], 0.01) == 79.6886 - assert list_ztf.loc[1].compute().loc[1]["band"][0] == "g" + assert pytest.approx(one_row["t"][0], 0.01) == 19.3652 + assert pytest.approx(one_row["flux"][0], 0.01) == 61.7461 + assert one_row["band"][0] == "g" -def test_to_lists_with_fields(test_dataset): +def test_to_lists_with_fields(): """test the to_lists function""" - list_ztf = test_dataset.nested.nest.to_lists(fields=["t", "flux"]) + nf = nd.datasets.generate_data(10, 100, npartitions=2, seed=1) + list_nf = nf.nested.nest.to_lists(fields=["t", "flux"]) + + assert "band" not in list_nf.columns # check dtypes - assert list_ztf.dtypes["t"] == pd.ArrowDtype(pa.list_(pa.float64())) - assert list_ztf.dtypes["flux"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_nf.dtypes["t"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_nf.dtypes["flux"] == pd.ArrowDtype(pa.list_(pa.float64())) # Make sure we have a single row for an id - assert len(list_ztf.loc[1]) == 1 + assert len(list_nf.loc[1]) == 1 # Make sure we retain all rows -- double loc for speed and pandas get_item - assert len(list_ztf.loc[1].compute().loc[1]["t"]) == 500 + assert len(list_nf.loc[1].compute().loc[1]["t"]) == 100 + one_row = list_nf.compute().iloc[1] # spot-check values - assert pytest.approx(list_ztf.loc[1].compute().loc[1]["t"][0], 0.01) == 7.5690279 - assert pytest.approx(list_ztf.loc[1].compute().loc[1]["flux"][0], 0.01) == 79.6886 + assert pytest.approx(one_row["t"][0], 0.01) == 19.3652 + assert pytest.approx(one_row["flux"][0], 0.01) == 61.7461 From 668d38dfd2b1d5a6278ab38ce411b4a3ad09e308 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 1 Jul 2024 11:28:24 -0700 Subject: [PATCH 06/20] Fix broken readme links --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 54e1cfd..78c5907 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,8 @@ [![Read The Docs](https://img.shields.io/readthedocs/nested-dask)](https://nested-dask.readthedocs.io/) [![Benchmarks](https://img.shields.io/github/actions/workflow/status/lincc-frameworks/nested-dask/asv-main.yml?label=benchmarks)](https://lincc-frameworks.github.io/nested-dask/) -A ![dask](https://www.dask.org/) extension of -![nested-pandas](https://nested-pandas.readthedocs.io/en/latest/). +A [dask](https://www.dask.org/) extension of +[nested-pandas](https://nested-pandas.readthedocs.io/en/latest/). Nested-pandas is a pandas extension package that empowers efficient analysis of nested associated datasets. This package wraps the majority of the From 9ba0f7c572039d1edfd83f4469e69f8ae807708c Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Wed, 3 Jul 2024 10:37:38 -0700 Subject: [PATCH 07/20] add wrapper functions --- docs/tutorials/loading_data.ipynb | 6 +- docs/tutorials/work_with_lsdb.ipynb | 2 +- src/nested_dask/core.py | 117 ++++++++++++++++++++++++- src/nested_dask/datasets/generation.py | 2 +- tests/nested_dask/conftest.py | 12 +-- tests/nested_dask/test_nestedframe.py | 51 +++++++++++ 6 files changed, 176 insertions(+), 14 deletions(-) diff --git a/docs/tutorials/loading_data.ipynb b/docs/tutorials/loading_data.ipynb index ddd9843..242314a 100644 --- a/docs/tutorials/loading_data.ipynb +++ b/docs/tutorials/loading_data.ipynb @@ -28,7 +28,7 @@ "source": [ "## From Nested-Pandas\n", "\n", - "Nested-Dask can load data from Nested-Pandas `NestedFrame` objects by using the `from_nested_pandas` class function." + "Nested-Dask can load data from Nested-Pandas `NestedFrame` objects by using the `from_pandas` class function." ] }, { @@ -48,7 +48,7 @@ "nf = nf.add_nested(nested, \"nested\")\n", "\n", "# Convert to Nested-Dask NestedFrame\n", - "nf = nd.NestedFrame.from_nested_pandas(nf)\n", + "nf = nd.NestedFrame.from_pandas(nf)\n", "nf" ] }, @@ -225,7 +225,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.10.11" } }, "nbformat": 4, diff --git a/docs/tutorials/work_with_lsdb.ipynb b/docs/tutorials/work_with_lsdb.ipynb index f028496..c3fed88 100644 --- a/docs/tutorials/work_with_lsdb.ipynb +++ b/docs/tutorials/work_with_lsdb.ipynb @@ -439,7 +439,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.6" + "version": "3.10.11" } }, "nbformat": 4, diff --git a/src/nested_dask/core.py b/src/nested_dask/core.py index 814aadc..13f418e 100644 --- a/src/nested_dask/core.py +++ b/src/nested_dask/core.py @@ -64,7 +64,7 @@ def __getitem__(self, key): return result @classmethod - def from_nested_pandas( + def from_pandas( cls, data, npartitions=None, @@ -72,11 +72,11 @@ def from_nested_pandas( sort=True, ) -> NestedFrame: """Returns an Nested-Dask NestedFrame constructed from a Nested-Pandas - NestedFrame. + NestedFrame or Pandas DataFrame. Parameters ---------- - data: `NestedFrame` + data: `NestedFrame` or `DataFrame` Nested-Pandas NestedFrame containing the underlying data npartitions: `int`, optional The number of partitions of the index to create. Note that depending on @@ -112,6 +112,117 @@ def from_dask_dataframe(cls, df: dd.DataFrame) -> NestedFrame: """ return df.map_partitions(npd.NestedFrame, meta=npd.NestedFrame(df._meta.copy())) + @classmethod + def from_delayed(cls, dfs, meta=None, divisions=None, prefix="from-delayed", verify_meta=True): + """ + Create Nested-Dask NestedFrames from many Dask Delayed objects. + + Docstring is copied from `dask.dataframe.from_delayed`. + + Parameters + ---------- + dfs : + A ``dask.delayed.Delayed``, a ``distributed.Future``, or an iterable of either + of these objects, e.g. returned by ``client.submit``. These comprise the + individual partitions of the resulting dataframe. + If a single object is provided (not an iterable), then the resulting dataframe + will have only one partition. + meta: + An empty NestedFrame, pd.DataFrame, or pd.Series that matches the dtypes and column names of + the output. This metadata is necessary for many algorithms in dask dataframe + to work. For ease of use, some alternative inputs are also available. Instead of a + DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that + the order of the names should match the order of the columns). Instead of a series, a tuple of + (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead + to unexpected results, so providing meta is recommended. For more information, see + dask.dataframe.utils.make_meta. + divisions : + Partition boundaries along the index. + For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions + For string 'sorted' will compute the delayed values to find index + values. Assumes that the indexes are mutually sorted. + If None, then won't use index information + prefix : + Prefix to prepend to the keys. + verify_meta : + If True check that the partitions have consistent metadata, defaults to True. + + """ + nf = dd.from_delayed(dfs=dfs, meta=meta, divisions=divisions, prefix=prefix, verify_meta=verify_meta) + return NestedFrame.from_dask_dataframe(nf) + + @classmethod + def from_map( + cls, + func, + *iterables, + args=None, + meta=None, + divisions=None, + label=None, + enforce_metadata=True, + **kwargs, + ): + """ + Create a DataFrame collection from a custom function map + + WARNING: The ``from_map`` API is experimental, and stability is not + yet guaranteed. Use at your own risk! + + Parameters + ---------- + func : callable + Function used to create each partition. If ``func`` satisfies the + ``DataFrameIOFunction`` protocol, column projection will be enabled. + *iterables : Iterable objects + Iterable objects to map to each output partition. All iterables must + be the same length. This length determines the number of partitions + in the output collection (only one element of each iterable will + be passed to ``func`` for each partition). + args : list or tuple, optional + Positional arguments to broadcast to each output partition. Note + that these arguments will always be passed to ``func`` after the + ``iterables`` positional arguments. + meta: + An empty NestedFrame, pd.DataFrame, or pd.Series that matches the dtypes and column names of + the output. This metadata is necessary for many algorithms in dask dataframe + to work. For ease of use, some alternative inputs are also available. Instead of a + DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that + the order of the names should match the order of the columns). Instead of a series, a tuple of + (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead + to unexpected results, so providing meta is recommended. For more information, see + dask.dataframe.utils.make_meta. + divisions : tuple, str, optional + Partition boundaries along the index. + For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions + For string 'sorted' will compute the delayed values to find index + values. Assumes that the indexes are mutually sorted. + If None, then won't use index information + label : str, optional + String to use as the function-name label in the output + collection-key names. + enforce_metadata : bool, default True + Whether to enforce at runtime that the structure of the DataFrame + produced by ``func`` actually matches the structure of ``meta``. + This will rename and reorder columns for each partition, + and will raise an error if this doesn't work, + but it won't raise if dtypes don't match. + **kwargs: + Key-word arguments to broadcast to each output partition. These + same arguments will be passed to ``func`` for every output partition. + """ + nf = dd.from_map( + func, + *iterables, + args=args, + meta=meta, + divisions=divisions, + label=label, + enforce_metadata=enforce_metadata, + **kwargs, + ) + return NestedFrame.from_dask_dataframe(nf) + def compute(self, **kwargs): """Compute this Dask collection, returning the underlying dataframe or series.""" return npd.NestedFrame(super().compute(**kwargs)) diff --git a/src/nested_dask/datasets/generation.py b/src/nested_dask/datasets/generation.py index 6c283f3..70873d7 100644 --- a/src/nested_dask/datasets/generation.py +++ b/src/nested_dask/datasets/generation.py @@ -37,7 +37,7 @@ def generate_data(n_base, n_layer, npartitions=1, seed=None) -> nd.NestedFrame: base_nf = datasets.generate_data(n_base, n_layer, seed=seed) # Convert to nested-dask - base_nf = nd.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=npartitions) + base_nf = nd.NestedFrame.from_pandas(base_nf).repartition(npartitions=npartitions) return base_nf diff --git a/tests/nested_dask/conftest.py b/tests/nested_dask/conftest.py index 778553b..bdcfad7 100644 --- a/tests/nested_dask/conftest.py +++ b/tests/nested_dask/conftest.py @@ -23,8 +23,8 @@ def test_dataset(): } layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index() - base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5) - layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) + base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5) + layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10) return base_nd.add_nested(layer_nd, "nested") @@ -53,8 +53,8 @@ def test_dataset_with_nans(): } layer_nf = npd.NestedFrame(data=layer_data).set_index("index") - base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5) - layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) + base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5) + layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10) return base_nd.add_nested(layer_nd, "nested") @@ -78,7 +78,7 @@ def test_dataset_no_add_nested(): } layer_nf = npd.NestedFrame(data=layer_data).set_index("index") - base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5) - layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) + base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5) + layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10) return (base_nd, layer_nd) diff --git a/tests/nested_dask/test_nestedframe.py b/tests/nested_dask/test_nestedframe.py index cc21a4d..d110ecd 100644 --- a/tests/nested_dask/test_nestedframe.py +++ b/tests/nested_dask/test_nestedframe.py @@ -1,5 +1,6 @@ import dask.dataframe as dd import nested_dask as nd +import nested_pandas as npd import numpy as np import pandas as pd import pytest @@ -186,3 +187,53 @@ def test_from_epyc(): # just make sure the result was successfully computed assert len(result) == 9817 + + +@pytest.mark.parametrize("pkg", ["pandas", "nested-pandas"]) +@pytest.mark.parametrize("with_nested", [True, False]) +def test_from_pandas(pkg, with_nested): + """Test that from_pandas returns a NestedFrame""" + + if pkg == "pandas": + df = pd.DataFrame({"a": [1, 2, 3]}, index=[1, 2, 3]) + elif pkg == "nested-pandas": + df = npd.NestedFrame({"a": [1, 2, 3]}, index=[1, 2, 3]) + if with_nested: + nested = npd.NestedFrame({"b": [5, 10, 15, 20, 25, 30]}, index=[1, 1, 2, 2, 3, 3]) + df = df.add_nested(nested, "nested") + + ndf = nd.NestedFrame.from_pandas(df) + assert isinstance(ndf, nd.NestedFrame) + + +@pytest.mark.parametrize("with_nested", [True, False]) +def test_from_delayed(with_nested): + """Test that from_delayed returns a NestedFrame""" + + nf = nd.datasets.generate_data(10, 10) + if not with_nested: + nf = nf.drop("nested", axis=1) + + delayed = nf.to_delayed() + + ndf = nd.NestedFrame.from_delayed(dfs=delayed, meta=nf._meta) + assert isinstance(ndf, nd.NestedFrame) + + +def test_from_map(test_dataset, tmp_path): + """Test that from_map returns a NestedFrame""" + + # Setup a temporary directory for files + test_save_path = tmp_path / "test_dataset" + + # Save Base to Parquet + test_dataset[["a", "b"]].to_parquet(test_save_path, write_index=True) + + # Load from_map + paths = [ + tmp_path / "test_dataset" / "0.parquet", + tmp_path / "test_dataset" / "1.parquet", + tmp_path / "test_dataset" / "2.parquet", + ] + ndf = nd.NestedFrame.from_map(nd.read_parquet, paths, meta=test_dataset[["a", "b"]]._meta) + assert isinstance(ndf, nd.NestedFrame) From ddd764a7e871788ab0b999c7220500cf3efe240e Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Wed, 3 Jul 2024 10:44:42 -0700 Subject: [PATCH 08/20] from_pandas switch --- benchmarks/benchmarks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index 26fc7be..a220f98 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -31,8 +31,8 @@ def _generate_benchmark_data(add_nested=True): layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index() # Convert to Dask - base_nf = nd.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=5) - layer_nf = nd.NestedFrame.from_nested_pandas(layer_nf).repartition(npartitions=50) + base_nf = nd.NestedFrame.from_pandas(base_nf).repartition(npartitions=5) + layer_nf = nd.NestedFrame.from_pandas(layer_nf).repartition(npartitions=50) # Return based on add_nested if add_nested: From ae39e3a01917590bd25b33acb2c0989904d821c3 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 15 Jul 2024 15:57:45 -0700 Subject: [PATCH 09/20] upgrade to nested-pandas v0.1.2 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9115223..919c1aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ classifiers = [ dynamic = ["version"] requires-python = ">=3.9" dependencies = [ - 'nested-pandas==0.1.1', + 'nested-pandas==0.1.2', 'numpy', 'dask>=2024.3.0', 'dask[distributed]>=2024.3.0', From 5cf63205ae91c3b434f794e366101a7c87c5dfb4 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 16 Jul 2024 10:04:12 -0700 Subject: [PATCH 10/20] switch head(0) to use _meta directly --- src/nested_dask/utils/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/nested_dask/utils/utils.py b/src/nested_dask/utils/utils.py index f115cac..e6c250d 100644 --- a/src/nested_dask/utils/utils.py +++ b/src/nested_dask/utils/utils.py @@ -49,6 +49,6 @@ def count_nested(df, nested, by=None, join=True) -> NestedFrame: # and second depending on join if join: # adds the meta onto the existing meta - meta = pd.concat([df.head(0), meta]) + meta = pd.concat([df._meta, meta]) return df.map_partitions(lambda x: npd_utils.count_nested(x, nested, by=by, join=join), meta=meta) From 511900c905318dc6351bd91a1486fe3d2df41cfb Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Thu, 1 Aug 2024 13:42:01 -0700 Subject: [PATCH 11/20] bump to nested-pandas 0.1.3 --- pyproject.toml | 2 +- src/nested_dask/utils/utils.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 919c1aa..c84b873 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ classifiers = [ dynamic = ["version"] requires-python = ">=3.9" dependencies = [ - 'nested-pandas==0.1.2', + 'nested-pandas==0.1.3', 'numpy', 'dask>=2024.3.0', 'dask[distributed]>=2024.3.0', diff --git a/src/nested_dask/utils/utils.py b/src/nested_dask/utils/utils.py index e6c250d..163f97b 100644 --- a/src/nested_dask/utils/utils.py +++ b/src/nested_dask/utils/utils.py @@ -35,8 +35,6 @@ def count_nested(df, nested, by=None, join=True) -> NestedFrame: if by is not None: # will have one column per unique value of the specified column # requires some computation to determine these values - # TODO: Requires modification of nested-pandas to always produce - # sorted output columns for meta by_cols = sorted(df[nested].nest.to_flat()[by].unique()) out_cols = [f"n_{nested}_{col}" for col in by_cols] else: From 24efdaf0eef56dfd5523acbb69428b57b39b79ea Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Thu, 15 Aug 2024 14:24:00 -0700 Subject: [PATCH 12/20] get and set interface --- src/nested_dask/core.py | 74 +++++++++++++++++++++++++-- tests/nested_dask/test_nestedframe.py | 50 ++++++++++++++++++ 2 files changed, 120 insertions(+), 4 deletions(-) diff --git a/src/nested_dask/core.py b/src/nested_dask/core.py index 13f418e..886700c 100644 --- a/src/nested_dask/core.py +++ b/src/nested_dask/core.py @@ -5,9 +5,11 @@ import dask.dataframe as dd import dask_expr as dx import nested_pandas as npd +import pandas as pd +import pyarrow as pa from dask_expr._collection import new_collection from nested_pandas.series.dtype import NestedDtype -from nested_pandas.series.packer import pack_flat +from nested_pandas.series.packer import pack, pack_flat from pandas._libs import lib from pandas._typing import AnyAll, Axis, IndexLabel from pandas.api.extensions import no_default @@ -59,9 +61,64 @@ class NestedFrame( _partition_type = npd.NestedFrame # Tracks the underlying data type - def __getitem__(self, key): - result = super().__getitem__(key) - return result + def __getitem__(self, item): + """Adds custom __getitem__ functionality for nested columns""" + if isinstance(item, str) and self._is_known_hierarchical_column(item): + nested, col = item.split(".") + meta = pd.Series(name=col, dtype=pd.ArrowDtype(self.dtypes[nested].fields[col])) + return self.map_partitions(lambda x: x[nested].nest.get_flat_series(col), meta=meta) + else: + return super().__getitem__(item) + + def _nested_meta_from_flat(self, flat, name): + """construct meta for a packed series from a flat dataframe""" + pd_fields = flat.dtypes.to_dict() # grabbing pandas dtypes + pyarrow_fields = {} # grab underlying pyarrow dtypes + for field, dtype in pd_fields.items(): + if hasattr(dtype, "pyarrow_dtype"): + pyarrow_fields[field] = dtype.pyarrow_dtype + else: # or convert from numpy types + pyarrow_fields[field] = pa.from_numpy_dtype(dtype) + return pd.Series(name=name, dtype=NestedDtype.from_fields(pyarrow_fields)) + + def __setitem__(self, key, value): + """Adds custom __setitem__ behavior for nested columns""" + + # Replacing or adding columns to a nested structure + # Allows statements like ndf["nested.t"] = ndf["nested.t"] - 5 + # Or ndf["nested.base_t"] = ndf["nested.t"] - 5 + # Performance note: This requires building a new nested structure + if self._is_known_hierarchical_column(key) or ( + "." in key and key.split(".")[0] in self.nested_columns + ): + nested, col = key.split(".") + + # View the nested column as a flat df + new_flat = self[nested].nest.to_flat() + new_flat[col] = value + + # Handle strings specially + if isinstance(value, str): + new_flat = new_flat.astype({col: pd.ArrowDtype(pa.string())}) + + # pack the modified df back into a nested column + meta = self._nested_meta_from_flat(new_flat, nested) + packed = new_flat.map_partitions(lambda x: pack(x), meta=meta) + return super().__setitem__(nested, packed) + + # Adding a new nested structure from a column + # Allows statements like ndf["new_nested.t"] = ndf["nested.t"] - 5 + elif "." in key: + new_nested, col = key.split(".") + if isinstance(value, dd.Series): + value.name = col + value = value.to_frame() + + meta = self._nested_meta_from_flat(value, new_nested) + packed = value.map_partitions(lambda x: pack(x), meta=meta) + return super().__setitem__(new_nested, packed) + + return super().__setitem__(key, value) @classmethod def from_pandas( @@ -246,6 +303,15 @@ def nested_columns(self) -> list: nest_cols.append(column) return nest_cols + def _is_known_hierarchical_column(self, colname) -> bool: + """Determine whether a string is a known hierarchical column name""" + if "." in colname: + left, right = colname.split(".") + if left in self.nested_columns: + return right in self.all_columns[left] + return False + return False + def add_nested(self, nested, name, how="outer") -> NestedFrame: # type: ignore[name-defined] # noqa: F821 """Packs a dataframe into a nested column diff --git a/tests/nested_dask/test_nestedframe.py b/tests/nested_dask/test_nestedframe.py index d110ecd..5dead59 100644 --- a/tests/nested_dask/test_nestedframe.py +++ b/tests/nested_dask/test_nestedframe.py @@ -4,6 +4,7 @@ import numpy as np import pandas as pd import pytest +from nested_dask.datasets import generate_data from nested_pandas.series.dtype import NestedDtype @@ -39,6 +40,55 @@ def test_nested_columns(test_dataset): assert test_dataset.nested_columns == ["nested"] +def test_getitem_on_nested(): + """test getitem with nested columns""" + ndf = generate_data(10, 10, npartitions=3, seed=1) + + nest_col = ndf["nested.t"] + + assert len(nest_col) == 100 + assert nest_col.name == "t" + + +def test_set_or_replace_nested_col(): + """Test that __setitem__ can set or replace a column in a existing nested structure""" + + ndf = generate_data(10, 10, npartitions=3, seed=1) + + # test direct replacement, with ints + orig_t_head = ndf["nested.t"].head(10, npartitions=-1) + + ndf["nested.t"] = ndf["nested.t"] + 1 + assert np.array_equal(ndf["nested.t"].head(10).values.to_numpy(), orig_t_head.values.to_numpy() + 1) + + # test direct replacement, with str + ndf["nested.band"] = "lsst" + assert np.all(ndf["nested.band"].compute().values.to_numpy() == "lsst") + + # test setting a new column within nested + ndf["nested.t_plus_flux"] = ndf["nested.t"] + ndf["nested.flux"] + + true_vals = (ndf["nested.t"] + ndf["nested.flux"]).head(10).values.to_numpy() + assert np.array_equal(ndf["nested.t_plus_flux"].head(10).values.to_numpy(), true_vals) + + +def test_set_new_nested_col(): + """Test that __setitem__ can create a new nested structure""" + + ndf = generate_data(10, 10, npartitions=3, seed=1) + + # assign column in new nested structure from columns in nested + ndf["new_nested.t_plus_flux"] = ndf["nested.t"] + ndf["nested.flux"] + + assert "new_nested" in ndf.nested_columns + assert "t_plus_flux" in ndf["new_nested"].nest.fields + + assert np.array_equal( + ndf["new_nested.t_plus_flux"].compute().values.to_numpy(), + ndf["nested.t"].compute().values.to_numpy() + ndf["nested.flux"].compute().values.to_numpy(), + ) + + def test_add_nested(test_dataset_no_add_nested): """test the add_nested function""" base, layer = test_dataset_no_add_nested From d81f5fd2bd2c0585a0309f367d3fe814fec057f6 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 23 Aug 2024 11:42:07 -0700 Subject: [PATCH 13/20] from_flat and tests --- src/nested_dask/core.py | 80 ++++++++++++++++++++++----- tests/nested_dask/test_nestedframe.py | 70 +++++++++++++++++++++++ 2 files changed, 137 insertions(+), 13 deletions(-) diff --git a/src/nested_dask/core.py b/src/nested_dask/core.py index 886700c..df4f032 100644 --- a/src/nested_dask/core.py +++ b/src/nested_dask/core.py @@ -46,6 +46,18 @@ def _rebuild(self, graph, func, args): # type: ignore return collection +def _nested_meta_from_flat(flat, name): + """construct meta for a packed series from a flat dataframe""" + pd_fields = flat.dtypes.to_dict() # grabbing pandas dtypes + pyarrow_fields = {} # grab underlying pyarrow dtypes + for field, dtype in pd_fields.items(): + if hasattr(dtype, "pyarrow_dtype"): + pyarrow_fields[field] = dtype.pyarrow_dtype + else: # or convert from numpy types + pyarrow_fields[field] = pa.from_numpy_dtype(dtype) + return pd.Series(name=name, dtype=NestedDtype.from_fields(pyarrow_fields)) + + class NestedFrame( _Frame, dd.DataFrame ): # can use dd.DataFrame instead of dx.DataFrame if the config is set true (default in >=2024.3.0) @@ -70,17 +82,6 @@ def __getitem__(self, item): else: return super().__getitem__(item) - def _nested_meta_from_flat(self, flat, name): - """construct meta for a packed series from a flat dataframe""" - pd_fields = flat.dtypes.to_dict() # grabbing pandas dtypes - pyarrow_fields = {} # grab underlying pyarrow dtypes - for field, dtype in pd_fields.items(): - if hasattr(dtype, "pyarrow_dtype"): - pyarrow_fields[field] = dtype.pyarrow_dtype - else: # or convert from numpy types - pyarrow_fields[field] = pa.from_numpy_dtype(dtype) - return pd.Series(name=name, dtype=NestedDtype.from_fields(pyarrow_fields)) - def __setitem__(self, key, value): """Adds custom __setitem__ behavior for nested columns""" @@ -102,7 +103,7 @@ def __setitem__(self, key, value): new_flat = new_flat.astype({col: pd.ArrowDtype(pa.string())}) # pack the modified df back into a nested column - meta = self._nested_meta_from_flat(new_flat, nested) + meta = _nested_meta_from_flat(new_flat, nested) packed = new_flat.map_partitions(lambda x: pack(x), meta=meta) return super().__setitem__(nested, packed) @@ -114,7 +115,7 @@ def __setitem__(self, key, value): value.name = col value = value.to_frame() - meta = self._nested_meta_from_flat(value, new_nested) + meta = _nested_meta_from_flat(value, new_nested) packed = value.map_partitions(lambda x: pack(x), meta=meta) return super().__setitem__(new_nested, packed) @@ -280,6 +281,59 @@ def from_map( ) return NestedFrame.from_dask_dataframe(nf) + @classmethod + def from_flat(cls, df, base_columns, nested_columns=None, index=None, name="nested"): + """Creates a NestedFrame with base and nested columns from a flat + dataframe. + + Parameters + ---------- + df: pd.DataFrame or NestedFrame + A flat dataframe. + base_columns: list-like + The columns that should be used as base (flat) columns in the + output dataframe. + nested_columns: list-like, or None + The columns that should be packed into a nested column. All columns + in the list will attempt to be packed into a single nested column + with the name provided in `nested_name`. If None, is defined as all + columns not in `base_columns`. + index: str, or None + The name of a column to use as the new index. Typically, the index + should have a unique value per row for base columns, and should + repeat for nested columns. For example, a dataframe with two + columns; a=[1,1,1,2,2,2] and b=[5,10,15,20,25,30] would want an + index like [0,0,0,1,1,1] if a is chosen as a base column. If not + provided the current index will be used. + name: + The name of the output column the `nested_columns` are packed into. + + Returns + ------- + NestedFrame + A NestedFrame with the specified nesting structure. + """ + # Handle the meta + # Pathway 1: Some base columns and one nested column -> nestedframe + # Pathway 2: Only a single nested column -> nestedframe as defined in npd + # Pathway 3: Only a set of base columns, technically possible -> nestedframe + + if nested_columns is None: + nested_columns = [col for col in df.columns if (col not in base_columns) and col != index] + + meta = npd.NestedFrame(df[base_columns]._meta) + + if len(nested_columns) > 0: + nested_meta = _nested_meta_from_flat(df[nested_columns], name) + meta = meta.join(nested_meta) + + return df.map_partitions( + lambda x: npd.NestedFrame.from_flat( + df=x, base_columns=base_columns, nested_columns=nested_columns, index=index, name=name + ), + meta=meta, + ) + def compute(self, **kwargs): """Compute this Dask collection, returning the underlying dataframe or series.""" return npd.NestedFrame(super().compute(**kwargs)) diff --git a/tests/nested_dask/test_nestedframe.py b/tests/nested_dask/test_nestedframe.py index 5dead59..0546a08 100644 --- a/tests/nested_dask/test_nestedframe.py +++ b/tests/nested_dask/test_nestedframe.py @@ -108,6 +108,76 @@ def test_add_nested(test_dataset_no_add_nested): assert len(base_with_nested.compute()) == 50 +def test_from_flat(): + """Test the from_flat wrapping, make sure meta is assigned correctly""" + + nf = nd.NestedFrame.from_pandas( + npd.NestedFrame( + { + "a": [1, 1, 1, 2, 2, 2], + "b": [2, 2, 2, 4, 4, 4], + "c": [1, 2, 3, 4, 5, 6], + "d": [2, 4, 6, 8, 10, 12], + }, + index=[0, 0, 0, 1, 1, 1], + ) + ) + + # Check full inputs + ndf = nd.NestedFrame.from_flat(nf, base_columns=["a", "b"], nested_columns=["c", "d"]) + assert list(ndf.columns) == ["a", "b", "nested"] + assert list(ndf["nested"].nest.fields) == ["c", "d"] + ndf_comp = ndf.compute() + assert list(ndf.columns) == list(ndf_comp.columns) + assert list(ndf["nested"].nest.fields) == list(ndf["nested"].nest.fields) + assert len(ndf_comp) == 2 + + # Check omitting a base column + ndf = nd.NestedFrame.from_flat(nf, base_columns=["a"], nested_columns=["c", "d"]) + assert list(ndf.columns) == ["a", "nested"] + assert list(ndf["nested"].nest.fields) == ["c", "d"] + ndf_comp = ndf.compute() + assert list(ndf.columns) == list(ndf_comp.columns) + assert list(ndf["nested"].nest.fields) == list(ndf["nested"].nest.fields) + assert len(ndf_comp) == 2 + + # Check omitting a nested column + ndf = nd.NestedFrame.from_flat(nf, base_columns=["a", "b"], nested_columns=["d"]) + assert list(ndf.columns) == ["a", "b", "nested"] + assert list(ndf["nested"].nest.fields) == ["d"] + ndf_comp = ndf.compute() + assert list(ndf.columns) == list(ndf_comp.columns) + assert list(ndf["nested"].nest.fields) == list(ndf["nested"].nest.fields) + assert len(ndf_comp) == 2 + + # Check no base columns + ndf = nd.NestedFrame.from_flat(nf, base_columns=[], nested_columns=["c", "d"]) + assert list(ndf.columns) == ["nested"] + assert list(ndf["nested"].nest.fields) == ["c", "d"] + ndf_comp = ndf.compute() + assert list(ndf.columns) == list(ndf_comp.columns) + assert list(ndf["nested"].nest.fields) == list(ndf["nested"].nest.fields) + assert len(ndf_comp) == 2 + + # Check inferred nested columns + ndf = nd.NestedFrame.from_flat(nf, base_columns=["a", "b"]) + assert list(ndf.columns) == ["a", "b", "nested"] + assert list(ndf["nested"].nest.fields) == ["c", "d"] + ndf_comp = ndf.compute() + assert list(ndf.columns) == list(ndf_comp.columns) + assert list(ndf["nested"].nest.fields) == list(ndf["nested"].nest.fields) + assert len(ndf_comp) == 2 + + # Check using an index + ndf = nd.NestedFrame.from_flat(nf, base_columns=["b"], index="a") + assert list(ndf.columns) == ["b", "nested"] + assert list(ndf["nested"].nest.fields) == ["c", "d"] + ndf_comp = ndf.compute() + assert list(ndf.columns) == list(ndf_comp.columns) + assert list(ndf["nested"].nest.fields) == list(ndf["nested"].nest.fields) + assert len(ndf_comp) == 2 + + def test_query_on_base(test_dataset): """test the query function on base columns""" From e70db6c82f7688999a45b681c7a73e458abf3b90 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 26 Aug 2024 14:01:49 -0700 Subject: [PATCH 14/20] from_lists implementation and tests --- src/nested_dask/core.py | 102 ++++++++++++++++++++++++-- tests/nested_dask/test_io.py | 4 + tests/nested_dask/test_nestedframe.py | 82 +++++++++++++++++++++ 3 files changed, 181 insertions(+), 7 deletions(-) diff --git a/src/nested_dask/core.py b/src/nested_dask/core.py index df4f032..3a4381f 100644 --- a/src/nested_dask/core.py +++ b/src/nested_dask/core.py @@ -288,7 +288,7 @@ def from_flat(cls, df, base_columns, nested_columns=None, index=None, name="nest Parameters ---------- - df: pd.DataFrame or NestedFrame + df: dd.DataFrame or nd.NestedFrame A flat dataframe. base_columns: list-like The columns that should be used as base (flat) columns in the @@ -313,16 +313,13 @@ def from_flat(cls, df, base_columns, nested_columns=None, index=None, name="nest NestedFrame A NestedFrame with the specified nesting structure. """ - # Handle the meta - # Pathway 1: Some base columns and one nested column -> nestedframe - # Pathway 2: Only a single nested column -> nestedframe as defined in npd - # Pathway 3: Only a set of base columns, technically possible -> nestedframe + + # Handle meta + meta = npd.NestedFrame(df[base_columns]._meta) if nested_columns is None: nested_columns = [col for col in df.columns if (col not in base_columns) and col != index] - meta = npd.NestedFrame(df[base_columns]._meta) - if len(nested_columns) > 0: nested_meta = _nested_meta_from_flat(df[nested_columns], name) meta = meta.join(nested_meta) @@ -334,6 +331,97 @@ def from_flat(cls, df, base_columns, nested_columns=None, index=None, name="nest meta=meta, ) + @classmethod + def from_lists(cls, df, base_columns=None, list_columns=None, name="nested"): + """Creates a NestedFrame with base and nested columns from a flat + dataframe. + + Parameters + ---------- + df: dd.DataFrame or nd.NestedFrame + A dataframe with list columns. + base_columns: list-like, or None + Any columns that have non-list values in the input df. These will + simply be kept as identical columns in the result + list_columns: list-like, or None + The list-value columns that should be packed into a nested column. + All columns in the list will attempt to be packed into a single + nested column with the name provided in `nested_name`. All columns + in list_columns must have pyarrow list dtypes, otherwise the + operation will fail. If None, is defined as all columns not in + `base_columns`. + name: + The name of the output column the `nested_columns` are packed into. + + Returns + ------- + NestedFrame + A NestedFrame with the specified nesting structure. + + Note + ---- + As noted above, all columns in `list_columns` must have a pyarrow + ListType dtype. This is needed for proper meta propagation. To convert + a list column to this dtype, you can use this command structure: + `nf= nf.astype({"colname": pd.ArrowDtype(pa.list_(pa.int64()))})` + + Where pa.int64 above should be replaced with the correct dtype of the + underlying data accordingly. + + Additionally, it's a known issue in Dask + (https://github.com/dask/dask/issues/10139) that columns with list + values will by default be converted to the string type. This will + interfere with the ability to recast these to pyarrow lists. We + recommend setting the following dask config setting to prevent this: + `dask.config.set({"dataframe.convert-string":False})` + + """ + + # Handle meta + # meta = npd.NestedFrame(df[base_columns]._meta) + + # Resolve inputs for meta + if base_columns is None: + if list_columns is None: + # with no inputs, assume all columns are list-valued + list_columns = df.columns + else: + # if list_columns are defined, assume everything else is base + base_columns = [col for col in df.columns if col not in list_columns] + else: + if list_columns is None: + # with defined base_columns, assume everything else is list + list_columns = [col for col in df.columns if col not in base_columns] + + # from_lists should have at least one list column defined + if len(list_columns) == 0: + raise ValueError("No columns were assigned as list columns.") + else: + # reject any list columns that are not pyarrow dtyped + for col in list_columns: + if not hasattr(df[col].dtype, "pyarrow_dtype"): + raise TypeError( + f"""List column '{col}' dtype ({df[col].dtype}) is not a pyarrow list dtype. +Refer to the docstring for guidance on dtype requirements and assignment.""" + ) + elif not pa.types.is_list(df[col].dtype.pyarrow_dtype): + raise TypeError( + f"""List column '{col}' dtype ({df[col].dtype}) is not a pyarrow list dtype. +Refer to the docstring for guidance on dtype requirements and assignment.""" + ) + + meta = npd.NestedFrame(df[base_columns]._meta) + + nested_meta = _nested_meta_from_flat(df[list_columns], name) + meta = meta.join(nested_meta) + + return df.map_partitions( + lambda x: npd.NestedFrame.from_lists( + df=x, base_columns=base_columns, list_columns=list_columns, name=name + ), + meta=meta, + ) + def compute(self, **kwargs): """Compute this Dask collection, returning the underlying dataframe or series.""" return npd.NestedFrame(super().compute(**kwargs)) diff --git a/tests/nested_dask/test_io.py b/tests/nested_dask/test_io.py index c02c051..40baa32 100644 --- a/tests/nested_dask/test_io.py +++ b/tests/nested_dask/test_io.py @@ -1,4 +1,6 @@ import nested_dask as nd +import pandas as pd +import pyarrow as pa def test_read_parquet(test_dataset, tmp_path): @@ -19,6 +21,8 @@ def test_read_parquet(test_dataset, tmp_path): base = nd.read_parquet(test_save_path, calculate_divisions=True) nested = nd.read_parquet(nested_save_path, calculate_divisions=True) + # this is read as a large_string, just make it a string + nested = nested.astype({"band": pd.ArrowDtype(pa.string())}) base = base.add_nested(nested, "nested") # Check the loaded dataset against the original diff --git a/tests/nested_dask/test_nestedframe.py b/tests/nested_dask/test_nestedframe.py index 0546a08..632b449 100644 --- a/tests/nested_dask/test_nestedframe.py +++ b/tests/nested_dask/test_nestedframe.py @@ -1,12 +1,16 @@ +import dask import dask.dataframe as dd import nested_dask as nd import nested_pandas as npd import numpy as np import pandas as pd +import pyarrow as pa import pytest from nested_dask.datasets import generate_data from nested_pandas.series.dtype import NestedDtype +dask.config.set({"dataframe.convert-string": False}) + def test_nestedframe_construction(test_dataset): """test the construction of a nestedframe""" @@ -178,6 +182,82 @@ def test_from_flat(): assert len(ndf_comp) == 2 +def test_from_lists(): + """Test the from_lists wrapping, make sure meta is assigned correctly""" + + nf = nd.NestedFrame.from_pandas( + npd.NestedFrame( + { + "c": [1, 2, 3], + "d": [2, 4, 6], + "e": [[1, 2, 3], [4, 5, 6], [7, 8, 9]], + "f": [["dog", "cat", "bird"], ["dog", "cat", "bird"], ["dog", "cat", "bird"]], + }, + index=[0, 1, 2], + ) + ) + nf = nf.astype({"e": pd.ArrowDtype(pa.list_(pa.int64())), "f": pd.ArrowDtype(pa.list_(pa.string()))}) + + # Check with just base_columns + ndf = nd.NestedFrame.from_lists(nf, base_columns=["c", "d"]) + assert list(ndf.columns) == ["c", "d", "nested"] + assert list(ndf["nested"].nest.fields) == ["e", "f"] + ndf_comp = ndf.compute() + assert list(ndf.columns) == list(ndf_comp.columns) + assert list(ndf["nested"].nest.fields) == list(ndf["nested"].nest.fields) + + # Check with just list_columns + ndf = nd.NestedFrame.from_lists(nf, list_columns=["e", "f"]) + assert list(ndf.columns) == ["c", "d", "nested"] + assert list(ndf["nested"].nest.fields) == ["e", "f"] + ndf_comp = ndf.compute() + assert list(ndf.columns) == list(ndf_comp.columns) + assert list(ndf["nested"].nest.fields) == list(ndf["nested"].nest.fields) + + # Check with base subset + ndf = nd.NestedFrame.from_lists(nf, base_columns=["c"], list_columns=["e", "f"]) + assert list(ndf.columns) == ["c", "nested"] + assert list(ndf["nested"].nest.fields) == ["e", "f"] + ndf_comp = ndf.compute() + assert list(ndf.columns) == list(ndf_comp.columns) + assert list(ndf["nested"].nest.fields) == list(ndf["nested"].nest.fields) + + # Check with list subset + ndf = nd.NestedFrame.from_lists(nf, base_columns=["c", "d"], list_columns=["f"]) + assert list(ndf.columns) == ["c", "d", "nested"] + assert list(ndf["nested"].nest.fields) == ["f"] + ndf_comp = ndf.compute() + assert list(ndf.columns) == list(ndf_comp.columns) + assert list(ndf["nested"].nest.fields) == list(ndf["nested"].nest.fields) + + +def test_from_lists_errors(): + """test that the dtype errors are appropriately raised""" + nf = nd.NestedFrame.from_pandas( + npd.NestedFrame( + { + "c": [1, 2, 3], + "d": [2, 4, 6], + "e": [[1, 2, 3], [4, 5, 6], [7, 8, 9]], + "f": [["dog", "cat", "bird"], ["dog", "cat", "bird"], ["dog", "cat", "bird"]], + }, + index=[0, 1, 2], + ) + ) + # first check for no list_column error + with pytest.raises(ValueError): + nd.NestedFrame.from_lists(nf, base_columns=["c", "d", "e", "f"]) + + # next check for non-pyarrow dtype in list_column + with pytest.raises(TypeError): + nd.NestedFrame.from_lists(nf, base_columns=["e"]) + + # And check for non-list pyarrow type in list_column + nf = nf.astype({"d": pd.ArrowDtype(pa.int64())}) + with pytest.raises(TypeError): + nd.NestedFrame.from_lists(nf, base_columns=["d"]) + + def test_query_on_base(test_dataset): """test the query function on base columns""" @@ -269,6 +349,8 @@ def test_to_parquet_by_layer(test_dataset, tmp_path): loaded_base = nd.read_parquet(test_save_path / "base", calculate_divisions=True) loaded_nested = nd.read_parquet(test_save_path / "nested", calculate_divisions=True) + # this is read as a large_string, just make it a string + loaded_nested = loaded_nested.astype({"band": pd.ArrowDtype(pa.string())}) loaded_dataset = loaded_base.add_nested(loaded_nested, "nested") # Check for equivalence From e78ffc7ba5c90ca0142d39712d67c2adbeb5755e Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 26 Aug 2024 14:25:40 -0700 Subject: [PATCH 15/20] update nested-pandas dep to 0.2.0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c84b873..9f10002 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ classifiers = [ dynamic = ["version"] requires-python = ">=3.9" dependencies = [ - 'nested-pandas==0.1.3', + 'nested-pandas==0.2.0', 'numpy', 'dask>=2024.3.0', 'dask[distributed]>=2024.3.0', From 83147fe40043a3703d952fe2823c08e4cf0bf843 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 26 Aug 2024 14:31:19 -0700 Subject: [PATCH 16/20] update nested-pandas dep to 0.2.1 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9f10002..e60c60b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ classifiers = [ dynamic = ["version"] requires-python = ">=3.9" dependencies = [ - 'nested-pandas==0.2.0', + 'nested-pandas==0.2.1', 'numpy', 'dask>=2024.3.0', 'dask[distributed]>=2024.3.0', From 03656af1cb3f4009e9c21506631f07b50e9dccac Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 26 Aug 2024 14:49:25 -0700 Subject: [PATCH 17/20] remove commented out code --- src/nested_dask/core.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/nested_dask/core.py b/src/nested_dask/core.py index 3a4381f..a27bd76 100644 --- a/src/nested_dask/core.py +++ b/src/nested_dask/core.py @@ -377,9 +377,6 @@ def from_lists(cls, df, base_columns=None, list_columns=None, name="nested"): """ - # Handle meta - # meta = npd.NestedFrame(df[base_columns]._meta) - # Resolve inputs for meta if base_columns is None: if list_columns is None: From db943d57b343a11f32175cd7be0decea9ba2f950 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 27 Aug 2024 10:26:53 -0700 Subject: [PATCH 18/20] use meta dtype in pack operation --- src/nested_dask/core.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/nested_dask/core.py b/src/nested_dask/core.py index a27bd76..721ab79 100644 --- a/src/nested_dask/core.py +++ b/src/nested_dask/core.py @@ -104,7 +104,7 @@ def __setitem__(self, key, value): # pack the modified df back into a nested column meta = _nested_meta_from_flat(new_flat, nested) - packed = new_flat.map_partitions(lambda x: pack(x), meta=meta) + packed = new_flat.map_partitions(lambda x: pack(x, dtype=meta.dtype), meta=meta) return super().__setitem__(nested, packed) # Adding a new nested structure from a column @@ -116,7 +116,7 @@ def __setitem__(self, key, value): value = value.to_frame() meta = _nested_meta_from_flat(value, new_nested) - packed = value.map_partitions(lambda x: pack(x), meta=meta) + packed = value.map_partitions(lambda x: pack(x, dtype=meta.dtype), meta=meta) return super().__setitem__(new_nested, packed) return super().__setitem__(key, value) @@ -321,7 +321,8 @@ def from_flat(cls, df, base_columns, nested_columns=None, index=None, name="nest nested_columns = [col for col in df.columns if (col not in base_columns) and col != index] if len(nested_columns) > 0: - nested_meta = _nested_meta_from_flat(df[nested_columns], name) + # nested_meta = _nested_meta_from_flat(df[nested_columns], name) + nested_meta = pack(df[nested_columns]._meta, name) meta = meta.join(nested_meta) return df.map_partitions( @@ -409,7 +410,8 @@ def from_lists(cls, df, base_columns=None, list_columns=None, name="nested"): meta = npd.NestedFrame(df[base_columns]._meta) - nested_meta = _nested_meta_from_flat(df[list_columns], name) + # nested_meta = _nested_meta_from_flat(df[list_columns], name) + nested_meta = pack(df[list_columns]._meta, name) meta = meta.join(nested_meta) return df.map_partitions( From 745b5d1c1c104b72fff431e15cccc9b6aeb99a86 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 27 Aug 2024 12:42:58 -0700 Subject: [PATCH 19/20] remove commented --- src/nested_dask/core.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/nested_dask/core.py b/src/nested_dask/core.py index 721ab79..8d27a32 100644 --- a/src/nested_dask/core.py +++ b/src/nested_dask/core.py @@ -321,7 +321,6 @@ def from_flat(cls, df, base_columns, nested_columns=None, index=None, name="nest nested_columns = [col for col in df.columns if (col not in base_columns) and col != index] if len(nested_columns) > 0: - # nested_meta = _nested_meta_from_flat(df[nested_columns], name) nested_meta = pack(df[nested_columns]._meta, name) meta = meta.join(nested_meta) @@ -410,7 +409,6 @@ def from_lists(cls, df, base_columns=None, list_columns=None, name="nested"): meta = npd.NestedFrame(df[base_columns]._meta) - # nested_meta = _nested_meta_from_flat(df[list_columns], name) nested_meta = pack(df[list_columns]._meta, name) meta = meta.join(nested_meta) From 70a359dd7ba1f7d2808d0876568506be9221c11a Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sun, 1 Sep 2024 11:20:58 +0000 Subject: [PATCH 20/20] Bump asv from 0.6.3 to 0.6.4 Bumps [asv](https://github.com/airspeed-velocity/asv) from 0.6.3 to 0.6.4. - [Release notes](https://github.com/airspeed-velocity/asv/releases) - [Changelog](https://github.com/airspeed-velocity/asv/blob/main/CHANGES.rst) - [Commits](https://github.com/airspeed-velocity/asv/compare/v0.6.3...v0.6.4) --- updated-dependencies: - dependency-name: asv dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index e60c60b..cd1e8be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ # On a mac, install optional dependencies with `pip install '.[dev]'` (include the single quotes) [project.optional-dependencies] dev = [ - "asv==0.6.3", # Used to compute performance benchmarks + "asv==0.6.4", # Used to compute performance benchmarks "jupyter", # Clears output from Jupyter notebooks "mypy", # Used for static type checking of files "pre-commit", # Used to run checks before finalizing a git commit