-
Notifications
You must be signed in to change notification settings - Fork 1
/
Pedals.py
134 lines (105 loc) · 3.58 KB
/
Pedals.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import mdlog
log = mdlog.getLogger(__name__)
from EventLoop import getLoop, pushEvent
from EventList import PedalsEvent, ExitEvent, RestartEvent
log.info("Importing pedals!")
from piehid32 import *
log.info("Imported pedals!")
from ctypes import *
from eventfd import eventfd
import struct
import sys, os
import atexit
# Pedal states from the previous report handled by dataCb, one entry per
# pedal; all three start out cleared.
oldPedals = [0, 0, 0]

# Optional extra callback; read and written through getPedalCallback() and
# setPedalCallback().
_pedalCb = None

# Descriptor that dataCb writes to and readOut reads from, used to wake up
# the main thread when a pedal event has been queued.
wakeupFd = eventfd(0, 0)
def setPedalCallback(cb):
    """
    Install cb as the module-wide pedal callback.

    dataCb invokes it as cb(pedals, changed) right after queueing the
    PedalsEvent, i.e. on the pedal reading thread, whereas the
    PedalsEvent itself is always received on the main thread.  Passing a
    falsy value (e.g. None) disables the callback.
    """
    global _pedalCb
    _pedalCb = cb
def getPedalCallback():
    """
    Return the callback last installed with setPedalCallback, or None
    when no callback has been set.
    """
    return _pedalCb
def dataCb(data, deviceId, error):
    """
    Handle one data report delivered by the pedal device.

    The three pedal states are decoded from data[3] (bit masks 2, 4 and
    8).  When at least one state differs from the previous report, the
    new states are remembered, a PedalsEvent is queued, the optional
    pedal callback is invoked and the eventfd is written so the main
    thread wakes up and sees the event.  A report that changes nothing
    is ignored.

    deviceId and error are part of the callback signature but are not
    used.  Always returns 0.
    """
    global oldPedals

    status = data[3]
    pedals = [(status & mask) != 0 for mask in (2, 4, 8)]

    # Index of the pedal whose state changed; if several changed in the
    # same report, the highest index wins (same as before).
    changed = None
    for i in range(3):
        if pedals[i] != oldPedals[i]:
            changed = i

    if changed is None:
        # Nothing changed.  Previously `changed` was left unbound in this
        # case and building the PedalsEvent raised UnboundLocalError
        # inside the native callback, so no event was delivered either.
        return 0

    oldPedals = pedals
    log.info("Pedals: [%s]" % pedals)
    pushEvent(PedalsEvent(pedals, changed))
    if _pedalCb:
        _pedalCb(pedals, changed)

    # We use eventfd to wakeup the main thread so it will
    # see the pedal event
    written = os.write(wakeupFd, c_longlong(1))
    if written != 8:
        log.error("Error writing to eventfd.")
    return 0
def errorCb(deviceId, status):
    """
    Error handler registered with the pedal library.

    Logs the reported device id and status at error level and returns 0.
    """
    details = (deviceId, status)
    log.error("Error in pedals: [%s] [%s]" % details)
    return 0
def readOut():
    """
    Read 8 bytes from the wakeup eventfd and log the value they encode
    (interpreted as a native unsigned 64-bit integer) at debug level.
    """
    raw = os.read(wakeupFd, 8)
    (value,) = struct.unpack('@Q', raw)
    log.debug("Pedal EventFD Read: %d" % value)
# Register readOut with the event loop for wakeupFd, using the loop's
# FILE_INPUT flag (presumably "descriptor is readable" -- confirm in EventLoop).
getLoop().subscribeFile(wakeupFd, getLoop().FILE_INPUT, readOut)
# Wrap the Python handlers in the callback types provided by piehid32.  The
# wrappers are bound to module-level names; presumably this keeps them alive
# for as long as the library may call them -- TODO confirm.
_dataCb = PHIDDataEvent(dataCb)
_errorCb = PHIDErrorEvent(errorCb)
# Array with room for MAX_XKEY_DEVICES TEnumHIDInfo records plus a counter;
# both are handed to EnumeratePIE, and count.value bounds the scan loop below.
infoArrayType = TEnumHIDInfo * MAX_XKEY_DEVICES
TEnumHIDInfoPtr = POINTER(TEnumHIDInfo)
info = infoArrayType()
count = c_long(0)
# NOTE(review): the value returned by EnumeratePIE is never inspected.
result = EnumeratePIE(PI_VID, info, pointer(count))
# dev stays None when count.value is 0; otherwise it refers to the last
# entry the loop below looked at.
dev = None
# NOTE(review): indentation was lost in this copy of the file, so the nesting
# of the statements below -- in particular whether `break` belongs to the
# `if` or directly to the loop body -- must be confirmed against the original.
for i in range(count.value):
dev = pointer(info[i])
log.info("Found XKeys Device:")
log.info("\tPID: %04x" % dev.contents.PID)
log.info("\tUsage Page: %04x" % dev.contents.UP)
log.info("\tUsage: %04x" % dev.contents.Usage)
log.info("\tVersion: %04x" % dev.contents.Version)
# magic constants from piehidtestgui!
if dev.contents.UP == 0x000c and dev.contents.Usage == 0x0001:
log.info("\tSetting up interface.")
result = SetupInterfaceEx(dev.contents.Handle)
# A non-zero result is only logged (at info level); execution continues.
if result != 0:
log.info("Unable to open device. Error: %d" % result)
# Why break after one iteration? No idea, but if I don't
# do this then my keyboard stops working until I unplug it
# and plug it back in!
break
# Give up when enumeration yielded no device at all (dev is still None) or
# when the selected device carries a negative handle.  Without the None
# check an empty enumeration crashed with AttributeError on dev.contents
# instead of taking this exit path.
if dev is None or dev.contents.Handle < 0:
    log.error("Unable to open device.")
    sys.exit(1)

# 33-byte buffer and index; not used by the code visible in this module but
# kept as module-level names in case other code refers to them.
dataArrayType = c_char * 33
data = dataArrayType()
idx = c_int(0)

# Hook up the data and error callbacks for the device.  Failures are logged
# but do not abort the import.
result = SetDataCallback(dev.contents.Handle, _dataCb)
if result != 0:
    log.error("Unable to set data callback, Error: %d" % result)

result = SetErrorCallback(dev.contents.Handle, _errorCb)
if result != 0:
    log.error("Unable to set error callback, Error: %d" % result)
def exitHandler():
    """
    Detach from the pedal device if one is currently attached.

    Replaces the data and error callbacks with the result of casting None
    to the respective callback type, then calls CleanupInterface and
    ClearBuffer on the device handle and resets the module-level dev to
    None, so a later call does nothing.
    """
    global dev
    if not dev:
        return

    SetDataCallback(dev.contents.Handle, cast(None, PHIDDataEvent))
    SetErrorCallback(dev.contents.Handle, cast(None, PHIDErrorEvent))
    CleanupInterface(dev.contents.Handle)
    ClearBuffer(dev.contents.Handle)
    dev = None
#atexit.register(exitHandler)
# Subscribe exitHandler to both ExitEvent and RestartEvent on the event loop
# (the atexit registration above is left disabled).
for _eventType in (ExitEvent, RestartEvent):
    getLoop().subscribeEvent(_eventType, exitHandler)
# def pedalsTest(ev):
# log.info("got pedals event")
# getLoop().subscribeEvent(PedalsEvent, pedalsTest)