forked from Project-MONAI/model-zoo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_spleen_ct_segmentation.py
120 lines (106 loc) · 4.71 KB
/
test_spleen_ct_segmentation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Copyright (c) MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import tempfile
import unittest
import nibabel as nib
import numpy as np
from monai.bundle import ConfigWorkflow
from parameterized import parameterized
# Overrides for the train + evaluate test. The single dict is handed to
# ``parameterized.expand`` and then splatted into ``ConfigWorkflow(**override)``.
# ``@dataset_dir`` is referenced by the glob expressions but intentionally not
# defined here: the test adds ``dataset_dir`` at run time, once the temporary
# dataset directory exists.
TEST_CASE_1 = [
    {
        "bundle_root": "models/spleen_ct_segmentation",
        # Image/label file lists are resolved from the generated dataset directory.
        "images": "$list(sorted(glob.glob(@dataset_dir + '/image_*.nii.gz')))",
        "labels": "$list(sorted(glob.glob(@dataset_dir + '/label_*.nii.gz')))",
        # Presumably chosen to keep the run short: one epoch, no dataset
        # caching, a single dataloader worker and a small random-crop size.
        "epochs": 1,
        "train#dataset#cache_rate": 0.0,
        "validate#dataset#cache_rate": 0.0,
        "train#dataloader#num_workers": 1,
        "validate#dataloader#num_workers": 1,
        "train#random_transforms#0#spatial_size": [32, 32, 32],
    },
]
# Overrides for the inference test. As with the training case, ``dataset_dir``
# (referenced through ``@dataset_dir``) is supplied by the test at run time.
TEST_CASE_2 = [
    {
        "bundle_root": "models/spleen_ct_segmentation",
        "datalist": "$list(sorted(glob.glob(@dataset_dir + '/image_*.nii.gz')))",
        # Do not load weights: the first handler is switched off for the test.
        "handlers#0#_disabled_": True,
    },
]
class TestSpleenCTSeg(unittest.TestCase):
    """Smoke tests for the ``spleen_ct_segmentation`` bundle configs.

    Every test builds a ``ConfigWorkflow`` from the bundle's config files,
    points it at a small synthetic NIfTI dataset created in ``setUp`` and
    executes the workflow end to end.
    """

    def setUp(self):
        """Create a temporary directory holding random image/label volumes."""
        self.dataset_dir = tempfile.mkdtemp()
        # Register the removal immediately: unittest does not call tearDown()
        # when setUp() raises, but cleanups registered before the failure
        # still run, so the directory cannot leak if writing a volume fails.
        self.addCleanup(shutil.rmtree, self.dataset_dir)

        dataset_size = 10
        input_shape = (64, 64, 64)
        affine = np.eye(4)
        for index in range(dataset_size):
            # One image and one label per index, each filled with random 0/1 voxels.
            for kind in ("image", "label"):
                volume = np.random.randint(low=0, high=2, size=input_shape).astype(np.int8)
                filename = os.path.join(self.dataset_dir, f"{kind}_{index}.nii.gz")
                nib.save(nib.Nifti1Image(volume, affine), filename)

    def _run_workflow(self, workflow, description):
        """Initialize, property-check, run and finalize ``workflow``.

        Args:
            workflow: the ``ConfigWorkflow`` instance to execute.
            description: name of the config under test, inserted into the
                error message when the property check fails.

        Raises:
            ValueError: if ``check_properties()`` returns a non-empty result.
        """
        workflow.initialize()
        try:
            # check required and optional properties
            check_result = workflow.check_properties()
            if check_result:
                raise ValueError(f"check properties for {description} failed: {check_result}")
            workflow.run()
        finally:
            # Finalize even when the check or the run fails, so an initialized
            # workflow is always given the chance to release what it set up.
            workflow.finalize()

    @parameterized.expand([TEST_CASE_1])
    def test_train_eval_config(self, override):
        """Run the training config, then the evaluation config layered on it."""
        # Work on a copy: ``override`` is the module-level test-case dict, and
        # writing into it would leak this test's temp path into shared state.
        override = {**override, "dataset_dir": self.dataset_dir}
        bundle_root = override["bundle_root"]
        train_file = os.path.join(bundle_root, "configs/train.json")
        eval_file = os.path.join(bundle_root, "configs/evaluate.json")
        shared_kwargs = {
            "logging_file": os.path.join(bundle_root, "configs/logging.conf"),
            "meta_file": os.path.join(bundle_root, "configs/metadata.json"),
            **override,
        }

        trainer = ConfigWorkflow(workflow="train", config_file=train_file, **shared_kwargs)
        self._run_workflow(trainer, "train config")

        validator = ConfigWorkflow(
            # override train.json, thus set the workflow to "train" rather than "eval"
            workflow="train",
            config_file=[train_file, eval_file],
            **shared_kwargs,
        )
        self._run_workflow(validator, "overrided train config")

    @parameterized.expand([TEST_CASE_2])
    def test_infer_config(self, override):
        """Run the inference config against the synthetic images."""
        # Copy rather than mutate the shared module-level test-case dict.
        override = {**override, "dataset_dir": self.dataset_dir}
        bundle_root = override["bundle_root"]
        inferrer = ConfigWorkflow(
            workflow="infer",
            config_file=os.path.join(bundle_root, "configs/inference.json"),
            logging_file=os.path.join(bundle_root, "configs/logging.conf"),
            meta_file=os.path.join(bundle_root, "configs/metadata.json"),
            **override,
        )
        self._run_workflow(inferrer, "inference config")
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()