forked from kdm9/sampler
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sampler.py
138 lines (117 loc) · 3.75 KB
/
sampler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from PIL import Image
import zbarlight as zbar
import subprocess as sp
import readline
import io
import os
import os.path as op
def ask_yesno(prompt, default=True):
if default:
prompt += " [Y/n] "
else:
prompt += " [y/N] "
resp = input(prompt)
if not resp:
return default
elif resp.lower()[0] == 'y':
return True
else:
return False
def ask_default(prompt, default="", dtype=str):
prompt += " [%s] " % default
resp = input(prompt)
if not resp:
return dtype(default)
else:
return dtype(resp)
def qrdecode(image):
code = zbar.scan_codes('qrcode', image)
if isinstance(code, list):
code = code[0]
if isinstance(code, bytes):
return code.decode('utf-8')
else:
return code
def capture_image():
capture_cmd = "gphoto2 --capture-image-and-download --stdout".split()
proc = sp.Popen(capture_cmd, stdout=sp.PIPE, stderr=sp.PIPE)
while True:
try:
out, err = proc.communicate(timeout=20)
break
except sp.TimeoutExpired:
if ask_yesno("Image capture is taking ages. Kill capture?"):
proc.kill()
if proc.returncode != 0:
if ask_yesno("Image capture failed. Retry?"):
return capture_image()
return out
def show_image(image):
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
import numpy as np
plt.imshow(np.asarray(image))
plt.show()
class Capturer(object):
"""Captures images from camera using gphoto CLI"""
def __init__(self, imagedir, samplecsv):
os.makedirs(imagedir, exist_ok=False)
self.imagedir = imagedir
self.sample_csv = samplecsv
with open(self.sample_csv, "w") as fh:
print("sample_id", "tube_id", sep=',', file=fh)
def capture_sample(self):
images = []
sample_id = None
tube_id = None
try:
while True:
img_jpg = capture_image()
images.append(img_jpg)
img = Image.open(io.BytesIO(img_jpg))
if ask_yesno("Show image?"):
show_image(img)
if sample_id is None:
sid = qrdecode(img)
sample_id = ask_default("Sample name is", sid)
if not ask_yesno("Capture another image?"):
break
tube_id = ask_default("What's the tube label?", default=sample_id)
except KeyboardInterrupt:
pass
finally:
if sample_id is None:
return
# Make image dir
imagedir = op.join(self.imagedir, sample_id)
os.makedirs(imagedir, exist_ok=False)
# Write images
for i, jpgbytes in enumerate(images):
fn = op.join(imagedir, "{:02d}.jpg".format(i))
with open(fn, "wb") as fh:
fh.write(jpgbytes)
# Append to sample CSV
with open(self.sample_csv, "a") as fh:
print(sample_id, tube_id, sep=",", file=fh)
def main(self):
while True:
try:
input("Press enter to start sample capture (Ctrl-C will close)")
except (KeyboardInterrupt, EOFError):
break
self.capture_sample()
if __name__ == "__main__":
DOC = """
USAGE:
sampler -c CSV -d OUTDIR
OPTIONS:
-d OUTDIR Image output directory (base of per-sample image directories).
-c CSV Sample -> Tube ID mapping CSV filename (will be overwritten).
"""
from docopt import docopt
args = docopt(DOC)
samplecsv = args['-c']
outdir = args['-d']
c = Capturer(outdir, samplecsv)
c.main()