-
Notifications
You must be signed in to change notification settings - Fork 3
/
mqtt-script.py
116 lines (97 loc) · 3.92 KB
/
mqtt-script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import paho.mqtt.client as mqtt
import sys
from meshtastic import protocols, BROADCAST_NUM
try:
from meshtastic import mqtt_pb2, portnums_pb2, mesh_pb2
except ImportError:
from meshtastic.protobuf import mqtt_pb2, portnums_pb2, mesh_pb2
from google.protobuf.json_format import MessageToJson
import base64
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# First segment of the MQTT topics subscribed to in on_connect; the __main__
# block replaces it when a fifth command-line argument is supplied.
root_topic: str = "msh"

# Base64-encoded AES key that try_decode uses to decrypt packets.
default_key: str = "1PG7OiApB1nwvP+rz05pAQ=="

# short_name values taken from NODEINFO_APP packets, keyed by the packet's
# "from" field (filled in by on_message).
node_names: dict = {}
# with thanks to pdxlocs
def try_decode(mp):
    """Decrypt ``mp.encrypted`` with ``default_key`` and fill in ``mp.decoded``.

    The AES-CTR nonce is the packet's ``id`` field followed by its ``from``
    field, each serialised as 8 little-endian bytes. The decrypted bytes are
    parsed as a ``mesh_pb2.Data`` message, which is then copied into
    ``mp.decoded``.

    Args:
        mp: packet object exposing ``id``, ``from``, ``encrypted`` and
            ``decoded``; it is modified in place.

    Returns:
        None.

    Any exception raised while decrypting or parsing is left to propagate
    to the caller.
    """
    aes_key = base64.b64decode(default_key.encode('ascii'))

    # "from" is a Python keyword, so that field can only be read via getattr().
    packet_id = mp.id
    sender = getattr(mp, "from")
    nonce = packet_id.to_bytes(8, "little") + sender.to_bytes(8, "little")

    decryptor = Cipher(
        algorithms.AES(aes_key),
        modes.CTR(nonce),
        backend=default_backend(),
    ).decryptor()
    plaintext = decryptor.update(mp.encrypted) + decryptor.finalize()

    parsed = mesh_pb2.Data()
    parsed.ParseFromString(plaintext)
    mp.decoded.CopyFrom(parsed)
def on_connect(client, userdata, flags, reason_code, properties = None):
    """Connection callback: subscribe on success, otherwise print the details.

    When ``reason_code`` equals 0 the client is subscribed to
    ``<root_topic>/2/c/#`` and ``<root_topic>/2/e/#``. For any other reason
    code nothing is subscribed; ``userdata``, ``flags``, ``reason_code`` and
    ``properties`` are printed on one line instead.
    """
    connected = reason_code == 0
    if not connected:
        print(f"{userdata} {flags} {reason_code} {properties}")
        return

    # Same two subscriptions as before, in the same order: "c" then "e".
    for segment in ("c", "e"):
        client.subscribe(f'{root_topic}/2/{segment}/#')
def on_disconnect(client, userdata, flags, reason_code = None, properties = None):
    """Disconnect callback: print the reason code of the disconnect.

    Two call shapes are accepted so the callback works with both clients the
    ``__main__`` block may create:

    * ``(client, userdata, flags, reason_code, properties)`` - used when the
      client was created with ``CallbackAPIVersion.VERSION2``.
    * ``(client, userdata, rc)`` - the legacy shape used by the fallback
      client; here the reason code arrives in the ``flags`` position.

    Args:
        client: the MQTT client that disconnected (unused).
        userdata: user data attached to the client (unused).
        flags: disconnect flags, or the reason code in the legacy call shape.
        reason_code: reason for the disconnect; ``None`` when the legacy
            three-argument shape was used.
        properties: optional extra properties (unused).
    """
    if reason_code is None:
        # Legacy three-argument call: the third positional value is the code.
        reason_code = flags
    print(f"disconnected with reason code {str(reason_code)}")
def on_message(client, userdata, msg):
    """Message callback: decode one service envelope and print a summary line.

    Processing steps:

    1. Parse ``msg.payload`` as an ``mqtt_pb2.ServiceEnvelope`` and take its
       ``packet``. A parse failure is printed and the message is dropped.
    2. Build the line prefix ``"<channel> [<from>-><to>] <portnum name>:"``.
       Node numbers are shown as 8-digit hex; a sender with a known short
       name gets ``[name]`` appended, and a destination equal to
       ``BROADCAST_NUM`` is shown as ``all``.
    3. If the packet has ``encrypted`` set but not ``decoded``, decrypt it
       with ``try_decode``; on any failure report "could not be decrypted".
    4. Look up a handler for the port number in ``protocols`` and print
       either the raw payload (no protobuf factory) or the payload parsed
       and rendered as JSON.
    5. For ``NODEINFO_APP`` packets, remember the sender's ``short_name`` in
       ``node_names`` so later lines can show it.

    Args:
        client: the MQTT client that received the message (unused).
        userdata: user data attached to the client (unused).
        msg: received message; ``msg.payload`` and ``msg.info`` are read.
    """
    envelope = mqtt_pb2.ServiceEnvelope()
    try:
        envelope.ParseFromString(msg.payload)
        mp = envelope.packet
    except Exception as e:
        print(f"ERROR: parsing service envelope: {str(e)}")
        print(f"{msg.info} {msg.payload}")
        return

    # "from" is a Python keyword, so that field can only be read via getattr().
    sender = getattr(mp, 'from')
    from_id = f"{sender:08x}"
    if sender in node_names:
        from_id = f"{from_id}[{node_names.get(sender)}]"

    to_id = 'all' if mp.to == BROADCAST_NUM else f"{mp.to:08x}"

    pn = portnums_pb2.PortNum.Name(mp.decoded.portnum)
    prefix = f"{mp.channel} [{from_id}->{to_id}] {pn}:"

    if mp.HasField("encrypted") and not mp.HasField("decoded"):
        try:
            try_decode(mp)
            # The port number is only known once the packet is decrypted,
            # so the prefix is rebuilt here.
            pn = portnums_pb2.PortNum.Name(mp.decoded.portnum)
            prefix = f"{mp.channel} [{from_id}->{to_id}] {pn}:"
        except Exception:
            print(f"{prefix} could not be decrypted")
            return

    handler = protocols.get(mp.decoded.portnum)
    if handler is None:
        print(f"{prefix} no handler came from protocols")
        return

    if handler.protobufFactory is None:
        print(f"{prefix} {mp.decoded.payload}")
        return

    pb = handler.protobufFactory()
    try:
        pb.ParseFromString(mp.decoded.payload)
    except Exception as e:
        # A malformed payload must not raise out of the callback; report it
        # and drop the message, like the envelope parse failure above.
        print(f"{prefix} could not parse payload: {str(e)}")
        return
    p = MessageToJson(pb)

    if mp.decoded.portnum == portnums_pb2.PortNum.NODEINFO_APP:
        print(f"node {sender:08x} has short_name {pb.short_name}")
        node_names[sender] = pb.short_name
        # Rebuild the prefix so this very line already shows the new name.
        from_id = f"{sender:08x}[{node_names.get(sender)}]"
        prefix = f"{mp.channel} [{from_id}->{to_id}] {pn}:"

    print(f"{prefix} {p}")
def connect(client, username, pw, broker, port):
    """Set the client's credentials and connect it to ``broker``:``port``.

    Best effort: any exception raised while setting the credentials or
    connecting is caught and printed as ``failed connect: <error>``; nothing
    is re-raised and nothing is returned.

    Args:
        client: client object providing ``username_pw_set`` and ``connect``.
        username: user name passed to ``username_pw_set``.
        pw: password passed to ``username_pw_set``.
        broker: host passed to ``client.connect``.
        port: port passed to ``client.connect``.
    """
    # Third positional argument of client.connect(); presumably the
    # keepalive interval in seconds - confirm against the client library.
    third_connect_arg = 60
    try:
        client.username_pw_set(username, pw)
        client.connect(broker, port, third_connect_arg)
    except Exception as err:
        print(f"failed connect: {str(err)}")
if __name__ == "__main__":
    # Command line: <username> <password> <broker> <port> [root_topic]
    if len(sys.argv) < 5:
        sys.exit(
            f"usage: {sys.argv[0]} <username> <password> <broker> <port> [root_topic]"
        )

    # Optional fifth argument overrides the default root topic. It is set
    # before connecting so on_connect always subscribes with the final value.
    if len(sys.argv) > 5:
        root_topic = sys.argv[5]

    try:
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="", clean_session=True, userdata=None)
    except AttributeError:
        # The installed mqtt module has no CallbackAPIVersion attribute:
        # build the client without it. Only this error is caught, so any
        # other construction failure is still reported.
        client = mqtt.Client(client_id="", clean_session=True, userdata=None)

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    connect(client, sys.argv[1], sys.argv[2], sys.argv[3], int(sys.argv[4]))
    client.loop_forever()