-
Notifications
You must be signed in to change notification settings - Fork 2
/
dbus-ruuvi
executable file
·173 lines (129 loc) · 4.89 KB
/
dbus-ruuvi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/usr/bin/env python
from ruuvitag_sensor.ruuvi import RuuviTagSensor
import gobject
import platform
import argparse
import logging
import sys
import os
from threading import Thread
from dbus.mainloop.glib import DBusGMainLoop
import dbus
# our own packages
sys.path.insert(1, os.path.join(os.path.dirname(__file__), './ext/velib_python'))
print os.path.join(os.path.dirname(__file__), './ext/velib_python')
from vedbus import VeDbusService
from settingsdevice import SettingsDevice
VERSION = '0.01'
# /Temperature degrees Celcius
# /Status 0=Ok; 1=Disconnected; 2=Short circuited; 3=Reverse polarity; 4=Unknown
# /Scale <- normally not necessary! devices should be calibrated.
# /Offset <- normally not necessary! devices should be calibrated.
# /TemperatureType 0=battery; 1=fridge; 2=generic
# MAC C3:E9:A1:E8:C9:C1
# {'battery': 3157.0, 'pressure': 1019.17, 'mac': 'c3e9a1e8c9c1', 'measurement_sequence_number': 18899, 'acceleration_z': 976, 'acceleration': 979.61625139643331, 'temperature': 17.550000000000001, 'acceleration_y': 76, 'acceleration_x': -36, 'data_format': 5, 'humidity': 37.210000000000001, 'tx_power': 4, 'movement_counter': 62}
class DbusRuuviReader:
def __init__(self):
self._dbusservice=VeDbusService(com.victronenergy.temperature)
class SystemBus(dbus.bus.BusConnection):
def __new__(cls):
return dbus.bus.BusConnection.__new__(cls, dbus.bus.BusConnection.TYPE_SYSTEM)
class SessionBus(dbus.bus.BusConnection):
def __new__(cls):
return dbus.bus.BusConnection.__new__(cls, dbus.bus.BusConnection.TYPE_SESSION)
def dbusconnection():
return SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else SystemBus()
class RuuviService(object):
product_id = 0xFFFF
_product_name = 'Generic GPIO'
def __init__(self, bus, mac, data):
self.bus = bus
self.mac = mac
self.data = data
self._product_name = 'Ruuvi Tag ' + mac[8:10] + mac[10:13]
path = '/Settings/Devices/Ruuvi' + mac
def_inst = '%s:%s' % ('temperature', 40)
SETTINGS = {
'instance': [path + '/ClassAndVrmInstance', def_inst, 0, 0],
'customname': [path + '/CustomName', '', 0, 0],
}
self.settings = SettingsDevice(bus, SETTINGS, self.setting_changed)
self.service = VeDbusService('com.victronenergy.temperature.ruuvi_' + mac, bus=bus)
# Add mandatory paths
self.service.add_path('/Mgmt/ProcessName', __file__)
self.service.add_path('/Mgmt/ProcessVersion', VERSION)
self.service.add_path('/Mgmt/Connection', "Ruuvi BLE Tag")
self.service.add_path('/DeviceInstance', self.settings['instance'].split(':')[1])
self.service.add_path('/ProductId', 0xFFFF)
self.service.add_path('/ProductName', self._product_name)
self.service.add_path('/Connected', 1)
# Add data paths
self.service.add_path('/Temperature', value=data['temperature'])
self.service.add_path('/Status', value=0) # Connected
self.service.add_path('/TemperatureType', value=2) # 0=battery; 1=fridge; 2=generic
# Add custom name path
# def _change_name(p, v):
# # This should fire a change event that will update product_name
# # below.
# settings['name'] = v
# return True
#
#self.service.add_path('/CustomName', settings['name'], writeable=True,
# onchangecallback=_change_name)
def setting_changed(self, name, old, new):
return
if self.service:
# There is no service yet, and looks like there
# are some return calls when first making the service
# which btw is something to look into, but to ignore for now
return
if name == 'customname':
self.service['/CustomName'] = new
return
def update(self, data):
print self.mac + ' new temp: ' + str(data['temperature'])
self.service['/Temperature'] = data['temperature']
@property
def product_name(self):
return self.settings['name'] or self._product_name
@product_name.setter
def product_name(self, v):
# Some pin types don't have an associated service (Disabled pins for
# example)
if self.service is not None:
self.service['/ProductName'] = v or self._product_name
def deactivate(self):
self.save_count()
self.service.__del__()
del self.service
self.service = None
@property
def active(self):
return self.service is not None
@classmethod
def createHandler(cls, _type, *args, **kwargs):
if _type in cls.handlers:
return cls.handlers[_type](*args, **kwargs)
return None
def main():
ruuvis = {}
def handle_data(d):
mac = d[0].replace(':', '')
data = d[1]
if mac in ruuvis:
ruuvis[mac].update(data)
else:
ruuvis[mac] = RuuviService(dbusconnection(), mac, data)
DBusGMainLoop(set_as_default=True)
# Need to run the HCI polling in separate thread. Pass in the mainloop so
# the thread can kill us if there is an exception.
gobject.threads_init()
mainloop = gobject.MainLoop()
t = Thread(target=lambda: RuuviTagSensor.get_datas(handle_data))
t.daemon = True
t.start()
try:
mainloop.run()
except KeyboardInterrupt:
pass
main()