diff --git a/audb/core/api.py b/audb/core/api.py
index c2cb4d7f..ca445b99 100644
--- a/audb/core/api.py
+++ b/audb/core/api.py
@@ -185,18 +185,15 @@ def cached(
         flavor_id_paths = audeer.list_dir_names(version_path)
 
         # Skip old audb cache (e.g. 1 as flavor)
-        files = audeer.list_file_names(version_path)
-        deps_path = os.path.join(version_path, define.DEPENDENCIES_FILE)
-        deps_path_cached = os.path.join(
-            version_path,
-            define.CACHED_DEPENDENCIES_FILE,
-        )
-        if deps_path not in files and deps_path_cached not in files:
+        files = audeer.list_file_names(version_path, basenames=True)
+        if (
+            define.DEPENDENCIES_FILE not in files
+            and define.LEGACY_DEPENDENCIES_FILE not in files
+            and define.CACHED_DEPENDENCIES_FILE not in files
+        ):
             # Skip all cache entries
-            # that don't contain a db.csv or db.pkl file
+            # that don't contain a dependency file
             # as those stem from audb<1.0.0.
-            # We only look for db.csv
-            # as we switched to db.pkl with audb>=1.0.5
             continue  # pragma: no cover
 
         for flavor_id_path in flavor_id_paths:
@@ -260,15 +257,15 @@ def dependencies(
         version,
         cache_root=cache_root,
     )
-    deps_path = os.path.join(db_root, define.CACHED_DEPENDENCIES_FILE)
+    cached_path = os.path.join(db_root, define.CACHED_DEPENDENCIES_FILE)
 
     deps = Dependencies()
 
     with FolderLock(db_root):
         try:
-            deps.load(deps_path)
+            deps.load(cached_path)
         except (AttributeError, FileNotFoundError, ValueError, EOFError):
-            # If loading pickled cached file fails, load again from backend
+            # If loading cached file fails, load again from backend
             backend = utils.lookup_backend(name, version)
             with tempfile.TemporaryDirectory() as tmp_root:
                 archive = backend.join("/", name, define.DB + ".zip")
@@ -278,8 +275,16 @@ def dependencies(
                     version,
                     verbose=verbose,
                 )
-                deps.load(os.path.join(tmp_root, define.DEPENDENCIES_FILE))
-                deps.save(deps_path)
+                # Load parquet or csv from tmp dir
+                # and store as pickle in cache
+                deps_path = os.path.join(tmp_root, define.DEPENDENCIES_FILE)
+                legacy_path = os.path.join(tmp_root, define.LEGACY_DEPENDENCIES_FILE)
+                if os.path.exists(deps_path):
+                    deps.load(deps_path)
+                else:
+                    deps.load(legacy_path)
+                # Store as pickle in cache
+                deps.save(cached_path)
 
     return deps
 
diff --git a/audb/core/define.py b/audb/core/define.py
index 0adf92f4..6659b66f 100644
--- a/audb/core/define.py
+++ b/audb/core/define.py
@@ -10,8 +10,9 @@
 HEADER_FILE = f"{DB}.yaml"
 
 # Dependencies
-DEPENDENCIES_FILE = f"{DB}.csv"
+DEPENDENCIES_FILE = f"{DB}.parquet"
 CACHED_DEPENDENCIES_FILE = f"{DB}.pkl"
+LEGACY_DEPENDENCIES_FILE = f"{DB}.csv"
 
 # Cache lock
 CACHED_VERSIONS_TIMEOUT = 10  # Timeout to acquire access to cached versions
@@ -48,16 +49,16 @@ class DependField:
 }
 
 DEPEND_FIELD_DTYPES = {
-    DependField.ARCHIVE: "string",
-    DependField.BIT_DEPTH: "int32",
-    DependField.CHANNELS: "int32",
-    DependField.CHECKSUM: "string",
-    DependField.DURATION: "float64",
-    DependField.FORMAT: "string",
-    DependField.REMOVED: "int32",
-    DependField.SAMPLING_RATE: "int32",
-    DependField.TYPE: "int32",
-    DependField.VERSION: "string",
+    DependField.ARCHIVE: "string[pyarrow]",
+    DependField.BIT_DEPTH: "int32[pyarrow]",
+    DependField.CHANNELS: "int32[pyarrow]",
+    DependField.CHECKSUM: "string[pyarrow]",
+    DependField.DURATION: "float64[pyarrow]",
+    DependField.FORMAT: "string[pyarrow]",
+    DependField.REMOVED: "int32[pyarrow]",
+    DependField.SAMPLING_RATE: "int32[pyarrow]",
+    DependField.TYPE: "int32[pyarrow]",
+    DependField.VERSION: "string[pyarrow]",
 }
 
 DEPEND_INDEX_DTYPE = "object"
 
diff --git a/audb/core/dependencies.py b/audb/core/dependencies.py
index 64a323e5..8bff22fe 100644
--- a/audb/core/dependencies.py
+++ b/audb/core/dependencies.py
@@ -4,6 +4,9 @@
 import typing
 
 import pandas as pd
+import pyarrow as pa
+import pyarrow.csv as csv
+import pyarrow.parquet as parquet
 
 import audeer
 
@@ -59,6 +62,23 @@ def __init__(self):
         ):
             data[name] = pd.Series(dtype=dtype)
         self._df = pd.DataFrame(data)
+        # pyarrow schema
+        # used for reading and writing files
+        self._schema = pa.schema(
+            [
+                ("file", pa.string()),
+                ("archive", pa.string()),
+                ("bit_depth", pa.int32()),
+                ("channels", pa.int32()),
+                ("checksum", pa.string()),
+                ("duration", pa.float64()),
+                ("format", pa.string()),
+                ("removed", pa.int32()),
+                ("sampling_rate", pa.int32()),
+                ("type", pa.int32()),
+                ("version", pa.string()),
+            ]
+        )
 
     def __call__(self) -> pd.DataFrame:
         r"""Return dependencies as a table.
@@ -285,19 +305,22 @@ def load(self, path: str):
 
         Args:
             path: path to file.
-                File extension can be ``csv`` or ``pkl``
+                File extension can be ``csv``,
+                ``pkl``,
+                or ``parquet``
 
         Raises:
-            ValueError: if file extension is not ``csv`` or ``pkl``
+            ValueError: if file extension is not one of
+                ``csv``, ``pkl``, or ``parquet``
             FileNotFoundError: if ``path`` does not exists
 
         """
         self._df = pd.DataFrame(columns=define.DEPEND_FIELD_NAMES.values())
         path = audeer.path(path)
         extension = audeer.file_extension(path)
-        if extension not in ["csv", "pkl"]:
+        if extension not in ["csv", "pkl", "parquet"]:
             raise ValueError(
-                f"File extension of 'path' has to be 'csv' or 'pkl' "
+                f"File extension of 'path' has to be 'csv', 'pkl', or 'parquet' "
                 f"not '{extension}'"
             )
         if not os.path.exists(path):
@@ -308,29 +331,27 @@
 
         if extension == "pkl":
             self._df = pd.read_pickle(path)
+            # Correct dtype of index
+            # to make backward compatible
+            # with old pickle files in cache
+            # that might use `string` as dtype
+            if self._df.index.dtype != define.DEPEND_INDEX_DTYPE:
+                self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE)
+
         elif extension == "csv":
-            # Data type of dependency columns
-            dtype_mapping = {
-                name: dtype
-                for name, dtype in zip(
-                    define.DEPEND_FIELD_NAMES.values(),
-                    define.DEPEND_FIELD_DTYPES.values(),
-                )
-            }
-            # Data type of index
-            index = 0
-            self._df = pd.read_csv(
+            table = csv.read_csv(
                 path,
-                index_col=index,
-                na_filter=False,
-                dtype=dtype_mapping,
+                read_options=csv.ReadOptions(
+                    column_names=self._schema.names,
+                    skip_rows=1,
+                ),
+                convert_options=csv.ConvertOptions(column_types=self._schema),
             )
-            self._df.index.name = None
-        # Set dtype of index for both CSV and PKL
-        # to make backward compatiple
-        # with old pickle files in cache
-        # that might use `string` as dtype
-        self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE)
+            self._df = self._table_to_dataframe(table)
+
+        elif extension == "parquet":
+            table = parquet.read_table(path)
+            self._df = self._table_to_dataframe(table)
 
     def removed(
         self,
@@ -367,17 +388,25 @@ def save(self, path: str):
 
         Args:
             path: path to file.
-                File extension can be ``csv`` or ``pkl``
+                File extension can be ``csv``, ``pkl``, or ``parquet``
 
         """
         path = audeer.path(path)
         if path.endswith("csv"):
-            self._df.to_csv(path)
+            table = self._dataframe_to_table(self._df)
+            csv.write_csv(
+                table,
+                path,
+                write_options=csv.WriteOptions(quoting_style="none"),
+            )
         elif path.endswith("pkl"):
             self._df.to_pickle(
                 path,
                 protocol=4,  # supported by Python >= 3.4
             )
+        elif path.endswith("parquet"):
+            table = self._dataframe_to_table(self._df, file_column=True)
+            parquet.write_table(table, path)
 
     def type(
         self,
@@ -527,6 +556,35 @@ def _column_loc(
             values = values.tolist()
         return values
 
+    def _dataframe_to_table(
+        self,
+        df: pd.DataFrame,
+        *,
+        file_column: bool = False,
+    ) -> pa.Table:
+        r"""Convert pandas dataframe to pyarrow table.
+
+        Args:
+            df: dependency table as pandas dataframe
+            file_column: if ``False``,
+                the ``"file"`` column
+                is renamed to ``""``
+
+        Returns:
+            dependency table as pyarrow table
+
+        """
+        table = pa.Table.from_pandas(
+            df.reset_index().rename(columns={"index": "file"}),
+            preserve_index=False,
+            schema=self._schema,
+        )
+        if not file_column:
+            columns = table.column_names
+            columns = ["" if c == "file" else c for c in columns]
+            table = table.rename_columns(columns)
+        return table
+
     def _drop(self, files: typing.Sequence[str]):
         r"""Drop files from table.
 
@@ -551,6 +609,33 @@ def _remove(self, file: str):
 
         """
         self._df.at[file, "removed"] = 1
 
+    def _table_to_dataframe(self, table: pa.Table) -> pd.DataFrame:
+        r"""Convert pyarrow table to pandas dataframe.
+
+        Args:
+            table: dependency table as pyarrow table
+
+        Returns:
+            dependency table as pandas dataframe
+
+        """
+        df = table.to_pandas(
+            deduplicate_objects=False,
+            # Convert to pyarrow dtypes,
+            # but ensure we use pd.StringDtype("pyarrow")
+            # and not pd.ArrowDtype(pa.string())
+            # see https://pandas.pydata.org/docs/user_guide/pyarrow.html
+            types_mapper={
+                pa.string(): pd.StringDtype("pyarrow"),
+                pa.int32(): pd.ArrowDtype(pa.int32()),
+                pa.float64(): pd.ArrowDtype(pa.float64()),
+            }.get,  # we have to provide a callable, not a dict
+        )
+        df.set_index("file", inplace=True)
+        df.index.name = None
+        df.index = df.index.astype(define.DEPEND_INDEX_DTYPE)
+        return df
+
     def _update_media(
         self,
         values: typing.Sequence[
diff --git a/audb/core/publish.py b/audb/core/publish.py
index 76d70d25..7ffb93cb 100644
--- a/audb/core/publish.py
+++ b/audb/core/publish.py
@@ -615,10 +615,12 @@ def publish(
         previous_version = None
 
     # load database and dependencies
-    deps_path = os.path.join(db_root, define.DEPENDENCIES_FILE)
     deps = Dependencies()
-    if os.path.exists(deps_path):
-        deps.load(deps_path)
+    for deps_file in [define.DEPENDENCIES_FILE, define.LEGACY_DEPENDENCIES_FILE]:
+        deps_path = os.path.join(db_root, deps_file)
+        if os.path.exists(deps_path):
+            deps.load(deps_path)
+            break
 
     # check if database folder depends on the right version
 
@@ -626,7 +628,7 @@ def publish(
     if previous_version is None and len(deps) > 0:
         raise RuntimeError(
             f"You did not set a dependency to a previous version, "
-            f"but you have a '{define.DEPENDENCIES_FILE}' file present "
+            f"but you have a '{deps_file}' file present "
            f"in {db_root}."
         )
 
@@ -644,32 +646,25 @@
 
     # dependencies do not match version
     if previous_version is not None and len(deps) > 0:
-        with tempfile.TemporaryDirectory() as tmp_dir:
-            previous_deps_path = os.path.join(
-                tmp_dir,
-                define.DEPENDENCIES_FILE,
-            )
-            previous_deps = dependencies(
-                db.name,
-                version=previous_version,
-                cache_root=cache_root,
-                verbose=verbose,
+        previous_deps = dependencies(
+            db.name,
+            version=previous_version,
+            cache_root=cache_root,
+            verbose=verbose,
+        )
+        if not deps().equals(previous_deps()):
+            raise RuntimeError(
+                f"You want to depend on '{previous_version}' "
+                f"of {db.name}, "
+                f"but the dependency file '{deps_file}' "
+                f"in {db_root} "
+                f"does not match the dependency file "
+                f"for the requested version in the repository. "
+                f"Did you forgot to call "
+                f"'audb.load_to({db_root}, {db.name}, "
+                f"version='{previous_version}') "
+                f"or modified the file manually?"
             )
-            previous_deps.save(previous_deps_path)
-            if audeer.md5(deps_path) != audeer.md5(previous_deps_path):
-                raise RuntimeError(
-                    f"You want to depend on '{previous_version}' "
-                    f"of {db.name}, "
-                    f"but the MD5 sum of your "
-                    f"'{define.DEPENDENCIES_FILE}' file "
-                    f"in {db_root} "
-                    f"does not match the MD5 sum of the corresponding file "
-                    f"for the requested version in the repository. "
-                    f"Did you forgot to call "
-                    f"'audb.load_to({db_root}, {db.name}, "
-                    f"version='{previous_version}') "
-                    f"or modified the file manually?"
-                )
 
     # load database with table data
     db = audformat.Database.load(
@@ -753,6 +748,7 @@
     )
 
     # publish dependencies and header
+    deps_path = os.path.join(db_root, define.DEPENDENCIES_FILE)
     deps.save(deps_path)
     archive_file = backend.join("/", db.name, define.DB + ".zip")
     backend.put_archive(
diff --git a/docs/dependencies.rst b/docs/dependencies.rst
index eb1da6c2..791e37e5 100644
--- a/docs/dependencies.rst
+++ b/docs/dependencies.rst
@@ -37,7 +37,7 @@ if its content hasn't changed.
 We keep track of those dependencies
 and store some additional metadata
 about the audio files
 like duration and number of channels
-in a dependency table in a file :file:`db.csv`
+in a dependency table in a file :file:`db.parquet`
 for every version of a database.
 You request a :class:`audb.Dependencies` object with
diff --git a/docs/publish.rst b/docs/publish.rst
index a353a5e3..5ddbbc52 100644
--- a/docs/publish.rst
+++ b/docs/publish.rst
@@ -11,20 +11,27 @@
     :hide-code:
 
     import os
-    import shutil
+    import tempfile
+
+    import audb
+    import audeer
+
+
+    _cwd_root = os.getcwd()
+    _tmp_root = tempfile.mkdtemp()
+    os.chdir(_tmp_root)
 
     folders = [
         "./age-test-1.0.0",
         "./age-test-1.1.0",
-        "./data",
+        "./data/data-local",
+        "./cache",
     ]
     for folder in folders:
-        if os.path.exists(folder):
-            shutil.rmtree(folder)
+        audeer.rmdir(folder)
+        audeer.mkdir(folder)
 
-    # create repository
-    os.mkdir("./data")
-    os.mkdir("./data/data-local")
+    audb.config.CACHE_ROOT = "./cache"
 
 
 .. _publish:
 
@@ -249,6 +256,5 @@ to see how to load and use a database.
 .. jupyter-execute::
     :hide-code:
 
-    for folder in folders:
-        if os.path.exists(folder):
-            shutil.rmtree(folder)
+    os.chdir(_cwd_root)
+    audeer.rmdir(_tmp_root)
diff --git a/pyproject.toml b/pyproject.toml
index 9375f70e..3aee5967 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -39,6 +39,7 @@ dependencies = [
     'audiofile >=1.0.0',
     'audobject >=0.5.0',
     'audresample >=0.1.6',
+    'pyarrow',
     'filelock',
     'oyaml',
 ]
diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
index 91b3830c..54200f52 100644
--- a/tests/test_dependencies.py
+++ b/tests/test_dependencies.py
@@ -1,8 +1,8 @@
-import os
-
 import pandas as pd
 import pytest
 
+import audeer
+
 import audb
 
 
@@ -214,20 +214,50 @@ def test_file_bases_methods(deps, files, method, expected_dtype):
     assert isinstance(result, expected_dtype)
 
 
-def test_load_save(deps):
-    deps_file = "deps.csv"
+@pytest.mark.parametrize("file", ["deps.csv", "deps.pkl", "deps.parquet"])
+def test_load_save(tmpdir, deps, file):
+    """Test consistency of dependency table after save/load cycle.
+
+    Dependency values and data types
+    should remain identical
+    when first storing and then loading from a file.
+    This should hold for all possible file formats.
+
+    """
+    deps_file = audeer.path(tmpdir, file)
     deps.save(deps_file)
     deps2 = audb.Dependencies()
     deps2.load(deps_file)
     pd.testing.assert_frame_equal(deps(), deps2())
-    os.remove(deps_file)
-
     # Expected dtypes
     assert list(deps2._df.dtypes) == list(audb.core.define.DEPEND_FIELD_DTYPES.values())
-    # Wrong extension or file missng
+
+
+def test_load_save_backward_compatibility(tmpdir, deps):
+    """Test backward compatibility with old pickle cache files.
+
+    As the dtype of the index has changed,
+    we need to make sure this is corrected
+    when loading old cache files.
+
+    """
+    deps_file = audeer.path(tmpdir, "deps.pkl")
+    # Change dtype of index from object to string
+    # to mimic previous behavior
+    deps._df.index = deps._df.index.astype("string")
+    deps.save(deps_file)
+    deps2 = audb.Dependencies()
+    deps2.load(deps_file)
+    assert deps2._df.index.dtype == audb.core.define.DEPEND_INDEX_DTYPE
+
+
+def test_load_save_errors(deps):
+    """Test possible errors when loading/saving."""
+    # Wrong file extension
     with pytest.raises(ValueError, match=r".*'txt'.*"):
-        deps2.load("deps.txt")
+        deps.load("deps.txt")
+    # File missing
     with pytest.raises(FileNotFoundError):
-        deps.load(deps_file)
+        deps.load("deps.csv")
 
 
 def test_len(deps):
diff --git a/tests/test_load.py b/tests/test_load.py
index c9930d33..f126c4ce 100644
--- a/tests/test_load.py
+++ b/tests/test_load.py
@@ -129,8 +129,8 @@ def dbs(tmpdir_factory, persistent_repository):
         db.save(db_root)
         audformat.testing.create_audio_files(db)
         shutil.copy(
-            audeer.path(previous_db_root, "db.csv"),
-            audeer.path(db_root, "db.csv"),
+            audeer.path(previous_db_root, audb.core.define.DEPENDENCIES_FILE),
+            audeer.path(db_root, audb.core.define.DEPENDENCIES_FILE),
         )
         audb.publish(
             db_root,
@@ -156,8 +156,8 @@ def dbs(tmpdir_factory, persistent_repository):
         db.save(db_root)
         audformat.testing.create_audio_files(db)
         shutil.copy(
-            audeer.path(previous_db_root, "db.csv"),
-            audeer.path(db_root, "db.csv"),
+            audeer.path(previous_db_root, audb.core.define.DEPENDENCIES_FILE),
+            audeer.path(db_root, audb.core.define.DEPENDENCIES_FILE),
         )
         audb.publish(
             db_root,
@@ -192,8 +192,8 @@ def dbs(tmpdir_factory, persistent_repository):
         db.save(db_root)
 
         shutil.copy(
-            os.path.join(previous_db_root, "db.csv"),
-            os.path.join(db_root, "db.csv"),
+            os.path.join(previous_db_root, audb.core.define.DEPENDENCIES_FILE),
+            os.path.join(db_root, audb.core.define.DEPENDENCIES_FILE),
         )
         audb.publish(
             db_root,
@@ -220,8 +220,8 @@ def dbs(tmpdir_factory, persistent_repository):
         db.save(db_root)
         audformat.testing.create_audio_files(db)
         shutil.copy(
-            os.path.join(previous_db_root, "db.csv"),
-            os.path.join(db_root, "db.csv"),
+            os.path.join(previous_db_root, audb.core.define.DEPENDENCIES_FILE),
+            os.path.join(db_root, audb.core.define.DEPENDENCIES_FILE),
         )
         audb.publish(
             db_root,
diff --git a/tests/test_publish.py b/tests/test_publish.py
index ea0fc2b4..0f1e2ab8 100644
--- a/tests/test_publish.py
+++ b/tests/test_publish.py
@@ -1120,10 +1120,10 @@ def test_update_database(dbs, persistent_repository):
     error_msg = (
         f"You want to depend on '{audb.latest_version(DB_NAME)}' "
         f"of {DB_NAME}, "
-        f"but the MD5 sum of your "
-        f"'{audb.core.define.DEPENDENCIES_FILE}' file "
+        f"but the dependency file "
+        f"'{audb.core.define.DEPENDENCIES_FILE}' "
         f"in {dbs[version]} "
-        f"does not match the MD5 sum of the corresponding file "
+        f"does not match the dependency file "
         f"for the requested version in the repository. "
         f"Did you forgot to call "
         f"'audb.load_to({dbs[version]}, {DB_NAME}, "
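
For reference, here is a minimal sketch of the save/load round-trip that this change set introduces. It assumes a version of audb that already includes the patch; the file names are only illustrative, and a real dependency table would be filled by audb.publish() or fetched with audb.dependencies().

import audb

# An empty dependency table;
# its pyarrow schema is defined in Dependencies.__init__()
deps = audb.Dependencies()

# New default format,
# written with pyarrow.parquet.write_table()
deps.save("deps.parquet")

# The legacy csv format can still be written and read
deps.save("deps.csv")

# load() picks the reader based on the file extension
deps2 = audb.Dependencies()
deps2.load("deps.parquet")
assert deps().equals(deps2())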