-
Notifications
You must be signed in to change notification settings - Fork 0
/
websocket_server.py
139 lines (122 loc) · 4.7 KB
/
websocket_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from aioesphomeapi import APIClient,ReconnectLogic, APIConnectionError, LogLevel
import zeroconf
import asyncio
import websockets
import datetime
import time
import planet_weight
import math
import numpy as np
import logging
import argparse
import random
def get_now():
return datetime.datetime.now()
weight_key = 520680920
weight = 0
weight_date = get_now()
planet_weight_zero = planet_weight.get_weight_json(weight)
weight_median_array = []
weight_median_array_index = -1
weight_median = 0
def update_weight_array(new_weight):
global weight_median_array_index
global weight_median
global weight_median_array
weight_median_array_index = weight_median_array_index+1
if len(weight_median_array) < 10000:
weight_median_array.append(new_weight)
else:
weight_median_array[weight_median_array_index%10000] = new_weight
weight_median = np.median(weight_median_array)
#print(f"Median is {weight_median}, len is {len(weight_median_array)}")
async def main(args):
logging.info("Astropolis scale starting ...")
"""Connect to an ESPHome device and get details."""
loop = asyncio.get_running_loop()
def change_callback(state):
try:
global weight
global weight_date
"""Print the state changes of the device.."""
logging.debug(f"state: {state}, isWeight:{state.key == weight_key}, state isnan: {math.isnan(state.state)}")
if state is not None and not math.isnan(state.state) and state.key == weight_key:
weight_date = get_now()
update_weight_array(state.state)
weight = state.state - weight_median
logging.debug(f"Setting weight: {weight} corrected with {weight_median} at date {weight_date}")
if args.random:
weight = random.randrange(0, 100)
except Exception as e:
logging.error("erroring out of callback", e)
except:
logging.error("erroring out of callback 2")
# Establish connection
api = APIClient(loop, "esp-scale-1.local", 6053, "1WkzndV8oAZ5sqbe47rc", client_info="Moonscale")
async def on_connect():
try:
await api.subscribe_states(change_callback)
# Get API version of the device's firmware
logging.info(api.api_version)
# Show device details
device_info = await api.device_info()
logging.info(device_info)
# List all entities of the device
entities = await api.list_entities_services()
logging.debug(f'Listing all entities: {entities}')
except APIConnectionError as e:
logging.error("Esphome api connection error", e)
await api.disconnect()
async def on_disconnect():
logger.warning("Disconnected from API")
zc = zeroconf.Zeroconf()
reconnect = ReconnectLogic(
client=api,
on_connect=on_connect,
on_disconnect=on_disconnect,
zeroconf_instance=zc,
)
await reconnect.start()
await websockets.serve(weight_socket, "127.0.0.1", 5678, loop=loop)
try:
while True:
try:
await asyncio.sleep(5)
except Exception as e:
logging.error("catched exception", e)
except KeyboardInterrupt:
logging.error("Keyb interrupt")
pass
except KeyboardInterrupt:
await reconnect.stop()
zc.close()
finally:
logging.info("Closing loop.")
def check_stale_weight(w, w_zero, w_date):
time_diff = (get_now() - weight_date).total_seconds()
if time_diff > 30:
return w_zero
return w
async def weight_socket(websocket, path):
global weight
global weight_date
try:
while True:
to_send = check_stale_weight(planet_weight.get_weight_json(weight, rounding=1), planet_weight_zero, weight_date)
logging.debug(f"Sending WS: weight={to_send}\n")
await websocket.send(to_send)
await asyncio.sleep(1)
finally:
print("exiting weight socket")
if __name__ == "__main__":
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logging.basicConfig(format="%(asctime)s %(name)s: %(levelname)s %(message)s")
parser = argparse.ArgumentParser(description="Moonscale server - reads scale, provides websocket")
parser.add_argument( "-r", "--random", help="Sets random weights for testing purposes", action="store_true", required=False)
parser.add_argument( "-v", "--verbose", help="Set logging to debug mode", action="store_true")
args = parser.parse_args()
# add the handlers to the logger
if args.verbose:
logger.setLevel(logging.DEBUG)
asyncio.run(main(args))