diff --git a/CHANGELOG.md b/CHANGELOG.md index 5dfa1ddf..0001c3e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Tests written as pytests instead of unittest. + ## [0.11.0] - 2023-08-09 ### Added diff --git a/poetry.lock b/poetry.lock index 6d10969e..ed1967c6 100644 --- a/poetry.lock +++ b/poetry.lock @@ -146,14 +146,14 @@ files = [ [[package]] name = "click" -version = "8.1.6" +version = "8.1.7" description = "Composable command line interface toolkit" category = "dev" optional = false python-versions = ">=3.7" files = [ - {file = "click-8.1.6-py3-none-any.whl", hash = "sha256:fa244bb30b3b5ee2cae3da8f55c9e5e0c0e86093306301fb418eb9dc40fbded5"}, - {file = "click-8.1.6.tar.gz", hash = "sha256:48ee849951919527a045bfe3bf7baa8a959c423134e1a5b98c05c20ba75a1cbd"}, + {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"}, + {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"}, ] [package.dependencies] @@ -224,14 +224,14 @@ files = [ [[package]] name = "exceptiongroup" -version = "1.1.2" +version = "1.1.3" description = "Backport of PEP 654 (exception groups)" category = "dev" optional = false python-versions = ">=3.7" files = [ - {file = "exceptiongroup-1.1.2-py3-none-any.whl", hash = "sha256:e346e69d186172ca7cf029c8c1d16235aa0e04035e5750b4b95039e65204328f"}, - {file = "exceptiongroup-1.1.2.tar.gz", hash = "sha256:12c3e887d6485d16943a309616de20ae5582633e0a2eda17f4e10fd61c1e8af5"}, + {file = "exceptiongroup-1.1.3-py3-none-any.whl", hash = "sha256:343280667a4585d195ca1cf9cef84a4e178c4b6cf2274caef9859782b567d5e3"}, + {file = "exceptiongroup-1.1.3.tar.gz", hash = "sha256:097acd85d473d75af5bb98e41b61ff7fe35efe6675e4f9370ec6ec5126d160e9"}, ] [package.extras] @@ -352,21 +352,6 @@ files = [ {file = "packaging-23.1.tar.gz", hash = "sha256:a392980d2b6cffa644431898be54b0045151319d1e7ec34f0cfed48767dd334f"}, ] -[[package]] -name = "parameterized" -version = "0.8.1" -description = "Parameterized testing with any Python test framework" -category = "dev" -optional = false -python-versions = "*" -files = [ - {file = "parameterized-0.8.1-py2.py3-none-any.whl", hash = "sha256:9cbb0b69a03e8695d68b3399a8a5825200976536fe1cb79db60ed6a4c8c9efe9"}, - {file = "parameterized-0.8.1.tar.gz", hash = "sha256:41bbff37d6186430f77f900d777e5bb6a24928a1c46fb1de692f8b52b8833b5c"}, -] - -[package.extras] -dev = ["jinja2"] - [[package]] name = "pathspec" version = "0.11.2" @@ -505,14 +490,14 @@ files = [ [[package]] name = "pydicom" -version = "2.4.2" +version = "2.4.3" description = "A pure Python package for reading and writing DICOM data" category = "main" optional = false python-versions = ">=3.7" files = [ - {file = "pydicom-2.4.2-py3-none-any.whl", hash = "sha256:d2801c234a2f99ac97c6c0b5e50a908ed16d2fef905a2fda49fecd311cd88802"}, - {file = "pydicom-2.4.2.tar.gz", hash = "sha256:b4ae58ec16dc4155c18b0c87f75813fe32c1206f0eb088875b4fb12e5e596c85"}, + {file = "pydicom-2.4.3-py3-none-any.whl", hash = "sha256:797e84f7b22e5f8bce403da505935b0787dca33550891f06495d14b3f6c70504"}, + {file = "pydicom-2.4.3.tar.gz", hash = "sha256:51906e0b9fb6e184a0f56298cb43ed716b7cf7edc00f6b71d5c769bc1f982402"}, ] [package.extras] @@ -773,4 +758,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "264f7d4363a7398ef5c95db0a9946e5b7d3f177b291c914818935d0988ee72c9" +content-hash = "7f9b7fd4fe66377d59aa14a4e0086d7b49c898a4630f1762acd5d6b50bdd5cde" diff --git a/pyproject.toml b/pyproject.toml index 9aa3f7e4..cfa2fb75 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,6 @@ shapely = "^1.7.0" pycodestyle = "^2.8.0" black = "^23.1.0" flake8 = "^4.0.1" -parameterized = "^0.8.1" codespell = "^2.2.5" [build-system] diff --git a/tests/wsi_test_files.py b/tests/conftest.py similarity index 50% rename from tests/wsi_test_files.py rename to tests/conftest.py index 746709bb..8ad25fba 100644 --- a/tests/wsi_test_files.py +++ b/tests/conftest.py @@ -1,4 +1,4 @@ -# Copyright 2021, 2022, 2023 SECTRA AB +# Copyright 2023 SECTRA AB # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,11 +14,15 @@ import json import os -import unittest from enum import Enum +from io import BufferedReader from pathlib import Path -from typing import Any, BinaryIO, Dict, Iterable, List, Tuple +from tempfile import TemporaryDirectory +from typing import Any, Dict, Iterable, List, Tuple +import pytest + +from tests.data_gen import create_layer_file from wsidicom import WsiDicom from wsidicom.web.wsidicom_web_client import WsiDicomFileClient @@ -33,34 +37,47 @@ class WsiInputType(Enum): class WsiTestDefinitions: + """Interface for reading test parameters from definition file.""" + with open(REGION_DEFINITIONS_FILE) as json_file: test_definitions: Dict[str, Dict[str, Any]] = json.load(json_file) if len(test_definitions) == 0: - raise unittest.SkipTest("No test definition found, skipping.") + pytest.skip("No test definition found, skipping.") + + @classmethod + def folders(cls) -> Iterable[Path]: + return ( + SLIDE_FOLDER.joinpath(wsi_name, path) + for wsi_name, path in cls._get_parameter("path") + ) + + @classmethod + def wsi_names(cls) -> Iterable[str]: + return cls.test_definitions.keys() @classmethod def read_region(cls) -> Iterable[Tuple[str, Dict[str, Any]]]: - return cls._get_region("read_region") + return cls._get_dict("read_region") @classmethod def read_region_mm(cls) -> Iterable[Tuple[str, Dict[str, Any]]]: - return cls._get_region("read_region_mm") + return cls._get_dict("read_region_mm") @classmethod def read_region_mpp(cls) -> Iterable[Tuple[str, Dict[str, Any]]]: - return cls._get_region("read_region_mpp") + return cls._get_dict("read_region_mpp") @classmethod def read_tile(cls) -> Iterable[Tuple[str, Dict[str, Any]]]: - return cls._get_region("read_tile") + return cls._get_dict("read_tile") @classmethod def read_encoded_tile(cls) -> Iterable[Tuple[str, Dict[str, Any]]]: - return cls._get_region("read_encoded_tile") + return cls._get_dict("read_encoded_tile") @classmethod def read_thumbnail(cls) -> Iterable[Tuple[str, Dict[str, Any]]]: - return cls._get_region("read_thumbnail") + return cls._get_dict("read_thumbnail") @classmethod def levels(cls) -> Iterable[Tuple[str, int]]: @@ -75,75 +92,68 @@ def has_overview(cls) -> Iterable[Tuple[str, bool]]: return cls._get_parameter("overview") @classmethod - def _get_region(cls, region_name: str) -> Iterable[Tuple[str, Dict[str, Any]]]: + def _get_dict(cls, region_name: str) -> Iterable[Tuple[str, Dict[str, Any]]]: return [ - (folder, region) - for folder, folder_definition in cls.test_definitions.items() - for region in folder_definition[region_name] + (wsi_name, region) + for wsi_name, wsi_definition in cls.test_definitions.items() + for region in wsi_definition[region_name] ] @classmethod def _get_parameter(cls, parameter_name: str) -> Iterable[Tuple[str, Any]]: return [ - (folder, folder_definition[parameter_name]) - for folder, folder_definition in cls.test_definitions.items() + ( + wsi_name, + wsi_definition[parameter_name], + ) + for wsi_name, wsi_definition in cls.test_definitions.items() ] -class WsiTestFiles: - folders = {} - if SLIDE_FOLDER.exists(): - folders = { - item.parts[-1]: item for item in SLIDE_FOLDER.iterdir() if item.is_dir - } - if len(folders) == 0: - raise unittest.SkipTest( - f"No test slide files found for {SLIDE_FOLDER}, skipping." - ) - - def __init__(self, input_type: WsiInputType = WsiInputType.FILE): - self._input_type = input_type - self._wsis: Dict[str, WsiDicom] = {} - self._opened_streams: List[BinaryIO] = [] - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb) -> None: - self.close() - - def close(self): - for wsi in self._wsis.values(): - wsi.close() - for stream in self._opened_streams: - stream.close() - - def get_wsi(self, wsi_name: str) -> WsiDicom: - if wsi_name in self._wsis: - return self._wsis[wsi_name] - if not wsi_name in self.folders: - raise unittest.SkipTest("WSI files not found, skipping.") - folder = self.folders[wsi_name] - try: - while next(folder.iterdir()).is_dir(): - folder = next(folder.iterdir()) - except StopIteration: - raise unittest.SkipTest("WSI files not found, skipping.") - if self._input_type == WsiInputType.FILE: +@pytest.fixture() +def wsi(tmp_path: Path): + test_file_path = tmp_path.joinpath("test_im.dcm") + create_layer_file(test_file_path) + with WsiDicom.open(tmp_path) as wsi: + yield wsi + + +@pytest.fixture(scope="module") +def wsi_factory(): + """Fixture providing a callable that takes a wsi name and input type and returns a + WsiDicom object. Caches opened objects and closes when tests using fixture are done. + """ + streams: List[BufferedReader] = [] + wsis: Dict[Tuple[WsiInputType, Path], WsiDicom] = {} + + def open_wsi( + wsi_name: str, input_type: WsiInputType = WsiInputType.FILE + ) -> WsiDicom: + test_definition = WsiTestDefinitions.test_definitions[wsi_name] + folder = Path(SLIDE_FOLDER).joinpath(wsi_name, test_definition["path"]) + if (input_type, folder) in wsis: + return wsis[(input_type, folder)] + if not folder.exists(): + pytest.skip(f"Folder {folder} does not exist.") + if input_type == WsiInputType.FILE: wsi = WsiDicom.open(folder) - elif self._input_type == WsiInputType.WEB: + elif input_type == WsiInputType.WEB: client = WsiDicomFileClient(folder) - test_definition = WsiTestDefinitions.test_definitions[wsi_name] wsi = WsiDicom.open_web( client, test_definition["study_instance_uid"], test_definition["series_instance_uid"], ) - elif self._input_type == WsiInputType.STREAM: + elif input_type == WsiInputType.STREAM: streams = [open(file, "rb") for file in folder.iterdir() if file.is_file()] - self._opened_streams.extend(streams) wsi = WsiDicom.open(streams) else: - raise ValueError(f"Unknown test_type {self._input_type}.") - self._wsis[(wsi_name)] = wsi + raise NotImplementedError() + wsis[(input_type, folder)] = wsi return wsi + + yield open_wsi + for wsi in wsis.values(): + wsi.close() + for stream in streams: + stream.close() diff --git a/tests/test_annotation.py b/tests/test_annotation.py index 4756b77c..8a9f2ad3 100644 --- a/tests/test_annotation.py +++ b/tests/test_annotation.py @@ -1,4 +1,4 @@ -# Copyright 2021 SECTRA AB +# Copyright 2021, 2023 SECTRA AB # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ import json import os -import unittest from collections import OrderedDict from copy import deepcopy from dataclasses import dataclass @@ -24,13 +23,13 @@ import numpy as np import pytest -from shapely.geometry import Point as ShapelyPoint -from shapely.geometry import LineString as ShapelyLineString -from shapely.geometry import Polygon as ShapelyPolygon - import xmltodict from pydicom.uid import generate_uid from shapely import wkt +from shapely.geometry import LineString as ShapelyLineString +from shapely.geometry import Point as ShapelyPoint +from shapely.geometry import Polygon as ShapelyPolygon + from wsidicom import WsiDicom from wsidicom.conceptcode import ( AnnotationCategoryCode, @@ -54,8 +53,6 @@ ) from wsidicom.uid import SlideUids -from tests.data_gen import create_layer_file - ANNOTATION_FOLDER = Path("tests/testdata/annotation") type_code = AnnotationTypeCode("Nucleus") @@ -67,349 +64,219 @@ ) -@pytest.mark.annotation -class WsiDicomAnnotationTests(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.tempdir = TemporaryDirectory() - dirpath = Path(cls.tempdir.name) - test_file_path = dirpath.joinpath("test_im_annotated.dcm") - create_layer_file(test_file_path) - cls.slide = WsiDicom.open(cls.tempdir.name) - - cls.test_files = {} - for folder in ANNOTATION_FOLDER.iterdir(): - folder_name = os.path.basename(folder) - cls.test_files[folder_name] = list(folder.iterdir()) - - @classmethod - def tearDownClass(cls): - cls.slide.close() - cls.tempdir.cleanup() +test_files = { + os.path.basename(folder): list(folder.iterdir()) + for folder in ANNOTATION_FOLDER.iterdir() +} - @classmethod - def dicom_round_trip(cls, dicom: AnnotationInstance) -> AnnotationInstance: - """Saves the annotation collection to temporary file, reads the file - and return the parsed annotation collection. +area = MeasurementCode("Area") +pixels = UnitCode("Pixels") +measurement0 = Measurement(area, 5, pixels) +measurement1 = Measurement(area, 10, pixels) +measurement2 = Measurement(area, 15, pixels) +point = Point(1, 1) - Parameters - ---------- - dicom: AnnotationInstance - Collection of annotations to save. - Returns - ---------- - AnnotationInstance - Read back annotation collection. - - """ - tempdir = TemporaryDirectory() - dirpath = Path(tempdir.name) - filename = "annotation_round_trip.dcm" - dcm_path = str(dirpath.joinpath(filename)) - dicom.save(dcm_path) - read_annotations = list(AnnotationInstance.open([dcm_path]))[0] - tempdir.cleanup() - return read_annotations +@pytest.mark.annotation +class TestWsiDicomAnnotation: + @pytest.mark.parametrize("file_path", test_files["qupath_geojson"]) + def test_qupath_geojson(self, file_path: Path): + with open(file_path) as f: + # Arrange + input_dict: Dict[str, Any] = json.load(f) + group = AnnotationGroup.from_geometries( + Geometry.from_geojson(input_dict["geometry"]), + label=input_dict["properties"]["name"], + category_code=category_code, + type_code=type_code, + ) - @staticmethod - def annotation_group_to_qupath( - group: AnnotationGroup, - ) -> Tuple[str, Union[List[float], List[List[float]], List[List[List[float]]]]]: - if isinstance(group, PointAnnotationGroup): - if len(group) == 1: - return ("Point", group.annotations[0].geometry.to_list_coords()[0]) - else: - coordinates = [] - for annotation in group.annotations: - coordinates += annotation.geometry.to_list_coords() - return ("MultiPoint", coordinates) - elif isinstance(group, PolylineAnnotationGroup): - return ("LineString", group.annotations[0].geometry.to_list_coords()) - elif isinstance(group, PolygonAnnotationGroup): - return ("Polygon", [group.annotations[0].geometry.to_list_coords()]) - raise NotImplementedError() + # Act + dicom = AnnotationInstance([group], "volume", slide_uids) + dicom = self.dicom_round_trip(dicom) - def test_qupath_geojson(self): - files = self.test_files["qupath_geojson"] - for file_path in files: - with open(file_path) as f: - print(file_path) - input_dict: Dict[str, Any] = json.load(f) - group = AnnotationGroup.from_geometries( - Geometry.from_geojson(input_dict["geometry"]), - label=input_dict["properties"]["name"], - category_code=category_code, - type_code=type_code, + # Assert + output_group = dicom[0] + geometry_type, coordinates = self.annotation_group_to_qupath(output_group) + output_dict = deepcopy(input_dict) + output_dict["geometry"]["type"] = geometry_type + output_dict["geometry"]["coordinates"] = coordinates + output_dict["properties"]["name"] = output_group.label + assert input_dict == output_dict + + @pytest.mark.parametrize("file_path", test_files["qupath_geojson_advanced"]) + def test_qupath_geojson_advanced(self, file_path: Path): + with open(file_path) as f: + # Arrange + input_dict: Dict = json.load(f) + # Group annotations by type and type using key + + @dataclass(unsafe_hash=True) + class Key: + annotation_type: type + type_code: AnnotationTypeCode + + @dataclass(unsafe_hash=True) + class Value: + label: str + color: LabColor + geometries: List[Geometry] + + grouped_annotations: Dict[Key, Value] = {} + + # For each annotation, make a type-category key and insert the + # annotation in the correct dict-group. If no group exists, + # create one using the key and insert the label. + for input_annotation in input_dict: + geometries = Geometry.from_geojson(input_annotation["geometry"]) + group_key = Key( + annotation_type=type(geometries[0]), + type_code=self.qupath_get_type_code(input_annotation), ) - dicom = AnnotationInstance([group], "volume", slide_uids) - dicom = self.dicom_round_trip(dicom) - output_group = dicom[0] - geometry_type, coordinates = self.annotation_group_to_qupath( - output_group + try: + group = grouped_annotations[group_key] + except KeyError: + group = Value( + label=self.qupath_get_label(input_annotation), + color=LabColor(0, 0, 0), + geometries=[], + ) + grouped_annotations[group_key] = group + group.geometries += geometries + + assert grouped_annotations != {} + + # For each group of annotations (same type and category) make + # an annotation group + annotation_groups: List[AnnotationGroup] = [] + for group_keys, group_values in grouped_annotations.items(): + annotation_group = AnnotationGroup.from_geometries( + geometries=group_values.geometries, + label=group_values.label, + category_code=category_code, + type_code=group_keys.type_code, ) - output_dict = deepcopy(input_dict) - output_dict["geometry"]["type"] = geometry_type - output_dict["geometry"]["coordinates"] = coordinates - output_dict["properties"]["name"] = output_group.label - self.assertDictEqual(input_dict, output_dict) - - @staticmethod - def qupath_get_type_code(annotation: Dict) -> AnnotationTypeCode: - type_code = annotation["properties"]["classification"]["name"] - return AnnotationTypeCode(type_code) + annotation_groups.append(annotation_group) + assert annotation_groups != [] - @staticmethod - def qupath_get_color(annotation: Dict) -> Tuple[int, int, int]: - color: Tuple[int, int, int] = annotation["properties"]["color"] - return color + # Act + # Make a group collection and do dicom round-trip + dicom = AnnotationInstance(annotation_groups, "volume", slide_uids) + dicom = self.dicom_round_trip(dicom) - @staticmethod - def qupath_get_label(annotation: Dict) -> str: - label: str = annotation["properties"]["name"] - return label + # Assert + assert dicom.groups != [] + # For each annotation group, produce a type-category_code key. + # Get the original group using the key and check that the + # annotations are the same. + for output_group in dicom.groups: + assert output_group.annotations != [] + key = Key( + annotation_type=output_group.geometry_type, + type_code=output_group.type_code, + ) + input_group = grouped_annotations[key] + output = enumerate(output_group.annotations) + for i, output_annotation in output: + input_annotation = input_group.geometries[i] + assert output_annotation.geometry == input_annotation + + @pytest.mark.parametrize("file_path", test_files["asap"]) + def test_asap(self, file_path: Path): + with open(file_path) as f: + # Arrange + annotation_xml = xmltodict.parse(f.read())["ASAP_Annotations"] + input_dict: Dict[str, Any] = annotation_xml["Annotations"]["Annotation"] + group = AnnotationGroup.from_geometries( + self.asap_to_geometries(input_dict), + label=input_dict["@Name"], + category_code=category_code, + type_code=type_code, + ) - def test_qupath_geojson_advanced(self): - files = self.test_files["qupath_geojson_advanced"] - for file_path in files: - with open(file_path) as f: - print(file_path) - input_dict: Dict = json.load(f) - # Group annotations by type and type using key - - @dataclass(unsafe_hash=True) - class Key: - annotation_type: type - type_code: AnnotationTypeCode - - @dataclass(unsafe_hash=True) - class Value: - label: str - color: LabColor - geometries: List[Geometry] - - grouped_annotations: Dict[Key, Value] = {} - - # For each annotation, make a type-category key and insert the - # annotation in the correct dict-group. If no group exists, - # create one using the key and insert the label. - for input_annotation in input_dict: - geometries = Geometry.from_geojson(input_annotation["geometry"]) - group_key = Key( - annotation_type=type(geometries[0]), - type_code=self.qupath_get_type_code(input_annotation), - ) - try: - group = grouped_annotations[group_key] - except KeyError: - group = Value( - label=self.qupath_get_label(input_annotation), - color=LabColor(0, 0, 0), - geometries=[], - ) - grouped_annotations[group_key] = group - group.geometries += geometries - - self.assertNotEqual(grouped_annotations, {}) - - # For each group of annotations (same type and category) make - # an annotation group - annotation_groups: List[AnnotationGroup] = [] - for group_keys, group_values in grouped_annotations.items(): - annotation_group = AnnotationGroup.from_geometries( - geometries=group_values.geometries, - label=group_values.label, - category_code=category_code, - type_code=group_keys.type_code, - ) - annotation_groups.append(annotation_group) - self.assertNotEqual(annotation_groups, []) - - # Make a group collection and do dicom round-trip - dicom = AnnotationInstance(annotation_groups, "volume", slide_uids) - dicom = self.dicom_round_trip(dicom) - - self.assertNotEqual(dicom.groups, []) - # For each annotation group, produce a type-category_code key. - # Get the original group using the key and check that the - # annotations are the same. - for output_group in dicom.groups: - self.assertNotEqual(output_group.annotations, []) - key = Key( - annotation_type=output_group.geometry_type, - type_code=output_group.type_code, - ) - input_group = grouped_annotations[key] - output = enumerate(output_group.annotations) - for i, output_annotation in output: - input_annotation = input_group.geometries[i] - self.assertEqual(output_annotation.geometry, input_annotation) + # Act + dicom = AnnotationInstance([group], "volume", slide_uids) + dicom = self.dicom_round_trip(dicom) - @staticmethod - def asap_annotation_group_to_geometries( - group: AnnotationGroup, - ) -> Tuple[str, List[OrderedDict]]: - increase_order_by_annotation = False - if isinstance(group, PointAnnotationGroup): - if len(group) == 1: - geometry_type = "Dot" + # Assert + output_group = dicom[0] + output_dict = deepcopy(input_dict) + geometry_type, output_coords = self.asap_annotation_group_to_geometries( + output_group + ) + output_dict["@Name"] = output_group.label + output_dict["@Type"] = geometry_type + if len(output_coords) == 1: + output_dict["Coordinates"]["Coordinate"] = output_coords[0] else: - geometry_type = "PointSet" - increase_order_by_annotation = True - elif isinstance(group, PolygonAnnotationGroup): - geometry_type = "Polygon" - else: - raise NotImplementedError(group) - group_coordinates = [ - OrderedDict( - { - "@Order": str( - point_index + annotation_index * increase_order_by_annotation - ), - "@X": str(item[0]), - "@Y": str(item[1]), - } + output_dict["Coordinates"]["Coordinate"] = output_coords + self.maxDiff = None + assert input_dict == output_dict + + @pytest.mark.parametrize("file_path", test_files["sectra"]) + def test_sectra(self, file_path: Path): + with open(file_path) as f: + # Arrange + input_dict: Union[Dict[str, Any], List[Dict[str, Any]]] = json.load(f) + if isinstance(input_dict, list): + input_dict = input_dict[0] + group = AnnotationGroup.from_geometries( + [self.sectra_to_geometry(input_dict)], + label=input_dict["name"], + category_code=category_code, + type_code=type_code, ) - for annotation_index, annotation in enumerate(group.annotations) - for point_index, item in enumerate(annotation.geometry.to_coords()) - ] - return geometry_type, group_coordinates - @classmethod - def asap_to_geometries(cls, dictionary: Dict[str, Any]) -> List[Geometry]: - annotation_type: str = dictionary["@Type"] - coordinate_dict = dictionary["Coordinates"]["Coordinate"] - if annotation_type == "Dot": - return [Point.from_dict(coordinate_dict, "@X", "@Y")] - elif annotation_type == "PointSet": - points = Point.multiple_from_dict(coordinate_dict, "@X", "@Y") - return points # type: ignore - elif annotation_type == "Polygon": - return [Polygon.from_dict(coordinate_dict, "@X", "@Y")] - - raise NotImplementedError(annotation_type) - - def test_asap(self): - files = self.test_files["asap"] - for file_path in files: - with open(file_path) as f: - print(file_path) - annotation_xml = xmltodict.parse(f.read())["ASAP_Annotations"] - input_dict: Dict[str, Any] = annotation_xml["Annotations"]["Annotation"] - group = AnnotationGroup.from_geometries( - self.asap_to_geometries(input_dict), - label=input_dict["@Name"], - category_code=category_code, - type_code=type_code, - ) - dicom = AnnotationInstance([group], "volume", slide_uids) - dicom = self.dicom_round_trip(dicom) - output_group = dicom[0] - output_dict = deepcopy(input_dict) - geometry_type, output_coords = self.asap_annotation_group_to_geometries( - output_group - ) - output_dict["@Name"] = output_group.label - output_dict["@Type"] = geometry_type - if len(output_coords) == 1: - output_dict["Coordinates"]["Coordinate"] = output_coords[0] - else: - output_dict["Coordinates"]["Coordinate"] = output_coords - self.maxDiff = None - self.assertDictEqual(input_dict, output_dict) - - @staticmethod - def annotation_to_sectra( - annotation: Annotation, - ) -> Tuple[str, List[Dict[str, float]]]: - if type(annotation.geometry) == Polyline: - geometry_type = "Polyline" - elif type(annotation.geometry) == Polygon: - geometry_type = "Area" - else: - raise NotImplementedError() - group_coordinates = [ - {"x": float(item[0]), "y": float(item[1])} - for item in annotation.geometry.to_coords() - ] - return geometry_type, group_coordinates - - @staticmethod - def sectra_to_geometry(dictionary: Dict[str, Any]) -> Geometry: - geometry_type: str = dictionary["type"] - if geometry_type == "Area": - return Polygon.from_dict(dictionary["content"], "x", "y") - elif geometry_type == "Polyline": - return Polyline.from_dict(dictionary["content"], "x", "y") - raise NotImplementedError() + # Act + dicom = AnnotationInstance([group], "volume", slide_uids) + dicom = self.dicom_round_trip(dicom) - def test_sectra(self): - files = self.test_files["sectra"] - for file_path in files: - with open(file_path) as f: - print(file_path) - input_dict: Union[Dict[str, Any], List[Dict[str, Any]]] = json.load(f) - if isinstance(input_dict, list): - input_dict = input_dict[0] - group = AnnotationGroup.from_geometries( - [self.sectra_to_geometry(input_dict)], - label=input_dict["name"], - category_code=category_code, - type_code=type_code, - ) - dicom = AnnotationInstance([group], "volume", slide_uids) - dicom = self.dicom_round_trip(dicom) - output_group = dicom[0] - output_annotation = output_group.annotations[0] - geometry_type, result_coordinates = self.annotation_to_sectra( - output_annotation - ) - output_dict = deepcopy(input_dict) - output_dict["type"] = geometry_type - output_dict["content"] = result_coordinates - if input_dict["name"] is not None: - output_dict["name"] = output_group.label - self.maxDiff = None - self.assertDictEqual(input_dict, output_dict) - - def test_cytomine(self): - files = self.test_files["cytomine"] - for file_path in files: - with open(file_path) as f: - print(file_path) - input_dict = json.load(f) - geometry = wkt.loads(input_dict["annotation"]["location"]) - # wkt.loads trims excess decimals, so we do the same to the - # input - input_dict["annotation"]["location"] = wkt.dumps(geometry, trim=True) - group = AnnotationGroup.from_geometries( - [Geometry.from_shapely_like(geometry)], - label=str(input_dict["annotation"]["id"]), - category_code=category_code, - type_code=type_code, - ) - dicom = AnnotationInstance([group], "volume", slide_uids) - dicom = self.dicom_round_trip(dicom) - output_group = dicom[0] - output_annotation = output_group.annotations[0] - output_geometry = self.annotation_to_shapely(output_annotation) - output_dict = deepcopy(input_dict) - output_dict["annotation"]["location"] = output_geometry.wkt - output_dict["annotation"]["id"] = int(dicom[0].label) - self.assertDictEqual(input_dict, output_dict) + # Assert + output_group = dicom[0] + output_annotation = output_group.annotations[0] + geometry_type, result_coordinates = self.annotation_to_sectra( + output_annotation + ) + output_dict = deepcopy(input_dict) + output_dict["type"] = geometry_type + output_dict["content"] = result_coordinates + if input_dict["name"] is not None: + output_dict["name"] = output_group.label + self.maxDiff = None + assert input_dict == output_dict + + @pytest.mark.parametrize("file_path", test_files["cytomine"]) + def test_cytomine(self, file_path: Path): + with open(file_path) as f: + # Arrange + input_dict = json.load(f) + geometry = wkt.loads(input_dict["annotation"]["location"]) + # wkt.loads trims excess decimals, so we do the same to the + # input + input_dict["annotation"]["location"] = wkt.dumps(geometry, trim=True) + group = AnnotationGroup.from_geometries( + [Geometry.from_shapely_like(geometry)], + label=str(input_dict["annotation"]["id"]), + category_code=category_code, + type_code=type_code, + ) - @staticmethod - def annotation_to_shapely(annotation: Annotation): - if type(annotation.geometry) == Point: - return ShapelyPoint(annotation.geometry.to_coords()) - elif type(annotation.geometry) == Polyline: - return ShapelyLineString(annotation.geometry.to_coords()) - elif type(annotation.geometry) == Polygon: - return ShapelyPolygon(annotation.geometry.to_coords()) - raise NotImplementedError(annotation) + # Act + dicom = AnnotationInstance([group], "volume", slide_uids) + dicom = self.dicom_round_trip(dicom) - def test_shapely(self): - input_geometries = [ + # Assert + output_group = dicom[0] + output_annotation = output_group.annotations[0] + output_geometry = self.annotation_to_shapely(output_annotation) + output_dict = deepcopy(input_dict) + output_dict["annotation"]["location"] = output_geometry.wkt + output_dict["annotation"]["id"] = int(dicom[0].label) + assert input_dict == output_dict + + @pytest.mark.parametrize( + "input_geometry", + [ ShapelyPoint(15123.21, 12410.01), ShapelyPolygon( [ @@ -429,81 +296,84 @@ def test_shapely(self): (26984, 17562.75), ] ), - ] - for input_geometry in input_geometries: - print(input_geometry) - group = AnnotationGroup.from_geometries( - [Geometry.from_shapely_like(input_geometry)], - label="shapely test", - category_code=category_code, - type_code=type_code, - ) - dicom = AnnotationInstance([group], "volume", slide_uids) - dicom = self.dicom_round_trip(dicom) - output_group = dicom[0] - output_annotation = output_group.annotations[0] - output_geometry = self.annotation_to_shapely(output_annotation) - self.assertEqual(input_geometry, output_geometry) - - def test_point_annotation(self): - input_annotation = Annotation(Point(0.0, 0.1)) - group = PointAnnotationGroup( - [input_annotation], "test", category_code, type_code + ], + ) + def test_shapely(self, input_geometry): + # Arrange + group = AnnotationGroup.from_geometries( + [Geometry.from_shapely_like(input_geometry)], + label="shapely test", + category_code=category_code, + type_code=type_code, ) - output_group = group[0] - self.assertEqual(input_annotation, output_group) - input_annotations = [Annotation(Point(0.0, 0.1)), Annotation(Point(0.0, 0.1))] + # Act + dicom = AnnotationInstance([group], "volume", slide_uids) + dicom = self.dicom_round_trip(dicom) + + # Assert + output_group = dicom[0] + output_annotation = output_group.annotations[0] + output_geometry = self.annotation_to_shapely(output_annotation) + assert input_geometry == output_geometry + + @pytest.mark.parametrize( + "input_annotations", + [ + [Annotation(Point(0.0, 0.1))], + [Annotation(Point(0.0, 0.1)), Annotation(Point(0.0, 0.1))], + ], + ) + def test_point_annotation(self, input_annotations: List[Annotation]): + # Arrange + + # Act group = PointAnnotationGroup( input_annotations, "test", category_code, type_code ) - output_group = group[0] - self.assertEqual(input_annotations[0], output_group) - - output_group = group[1] - self.assertEqual(input_annotations[1], output_group) - - output_groups = group[list(range(0, 2))] - self.assertEqual(input_annotations, output_groups) - - def test_line_annotation(self): - input_annotation = Annotation(Polyline([(0.0, 0.1), (1.0, 1.1), (2.0, 2.1)])) - group = PolylineAnnotationGroup( - [input_annotation], "test", category_code, type_code - ) - output_group = group[0] - self.assertEqual(input_annotation, output_group) - - input_annotations = [ - Annotation( - Polyline([(10.0, 10.1), (11.0, 11.1), (12.0, 12.1), (13.0, 13.1)]) - ), - Annotation( - Polyline( - [ - (20.0, 20.1), - (21.0, 11.1), - (22.0, 22.1), - (23.0, 23.1), - (24.0, 24.1), - ] - ) - ), - ] + # Assert + assert input_annotations == group.annotations + for input_annotation, annotation in zip(input_annotations, group.annotations): + assert annotation == input_annotation + + @pytest.mark.parametrize( + "input_annotations", + [ + [Annotation(Polyline([(0.0, 0.1), (1.0, 1.1), (2.0, 2.1)]))], + [ + Annotation( + Polyline([(10.0, 10.1), (11.0, 11.1), (12.0, 12.1), (13.0, 13.1)]) + ), + Annotation( + Polyline( + [ + (20.0, 20.1), + (21.0, 11.1), + (22.0, 22.1), + (23.0, 23.1), + (24.0, 24.1), + ] + ) + ), + ], + ], + ) + def test_line_annotation(self, input_annotations: List[Annotation]): + # Arrange + + # Act group = PolylineAnnotationGroup( input_annotations, "test", category_code, type_code ) - output_group = group[0] - self.assertEqual(input_annotations[0], output_group) - output_group = group[1] - self.assertEqual(input_annotations[1], output_group) - - output_groups = group[list(range(0, 2))] - self.assertEqual(input_annotations, output_groups) + # Assert + assert input_annotations == group.annotations + for input_annotation, annotation in zip(input_annotations, group.annotations): + assert annotation == input_annotation def test_float_32_to_32(self): + # Arrange np_input = np.array([0.0254, 0.12405], dtype=np.float32) input_point = Point(float(np_input[0]), float(np_input[1])) input_annotation = Annotation(input_point) @@ -511,32 +381,39 @@ def test_float_32_to_32(self): [input_annotation], "test", category_code, type_code, is_double=False ) + # act dicom = AnnotationInstance([group], "volume", slide_uids) dicom = self.dicom_round_trip(dicom) - output_group = dicom[0] + # Assert + output_group = dicom[0] if isinstance(output_group, AnnotationGroup): output_point = output_group.annotations[0].geometry.to_coords()[0] np_output = np.array(output_point, dtype=np.float32) - self.assertEqual(np_input.all(), np_output.all()) + assert np_input.all() == np_output.all() def test_float_32_to_64(self): + # Arrange np_input = np.array([0.0254, 0.12405], dtype=np.float32) input_point = Point(float(np_input[0]), float(np_input[1])) input_annotation = Annotation(input_point) group = PointAnnotationGroup( [input_annotation], "test", category_code, type_code, is_double=True ) + + # Act dicom = AnnotationInstance([group], "volume", slide_uids) dicom = self.dicom_round_trip(dicom) - output_group = dicom[0] + # Assert + output_group = dicom[0] if isinstance(output_group, AnnotationGroup): output_point = output_group.annotations[0].geometry.to_coords()[0] np_output = np.array(output_point, dtype=np.float32) - self.assertEqual(np_input.all(), np_output.all()) + assert np_input.all() == np_output.all() def test_float_64_to_64(self): + # Arrange np_input = np.array([0.0254, 0.12405], dtype=np.float64) input_point = Point(float(np_input[0]), float(np_input[1])) @@ -544,16 +421,20 @@ def test_float_64_to_64(self): group = PointAnnotationGroup( [input_annotation], "test", category_code, type_code, is_double=True ) + + # Act dicom = AnnotationInstance([group], "volume", slide_uids) dicom = self.dicom_round_trip(dicom) - output_group = dicom[0] + # Assert + output_group = dicom[0] if isinstance(output_group, AnnotationGroup): output_point = output_group.annotations[0].geometry.to_coords()[0] np_output = np.array(output_point, dtype=np.float64) - self.assertEqual(np_input.all(), np_output.all()) + assert np_input.all() == np_output.all() def test_float_64_to_32(self): + # Arrange np_input = np.array([0.0254, 0.12405], dtype=np.float64) input_point = Point(float(np_input[0]), float(np_input[1])) @@ -562,59 +443,69 @@ def test_float_64_to_32(self): [input_annotation], "test", category_code, type_code, is_double=False ) + # Act dicom = AnnotationInstance([group], "volume", slide_uids) dicom = self.dicom_round_trip(dicom) - output_group = dicom[0] + # Assert + output_group = dicom[0] if isinstance(output_group, AnnotationGroup): output_point = output_group.annotations[0].geometry.to_coords()[0] np_output = np.array(output_point, dtype=np.float64) - self.assertEqual(np_input.all(), np_output.all()) - - def test_measurement(self): - area = MeasurementCode("Area") - pixels = UnitCode("Pixels") - measurement0 = Measurement(area, 5, pixels) + assert np_input.all() == np_output.all() + + @pytest.mark.parametrize( + "measurements", + [ + [Measurement(MeasurementCode("Area"), 5, UnitCode("Pixels"))], + [ + Measurement(MeasurementCode("Area"), 5, UnitCode("Pixels")), + Measurement(MeasurementCode("Area"), 10, UnitCode("Pixels")), + ], + ], + ) + def test_measurement(self, measurements: List[Measurement]): + # Arrange point = Point(1, 1) - annotation = Annotation(point, [measurement0]) - output = annotation.measurements[0] - self.assertEqual(measurement0, output) - - measurement1 = Measurement(area, 10, pixels) - annotation = Annotation(point, [measurement0, measurement1]) - outputs = annotation.measurements - self.assertEqual([measurement0, measurement1], outputs) - - def test_measurement_one_to_one(self): - area = MeasurementCode("Area") - pixels = UnitCode("Pixels") - measurement0 = Measurement(area, 5, pixels) - measurement1 = Measurement(area, 10, pixels) - measurement2 = Measurement(area, 15, pixels) - point = Point(1, 1) - annotation0 = Annotation(point, [measurement0]) - annotation1 = Annotation(point, [measurement1]) - annotation_group = PointAnnotationGroup( - [annotation0, annotation1], "label", category_code, type_code - ) - - self.assertTrue(annotation_group._measurements_is_one_to_one(area, pixels)) - annotation0 = Annotation(point, [measurement0]) - annotation1 = Annotation(point, []) + # Act + annotation = Annotation(point, measurements) + + # Assert + assert annotation.measurements == measurements + + @pytest.mark.parametrize( + ["annotation1", "annotation2", "expected_result"], + [ + [ + Annotation(point, [measurement0]), + Annotation(point, [measurement1]), + True, + ], + [Annotation(point, [measurement0]), Annotation(point, []), False], + [ + Annotation(point, [measurement0, measurement1]), + Annotation(point, [measurement2]), + False, + ], + ], + ) + def test_measurement_one_to_one( + self, annotation1: Annotation, annotation2: Annotation, expected_result: bool + ): + # Arrange annotation_group = PointAnnotationGroup( - [annotation0, annotation1], "label", category_code, type_code + [annotation1, annotation2], "label", category_code, type_code ) - self.assertFalse(annotation_group._measurements_is_one_to_one(area, pixels)) - annotation0 = Annotation(point, [measurement0, measurement1]) - annotation1 = Annotation(point, [measurement2]) - annotation_group = PointAnnotationGroup( - [annotation0, annotation1], "label", category_code, type_code + # Assert + assert ( + annotation_group._measurements_is_one_to_one(area, pixels) + == expected_result ) - self.assertFalse(annotation_group._measurements_is_one_to_one(area, pixels)) def test_measurement_cycle(self): + # Arrange area = MeasurementCode("Area") pixels = UnitCode("Pixels") measurement0 = Measurement(area, 5, pixels) @@ -626,39 +517,41 @@ def test_measurement_cycle(self): [annotation0, annotation1], "label", category_code, type_code ) + # Act dicom = AnnotationInstance([annotation_group], "volume", slide_uids) dicom = self.dicom_round_trip(dicom) + # Assert output_group = dicom[0] if isinstance(output_group, PointAnnotationGroup): annotation0 = output_group.annotations[0] - self.assertEqual( - measurement0, annotation0.get_measurements(area, pixels)[0] - ) + assert measurement0 == annotation0.get_measurements(area, pixels)[0] def test_large_number_of_annotations(self): + # Arrange count = 10000 point_annotations = [ Annotation(Point(float(i), float(i))) for i in range(count) ] - print(type(point_annotations[0])) group = PointAnnotationGroup( point_annotations, "test", category_code, type_code ) + # Act dicom = AnnotationInstance([group], "volume", slide_uids) dicom = self.dicom_round_trip(dicom) + + # Assert output_group = dicom[0] if isinstance(output_group, PointAnnotationGroup): for i in range(count): - self.assertEqual( - Point(float(i), float(i)), output_group.annotations[i].geometry - ) + assert Point(float(i), float(i)) == output_group.annotations[i].geometry - def test_make_annotated_wsi_slide(self): + def test_make_annotated_wsi_slide(self, wsi: WsiDicom, tmp_path: Path): + # Arrange point_annotation = Annotation(Point(10.0, 20.0)) group = PointAnnotationGroup( annotations=[point_annotation], @@ -667,13 +560,148 @@ def test_make_annotated_wsi_slide(self): type_code=AnnotationTypeCode("Nucleus"), description="description", ) - assert self.slide.uids is not None - annotations = AnnotationInstance([group], "volume", self.slide.uids) - dirpath = Path(self.tempdir.name) - annotation_file_path = dirpath.joinpath("annotation_for_slide.dcm") + assert wsi.uids is not None + annotation_file_path = tmp_path.joinpath("annotation_for_slide.dcm") + + # Act + annotations = AnnotationInstance([group], "volume", wsi.uids) annotations.save(annotation_file_path) - slide = WsiDicom.open(self.tempdir.name) - output_group = slide.annotations[0][0] - slide.close() - self.assertEqual(output_group, group) + # Assert + with WsiDicom.open(tmp_path) as wsi: + output_group = wsi.annotations[0][0] + assert output_group == group + + @classmethod + def dicom_round_trip(cls, dicom: AnnotationInstance) -> AnnotationInstance: + """Saves the annotation collection to temporary file, reads the file + and return the parsed annotation collection. + + Parameters + ---------- + dicom: AnnotationInstance + Collection of annotations to save. + + Returns + ---------- + AnnotationInstance + Read back annotation collection. + + """ + with TemporaryDirectory() as tempdir: + filename = "annotation_round_trip.dcm" + dcm_path = str(Path(tempdir).joinpath(filename)) + dicom.save(dcm_path) + return list(AnnotationInstance.open([dcm_path]))[0] + + @staticmethod + def annotation_group_to_qupath( + group: AnnotationGroup, + ) -> Tuple[str, Union[List[float], List[List[float]], List[List[List[float]]]]]: + if isinstance(group, PointAnnotationGroup): + if len(group) == 1: + return ("Point", group.annotations[0].geometry.to_list_coords()[0]) + else: + coordinates = [] + for annotation in group.annotations: + coordinates += annotation.geometry.to_list_coords() + return ("MultiPoint", coordinates) + elif isinstance(group, PolylineAnnotationGroup): + return ("LineString", group.annotations[0].geometry.to_list_coords()) + elif isinstance(group, PolygonAnnotationGroup): + return ("Polygon", [group.annotations[0].geometry.to_list_coords()]) + raise NotImplementedError() + + @staticmethod + def qupath_get_type_code(annotation: Dict) -> AnnotationTypeCode: + type_code = annotation["properties"]["classification"]["name"] + return AnnotationTypeCode(type_code) + + @staticmethod + def qupath_get_color(annotation: Dict) -> Tuple[int, int, int]: + color: Tuple[int, int, int] = annotation["properties"]["color"] + return color + + @staticmethod + def qupath_get_label(annotation: Dict) -> str: + label: str = annotation["properties"]["name"] + return label + + @staticmethod + def asap_annotation_group_to_geometries( + group: AnnotationGroup, + ) -> Tuple[str, List[OrderedDict]]: + increase_order_by_annotation = False + if isinstance(group, PointAnnotationGroup): + if len(group) == 1: + geometry_type = "Dot" + else: + geometry_type = "PointSet" + increase_order_by_annotation = True + elif isinstance(group, PolygonAnnotationGroup): + geometry_type = "Polygon" + else: + raise NotImplementedError(group) + group_coordinates = [ + OrderedDict( + { + "@Order": str( + point_index + annotation_index * increase_order_by_annotation + ), + "@X": str(item[0]), + "@Y": str(item[1]), + } + ) + for annotation_index, annotation in enumerate(group.annotations) + for point_index, item in enumerate(annotation.geometry.to_coords()) + ] + return geometry_type, group_coordinates + + @classmethod + def asap_to_geometries(cls, dictionary: Dict[str, Any]) -> List[Geometry]: + annotation_type: str = dictionary["@Type"] + coordinate_dict = dictionary["Coordinates"]["Coordinate"] + if annotation_type == "Dot": + return [Point.from_dict(coordinate_dict, "@X", "@Y")] + elif annotation_type == "PointSet": + points = Point.multiple_from_dict(coordinate_dict, "@X", "@Y") + return points # type: ignore + elif annotation_type == "Polygon": + return [Polygon.from_dict(coordinate_dict, "@X", "@Y")] + + raise NotImplementedError(annotation_type) + + @staticmethod + def annotation_to_sectra( + annotation: Annotation, + ) -> Tuple[str, List[Dict[str, float]]]: + if type(annotation.geometry) == Polyline: + geometry_type = "Polyline" + elif type(annotation.geometry) == Polygon: + geometry_type = "Area" + else: + raise NotImplementedError() + group_coordinates = [ + {"x": float(item[0]), "y": float(item[1])} + for item in annotation.geometry.to_coords() + ] + return geometry_type, group_coordinates + + @staticmethod + def sectra_to_geometry(dictionary: Dict[str, Any]) -> Geometry: + geometry_type: str = dictionary["type"] + if geometry_type == "Area": + return Polygon.from_dict(dictionary["content"], "x", "y") + elif geometry_type == "Polyline": + return Polyline.from_dict(dictionary["content"], "x", "y") + raise NotImplementedError() + + @staticmethod + def annotation_to_shapely(annotation: Annotation): + if type(annotation.geometry) == Point: + return ShapelyPoint(annotation.geometry.to_coords()) + elif type(annotation.geometry) == Polyline: + return ShapelyLineString(annotation.geometry.to_coords()) + elif type(annotation.geometry) == Polygon: + return ShapelyPolygon(annotation.geometry.to_coords()) + raise NotImplementedError(annotation) diff --git a/tests/test_codes.py b/tests/test_codes.py index 854d5671..e5d26026 100644 --- a/tests/test_codes.py +++ b/tests/test_codes.py @@ -1,4 +1,4 @@ -# Copyright 2021 SECTRA AB +# Copyright 2021, 2023 SECTRA AB # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,24 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest from typing import Type import pytest -from parameterized import parameterized from wsidicom.conceptcode import CidConceptCode, Code @pytest.mark.unittest -class WsiDicomCodeTests(unittest.TestCase): - @parameterized.expand( +class TestWsiDicomCode: + @pytest.mark.parametrize( + ["code_class", "code"], ( - code_class, - code, - ) - for code_class in CidConceptCode.__subclasses__() - for code in code_class.cid.values() + ( + code_class, + code, + ) + for code_class in CidConceptCode.__subclasses__() + for code in code_class.cid.values() + ), ) def test_create_code_from_meaning( self, code_class: Type[CidConceptCode], code: Code @@ -40,4 +41,4 @@ def test_create_code_from_meaning( created_code = code_class(code.meaning) # Assert - self.assertEqual(code, created_code) + assert code == created_code diff --git a/tests/test_geometry.py b/tests/test_geometry.py index 55e88e42..35bd74b2 100644 --- a/tests/test_geometry.py +++ b/tests/test_geometry.py @@ -1,4 +1,4 @@ -# Copyright 2022 SECTRA AB +# Copyright 2022, 2023 SECTRA AB # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,11 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest from typing import Union import pytest -from parameterized import parameterized from wsidicom.geometry import ( Orientation, @@ -31,7 +29,7 @@ @pytest.mark.unittest -class WsiDicomGeomtryTests(unittest.TestCase): +class TestWsiDicomGeomtry: def test_size_subraction(self): # Arrange size_0 = Size(10, 10) @@ -41,7 +39,7 @@ def test_size_subraction(self): result = size_0 - size_1 # Assert - self.assertEqual(result, Size(9, 9)) + assert result == Size(9, 9) def test_size_multiplication(self): # Arrange @@ -51,7 +49,7 @@ def test_size_multiplication(self): result = size_0 * 2 # Assert - self.assertEqual(result, Size(20, 20)) + assert result == Size(20, 20) def test_size_division(self): # Arrange @@ -61,7 +59,7 @@ def test_size_division(self): result = size_0 // 3 # Assert - self.assertEqual(result, Size(3, 3)) + assert result == Size(3, 3) def test_size_to_tuple(self): # Arrange @@ -71,15 +69,16 @@ def test_size_to_tuple(self): result = size_0.to_tuple() # Assert - self.assertEqual(result, (10, 10)) + assert result == (10, 10) - @parameterized.expand( + @pytest.mark.parametrize( + ["size_1", "size_2", "expected_result"], [ (Size(10, 10), Size(1, 1), False), (Size(10, 10), Size(20, 1), True), (Size(10, 10), Size(1, 20), True), (Size(10, 10), Size(20, 20), True), - ] + ], ) def test_size_any_less_than( self, size_1: Size, size_2: Size, expected_result: bool @@ -89,9 +88,10 @@ def test_size_any_less_than( result = size_1.any_less_than(size_2) # Assert - self.assertEqual(expected_result, result) + assert result == expected_result - @parameterized.expand( + @pytest.mark.parametrize( + ["size_1", "size_2", "expected_result"], [ (Size(10, 10), Size(1, 1), False), (Size(10, 10), Size(20, 1), True), @@ -100,7 +100,7 @@ def test_size_any_less_than( (Size(10, 10), Size(10, 10), True), (Size(10, 10), Size(1, 10), True), (Size(10, 10), Size(10, 1), True), - ] + ], ) def test_size_any_less_than_or_equal( self, size_1: Size, size_2: Size, expected_result: bool @@ -110,15 +110,16 @@ def test_size_any_less_than_or_equal( result = size_1.any_less_than_or_equal(size_2) # Assert - self.assertEqual(expected_result, result) + assert result == expected_result - @parameterized.expand( + @pytest.mark.parametrize( + ["size_1", "size_2", "expected_result"], [ (Size(10, 10), Size(1, 1), True), (Size(10, 10), Size(20, 1), True), (Size(10, 10), Size(1, 20), True), (Size(10, 10), Size(20, 20), False), - ] + ], ) def test_size_any_greater_than( self, size_1: Size, size_2: Size, expected_result: bool @@ -128,9 +129,10 @@ def test_size_any_greater_than( result = size_1.any_greater_than(size_2) # Assert - self.assertEqual(expected_result, result) + assert result == expected_result - @parameterized.expand( + @pytest.mark.parametrize( + ["size_1", "size_2", "expected_result"], [ (Size(10, 10), Size(1, 1), True), (Size(10, 10), Size(20, 1), True), @@ -139,7 +141,7 @@ def test_size_any_greater_than( (Size(10, 10), Size(10, 10), True), (Size(10, 10), Size(1, 10), True), (Size(10, 10), Size(10, 1), True), - ] + ], ) def test_size_any_greater_than_or_equal( self, size_1: Size, size_2: Size, expected_result: bool @@ -149,15 +151,16 @@ def test_size_any_greater_than_or_equal( result = size_1.any_greater_than_or_equal(size_2) # Assert - self.assertEqual(expected_result, result) + assert result == expected_result - @parameterized.expand( + @pytest.mark.parametrize( + ["size_1", "size_2", "expected_result"], [ (Size(10, 10), Size(1, 1), False), (Size(10, 10), Size(20, 1), False), (Size(10, 10), Size(1, 20), False), (Size(10, 10), Size(20, 20), True), - ] + ], ) def test_size_all_less_than( self, size_1: Size, size_2: Size, expected_result: bool @@ -167,9 +170,10 @@ def test_size_all_less_than( result = size_1.all_less_than(size_2) # Assert - self.assertEqual(expected_result, result) + assert expected_result == result - @parameterized.expand( + @pytest.mark.parametrize( + ["size_1", "size_2", "expected_result"], [ (Size(10, 10), Size(1, 1), False), (Size(10, 10), Size(20, 1), False), @@ -178,7 +182,7 @@ def test_size_all_less_than( (Size(10, 10), Size(10, 10), True), (Size(10, 10), Size(1, 10), False), (Size(10, 10), Size(10, 1), False), - ] + ], ) def test_size_all_less_than_or_equal( self, size_1: Size, size_2: Size, expected_result: bool @@ -188,15 +192,16 @@ def test_size_all_less_than_or_equal( result = size_1.all_less_than_or_equal(size_2) # Assert - self.assertEqual(expected_result, result) + assert result == expected_result - @parameterized.expand( + @pytest.mark.parametrize( + ["size_1", "size_2", "expected_result"], [ (Size(10, 10), Size(1, 1), True), (Size(10, 10), Size(20, 1), False), (Size(10, 10), Size(1, 20), False), (Size(10, 10), Size(20, 20), False), - ] + ], ) def test_size_all_greater_than( self, size_1: Size, size_2: Size, expected_result: bool @@ -206,9 +211,10 @@ def test_size_all_greater_than( result = size_1.all_greater_than(size_2) # Assert - self.assertEqual(expected_result, result) + assert result == expected_result - @parameterized.expand( + @pytest.mark.parametrize( + ["size_1", "size_2", "expected_result"], [ (Size(10, 10), Size(1, 1), True), (Size(10, 10), Size(20, 1), False), @@ -217,7 +223,7 @@ def test_size_all_greater_than( (Size(10, 10), Size(10, 10), True), (Size(10, 10), Size(1, 10), True), (Size(10, 10), Size(10, 1), True), - ] + ], ) def test_size_all_greater_than_or_equal( self, size_1: Size, size_2: Size, expected_result: bool @@ -227,10 +233,11 @@ def test_size_all_greater_than_or_equal( result = size_1.all_greater_than_or_equal(size_2) # Assert - self.assertEqual(expected_result, result) + assert result == expected_result - @parameterized.expand( - [(Point(3, 2), Point(30, 20)), (Size(3, 2), Point(30, 20)), (2, Point(20, 20))] + @pytest.mark.parametrize( + ["by", "expected_result"], + [(Point(3, 2), Point(30, 20)), (Size(3, 2), Point(30, 20)), (2, Point(20, 20))], ) def test_point_multiplication( self, by: Union[Point, Size, int], expected_result: Point @@ -242,7 +249,7 @@ def test_point_multiplication( result = point * by # Assert - self.assertEqual(expected_result, result) + assert result == expected_result def test_point_division(self): # Arrange @@ -254,14 +261,15 @@ def test_point_division(self): result = point // by # Assert - self.assertEqual(expected_result, result) + assert result == expected_result - @parameterized.expand( + @pytest.mark.parametrize( + ["by", "expected_result"], [ (Point(2, 2), Point(0, 0)), (Point(3, 3), Point(1, 1)), (Size(2, 2), Point(0, 0)), - ] + ], ) def test_point_mod(self, by: Union[Point, Size], expected_result: Point): # Arrange @@ -271,10 +279,11 @@ def test_point_mod(self, by: Union[Point, Size], expected_result: Point): result = point % by # Assert - self.assertEqual(expected_result, result) + assert result == expected_result - @parameterized.expand( - [(Point(3, 2), Point(13, 12)), (2, Point(12, 12)), (Size(3, 2), Point(13, 12))] + @pytest.mark.parametrize( + ["by", "expected_result"], + [(Point(3, 2), Point(13, 12)), (2, Point(12, 12)), (Size(3, 2), Point(13, 12))], ) def test_point_addition(self, by: Union[Point, Size, int], expected_result: Point): # Arrange @@ -284,10 +293,11 @@ def test_point_addition(self, by: Union[Point, Size, int], expected_result: Poin result = point + by # Assert - self.assertEqual(expected_result, result) + assert result == expected_result - @parameterized.expand( - [(Point(3, 2), Point(7, 8)), (2, Point(8, 8)), (Size(3, 2), Point(7, 8))] + @pytest.mark.parametrize( + ["by", "expected_result"], + [(Point(3, 2), Point(7, 8)), (2, Point(8, 8)), (Size(3, 2), Point(7, 8))], ) def test_point_subtraction( self, by: Union[Point, Size, int], expected_result: Point @@ -299,7 +309,7 @@ def test_point_subtraction( result = point - by # Assert - self.assertEqual(expected_result, result) + assert result == expected_result def test_point_max(self): # Arrange @@ -311,7 +321,7 @@ def test_point_max(self): result = Point.max(point_1, point_2) # Assert - self.assertEqual(expected_result, result) + assert result == expected_result def test_point_min(self): # Arrange @@ -323,7 +333,7 @@ def test_point_min(self): result = Point.min(point_1, point_2) # Assert - self.assertEqual(expected_result, result) + assert result == expected_result def test_point_to_tuple(self): # Arrange @@ -334,7 +344,7 @@ def test_point_to_tuple(self): result = point.to_tuple() # Assert - self.assertEqual(expected_result, result) + assert result == expected_result def test_region_mm(self): # Arrange @@ -342,8 +352,8 @@ def test_region_mm(self): region = RegionMm(PointMm(1.0, 2.0), SizeMm(3.0, 4.0)) # Assert - self.assertEqual(region.start, PointMm(1.0, 2.0)) - self.assertEqual(region.end, PointMm(4.0, 6.0)) + assert region.start == PointMm(1.0, 2.0) + assert region.end == PointMm(4.0, 6.0) def test_region_mm_subtract(self): # Arrange @@ -353,8 +363,8 @@ def test_region_mm_subtract(self): region = region - PointMm(1.0, 2.0) # Assert - self.assertEqual(region.start, PointMm(0.0, 0.0)) - self.assertEqual(region.end, PointMm(3.0, 4.0)) + assert region.start == PointMm(0.0, 0.0) + assert region.end == PointMm(3.0, 4.0) def test_region_mm_add(self): # Arrange @@ -364,10 +374,11 @@ def test_region_mm_add(self): region = region + PointMm(1.0, 2.0) # Assert - self.assertEqual(region.start, PointMm(2.0, 4.0)) - self.assertEqual(region.end, PointMm(5.0, 8.0)) + assert region.start == PointMm(2.0, 4.0) + assert region.end == PointMm(5.0, 8.0) - @parameterized.expand( + @pytest.mark.parametrize( + ["region", "origin", "expected_start", "expected_end"], [ ( # Image x along slide y, Image y along slide x RegionMm(PointMm(2.0, 4.0), SizeMm(1.0, 2.0)), @@ -401,7 +412,7 @@ def test_region_mm_add(self): PointMm(1.0, 1.0), PointMm(3.0, 4.0), ), - ] + ], ) def test_region_mm_to_other_origin( self, @@ -415,10 +426,11 @@ def test_region_mm_to_other_origin( transformed_region = origin.slide_to_image(region) # Assert - self.assertEqual(transformed_region.start, expected_start) - self.assertEqual(transformed_region.end, expected_end) + assert transformed_region.start == expected_start + assert transformed_region.end == expected_end - @parameterized.expand( + @pytest.mark.parametrize( + ["region", "expected_start", "zoom"], [ (Region(Point(3, 4), Size(6, 4)), Point(9, 10), 2.0), (Region(Point(9, 10), Size(6, 4)), Point(3, 4), 0.5), @@ -426,7 +438,7 @@ def test_region_mm_to_other_origin( (Region(Point(4, 7), Size(2, 6)), Point(9, 17), 2.0), (Region(Point(9, 17), Size(2, 6)), Point(4, 7), 0.5), (Region(Point(4, 7), Size(2, 6)), Point(14, 27), 3.0), - ] + ], ) def test_region_zoom(self, region: Region, expected_start: Point, zoom: float): # Arrange @@ -434,10 +446,11 @@ def test_region_zoom(self, region: Region, expected_start: Point, zoom: float): zoomed_region = region.zoom(zoom) # Assert - self.assertEqual(zoomed_region.start, expected_start) - self.assertEqual(zoomed_region.size, region.size) + assert zoomed_region.start == expected_start + assert zoomed_region.size == region.size - @parameterized.expand( + @pytest.mark.parametrize( + ["region", "expected_start", "zoom"], [ (RegionMm(PointMm(3, 4), SizeMm(6, 4)), PointMm(9, 10), 2.0), (RegionMm(PointMm(9, 10), SizeMm(6, 4)), PointMm(3, 4), 0.5), @@ -445,7 +458,7 @@ def test_region_zoom(self, region: Region, expected_start: Point, zoom: float): (RegionMm(PointMm(4, 7), SizeMm(2, 6)), PointMm(9, 17), 2.0), (RegionMm(PointMm(9, 17), SizeMm(2, 6)), PointMm(4, 7), 0.5), (RegionMm(PointMm(4, 7), SizeMm(2, 6)), PointMm(14, 27), 3.0), - ] + ], ) def test_region_mm_zoom( self, region: RegionMm, expected_start: PointMm, zoom: float @@ -455,10 +468,11 @@ def test_region_mm_zoom( zoomed_region = region.zoom(zoom) # Assert - self.assertEqual(zoomed_region.start, expected_start) - self.assertEqual(zoomed_region.size, region.size) + assert zoomed_region.start == expected_start + assert zoomed_region.size == region.size - @parameterized.expand( + @pytest.mark.parametrize( + ["region", "point", "size", "expected_result"], [ ( Region(position=Point(x=0, y=0), size=Size(width=100, height=100)), @@ -480,7 +494,7 @@ def test_region_mm_zoom( Size(1024, 1024), Region(position=Point(176, 176), size=Size(300, 300)), ), - ] + ], ) def test_inside_crop( self, region: Region, point: Point, size: Size, expected_result: Region @@ -491,4 +505,4 @@ def test_inside_crop( cropped_region = region.inside_crop(point, size) # Assert - self.assertEqual(cropped_region, expected_result) + assert cropped_region == expected_result diff --git a/tests/test_optical.py b/tests/test_optical.py index 2d6fabad..c2484cfd 100644 --- a/tests/test_optical.py +++ b/tests/test_optical.py @@ -1,4 +1,4 @@ -# Copyright 2021 SECTRA AB +# Copyright 2021, 2023 SECTRA AB # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,47 +12,30 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest -from pathlib import Path -from tempfile import TemporaryDirectory from typing import List import numpy as np import pytest -from parameterized import parameterized from pydicom.dataset import Dataset from pydicom.sequence import Sequence as DicomSequence -from tests.data_gen import create_layer_file, create_main_dataset +from tests.data_gen import create_main_dataset from wsidicom import WsiDicom -from wsidicom.conceptcode import ( - IlluminationCode, - IlluminationColorCode, -) +from wsidicom.conceptcode import IlluminationCode, IlluminationColorCode from wsidicom.optical import Illumination, Lut, OpticalManager, OpticalPath @pytest.mark.unittest -class WsiDicomOpticalTests(unittest.TestCase): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.tempdir: TemporaryDirectory - self.slide: WsiDicom - - @classmethod - def setUpClass(cls): - cls.tempdir = TemporaryDirectory() - dirpath = Path(cls.tempdir.name) - test_file_path = dirpath.joinpath("test_im.dcm") - create_layer_file(test_file_path) - cls.slide = WsiDicom.open(cls.tempdir.name) - - @classmethod - def tearDownClass(cls): - cls.slide.close() - cls.tempdir.cleanup() - - @parameterized.expand( +class TestWsiDicomOptical: + @pytest.mark.parametrize( + [ + "red_lut_descriptor", + "red_lut", + "green_lut", + "blue_lut", + "channel", + "expected", + ], [ ( [256, 0, 8], @@ -70,7 +53,7 @@ def tearDownClass(cls): 0, np.linspace(0, 65535, 256, dtype=np.uint16), ), - ] + ], ) def test_parse_lut( self, @@ -94,9 +77,9 @@ def test_parse_lut( lut = Lut(DicomSequence([ds])) # Assert - self.assertTrue(np.array_equal(lut.get(), expected_lut)) + assert np.array_equal(lut.get(), expected_lut) - def test_recreate_optical_module(self): + def test_recreate_optical_module(self, wsi: WsiDicom): # Arrange ds = create_main_dataset() original_optical = Dataset() @@ -104,10 +87,10 @@ def test_recreate_optical_module(self): original_optical.NumberOfOpticalPaths = ds.NumberOfOpticalPaths # Act - restored_optical_ds = self.slide.optical.insert_into_ds(Dataset()) + restored_optical_ds = wsi.optical.insert_into_ds(Dataset()) # Assert - self.assertEqual(original_optical, restored_optical_ds) + assert original_optical == restored_optical_ds def test_make_optical(self): # Arrange @@ -128,4 +111,4 @@ def test_make_optical(self): optical = OpticalManager([path]) # Assert - self.assertEqual(optical.get("1"), path) + assert optical.get("1") == path diff --git a/tests/test_save.py b/tests/test_save.py deleted file mode 100644 index 7a44a0bb..00000000 --- a/tests/test_save.py +++ /dev/null @@ -1,490 +0,0 @@ -# Copyright 2021 SECTRA AB -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import math -import os -import random -import sys -import unittest -from pathlib import Path -from struct import unpack -from tempfile import TemporaryDirectory -from typing import List, Optional, OrderedDict, Sequence, Tuple, cast - -import pytest -from PIL import ImageChops, ImageFilter, ImageStat -from PIL.Image import Image as PILImage -from pydicom import Sequence as DicomSequence -from pydicom.dataset import Dataset -from pydicom.filebase import DicomFile -from pydicom.filereader import read_file_meta_info -from pydicom.misc import is_dicom -from pydicom.tag import ItemTag, SequenceDelimiterTag, Tag -from pydicom.uid import UID, JPEGBaseline8Bit, generate_uid -from parameterized import parameterized -from tests.wsi_test_files import WsiTestFiles - -from wsidicom import WsiDicom -from wsidicom.file.wsidicom_file import WsiDicomFile -from wsidicom.file.wsidicom_file_base import OffsetTableType -from wsidicom.file.wsidicom_file_target import WsiDicomFileTarget -from wsidicom.file.wsidicom_file_writer import WsiDicomFileWriter -from wsidicom.geometry import Point, Size, SizeMm -from wsidicom.group.level import Level -from wsidicom.instance import ImageData, ImageCoordinateSystem -from wsidicom.uid import WSI_SOP_CLASS_UID - -SLIDE_FOLDER = Path(os.environ.get("WSIDICOM_TESTDIR", "tests/testdata/slides")) - - -class WsiDicomTestFile(WsiDicomFile): - """Test version of WsiDicomFile that overrides __init__.""" - - def __init__(self, filepath: Path, transfer_syntax: UID, frame_count: int): - self._filepath = filepath - self._file = DicomFile(filepath, mode="rb") - self._file.is_little_endian = transfer_syntax.is_little_endian - self._file.is_implicit_VR = transfer_syntax.is_implicit_VR - self._frame_count = frame_count - self._pixel_data_position = 0 - self._owned = True - self.__enter__() - - @property - def frame_count(self) -> int: - return self._frame_count - - -class WsiDicomTestImageData(ImageData): - def __init__(self, data: Sequence[bytes], tiled_size: Size) -> None: - if len(data) != tiled_size.area: - raise ValueError("Number of frames and tiled size area differ") - TILE_SIZE = Size(10, 10) - self._data = data - self._tile_size = TILE_SIZE - self._image_size = tiled_size * TILE_SIZE - - @property - def transfer_syntax(self) -> UID: - return JPEGBaseline8Bit - - @property - def image_size(self) -> Size: - return self._image_size - - @property - def tile_size(self) -> Size: - return self._tile_size - - @property - def pixel_spacing(self) -> SizeMm: - return SizeMm(1.0, 1.0) - - @property - def samples_per_pixel(self) -> int: - return 3 - - @property - def photometric_interpretation(self) -> str: - return "YBR" - - @property - def image_coordinate_system(self) -> Optional[ImageCoordinateSystem]: - return None - - def _get_decoded_tile(self, tile_point: Point, z: float, path: str) -> PILImage: - raise NotImplementedError() - - def _get_encoded_tile(self, tile: Point, z: float, path: str) -> bytes: - return self._data[tile.x + tile.y * self.tiled_size.width] - - -@pytest.mark.save -class WsiDicomFileSaveTests(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.tiled_size = Size(10, 10) - cls.frame_count = cls.tiled_size.area - SEED = 0 - MIN_FRAME_LENGTH = 2 - MAX_FRAME_LENGTH = 100 - # Generate test data by itemizing random bytes of random length - # from MIN_FRAME_LENGTH to MAX_FRAME_LENGTH. - rng = random.Random(SEED) - lengths = [ - rng.randint(MIN_FRAME_LENGTH, MAX_FRAME_LENGTH) - for i in range(cls.frame_count) - ] - cls.test_data = [ - rng.getrandbits(length * 8).to_bytes(length, sys.byteorder) - for length in lengths - ] - cls.image_data = WsiDicomTestImageData(cls.test_data, cls.tiled_size) - cls.test_dataset = cls.create_test_dataset(cls.frame_count, cls.image_data) - cls.wsi_test_files = WsiTestFiles() - folders = cls._get_folders(SLIDE_FOLDER) - cls.test_folders = {} - for folder in folders: - relative_path = cls._get_relative_path(folder) - cls.test_folders[relative_path] = cls.open(folder) - - if len(cls.test_folders) == 0: - raise unittest.SkipTest( - f"No test slide files found for {SLIDE_FOLDER}, skipping." - ) - - @classmethod - def tearDownClass(cls): - cls.wsi_test_files.close() - - def setUp(self): - self.tempdir = TemporaryDirectory() - - def tearDown(self): - self.tempdir.cleanup() - - @staticmethod - def open(folder: Path) -> WsiDicom: - while next(folder.iterdir()).is_dir(): - folder = next(folder.iterdir()) - return WsiDicom.open(folder) - - @staticmethod - def _get_folders(slide_folder: Path) -> List[Path]: - if not slide_folder.exists(): - print("slide folder does not exist") - return [] - return [item for item in slide_folder.iterdir() if item.is_dir] - - @staticmethod - def _get_relative_path(slide_path: Path) -> Path: - parts = slide_path.parts - return Path(parts[-1]) - - @staticmethod - def create_test_dataset(frame_count: int, image_data: WsiDicomTestImageData): - dataset = Dataset() - dataset.SOPClassUID = WSI_SOP_CLASS_UID - dataset.ImageType = ["ORIGINAL", "PRIMARY", "VOLUME", "NONE"] - dataset.NumberOfFrames = frame_count - dataset.SOPInstanceUID = generate_uid() - dataset.StudyInstanceUID = generate_uid() - dataset.SeriesInstanceUID = generate_uid() - dataset.FrameOfReferenceUID = generate_uid() - dataset.DimensionOrganizationType = "TILED_FULL" - dataset.Rows = image_data.tile_size.width - dataset.Columns = image_data.tile_size.height - dataset.SamplesPerPixel = image_data.samples_per_pixel - dataset.PhotometricInterpretation = image_data.photometric_interpretation - dataset.TotalPixelMatrixColumns = image_data.image_size.width - dataset.TotalPixelMatrixRows = image_data.image_size.height - dataset.OpticalPathSequence = DicomSequence([]) - dataset.ImagedVolumeWidth = 1.0 - dataset.ImagedVolumeHeight = 1.0 - dataset.ImagedVolumeDepth = 1.0 - dataset.InstanceNumber = 0 - pixel_measure = Dataset() - pixel_measure.PixelSpacing = [ - image_data.pixel_spacing.width, - image_data.pixel_spacing.height, - ] - pixel_measure.SpacingBetweenSlices = 1.0 - pixel_measure.SliceThickness = 1.0 - shared_functional_group = Dataset() - shared_functional_group.PixelMeasuresSequence = DicomSequence([pixel_measure]) - dataset.SharedFunctionalGroupsSequence = DicomSequence( - [shared_functional_group] - ) - dataset.TotalPixelMatrixFocalPlanes = 1 - dataset.NumberOfOpticalPaths = 1 - dataset.ExtendedDepthOfField = "NO" - dataset.FocusMethod = "AUTO" - return dataset - - def test_write_preamble(self): - # Arrange - filepath = Path(self.tempdir.name + "/1.dcm") - - # Act - with WsiDicomFileWriter.open(filepath) as write_file: - write_file._write_preamble() - - # Assert - self.assertTrue(is_dicom(filepath)) - - def test_write_meta(self): - # Arrange - transfer_syntax = JPEGBaseline8Bit - instance_uid = generate_uid() - class_uid = WSI_SOP_CLASS_UID - filepath = Path(self.tempdir.name + "/1.dcm") - - # Act - with WsiDicomFileWriter.open(filepath) as write_file: - write_file._write_preamble() - write_file._write_file_meta(instance_uid, transfer_syntax) - file_meta = read_file_meta_info(filepath) - - # Assert - self.assertEqual(file_meta.TransferSyntaxUID, transfer_syntax) - self.assertEqual(file_meta.MediaStorageSOPInstanceUID, instance_uid) - self.assertEqual(file_meta.MediaStorageSOPClassUID, class_uid) - - def write_table( - self, file_path: Path, offset_table: OffsetTableType - ) -> List[Tuple[int, int]]: - with WsiDicomFileWriter.open(file_path) as write_file: - table_start, pixel_data_start = write_file._write_pixel_data_start( - number_of_frames=self.frame_count, offset_table=offset_table - ) - positions = write_file._write_pixel_data( - self.image_data, - self.image_data.default_z, - self.image_data.default_path, - 1, - 100, - ) - pixel_data_end = write_file._file.tell() - write_file._write_pixel_data_end() - if offset_table != OffsetTableType.NONE: - if table_start is None: - raise ValueError("Table start should not be None") - if offset_table == OffsetTableType.EXTENDED: - write_file._write_eot( - table_start, pixel_data_start, positions, pixel_data_end - ) - elif offset_table == OffsetTableType.BASIC: - write_file._write_bot(table_start, pixel_data_start, positions) - - TAG_BYTES = 4 - LENGTH_BYTES = 4 - frame_offsets = [] - for position in positions: # Positions are from frame data start - frame_offsets.append(position + TAG_BYTES + LENGTH_BYTES) - frame_lengths = [ # Lengths are disiable with 2 - 2 * math.ceil(len(frame) / 2) for frame in self.test_data - ] - expected_frame_index = [ - (offset, length) for offset, length in zip(frame_offsets, frame_lengths) - ] - return expected_frame_index - - def assertEndOfFile(self, file: WsiDicomTestFile): - with self.assertRaises(EOFError): - file._file.read(1, need_exact_length=True) - - @parameterized.expand( - [ - (OffsetTableType.NONE,), - (OffsetTableType.BASIC,), - (OffsetTableType.EXTENDED,), - ] - ) - def test_write_and_read_table(self, writen_table_type: OffsetTableType): - # Arrange - filepath = Path(self.tempdir.name + "/" + str(writen_table_type)) - writen_frame_indices = self.write_table(filepath, writen_table_type) - - # Act - with WsiDicomTestFile( - filepath, JPEGBaseline8Bit, self.frame_count - ) as read_file: - read_frame_indices, read_table_type = read_file._parse_pixel_data() - - # Assert - self.assertEqual(writen_frame_indices, read_frame_indices) - self.assertEqual(writen_table_type, read_table_type) - - def test_reserve_bot(self): - # Arrange - filepath = Path(self.tempdir.name + "/1.dcm") - - # Act - with WsiDicomFileWriter.open(filepath) as write_file: - write_file._reserve_bot(self.frame_count) - - # Assert - with WsiDicomTestFile( - filepath, JPEGBaseline8Bit, self.frame_count - ) as read_file: - tag = read_file._file.read_tag() - self.assertEqual(tag, ItemTag) - BOT_ITEM_LENGTH = 4 - length = read_file._read_tag_length(False) - self.assertEqual(length, BOT_ITEM_LENGTH * self.frame_count) - for frame in range(self.frame_count): - self.assertEqual(read_file._file.read_UL(), 0) - self.assertEndOfFile(read_file) - - def test_reserve_eot(self): - # Arrange - filepath = Path(self.tempdir.name + "/1.dcm") - - # Act - with WsiDicomFileWriter.open(filepath) as write_file: - write_file._reserve_eot(self.frame_count) - - # Assert - with WsiDicomTestFile( - filepath, JPEGBaseline8Bit, self.frame_count - ) as read_file: - tag = read_file._file.read_tag() - self.assertEqual(tag, Tag("ExtendedOffsetTable")) - EOT_ITEM_LENGTH = 8 - length = read_file._read_tag_length(True) - self.assertEqual(length, EOT_ITEM_LENGTH * self.frame_count) - for frame in range(self.frame_count): - self.assertEqual( - unpack(" Tuple[WsiDicomFile, Dataset]: - if name in self._files: - return self._files[name] - file_setting = FILE_SETTINGS[name] - dataset = create_main_dataset( - file_setting["tile_type"], file_setting["bot_type"] - ) - test_file = WsiDicomFileTestFile( - Path(self.tempdir.name).joinpath(file_setting["name"]), - file_setting["tile_type"], - file_setting["bot_type"], - dataset, - ) - create_layer_file(test_file.path, test_file.ds, self.meta_dataset) - file = WsiDicomFile.open(test_file.path) - self._files[name] = file, dataset - return file, dataset +@pytest.fixture() +def meta_dataset(): + yield create_meta_dataset() + + +@pytest.fixture() +def padded_test_frame(): + yield TESTFRAME + b"\x00" * (len(TESTFRAME) % 2) - @parameterized.expand((FILE_SETTINGS.items)) - def test_open(self, name: str, settings: Dict[str, Any]): + +@pytest.fixture() +def dataset(name: str): + file_setting = FILE_SETTINGS[name] + dataset = create_main_dataset(file_setting["tile_type"], file_setting["bot_type"]) + yield dataset + + +@pytest.fixture() +def test_file(name: str, dataset: Dataset, meta_dataset: FileMetaDataset): + file_setting = FILE_SETTINGS[name] + with TemporaryDirectory() as tempdir: + path = Path(tempdir).joinpath(file_setting["name"]) + create_layer_file(path, dataset, meta_dataset) + with WsiDicomFile.open(path) as test_file: + yield test_file + + +@pytest.mark.unittest +class TestWsiDicomFile: + @pytest.mark.parametrize(["name", "settings"], FILE_SETTINGS.items()) + def test_offset_table_type_property( + self, test_file: WsiDicomFile, settings: Dict[str, Any] + ): # Arrange - test_file, _ = self.get_file(name) # Act offset_table_type = test_file.offset_table_type + + # Assert + assert offset_table_type == settings["bot_type"] + + @pytest.mark.parametrize(["name", "settings"], FILE_SETTINGS.items()) + def test_tile_type_property( + self, test_file: WsiDicomFile, settings: Dict[str, Any] + ): + # Arrange + + # Act tile_type = test_file.dataset.tile_type # Assert - self.assertEqual(offset_table_type, settings["bot_type"]) - self.assertEqual(tile_type, settings["tile_type"]) + assert tile_type == settings["tile_type"] - @parameterized.expand((FILE_SETTINGS.keys)) - def test_dataset_property(self, name: str): + @pytest.mark.parametrize("name", FILE_SETTINGS.keys()) + def test_dataset_property(self, test_file: WsiDicomFile): # Arrange - test_file, _ = self.get_file(name) path = test_file.filepath assert path is not None # Act - ds = WsiDataset(dcmread(path, stop_before_pixels=True)) + dataset = WsiDataset(dcmread(path, stop_before_pixels=True)) # Assert - self.assertEqual(test_file.dataset, ds) + assert test_file.dataset == dataset - @parameterized.expand((FILE_SETTINGS.keys)) - def test_image_type_property(self, name: str): + @pytest.mark.parametrize("name", FILE_SETTINGS.keys()) + def test_image_type_property( + self, + test_file: WsiDicomFile, + ): # Arrage - test_file, _ = self.get_file(name) # Act image_type = test_file.image_type # Assert - self.assertEqual(image_type, ImageType.VOLUME) + assert image_type == ImageType.VOLUME - @parameterized.expand((FILE_SETTINGS.keys)) - def test_uids_property(self, name: str): + @pytest.mark.parametrize("name", FILE_SETTINGS.keys()) + def test_uids_property(self, test_file: WsiDicomFile, dataset: Dataset): # Arrange - test_file, dataset = self.get_file(name) # Act uids = test_file.uids + # Assert - self.assertEqual(uids.instance, dataset.SOPInstanceUID) - self.assertEqual( - uids.concatenation, - getattr(dataset, "SOPInstanceUIDOfConcatenationSource", None), + assert uids.instance == dataset.SOPInstanceUID + assert uids.concatenation == getattr( + dataset, "SOPInstanceUIDOfConcatenationSource", None ) - self.assertEqual(uids.slide.frame_of_reference, dataset.FrameOfReferenceUID) - self.assertEqual(uids.slide.study_instance, dataset.StudyInstanceUID) - self.assertEqual(uids.slide.series_instance, dataset.SeriesInstanceUID) - - @parameterized.expand((FILE_SETTINGS.keys)) - def test_transfer_syntax_property(self, name: str): + assert uids.slide.frame_of_reference == dataset.FrameOfReferenceUID + assert uids.slide.study_instance == dataset.StudyInstanceUID + assert uids.slide.series_instance == dataset.SeriesInstanceUID + + @pytest.mark.parametrize("name", FILE_SETTINGS.keys()) + def test_transfer_syntax_property( + self, test_file: WsiDicomFile, meta_dataset: FileMetaDataset + ): # Arrange - test_file, _ = self.get_file(name) # Act transfer_syntax = test_file.transfer_syntax # Assert - self.assertEqual(transfer_syntax, self.meta_dataset.TransferSyntaxUID) + assert transfer_syntax == meta_dataset.TransferSyntaxUID - @parameterized.expand((FILE_SETTINGS.keys)) - def test_frame_offset_property(self, name: str): + @pytest.mark.parametrize("name", FILE_SETTINGS.keys()) + def test_frame_offset_property(self, test_file: WsiDicomFile): # Arrange - test_file, _ = self.get_file(name) # Act frame_offset = test_file.frame_offset # Assert - self.assertEqual(frame_offset, 0) + assert frame_offset == 0 - @parameterized.expand((FILE_SETTINGS.keys)) - def test_frame_count_property(self, name: str): + @pytest.mark.parametrize("name", FILE_SETTINGS.keys()) + def test_frame_count_property(self, test_file: WsiDicomFile): # Arrange - test_file, _ = self.get_file(name) # Act frame_count = test_file.frame_count # Assert - self.assertEqual(frame_count, 1) + assert frame_count == 1 - @parameterized.expand((FILE_SETTINGS.items)) - def test_get_offset_table_type(self, name: str, settings: Dict[str, Any]): + @pytest.mark.parametrize(["name", "settings"], FILE_SETTINGS.items()) + def test_get_offset_table_type( + self, test_file: WsiDicomFile, settings: Dict[str, Any] + ): # Arrange - test_file, _ = self.get_file(name) # Act offset_type = test_file._get_offset_table_type() # Assert - self.assertEqual(offset_type, settings["bot_type"]) + assert offset_type == settings["bot_type"] - @parameterized.expand((FILE_SETTINGS.keys)) - def test_validate_pixel_data_start(self, name: str): + @pytest.mark.parametrize("name", FILE_SETTINGS.keys()) + def test_validate_pixel_data_start(self, test_file: WsiDicomFile): # Arrange - test_file, _ = self.get_file(name) test_file._file.seek(test_file._pixel_data_position) # Act @@ -214,10 +205,9 @@ def test_validate_pixel_data_start(self, name: str): # Assert test_file._validate_pixel_data_start(tag) - @parameterized.expand((FILE_SETTINGS.items)) - def test_read_bot_length(self, name: str, settings: Dict[str, Any]): + @pytest.mark.parametrize(["name", "settings"], FILE_SETTINGS.items()) + def test_read_bot_length(self, test_file: WsiDicomFile, settings: Dict[str, Any]): # Arrange - test_file, _ = self.get_file(name) test_file._file.seek(test_file._pixel_data_position) tag = test_file._file.read_tag() test_file._validate_pixel_data_start(tag) @@ -230,12 +220,11 @@ def test_read_bot_length(self, name: str, settings: Dict[str, Any]): length = test_file._read_bot_length() # Assert - self.assertEqual(length, expected_bot_length) + assert length == expected_bot_length - @parameterized.expand((FILE_SETTINGS.items)) - def test_read_bot(self, name: str, settings: Dict[str, Any]): + @pytest.mark.parametrize(["name", "settings"], FILE_SETTINGS.items()) + def test_read_bot(self, test_file: WsiDicomFile, settings: Dict[str, Any]): # Arrange - test_file, _ = self.get_file(name) test_file._file.seek(test_file._pixel_data_position) tag = test_file._file.read_tag() test_file._validate_pixel_data_start(tag) @@ -248,12 +237,11 @@ def test_read_bot(self, name: str, settings: Dict[str, Any]): bot = test_file._read_bot() # Assert - self.assertEqual(bot, first_bot_entry) + assert bot == first_bot_entry - @parameterized.expand((FILE_SETTINGS.keys)) - def test_parse_bot_table(self, name: str): + @pytest.mark.parametrize("name", FILE_SETTINGS.keys()) + def test_parse_bot_table(self, test_file: WsiDicomFile): # Arrange - test_file, _ = self.get_file(name) TAG_BYTES = 4 LENGTH_BYTES = 4 test_file._file.seek(test_file._pixel_data_position) @@ -269,20 +257,18 @@ def test_parse_bot_table(self, name: str): ) # Assert - self.assertEqual( - positions, - [ - ( - (first_frame_item_position + TAG_BYTES + LENGTH_BYTES), - math.ceil(len(TESTFRAME) / 2) * 2, - ) - ], - ) + assert positions == [ + ( + (first_frame_item_position + TAG_BYTES + LENGTH_BYTES), + math.ceil(len(TESTFRAME) / 2) * 2, + ) + ] - @parameterized.expand((FILE_SETTINGS.keys)) - def test_read_positions_from_pixeldata(self, name: str): + @pytest.mark.parametrize("name", FILE_SETTINGS.keys()) + def test_read_positions_from_pixeldata( + self, test_file: WsiDicomFile, padded_test_frame: bytes + ): # Arrange - test_file, _ = self.get_file(name) TAG_BYTES = 4 LENGTH_BYTES = 4 test_file._file.seek(test_file._pixel_data_position) @@ -295,32 +281,33 @@ def test_read_positions_from_pixeldata(self, name: str): positions = test_file._read_positions_from_pixeldata() # Assert - self.assertEqual( - positions, - [ - ( - (first_frame_item + TAG_BYTES + LENGTH_BYTES), - len(self.padded_test_frame), - ) - ], - ) + assert positions == [ + ( + (first_frame_item + TAG_BYTES + LENGTH_BYTES), + len(padded_test_frame), + ) + ] - @parameterized.expand((FILE_SETTINGS.keys)) - def test_read_sequence_delimiter(self, name: str): - test_file, _ = self.get_file(name) + @pytest.mark.parametrize("name", FILE_SETTINGS.keys()) + def test_read_sequence_delimiter(self, test_file: WsiDicomFile): + # Arrange (last_item_position, last_item_length) = test_file.frame_positions[-1] last_item_end = last_item_position + last_item_length test_file._file.seek(last_item_end) test_file._file.read_tag() - test_file._read_sequence_delimiter() - @parameterized.expand((FILE_SETTINGS.keys)) - def test_read_frame(self, name: str): + # Act & Assert + try: + test_file._read_sequence_delimiter() + except: + pytest.fail() + + @pytest.mark.parametrize("name", FILE_SETTINGS.keys()) + def test_read_frame(self, test_file: WsiDicomFile, padded_test_frame: bytes): # Arrange - test_file, _ = self.get_file(name) # Act frame = test_file.read_frame(0) # Assert - self.assertEqual(frame, self.padded_test_frame) + assert frame == padded_test_frame diff --git a/tests/test_wsidicom_file_target.py b/tests/test_wsidicom_file_target.py index cc6aa048..99bd5678 100644 --- a/tests/test_wsidicom_file_target.py +++ b/tests/test_wsidicom_file_target.py @@ -13,54 +13,43 @@ # limitations under the License. from pathlib import Path -from tempfile import TemporaryDirectory -import unittest +from typing import Callable import pytest -from parameterized import parameterized from pydicom.uid import generate_uid -from tests.wsi_test_files import WsiTestFiles +from tests.conftest import WsiTestDefinitions from wsidicom import WsiDicom from wsidicom.file.wsidicom_file_target import WsiDicomFileTarget from wsidicom.series.levels import Levels @pytest.mark.integration -class WsiDicomFileTargetIntegrationTests(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.wsi_test_files = WsiTestFiles() - - @classmethod - def tearDownClass(cls): - cls.wsi_test_files.close() - - def setUp(self): - self.tempdir = TemporaryDirectory() - - def tearDown(self): - self.tempdir.cleanup() - - @parameterized.expand((WsiTestFiles.folders.keys)) - def test_save_levels(self, wsi_name: str): +class TestWsiDicomFileTargetIntegration: + @pytest.mark.parametrize("wsi_name", WsiTestDefinitions.wsi_names()) + def test_save_levels( + self, wsi_name: str, wsi_factory: Callable[[str], WsiDicom], tmp_path: Path + ): # Arrange - wsi = self.wsi_test_files.get_wsi(wsi_name) + wsi = wsi_factory(wsi_name) expected_levels_count = len(wsi.levels) # Act - with WsiDicomFileTarget( - Path(self.tempdir.name), generate_uid, 1, 16, "bot" - ) as target: + with WsiDicomFileTarget(tmp_path, generate_uid, 1, 16, "bot") as target: target.save_levels(wsi.levels) # Assert - with WsiDicom.open(self.tempdir.name) as saved_wsi: - self.assertEqual(expected_levels_count, len(saved_wsi.levels)) + with WsiDicom.open(tmp_path) as saved_wsi: + assert expected_levels_count == len(saved_wsi.levels) - @parameterized.expand((WsiTestFiles.folders.keys)) - def test_save_levels_add_missing(self, wsi_name: str): + @pytest.mark.parametrize("wsi_name", WsiTestDefinitions.wsi_names()) + def test_save_levels_add_missing( + self, + wsi_name: str, + wsi_factory: Callable[[str], WsiDicom], + tmp_path: Path, + ): # Arrange - wsi = self.wsi_test_files.get_wsi(wsi_name) + wsi = wsi_factory(wsi_name) levels_larger_than_tile_size = [ level for level in wsi.levels if level.size.any_greater_than(wsi.tile_size) ] @@ -68,17 +57,11 @@ def test_save_levels_add_missing(self, wsi_name: str): levels_missing_smallest_levels = Levels(levels_larger_than_tile_size) # Act - with WsiDicomFileTarget( - Path(self.tempdir.name), generate_uid, 1, 16, "bot", True - ) as target: + with WsiDicomFileTarget(tmp_path, generate_uid, 1, 16, "bot", True) as target: target.save_levels(levels_missing_smallest_levels) # Assert - with WsiDicom.open(self.tempdir.name) as saved_wsi: - self.assertEqual(expected_levels_count, len(saved_wsi.levels)) - self.assertTrue( - saved_wsi.levels[-1].size.all_less_than_or_equal(saved_wsi.tile_size) - ) - self.assertTrue( - saved_wsi.levels[-2].size.any_greater_than(saved_wsi.tile_size) - ) + with WsiDicom.open(tmp_path) as saved_wsi: + assert expected_levels_count == len(saved_wsi.levels) + assert saved_wsi.levels[-1].size.all_less_than_or_equal(saved_wsi.tile_size) + assert saved_wsi.levels[-2].size.any_greater_than(saved_wsi.tile_size) diff --git a/tests/test_wsidicom_file_writer.py b/tests/test_wsidicom_file_writer.py new file mode 100644 index 00000000..e451102d --- /dev/null +++ b/tests/test_wsidicom_file_writer.py @@ -0,0 +1,476 @@ +# Copyright 2021, 2023 SECTRA AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import math +import os +import random +import sys +from pathlib import Path +from struct import unpack +from typing import Callable, List, Optional, OrderedDict, Sequence, Tuple, cast + +import pytest +from PIL import ImageChops, ImageFilter, ImageStat +from PIL.Image import Image as PILImage +from pydicom import Sequence as DicomSequence +from pydicom.dataset import Dataset +from pydicom.filebase import DicomFile +from pydicom.filereader import read_file_meta_info +from pydicom.misc import is_dicom +from pydicom.tag import ItemTag, SequenceDelimiterTag, Tag +from pydicom.uid import UID, JPEGBaseline8Bit, generate_uid +from tests.conftest import WsiTestDefinitions + +from wsidicom import WsiDicom +from wsidicom.file.wsidicom_file import WsiDicomFile +from wsidicom.file.wsidicom_file_base import OffsetTableType +from wsidicom.file.wsidicom_file_target import WsiDicomFileTarget +from wsidicom.file.wsidicom_file_writer import WsiDicomFileWriter +from wsidicom.geometry import Point, Size, SizeMm +from wsidicom.group.level import Level +from wsidicom.instance import ImageData, ImageCoordinateSystem +from wsidicom.uid import WSI_SOP_CLASS_UID + +SLIDE_FOLDER = Path(os.environ.get("WSIDICOM_TESTDIR", "tests/testdata/slides")) + + +class WsiDicomTestFile(WsiDicomFile): + """Test version of WsiDicomFile that overrides __init__.""" + + def __init__(self, filepath: Path, transfer_syntax: UID, frame_count: int): + self._filepath = filepath + self._file = DicomFile(filepath, mode="rb") + self._file.is_little_endian = transfer_syntax.is_little_endian + self._file.is_implicit_VR = transfer_syntax.is_implicit_VR + self._frame_count = frame_count + self._pixel_data_position = 0 + self._owned = True + self.__enter__() + + @property + def frame_count(self) -> int: + return self._frame_count + + +class WsiDicomTestImageData(ImageData): + def __init__(self, data: Sequence[bytes], tiled_size: Size) -> None: + if len(data) != tiled_size.area: + raise ValueError("Number of frames and tiled size area differ") + TILE_SIZE = Size(10, 10) + self._data = data + self._tile_size = TILE_SIZE + self._image_size = tiled_size * TILE_SIZE + + @property + def transfer_syntax(self) -> UID: + return JPEGBaseline8Bit + + @property + def image_size(self) -> Size: + return self._image_size + + @property + def tile_size(self) -> Size: + return self._tile_size + + @property + def pixel_spacing(self) -> SizeMm: + return SizeMm(1.0, 1.0) + + @property + def samples_per_pixel(self) -> int: + return 3 + + @property + def photometric_interpretation(self) -> str: + return "YBR" + + @property + def image_coordinate_system(self) -> Optional[ImageCoordinateSystem]: + return None + + def _get_decoded_tile(self, tile_point: Point, z: float, path: str) -> PILImage: + raise NotImplementedError() + + def _get_encoded_tile(self, tile: Point, z: float, path: str) -> bytes: + return self._data[tile.x + tile.y * self.tiled_size.width] + + +@pytest.fixture() +def tiled_size(): + yield Size(10, 10) + + +@pytest.fixture() +def frame_count(tiled_size: Size): + yield tiled_size.area + + +@pytest.fixture() +def rng(): + SEED = 0 + yield random.Random(SEED) + + +@pytest.fixture() +def test_data(rng: random.Random, frame_count: int): + MIN_FRAME_LENGTH = 2 + MAX_FRAME_LENGTH = 100 + lengths = [ + rng.randint(MIN_FRAME_LENGTH, MAX_FRAME_LENGTH) for i in range(frame_count) + ] + yield [ + rng.getrandbits(length * 8).to_bytes(length, sys.byteorder) + for length in lengths + ] + + +@pytest.fixture() +def image_data(test_data: List[bytes], tiled_size: Size): + yield WsiDicomTestImageData(test_data, tiled_size) + + +@pytest.fixture() +def test_dataset(image_data: ImageData, frame_count: int): + assert image_data.pixel_spacing is not None + dataset = Dataset() + dataset.SOPClassUID = WSI_SOP_CLASS_UID + dataset.ImageType = ["ORIGINAL", "PRIMARY", "VOLUME", "NONE"] + dataset.NumberOfFrames = frame_count + dataset.SOPInstanceUID = generate_uid() + dataset.StudyInstanceUID = generate_uid() + dataset.SeriesInstanceUID = generate_uid() + dataset.FrameOfReferenceUID = generate_uid() + dataset.DimensionOrganizationType = "TILED_FULL" + dataset.Rows = image_data.tile_size.width + dataset.Columns = image_data.tile_size.height + dataset.SamplesPerPixel = image_data.samples_per_pixel + dataset.PhotometricInterpretation = image_data.photometric_interpretation + dataset.TotalPixelMatrixColumns = image_data.image_size.width + dataset.TotalPixelMatrixRows = image_data.image_size.height + dataset.OpticalPathSequence = DicomSequence([]) + dataset.ImagedVolumeWidth = 1.0 + dataset.ImagedVolumeHeight = 1.0 + dataset.ImagedVolumeDepth = 1.0 + dataset.InstanceNumber = 0 + pixel_measure = Dataset() + pixel_measure.PixelSpacing = [ + image_data.pixel_spacing.width, + image_data.pixel_spacing.height, + ] + pixel_measure.SpacingBetweenSlices = 1.0 + pixel_measure.SliceThickness = 1.0 + shared_functional_group = Dataset() + shared_functional_group.PixelMeasuresSequence = DicomSequence([pixel_measure]) + dataset.SharedFunctionalGroupsSequence = DicomSequence([shared_functional_group]) + dataset.TotalPixelMatrixFocalPlanes = 1 + dataset.NumberOfOpticalPaths = 1 + dataset.ExtendedDepthOfField = "NO" + dataset.FocusMethod = "AUTO" + yield dataset + + +def write_table( + image_data: ImageData, + test_data: List[bytes], + frame_count: int, + file_path: Path, + offset_table: OffsetTableType, +) -> List[Tuple[int, int]]: + with WsiDicomFileWriter.open(file_path) as write_file: + table_start, pixel_data_start = write_file._write_pixel_data_start( + number_of_frames=frame_count, offset_table=offset_table + ) + positions = write_file._write_pixel_data( + image_data, + image_data.default_z, + image_data.default_path, + 1, + 100, + ) + pixel_data_end = write_file._file.tell() + write_file._write_pixel_data_end() + if offset_table != OffsetTableType.NONE: + if table_start is None: + raise ValueError("Table start should not be None") + if offset_table == OffsetTableType.EXTENDED: + write_file._write_eot( + table_start, pixel_data_start, positions, pixel_data_end + ) + elif offset_table == OffsetTableType.BASIC: + write_file._write_bot(table_start, pixel_data_start, positions) + + TAG_BYTES = 4 + LENGTH_BYTES = 4 + frame_offsets = [] + for position in positions: # Positions are from frame data start + frame_offsets.append(position + TAG_BYTES + LENGTH_BYTES) + frame_lengths = [ # Lengths are divisable with 2 + 2 * math.ceil(len(frame) / 2) for frame in test_data + ] + expected_frame_index = [ + (offset, length) for offset, length in zip(frame_offsets, frame_lengths) + ] + return expected_frame_index + + +@pytest.mark.save +class TestWsiDicomFileWriter: + @staticmethod + def assertEndOfFile(file: WsiDicomTestFile): + with pytest.raises(EOFError): + file._file.read(1, need_exact_length=True) + + def test_write_preamble(self, tmp_path: Path): + # Arrange + filepath = tmp_path.joinpath("1.dcm") + + # Act + with WsiDicomFileWriter.open(filepath) as write_file: + write_file._write_preamble() + + # Assert + assert is_dicom(filepath) + + def test_write_meta(self, tmp_path: Path): + # Arrange + transfer_syntax = JPEGBaseline8Bit + instance_uid = generate_uid() + class_uid = WSI_SOP_CLASS_UID + filepath = tmp_path.joinpath("1.dcm") + + # Act + with WsiDicomFileWriter.open(filepath) as write_file: + write_file._write_preamble() + write_file._write_file_meta(instance_uid, transfer_syntax) + file_meta = read_file_meta_info(filepath) + + # Assert + assert file_meta.TransferSyntaxUID == transfer_syntax + assert file_meta.MediaStorageSOPInstanceUID == instance_uid + assert file_meta.MediaStorageSOPClassUID == class_uid + + @pytest.mark.parametrize( + "writen_table_type", + [ + OffsetTableType.NONE, + OffsetTableType.BASIC, + OffsetTableType.EXTENDED, + ], + ) + def test_write_and_read_table( + self, + image_data: ImageData, + test_data: List[bytes], + frame_count: int, + writen_table_type: OffsetTableType, + tmp_path: Path, + ): + # Arrange + filepath = tmp_path.joinpath(str(writen_table_type)) + writen_frame_indices = write_table( + image_data, test_data, frame_count, filepath, writen_table_type + ) + + # Act + with WsiDicomTestFile(filepath, JPEGBaseline8Bit, frame_count) as read_file: + read_frame_indices, read_table_type = read_file._parse_pixel_data() + + # Assert + assert writen_frame_indices == read_frame_indices + assert writen_table_type == read_table_type + + def test_reserve_bot(self, tmp_path: Path, frame_count: int): + # Arrange + filepath = tmp_path.joinpath("1.dcm") + + # Act + with WsiDicomFileWriter.open(filepath) as write_file: + write_file._reserve_bot(frame_count) + + # Assert + with WsiDicomTestFile(filepath, JPEGBaseline8Bit, frame_count) as read_file: + tag = read_file._file.read_tag() + assert tag == ItemTag + BOT_ITEM_LENGTH = 4 + length = read_file._read_tag_length(False) + assert length == BOT_ITEM_LENGTH * frame_count + for frame in range(frame_count): + assert read_file._file.read_UL() == 0 + self.assertEndOfFile(read_file) + + def test_reserve_eot(self, tmp_path: Path, frame_count: int): + # Arrange + filepath = tmp_path.joinpath("1.dcm") + + # Act + with WsiDicomFileWriter.open(filepath) as write_file: + write_file._reserve_eot(frame_count) + + # Assert + with WsiDicomTestFile(filepath, JPEGBaseline8Bit, frame_count) as read_file: + tag = read_file._file.read_tag() + assert tag == Tag("ExtendedOffsetTable") + EOT_ITEM_LENGTH = 8 + length = read_file._read_tag_length(True) + assert length == EOT_ITEM_LENGTH * frame_count + for frame in range(frame_count): + assert unpack(" List[List[float]]: def __len__(self) -> int: raise NotImplementedError() - @abstractmethod - def __repr__(self) -> str: - raise NotImplementedError() - @classmethod @abstractmethod def from_coords( @@ -1233,7 +1229,7 @@ def _set_planes_in_ds(self, ds: Dataset) -> Dataset: Dataset The Annotation Group Sequence with focal plane attributes. """ - if self._z_planes is []: + if len(self._z_planes) == 0: ds.AnnotationAppliesToAllZPlanes = "YES" else: ds.AnnotationAppliesToAllZPlanes = "NO" @@ -1254,7 +1250,7 @@ def _set_optical_paths_in_ds(self, ds: Dataset) -> Dataset: Dataset The Annotation Group Sequence with optical path attributes. """ - if self._optical_paths is []: + if len(self._optical_paths) == 0: ds.AnnotationAppliesToAllOpticalPaths = "YES" else: ds.AnnotationAppliesToAllOpticalPaths = "NO" diff --git a/wsidicom/group/group.py b/wsidicom/group/group.py index c783e008..7fc8ebcb 100644 --- a/wsidicom/group/group.py +++ b/wsidicom/group/group.py @@ -33,6 +33,7 @@ ImageType, WsiDataset, WsiInstance, + image_coordinate_system, ) from wsidicom.stringprinting import dict_pretty_str from wsidicom.uid import SlideUids @@ -345,18 +346,19 @@ def get_region_mm( PILImage Region as image """ - if slide_origin and self.image_coordinate_system is None: - raise ValueError( - "Can't map to slide region as image coordinate system is not defined." - ) - + to_coordinate_system = None if slide_origin: - region = self.image_coordinate_system.slide_to_image(region) + if self.image_coordinate_system is None: + raise ValueError( + "Can't map to slide region as image coordinate system is not defined." + ) + to_coordinate_system = self.image_coordinate_system + region = to_coordinate_system.slide_to_image(region) pixel_region = self.mm_to_pixel(region) image = self.get_region(pixel_region, z, path, threads) - if slide_origin: + if to_coordinate_system: image = image.rotate( - self.image_coordinate_system.rotation, + to_coordinate_system.rotation, resample=Image.Resampling.BILINEAR, expand=True, ) diff --git a/wsidicom/optical.py b/wsidicom/optical.py index 451ab458..891448e7 100644 --- a/wsidicom/optical.py +++ b/wsidicom/optical.py @@ -565,10 +565,11 @@ def __init__( List of OpticalPaths. """ if optical_paths is None: - optical_paths = [] - self._optical_paths: Dict[str, OpticalPath] = { - optical_path.identifier: optical_path for optical_path in optical_paths - } + self._optical_paths = {} + else: + self._optical_paths: Dict[str, OpticalPath] = { + optical_path.identifier: optical_path for optical_path in optical_paths + } @classmethod def open(cls, instances: Sequence[WsiInstance]) -> "OpticalManager":