From 7254ca2a8bc2bbacf13406e5f9a903d48e07f587 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Tue, 13 Feb 2024 15:26:24 +0100
Subject: [PATCH 01/22] Use pyarrow for save/load/dtypes in Dependencies

---
 audb/core/define.py        | 20 ++++++-------
 audb/core/dependencies.py  | 59 ++++++++++++++++++++++++++++----------
 tests/test_dependencies.py |  4 +++
 3 files changed, 58 insertions(+), 25 deletions(-)

diff --git a/audb/core/define.py b/audb/core/define.py
index 0adf92f4..3a617ba1 100644
--- a/audb/core/define.py
+++ b/audb/core/define.py
@@ -48,16 +48,16 @@ class DependField:
 }

 DEPEND_FIELD_DTYPES = {
-    DependField.ARCHIVE: "string",
-    DependField.BIT_DEPTH: "int32",
-    DependField.CHANNELS: "int32",
-    DependField.CHECKSUM: "string",
-    DependField.DURATION: "float64",
-    DependField.FORMAT: "string",
-    DependField.REMOVED: "int32",
-    DependField.SAMPLING_RATE: "int32",
-    DependField.TYPE: "int32",
-    DependField.VERSION: "string",
+    DependField.ARCHIVE: "string[pyarrow]",
+    DependField.BIT_DEPTH: "int32[pyarrow]",
+    DependField.CHANNELS: "int32[pyarrow]",
+    DependField.CHECKSUM: "string[pyarrow]",
+    DependField.DURATION: "float64[pyarrow]",
+    DependField.FORMAT: "string[pyarrow]",
+    DependField.REMOVED: "int32[pyarrow]",
+    DependField.SAMPLING_RATE: "int32[pyarrow]",
+    DependField.TYPE: "int32[pyarrow]",
+    DependField.VERSION: "string[pyarrow]",
 }

 DEPEND_INDEX_DTYPE = "object"

diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py
index 64a323e5..5b05a26d 100644
--- a/audb/core/dependencies.py
+++ b/audb/core/dependencies.py
@@ -4,6 +4,8 @@
 import typing

 import pandas as pd
+import pyarrow as pa
+import pyarrow.csv as csv

 import audeer

@@ -59,6 +61,23 @@ def __init__(self):
         ):
             data[name] = pd.Series(dtype=dtype)
         self._df = pd.DataFrame(data)
+        # pyarrow schema
+        # used for reading and writing files
+        self._schema = pa.schema(
+            [
+                ("file", pa.string()),
+                ("archive", pa.string()),
+                ("bit_depth", pa.int32()),
+                ("channels", pa.int32()),
+                ("checksum", pa.string()),
+                ("duration", pa.float64()),
+                ("format", pa.string()),
+                ("removed", pa.int32()),
+                ("sampling_rate", pa.int32()),
+                ("type", pa.int32()),
+                ("version", pa.string()),
+            ]
+        )

     def __call__(self) -> pd.DataFrame:
         r"""Return dependencies as a table.
@@ -309,23 +328,21 @@ def load(self, path: str):
         if extension == "pkl":
             self._df = pd.read_pickle(path)
         elif extension == "csv":
-            # Data type of dependency columns
-            dtype_mapping = {
-                name: dtype
-                for name, dtype in zip(
-                    define.DEPEND_FIELD_NAMES.values(),
-                    define.DEPEND_FIELD_DTYPES.values(),
-                )
-            }
-            # Data type of index
-            index = 0
-            self._df = pd.read_csv(
+            table = csv.read_csv(
                 path,
-                index_col=index,
-                na_filter=False,
-                dtype=dtype_mapping,
+                read_options=csv.ReadOptions(
+                    column_names=self._schema.names,
+                    skip_rows=1,
+                ),
+                convert_options=csv.ConvertOptions(column_types=self._schema),
             )
+            self._df = table.to_pandas(
+                deduplicate_objects=False,
+                types_mapper=pd.ArrowDtype,  # use pyarrow dtypes
+            )
+            self._df.set_index("file", inplace=True)
             self._df.index.name = None
+
         # Set dtype of index for both CSV and PKL
         # to make backward compatiple
         # with old pickle files in cache
         # that might use `string` as dtype
         self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE)
@@ -372,7 +389,19 @@ def save(self, path: str):
         """
         path = audeer.path(path)
         if path.endswith("csv"):
-            self._df.to_csv(path)
+            table = pa.Table.from_pandas(
+                self._df.reset_index().rename(columns={"index": "file"}),
+                preserve_index=False,
+                schema=self._schema,
+            )
+            columns = table.column_names
+            columns = ["" if c == "file" else c for c in columns]
+            table = table.rename_columns(columns)
+            csv.write_csv(
+                table,
+                path,
+                write_options=csv.WriteOptions(quoting_style="none"),
+            )
         elif path.endswith("pkl"):
             self._df.to_pickle(
                 path,

diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
index 91b3830c..19013907 100644
--- a/tests/test_dependencies.py
+++ b/tests/test_dependencies.py
@@ -219,6 +219,10 @@ def test_load_save(deps):
     deps.save(deps_file)
     deps2 = audb.Dependencies()
     deps2.load(deps_file)
+    print(f"{deps._df=}")
+    print(f"{deps2._df=}")
+    print(f"{deps._df.archive.dtype=}")
+    print(f"{deps2._df.archive.dtype=}")
     pd.testing.assert_frame_equal(deps(), deps2())
     os.remove(deps_file)
     # Expected dtypes

From 921f8d38553a8875fa9808f832d9b074b669b0aa Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Tue, 13 Feb 2024 16:44:34 +0100
Subject: [PATCH 02/22] Fix dtype mapping

---
 audb/core/dependencies.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py
index 5b05a26d..3895f7dc 100644
--- a/audb/core/dependencies.py
+++ b/audb/core/dependencies.py
@@ -338,7 +338,15 @@ def load(self, path: str):
             )
             self._df = table.to_pandas(
                 deduplicate_objects=False,
-                types_mapper=pd.ArrowDtype,  # use pyarrow dtypes
+                # Convert to pyarrow dtypes,
+                # but ensure we use pd.StringDtype("pyarrow")
+                # and not pd.ArrowDtype(pa.string())
+                # see https://pandas.pydata.org/docs/user_guide/pyarrow.html
+                types_mapper={
+                    pa.string(): pd.StringDtype("pyarrow"),
+                    pa.int32(): pd.ArrowDtype(pa.int32()),
+                    pa.float64(): pd.ArrowDtype(pa.float64()),
+                }.get,  # we have to provide a callable, not a dict
             )
             self._df.set_index("file", inplace=True)
             self._df.index.name = None
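A note on the fix in PATCH 02 above: ``pyarrow.Table.to_pandas()`` expects ``types_mapper`` to be a callable that maps a pyarrow type to a pandas dtype (or ``None``), not a plain dict, which is why ``dict.get`` is passed. A minimal sketch of the behavior; the table below is hypothetical and only illustrates the mapping:

    import pandas as pd
    import pyarrow as pa

    table = pa.table(
        {
            "file": pa.array(["f0.wav"], type=pa.string()),
            "channels": pa.array([1], type=pa.int32()),
        }
    )
    mapper = {
        pa.string(): pd.StringDtype("pyarrow"),
        pa.int32(): pd.ArrowDtype(pa.int32()),
    }
    # dict.get is a callable; unmapped types return None,
    # which makes pyarrow fall back to its default conversion
    df = table.to_pandas(types_mapper=mapper.get)
    print(df.dtypes)  # file: string, channels: int32[pyarrow]
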
From d95848eae50ac832469f505130a9d14b7cd43991 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Tue, 13 Feb 2024 16:50:27 +0100
Subject: [PATCH 03/22] Fix expected str representation output

---
 tests/test_dependencies.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
index 19013907..91b3830c 100644
--- a/tests/test_dependencies.py
+++ b/tests/test_dependencies.py
@@ -219,10 +219,6 @@ def test_load_save(deps):
     deps.save(deps_file)
     deps2 = audb.Dependencies()
     deps2.load(deps_file)
-    print(f"{deps._df=}")
-    print(f"{deps2._df=}")
-    print(f"{deps._df.archive.dtype=}")
-    print(f"{deps2._df.archive.dtype=}")
     pd.testing.assert_frame_equal(deps(), deps2())
     os.remove(deps_file)
     # Expected dtypes

From dfd0fe9a933b560457743b35a5a99e2df11cdc8c Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Tue, 13 Feb 2024 16:55:51 +0100
Subject: [PATCH 04/22] Add pyarrow as dependency

---
 pyproject.toml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pyproject.toml b/pyproject.toml
index 9375f70e..3aee5967 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -39,6 +39,7 @@ dependencies = [
     'audiofile >=1.0.0',
     'audobject >=0.5.0',
     'audresample >=0.1.6',
+    'pyarrow',
     'filelock',
     'oyaml',
 ]

From 07e1bfc8a1f0d974db0d926c3ac34015a5b7a4fe Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Tue, 13 Feb 2024 17:27:17 +0100
Subject: [PATCH 05/22] Add parquet format to save()/load()

---
 audb/core/dependencies.py | 90 ++++++++++++++++++++++++++++-----------
 1 file changed, 64 insertions(+), 26 deletions(-)

diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py
index 3895f7dc..ee951708 100644
--- a/audb/core/dependencies.py
+++ b/audb/core/dependencies.py
@@ -6,6 +6,7 @@
 import pandas as pd
 import pyarrow as pa
 import pyarrow.csv as csv
+import pyarrow.parquet as parquet

 import audeer

@@ -304,19 +305,22 @@ def load(self, path: str):

         Args:
             path: path to file.
-                File extension can be ``csv`` or ``pkl``
+                File extension can be ``csv``,
+                ``pkl``,
+                or ``parquet``

         Raises:
-            ValueError: if file extension is not ``csv`` or ``pkl``
+            ValueError: if file extension is not one of
+                ``csv``, ``pkl``, ``parquet``
             FileNotFoundError: if ``path`` does not exist

         """
         self._df = pd.DataFrame(columns=define.DEPEND_FIELD_NAMES.values())
         path = audeer.path(path)
         extension = audeer.file_extension(path)
-        if extension not in ["csv", "pkl"]:
+        if extension not in ["csv", "pkl", "parquet"]:
             raise ValueError(
-                f"File extension of 'path' has to be 'csv' or 'pkl' "
+                f"File extension of 'path' has to be 'csv', 'pkl', or 'parquet', "
                 f"not '{extension}'"
             )
         if not os.path.exists(path):
@@ -331,6 +335,7 @@ def load(self, path: str):
             )
         if extension == "pkl":
             self._df = pd.read_pickle(path)
+
         elif extension == "csv":
             table = csv.read_csv(
@@ -341,20 +346,11 @@ def load(self, path: str):
                 ),
                 convert_options=csv.ConvertOptions(column_types=self._schema),
             )
-            self._df = table.to_pandas(
-                deduplicate_objects=False,
-                # Convert to pyarrow dtypes,
-                # but ensure we use pd.StringDtype("pyarrow")
-                # and not pd.ArrowDtype(pa.string())
-                # see https://pandas.pydata.org/docs/user_guide/pyarrow.html
-                types_mapper={
-                    pa.string(): pd.StringDtype("pyarrow"),
-                    pa.int32(): pd.ArrowDtype(pa.int32()),
-                    pa.float64(): pd.ArrowDtype(pa.float64()),
-                }.get,  # we have to provide a callable, not a dict
-            )
-            self._df.set_index("file", inplace=True)
-            self._df.index.name = None
+            self._df = self._table_to_pandas(table)
+
+        elif extension == "parquet":
+            table = parquet.read_table(path)
+            self._df = self._table_to_dataframe(table)

         # Set dtype of index for both CSV and PKL
         # to make backward compatiple
         # with old pickle files in cache
@@ -372,7 +389,19 @@ def save(self, path: str):
         """
         path = audeer.path(path)
         if path.endswith("csv"):
-            table = pa.Table.from_pandas(
-                self._df.reset_index().rename(columns={"index": "file"}),
-                preserve_index=False,
-                schema=self._schema,
-            )
-            columns = table.column_names
-            columns = ["" if c == "file" else c for c in columns]
-            table = table.rename_columns(columns)
+            table = self._dataframe_to_table(self._df)
             csv.write_csv(
                 table,
                 path,
                 write_options=csv.WriteOptions(quoting_style="none"),
             )
         elif path.endswith("pkl"):
             self._df.to_pickle(
@@ -415,6 +404,9 @@ def save(self, path: str):
                 path,
                 protocol=4,  # supported by Python >= 3.4
             )
+        elif path.endswith("parquet"):
+            table = self._dataframe_to_table(self._df)
+            parquet.write_table(table, path)

     def type(
         self,
@@ -564,6 +556,26 @@ def _column_loc(
             values = values.tolist()
         return values

+    def _dataframe_to_table(self, df: pd.DataFrame) -> pa.Table:
+        r"""Convert pandas dataframe to pyarrow table.
+
+        Args:
+            df: dependency table as pandas dataframe
+
+        Returns:
+            dependency table as pyarrow table
+
+        """
+        table = pa.Table.from_pandas(
+            df.reset_index().rename(columns={"index": "file"}),
+            preserve_index=False,
+            schema=self._schema,
+        )
+        columns = table.column_names
+        columns = ["" if c == "file" else c for c in columns]
+        table = table.rename_columns(columns)
+        return table
+
     def _drop(self, files: typing.Sequence[str]):
         r"""Drop files from table.

@@ -588,6 +600,32 @@ def _remove(self, file: str):
         """
         self._df.at[file, "removed"] = 1

+    def _table_to_dataframe(self, table: pa.Table) -> pd.DataFrame:
+        r"""Convert pyarrow table to pandas dataframe.
+
+        Args:
+            table: dependency table as pyarrow table
+
+        Returns:
+            dependency table as pandas dataframe
+
+        """
+        df = table.to_pandas(
+            deduplicate_objects=False,
+            # Convert to pyarrow dtypes,
+            # but ensure we use pd.StringDtype("pyarrow")
+            # and not pd.ArrowDtype(pa.string())
+            # see https://pandas.pydata.org/docs/user_guide/pyarrow.html
+            types_mapper={
+                pa.string(): pd.StringDtype("pyarrow"),
+                pa.int32(): pd.ArrowDtype(pa.int32()),
+                pa.float64(): pd.ArrowDtype(pa.float64()),
+            }.get,  # we have to provide a callable, not a dict
+        )
+        df.set_index("file", inplace=True)
+        df.index.name = None
+        return df
+
     def _update_media(
         self,
         values: typing.Sequence[
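With PATCH 05 applied, ``Dependencies.save()`` and ``Dependencies.load()`` dispatch on the file extension, so the same table can round-trip through all three formats. A short usage sketch; the file names here are hypothetical:

    import audb

    deps = audb.Dependencies()
    deps.load("db.csv")      # legacy CSV format
    deps.save("db.parquet")  # new binary format, written via the pyarrow schema
    deps.save("db.pkl")      # pickle, as used for caching

Because the extension decides the format, no extra format argument is needed.
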
From 418a6f8e745a0efcb649f0da921bd4c08edfc537 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Tue, 13 Feb 2024 17:40:12 +0100
Subject: [PATCH 06/22] Add tests for parquet files

---
 audb/core/dependencies.py  | 21 +++++++++++++------
 tests/test_dependencies.py | 18 ++++++++++--------
 2 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py
index ee951708..0b4e2e5a 100644
--- a/audb/core/dependencies.py
+++ b/audb/core/dependencies.py
@@ -341,7 +341,7 @@ def load(self, path: str):
                 ),
                 convert_options=csv.ConvertOptions(column_types=self._schema),
             )
-            self._df = self._table_to_pandas(table)
+            self._df = self._table_to_dataframe(table)

         elif extension == "parquet":
@@ -405,7 +405,7 @@ def save(self, path: str):
                 protocol=4,  # supported by Python >= 3.4
             )
         elif path.endswith("parquet"):
-            table = self._dataframe_to_table(self._df)
+            table = self._dataframe_to_table(self._df, file_column=True)
             parquet.write_table(table, path)

     def type(
@@ -556,11 +556,19 @@ def _column_loc(
             values = values.tolist()
         return values

-    def _dataframe_to_table(self, df: pd.DataFrame) -> pa.Table:
+    def _dataframe_to_table(
+        self,
+        df: pd.DataFrame,
+        *,
+        file_column: bool = False,
+    ) -> pa.Table:
         r"""Convert pandas dataframe to pyarrow table.

         Args:
             df: dependency table as pandas dataframe
+            file_column: if ``False``
+                the ``"file"`` column
+                is renamed to ``""``

         Returns:
             dependency table as pyarrow table

@@ -571,9 +579,10 @@ def _dataframe_to_table(
             preserve_index=False,
             schema=self._schema,
         )
-        columns = table.column_names
-        columns = ["" if c == "file" else c for c in columns]
-        table = table.rename_columns(columns)
+        if not file_column:
+            columns = table.column_names
+            columns = ["" if c == "file" else c for c in columns]
+            table = table.rename_columns(columns)
         return table

     def _drop(self, files: typing.Sequence[str]):

diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
index 91b3830c..30369379 100644
--- a/tests/test_dependencies.py
+++ b/tests/test_dependencies.py
@@ -1,8 +1,8 @@
-import os
-
 import pandas as pd
 import pytest

+import audeer
+
 import audb

@@ -214,20 +214,22 @@ def test_file_bases_methods(deps, files, method, expected_dtype):
     assert isinstance(result, expected_dtype)

-def test_load_save(deps):
-    deps_file = "deps.csv"
+@pytest.mark.parametrize("file", ["deps.csv", "deps.pkl", "deps.parquet"])
+def test_load_save(tmpdir, deps, file):
+    deps_file = audeer.path(tmpdir, file)
     deps.save(deps_file)
     deps2 = audb.Dependencies()
     deps2.load(deps_file)
     pd.testing.assert_frame_equal(deps(), deps2())
-    os.remove(deps_file)
-    # Expected dtypes
     assert list(deps2._df.dtypes) == list(audb.core.define.DEPEND_FIELD_DTYPES.values())
+
+
+def test_load_save_errors(deps):
+    # Wrong extension or file missing
     with pytest.raises(ValueError, match=r".*'txt'.*"):
-        deps2.load("deps.txt")
+        deps.load("deps.txt")
     with pytest.raises(FileNotFoundError):
-        deps.load(deps_file)
+        deps.load("deps.csv")

 def test_len(deps):

From 7ba0f8ae6201b7634e6d1da55c6badeae0fad89a Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 14 Feb 2024 11:38:02 +0100
Subject: [PATCH 07/22] Fix docstring of Dependencies.save()

---
 audb/core/dependencies.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py
index 0b4e2e5a..033c43b4 100644
--- a/audb/core/dependencies.py
+++ b/audb/core/dependencies.py
@@ -388,7 +388,7 @@ def save(self, path: str):

         Args:
             path: path to file.
-                File extension can be ``csv`` or ``pkl``
+                File extension can be ``csv``, ``pkl``, or ``parquet``

         """
         path = audeer.path(path)

From 7817ab0d7fadecc1cfb36bafb356364e73e924cf Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 14 Feb 2024 12:59:46 +0100
Subject: [PATCH 08/22] Publish dependency table as parquet file

---
 audb/core/api.py          | 33 ++++++++++++++++++++---
 audb/core/define.py       |  3 ++-
 audb/core/dependencies.py |  3 +++
 audb/core/load_to.py      |  4 +--
 audb/core/publish.py      | 55 ++++++++++++++++++---------------------
 tests/test_load.py        | 16 ++++++------
 tests/test_publish.py     |  2 +-
 7 files changed, 70 insertions(+), 46 deletions(-)

diff --git a/audb/core/api.py b/audb/core/api.py
index c2cb4d7f..8f3d12a8 100644
--- a/audb/core/api.py
+++ b/audb/core/api.py
@@ -260,13 +260,23 @@ def dependencies(
         version,
         cache_root=cache_root,
     )
-    deps_path = os.path.join(db_root, define.CACHED_DEPENDENCIES_FILE)

     deps = Dependencies()

    with FolderLock(db_root):
        try:
-            deps.load(deps_path)
+            file_found = False
+            for deps_file in [
+                define.DEPENDENCIES_FILE,
+                define.CACHED_DEPENDENCIES_FILE,
+            ]:
+                deps_path = os.path.join(db_root, deps_file)
+                if os.path.exists(deps_path):
+                    deps.load(deps_path)
+                    file_found = True
+                    break
+            if not file_found:
+                raise FileNotFoundError
        except (AttributeError, FileNotFoundError, ValueError, EOFError):
            # If loading pickled cached file fails, load again from backend
            backend = utils.lookup_backend(name, version)
@@ -278,8 +288,23 @@ def dependencies(
                    version,
                    verbose=verbose,
                )
-                deps.load(os.path.join(tmp_root, define.DEPENDENCIES_FILE))
-                deps.save(deps_path)
+                # Look first for legacy file,
+                # that would correspond to cached pickle file
+                legacy_deps_path = os.path.join(
+                    tmp_root, define.LEGACY_DEPENDENCIES_FILE
+                )
+                cached_deps_path = os.path.join(
+                    db_root, define.CACHED_DEPENDENCIES_FILE
+                )
+                if os.path.exists(legacy_deps_path):
+                    deps.load(legacy_deps_path)
+                    deps.save(cached_deps_path)
+                else:
+                    # New dependency files are stored directly in cache
+                    deps_path = os.path.join(tmp_root, define.DEPENDENCIES_FILE)
+                    cached_deps_path = os.path.join(db_root, define.DEPENDENCIES_FILE)
+                    audeer.move_file(deps_path, cached_deps_path)
+                    deps.load(cached_deps_path)

    return deps

diff --git a/audb/core/define.py b/audb/core/define.py
index 3a617ba1..fbf252b5 100644
--- a/audb/core/define.py
+++ b/audb/core/define.py
@@ -10,7 +10,8 @@
 HEADER_FILE = f"{DB}.yaml"

 # Dependencies
-DEPENDENCIES_FILE = f"{DB}.csv"
+DEPENDENCIES_FILE = f"{DB}.parquet"
+LEGACY_DEPENDENCIES_FILE = f"{DB}.csv"
 CACHED_DEPENDENCIES_FILE = f"{DB}.pkl"

 # Cache lock

diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py
index 033c43b4..4943bb95 100644
--- a/audb/core/dependencies.py
+++ b/audb/core/dependencies.py
@@ -79,6 +79,8 @@ def __init__(self):
                 ("version", pa.string()),
             ]
         )
+        # Store location of last loaded dependency file
+        self._path = None

     def __call__(self) -> pd.DataFrame:
         r"""Return dependencies as a table.
@@ -352,6 +354,7 @@ def load(self, path: str):
         # with old pickle files in cache
         # that might use `string` as dtype
         self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE)
+        self._path = path

     def removed(
         self,

diff --git a/audb/core/load_to.py b/audb/core/load_to.py
index 3ed14eb5..9bd988ac 100644
--- a/audb/core/load_to.py
+++ b/audb/core/load_to.py
@@ -390,10 +390,8 @@ def load_to(

     # save dependencies

-    dep_path_tmp = os.path.join(db_root_tmp, define.DEPENDENCIES_FILE)
-    deps.save(dep_path_tmp)
     audeer.move_file(
-        dep_path_tmp,
+        deps._path,
         os.path.join(db_root, define.DEPENDENCIES_FILE),
     )

diff --git a/audb/core/publish.py b/audb/core/publish.py
index 76d70d25..bda6aec2 100644
--- a/audb/core/publish.py
+++ b/audb/core/publish.py
@@ -615,10 +615,12 @@ def publish(
         previous_version = None

     # load database and dependencies
-    deps_path = os.path.join(db_root, define.DEPENDENCIES_FILE)
     deps = Dependencies()
-    if os.path.exists(deps_path):
-        deps.load(deps_path)
+    for deps_file in [define.DEPENDENCIES_FILE, define.LEGACY_DEPENDENCIES_FILE]:
+        deps_path = os.path.join(db_root, deps_file)
+        if os.path.exists(deps_path):
+            deps.load(deps_path)
+            break

     # check if database folder depends on the right version

@@ -626,7 +628,7 @@ def publish(
     if previous_version is None and len(deps) > 0:
         raise RuntimeError(
             f"You did not set a dependency to a previous version, "
-            f"but you have a '{define.DEPENDENCIES_FILE}' file present "
+            f"but you have a '{deps_file}' file present "
             f"in {db_root}."
         )

@@ -644,32 +646,26 @@ def publish(
     # dependencies do not match version

     if previous_version is not None and len(deps) > 0:
-        with tempfile.TemporaryDirectory() as tmp_dir:
-            previous_deps_path = os.path.join(
-                tmp_dir,
-                define.DEPENDENCIES_FILE,
-            )
-            previous_deps = dependencies(
-                db.name,
-                version=previous_version,
-                cache_root=cache_root,
-                verbose=verbose,
+        previous_deps = dependencies(
+            db.name,
+            version=previous_version,
+            cache_root=cache_root,
+            verbose=verbose,
+        )
+        if audeer.md5(deps_path) != audeer.md5(previous_deps._path):
+            raise RuntimeError(
+                f"You want to depend on '{previous_version}' "
+                f"of {db.name}, "
+                f"but the MD5 sum of your "
+                f"'{deps_file}' file "
+                f"in {db_root} "
+                f"does not match the MD5 sum of the corresponding file "
+                f"for the requested version in the repository. "
+                f"Did you forget to call "
+                f"'audb.load_to({db_root}, {db.name}, "
+                f"version='{previous_version}') "
+                f"or did you modify the file manually?"
             )
-            previous_deps.save(previous_deps_path)
-            if audeer.md5(deps_path) != audeer.md5(previous_deps_path):
-                raise RuntimeError(
-                    f"You want to depend on '{previous_version}' "
-                    f"of {db.name}, "
-                    f"but the MD5 sum of your "
-                    f"'{define.DEPENDENCIES_FILE}' file "
-                    f"in {db_root} "
-                    f"does not match the MD5 sum of the corresponding file "
-                    f"for the requested version in the repository. "
-                    f"Did you forgot to call "
-                    f"'audb.load_to({db_root}, {db.name}, "
-                    f"version='{previous_version}') "
-                    f"or modified the file manually?"
-                )

     # load database with table data
     db = audformat.Database.load(
@@ -753,6 +749,7 @@ def publish(
     )

     # publish dependencies and header
+    deps_path = os.path.join(db_root, define.DEPENDENCIES_FILE)
     deps.save(deps_path)
     archive_file = backend.join("/", db.name, define.DB + ".zip")
     backend.put_archive(

diff --git a/tests/test_load.py b/tests/test_load.py
index c9930d33..f126c4ce 100644
--- a/tests/test_load.py
+++ b/tests/test_load.py
@@ -129,8 +129,8 @@ def dbs(tmpdir_factory, persistent_repository):
     db.save(db_root)
     audformat.testing.create_audio_files(db)
     shutil.copy(
-        audeer.path(previous_db_root, "db.csv"),
-        audeer.path(db_root, "db.csv"),
+        audeer.path(previous_db_root, audb.core.define.DEPENDENCIES_FILE),
+        audeer.path(db_root, audb.core.define.DEPENDENCIES_FILE),
     )
     audb.publish(
         db_root,
@@ -156,8 +156,8 @@ def dbs(tmpdir_factory, persistent_repository):
     db.save(db_root)
     audformat.testing.create_audio_files(db)
     shutil.copy(
-        audeer.path(previous_db_root, "db.csv"),
-        audeer.path(db_root, "db.csv"),
+        audeer.path(previous_db_root, audb.core.define.DEPENDENCIES_FILE),
+        audeer.path(db_root, audb.core.define.DEPENDENCIES_FILE),
     )
     audb.publish(
         db_root,
@@ -192,8 +192,8 @@ def dbs(tmpdir_factory, persistent_repository):
     db.save(db_root)

     shutil.copy(
-        os.path.join(previous_db_root, "db.csv"),
-        os.path.join(db_root, "db.csv"),
+        os.path.join(previous_db_root, audb.core.define.DEPENDENCIES_FILE),
+        os.path.join(db_root, audb.core.define.DEPENDENCIES_FILE),
     )
     audb.publish(
         db_root,
@@ -220,8 +220,8 @@ def dbs(tmpdir_factory, persistent_repository):
     db.save(db_root)
     audformat.testing.create_audio_files(db)
     shutil.copy(
-        os.path.join(previous_db_root, "db.csv"),
-        os.path.join(db_root, "db.csv"),
+        os.path.join(previous_db_root, audb.core.define.DEPENDENCIES_FILE),
+        os.path.join(db_root, audb.core.define.DEPENDENCIES_FILE),
     )
     audb.publish(
         db_root,

diff --git a/tests/test_publish.py b/tests/test_publish.py
index ea0fc2b4..1d27b65d 100644
--- a/tests/test_publish.py
+++ b/tests/test_publish.py
@@ -823,7 +823,7 @@ def test_publish_error_messages(
         dbs[version],
         audb.core.define.DEPENDENCIES_FILE,
     )
-    deps.save(path)
+    shutil.copyfile(deps._path, path)
     audb.publish(
         dbs[version],
         version,

From 517efd3d9f7e50fd3b22031e0acc77b286639d24 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 14 Feb 2024 13:28:56 +0100
Subject: [PATCH 09/22] Fix cache handling for docs/publish.rst

---
 docs/publish.rst | 26 ++++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/docs/publish.rst b/docs/publish.rst
index a353a5e3..5ddbbc52 100644
--- a/docs/publish.rst
+++ b/docs/publish.rst
@@ -11,20 +11,27 @@
     :hide-code:

     import os
-    import shutil
+    import tempfile
+
+    import audb
+    import audeer
+
+
+    _cwd_root = os.getcwd()
+    _tmp_root = tempfile.mkdtemp()
+    os.chdir(_tmp_root)

     folders = [
         "./age-test-1.0.0",
         "./age-test-1.1.0",
-        "./data",
+        "./data/data-local",
+        "./cache",
     ]
     for folder in folders:
-        if os.path.exists(folder):
-            shutil.rmtree(folder)
+        audeer.rmdir(folder)
+        audeer.mkdir(folder)

-    # create repository
-    os.mkdir("./data")
-    os.mkdir("./data/data-local")
+    audb.config.CACHE_ROOT = "./cache"

 .. _publish:

@@ -249,6 +256,5 @@ to see how to load and use a database.

 .. jupyter-execute::
     :hide-code:

-    for folder in folders:
-        if os.path.exists(folder):
-            shutil.rmtree(folder)
+    os.chdir(_cwd_root)
+    audeer.rmdir(_tmp_root)

From 7c1704ba08c47599133dae4976da492e34062234 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 14 Feb 2024 13:43:22 +0100
Subject: [PATCH 10/22] Compare dependency tables instead of MD5 sums

---
 audb/core/publish.py  | 7 +++----
 tests/test_publish.py | 6 +++---
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/audb/core/publish.py b/audb/core/publish.py
index bda6aec2..7ffb93cb 100644
--- a/audb/core/publish.py
+++ b/audb/core/publish.py
@@ -652,14 +652,13 @@ def publish(
             cache_root=cache_root,
             verbose=verbose,
         )
-        if audeer.md5(deps_path) != audeer.md5(previous_deps._path):
+        if not deps().equals(previous_deps()):
             raise RuntimeError(
                 f"You want to depend on '{previous_version}' "
                 f"of {db.name}, "
-                f"but the MD5 sum of your "
-                f"'{deps_file}' file "
+                f"but the dependency file '{deps_file}' "
                 f"in {db_root} "
-                f"does not match the MD5 sum of the corresponding file "
+                f"does not match the dependency file "
                 f"for the requested version in the repository. "
                 f"Did you forget to call "
                 f"'audb.load_to({db_root}, {db.name}, "

diff --git a/tests/test_publish.py b/tests/test_publish.py
index 1d27b65d..39934793 100644
--- a/tests/test_publish.py
+++ b/tests/test_publish.py
@@ -1120,10 +1120,10 @@ def test_update_database(dbs, persistent_repository):
     error_msg = (
         f"You want to depend on '{audb.latest_version(DB_NAME)}' "
         f"of {DB_NAME}, "
-        f"but the MD5 sum of your "
-        f"'{audb.core.define.DEPENDENCIES_FILE}' file "
+        f"but the dependency file "
+        f"'{audb.core.define.DEPENDENCIES_FILE}' "
         f"in {dbs[version]} "
-        f"does not match the MD5 sum of the corresponding file "
+        f"does not match the dependency file "
         f"for the requested version in the repository. "
         f"Did you forget to call "
         f"'audb.load_to({dbs[version]}, {DB_NAME}, "
From 2acb7fe001b469992e8eddb7b9e444e360abf Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 14 Feb 2024 13:53:45 +0100
Subject: [PATCH 11/22] Store always as parquet in cache

---
 audb/core/api.py | 28 +++++++++++-----------------
 1 file changed, 11 insertions(+), 17 deletions(-)

diff --git a/audb/core/api.py b/audb/core/api.py
index 8f3d12a8..f7ebfc5f 100644
--- a/audb/core/api.py
+++ b/audb/core/api.py
@@ -278,7 +278,7 @@ def dependencies(
             if not file_found:
                 raise FileNotFoundError
         except (AttributeError, FileNotFoundError, ValueError, EOFError):
-            # If loading pickled cached file fails, load again from backend
+            # If loading cached file fails, load again from backend
             backend = utils.lookup_backend(name, version)
             with tempfile.TemporaryDirectory() as tmp_root:
                 archive = backend.join("/", name, define.DB + ".zip")
@@ -288,23 +288,17 @@ def dependencies(
                     version,
                     verbose=verbose,
                 )
-                # Look first for legacy file,
-                # that would correspond to cached pickle file
-                legacy_deps_path = os.path.join(
-                    tmp_root, define.LEGACY_DEPENDENCIES_FILE
-                )
-                cached_deps_path = os.path.join(
-                    db_root, define.CACHED_DEPENDENCIES_FILE
-                )
-                if os.path.exists(legacy_deps_path):
-                    deps.load(legacy_deps_path)
-                    deps.save(cached_deps_path)
+                deps_path = os.path.join(tmp_root, define.DEPENDENCIES_FILE)
+                legacy_path = os.path.join(tmp_root, define.LEGACY_DEPENDENCIES_FILE)
+                cached_path = os.path.join(db_root, define.DEPENDENCIES_FILE)
+                if os.path.exists(deps_path):
+                    # Copy parquet file from tmp dir to cache
+                    audeer.move_file(deps_path, cached_path)
+                    deps.load(cached_path)
                 else:
-                    # New dependency files are stored directly in cache
-                    deps_path = os.path.join(tmp_root, define.DEPENDENCIES_FILE)
-                    cached_deps_path = os.path.join(db_root, define.DEPENDENCIES_FILE)
-                    audeer.move_file(deps_path, cached_deps_path)
-                    deps.load(cached_deps_path)
+                    # Load CSV file from tmp dir and store as parquet in cache
+                    deps.load(legacy_path)
+                    deps.save(cached_path)

     return deps

From 1b864214e233710d011575546e1a1d48fa1b6806 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 14 Feb 2024 14:11:07 +0100
Subject: [PATCH 12/22] Fix skipping of old audb caches

---
 audb/core/api.py | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/audb/core/api.py b/audb/core/api.py
index f7ebfc5f..e73379d0 100644
--- a/audb/core/api.py
+++ b/audb/core/api.py
@@ -185,18 +185,15 @@ def cached(
         flavor_id_paths = audeer.list_dir_names(version_path)

         # Skip old audb cache (e.g. 1 as flavor)
-        files = audeer.list_file_names(version_path)
-        deps_path = os.path.join(version_path, define.DEPENDENCIES_FILE)
-        deps_path_cached = os.path.join(
-            version_path,
-            define.CACHED_DEPENDENCIES_FILE,
-        )
-        if deps_path not in files and deps_path_cached not in files:
+        files = audeer.list_file_names(version_path, basenames=True)
+        if (
+            define.DEPENDENCIES_FILE not in files
+            and define.LEGACY_DEPENDENCIES_FILE not in files
+            and define.CACHED_DEPENDENCIES_FILE not in files
+        ):
            # Skip all cache entries
-            # that don't contain a db.csv or db.pkl file
+            # that don't contain a dependency file
            # as those stem from audb<1.0.0.
-            # We only look for db.csv
-            # as we switched to db.pkl with audb>=1.0.5
            continue  # pragma: no cover

        for flavor_id_path in flavor_id_paths:

From 6e893c055f2dcef4814d62d374f06ee515bda160 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 14 Feb 2024 14:12:50 +0100
Subject: [PATCH 13/22] Add LEGACY to old dependency cache file name

---
 audb/core/api.py    | 4 ++--
 audb/core/define.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/audb/core/api.py b/audb/core/api.py
index e73379d0..0fc21c5b 100644
--- a/audb/core/api.py
+++ b/audb/core/api.py
@@ -189,7 +189,7 @@ def cached(
         if (
             define.DEPENDENCIES_FILE not in files
             and define.LEGACY_DEPENDENCIES_FILE not in files
-            and define.CACHED_DEPENDENCIES_FILE not in files
+            and define.LEGACY_CACHED_DEPENDENCIES_FILE not in files
         ):
             # Skip all cache entries
             # that don't contain a dependency file
@@ -265,7 +265,7 @@ def dependencies(
             file_found = False
             for deps_file in [
                 define.DEPENDENCIES_FILE,
-                define.CACHED_DEPENDENCIES_FILE,
+                define.LEGACY_CACHED_DEPENDENCIES_FILE,
             ]:
                 deps_path = os.path.join(db_root, deps_file)
                 if os.path.exists(deps_path):

diff --git a/audb/core/define.py b/audb/core/define.py
index fbf252b5..97d54a3d 100644
--- a/audb/core/define.py
+++ b/audb/core/define.py
@@ -12,7 +12,7 @@
 # Dependencies
 DEPENDENCIES_FILE = f"{DB}.parquet"
 LEGACY_DEPENDENCIES_FILE = f"{DB}.csv"
-CACHED_DEPENDENCIES_FILE = f"{DB}.pkl"
+LEGACY_CACHED_DEPENDENCIES_FILE = f"{DB}.pkl"
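At this point the three file names in audb/core/define.py have settled into distinct roles, which the following patches build on. A summary of the naming as of PATCH 13 (the comments are editorial, not part of the code):

    DEPENDENCIES_FILE = "db.parquet"             # shipped with new releases on the backend
    LEGACY_DEPENDENCIES_FILE = "db.csv"          # shipped with old releases on the backend
    LEGACY_CACHED_DEPENDENCIES_FILE = "db.pkl"   # fast local cache format

Note that the next patch renames the pickle constant back to CACHED_DEPENDENCIES_FILE, since pickle stays the cache format.
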
From e1ccb38af342dade92a6824ec2b505d4e48edd26 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 14 Feb 2024 15:27:12 +0100
Subject: [PATCH 14/22] Use pickle in cache

---
 audb/core/api.py          | 16 ++++++++--------
 audb/core/define.py       |  2 +-
 audb/core/dependencies.py |  3 ---
 audb/core/load_to.py      |  5 ++++-
 tests/test_publish.py     |  2 +-
 5 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/audb/core/api.py b/audb/core/api.py
index 0fc21c5b..fdf5fc69 100644
--- a/audb/core/api.py
+++ b/audb/core/api.py
@@ -189,7 +189,7 @@ def cached(
         if (
             define.DEPENDENCIES_FILE not in files
             and define.LEGACY_DEPENDENCIES_FILE not in files
-            and define.LEGACY_CACHED_DEPENDENCIES_FILE not in files
+            and define.CACHED_DEPENDENCIES_FILE not in files
         ):
             # Skip all cache entries
             # that don't contain a dependency file
@@ -265,7 +265,7 @@ def dependencies(
             file_found = False
             for deps_file in [
                 define.DEPENDENCIES_FILE,
-                define.LEGACY_CACHED_DEPENDENCIES_FILE,
+                define.CACHED_DEPENDENCIES_FILE,
             ]:
                 deps_path = os.path.join(db_root, deps_file)
                 if os.path.exists(deps_path):
@@ -285,17 +285,17 @@ def dependencies(
                     version,
                     verbose=verbose,
                 )
+                # Load parquet or csv from tmp dir
+                # and store as pickle in cache
                 deps_path = os.path.join(tmp_root, define.DEPENDENCIES_FILE)
                 legacy_path = os.path.join(tmp_root, define.LEGACY_DEPENDENCIES_FILE)
-                cached_path = os.path.join(db_root, define.DEPENDENCIES_FILE)
+                cached_path = os.path.join(db_root, define.CACHED_DEPENDENCIES_FILE)
                 if os.path.exists(deps_path):
-                    # Copy parquet file from tmp dir to cache
-                    audeer.move_file(deps_path, cached_path)
-                    deps.load(cached_path)
+                    deps.load(deps_path)
                 else:
-                    # Load CSV file from tmp dir and store as parquet in cache
                     deps.load(legacy_path)
-                    deps.save(cached_path)
+                # Store as pickle in cache
+                deps.save(cached_path)

     return deps

diff --git a/audb/core/define.py b/audb/core/define.py
index 97d54a3d..6659b66f 100644
--- a/audb/core/define.py
+++ b/audb/core/define.py
@@ -11,8 +11,8 @@
 # Dependencies
 DEPENDENCIES_FILE = f"{DB}.parquet"
+CACHED_DEPENDENCIES_FILE = f"{DB}.pkl"
 LEGACY_DEPENDENCIES_FILE = f"{DB}.csv"
-LEGACY_CACHED_DEPENDENCIES_FILE = f"{DB}.pkl"

 # Cache lock
 CACHED_VERSIONS_TIMEOUT = 10  # Timeout to acquire access to cached versions

diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py
index 4943bb95..033c43b4 100644
--- a/audb/core/dependencies.py
+++ b/audb/core/dependencies.py
@@ -79,8 +79,6 @@ def __init__(self):
                 ("version", pa.string()),
             ]
         )
-        # Store location of last loaded dependency file
-        self._path = None

     def __call__(self) -> pd.DataFrame:
         r"""Return dependencies as a table.
@@ -354,7 +352,6 @@ def load(self, path: str):
         # with old pickle files in cache
         # that might use `string` as dtype
         self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE)
-        self._path = path

     def removed(
         self,

diff --git a/audb/core/load_to.py b/audb/core/load_to.py
index 9bd988ac..a7780fc1 100644
--- a/audb/core/load_to.py
+++ b/audb/core/load_to.py
@@ -390,8 +390,10 @@ def load_to(

     # save dependencies

+    dep_path_tmp = os.path.join(db_root_tmp, define.DEPENDENCIES_FILE)
+    deps.save(dep_path_tmp)
     audeer.move_file(
-        deps._path,
+        dep_path_tmp,
         os.path.join(db_root, define.DEPENDENCIES_FILE),
     )

@@ -405,6 +407,7 @@ def load_to(
         verbose=verbose,
     )

+    print(audeer.list_file_names(db_root_tmp, recursive=True))
     # remove the temporal directory
     # to signal all files were correctly loaded
     try:

diff --git a/tests/test_publish.py b/tests/test_publish.py
index 39934793..0f1e2ab8 100644
--- a/tests/test_publish.py
+++ b/tests/test_publish.py
@@ -823,7 +823,7 @@ def test_publish_error_messages(
         dbs[version],
         audb.core.define.DEPENDENCIES_FILE,
     )
-    shutil.copyfile(deps._path, path)
+    deps.save(path)
     audb.publish(
         dbs[version],
         version,

From 1dbcc38b489a2086c56765c837985141c6406565 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 14 Feb 2024 15:36:35 +0100
Subject: [PATCH 15/22] Remove debug print statement

---
 audb/core/load_to.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/audb/core/load_to.py b/audb/core/load_to.py
index a7780fc1..3ed14eb5 100644
--- a/audb/core/load_to.py
+++ b/audb/core/load_to.py
@@ -407,7 +407,6 @@ def load_to(
         verbose=verbose,
     )

-    print(audeer.list_file_names(db_root_tmp, recursive=True))
     # remove the temporal directory
     # to signal all files were correctly loaded
     try:

From 9ee1f2463824cc84ebd50bad8b36e0d954c55675 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Thu, 15 Feb 2024 09:12:11 +0100
Subject: [PATCH 16/22] Mention correct dependency file in docs

---
 docs/dependencies.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/dependencies.rst b/docs/dependencies.rst
index eb1da6c2..791e37e5 100644
--- a/docs/dependencies.rst
+++ b/docs/dependencies.rst
@@ -37,7 +37,7 @@ if its content hasn't changed.
 We keep track of those dependencies
 and store some additional metadata about the audio files
 like duration and number of channels
-in a dependency table in a file :file:`db.csv`
+in a dependency table in a file :file:`db.parquet`
 for every version of a database.
 You request a :class:`audb.Dependencies` object with

From ebb5c1b1a390997840402b369e87d80551a2a283 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Tue, 19 Mar 2024 14:48:26 +0100
Subject: [PATCH 17/22] Add docstring to test

---
 tests/test_dependencies.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
index 30369379..80f7f06d 100644
--- a/tests/test_dependencies.py
+++ b/tests/test_dependencies.py
@@ -216,6 +216,14 @@ def test_file_bases_methods(deps, files, method, expected_dtype):

 @pytest.mark.parametrize("file", ["deps.csv", "deps.pkl", "deps.parquet"])
 def test_load_save(tmpdir, deps, file):
+    """Test consistency of dependency table after save/load cycle.
+
+    Dependency values and data types
+    should remain identical
+    when first storing and then loading from a file.
+    This should hold for all possible file formats.
+
+    """
     deps_file = audeer.path(tmpdir, file)
     deps.save(deps_file)
     deps2 = audb.Dependencies()

From f484433836784b23c26fae522106fc8985df7432 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Tue, 19 Mar 2024 14:50:58 +0100
Subject: [PATCH 18/22] Fix comment for errors test

---
 tests/test_dependencies.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
index 80f7f06d..5afd3ba5 100644
--- a/tests/test_dependencies.py
+++ b/tests/test_dependencies.py
@@ -233,9 +233,11 @@ def test_load_save(tmpdir, deps, file):

 def test_load_save_errors(deps):
-    # Wrong extension or file missing
+    """Test possible errors when loading/saving."""
+    # Wrong file extension
     with pytest.raises(ValueError, match=r".*'txt'.*"):
         deps.load("deps.txt")
+    # File missing
     with pytest.raises(FileNotFoundError):
         deps.load("deps.csv")

From 88af9f7d0bb303438ceaa343bea8fadb5438859b Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 20 Mar 2024 08:26:15 +0100
Subject: [PATCH 19/22] Simplify dependency file loading code

---
 audb/core/api.py | 14 ++------------
 1 file changed, 2 insertions(+), 12 deletions(-)

diff --git a/audb/core/api.py b/audb/core/api.py
index fdf5fc69..26a0cd6c 100644
--- a/audb/core/api.py
+++ b/audb/core/api.py
@@ -257,23 +257,13 @@ def dependencies(
         version,
         cache_root=cache_root,
     )
+    cached_path = os.path.join(db_root, define.CACHED_DEPENDENCIES_FILE)

     deps = Dependencies()

     with FolderLock(db_root):
         try:
-            file_found = False
-            for deps_file in [
-                define.DEPENDENCIES_FILE,
-                define.CACHED_DEPENDENCIES_FILE,
-            ]:
-                deps_path = os.path.join(db_root, deps_file)
-                if os.path.exists(deps_path):
-                    deps.load(deps_path)
-                    file_found = True
-                    break
-            if not file_found:
-                raise FileNotFoundError
+            deps.load(cached_path)
         except (AttributeError, FileNotFoundError, ValueError, EOFError):
             # If loading cached file fails, load again from backend
             backend = utils.lookup_backend(name, version)
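After PATCH 19 the loading strategy reads as a single fallback chain: try the cached db.pkl first, and only on failure fetch the archive from the backend, preferring db.parquet over the legacy db.csv before re-caching as pickle. A condensed sketch of the resulting logic in audb/core/api.py (simplified, error handling shortened):

    try:
        deps.load(cached_path)  # db.pkl in cache
    except (AttributeError, FileNotFoundError, ValueError, EOFError):
        # download the archive from the backend, then:
        if os.path.exists(deps_path):  # db.parquet
            deps.load(deps_path)
        else:  # legacy db.csv
            deps.load(legacy_path)
        deps.save(cached_path)  # store as pickle in cache
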
From fb0c3f455284030b2d42fb545e46055376e4be30 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 20 Mar 2024 08:49:40 +0100
Subject: [PATCH 20/22] Only convert dtype if needed during loading

---
 audb/core/dependencies.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py
index 033c43b4..8bff22fe 100644
--- a/audb/core/dependencies.py
+++ b/audb/core/dependencies.py
@@ -331,6 +331,12 @@ def load(self, path: str):
             )
         if extension == "pkl":
             self._df = pd.read_pickle(path)
+            # Correct dtype of index
+            # to make backward compatible
+            # with old pickle files in cache
+            # that might use `string` as dtype
+            if self._df.index.dtype != define.DEPEND_INDEX_DTYPE:
+                self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE)

         elif extension == "csv":
             table = csv.read_csv(
@@ -347,12 +353,6 @@ def load(self, path: str):
             table = parquet.read_table(path)
             self._df = self._table_to_dataframe(table)

-        # Set dtype of index for both CSV and PKL
-        # to make backward compatiple
-        # with old pickle files in cache
-        # that might use `string` as dtype
-        self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE)
-
     def removed(
         self,
         files: typing.Union[str, typing.Sequence[str]],
@@ -633,6 +633,7 @@ def _table_to_dataframe(self, table: pa.Table) -> pd.DataFrame:
         )
         df.set_index("file", inplace=True)
         df.index.name = None
+        df.index = df.index.astype(define.DEPEND_INDEX_DTYPE)
         return df

     def _update_media(

From aca07d60fc67b7c091164f1aab00a24385b19ab0 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 20 Mar 2024 09:13:54 +0100
Subject: [PATCH 21/22] Add test for backward compatibility

---
 tests/test_dependencies.py | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
index 5afd3ba5..54200f52 100644
--- a/tests/test_dependencies.py
+++ b/tests/test_dependencies.py
@@ -232,6 +232,24 @@ def test_load_save(tmpdir, deps, file):
     assert list(deps2._df.dtypes) == list(audb.core.define.DEPEND_FIELD_DTYPES.values())

+def test_load_save_backward_compatibility(tmpdir, deps):
+    """Test backward compatibility with old pickle cache files.
+
+    As the dtype of the index has changed,
+    we need to make sure this is corrected
+    when loading old cache files.
+
+    """
+    deps_file = audeer.path(tmpdir, "deps.pkl")
+    # Change dtype of index from object to string
+    # to mimic previous behavior
+    deps._df.index = deps._df.index.astype("string")
+    deps.save(deps_file)
+    deps2 = audb.Dependencies()
+    deps2.load(deps_file)
+    assert deps2._df.index.dtype == audb.core.define.DEPEND_INDEX_DTYPE
+
+
 def test_load_save_errors(deps):
     """Test possible errors when loading/saving."""
     # Wrong file extension

From 30cdd3af23193c65d72875ed6436da0765641242 Mon Sep 17 00:00:00 2001
From: Hagen Wierstorf
Date: Wed, 20 Mar 2024 09:32:53 +0100
Subject: [PATCH 22/22] Remove unneeded line

---
 audb/core/api.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/audb/core/api.py b/audb/core/api.py
index 26a0cd6c..ca445b99 100644
--- a/audb/core/api.py
+++ b/audb/core/api.py
@@ -279,7 +279,6 @@ def dependencies(
                 # and store as pickle in cache
                 deps_path = os.path.join(tmp_root, define.DEPENDENCIES_FILE)
                 legacy_path = os.path.join(tmp_root, define.LEGACY_DEPENDENCIES_FILE)
-                cached_path = os.path.join(db_root, define.CACHED_DEPENDENCIES_FILE)
                 if os.path.exists(deps_path):
                     deps.load(deps_path)
                 else:
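The backward-compatibility handling in PATCH 20 and PATCH 21 exists because pickle files written by older audb versions stored the index with pandas `string` dtype, while the dependency table now expects `object` (define.DEPEND_INDEX_DTYPE). A standalone sketch of the correction, using a hypothetical index:

    import pandas as pd

    df = pd.DataFrame(index=pd.Index(["f0.wav"], dtype="string"))
    if df.index.dtype != "object":  # only convert when needed
        df.index = df.index.astype("object")
    assert df.index.dtype == "object"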