pyupgrade --py3-plus
JB Lovland committed Jan 8, 2024
1 parent 32576f2 commit c3ba888
Showing 14 changed files with 83 additions and 85 deletions.
1 change: 0 additions & 1 deletion .dmypy.json

This file was deleted.

3 changes: 3 additions & 0 deletions .gitignore
@@ -96,3 +96,6 @@ venv.bak/

# setuptools_scm version
src/fmu/dataio/version.py

+ # mypy
+ .dmypy.json
2 changes: 1 addition & 1 deletion examples/s/d/nn/_project/aggregate_surfaces.py
@@ -127,7 +127,7 @@ def _parse_yaml(fname):
dict
"""

with open(fname, "r") as stream:
with open(fname) as stream:
data = yaml.safe_load(stream)
return data

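
The only change in this file is dropping the redundant "r" argument: text-mode read is already open()'s default, so the call is behaviour-preserving. A minimal sketch of the equivalence (the temporary file and its contents are made up for illustration):

import tempfile

# "r" (text-mode read) is open()'s default, so open(path) and open(path, "r")
# open the file identically; pyupgrade simply removes the redundant argument.
with tempfile.NamedTemporaryFile("w", suffix=".yml", delete=False) as tmp:
    tmp.write("answer: 42\n")

with open(tmp.name) as implicit, open(tmp.name, "r") as explicit:
    assert implicit.read() == explicit.read()
    assert implicit.mode == explicit.mode == "r"
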
8 changes: 3 additions & 5 deletions src/fmu/dataio/_design_kw.py
@@ -37,7 +37,7 @@ def run(

key_vals.update(rm_genkw_prefix(key_vals))

with open(template_file_name, "r") as template_file:
with open(template_file_name) as template_file:
template = template_file.readlines()

with open(result_file_name, "w") as result_file:
@@ -60,10 +60,8 @@ def all_matched(line: str, template_file_name: str, template: list[str]) -> bool
for unmatched in unmatched_templates(line):
if is_perl(template_file_name, template):
_logger.warning( # pylint: disable=logging-fstring-interpolation
- (
- f"{unmatched} not found in design matrix, "
- f"but this is probably a Perl file"
- )
+ f"{unmatched} not found in design matrix, "
+ f"but this is probably a Perl file"
)
else:
_logger.error( # pylint: disable=logging-fstring-interpolation
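
The lines removed above only drop a redundant pair of parentheses: adjacent string literals, including f-strings, are concatenated by the parser, so the extra grouping inside the warning call adds nothing. A small standalone sketch, with a made-up value for unmatched:

import logging

_logger = logging.getLogger(__name__)

unmatched = "RMS_SEED"  # hypothetical key, for illustration only

# Adjacent (f-)string literals are concatenated at compile time, so the two
# pieces below form one message without any wrapping parentheses.
_logger.warning(
    f"{unmatched} not found in design matrix, "
    f"but this is probably a Perl file"
)
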
3 changes: 2 additions & 1 deletion src/fmu/dataio/_filedata_provider.py
@@ -3,6 +3,7 @@
Populate and verify stuff in the 'file' block in fmu (partial excpetion is checksum_md5
as this is convinient to populate later, on demand)
"""
+ from __future__ import annotations

import logging
from copy import deepcopy
@@ -234,6 +235,6 @@ def _get_path_generic(

# check that destination actually exists if verifyfolder is True
if self.dataio.verifyfolder and not dest.exists():
raise IOError(f"Folder {str(dest)} is not present.")
raise OSError(f"Folder {str(dest)} is not present.")

return dest
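
The IOError → OSError rewrites throughout this commit are purely cosmetic: IOError has been an alias of OSError since Python 3.3, so existing "except IOError" handlers keep working. A quick demonstration (the folder path is made up):

# IOError and OSError are the same class under two names on Python 3.3+.
assert IOError is OSError

try:
    raise OSError("Folder /tmp/does-not-exist is not present.")  # hypothetical path
except IOError as err:  # still caught: it is the very same exception class
    print(type(err).__name__)  # prints "OSError"
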
2 changes: 1 addition & 1 deletion src/fmu/dataio/_fmu_provider.py
@@ -237,7 +237,7 @@ def get_ert2_information(self) -> None:
if self.dataio.include_ert2jobs:
jobs_file = self.iter_path / "jobs.json"
if jobs_file.is_file():
with open(jobs_file, "r") as stream:
with open(jobs_file) as stream:
self.ert2["jobs"] = json.load(stream)
logger.debug("jobs.json parsed.")
logger.debug("jobs.json was not found")
4 changes: 2 additions & 2 deletions src/fmu/dataio/_objectdata_provider.py
@@ -128,7 +128,7 @@ class _ObjectDataProvider:
# input fields
obj: Any
dataio: Any
- meta_existing: Optional[dict] = None
+ meta_existing: dict | None = None

# result properties; the most important is metadata which IS the 'data' part in
# the resulting metadata. But other variables needed later are also given
@@ -687,7 +687,7 @@ def _derive_from_existing(self) -> None:

self.time0, self.time1 = parse_timedata(self.meta_existing["data"])

- def _process_content(self) -> Tuple[str, Optional[dict]]:
+ def _process_content(self) -> tuple[str, dict | None]:
"""Work with the `content` metadata"""

# content == "unset" is not wanted, but in case metadata has been produced while
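
Rewrites such as Optional[dict] → dict | None and Tuple[...] → tuple[...] use PEP 604 union syntax and PEP 585 built-in generics. On interpreters older than 3.10 the X | Y form is only valid inside annotations when the module opts in with from __future__ import annotations (added to _filedata_provider.py above); built-in generics need 3.9+ or the same opt-in. A minimal, self-contained sketch with hypothetical names:

from __future__ import annotations  # PEP 563: annotations stay unevaluated strings

from dataclasses import dataclass
from typing import Any


@dataclass
class ObjectDataSketch:
    # Hypothetical stand-in for the input fields shown in the hunk above.
    obj: Any = None
    dataio: Any = None
    meta_existing: dict | None = None  # PEP 604 union instead of Optional[dict]

    def process_content(self) -> tuple[str, dict | None]:  # built-in generic tuple
        # Mirrors the shape of the rewritten _process_content signature.
        return "unset", None
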
55 changes: 26 additions & 29 deletions src/fmu/dataio/_utils.py
@@ -12,21 +12,21 @@
from copy import deepcopy
from datetime import datetime
from pathlib import Path
- from typing import Dict, List, Optional, Union, cast
+ from typing import cast

- import pandas as pd  # type: ignore
+ import pandas as pd
import yaml
from fmu.config import utilities as ut

try:
- import pyarrow as pa  # type: ignore
+ import pyarrow as pa
except ImportError:
HAS_PYARROW = False
else:
HAS_PYARROW = True
from pyarrow import feather

- import xtgeo  # type: ignore
+ import xtgeo

from . import _design_kw
from . import _oyaml as oyaml
@@ -82,15 +82,15 @@ def drop_nones(dinput: dict) -> dict:


def export_metadata_file(
- yfile: str, metadata: dict, savefmt: str = "yaml", verbosity: str = "WARNING"
+ yfile: Path, metadata: dict, savefmt: str = "yaml", verbosity: str = "WARNING"
) -> None:
"""Export genericly and ordered to the complementary metadata file."""
logger.setLevel(level=verbosity)
if metadata:
xdata = drop_nones(metadata)

if savefmt == "yaml":
- yamlblock = oyaml.safe_dump(xdata, allow_unicode=True)
+ yamlblock = oyaml.safe_dump(xdata, allow_unicode=True)  # type: ignore
with open(yfile, "w", encoding="utf8") as stream:
stream.write(yamlblock)
else:
@@ -107,7 +107,7 @@


def export_file(
- obj: object, filename: str, extension: str, flag: str | None = None
+ obj: object, filename: Path, extension: str, flag: str | None = None
) -> str:
"""Export a valid object to file"""

@@ -154,7 +154,7 @@ def export_file(
return str(filename)


- def md5sum(fname: str) -> str:
+ def md5sum(fname: Path) -> str:
hash_md5 = hashlib.md5()
with open(fname, "rb") as fil:
for chunk in iter(lambda: fil.read(4096), b""):
@@ -164,17 +164,17 @@ def md5sum(fname: str) -> str:

def export_file_compute_checksum_md5(
obj: object,
- filename: str,
+ filename: Path,
extension: str,
flag: str | None = None,
tmp: bool = False,
) -> tuple[str | Path | None, str,]:
"""Export and compute checksum, with possibility to use a tmp file."""

- usefile: str | None = filename
+ usefile: Path | None = filename
if tmp:
tmpdir = tempfile.TemporaryDirectory()
- usefile = str(Path(tmpdir.name) / "tmpfile")
+ usefile = Path(tmpdir.name) / "tmpfile"

assert usefile is not None
export_file(obj, usefile, extension, flag=flag)
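
The filename/fname hints changing from str to Path in this file (export_metadata_file, export_file, md5sum, export_file_compute_checksum_md5) look like manual typing tightening rather than automatic pyupgrade rewrites; open() and friends accept any os.PathLike, so runtime behaviour is unchanged. A paraphrased sketch of the md5sum pattern shown above, using only the standard library:

import hashlib
from pathlib import Path


def md5sum_sketch(fname: Path) -> str:
    # open() takes Path objects (os.PathLike), so the str -> Path annotation
    # change is transparent at runtime; the file is hashed in 4 KiB chunks.
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as fil:
        for chunk in iter(lambda: fil.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
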
@@ -190,17 +190,17 @@ def create_symlink(source: str, target: str) -> None:

thesource = Path(source)
if not thesource.exists():
raise IOError(f"Cannot symlink: Source file {thesource} does not exist.")
raise OSError(f"Cannot symlink: Source file {thesource} does not exist.")

thetarget = Path(target)

if thetarget.exists() and not thetarget.is_symlink():
raise IOError(f"Target file {thetarget} exists already as a normal file.")
raise OSError(f"Target file {thetarget} exists already as a normal file.")

os.symlink(source, target)

if not (thetarget.exists() and thetarget.is_symlink()):
raise IOError(f"Target file {thesource} does not exist or is not a symlink.")
raise OSError(f"Target file {thesource} does not exist or is not a symlink.")


def size(fname: str) -> int:
@@ -212,9 +212,7 @@ def uuid_from_string(string: str) -> str:
return str(uuid.UUID(hashlib.md5(string.encode("utf-8")).hexdigest()))


- def read_parameters_txt(
- pfile: Union[Path, str]
- ) -> Dict[str, Union[str, float, int | None]]:
+ def read_parameters_txt(pfile: Path | str) -> dict[str, str | float | int | None]:
"""Read the parameters.txt file and convert to a dict.
The parameters.txt file has this structure::
SENSNAME rms_seed
@@ -258,8 +256,8 @@ def read_parameters_txt(


def nested_parameters_dict(
- paramdict: Dict[str, Union[str, int, float]]
- ) -> Dict[str, Union[str, int, float, Dict[str, Union[str, int, float]]]]:
+ paramdict: dict[str, str | int | float]
+ ) -> dict[str, str | int | float | dict[str, str | int | float]]:
"""Interpret a flat parameters dictionary into a nested dictionary, based on
presence of colons in keys.
@@ -268,10 +266,8 @@ def nested_parameters_dict(
In design_kw (semeio) this namespace identifier is actively ignored, meaning that
the keys without the namespace must be unique.
"""
- nested_dict: Dict[
- str, Union[str, int, float, Dict[str, Union[str, int, float]]]
- ] = {}
- unique_keys: List[str] = []
+ nested_dict: dict[str, str | int | float | dict[str, str | int | float]] = {}
+ unique_keys: list[str] = []
for key, value in paramdict.items():
if ":" in key:
subdict, newkey = key.split(":", 1)
@@ -391,7 +387,7 @@ def filter_validate_metadata(metadata_in: dict) -> dict:
return metadata


- def generate_description(desc: Optional[Union[str, list]] = None) -> Union[list, None]:
+ def generate_description(desc: str | list | None = None) -> list | None:
"""Parse desciption input (generic)."""
if not desc:
return None
@@ -404,7 +400,7 @@ def generate_description(desc: Optional[Union[str, list]] = None) -> Union[list,
raise ValueError("Description of wrong type, must be list of strings or string")


- def read_metadata(filename: Union[str, Path]) -> dict:
+ def read_metadata(filename: str | Path) -> dict:
"""Read the metadata as a dictionary given a filename.
If the filename is e.g. /some/path/mymap.gri, the assosiated metafile
@@ -418,13 +414,13 @@ def read_metadata(filename: Union[str, Path]) -> dict:
"""
fname = Path(filename)
if fname.stem.startswith("."):
raise IOError(f"The input is a hidden file, cannot continue: {fname.stem}")
raise OSError(f"The input is a hidden file, cannot continue: {fname.stem}")

metafile = str(fname.parent) + "/." + fname.stem + fname.suffix + ".yml"
metafilepath = Path(metafile)
if not metafilepath.exists():
raise IOError(f"Cannot find requested metafile: {metafile}")
with open(metafilepath, "r") as stream:
raise OSError(f"Cannot find requested metafile: {metafile}")
with open(metafilepath) as stream:
metacfg = yaml.safe_load(stream)

return metacfg
@@ -450,7 +446,8 @@ def glue_metadata_preprocessed(


def parse_timedata(
- datablock: dict, isoformat: bool = True
+ datablock: dict,
+ isoformat: bool = True,
) -> tuple[str | None, str | None]:
"""The time section under datablock has variants to parse.
