Skip to content

Commit

Permalink
ENH: Calculate hashes in memory if possible
Browse files Browse the repository at this point in the history
  • Loading branch information
tnatt committed Oct 14, 2024
1 parent f97153d commit d206c09
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 64 deletions.
120 changes: 67 additions & 53 deletions src/fmu/dataio/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
import os
import uuid
from io import BufferedIOBase, BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Final, Literal
Expand Down Expand Up @@ -85,20 +86,26 @@ def export_metadata_file(

def export_file(
obj: types.Inferrable,
filename: Path,
file: Path | BytesIO,
file_suffix: str | None = None,
fmt: str = "",
) -> str:
) -> None:
"""
Export a valid object to file. If xtgeo is in the fmt string, xtgeo
xyz-column names will be preserved for xtgeo.Points and xtgeo.Polygons
Export a valid object to file or memory buffer. If xtgeo is in the fmt string,
xtgeo xyz-column names will be preserved for xtgeo.Points and xtgeo.Polygons
"""

# create output folder if not existing
filename.parent.mkdir(parents=True, exist_ok=True)
if isinstance(file, Path):
# create output folder if not existing
file.parent.mkdir(parents=True, exist_ok=True)
file_suffix = file.suffix

if filename.suffix == ".gri" and isinstance(obj, xtgeo.RegularSurface):
obj.to_file(filename, fformat="irap_binary")
elif filename.suffix == ".csv" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
elif not file_suffix:
raise ValueError("'suffix' must be provided when file is a BytesIO object")

if file_suffix == ".gri" and isinstance(obj, xtgeo.RegularSurface):
obj.to_file(file, fformat="irap_binary")
elif file_suffix == ".csv" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
out = obj.copy() # to not modify incoming instance!
if "xtgeo" not in fmt:
out.xname = "X"
Expand All @@ -109,72 +116,79 @@ def export_file(
out.get_dataframe(copy=False).rename(
columns={out.pname: "ID"}, inplace=True
)
out.get_dataframe(copy=False).to_csv(filename, index=False)
elif filename.suffix == ".pol" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
obj.to_file(filename)
elif filename.suffix == ".segy" and isinstance(obj, xtgeo.Cube):
obj.to_file(filename, fformat="segy")
elif filename.suffix == ".roff" and isinstance(
obj, (xtgeo.Grid, xtgeo.GridProperty)
):
obj.to_file(filename, fformat="roff")
elif filename.suffix == ".csv" and isinstance(obj, pd.DataFrame):
out.get_dataframe(copy=False).to_csv(file, index=False)
elif file_suffix == ".pol" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
obj.to_file(file)
elif file_suffix == ".segy" and isinstance(obj, xtgeo.Cube):
obj.to_file(file, fformat="segy")
elif file_suffix == ".roff" and isinstance(obj, (xtgeo.Grid, xtgeo.GridProperty)):
obj.to_file(file, fformat="roff")
elif file_suffix == ".csv" and isinstance(obj, pd.DataFrame):
logger.info(
"Exporting dataframe to csv. Note: index columns will not be "
"preserved unless calling 'reset_index()' on the dataframe."
)
obj.to_csv(filename, index=False)
elif filename.suffix == ".parquet":
obj.to_csv(file, index=False)
elif file_suffix == ".parquet":
from pyarrow import Table

if isinstance(obj, Table):
from pyarrow import parquet
from pyarrow import output_stream, parquet

parquet.write_table(obj, where=output_stream(file))

parquet.write_table(obj, where=str(filename))
elif file_suffix == ".json":
if isinstance(obj, FaultRoomSurface):
serialized_json = json.dumps(obj.storage, indent=4)
else:
serialized_json = json.dumps(obj)

if isinstance(file, Path):
with open(file, "w") as stream:
stream.write(serialized_json)
else:
file.write(serialized_json.encode("utf-8"))

elif filename.suffix == ".json" and isinstance(obj, FaultRoomSurface):
with open(filename, "w") as stream:
json.dump(obj.storage, stream, indent=4)
elif filename.suffix == ".json":
with open(filename, "w") as stream:
json.dump(obj, stream)
else:
raise TypeError(f"Exporting {filename.suffix} for {type(obj)} is not supported")
raise TypeError(f"Exporting {file_suffix} for {type(obj)} is not supported")

return str(filename)

def md5sum(file: Path | BytesIO) -> str:
if isinstance(file, Path):
with open(file, "rb") as stream:
return md5sum_stream(stream)
return md5sum_stream(file)


def md5sum_stream(stream: BufferedIOBase) -> str:
"""Calculate the MD5 checksum of a stream."""
stream.seek(0)

def md5sum(fname: Path) -> str:
"""Calculate the MD5 checksum of a file."""
hash_md5 = hashlib.md5()
with open(fname, "rb") as fil:
for chunk in iter(lambda: fil.read(4096), b""):
hash_md5.update(chunk)
while True:
chunk = stream.read(4096)
if not chunk:
break
hash_md5.update(chunk)
return hash_md5.hexdigest()


def export_file_compute_checksum_md5(
obj: types.Inferrable,
filename: Path,
fmt: str = "",
) -> str:
"""Export and compute checksum"""
export_file(obj, filename, fmt=fmt)
return md5sum(filename)
def compute_md5(obj: types.Inferrable, file_suffix: str, fmt: str = "") -> str:
"""Compute an MD5 sum for an object."""
memory_stream = BytesIO()
export_file(obj, memory_stream, file_suffix, fmt=fmt)
return md5sum(memory_stream)


def compute_md5_using_temp_file(
obj: types.Inferrable, extension: str, fmt: str = ""
obj: types.Inferrable, file_suffix: str, fmt: str = ""
) -> str:
"""Compute an MD5 sum using a temporary file."""
if not extension.startswith("."):
raise ValueError("An extension must start with '.'")

with NamedTemporaryFile(buffering=0, suffix=extension) as tf:
logger.info("Compute MD5 sum for tmp file...: %s", tf.name)
return export_file_compute_checksum_md5(
obj=obj, filename=Path(tf.name), fmt=fmt
)
with NamedTemporaryFile(buffering=0, suffix=file_suffix) as tf:
logger.info("Compute MD5 sum for tmp file")
tempfile = Path(tf.name)
export_file(obj=obj, file=tempfile, fmt=fmt)
return md5sum(tempfile)


def create_symlink(source: str, target: str) -> None:
Expand Down
11 changes: 10 additions & 1 deletion src/fmu/dataio/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,20 @@ def _set_metadata(

objdata = objectdata_provider_factory(obj=obj, dataio=etemp)

try:
checksum_md5 = _utils.compute_md5(obj, objdata.extension)
except Exception as e:
logger.debug(
f"Exception {e} occured when trying to compute md5 from memory stream "
f"for an object of type {type(obj)}. Will use tempfile instead."
)
checksum_md5 = _utils.compute_md5_using_temp_file(obj, objdata.extension)

template["tracklog"] = [fields.Tracklog.initialize()[0]]
template["file"] = {
"relative_path": str(relpath),
"absolute_path": str(abspath) if abspath else None,
"checksum_md5": _utils.compute_md5_using_temp_file(obj, objdata.extension),
"checksum_md5": checksum_md5,
}

# data section
Expand Down
5 changes: 3 additions & 2 deletions src/fmu/dataio/dataio.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def _validate_variable(key: str, value: type, legals: dict[str, str | type]) ->
valid_type = eval(legal_key) if isinstance(legal_key, str) else legal_key

try:
validcheck = valid_type.__args__
validcheck = valid_type.__args__ # type: ignore
except AttributeError:
validcheck = valid_type

Expand Down Expand Up @@ -821,7 +821,8 @@ def _export_without_metadata(self, obj: types.Inferrable) -> str:
).get_metadata()

assert filemeta.absolute_path is not None # for mypy
return export_file(obj, filename=filemeta.absolute_path, fmt=objdata.fmt)
export_file(obj, file=filemeta.absolute_path, fmt=objdata.fmt)
return str(filemeta.absolute_path)

# ==================================================================================
# Public methods:
Expand Down
23 changes: 15 additions & 8 deletions src/fmu/dataio/providers/_filedata.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@

from fmu.dataio._logging import null_logger
from fmu.dataio._model import enums, fields
from fmu.dataio._utils import (
compute_md5_using_temp_file,
)
from fmu.dataio._utils import compute_md5, compute_md5_using_temp_file

from ._base import Provider

Expand Down Expand Up @@ -108,11 +106,20 @@ def _get_share_folders(self) -> Path:

def _compute_md5(self) -> str:
"""Compute an MD5 sum using a temporary file."""
if self.obj is None:
raise ValueError("Can't compute MD5 sum without an object.")
return compute_md5_using_temp_file(
self.obj, self.objdata.extension, fmt=self.objdata.fmt
)
try:
return compute_md5(
obj=self.obj,
file_suffix=self.objdata.extension,
fmt=self.objdata.fmt,
)
except Exception as e:
logger.debug(
f"Exception {e} occured when trying to compute md5 from memory stream "
f"for an object of type {type(self.obj)}. Will use tempfile instead."
)
return compute_md5_using_temp_file(
self.obj, self.objdata.extension, fmt=self.objdata.fmt
)

def _add_filename_to_path(self, path: Path) -> Path:
stem = self._get_filestem()
Expand Down
Loading

0 comments on commit d206c09

Please sign in to comment.