From 7972da22b3b298ecfb9e24c013c10e178685131b Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Sat, 29 Jul 2023 20:16:24 +0000 Subject: [PATCH 01/11] adding WIP --- pyproject.toml | 1 + src/hdmf/backends/io.py | 4 +++ src/hdmf/common/hdmf-common-schema | 2 +- src/hdmf/data_utils.py | 52 ++++++++++++++++++------------ 4 files changed, 38 insertions(+), 21 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 672778849..39bf63aa4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ dependencies = [ "ruamel.yaml>=0.16", "scipy>=1.4", "importlib-resources; python_version < '3.9'", # TODO: remove when minimum python version is 3.9 + "threadpoolct" ] dynamic = ["version"] diff --git a/src/hdmf/backends/io.py b/src/hdmf/backends/io.py index 39ecbdba7..a897d4408 100644 --- a/src/hdmf/backends/io.py +++ b/src/hdmf/backends/io.py @@ -75,6 +75,10 @@ def read(self, **kwargs): return container @docval({'name': 'container', 'type': Container, 'doc': 'the Container object to write'}, + {'name': 'number_of_jobs', 'type': int, 'doc': 'Number of jobs to use in parallel during write (only operates on GenericDataChunkIterator-wrapped datasets).', 'default': 1}, + {'name': 'number_of_jobs', 'type': int, + 'doc': "Number of jobs to use in parallel during write (only operates on GenericDataChunkIterator-wrapped datasets).", + 'default': 1}, allow_extra=True) def write(self, **kwargs): """Write a container to the IO source.""" diff --git a/src/hdmf/common/hdmf-common-schema b/src/hdmf/common/hdmf-common-schema index 80efce315..b3e48fcc5 160000 --- a/src/hdmf/common/hdmf-common-schema +++ b/src/hdmf/common/hdmf-common-schema @@ -1 +1 @@ -Subproject commit 80efce315fcd6c198c512ba526e763f81b535d36 +Subproject commit b3e48fcc5fff10dce0585d57b84cfed5816089a3 diff --git a/src/hdmf/data_utils.py b/src/hdmf/data_utils.py index dfe552e8c..7df7f68cc 100644 --- a/src/hdmf/data_utils.py +++ b/src/hdmf/data_utils.py @@ -3,7 +3,7 @@ from abc import ABCMeta, abstractmethod from collections.abc import Iterable from warnings import warn -from typing import Tuple +from typing import Tuple, Callable from itertools import product, chain import h5py @@ -191,9 +191,10 @@ def __init__(self, **kwargs): See https://support.hdfgroup.org/HDF5/doc/TechNotes/TechNote-HDF5-ImprovingIOPerformanceCompressedDatasets.pdf for more details. """ - buffer_gb, buffer_shape, chunk_mb, chunk_shape, self.display_progress, self.progress_bar_options = getargs( + buffer_gb, buffer_shape, chunk_mb, chunk_shape, self.display_progress, progress_bar_options = getargs( "buffer_gb", "buffer_shape", "chunk_mb", "chunk_shape", "display_progress", "progress_bar_options", kwargs ) + self.progress_bar_options = progress_bar_options or dict() if buffer_gb is None and buffer_shape is None: buffer_gb = 1.0 @@ -241,7 +242,7 @@ def __init__(self, **kwargs): for buffer_axis, maxshape_axis in zip(self.buffer_shape, self.maxshape) ], ) - self.buffer_selection_generator = ( + self.buffer_selections = tuple( # temporary solution to cast as tuple to allow indexing tuple( [ slice(lower_bound, upper_bound) @@ -263,11 +264,9 @@ def __init__(self, **kwargs): ), ) ) + self.buffer_selection_generator = iter(self.buffer_selections) if self.display_progress: - if self.progress_bar_options is None: - self.progress_bar_options = dict() - try: from tqdm import tqdm @@ -346,12 +345,6 @@ def _get_default_buffer_shape(self, **kwargs) -> Tuple[int, ...]: ] ) - def recommended_chunk_shape(self) -> Tuple[int, ...]: - return self.chunk_shape - - def recommended_data_shape(self) -> Tuple[int, ...]: - return self.maxshape - def __iter__(self): return self @@ -372,6 +365,11 @@ def __next__(self): self.progress_bar.write("\n") # Allows text to be written to new lines after completion raise StopIteration + def __reduce__(self) -> Tuple[Callable, Iterable]: + instance_constructor = self._from_dict + initialization_args = (self._to_dict(),) + return (instance_constructor, initialization_args) + @abstractmethod def _get_data(self, selection: Tuple[slice]) -> np.ndarray: """ @@ -392,24 +390,38 @@ def _get_data(self, selection: Tuple[slice]) -> np.ndarray: """ raise NotImplementedError("The data fetching method has not been built for this DataChunkIterator!") - @property - def maxshape(self) -> Tuple[int, ...]: - return self._maxshape - @abstractmethod def _get_maxshape(self) -> Tuple[int, ...]: """Retrieve the maximum bounds of the data shape using minimal I/O.""" raise NotImplementedError("The setter for the maxshape property has not been built for this DataChunkIterator!") - @property - def dtype(self) -> np.dtype: - return self._dtype - @abstractmethod def _get_dtype(self) -> np.dtype: """Retrieve the dtype of the data using minimal I/O.""" raise NotImplementedError("The setter for the internal dtype has not been built for this DataChunkIterator!") + def _to_dict(self) -> Iterable: + """Optional method to add in child classes to enable pickling (required for multiprocessing).""" + pass + + @staticmethod + def _from_dict(self) -> Callable: + """Optional method to add in child classes to enable pickling (required for multiprocessing).""" + pass + + def recommended_chunk_shape(self) -> Tuple[int, ...]: + return self.chunk_shape + + def recommended_data_shape(self) -> Tuple[int, ...]: + return self.maxshape + + @property + def maxshape(self) -> Tuple[int, ...]: + return self._maxshape + @property + def dtype(self) -> np.dtype: + return self._dtype + class DataChunkIterator(AbstractDataChunkIterator): """ From a36384e740b70b888868b2b6ebb01bb412875533 Mon Sep 17 00:00:00 2001 From: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com> Date: Sun, 30 Jul 2023 20:49:57 +0000 Subject: [PATCH 02/11] add NotImplementedErrors; change typing --- src/hdmf/data_utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/hdmf/data_utils.py b/src/hdmf/data_utils.py index 7df7f68cc..dbbddcce4 100644 --- a/src/hdmf/data_utils.py +++ b/src/hdmf/data_utils.py @@ -400,14 +400,14 @@ def _get_dtype(self) -> np.dtype: """Retrieve the dtype of the data using minimal I/O.""" raise NotImplementedError("The setter for the internal dtype has not been built for this DataChunkIterator!") - def _to_dict(self) -> Iterable: + def _to_dict(self) -> dict: """Optional method to add in child classes to enable pickling (required for multiprocessing).""" - pass + raise NotImplementedError("The `._to_dict()` method for pickling has not been defined for this DataChunkIterator!") @staticmethod def _from_dict(self) -> Callable: """Optional method to add in child classes to enable pickling (required for multiprocessing).""" - pass + raise NotImplementedError("The `._from_dict()` method for pickling has not been defined for this DataChunkIterator!") def recommended_chunk_shape(self) -> Tuple[int, ...]: return self.chunk_shape From 3283045cf0d48f4cb3fbe1b9659de1e6f4fd84f9 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Fri, 4 Aug 2023 15:33:45 +0000 Subject: [PATCH 03/11] remove duplicate --- src/hdmf/backends/io.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/hdmf/backends/io.py b/src/hdmf/backends/io.py index a897d4408..256ec7bc1 100644 --- a/src/hdmf/backends/io.py +++ b/src/hdmf/backends/io.py @@ -75,10 +75,15 @@ def read(self, **kwargs): return container @docval({'name': 'container', 'type': Container, 'doc': 'the Container object to write'}, - {'name': 'number_of_jobs', 'type': int, 'doc': 'Number of jobs to use in parallel during write (only operates on GenericDataChunkIterator-wrapped datasets).', 'default': 1}, - {'name': 'number_of_jobs', 'type': int, - 'doc': "Number of jobs to use in parallel during write (only operates on GenericDataChunkIterator-wrapped datasets).", - 'default': 1}, + { + "name": "number_of_jobs", + "type": int, + "doc": ( + "Number of jobs to use in parallel during write " + "(only works with GenericDataChunkIterator-wrapped datasets)." + ), + "default": 1, + }, allow_extra=True) def write(self, **kwargs): """Write a container to the IO source.""" From 4a397817c3c73fe7eb790d5341745ea6562fc2b6 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Fri, 4 Aug 2023 15:36:21 +0000 Subject: [PATCH 04/11] revert schema version pin --- src/hdmf/common/hdmf-common-schema | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hdmf/common/hdmf-common-schema b/src/hdmf/common/hdmf-common-schema index b3e48fcc5..80efce315 160000 --- a/src/hdmf/common/hdmf-common-schema +++ b/src/hdmf/common/hdmf-common-schema @@ -1 +1 @@ -Subproject commit b3e48fcc5fff10dce0585d57b84cfed5816089a3 +Subproject commit 80efce315fcd6c198c512ba526e763f81b535d36 From 1a4193ec2980b766adde3d4da661144fcd1ba4b7 Mon Sep 17 00:00:00 2001 From: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com> Date: Fri, 4 Aug 2023 11:40:03 -0400 Subject: [PATCH 05/11] remove threadpoolctl --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 39bf63aa4..672778849 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,6 @@ dependencies = [ "ruamel.yaml>=0.16", "scipy>=1.4", "importlib-resources; python_version < '3.9'", # TODO: remove when minimum python version is 3.9 - "threadpoolct" ] dynamic = ["version"] From 631d89dd5f8db4daf467b51c80c9bba57bd00181 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Fri, 4 Aug 2023 18:12:02 +0000 Subject: [PATCH 06/11] better annotation typing; restore original generator; remove argument exposure --- src/hdmf/backends/io.py | 12 +----------- src/hdmf/data_utils.py | 5 ++--- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/src/hdmf/backends/io.py b/src/hdmf/backends/io.py index 256ec7bc1..de9de72a7 100644 --- a/src/hdmf/backends/io.py +++ b/src/hdmf/backends/io.py @@ -74,17 +74,7 @@ def read(self, **kwargs): return container - @docval({'name': 'container', 'type': Container, 'doc': 'the Container object to write'}, - { - "name": "number_of_jobs", - "type": int, - "doc": ( - "Number of jobs to use in parallel during write " - "(only works with GenericDataChunkIterator-wrapped datasets)." - ), - "default": 1, - }, - allow_extra=True) + @docval({'name': 'container', 'type': Container, 'doc': 'the Container object to write'}, allow_extra=True) def write(self, **kwargs): """Write a container to the IO source.""" container = popargs('container', kwargs) diff --git a/src/hdmf/data_utils.py b/src/hdmf/data_utils.py index dbbddcce4..81b4586c2 100644 --- a/src/hdmf/data_utils.py +++ b/src/hdmf/data_utils.py @@ -242,7 +242,7 @@ def __init__(self, **kwargs): for buffer_axis, maxshape_axis in zip(self.buffer_shape, self.maxshape) ], ) - self.buffer_selections = tuple( # temporary solution to cast as tuple to allow indexing + self.buffer_selection_generator = ( tuple( [ slice(lower_bound, upper_bound) @@ -264,7 +264,6 @@ def __init__(self, **kwargs): ), ) ) - self.buffer_selection_generator = iter(self.buffer_selections) if self.display_progress: try: @@ -365,7 +364,7 @@ def __next__(self): self.progress_bar.write("\n") # Allows text to be written to new lines after completion raise StopIteration - def __reduce__(self) -> Tuple[Callable, Iterable]: + def __reduce__(self) -> Tuple[Callable, Iterable[dict]]: instance_constructor = self._from_dict initialization_args = (self._to_dict(),) return (instance_constructor, initialization_args) From 268a5c8681c73b5dd6311c7b5424cd02ec03e1b1 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Fri, 4 Aug 2023 19:23:08 +0000 Subject: [PATCH 07/11] linting line length and added assertion tests --- src/hdmf/data_utils.py | 9 +++++-- .../test_core_GenericDataChunkIterator.py | 24 +++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/hdmf/data_utils.py b/src/hdmf/data_utils.py index 81b4586c2..35acaff7b 100644 --- a/src/hdmf/data_utils.py +++ b/src/hdmf/data_utils.py @@ -272,6 +272,7 @@ def __init__(self, **kwargs): if "total" in self.progress_bar_options: warn("Option 'total' in 'progress_bar_options' is not allowed to be over-written! Ignoring.") self.progress_bar_options.pop("total") + self.progress_bar = tqdm(total=self.num_buffers, **self.progress_bar_options) except ImportError: warn( @@ -401,12 +402,16 @@ def _get_dtype(self) -> np.dtype: def _to_dict(self) -> dict: """Optional method to add in child classes to enable pickling (required for multiprocessing).""" - raise NotImplementedError("The `._to_dict()` method for pickling has not been defined for this DataChunkIterator!") + raise NotImplementedError( + "The `._to_dict()` method for pickling has not been defined for this DataChunkIterator!" + ) @staticmethod def _from_dict(self) -> Callable: """Optional method to add in child classes to enable pickling (required for multiprocessing).""" - raise NotImplementedError("The `._from_dict()` method for pickling has not been defined for this DataChunkIterator!") + raise NotImplementedError( + "The `._from_dict()` method for pickling has not been defined for this DataChunkIterator!" + ) def recommended_chunk_shape(self) -> Tuple[int, ...]: return self.chunk_shape diff --git a/tests/unit/utils_test/test_core_GenericDataChunkIterator.py b/tests/unit/utils_test/test_core_GenericDataChunkIterator.py index 7df2eac39..f437afcba 100644 --- a/tests/unit/utils_test/test_core_GenericDataChunkIterator.py +++ b/tests/unit/utils_test/test_core_GenericDataChunkIterator.py @@ -1,4 +1,5 @@ import unittest +import pickle import numpy as np from pathlib import Path from tempfile import mkdtemp @@ -204,6 +205,29 @@ def test_progress_bar_assertion(self): progress_bar_options=dict(total=5), ) + def test_private_to_dict_assertion(self): + with self.assertRaisesWith( + exc_type=NotImplementedError, + exc_msg="The `._to_dict()` method for pickling has not been defined for this DataChunkIterator!" + ): + iterator = self.TestNumpyArrayDataChunkIterator(array=self.test_array) + _ = iterator._to_dict() + + def test_private_from_dict_assertion(self): + with self.assertRaisesWith( + exc_type=NotImplementedError, + exc_msg="The `._from_dict()` method for pickling has not been defined for this DataChunkIterator!" + ): + _ = self.TestNumpyArrayDataChunkIterator._from_dict(dict()) + + def test_direct_pickle_assertion(self): + with self.assertRaisesWith( + exc_type=NotImplementedError, + exc_msg="The `._to_dict()` method for pickling has not been defined for this DataChunkIterator!" + ): + iterator = self.TestNumpyArrayDataChunkIterator(array=self.test_array) + _ = pickle.dumps(iterator) + def test_maxshape_attribute_contains_int_type(self): """Motivated by issues described in https://github.com/hdmf-dev/hdmf/pull/780 & 781 regarding return types.""" self.check_all_of_iterable_is_python_int( From 6c2c138086a3513307f291a104d707f56111e283 Mon Sep 17 00:00:00 2001 From: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com> Date: Fri, 4 Aug 2023 15:26:10 -0400 Subject: [PATCH 08/11] Update CHANGELOG.md --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index faa2a2a31..83c9cb09c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # HDMF Changelog +## HDMF 3.8.2 (Upcoming) + +### New features and minor improvements +- Added the magic `__reduce__` method as well as two private semi-abstract helper methods to enable pickling of the `GenericDataChunkIterator`. @codycbakerphd [#924](https://github.com/hdmf-dev/hdmf/pull/924) + ## HDMF 3.8.1 (July 25, 2023) ### Bug fixes From a9bbfe893660481b61a54a89a72e8f860f2af2d2 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Fri, 4 Aug 2023 19:36:50 +0000 Subject: [PATCH 09/11] enable pre-commit and trim whitespace --- tests/unit/utils_test/test_core_GenericDataChunkIterator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/utils_test/test_core_GenericDataChunkIterator.py b/tests/unit/utils_test/test_core_GenericDataChunkIterator.py index f437afcba..5ae3d97e2 100644 --- a/tests/unit/utils_test/test_core_GenericDataChunkIterator.py +++ b/tests/unit/utils_test/test_core_GenericDataChunkIterator.py @@ -227,7 +227,7 @@ def test_direct_pickle_assertion(self): ): iterator = self.TestNumpyArrayDataChunkIterator(array=self.test_array) _ = pickle.dumps(iterator) - + def test_maxshape_attribute_contains_int_type(self): """Motivated by issues described in https://github.com/hdmf-dev/hdmf/pull/780 & 781 regarding return types.""" self.check_all_of_iterable_is_python_int( From 4b8cbd1d5be0068547151a357ce2ebfaa91a255c Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Fri, 4 Aug 2023 21:46:01 +0000 Subject: [PATCH 10/11] roll back the typing a bit --- src/hdmf/data_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hdmf/data_utils.py b/src/hdmf/data_utils.py index 35acaff7b..920e8a390 100644 --- a/src/hdmf/data_utils.py +++ b/src/hdmf/data_utils.py @@ -365,7 +365,7 @@ def __next__(self): self.progress_bar.write("\n") # Allows text to be written to new lines after completion raise StopIteration - def __reduce__(self) -> Tuple[Callable, Iterable[dict]]: + def __reduce__(self) -> Tuple[Callable, Iterable]: instance_constructor = self._from_dict initialization_args = (self._to_dict(),) return (instance_constructor, initialization_args) From 131fe32d3c9c59b078facf616315e0cb357c0353 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 7 Aug 2023 18:58:34 +0000 Subject: [PATCH 11/11] add test of pickling --- .../test_core_GenericDataChunkIterator.py | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/tests/unit/utils_test/test_core_GenericDataChunkIterator.py b/tests/unit/utils_test/test_core_GenericDataChunkIterator.py index 5ae3d97e2..a490da2c8 100644 --- a/tests/unit/utils_test/test_core_GenericDataChunkIterator.py +++ b/tests/unit/utils_test/test_core_GenericDataChunkIterator.py @@ -4,10 +4,11 @@ from pathlib import Path from tempfile import mkdtemp from shutil import rmtree -from typing import Tuple, Iterable +from typing import Tuple, Iterable, Callable from sys import version_info import h5py +from numpy.testing import assert_array_equal from hdmf.data_utils import GenericDataChunkIterator from hdmf.testing import TestCase @@ -19,6 +20,30 @@ TQDM_INSTALLED = False +class TestPickleableNumpyArrayDataChunkIterator(GenericDataChunkIterator): + def __init__(self, array: np.ndarray, **kwargs): + self.array = array + self._kwargs = kwargs + super().__init__(**kwargs) + + def _get_data(self, selection) -> np.ndarray: + return self.array[selection] + + def _get_maxshape(self) -> Tuple[int, ...]: + return self.array.shape + + def _get_dtype(self) -> np.dtype: + return self.array.dtype + + def _to_dict(self) -> dict: + return dict(array=pickle.dumps(self.array), kwargs=self._kwargs) + + @staticmethod + def _from_dict(dictionary: dict) -> Callable: + array = pickle.loads(dictionary["array"]) + return TestPickleableNumpyArrayDataChunkIterator(array=array, **dictionary["kwargs"]) + + class GenericDataChunkIteratorTests(TestCase): class TestNumpyArrayDataChunkIterator(GenericDataChunkIterator): def __init__(self, array: np.ndarray, **kwargs): @@ -401,3 +426,12 @@ def test_tqdm_not_installed(self): display_progress=True, ) self.assertFalse(dci.display_progress) + + def test_pickle(self): + pre_dump_iterator = TestPickleableNumpyArrayDataChunkIterator(array=self.test_array) + post_dump_iterator = pickle.loads(pickle.dumps(pre_dump_iterator)) + + assert isinstance(post_dump_iterator, TestPickleableNumpyArrayDataChunkIterator) + assert post_dump_iterator.chunk_shape == pre_dump_iterator.chunk_shape + assert post_dump_iterator.buffer_shape == pre_dump_iterator.buffer_shape + assert_array_equal(post_dump_iterator.array, pre_dump_iterator.array)