Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NestedFrame.reduce #32

Merged
merged 19 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 88 additions & 1 deletion src/nested_pandas/nestedframe/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import numpy as np
import pandas as pd
from pandas._libs import lib
from pandas._typing import AnyAll, Axis, IndexLabel
from pandas._typing import Any, AnyAll, Axis, IndexLabel
from pandas.api.extensions import no_default
from typing import List

from nested_pandas.series import packer
from nested_pandas.series.dtype import NestedDtype
Expand Down Expand Up @@ -58,6 +59,10 @@ def _is_known_hierarchical_column(self, colname) -> bool:
return False
return False

def _is_known_column(self, colname) -> bool:
    """Report whether `colname` is a known column name.

    A name is known when it is one of this frame's own columns, or when
    `_is_known_hierarchical_column` recognizes it.
    """
    # Direct membership in the frame's columns is checked first
    if colname in self.columns:
        return True
    # Otherwise defer to the hierarchical-name check
    return self._is_known_hierarchical_column(colname)

def add_nested(self, nested, name) -> Self: # type: ignore[name-defined] # noqa: F821
"""Packs a dataframe into a nested column"""
# Add sources to objects
Expand Down Expand Up @@ -310,3 +315,85 @@ def dropna(
)
)
return new_df

def reduce(self, func, *args, **kwargs) -> NestedFrame:
    """
    Takes a function and applies it to each top-level row of the NestedFrame.

    The user may specify which columns the function is applied to, with
    columns from the 'base' layer being passed to the function as
    scalars and columns from the nested layers being passed as numpy arrays.

    Parameters
    ----------
    func : callable
        Function to apply to each nested dataframe. The first arguments to `func` should be which
        columns to apply the function to.
    args : positional arguments
        Positional arguments to pass to the function, the first *args should be the names of the
        columns to apply the function to. The first argument that is not a string naming a
        known column, and everything after it, is forwarded to `func` unchanged.
    kwargs : keyword arguments, optional
        Keyword arguments to pass to the function.

    Returns
    -------
    `NestedFrame`
        `NestedFrame` with the results of the function applied to the columns of the frame.

    Raises
    ------
    ValueError
        If the leading `*args` do not name at least one known column.

    Notes
    -----
    The recommended return value of func should be a `pd.Series` where the indices are the names of the
    output columns in the dataframe returned by `reduce`.

    Column names of the form "layer.col" are split on the first "." only, and only when
    "layer" is itself a top-level column of the frame. Any other known name is read from the
    base layer under its full name.

    Example User Function:
    ```
    import pandas as pd

    def my_sum(col1, col2):
        return pd.Series(
            [sum(col1), sum(col2)],
            index=["sum_col1", "sum_col2"],
        )

    ```

    """
    # Parse through the initial args to determine the columns to apply the function to.
    # Each entry is (layer, col); a layer of None marks a base-layer column, so a nested
    # column that happens to be named "base" is not mistaken for the base layer.
    requested_columns = []
    for arg in args:
        if not isinstance(arg, str) or not self._is_known_column(arg):
            # We've reached an argument that is not a valid column, so we assume
            # the remaining args are extra arguments to the function
            break
        # Split on the first "." only so that sub-column names containing "." stay intact
        layer, sep, col = arg.partition(".")
        if sep and layer in self.columns:
            requested_columns.append((layer, col))
        else:
            # Either there is no "." or the prefix is not a top-level column, so the
            # whole string is the name of a base column
            requested_columns.append((None, arg))

    # We require the first *args to be the columns to apply the function to
    if not requested_columns:
        raise ValueError("No columns in `*args` specified to apply function to")

    # The remaining args are the extra arguments to the function other than columns
    extra_args: List[Any] = list(args[len(requested_columns) :])

    # Translates the requested columns into the scalars or arrays we pass to func.
    def translate_cols(frame, layer, col):
        if layer is None:
            # We pass the "base" column as a scalar
            return frame[col]
        return frame[layer][col].to_numpy()

    # Note that this applies the function to each row of the nested dataframe. For
    # the columns within packed frames, note that we're directly accessing the dataframe
    # within the cell of that row without having to unpack and flatten.
    result = self.apply(
        lambda x: func(
            *[translate_cols(x, layer, col) for layer, col in requested_columns], *extra_args, **kwargs
        ),
        axis=1,  # to apply func on each row of our nested frame
        result_type="expand",  # to return a DataFrame when possible
    )

    return result
80 changes: 80 additions & 0 deletions tests/nested_pandas/nestedframe/test_nestedframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,83 @@ def test_dropna_errors():
# Test on-nested + subset disagreement
with pytest.raises(ValueError):
base.dropna(on_nested="nested", subset=["b"])


def test_reduce():
    """Tests that we can call reduce on a NestedFrame with a custom function."""
    nf = NestedFrame(
        data={"a": [1, 2, 3], "b": [2, 4, 6]},
        index=pd.Index([0, 1, 2], name="idx"),
    )

    to_pack = pd.DataFrame(
        data={
            "time": [1, 2, 3, 1, 2, 4, 2, 1, 4],
            "c": [0, 2, 4, 10, 4, 3, 1, 4, 1],
            "d": [5, 4, 7, 5, 3, 1, 9, 3, 4],
        },
        index=pd.Index([0, 0, 0, 1, 1, 1, 2, 2, 2], name="idx"),
    )

    to_pack2 = pd.DataFrame(
        data={
            "time": [1, 2, 3, 1, 2, 3, 1, 2, 4],
            "e": [2, 9, 4, 1, 23, 3, 1, 4, 1],
            "f": [5, 4, 7, 5, 3, 25, 9, 3, 4],
        },
        index=pd.Index([0, 0, 0, 1, 1, 1, 2, 2, 2], name="idx"),
    )

    # Add two nested layers to pack into our dataframe
    nf = nf.add_nested(to_pack, "packed").add_nested(to_pack2, "packed2")

    # Define a simple custom function to apply to the nested data
    def get_max(col1, col2):
        # returns the max value within each specified column
        return pd.Series([col1.max(), col2.max()], index=["max_col1", "max_col2"])

    # The expected max values of our nested columns
    expected_max_c = [4, 10, 4]
    expected_max_d = [7, 5, 9]
    expected_max_e = [9, 23, 4]

    # Test that we raise an error when no arguments are provided
    with pytest.raises(ValueError):
        nf.reduce(get_max)

    # Batch only on columns in the first packed layer
    result = nf.reduce(get_max, "packed.c", "packed.d")
    assert len(result) == len(nf)
    assert isinstance(result, NestedFrame)
    assert result.index.name == "idx"
    # Compare whole columns at once; the length check above guards against truncation
    assert list(result["max_col1"].values) == expected_max_c
    assert list(result["max_col2"].values) == expected_max_d

    # Batch on columns in the first and second packed layers
    result = nf.reduce(get_max, "packed.c", "packed2.e")
    assert len(result) == len(nf)
    assert isinstance(result, NestedFrame)
    assert result.index.name == "idx"
    assert list(result["max_col1"].values) == expected_max_c
    assert list(result["max_col2"].values) == expected_max_e

    # Test that we can pass a scalar from the base layer to the reduce function and that
    # the user can also provide non-column arguments (in this case, the list of column names)
    def offset_avg(offset, col_to_avg, column_names):
        # A simple function which adds a scalar 'offset' to a column which is then averaged.
        return pd.Series([(offset + col_to_avg).mean()], index=column_names)

    expected_offset_avg = [
        sum([2, 4, 6]) / 3.0,
        sum([14, 8, 7]) / 3.0,
        sum([7, 10, 7]) / 3.0,
    ]

    result = nf.reduce(offset_avg, "b", "packed.c", ["offset_avg"])
    assert len(result) == len(nf)
    assert isinstance(result, NestedFrame)
    assert result.index.name == "idx"
    # Averages are floats, so compare with a tolerance instead of exact equality
    assert list(result["offset_avg"].values) == pytest.approx(expected_offset_avg)
Loading