-
Notifications
You must be signed in to change notification settings - Fork 0
/
estore
executable file
·182 lines (159 loc) · 4.97 KB
/
estore
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/bin/env python3
#from __future__ import division, absolute_import, print_function, unicode_literals
import os, sys, io, re, json, argparse, getpass, signal
import simplecrypt
signal.signal(signal.SIGINT, lambda a, b: sys.exit("abort: SIGINT"))
signal.signal(signal.SIGTERM, lambda a, b: sys.exit("abort: SIGTERM"))
def is_unicode_str(o):
try: return type(o) is unicode
except NameError: return type(o) is str
def to_unicode_str(o):
try: return unicode(o)
except NameError: return str(o)
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--path", required=True)
group = parser.add_mutually_exclusive_group()
group.add_argument("-s", "--set")
group.add_argument("-u", "--unset", action="store_true")
parser.add_argument("-f", "--force", action="store_true")
parser.add_argument("-rs", "--raw-str", action="store_true")
args = parser.parse_args()
path = args.path.split("/")
listing_keys = len(path) > 1 and not path[-1]
if listing_keys:
path = path[:-1]
if not re.match("^[A-Za-z0-9_-]+$", path[0]): # simple slug
sys.exit("error: invalid path root '{}'".format(path[0]))
for n in path[1:]:
if not re.match("^[^\s]+$", n): # no spaces
sys.exit("error: invalid path part '{}'".format(n))
if listing_keys and (args.set or args.unset):
sys.exit("error: path cannot end in '/' when using --set or --unset")
# find home directory
dir_home = os.path.expanduser("~")
if dir_home == "~" or not os.path.exists(dir_home):
sys.exit("error: home path not found")
# find .estore directory
dir_estore = os.path.join(dir_home, ".estore")
if os.path.lexists(dir_estore):
if not os.path.isdir(dir_estore):
sys.exit("error: '{}' exists but is not a directory".format(dir_estore))
else:
os.mkdir(dir_estore)
# find the requested file
the_file = os.path.join(dir_estore, path[0] + ".bin")
key=None
def get_key(new=False):
global key
if not key:
key = getpass.getpass(("NEW " if new else "") + "Encryption Key for '{}': ".format(path[0]))
if not key:
sys.exit("error: empty key")
return key
def read_the_file(required=False):
data = None
if os.path.lexists(the_file):
if os.path.isfile(the_file):
with io.open(the_file, "rb") as fp:
data = fp.read()
else:
sys.exit("error: '{}' exists but is not a file".format(the_file))
elif required:
sys.exit("error: '{}' not found".format(path[0]))
if not data:
return None
# TODO: decrypt
try:
data = simplecrypt.decrypt(get_key(), data)
except simplecrypt.DecryptionException as e:
sys.exit("error: decryption error (simple-crypt: {})".format(str(e).lower().rstrip('.')))
return json.loads(data)
def write_the_file(obj):
data = json.dumps(obj, separators=(',', ':'))
data = simplecrypt.encrypt(get_key(True), data)
with io.open(the_file, "wb") as fp:
fp.write(data)
def remove_the_file():
if os.path.lexists(the_file) and os.path.isfile(the_file):
os.remove(the_file)
else:
sys.exit("error: '{}' not found".format(path[0]))
def deep_read(obj, keys):
o = obj
rel = [keys[0]]
for k in keys[1:]:
if type(o) is not dict:
sys.exit("error: '{}' is not a collection".format("/".join(rel)))
rel.append(k)
if not k in o:
sys.exit("error: '{}' not found".format("/".join(rel)))
o = o[k]
return o
def deep_write(obj, keys, value):
if type(obj) is not dict:
sys.exit("error: '{}' is not a collection".format(keys[0]))
o = obj
rel = [keys[0]]
for k in keys[1:-1]:
rel.append(k)
if not k in o:
o[k] = {}
elif type(o[k]) is not dict:
sys.exit("error: '{}' is not a collection".format("/".join(rel)))
o = o[k]
if keys[-1] in o:
sys.exit("error: cannot overwrite '{}' (use --unset first)".format("/".join(keys)))
o[keys[-1]] = value
def deep_delete(obj, keys):
if type(obj) is not dict:
sys.exit("error: '{}' is not a collection".format(keys[0]))
o = obj
rel = [keys[0]]
for k in keys[1:-1]:
rel.append(k)
if not k in o:
o[k] = {}
elif type(o[k]) is not dict:
sys.exit("error: '{}' is not a collection".format("/".join(rel)))
o = o[k]
if not keys[-1] in o:
sys.exit("error: '{}' not found".format("/".join(keys)))
del o[keys[-1]]
def pretty_print_obj(obj):
print(json.dumps(obj, indent=4, separators=(',', ': ')))
if args.set:
# writing
data = read_the_file()
if len(path) == 1:
# handle special case when writing to the root
if data != None:
sys.exit("error: cannot overwrite '{}' (use --unset first)".format(path[0]))
data = args.set
else:
# general case
if data == None:
data = {}
deep_write(data, path, args.set)
write_the_file(data)
elif args.unset:
if len(path) == 1:
if not args.force:
sys.exit("error: cannot unset root without force")
remove_the_file()
else:
data = read_the_file()
deep_delete(data, path)
write_the_file(data)
else:
# reading
data = read_the_file(True)
obj = deep_read(data, path)
if listing_keys:
if type(obj) is not dict:
sys.exit("error: '{}' is not a collection".format("/".join(path)))
pretty_print_obj(list(obj.keys()))
elif args.raw_str and is_unicode_str(obj):
sys.stdout.write(obj)
sys.stdout.flush()
else:
pretty_print_obj(obj)