Store Dependencies as parquet file #372

Merged
merged 22 commits on Apr 12, 2024
35 changes: 20 additions & 15 deletions audb/core/api.py
@@ -185,18 +185,15 @@ def cached(
flavor_id_paths = audeer.list_dir_names(version_path)

# Skip old audb cache (e.g. 1 as flavor)
files = audeer.list_file_names(version_path)
deps_path = os.path.join(version_path, define.DEPENDENCIES_FILE)
deps_path_cached = os.path.join(
version_path,
define.CACHED_DEPENDENCIES_FILE,
)
if deps_path not in files and deps_path_cached not in files:
files = audeer.list_file_names(version_path, basenames=True)
if (
define.DEPENDENCIES_FILE not in files
and define.LEGACY_DEPENDENCIES_FILE not in files
and define.CACHED_DEPENDENCIES_FILE not in files
):
# Skip all cache entries
# that don't contain a db.csv or db.pkl file
# that don't contain a dependency file
# as those stem from audb<1.0.0.
# We only look for db.csv
# as we switched to db.pkl with audb>=1.0.5
continue # pragma: no cover

for flavor_id_path in flavor_id_paths:
@@ -260,15 +257,15 @@ def dependencies(
version,
cache_root=cache_root,
)
deps_path = os.path.join(db_root, define.CACHED_DEPENDENCIES_FILE)
cached_path = os.path.join(db_root, define.CACHED_DEPENDENCIES_FILE)

deps = Dependencies()

with FolderLock(db_root):
try:
deps.load(deps_path)
deps.load(cached_path)
except (AttributeError, FileNotFoundError, ValueError, EOFError):
# If loading pickled cached file fails, load again from backend
# If loading cached file fails, load again from backend
backend = utils.lookup_backend(name, version)
with tempfile.TemporaryDirectory() as tmp_root:
archive = backend.join("/", name, define.DB + ".zip")
@@ -278,8 +275,16 @@
version,
verbose=verbose,
)
deps.load(os.path.join(tmp_root, define.DEPENDENCIES_FILE))
deps.save(deps_path)
# Load parquet or csv from tmp dir
# and store as pickle in cache
deps_path = os.path.join(tmp_root, define.DEPENDENCIES_FILE)
legacy_path = os.path.join(tmp_root, define.LEGACY_DEPENDENCIES_FILE)
if os.path.exists(deps_path):
deps.load(deps_path)
else:
deps.load(legacy_path)
# Store as pickle in cache
deps.save(cached_path)

return deps

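In cached(), a cache entry now counts as valid if it contains any of the three dependency file variants. A minimal sketch of the new check, with the file names written out literally (in the code above they come from define):

import audeer

def has_dependency_file(version_path: str) -> bool:
    # A cache entry is considered valid if it contains
    # db.parquet, db.csv (legacy), or db.pkl (cached pickle)
    files = audeer.list_file_names(version_path, basenames=True)
    return any(f in files for f in ("db.parquet", "db.csv", "db.pkl"))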
23 changes: 12 additions & 11 deletions audb/core/define.py
@@ -10,8 +10,9 @@
HEADER_FILE = f"{DB}.yaml"

# Dependencies
DEPENDENCIES_FILE = f"{DB}.csv"
DEPENDENCIES_FILE = f"{DB}.parquet"
CACHED_DEPENDENCIES_FILE = f"{DB}.pkl"
LEGACY_DEPENDENCIES_FILE = f"{DB}.csv"

# Cache lock
CACHED_VERSIONS_TIMEOUT = 10 # Timeout to acquire access to cached versions
@@ -48,16 +49,16 @@ class DependField:
}

DEPEND_FIELD_DTYPES = {
DependField.ARCHIVE: "string",
DependField.BIT_DEPTH: "int32",
DependField.CHANNELS: "int32",
DependField.CHECKSUM: "string",
DependField.DURATION: "float64",
DependField.FORMAT: "string",
DependField.REMOVED: "int32",
DependField.SAMPLING_RATE: "int32",
DependField.TYPE: "int32",
DependField.VERSION: "string",
DependField.ARCHIVE: "string[pyarrow]",
DependField.BIT_DEPTH: "int32[pyarrow]",
DependField.CHANNELS: "int32[pyarrow]",
DependField.CHECKSUM: "string[pyarrow]",
DependField.DURATION: "float64[pyarrow]",
DependField.FORMAT: "string[pyarrow]",
DependField.REMOVED: "int32[pyarrow]",
DependField.SAMPLING_RATE: "int32[pyarrow]",
DependField.TYPE: "int32[pyarrow]",
DependField.VERSION: "string[pyarrow]",
}

DEPEND_INDEX_DTYPE = "object"
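The new dtype strings select pandas' pyarrow-backed extension types instead of NumPy-backed ones. A small illustration, assuming a recent pandas (>= 2.0) with pyarrow installed; not part of the change itself:

import pandas as pd

# "string[pyarrow]" resolves to pd.StringDtype("pyarrow"),
# "int32[pyarrow]" and friends resolve to pd.ArrowDtype
files = pd.Series(["f1.wav", "f2.wav"], dtype="string[pyarrow]")
bit_depth = pd.Series([16, 24], dtype="int32[pyarrow]")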
137 changes: 111 additions & 26 deletions audb/core/dependencies.py
@@ -4,6 +4,9 @@
import typing

import pandas as pd
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.parquet as parquet

import audeer

@@ -59,6 +62,23 @@ def __init__(self):
):
data[name] = pd.Series(dtype=dtype)
self._df = pd.DataFrame(data)
# pyarrow schema
# used for reading and writing files
self._schema = pa.schema(
[
("file", pa.string()),
("archive", pa.string()),
("bit_depth", pa.int32()),
("channels", pa.int32()),
("checksum", pa.string()),
("duration", pa.float64()),
("format", pa.string()),
("removed", pa.int32()),
("sampling_rate", pa.int32()),
("type", pa.int32()),
("version", pa.string()),
]
)

def __call__(self) -> pd.DataFrame:
r"""Return dependencies as a table.
@@ -285,19 +305,22 @@ def load(self, path: str):

Args:
path: path to file.
File extension can be ``csv`` or ``pkl``
File extension can be ``csv``,
``pkl``,
or ``parquet``

Raises:
ValueError: if file extension is not ``csv`` or ``pkl``
ValueError: if file extension is not one of
``csv``, ``pkl``, ``parquet``
FileNotFoundError: if ``path`` does not exist

"""
self._df = pd.DataFrame(columns=define.DEPEND_FIELD_NAMES.values())
path = audeer.path(path)
extension = audeer.file_extension(path)
if extension not in ["csv", "pkl"]:
if extension not in ["csv", "pkl", "parquet"]:
raise ValueError(
f"File extension of 'path' has to be 'csv' or 'pkl' "
f"File extension of 'path' has to be 'csv', 'pkl', or 'parquet' "
f"not '{extension}'"
)
if not os.path.exists(path):
@@ -308,29 +331,27 @@ def load(self, path: str):
)
if extension == "pkl":
self._df = pd.read_pickle(path)
# Correct dtype of index
# to make backward compatible
# with old pickle files in cache
# that might use `string` as dtype
if self._df.index.dtype != define.DEPEND_INDEX_DTYPE:
self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE)

elif extension == "csv":
# Data type of dependency columns
dtype_mapping = {
name: dtype
for name, dtype in zip(
define.DEPEND_FIELD_NAMES.values(),
define.DEPEND_FIELD_DTYPES.values(),
)
}
# Data type of index
index = 0
self._df = pd.read_csv(
table = csv.read_csv(
path,
index_col=index,
na_filter=False,
dtype=dtype_mapping,
read_options=csv.ReadOptions(
column_names=self._schema.names,
skip_rows=1,
),
convert_options=csv.ConvertOptions(column_types=self._schema),
)
self._df.index.name = None
# Set dtype of index for both CSV and PKL
# to make backward compatible
# with old pickle files in cache
# that might use `string` as dtype
self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE)
self._df = self._table_to_dataframe(table)

elif extension == "parquet":
table = parquet.read_table(path)
self._df = self._table_to_dataframe(table)

def removed(
self,
@@ -367,17 +388,25 @@ def save(self, path: str):

Args:
path: path to file.
File extension can be ``csv`` or ``pkl``
File extension can be ``csv``, ``pkl``, or ``parquet``

"""
path = audeer.path(path)
if path.endswith("csv"):
self._df.to_csv(path)
table = self._dataframe_to_table(self._df)
csv.write_csv(
table,
path,
write_options=csv.WriteOptions(quoting_style="none"),
)
elif path.endswith("pkl"):
self._df.to_pickle(
path,
protocol=4, # supported by Python >= 3.4
)
elif path.endswith("parquet"):
table = self._dataframe_to_table(self._df, file_column=True)
parquet.write_table(table, path)

def type(
self,
@@ -527,6 +556,35 @@ def _column_loc(
values = values.tolist()
return values

def _dataframe_to_table(
self,
df: pd.DataFrame,
*,
file_column: bool = False,
) -> pa.Table:
r"""Convert pandas dataframe to pyarrow table.

Args:
df: dependency table as pandas dataframe
file_column: if ``False``
the ``"file"`` column
is renamed to ``""``

Returns:
dependency table as pyarrow table

"""
table = pa.Table.from_pandas(
df.reset_index().rename(columns={"index": "file"}),
preserve_index=False,
schema=self._schema,
)
if not file_column:
columns = table.column_names
columns = ["" if c == "file" else c for c in columns]
table = table.rename_columns(columns)
return table

def _drop(self, files: typing.Sequence[str]):
r"""Drop files from table.

@@ -551,6 +609,33 @@ def _remove(self, file: str):
"""
self._df.at[file, "removed"] = 1

def _table_to_dataframe(self, table: pa.Table) -> pd.DataFrame:
r"""Convert pyarrow table to pandas dataframe.

Args:
table: dependency table as pyarrow table

Returns:
dependency table as pandas dataframe

"""
df = table.to_pandas(
deduplicate_objects=False,
# Convert to pyarrow dtypes,
# but ensure we use pd.StringDtype("pyarrow")
# and not pd.ArrowDtype(pa.string())
# see https://pandas.pydata.org/docs/user_guide/pyarrow.html
types_mapper={
pa.string(): pd.StringDtype("pyarrow"),
pa.int32(): pd.ArrowDtype(pa.int32()),
pa.float64(): pd.ArrowDtype(pa.float64()),
}.get, # we have to provide a callable, not a dict
)
df.set_index("file", inplace=True)
df.index.name = None
df.index = df.index.astype(define.DEPEND_INDEX_DTYPE)
return df

def _update_media(
self,
values: typing.Sequence[
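With these changes a dependency table can be written to and read from any of the three formats. A small usage sketch (file names are illustrative):

from audb.core.dependencies import Dependencies

deps = Dependencies()
deps.save("db.parquet")   # written via pyarrow.parquet.write_table()
deps.save("db.csv")       # written via pyarrow.csv.write_csv()
deps.save("db.pkl")       # pickle, used for the cache

deps2 = Dependencies()
deps2.load("db.parquet")  # "csv" and "pkl" files are still accepted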
54 changes: 25 additions & 29 deletions audb/core/publish.py
@@ -615,18 +615,20 @@ def publish(
previous_version = None

# load database and dependencies
deps_path = os.path.join(db_root, define.DEPENDENCIES_FILE)
deps = Dependencies()
if os.path.exists(deps_path):
deps.load(deps_path)
for deps_file in [define.DEPENDENCIES_FILE, define.LEGACY_DEPENDENCIES_FILE]:
deps_path = os.path.join(db_root, deps_file)
if os.path.exists(deps_path):
deps.load(deps_path)
break

# check if database folder depends on the right version

# dependencies shouldn't be there
if previous_version is None and len(deps) > 0:
raise RuntimeError(
f"You did not set a dependency to a previous version, "
f"but you have a '{define.DEPENDENCIES_FILE}' file present "
f"but you have a '{deps_file}' file present "
f"in {db_root}."
)

@@ -644,32 +646,25 @@

# dependencies do not match version
if previous_version is not None and len(deps) > 0:
with tempfile.TemporaryDirectory() as tmp_dir:
previous_deps_path = os.path.join(
tmp_dir,
define.DEPENDENCIES_FILE,
)
previous_deps = dependencies(
db.name,
version=previous_version,
cache_root=cache_root,
verbose=verbose,
previous_deps = dependencies(
db.name,
version=previous_version,
cache_root=cache_root,
verbose=verbose,
)
if not deps().equals(previous_deps()):
raise RuntimeError(
f"You want to depend on '{previous_version}' "
f"of {db.name}, "
f"but the dependency file '{deps_file}' "
f"in {db_root} "
f"does not match the dependency file "
f"for the requested version in the repository. "
f"Did you forgot to call "
f"'audb.load_to({db_root}, {db.name}, "
f"version='{previous_version}') "
f"or modified the file manually?"
)
previous_deps.save(previous_deps_path)
if audeer.md5(deps_path) != audeer.md5(previous_deps_path):
raise RuntimeError(
f"You want to depend on '{previous_version}' "
f"of {db.name}, "
f"but the MD5 sum of your "
f"'{define.DEPENDENCIES_FILE}' file "
f"in {db_root} "
f"does not match the MD5 sum of the corresponding file "
f"for the requested version in the repository. "
f"Did you forgot to call "
f"'audb.load_to({db_root}, {db.name}, "
f"version='{previous_version}') "
f"or modified the file manually?"
)

# load database with table data
db = audformat.Database.load(
@@ -753,6 +748,7 @@ def publish(
)

# publish dependencies and header
deps_path = os.path.join(db_root, define.DEPENDENCIES_FILE)
deps.save(deps_path)
archive_file = backend.join("/", db.name, define.DB + ".zip")
backend.put_archive(
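The check against the previous version now compares the tables returned by deps() with pandas.DataFrame.equals() instead of comparing MD5 sums of serialized files, so it no longer depends on the on-disk format. Toy illustration (made-up rows, not audb code):

import pandas as pd

a = pd.DataFrame({"removed": [0, 1]}, index=["f1.wav", "f2.wav"])
b = pd.DataFrame({"removed": [0, 1]}, index=["f1.wav", "f2.wav"])
assert a.equals(b)  # equal content, no matter how the tables were stored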