Merge pull request #126 from josephnowak/feature/general-improvements
Feature/general improvements
josephnowak authored Sep 13, 2024
2 parents cc47e23 + 2064bd7 commit a67c356
Showing 4 changed files with 123 additions and 62 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,7 +1,7 @@
pandas>=2.0.0
xarray[accel]>=2023.0.0
numpy>=1.26.0
dask[complete]>=2023.0.0
dask[complete]>=2024.0.0
zarr>=2.0.0
orjson>=3.0.0
pydantic>=2.0.0
2 changes: 1 addition & 1 deletion setup.py
@@ -5,7 +5,7 @@

setup(
name="TensorDB",
version="0.31.4",
version="0.31.5",
description="Database based on a file system storage combined with Xarray and Zarr",
author="Joseph Nowak",
author_email="[email protected]",
55 changes: 46 additions & 9 deletions tensordb/algorithms.py
@@ -556,6 +556,7 @@ def merge_duplicates_coord(
new_data: Union[xr.DataArray, xr.Dataset],
dim: str,
func: str,
kwargs: Dict[str, Any] = None,
):
"""
Group and merge the duplicated values of a coord based on a function, which can be a sum or a max. Read numpy-groupies
@@ -570,7 +571,12 @@ def merge_duplicates_coord(
new_data.coords[dim] = np.arange(new_data.sizes[dim])

return cls.apply_on_groups(
new_data=new_data, groups=groups, dim=dim, func=func, keep_shape=False
new_data=new_data,
groups=groups,
dim=dim,
func=func,
keep_shape=False,
kwargs=kwargs,
)

@classmethod
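
As a quick illustration of the new kwargs argument, here is a minimal sketch that forwards the numpy-groupies engine through merge_duplicates_coord (values are illustrative, adapted from the updated test further down; it assumes Algorithms is importable from tensordb.algorithms):

import numpy as np
import xarray as xr

from tensordb.algorithms import Algorithms

# DataArray with duplicated labels on the "a" coord
arr = xr.DataArray(
    np.arange(10).reshape((5, 2)).astype(float),
    dims=["a", "b"],
    coords={"a": [1, 5, 5, 0, 1], "b": [0, 1]},
).chunk(a=3, b=2)

# Duplicated "a" labels are grouped and reduced with "max";
# kwargs is forwarded through apply_on_groups to the grouped reduction
merged = Algorithms.merge_duplicates_coord(
    arr, dim="a", func="max", kwargs={"engine": "numpy"}
)
assert merged.equals(arr.groupby("a").max("a", engine="numpy"))
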
@@ -820,6 +826,7 @@ def reindex_with_pad(
coords,
preferred_chunks: Dict[str, int],
fill_value: Any,
apply_chunk: bool = True,
) -> xr.Dataset | xr.DataArray:
"""
The reindex with pad was created as an alternative to the standard
@@ -832,21 +839,48 @@
into a single chunk of a specific size before using reindex, which reduces
the size of the graph that can otherwise become a big bottleneck.
Note: if it is not necessary to pad the data, then the algorithm is going
to use reindex directly and then chunk the data.
:param data:
Data to be reindexed
:param coords:
Coords to reindex
:param preferred_chunks:
Expected chunk size of the array, if the array is smaller than the chunk
then some dummy data is added with the pad method.
Ideal chunk size of the array; if the array is smaller than the chunk,
then some artificial data is added with the pad method, to later be rechunked
into a single chunk.
It is important to highlight that if the size of the data on a dim is bigger
than the preferred chunk size, then the original chunks on that dim are going
to be preserved, unless apply_chunk is set to True (the default)
:param fill_value:
Used in the reindex method
:param apply_chunk: default True
If True, then the chunk method is going to be applied after calling the reindex
method of Xarray (which is called after padding the data).
Disabling this option is useful to properly visualize
the chunks created by the algorithm and see the reduction in terms of tasks,
and also for some other specific use cases
:return:
The reindexed data, chunked with the preferred_chunks when apply_chunk is True
"""
data = data.copy()
coords = {k: np.array(v) for k, v in coords.items()}

pad_width = {
dim: (0, preferred_chunks[dim] - size)
for dim, size in data.sizes.items()
# Only add artificial data if the chunk is bigger than the size of the data
if preferred_chunks.get(dim, 0) > size
}
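# e.g. with a dim of size 5 and a preferred chunk of 8, the pad_width above
# is {dim: (0, 3)}: three artificial rows complete a single chunk of size 8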
# If it is not necessary to pad additional data then just use
# reindex directly
if len(pad_width) == 0:
data = data.reindex(coords, fill_value=fill_value)
if apply_chunk:
data = data.chunk(preferred_chunks)
return data

# Create an autoincrement index per dim using ints to be able to map the coords
autoincrement_map = {}
for dim, coord in coords.items():
@@ -864,12 +898,11 @@
# Add artificial data at the end of every dim if necessary to complete
# the chunk size.
padded_data = data.pad(
{
dim: (0, max(preferred_chunks.get(dim, size) - size, 0))
for dim, size in data.sizes.items()
},
pad_width,
mode="edge",
).chunk(preferred_chunks)
)
# Create a single chunk on the dims that have artificial data
padded_data = padded_data.chunk({dim: -1 for dim in pad_width.keys()})
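# e.g. a dim padded from size 5 to 8 may start with chunks (3, 2, 3);
# rechunking with -1 merges them into a single chunk of (8,)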

# Using the autoincrement index we can assign a unique coord value
# to the artificial data created by the pad method
@@ -886,13 +919,17 @@

# Reindex the padded data using the mapped version of the coords;
# this is going to drop all the artificial data automatically
# and should also generate a better chunking
padded_reindex = padded_data.reindex(
{
dim: [autoincrement_map[dim][v] for v in coord]
for dim, coord in coords.items()
},
fill_value=fill_value,
).chunk(preferred_chunks)
)

if apply_chunk:
padded_reindex = padded_reindex.chunk(preferred_chunks)

# Revert the mapping
for dim in padded_reindex.dims:
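
To make the new behavior concrete, here is a minimal usage sketch of reindex_with_pad with apply_chunk disabled (adapted from the tests below; it assumes Algorithms is importable from tensordb.algorithms and the values are illustrative):

import numpy as np
import xarray as xr

from tensordb.algorithms import Algorithms

arr = xr.DataArray(
    np.arange(5 * 7).reshape((5, 7)).astype(float),
    dims=["a", "b"],
    coords={"a": list(range(5)), "b": list(range(7))},
).chunk({"a": 3, "b": 2})

# Slice the array so that dim "b" (size 1) is smaller than its
# preferred chunk (2) and therefore gets padded before the reindex
small = arr.isel(a=[0, 1], b=[5])

coords = {"a": [3, 5, 1, 0, -1, 0], "b": [5, 0, 3]}
result = Algorithms.reindex_with_pad(
    data=small,
    coords=coords,
    preferred_chunks={"a": 1, "b": 2},
    fill_value=-1.0,
    apply_chunk=False,  # keep the chunks produced by the pad + reindex path
)

# Same values as a plain reindex, but built with a smaller task graph
assert result.equals(small.reindex(coords, fill_value=-1.0))
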
126 changes: 75 additions & 51 deletions tensordb/tests/test_algorithms.py
@@ -316,7 +316,11 @@ def test_apply_on_groups(dim, keep_shape, func):
if axis == 1:
expected = expected.T

kwargs = {"engine": "numpy"}

if func == "custom":
# Avoid using the engine kwarg on the custom functions
kwargs = {}
arr = xr.Dataset(
{
"x": arr,
@@ -350,7 +354,13 @@ def custom_func(dataset):
expected = expected.T

result = Algorithms.apply_on_groups(
arr, groups=groups, dim=dim, func=func, keep_shape=keep_shape, template="x"
arr,
groups=groups,
dim=dim,
func=func,
keep_shape=keep_shape,
template="x",
kwargs=kwargs,
)

expected = xr.DataArray(expected.values, coords=result.coords, dims=result.dims)
@@ -438,8 +448,8 @@ def test_merge_duplicates_coord(dim):
coords={"a": [1, 5, 5, 0, 1], "b": [0, 1, 1, 0, -1]},
).chunk(a=3, b=2)

g = arr.groupby(dim).max(dim)
arr = Algorithms.merge_duplicates_coord(arr, dim, "max")
g = arr.groupby(dim).max(dim, engine="numpy")
arr = Algorithms.merge_duplicates_coord(arr, dim, "max", kwargs={"engine": "numpy"})
assert g.equals(arr)


@@ -515,75 +525,89 @@ def test_rolling_overlap(window, apply_ffill):
assert expected.equals(rolling_arr)


def test_reindex_with_pad():
@pytest.mark.parametrize(
"slices",
[{"a": [0, 3, 4], "b": [1, 3]}, {}, {"a": [0, 1], "b": [0]}],
)
@pytest.mark.parametrize(
"coords",
[
{
"a": np.array([100] + list(range(1, 11, 2)), "datetime64[ns]"),
"b": [5, -1, 3],
},
{
"a": np.array([5, 0, 2, 1], "datetime64[ns]"),
"b": [5, -1, 3, 4],
},
{
"a": np.array([5, 5, 0, 2, 2, 3], "datetime64[ns]"),
"b": [-5, -5, 3, 2, 5],
},
],
)
@pytest.mark.parametrize(
"chunks",
[{"a": 3, "b": 2}, {"a": 2, "b": 3}, {"a": 1, "b": 1}, {"a": 2}],
)
def test_reindex_with_pad(slices, coords, chunks):
arr = xr.DataArray(
np.arange(5 * 7).reshape((5, 7)).astype(float),
dims=["a", "b"],
coords={
"a": np.array(list(range(0, 10, 2)), "datetime64[ns]"),
"b": list(range(7)),
},
).chunk(a=3, b=2)
).chunk(chunks)
arr = arr.isel(**slices)
expected = arr.reindex(coords, fill_value=-1.0)

# Test all missing
coords = {
"a": np.array([100] + list(range(1, 11, 2)), "datetime64[ns]"),
"b": [5, -1, 3],
}
result = Algorithms.reindex_with_pad(
data=arr,
coords=coords,
fill_value=0.0,
preferred_chunks={"a": 3, "b": 2},
fill_value=-1.0,
preferred_chunks=chunks,
)
assert result.chunksizes == {"a": (3, 3), "b": (2, 1)}
assert result.equals(arr.reindex(coords, fill_value=0.0))
assert result.chunksizes == expected.chunk(chunks).chunksizes
assert result.equals(expected)

# Test not sorted coords
coords = {
"a": np.array([5, 0, 2, 1], "datetime64[ns]"),
"b": [5, -1, 3, 4],
}
result = Algorithms.reindex_with_pad(
data=arr,
coords=coords,
fill_value=1.0,
preferred_chunks={"a": 3, "b": 2},
)
assert result.chunksizes == {"a": (3, 1), "b": (2, 2)}
assert result.equals(arr.reindex(coords, fill_value=1.0))

# Test duplicated coords
coords = {
"a": np.array([5, 5, 0, 2, 2, 3], "datetime64[ns]"),
"b": [-5, -5, 3, 2, 5],
}
result = Algorithms.reindex_with_pad(
data=arr,
coords=coords,
fill_value=0.0,
preferred_chunks={"a": 3, "b": 2},
)
assert result.chunksizes == {"a": (3, 3), "b": (2, 2, 1)}
assert result.equals(arr.reindex(coords, fill_value=0.0))
@pytest.mark.parametrize("chunks", [{"a": 3, "b": 2}])
def test_reindex_with_pad_no_chunk(chunks):
arr = xr.DataArray(
np.arange(5 * 7).reshape((5, 7)).astype(float),
dims=["a", "b"],
coords={
"a": list(range(5)),
"b": list(range(7)),
},
).chunk(chunks)

test_arr = arr.isel(a=[0, 1], b=[5])

# Test filling with artificial data using pad
coords = {
"a": np.array([5, 0, 2, 1], "datetime64[ns]"),
"b": [5, -1, 3, 4],
"a": [3, 5, 1, 0, -1, 0],
"b": [
5,
0,
3,
],
}
expected = test_arr.reindex(coords, fill_value=-1.0)

result = Algorithms.reindex_with_pad(
data=arr.isel(a=[0, 1], b=[0]),
data=test_arr,
coords=coords,
fill_value=1.0,
preferred_chunks={"a": 3, "b": 2},
fill_value=-1.0,
preferred_chunks={"a": 1, "b": 2},
apply_chunk=False,
)
assert result.chunksizes == {"a": (3, 1), "b": (2, 2)}
assert result.equals(arr.isel(a=[0, 1], b=[0]).reindex(coords, fill_value=1.0))

# TODO: Find a way to measure if the chunks are created properly with the pad
# right now there is always a chunk being applied after the reindexing method
# so it is not possible to test if the method is doing what it has to do
assert result.equals(expected)
# These chunks prove that the chunk method is not called after the reindex
# and that the method preserves the original chunks when it is not
# necessary to pad data on certain dims
assert result.chunksizes == {"a": (2, 2, 2), "b": (2, 1)}


if __name__ == "__main__":
