Skip to content

Commit

Permalink
Merge pull request #90 from lincc-frameworks/fix/issue/89
Browse files Browse the repository at this point in the history
  • Loading branch information
hombit authored May 28, 2024
2 parents 8197907 + ebcc24b commit 41cce93
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 20 deletions.
38 changes: 24 additions & 14 deletions src/nested_pandas/series/accessor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Python 3.9 doesn't support "|" for types
from __future__ import annotations

from collections import defaultdict
from collections.abc import Generator, MutableMapping
from typing import cast

Expand Down Expand Up @@ -53,14 +54,19 @@ def to_lists(self, fields: list[str] | None = None) -> pd.DataFrame:
if len(fields) == 0:
raise ValueError("Cannot convert a struct with no fields to lists")

struct_array = cast(pa.StructArray, pa.array(self._series))
list_chunks = defaultdict(list)
for chunk in self._series.array._pa_array.iterchunks():
struct_array = cast(pa.StructArray, chunk)
for field in fields:
list_array = cast(pa.ListArray, struct_array.field(field))
list_chunks[field].append(list_array)

list_series = {}
for field in fields:
list_array = cast(pa.ListArray, struct_array.field(field))
for field, chunks in list_chunks.items():
chunked_array = pa.chunked_array(chunks)
list_series[field] = pd.Series(
list_array,
dtype=pd.ArrowDtype(list_array.type),
chunked_array,
dtype=pd.ArrowDtype(chunked_array.type),
index=self._series.index,
name=field,
copy=False,
Expand All @@ -85,21 +91,25 @@ def to_flat(self, fields: list[str] | None = None) -> pd.DataFrame:
if len(fields) == 0:
raise ValueError("Cannot flatten a struct with no fields")

struct_array = cast(pa.StructArray, pa.array(self._series))
index = pd.Series(self.get_flat_index(), name=self._series.index.name)

flat_chunks = defaultdict(list)
for chunk in self._series.array._pa_array.iterchunks():
struct_array = cast(pa.StructArray, chunk)
for field in fields:
list_array = cast(pa.ListArray, struct_array.field(field))
flat_array = list_array.flatten()
flat_chunks[field].append(flat_array)

flat_series = {}
index = None
for field in fields:
list_array = cast(pa.ListArray, struct_array.field(field))
flat_array = list_array.flatten()
if index is None:
index = self.get_flat_index()
for field, chunks in flat_chunks.items():
chunked_array = pa.chunked_array(chunks)
flat_series[field] = pd.Series(
flat_array,
chunked_array,
index=pd.Series(index, name=self._series.index.name),
name=field,
copy=False,
dtype=pd.ArrowDtype(flat_array.type),
dtype=pd.ArrowDtype(chunked_array.type),
)

return pd.DataFrame(flat_series)
Expand Down
30 changes: 26 additions & 4 deletions src/nested_pandas/series/ext_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ def _replace_pa_array(self, pa_array: pa.ChunkedArray, *, validate: bool) -> Non
self._dtype = NestedDtype(pa_array.chunk(0).type)

@property
def list_offsets(self) -> pa.ChunkedArray:
def list_offsets(self) -> pa.Array:
"""The list offsets of the field arrays.
It is a single array of list offsets of the first field array, concatenated over all chunks.
Expand All @@ -609,7 +609,25 @@ def list_offsets(self) -> pa.ChunkedArray:
pa.Array
The list offsets of the field arrays.
"""
return pa.chunked_array([chunk.field(0).offsets for chunk in self._pa_array.iterchunks()])
# Quick and cheap path for a single chunk
if self._pa_array.num_chunks == 1:
struct_array = cast(pa.StructArray, self._pa_array.chunk(0))
return cast(pa.ListArray, struct_array.field(0)).offsets

chunks = []
# The offset of the current chunk in the flat array.
# It is 0 for the first chunk, and the last offset of the previous chunk for the next chunks,
# as a pa.Scalar.
chunk_offset: pa.Scalar | int = 0
for chunk in self._pa_array.iterchunks():
list_array = cast(pa.ListArray, chunk.field(0))
if chunk_offset == 0:
offsets = list_array.offsets
else:
offsets = pa.compute.add(list_array.offsets[1:], chunk_offset)
chunks.append(offsets)
chunk_offset = offsets[-1]
return pa.concat_arrays(chunks)

@property
def field_names(self) -> list[str]:
Expand All @@ -621,6 +639,11 @@ def flat_length(self) -> int:
"""Length of the flat arrays"""
return sum(chunk.field(0).value_lengths().sum().as_py() for chunk in self._pa_array.iterchunks())

@property
def num_chunks(self) -> int:
    """Return how many chunks the underlying pyarrow.ChunkedArray holds."""
    chunked = self._pa_array
    return chunked.num_chunks

def view_fields(self, fields: str | list[str]) -> Self: # type: ignore[name-defined] # noqa: F821
"""Get a view of the series with only the specified fields
Expand Down Expand Up @@ -673,8 +696,7 @@ def set_flat_field(self, field: str, value: ArrayLike) -> None:
if len(pa_array) != self.flat_length:
raise ValueError("The input must be a struct_scalar or have the same length as the flat arrays")

offsets = self.list_offsets.combine_chunks()
list_array = pa.ListArray.from_arrays(values=pa_array, offsets=offsets)
list_array = pa.ListArray.from_arrays(values=pa_array, offsets=self.list_offsets)

return self.set_list_field(field, list_array)

Expand Down
29 changes: 29 additions & 0 deletions tests/nested_pandas/e2e_tests/test_issue89.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Based on https://github.com/lincc-frameworks/nested-pandas/issues/89"""

import nested_pandas as npd
import numpy as np


def test_issue89():
    """Check that the code snippet from issue 89 runs without raising.

    https://github.com/lincc-frameworks/nested-pandas/issues/89

    Two ZTF parquet files are read from a remote URL, the source table is
    nested into the object table, and one nested column is reduced. No value
    is asserted: the test passes as long as none of the calls raises.
    """
    # Base URL of the ZTF catalogs. It is kept without a trailing slash so the
    # f-strings below do not produce a doubled "//" in the requested path.
    catalogs_dir = "https://epyc.astro.washington.edu/~lincc-frameworks/half_degree_surveys/ztf"

    object_ndf = npd.read_parquet(
        f"{catalogs_dir}/ztf_object/Norder=3/Dir=0/Npix=432.parquet",
        columns=["ra", "dec", "ps1_objid"],
    ).set_index("ps1_objid")

    source_ndf = npd.read_parquet(
        f"{catalogs_dir}/ztf_source/Norder=6/Dir=20000/Npix=27711.parquet",
        columns=["mjd", "mag", "magerr", "band", "ps1_objid", "catflags"],
    ).set_index("ps1_objid")

    object_ndf = object_ndf.add_nested(source_ndf, "ztf_source")

    # This is the call that the test exercises; it only has to complete.
    object_ndf.reduce(np.mean, "ztf_source.mjd")
70 changes: 70 additions & 0 deletions tests/nested_pandas/series/test_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,39 @@ def test_to_lists():
assert_frame_equal(lists, desired)


def test_to_lists_for_chunked_array():
    """Test .nest.to_lists() when the underlying array has several chunks."""
    a_lists = [np.array([1.0, 2.0, 3.0]), -np.array([1.0, 2.0, 1.0])]
    b_lists = [np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])]
    row_index = [0, 1, 2, 3, 4, 5]

    single_chunk = pa.StructArray.from_arrays(arrays=[a_lists, b_lists], names=["a", "b"])
    # Three identical two-row chunks give six rows in total.
    chunked_array = pa.chunked_array([single_chunk] * 3)
    assert chunked_array.length() == 6
    series = pd.Series(chunked_array, dtype=NestedDtype(chunked_array.type), index=row_index)
    assert series.array.num_chunks == 3

    lists = series.nest.to_lists()

    list_dtype = pd.ArrowDtype(pa.list_(pa.float64()))
    desired = pd.DataFrame(
        data={
            "a": pd.Series(data=a_lists * 3, dtype=list_dtype, index=row_index),
            "b": pd.Series(data=b_lists * 3, dtype=list_dtype, index=row_index),
        },
    )
    assert_frame_equal(lists, desired)


def test_to_lists_with_fields():
"""Test that the .nest.to_lists(fields=...) method works."""
struct_array = pa.StructArray.from_arrays(
Expand Down Expand Up @@ -154,6 +187,43 @@ def test_to_flat():
assert_array_equal(flat[column], desired[column])


def test_to_flat_for_chunked_array():
    """Test .nest.to_flat() when the underlying array is a multi-chunk pa.ChunkedArray."""
    single_chunk = pa.StructArray.from_arrays(
        arrays=[
            [np.array([1.0, 2.0, 3.0]), -np.array([1.0, 2.0, 1.0])],
            [np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])],
        ],
        names=["a", "b"],
    )
    chunked_array = pa.chunked_array([single_chunk] * 3)
    assert chunked_array.length() == 6
    row_index = [0, 1, 2, 3, 4, 5]
    series = pd.Series(chunked_array, dtype=NestedDtype(chunked_array.type), index=row_index)
    assert series.array.num_chunks == 3

    flat = series.nest.to_flat()

    # Every row holds three values, so each outer index label appears three times.
    flat_index = np.repeat(row_index, 3)
    float_dtype = pd.ArrowDtype(pa.float64())
    expected_values = {
        "a": [1.0, 2.0, 3.0, -1.0, -2.0, -1.0] * 3,
        "b": [4.0, 5.0, 6.0, -3.0, -4.0, -5.0] * 3,
    }
    desired = pd.DataFrame(
        data={
            name: pd.Series(data=values, name=name, index=flat_index, dtype=float_dtype)
            for name, values in expected_values.items()
        },
        index=pd.Index(flat_index),
    )

    assert_frame_equal(flat, desired)


def test_to_flat_with_fields():
"""Test that the .nest.to_flat(fields=...) method works."""
struct_array = pa.StructArray.from_arrays(
Expand Down
35 changes: 33 additions & 2 deletions tests/nested_pandas/series/test_ext_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,8 @@ def test_series_built_raises(data):
_array = NestedExtensionArray(pa_array)


def test_list_offsets():
"""Test that the list offsets are correct."""
def test_list_offsets_single_chunk():
"""Test that the .list_offsets property is correct for a single chunk."""
struct_array = pa.StructArray.from_arrays(
arrays=[
pa.array([np.array([1, 2, 3]), np.array([1, 2, 1])], type=pa.list_(pa.uint8())),
Expand All @@ -603,6 +603,22 @@ def test_list_offsets():
assert_array_equal(ext_array.list_offsets, desired)


def test_list_offsets_multiple_chunks():
    """Test that the .list_offsets property is correct for multiple chunks."""
    field_a = pa.array([np.array([1, 2, 3]), np.array([1, 2, 1])], type=pa.list_(pa.uint8()))
    field_b = pa.array([-np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])])
    struct_array = pa.StructArray.from_arrays(arrays=[field_a, field_b], names=["a", "b"])
    # Chunks of different lengths: two rows, one row, two rows.
    chunked_array = pa.chunked_array([struct_array, struct_array[:1], struct_array])
    ext_array = NestedExtensionArray(chunked_array)

    # The offsets of the same data merged into one chunk serve as the reference.
    desired = chunked_array.combine_chunks().field("a").offsets
    assert_array_equal(ext_array.list_offsets, desired)


def test___getitem___with_integer():
"""Test [i] is a valid DataFrame."""
struct_array = pa.StructArray.from_arrays(
Expand Down Expand Up @@ -1207,6 +1223,21 @@ def test_flat_length():
assert ext_array.flat_length == 7


def test_num_chunks():
    """Test that .num_chunks reports the number of chunks the array was built from."""
    field_a = pa.array([np.array([1.0, 2.0, 3.0]), np.array([1.0, 2.0, 1.0, 2.0])])
    field_b = pa.array([-np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0, 6.0])])
    struct_array = pa.StructArray.from_arrays(arrays=[field_a, field_b], names=["a", "b"])

    expected_chunks = 7
    ext_array = NestedExtensionArray(pa.chunked_array([struct_array] * expected_chunks))

    assert ext_array.num_chunks == expected_chunks


def test_view_fields_with_single_field():
"""Tests ext_array.view_fields("field")"""
arrays = [
Expand Down

0 comments on commit 41cce93

Please sign in to comment.