
[Backend Configuration IIa] Add dataset identification tools #569

Merged · 40 commits · Nov 22, 2023
Changes from 9 commits
Commits (40)
c4bca8a
port over tool function for defaults
CodyCBakerPhD Sep 18, 2023
38a1fa3
modify iterator as well
CodyCBakerPhD Sep 18, 2023
a981068
factor out backend config stuff to other PR
CodyCBakerPhD Sep 18, 2023
966592c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 18, 2023
2df8c06
Merge branch 'new_backend_pydantic_backend_configuration_models' into…
CodyCBakerPhD Sep 18, 2023
2e7af84
Update CHANGELOG.md
CodyCBakerPhD Sep 18, 2023
85bc927
Update __init__.py
CodyCBakerPhD Sep 18, 2023
8307156
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 18, 2023
87bee35
Merge branch 'new_backend_pydantic_backend_configuration_models' into…
CodyCBakerPhD Sep 18, 2023
87d1116
Merge branch 'new_backend_pydantic_backend_configuration_models' into…
CodyCBakerPhD Oct 4, 2023
13c9b37
use dataset_name in DatasetInfo; other debugs
Oct 4, 2023
b63161a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 4, 2023
d55e2a2
remove comments
Oct 5, 2023
4b848b0
Merge branch 'new_backend_default_dataset_configuration' of https://g…
Oct 5, 2023
446d81d
fix conflict
CodyCBakerPhD Nov 7, 2023
3c7cde8
remove unused typing
CodyCBakerPhD Nov 8, 2023
b845ac6
improve error message and fix import test
CodyCBakerPhD Nov 8, 2023
5c7fb6b
add global static maps; further condense tests with parametrize
CodyCBakerPhD Nov 8, 2023
91aab8c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 8, 2023
65eee6b
fix name
CodyCBakerPhD Nov 8, 2023
a0a4d07
Merge branch 'new_backend_default_dataset_configuration' of https://g…
CodyCBakerPhD Nov 8, 2023
673e2f9
Apply suggestions from code review
CodyCBakerPhD Nov 20, 2023
271660c
Merge branch 'main' into new_backend_default_dataset_configuration
CodyCBakerPhD Nov 20, 2023
8630316
PR suggestions
CodyCBakerPhD Nov 21, 2023
f7e1be6
Update src/neuroconv/tools/nwb_helpers/_dataset_configuration.py
CodyCBakerPhD Nov 21, 2023
46b8cdb
Merge branch 'main' into new_backend_default_dataset_configuration
CodyCBakerPhD Nov 21, 2023
6f0806a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 21, 2023
b07a541
add IO to dataset config names
CodyCBakerPhD Nov 21, 2023
2bb867a
fix conflict
CodyCBakerPhD Nov 21, 2023
89915ab
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 21, 2023
bfe1049
fix minimal test
CodyCBakerPhD Nov 21, 2023
268e7e9
Merge branch 'new_backend_default_dataset_configuration' of https://g…
CodyCBakerPhD Nov 21, 2023
f1683fa
alter private method name
CodyCBakerPhD Nov 21, 2023
185a69d
add extra tests
CodyCBakerPhD Nov 21, 2023
6fad003
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 21, 2023
15d8aed
Merge branch 'main' into new_backend_default_dataset_configuration
h-mayorquin Nov 22, 2023
4f668ec
Merge branch 'main' into new_backend_default_dataset_configuration
CodyCBakerPhD Nov 22, 2023
5ce5914
add test for ragged tables; debug
CodyCBakerPhD Nov 22, 2023
8350d2e
Merge branch 'new_backend_default_dataset_configuration' of https://g…
CodyCBakerPhD Nov 22, 2023
3032755
adjust for cross-platform
CodyCBakerPhD Nov 22, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

* Added Pydantic data models of `DatasetInfo` (immutable summary of core dataset values such as maximum shape and dtype) and `DatasetConfiguration` for both HDF5 and Zarr datasets (the optional layer that specifies chunk/buffering/compression). [PR #567](https://github.com/catalystneuro/neuroconv/pull/567)
* Added Pydantic data models of `BackendConfiguration` for both HDF5 and Zarr datasets (container/mapper of all the `DatasetConfiguration`s for a particular file). [PR #568](https://github.com/catalystneuro/neuroconv/pull/568)
* Added the tool function `get_default_dataset_configurations` for identifying and collecting all fields of an in-memory `NWBFile` that could become datasets on disk, and returning instances of the Pydantic dataset models filled with default values for chunking/buffering/compression. [PR #569](https://github.com/catalystneuro/neuroconv/pull/569)



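For orientation, a minimal usage sketch of the new generator (not an excerpt of the diff below; the `pynwb.testing.mock` helpers and the private import path are illustrative assumptions):

```python
import numpy as np
from pynwb.testing.mock.file import mock_NWBFile
from pynwb.testing.mock.base import mock_TimeSeries

# Assumption: importing from the private module added in this PR; the public re-export may differ.
from neuroconv.tools.nwb_helpers._dataset_configuration import get_default_dataset_configurations

# Build a small in-memory NWBFile with one acquisition TimeSeries.
nwbfile = mock_NWBFile()
nwbfile.add_acquisition(mock_TimeSeries(name="ExampleTimeSeries", data=np.random.rand(100, 4), rate=30.0))

# The function is a generator; each yielded item is a Pydantic dataset configuration holding the
# dataset's location/maxshape/dtype plus default chunk and buffer shapes for the chosen backend.
for dataset_configuration in get_default_dataset_configurations(nwbfile=nwbfile, backend="hdf5"):
    print(dataset_configuration.dataset_info.location, dataset_configuration.chunk_shape)
```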
80 changes: 61 additions & 19 deletions src/neuroconv/tools/hdmf.py
@@ -1,35 +1,77 @@
"""Collection of modifications of HDMF functions that are to be tested/used on this repo until propagation upstream."""
import math
from typing import Tuple

import numpy as np
from hdmf.data_utils import GenericDataChunkIterator as HDMFGenericDataChunkIterator
from pydantic import Field
from typing_extensions import Annotated


class GenericDataChunkIterator(HDMFGenericDataChunkIterator):
def _get_default_buffer_shape(self, buffer_gb: float = 1.0) -> Tuple[int]:
- num_axes = len(self.maxshape)
- chunk_bytes = np.prod(self.chunk_shape) * self.dtype.itemsize
+ return self.estimate_default_buffer_shape(
+     buffer_gb=buffer_gb, chunk_shape=self.chunk_shape, maxshape=self.maxshape, dtype=self.dtype
+ )

# TODO: move this to the core iterator in HDMF so it can be easily swapped out as well as run on its own
@staticmethod
def estimate_default_chunk_shape(
chunk_mb: Annotated[float, Field(gt=0.0)],
maxshape: Tuple[int, ...],
dtype: np.dtype,
) -> Tuple[int, ...]:
"""
Select chunk shape with size in MB less than the threshold of chunk_mb.

Keeps the dimensional ratios of the original data.
"""
assert chunk_mb > 0.0, f"chunk_mb ({chunk_mb}) must be greater than zero!"
# Eventually, Pydantic validation can handle this validation for us

n_dims = len(maxshape)
itemsize = dtype.itemsize
chunk_bytes = chunk_mb * 1e6

min_maxshape = min(maxshape)
v = tuple(math.floor(maxshape_axis / min_maxshape) for maxshape_axis in maxshape)
prod_v = math.prod(v)
while prod_v * itemsize > chunk_bytes and prod_v != 1:
non_unit_min_v = min(x for x in v if x != 1)
v = tuple(math.floor(x / non_unit_min_v) if x != 1 else x for x in v)
prod_v = math.prod(v)
k = math.floor((chunk_bytes / (prod_v * itemsize)) ** (1 / n_dims))
return tuple([min(k * x, maxshape[dim]) for dim, x in enumerate(v)])

# TODO: move this to the core iterator in HDMF so it can be easily swapped out as well as run on its own
@staticmethod
def estimate_default_buffer_shape(
buffer_gb: Annotated[float, Field(gt=0.0)],
chunk_shape: Tuple[int, ...],
maxshape: Tuple[int, ...],
dtype: np.dtype,
) -> Tuple[int]:
num_axes = len(maxshape)
chunk_bytes = math.prod(chunk_shape) * dtype.itemsize
assert buffer_gb > 0, f"buffer_gb ({buffer_gb}) must be greater than zero!"
assert (
buffer_gb >= chunk_bytes / 1e9
), f"buffer_gb ({buffer_gb}) must be greater than the chunk size ({chunk_bytes / 1e9})!"
- assert all(
-     np.array(self.chunk_shape) > 0
- ), f"Some dimensions of chunk_shape ({self.chunk_shape}) are less than zero!"
+ assert all(np.array(chunk_shape) > 0), f"Some dimensions of chunk_shape ({chunk_shape}) are less than zero!"

- maxshape = np.array(self.maxshape)
+ maxshape = np.array(maxshape)

# Early termination condition
- if np.prod(maxshape) * self.dtype.itemsize / 1e9 < buffer_gb:
-     return tuple(self.maxshape)
+ if math.prod(maxshape) * dtype.itemsize / 1e9 < buffer_gb:
+     return tuple(maxshape)

buffer_bytes = chunk_bytes
- axis_sizes_bytes = maxshape * self.dtype.itemsize
- smallest_chunk_axis, second_smallest_chunk_axis, *_ = np.argsort(self.chunk_shape)
+ axis_sizes_bytes = maxshape * dtype.itemsize
+ smallest_chunk_axis, second_smallest_chunk_axis, *_ = np.argsort(chunk_shape)
target_buffer_bytes = buffer_gb * 1e9

# If the smallest full axis does not fit within the buffer size, form a square along the two smallest axes
- sub_square_buffer_shape = np.array(self.chunk_shape)
+ sub_square_buffer_shape = np.array(chunk_shape)
if min(axis_sizes_bytes) > target_buffer_bytes:
k1 = np.floor((target_buffer_bytes / chunk_bytes) ** 0.5)
for axis in [smallest_chunk_axis, second_smallest_chunk_axis]:
@@ -40,32 +82,32 @@ def _get_default_buffer_shape(self, buffer_gb: float = 1.0) -> Tuple[int]:
chunk_to_buffer_ratio = buffer_gb * 1e9 / chunk_bytes
chunk_scaling_factor = np.floor(chunk_to_buffer_ratio ** (1 / num_axes))
unpadded_buffer_shape = [
- np.clip(a=int(x), a_min=self.chunk_shape[j], a_max=self.maxshape[j])
- for j, x in enumerate(chunk_scaling_factor * np.array(self.chunk_shape))
+ np.clip(a=int(x), a_min=chunk_shape[j], a_max=maxshape[j])
+ for j, x in enumerate(chunk_scaling_factor * np.array(chunk_shape))
]

- unpadded_buffer_bytes = np.prod(unpadded_buffer_shape) * self.dtype.itemsize
+ unpadded_buffer_bytes = math.prod(unpadded_buffer_shape) * dtype.itemsize

# Method that starts by filling the smallest axis completely or calculates best partial fill
- padded_buffer_shape = np.array(self.chunk_shape)
- chunks_per_axis = np.ceil(maxshape / self.chunk_shape)
+ padded_buffer_shape = np.array(chunk_shape)
+ chunks_per_axis = np.ceil(maxshape / chunk_shape)
small_axis_fill_size = chunk_bytes * min(chunks_per_axis)
full_axes_used = np.zeros(shape=num_axes, dtype=bool)
if small_axis_fill_size <= target_buffer_bytes:
buffer_bytes = small_axis_fill_size
- padded_buffer_shape[smallest_chunk_axis] = self.maxshape[smallest_chunk_axis]
+ padded_buffer_shape[smallest_chunk_axis] = maxshape[smallest_chunk_axis]
full_axes_used[smallest_chunk_axis] = True
for axis, chunks_on_axis in enumerate(chunks_per_axis):
if full_axes_used[axis]: # If the smallest axis, skip since already used
continue
if chunks_on_axis * buffer_bytes <= target_buffer_bytes: # If multiple axes can be used together
buffer_bytes *= chunks_on_axis
- padded_buffer_shape[axis] = self.maxshape[axis]
+ padded_buffer_shape[axis] = maxshape[axis]
else: # Found an axis that is too large to use with the rest of the buffer; calculate how much can be used
k3 = np.floor(target_buffer_bytes / buffer_bytes)
padded_buffer_shape[axis] *= k3
break
- padded_buffer_bytes = np.prod(padded_buffer_shape) * self.dtype.itemsize
+ padded_buffer_bytes = math.prod(padded_buffer_shape) * dtype.itemsize

if padded_buffer_bytes >= unpadded_buffer_bytes:
return tuple(padded_buffer_shape)
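As a quick sketch of how the two static estimators above might be exercised on their own (the shapes and size thresholds here are illustrative, not defaults mandated by the PR):

```python
import numpy as np

from neuroconv.tools.hdmf import GenericDataChunkIterator

# Illustrative dataset geometry: a long series of frames x channels stored as int16.
maxshape = (10_000_000, 384)
dtype = np.dtype("int16")

# Chunk shape targeting roughly chunk_mb per chunk while keeping the axis ratios of maxshape.
chunk_shape = GenericDataChunkIterator.estimate_default_chunk_shape(chunk_mb=10.0, maxshape=maxshape, dtype=dtype)

# Buffer shape targeting roughly buffer_gb, bounded below by the chunk shape and above by maxshape.
buffer_shape = GenericDataChunkIterator.estimate_default_buffer_shape(
    buffer_gb=0.5, chunk_shape=chunk_shape, maxshape=maxshape, dtype=dtype
)

print(chunk_shape, buffer_shape)
```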
3 changes: 2 additions & 1 deletion src/neuroconv/tools/nwb_helpers/__init__.py
@@ -1,11 +1,12 @@
from ._dataset_configuration import get_default_dataset_configurations
from ._metadata_and_file_helpers import (
add_device_from_metadata,
get_default_nwbfile_metadata,
get_module,
make_nwbfile_from_metadata,
make_or_load_nwbfile,
)
- from ._models._base_models import DatasetConfiguration, DatasetInfo
+ from ._models._base_models import DatasetInfo
from ._models._hdf5_models import (
AVAILABLE_HDF5_COMPRESSION_METHODS,
HDF5BackendConfiguration,
197 changes: 197 additions & 0 deletions src/neuroconv/tools/nwb_helpers/_dataset_configuration.py
@@ -0,0 +1,197 @@
"""Collection of helper functions related to configuration of datasets dependent on backend."""
from typing import Iterable, Literal, Union

import h5py
import numpy as np
import zarr
from hdmf import Container
from hdmf.data_utils import DataChunkIterator, DataIO, GenericDataChunkIterator
from hdmf.utils import get_data_shape
from hdmf_zarr import NWBZarrIO
from pynwb import NWBHDF5IO, NWBFile, TimeSeries
from pynwb.base import DynamicTable

from ._dataset_and_backend_models import (
BACKEND_TO_CONFIGURATION,
BACKEND_TO_DATASET_CONFIGURATION,
DatasetConfiguration,
DatasetInfo,
HDF5BackendConfiguration,
HDF5DatasetConfiguration,
ZarrBackendConfiguration,
ZarrDatasetConfiguration,
)
from ..hdmf import SliceableDataChunkIterator


def _get_mode(io: Union[NWBHDF5IO, NWBZarrIO]) -> str:
"""NWBHDF5IO and NWBZarrIO have different ways of storing the mode they used on a path."""
if isinstance(io, NWBHDF5IO):
return io.mode
elif isinstance(io, NWBZarrIO):
return io._ZarrIO__mode


def _is_value_already_written_to_file(
candidate_dataset: Union[h5py.Dataset, zarr.Array],
backend: Literal["hdf5", "zarr"],
existing_file: Union[h5py.File, zarr.Group, None],
) -> bool:
"""
Determine if the neurodata object is already written to the file on disk.

This object should then be skipped by the `get_io_datasets` function when working in append mode.
"""
return (
isinstance(candidate_dataset, h5py.Dataset)  # If the source data is an HDF5 Dataset
and backend == "hdf5"  # If the target backend is also HDF5
and candidate_dataset.file == existing_file  # If the source HDF5 Dataset is the appending NWBFile
) or (
isinstance(candidate_dataset, zarr.Array)  # If the source data is a Zarr Array
and backend == "zarr"  # If the target backend is also Zarr
and candidate_dataset.store == existing_file  # If the source Zarr 'file' is the appending NWBFile
)


def _parse_location_in_memory_nwbfile(current_location: str, neurodata_object: Container) -> str:
parent = neurodata_object.parent
if isinstance(parent, NWBFile):
# Items in defined top-level places like acquisition, intervals, etc. do not act as 'containers'
# in the .parent sense; ask if object is in their in-memory dictionaries instead
for outer_field_name, outer_field_value in parent.fields.items():
if isinstance(outer_field_value, dict) and neurodata_object.name in outer_field_value:
return outer_field_name + "/" + neurodata_object.name + "/" + current_location
Collaborator:
Question: when do we get to this return here? That is, when does the for loop not find anything?

Member Author:
Oh, this was resolved? Does it make sense now? Or do I still need to try to remember why it's there?

Collaborator:
Remembering would be great. But I would like to test this myself at some point anyway and write some tests in isolation for this method.
But I think this is out of the scope of the PR, as the goal is not to make me understand every piece of code or corner case, but the new functionality :)

return neurodata_object.name + "/" + current_location
return _parse_location_in_memory_nwbfile(
current_location=neurodata_object.name + "/" + current_location, neurodata_object=parent
)


def _get_dataset_metadata(
neurodata_object: Union[TimeSeries, DynamicTable], field_name: str, backend: Literal["hdf5", "zarr"]
) -> Union[HDF5DatasetConfiguration, ZarrDatasetConfiguration]:
"""Fill in the Dataset model with as many values as can be automatically detected or inferred."""
DatasetConfigurationClass = BACKEND_TO_DATASET_CONFIGURATION[backend]

candidate_dataset = getattr(neurodata_object, field_name)
# For now, skip over datasets already wrapped in DataIO
# Could maybe eventually support modifying chunks in place,
# but setting the buffer shape is only possible if the iterator was wrapped first
if not isinstance(candidate_dataset, DataIO):
# DataChunkIterator has the best generic dtype inference, though the logic is hard to peel out of it,
# and it can fail in rare cases, but it is not essential to our default configuration
try:
dtype = str(DataChunkIterator(candidate_dataset).dtype) # string cast to be JSON friendly
except Exception as exception:
if str(exception) != "Data type could not be determined. Please specify dtype in DataChunkIterator init.":
raise exception
else:
dtype = "unknown"

maxshape = get_data_shape(data=candidate_dataset)

if isinstance(candidate_dataset, GenericDataChunkIterator):
chunk_shape = candidate_dataset.chunk_shape
buffer_shape = candidate_dataset.buffer_shape
elif dtype != "unknown":
# TODO: eventually replace this with staticmethods on hdmf.data_utils.GenericDataChunkIterator
chunk_shape = SliceableDataChunkIterator.estimate_default_chunk_shape(
chunk_mb=10.0, maxshape=maxshape, dtype=np.dtype(dtype)
)
buffer_shape = SliceableDataChunkIterator.estimate_default_buffer_shape(
buffer_gb=0.5, chunk_shape=chunk_shape, maxshape=maxshape, dtype=np.dtype(dtype)
)
else:
pass # TODO: think on this; perhaps zarr's standalone estimator?

dataset_info = DatasetInfo(
object_id=neurodata_object.object_id,
object_name=neurodata_object.name,
location=_parse_location_in_memory_nwbfile(current_location=field_name, neurodata_object=neurodata_object),
field=field_name,
maxshape=maxshape,
dtype=dtype,
)
dataset_configuration = DatasetConfigurationClass(
dataset_info=dataset_info, chunk_shape=chunk_shape, buffer_shape=buffer_shape
)
return dataset_configuration


def get_default_dataset_configurations(
nwbfile: NWBFile,
backend: Union[None, Literal["hdf5", "zarr"]] = None, # None for auto-detect from append mode, otherwise required
) -> Iterable[DatasetConfiguration]:
"""
Automatically detect all objects in the in-memory NWBFile that could be wrapped in a DataIO.

Parameters
----------
nwbfile : pynwb.NWBFile
An in-memory NWBFile object, either generated from the base class or read from an existing file of any backend.
backend : "hdf5" or "zarr", optional
Which backend format type you would like to use in configuring each dataset's compression methods and options.
If None, the backend is auto-detected from `nwbfile.read_io` when the file was read in append mode.

Yields
------
DatasetConfiguration
A summary of each detected object that can be wrapped in a DataIO.
"""
if backend is None and nwbfile.read_io is None:
raise ValueError(
"Keyword argument `backend` (either 'hdf5' or 'zarr') must be specified if the `nwbfile` was not "
"read from an existing file!"
)
if backend is None and nwbfile.read_io is not None and _get_mode(io=nwbfile.read_io) not in ("r+", "a"):
raise ValueError(
"Keyword argument `backend` (either 'hdf5' or 'zarr') must be specified if the `nwbfile` is not being appended."
)

detected_backend = None
existing_file = None
if isinstance(nwbfile.read_io, NWBHDF5IO) and _get_mode(io=nwbfile.read_io) in ("r+", "a"):
detected_backend = "hdf5"
existing_file = nwbfile.read_io._file
elif isinstance(nwbfile.read_io, NWBZarrIO) and _get_mode(io=nwbfile.read_io) in ("r+", "a"):
detected_backend = "zarr"
existing_file = nwbfile.read_io.file.store
backend = backend or detected_backend

if detected_backend is not None and detected_backend != backend:
raise ValueError(
f"Detected backend '{detected_backend}' for appending file, but specified `backend` "
f"({backend}) does not match! Set `backend=None` or remove the keyword argument to allow it to auto-detect."
)

for neurodata_object in nwbfile.objects.values():
if isinstance(neurodata_object, TimeSeries):
time_series = neurodata_object # for readability

for field_name in ("data", "timestamps"):
if field_name not in time_series.fields: # timestamps is optional
continue

candidate_dataset = getattr(time_series, field_name)
if _is_value_already_written_to_file(
candidate_dataset=candidate_dataset, backend=backend, existing_file=existing_file
):
continue # skip

# Edge case of in-memory ImageSeries with external mode; data is in fields and is empty array
if isinstance(candidate_dataset, np.ndarray) and not np.any(candidate_dataset):
continue # skip

yield _get_dataset_metadata(neurodata_object=time_series, field_name=field_name, backend=backend)
elif isinstance(neurodata_object, DynamicTable):
dynamic_table = neurodata_object # for readability

for column_name in dynamic_table.colnames:
candidate_dataset = dynamic_table[column_name].data # VectorData object
if _is_value_already_written_to_file(
candidate_dataset=candidate_dataset, backend=backend, existing_file=existing_file
):
continue # skip

yield _get_dataset_metadata(
neurodata_object=dynamic_table[column_name], field_name="data", backend=backend
)
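
And a hedged sketch of the append-mode auto-detection path described in the docstring above (the file path is a placeholder; the private import path is an assumption):

```python
from pynwb import NWBHDF5IO

from neuroconv.tools.nwb_helpers._dataset_configuration import get_default_dataset_configurations

# When the NWBFile was read back in append mode, `backend` may be left as None:
# it is inferred from the type of `nwbfile.read_io` (NWBHDF5IO -> "hdf5", NWBZarrIO -> "zarr"),
# and datasets already written to that file are skipped.
with NWBHDF5IO(path="existing_file.nwb", mode="a") as io:  # placeholder path
    nwbfile = io.read()
    dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile))
```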