diff --git a/AFMReader/io.py b/AFMReader/io.py
index 4189626..852f684 100644
--- a/AFMReader/io.py
+++ b/AFMReader/io.py
@@ -2,6 +2,7 @@
 
 import struct
 from typing import BinaryIO
+import h5py
 
 
 def read_uint8(open_file: BinaryIO) -> int:
@@ -215,3 +216,43 @@ def skip_bytes(open_file: BinaryIO, length_bytes: int = 1) -> bytes:
         The bytes that were skipped.
     """
     return open_file.read(length_bytes)
+
+
+def unpack_hdf5(open_hdf5_file: h5py.File, group_path: str = "/") -> dict:
+    """
+    Read a dictionary from an open hdf5 file.
+
+    Parameters
+    ----------
+    open_hdf5_file : h5py.File
+        An open hdf5 file object.
+    group_path : str
+        Path to the group in the hdf5 file to start reading the data from.
+
+    Returns
+    -------
+    dict
+        Dictionary containing the data from the hdf5 file.
+
+    Examples
+    --------
+    Read the data from the root group of the hdf5 file.
+
+    >>> with h5py.File("path/to/file.h5", "r") as f:
+    ...     data = unpack_hdf5(open_hdf5_file=f, group_path="/")
+
+    Read data from a particular group in the hdf5 file.
+
+    >>> with h5py.File("path/to/file.h5", "r") as f:
+    ...     data = unpack_hdf5(open_hdf5_file=f, group_path="/group_name")
+    """
+    data = {}
+    for key, item in open_hdf5_file[group_path].items():
+        if isinstance(item, h5py.Group):
+            # Recurse into nested groups
+            data[key] = unpack_hdf5(open_hdf5_file, f"{group_path}/{key}")
+        # Decode byte strings to utf-8. h5py exposes variable-length strings as the object ("O") dtype.
+        elif isinstance(item, h5py.Dataset) and item.dtype == "O":
+            # Byte string
+            data[key] = item[()].decode("utf-8")
+        else:
+            # Another type of dataset
+            data[key] = item[()]
+    return data
diff --git a/AFMReader/topostats.py b/AFMReader/topostats.py
new file mode 100644
index 0000000..d92b270
--- /dev/null
+++ b/AFMReader/topostats.py
@@ -0,0 +1,55 @@
+"""For decoding and loading the .topostats (HDF5 format) AFM file format into Python NumPy arrays."""
+
+from __future__ import annotations
+from pathlib import Path
+
+import h5py
+
+from AFMReader.logging import logger
+from AFMReader.io import unpack_hdf5
+
+logger.enable(__package__)
+
+
+def load_topostats(file_path: Path | str) -> tuple:
+    """
+    Extract image and pixel to nm scaling from the .topostats (HDF5 format) file.
+
+    Parameters
+    ----------
+    file_path : Path or str
+        Path to the .topostats file.
+
+    Returns
+    -------
+    tuple(np.ndarray, float, dict)
+        A tuple containing the image, its pixel to nm scaling factor and the data dictionary
+        containing all the extra image data and metadata in dictionary format.
+
+    Raises
+    ------
+    OSError
+        If the file is not found.
+
+    Examples
+    --------
+    >>> image, pixel_to_nm_scaling, data = load_topostats("path/to/topostats_file.topostats")
+    """
+    logger.info(f"Loading image from : {file_path}")
+    file_path = Path(file_path)
+    filename = file_path.stem
+    try:
+        with h5py.File(file_path, "r") as f:
+            data = unpack_hdf5(open_hdf5_file=f, group_path="/")
+
+        file_version = data["topostats_file_version"]
+        logger.info(f"[{filename}] TopoStats file version : {file_version}")
+        image = data["image"]
+        pixel_to_nm_scaling = data["pixel_to_nm_scaling"]
+
+    except OSError as e:
+        if "Unable to open file" in str(e):
+            logger.error(f"[{filename}] File not found : {file_path}")
+        raise e
+
+    return (image, pixel_to_nm_scaling, data)
diff --git a/README.md b/README.md
index 6e41153..3e1f449 100644
--- a/README.md
+++ b/README.md
@@ -25,6 +25,7 @@ Supported file formats
 | `.ibw`       | [WaveMetrics](https://www.wavemetrics.com/)       |
 | `.spm`       | [Bruker's Format](https://www.bruker.com/)         |
 | `.jpk`       | [Bruker](https://www.bruker.com/)                  |
+| `.topostats` | [TopoStats](https://github.com/AFM-SPM/TopoStats)  |
 
 Support for the following additional formats is planned. Some of these are already supported in TopoStats and are
 awaiting refactoring to move their functionality into AFMReader these are denoted in bold below.
@@ -41,6 +42,16 @@ awaiting refactoring to move their functionality into AFMReader these are denote
 If you wish to process AFM images supported by `AFMReader` it is recommend you use
 [TopoStats](https://github.com/AFM-SPM/TopoStats) to do so, however the library can be used on its own.
 
+### .topostats
+
+You can open `.topostats` files using the `load_topostats` function. Just pass in the path to the file.
+
+```python
+from AFMReader.topostats import load_topostats
+
+image, pixel_to_nanometre_scaling_factor, metadata = load_topostats(file_path="./my_topostats_file.topostats")
+```
+
 ### .spm
 
 You can open `.spm` files using the `load_spm` function. Just pass in the path to the file and the
diff --git a/examples/example_01.ipynb b/examples/example_01.ipynb
index c2762d0..91a1582 100644
--- a/examples/example_01.ipynb
+++ b/examples/example_01.ipynb
@@ -187,7 +187,7 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "# IBW Files"
+    "# TopoStats Files"
    ]
   },
   {
@@ -197,7 +197,7 @@
    "outputs": [],
    "source": [
-    "# Import the load_ibw function from AFMReader\n",
-    "from AFMReader.ibw import load_ibw"
+    "# Import the load_topostats function from AFMReader\n",
+    "from AFMReader.topostats import load_topostats"
    ]
   },
   {
@@ -206,9 +206,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "# Load the IBW file as an image and pixel to nm scaling factor\n",
-    "FILE = \"../tests/resources/sample_0.ibw\"\n",
-    "image, pixel_to_nm_scaling = load_ibw(file_path=FILE, channel=\"HeightTracee\")"
+    "# Load the TopoStats file as an image, pixel to nm scaling factor, and metadata\n",
+    "FILE = \"../tests/resources/sample_0_1.topostats\"\n",
+    "image, pixel_to_nm_scaling, metadata = load_topostats(file_path=FILE)"
    ]
   },
   {
@@ -241,7 +241,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.10.9"
+   "version": "3.11.9"
   }
  },
  "nbformat": 4,
diff --git a/pyproject.toml b/pyproject.toml
index bd44678..583a8fc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -40,6 +40,7 @@ dependencies = [
   "tifffile",
   "pySPM",
   "loguru",
+  "h5py",
 ]
 
 [project.optional-dependencies]
diff --git a/tests/resources/sample_0_1.topostats b/tests/resources/sample_0_1.topostats
new file mode 100644
index 0000000..c714e7a
Binary files /dev/null and b/tests/resources/sample_0_1.topostats differ
diff --git a/tests/test_io.py b/tests/test_io.py
new file mode 100644
index 0000000..f4545fd
--- /dev/null
+++ b/tests/test_io.py
@@ -0,0 +1,187 @@
+"""Test the reading and writing of data from / to files."""
+
+from pathlib import Path
+
+import numpy as np
+import h5py
+
+
+from AFMReader.io import unpack_hdf5
+
+
+def test_unpack_hdf5_all_together_group_path_default(tmp_path: Path) -> None:
+    """Test loading a nested dictionary with arrays from HDF5 format with group path as default."""
+    to_save = {
+        "a": 1,
+        "b": np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]),
+        "c": "test",
+        "d": {"e": 1, "f": np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]), "g": "test"},
+    }
+
+    group_path = "/"
+
+    # Manually save the dictionary to HDF5 format
+    with h5py.File(tmp_path / "hdf5_file_nested_with_arrays_group_path_standard.hdf5", "w") as f:
+        # Write the datasets and groups to the file without using the dict_to_hdf5 function
+        f.create_dataset("a", data=to_save["a"])
+        f.create_dataset("b", data=to_save["b"])
+        f.create_dataset("c", data=to_save["c"])
+        d = f.create_group("d")
+        d.create_dataset("e", data=to_save["d"]["e"])
+        d.create_dataset("f", data=to_save["d"]["f"])
+        d.create_dataset("g", data=to_save["d"]["g"])
+
+    # Load it back in and check if the dictionary is the same
+    with h5py.File(tmp_path / "hdf5_file_nested_with_arrays_group_path_standard.hdf5", "r") as f:
+        result = unpack_hdf5(open_hdf5_file=f, group_path=group_path)
+
+    np.testing.assert_equal(result, to_save)
+
+
+def test_unpack_hdf5_all_together_group_path_non_standard(tmp_path: Path) -> None:
+    """Test loading a nested dictionary with arrays from HDF5 format with a non-standard group path."""
+    to_save = {
+        "a": 1,
+        "b": np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]),
+        "c": "test",
+        "d": {"e": 1, "f": np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]), "g": "test"},
+    }
+
+    group_path = "/d/"
+
+    expected = {
+        "e": 1,
+        "f": np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]),
+        "g": "test",
+    }
+
+    # Manually save the dictionary to HDF5 format
+    with h5py.File(tmp_path / "hdf5_file_all_together_group_path_nonstandard.hdf5", "w") as f:
+        # Write the datasets and groups to the file without using the dict_to_hdf5 function
+        f.create_dataset("a", data=to_save["a"])
+        f.create_dataset("b", data=to_save["b"])
+        f.create_dataset("c", data=to_save["c"])
+        d = f.create_group("d")
+        d.create_dataset("e", data=to_save["d"]["e"])
+        d.create_dataset("f", data=to_save["d"]["f"])
+        d.create_dataset("g", data=to_save["d"]["g"])
+
+    # Load it back in and check if the dictionary is the same
+    with h5py.File(tmp_path / "hdf5_file_all_together_group_path_nonstandard.hdf5", "r") as f:
+        result = unpack_hdf5(open_hdf5_file=f, group_path=group_path)
+
+    np.testing.assert_equal(result, expected)
+
+
+def test_unpack_hdf5_int(tmp_path: Path) -> None:
+    """Test loading a dictionary with an integer from HDF5 format."""
+    to_save = {"a": 1, "b": 2}
+
+    group_path = "/"
+
+    # Manually save the dictionary to HDF5 format
+    with h5py.File(tmp_path / "hdf5_file_int.hdf5", "w") as f:
+        # Write the datasets and groups to the file without using the dict_to_hdf5 function
+        f.create_dataset("a", data=to_save["a"])
+        f.create_dataset("b", data=to_save["b"])
+
+    # Load it back in and check if the dictionary is the same
+    with h5py.File(tmp_path / "hdf5_file_int.hdf5", "r") as f:
+        result = unpack_hdf5(open_hdf5_file=f, group_path=group_path)
+
+    np.testing.assert_equal(result, to_save)
+
+
+def test_unpack_hdf5_float(tmp_path: Path) -> None:
+    """Test loading a dictionary with a float from HDF5 format."""
+    to_save = {"a": 0.01, "b": 0.02}
+
+    group_path = "/"
+
+    # Manually save the dictionary to HDF5 format
+    with h5py.File(tmp_path / "hdf5_file_float.hdf5", "w") as f:
+        # Write the datasets and groups to the file without using the dict_to_hdf5 function
+        f.create_dataset("a", data=to_save["a"])
+        f.create_dataset("b", data=to_save["b"])
+
+    # Load it back in and check if the dictionary is the same
+    with h5py.File(tmp_path / "hdf5_file_float.hdf5", "r") as f:
+        result = unpack_hdf5(open_hdf5_file=f, group_path=group_path)
+
+    np.testing.assert_equal(result, to_save)
+
+
+def test_unpack_hdf5_str(tmp_path: Path) -> None:
+    """Test loading a dictionary with a string from HDF5 format."""
+    to_save = {"a": "test", "b": "test2"}
+
+    group_path = "/"
+
+    # Manually save the dictionary to HDF5 format
+    with h5py.File(tmp_path / "hdf5_file_str.hdf5", "w") as f:
+        # Write the datasets and groups to the file without using the dict_to_hdf5 function
+        f.create_dataset("a", data=to_save["a"])
+        f.create_dataset("b", data=to_save["b"])
+
+    # Load it back in and check if the dictionary is the same
+    with h5py.File(tmp_path / "hdf5_file_str.hdf5", "r") as f:
+        result = unpack_hdf5(open_hdf5_file=f, group_path=group_path)
+
+    np.testing.assert_equal(result, to_save)
+
+
+def test_unpack_hdf5_dict_nested_dict(tmp_path: Path) -> None:
+    """Test loading a nested dictionary from HDF5 format."""
+    to_save = {
+        "a": 1,
+        "b": 2,
+        "c": {"d": 3, "e": 4},
+    }
+
+    group_path = "/"
+
+    # Manually save the dictionary to HDF5 format
+    with h5py.File(tmp_path / "hdf5_file_nested_dict.hdf5", "w") as f:
+        # Write the datasets and groups to the file without using the dict_to_hdf5 function
+        f.create_dataset("a", data=to_save["a"])
+        f.create_dataset("b", data=to_save["b"])
+        c = f.create_group("c")
+        c.create_dataset("d", data=to_save["c"]["d"])
+        c.create_dataset("e", data=to_save["c"]["e"])
+
+    # Load it back in and check if the dictionary is the same
+    with h5py.File(tmp_path / "hdf5_file_nested_dict.hdf5", "r") as f:
+        result = unpack_hdf5(open_hdf5_file=f, group_path=group_path)
+
+    np.testing.assert_equal(result, to_save)
+
+
+def test_unpack_hdf5_nested_dict_group_path(tmp_path: Path) -> None:
+    """Test loading a nested dictionary from HDF5 format with a non-standard group path."""
+    to_save = {
+        "a": 1,
+        "b": 2,
+        "c": {"d": 3, "e": 4},
+    }
+
+    group_path = "/c/"
+
+    expected = {
+        "d": 3,
+        "e": 4,
+    }
+
+    # Manually save the dictionary to HDF5 format
+    with h5py.File(tmp_path / "hdf5_file_nested_dict_group_path.hdf5", "w") as f:
+        # Write the datasets and groups to the file without using the dict_to_hdf5 function
+        f.create_dataset("a", data=to_save["a"])
+        f.create_dataset("b", data=to_save["b"])
+        c = f.create_group("c")
+        c.create_dataset("d", data=to_save["c"]["d"])
+        c.create_dataset("e", data=to_save["c"]["e"])
+
+    # Load it back in and check if the dictionary is the same
+    with h5py.File(tmp_path / "hdf5_file_nested_dict_group_path.hdf5", "r") as f:
+        result = unpack_hdf5(open_hdf5_file=f, group_path=group_path)
+
+    np.testing.assert_equal(result, expected)
diff --git a/tests/test_topostats.py b/tests/test_topostats.py
new file mode 100644
index 0000000..e6de6b3
--- /dev/null
+++ b/tests/test_topostats.py
@@ -0,0 +1,46 @@
+"""Test the loading of topostats (HDF5 format) files."""
+
+from pathlib import Path
+import pytest
+
+import numpy as np
+
+from AFMReader.topostats import load_topostats
+
+BASE_DIR = Path.cwd()
+RESOURCES = BASE_DIR / "tests" / "resources"
+
+
+@pytest.mark.parametrize(
+    ("file_name", "topostats_file_version", "image_shape", "pixel_to_nm_scaling", "data_keys", "image_sum"),
+    [
+        pytest.param(
+            "sample_0_1.topostats",
+            0.1,
+            (64, 64),
+            1.97601171875,
+            {"topostats_file_version", "image", "pixel_to_nm_scaling"},
+            112069.51332503435,
+            id="version",
+        ),
+    ],
+)
+def test_load_topostats(
+    file_name: str,
+    topostats_file_version: float,
+    image_shape: tuple[int, int],
+    pixel_to_nm_scaling: float,
+    data_keys: set[str],
+    image_sum: float,
+) -> None:
+    """Test the normal operation of loading a .topostats (HDF5 format) file."""
+
+    file_path = RESOURCES / file_name
+    result_image, result_pixel_to_nm_scaling, result_data = load_topostats(file_path)
+
+    assert result_pixel_to_nm_scaling == pixel_to_nm_scaling
+    assert isinstance(result_image, np.ndarray)
+    assert result_image.shape == image_shape
+    assert set(result_data.keys()) == data_keys
+    assert result_data["topostats_file_version"] == topostats_file_version
+    assert result_image.sum() == image_sum
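As a usage sketch to accompany the changes above: the snippet below writes a minimal `.topostats`-style HDF5 file with `h5py` and round-trips it through `load_topostats`. It assumes only the three top-level datasets the loader reads (`topostats_file_version`, `image`, `pixel_to_nm_scaling`) are required; files written by TopoStats itself typically carry additional image data and metadata, and the `minimal.topostats` filename is purely illustrative.

```python
import h5py
import numpy as np

from AFMReader.topostats import load_topostats

rng = np.random.default_rng(seed=0)

# Write the three root-level datasets that load_topostats accesses.
with h5py.File("minimal.topostats", "w") as f:
    f.create_dataset("topostats_file_version", data=0.1)
    f.create_dataset("image", data=rng.random((64, 64)))
    f.create_dataset("pixel_to_nm_scaling", data=1.0)

# Read it back: load_topostats returns the image, its scaling factor and the full data dictionary.
image, pixel_to_nm_scaling, data = load_topostats(file_path="minimal.topostats")
print(image.shape, pixel_to_nm_scaling, sorted(data.keys()))
```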