From b47b0d10406e5b655148b11ce5bbd9b5cd072775 Mon Sep 17 00:00:00 2001 From: Joseph Gonzalez Date: Fri, 7 Jun 2024 16:52:25 -0400 Subject: [PATCH] Fix the update method and fix the dask lock (need test) --- setup.py | 2 +- tensordb/algorithms.py | 1 - tensordb/storages/lock.py | 8 ++++++++ tensordb/storages/zarr_storage.py | 7 +++++-- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index cfe01c7..1fea90b 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='TensorDB', - version='0.30.3', + version='0.30.4', description='Database based in a file system storage combined with Xarray and Zarr', author='Joseph Nowak', author_email='josephgonowak97@gmail.com', diff --git a/tensordb/algorithms.py b/tensordb/algorithms.py index fc597a9..9bf5612 100644 --- a/tensordb/algorithms.py +++ b/tensordb/algorithms.py @@ -767,7 +767,6 @@ def rolling_overlap( data = new_data.data def _apply_on_valid(x): - x = x.copy() bitmask = ~np.isnan(x) filter_x = x[bitmask] diff --git a/tensordb/storages/lock.py b/tensordb/storages/lock.py index 247872b..6bfc0f3 100644 --- a/tensordb/storages/lock.py +++ b/tensordb/storages/lock.py @@ -2,6 +2,8 @@ import os.path from typing import Type +import dask.distributed + class BaseLock(abc.ABC): def __enter__(self): @@ -30,3 +32,9 @@ def __init__(self, prefix: str, lock: Type[BaseLock] = None): def __getitem__(self, path): path = os.path.join(self.prefix, path) return self.lock(path) + + +class DaskLock(PrefixLock): + def __getitem__(self, path): + path = os.path.join(self.prefix, path) + return dask.distributed.Lock(path) diff --git a/tensordb/storages/zarr_storage.py b/tensordb/storages/zarr_storage.py index 0c3f11d..910f611 100644 --- a/tensordb/storages/zarr_storage.py +++ b/tensordb/storages/zarr_storage.py @@ -404,6 +404,8 @@ def update( compute=compute, synchronizer=self.synchronizer, region=regions, + # This option is save based on this https://github.com/pydata/xarray/issues/9072 + safe_chunks=False ) return delayed_write @@ -473,13 +475,14 @@ def read(self) -> Union[xr.DataArray, xr.Dataset]: with some names or a name """ try: - arr = xr.open_zarr( + dataset = xr.open_zarr( self.base_map, consolidated=True, synchronizer=None if self.synchronize_only_write else self.synchronizer, group=self.group, ) - return arr[self.data_names] + dataset = dataset[self.data_names] + return dataset except KeyError as e: raise KeyError( f"The data_names {self.data_names} does not exist on the tensor "