-
Notifications
You must be signed in to change notification settings - Fork 1
/
dynamodb.py
109 lines (90 loc) · 3.15 KB
/
dynamodb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import boto3
from boto3.dynamodb.conditions import Key
import os
class Dynamodb:
def __init__(self):
SERVER_SECRET_KEY = os.getenv("SECRET")
SERVER_PUBLIC_KEY = os.getenv("ACCESSKEY")
REGION_NAME = os.getenv('REGION_NAME')
self.dynamodb_client = dynamodb = boto3.resource('dynamodb',
aws_access_key_id=SERVER_PUBLIC_KEY,
aws_secret_access_key=SERVER_SECRET_KEY,
region_name=REGION_NAME
)
self.statustable = self.dynamodb_client.Table('statustable')
def getStatus(self):
response = self.statustable.get_item(
Key={
'statuskey': 'personpresent'
}
)
statusvalue = response['Item']['statusvalue']
return statusvalue
def setStatus(self,value, sessionid):
if (value == 'True'):
updatevalue = '{"presence": {"S": "'+ value + '"},"sessionID": {"S": "'+sessionid+'"}}'
else:
updatevalue = value
self.statustable.update_item(
Key={
'statuskey': 'personpresent'
},
UpdateExpression='SET statusvalue = :statusvalue',
ExpressionAttributeValues={
':statusvalue': updatevalue
}
)
def setStatusek(self,value):
items_to_delete = [value]
with self.statustable.batch_writer() as batch:
for item in items_to_delete:
response = batch.put_item(Item={
"statuskey": item["statuskey"],
"statusvalue": item["statusvalue"]
})
def getPosture(self):
response = self.statustable.get_item(
Key={
'statuskey': 'currentposture'
}
)
statusvalue = response['Item']['statusvalue']
return statusvalue
def setPosture(self,value):
self.statustable.update_item(
Key={
'statuskey': 'currentposture',
},
UpdateExpression='SET statusvalue = :statusvalue',
ExpressionAttributeValues={
':statusvalue': value
}
)
def getDeviationExceeded(self):
response = self.statustable.get_item(
Key={
'statuskey': 'deviationexceeded'
}
)
statusvalue = response['Item']['statusvalue']
return statusvalue
def setDeviationExceeded(self,value):
self.statustable.update_item(
Key={
'statuskey': 'deviationexceeded',
},
UpdateExpression='SET statusvalue = :statusvalue',
ExpressionAttributeValues={
':statusvalue': value
}
)
def setTooManyPeople(self, value):
self.statustable.update_item(
Key={
'statuskey': 'toomanypeople'
},
UpdateExpression='SET statusvalue = :statusvalue',
ExpressionAttributeValues={
':statusvalue': value
}
)