forked from pleboeuf/ErabliCollecteur
-
Notifications
You must be signed in to change notification settings - Fork 0
/
command.js
111 lines (105 loc) · 3.91 KB
/
command.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
exports.CommandHandler = function (db, blacklist) {
/**
 * Tells whether a stored event row is covered by an entry of the
 * `blacklist` array given to the enclosing handler factory.
 *
 * An entry covers the row when both of these hold:
 *   - the entry has no `device`, or its `device` equals `row.device_id`;
 *   - the entry has no `timestampUntil`, or its `timestampUntil` is
 *     strictly lower than `row.published_at`.
 *
 * NOTE(review): the name `timestampUntil` could suggest the opposite
 * comparison direction (rows published *before* the timestamp) - confirm
 * against the blacklist configuration before changing anything.
 *
 * @param {object} row - Row exposing `device_id` and `published_at`.
 * @returns {true|undefined} `true` when an entry covers the row,
 *     `undefined` otherwise (callers only test truthiness).
 */
function isBlacklisted(row) {
    function coversDevice(entry) {
        return !entry.device || entry.device === row.device_id;
    }
    function coversTimestamp(entry) {
        return !entry.timestampUntil || entry.timestampUntil < row.published_at;
    }
    const matchingEntry = blacklist.find(function (entry) {
        return coversDevice(entry) && coversTimestamp(entry);
    });
    return matchingEntry ? true : undefined;
}
/**
 * Handles a "query" command: reads rows from the `raw_events` table and
 * sends them one by one over `connection`, followed by a
 * "collector/querycomplete" message.
 *
 * Optional filters on the command:
 *   - `device`     -> `device_id = ?`
 *   - `after`      -> `serial_no > ?` (requires `device` and `generation`)
 *   - `generation` -> `generation_id = ?`
 *
 * When `after` is given without `device` and/or `generation`, one error
 * message per missing parameter is sent and no query is run.
 *
 * Rows are skipped when `isBlacklisted(row)` is truthy, when
 * `published_at` or `generation_id` is null, or when `raw_data` is not a
 * JSON object. `command.sent` is set to the number of events sent.
 *
 * @param {object} command - Parsed command; mutated (`sent` counter).
 * @param {object} connection - Must expose `send(text, callback)`; the
 *     callback is assumed to receive an error as its first argument when
 *     the send fails (as the `ws` library does - confirm for the
 *     transport actually used).
 * @returns {undefined}
 */
function handleQuery(command, connection) {
    const hasDeviceParam = (typeof command.device !== "undefined");
    const hasGenerationParam = (typeof command.generation !== "undefined");
    const hasAfterParam = (typeof command.after !== "undefined");

    // Report every missing companion of 'after', then stop: running the
    // query anyway would answer a request that was just rejected.
    let isValid = true;
    if (hasAfterParam && !hasDeviceParam) {
        connection.send(JSON.stringify({
            "error": "parameter 'device' is mandatory with 'after' parameter"
        }));
        isValid = false;
    }
    if (hasAfterParam && !hasGenerationParam) {
        connection.send(JSON.stringify({
            "error": "parameter 'generation' is mandatory with 'after' parameter"
        }));
        isValid = false;
    }
    if (!isValid) {
        return;
    }

    // Collect the filters first so the WHERE clause is well-formed for
    // any combination (e.g. 'generation' without 'device').
    const conditions = [];
    const params = [];
    if (hasDeviceParam) {
        conditions.push("device_id = ?");
        params.push(command.device);
    }
    if (hasAfterParam) {
        conditions.push("serial_no > ?");
        params.push(command.after);
    }
    if (hasGenerationParam) {
        conditions.push("generation_id = ?");
        params.push(command.generation);
    }
    let sql = "select * from raw_events";
    if (conditions.length > 0) {
        sql = sql + " where " + conditions.join(" and ");
    }
    sql = sql + " order by generation_id, serial_no";

    const iterator = db.prepare(sql).iterate(params);
    command.sent = 0;

    function doneSending() {
        console.log("Completed.", command);
        connection.send(JSON.stringify({
            name: "collector/querycomplete",
            data: {
                command: command,
                sql: sql
            }
        }));
    }

    // Parses a row's raw_data; returns null when it is not a JSON object.
    function parseRowData(row) {
        try {
            const parsed = JSON.parse(row.raw_data);
            return (parsed !== null && typeof parsed === "object") ? parsed : null;
        } catch (parseError) {
            console.error("Skipping row with malformed raw_data", row.device_id, row.serial_no, parseError.message);
            return null;
        }
    }

    // Sends the next eligible row. It is passed as the send callback, so
    // rows go out one at a time; `sendError` is whatever the transport
    // reports for the previous send.
    function sendNext(sendError) {
        if (sendError) {
            // The connection is unusable: stop streaming and release the
            // iterator instead of pushing the remaining rows into a dead socket.
            console.error("Query aborted, send failed.", sendError);
            if (typeof iterator.return === "function") {
                iterator.return();
            }
            return;
        }
        // Loop (rather than recurse) over skipped rows so a long run of
        // blacklisted/invalid rows cannot exhaust the call stack.
        for (;;) {
            const elem = iterator.next();
            if (elem.done) {
                doneSending();
                // NOTE(review): this closes the `db` handle that was injected
                // into the enclosing factory; later queries through the same
                // handler would hit a closed handle - confirm this is intended.
                db.close();
                return;
            }
            const row = elem.value;
            if (isBlacklisted(row) || row.published_at === null || row.generation_id === null) {
                continue;
            }
            const data = parseRowData(row);
            if (data === null) {
                continue;
            }
            data.generation = row.generation_id;
            data.noSerie = row.serial_no;
            const event = {
                "coreid": row.device_id,
                "published_at": new Date(row.published_at),
                "name": data.eName,
                "data": JSON.stringify(data)
            };
            command.sent += 1;
            connection.send(JSON.stringify(event), sendNext);
            return;
        }
    }
    sendNext();
}
return {
// format: { "command" : "query", "device": "device-id", "generation" : 0, "after": 0 }
"onCommand": function (command, connection) {
if (command.command === "subscribe") {
connection.subscribed = true;
} else if (command.command === "query") {
handleQuery(command, connection);
} else {
connection.send(JSON.stringify({
name: "collector/error",
data: {
message: "command not supported",
command: command
}
}));
}
}
};
};