-
Notifications
You must be signed in to change notification settings - Fork 6
/
iotile_sync.js
214 lines (183 loc) · 7.19 KB
/
iotile_sync.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
const axios = require('axios');
const iotileConfig = require('./iotile.conf.json');
const mysql = require('mysql2');
const pe = require('parse-error');
const path = require('path');
const dotenvPath = path.resolve('..', '.env');
const dotenv = require('dotenv').config({ path: dotenvPath});
const moment = require('moment');
if (dotenv.error) {
return console.log(`.env file not found at '${dotenvPath}')}`);
}
// Pick the database settings for the current environment ("development" when NODE_ENV is unset)
const env = process.env.NODE_ENV || 'development';
const dbConfig = require('./config/database')[env];
// Copy the settings into a fresh object and layer the pool options on top
const configs = Object.assign({}, dbConfig, {
  connectionLimit: 50,
  supportBigNumbers: true,
  decimalNumbers: true
});
// Drop the properties that are not wanted on the object later given to mysql.createPool
// NOTE(review): presumably these are ORM-style keys the mysql driver does not accept - confirm
['username', 'dialect'].forEach(key => {
  delete configs[key];
});
// hp stands for Handle Promise.
// Wraps a promise so that it never rejects and can be destructured after `await`:
//   - fulfilled -> [null, data]
//   - rejected  -> [parsedError]  (the error is passed through parse-error's `pe`)
// Declared with `const` so it is not an implicit global (which throws in strict mode / ES modules).
const hp = promise =>
  promise
    .then(data => [null, data])
    .catch(err => [pe(err)]);
// TODO: Make sure the config file exists and contains the necessary info
// Log in to the IoTile API with the username and password from the config file.
// Resolves with the body of the auth response; rejects with the error raised by axios.
const getIotileToken = async () => {
  const loginUrl = `${iotileConfig.baseUrl}/auth/api-jwt-auth/`;
  const credentials = {
    username: iotileConfig.username,
    password: iotileConfig.password
  };
  const response = await axios.post(loginUrl, credentials);
  return response.data;
};
// Newest reading (highest id) for one kiosk / parameter / sampling site combination
const selectLatestEntryQuery = `SELECT
*
FROM
reading
WHERE
kiosk_id = ? AND parameter_id = ?
AND sampling_site_id = ?
ORDER BY id DESC
LIMIT 1;
`;
// We need the latest entry so we know where to continue gathering data from.
// mapping - provides kiosk_id, parameter_id and sampling_site_id for the query
// pool    - connection pool used to check out a connection
// Resolves with the array of rows returned by the query, or null when there is none.
// Rejects when the connection cannot be obtained or the query fails.
const getLatestEntry = (mapping, pool) => {
  return new Promise((resolve, reject) => {
    pool.getConnection((connectionErr, connection) => {
      // Without this guard a failed checkout leaves `connection` undefined and the
      // query call below would throw inside the callback instead of rejecting
      if (connectionErr) return reject(connectionErr);
      const queryParams = [
        mapping.kiosk_id,
        mapping.parameter_id,
        mapping.sampling_site_id
      ];
      connection.query(selectLatestEntryQuery, queryParams, (queryErr, rows) => {
        // Hand the connection back to the pool whatever the outcome
        connection.release();
        if (queryErr) return reject(queryErr);
        const hasRows = Array.isArray(rows) && rows.length > 0;
        return resolve(hasRows ? rows : null);
      });
    });
  });
};
// Fetch the data of the stream linked to a mapping from the IoTile API server.
// token        - JWT received during login, sent in the Authorization header
// lastSyncDate - when truthy, sent as the "start" query parameter
// mapping      - its device_stream_id selects the stream to read
// Resolves with the response body; rejects with the error raised by axios.
const getStreamData = (token, lastSyncDate, mapping) => {
  const requestConfig = {
    headers: {
      Authorization: `JWT ${token}`,
      'Content-Type': 'application/json'
    },
    // Only ask for data from the last sync onwards, if there was one
    ...(lastSyncDate ? { params: { start: lastSyncDate } } : {})
  };
  const fetchStream = async () => {
    const streamUrl = `${iotileConfig.baseUrl}/stream/${mapping.device_stream_id}/data/`;
    const response = await axios.get(streamUrl, requestConfig);
    return response.data;
  };
  return fetchStream();
};
// Calculate the total from the IoTile stream data and add it to the previous reading value.
// dbValue    - value of the latest stored reading; null/undefined/non-numeric counts as 0
// iotileData - array of stream entries, each with a `value` and a `timestamp`
// Returns { syncDate, value }: syncDate is the timestamp of the last entry (null for an
// empty array) and value is the sum of all entry values plus dbValue.
const doAccumulation = (dbValue, iotileData) => {
  // The accumulator must be carried forward; returning only `input.value` from the
  // reducer would keep just the last entry instead of the total
  const streamTotal = iotileData.reduce((sum, input) => sum + Number(input.value), 0);
  const lastEntry = iotileData[iotileData.length - 1];
  return {
    syncDate: lastEntry ? lastEntry.timestamp : null,
    // A missing previous reading (first sync) must not turn the result into NaN
    value: streamTotal + (Number(dbValue) || 0)
  };
};
// Insert statement for a single reading row
const insertNewReadingQuery = `INSERT INTO
reading (created_at, kiosk_id, parameter_id, sampling_site_id, value, user_id)
VALUES (?, ?, ?, ?, ?, ?);
`;
// Looks up the id of the user the synced readings are attributed to
const getIoTileUserQuery = `SELECT id FROM user WHERE username = 'iotile'`;
// Enter the data received from IoTile API server into the SEMA schema.
// readingData - { syncDate, value } of the reading to store
// mapping     - provides kiosk_id, parameter_id and sampling_site_id
// pool        - connection pool used to check out a connection
// Resolves with no value once the row is inserted. Rejects when the connection cannot be
// obtained, when either query fails, or when the 'iotile' user does not exist.
const syncReading = (readingData, mapping, pool) => {
  return new Promise((resolve, reject) => {
    pool.getConnection((connectionErr, connection) => {
      if (connectionErr) return reject(connectionErr);
      connection.query(getIoTileUserQuery, (userErr, users) => {
        if (userErr) {
          // Release here too, otherwise the connection leaks on a failed lookup
          connection.release();
          return reject(userErr);
        }
        if (!Array.isArray(users) || users.length === 0) {
          connection.release();
          return reject(new Error(`User 'iotile' not found, cannot attribute the reading`));
        }
        const insertParams = [
          moment(readingData.syncDate).format('YYYY-MM-DD HH:mm:ss'),
          mapping.kiosk_id,
          mapping.parameter_id,
          mapping.sampling_site_id,
          readingData.value,
          users[0].id
        ];
        connection.query(insertNewReadingQuery, insertParams, insertErr => {
          connection.release();
          // Check the error before resolving; a settled promise ignores later rejections
          if (insertErr) return reject(insertErr);
          return resolve();
        });
      });
    });
  });
};
// Synchronize one device mapping: find the latest stored reading, pull the newer stream
// data from IoTile and store it as one accumulated reading or as one reading per entry.
// Rejects with the first error met; the pool is closed in every case.
const syncMapping = async mapping => {
  // Skip inactive devices
  if (!mapping.active) return console.log(`${mapping.slug}: Device inactive or not setup yet`);
  console.log(`${mapping.slug}: Begin data synchronization`);
  // The connection pool to use for database calls
  const pool = mysql.createPool(configs);
  try {
    // Get the latest entry and ultimately the latest sync date
    const latestEntry = await getLatestEntry(mapping, pool);
    const latestRow = latestEntry && latestEntry[0];
    const lastSyncDate = latestRow && latestRow.created_at;
    // TODO: save the received token to local storage and check its validity before calling for
    // a new one. For now, a new token is requested on every run
    const { token } = await getIotileToken();
    // Pull the data from IoTile using the token and the latest sync date
    const streamData = await getStreamData(token, lastSyncDate, mapping);
    // A response without a results array is treated as "nothing new"
    const results = streamData && Array.isArray(streamData.results) ? streamData.results : [];
    // Get rid of the first value of the stream data if it's of the exact same
    // date as the last sync date because currently, IoTile includes
    // streams with the same date as the one specified in the "start" parameter.
    // Only done when there was a previous sync.
    if (lastSyncDate && results.length > 0 && moment(results[0].timestamp).isSame(lastSyncDate)) {
      results.shift();
    }
    // No need to do anything when there's no new readings from IoTile
    if (results.length === 0) return console.log(`${mapping.slug}: No new data`);
    // If stream data is an accumulation, we use the total of the last values entered for it
    // to create the next reading. If not, we upload each as a reading value.
    if (mapping.is_accumulation) {
      const accumulationData = doAccumulation(latestRow && latestRow.value, results);
      await syncReading(accumulationData, mapping, pool);
      console.log(`${mapping.slug}: Accumulation synced successfully`);
    } else {
      // Sequential on purpose: `await` inside forEach is not waited for, and inserting
      // in order keeps the row with the highest id the most recent reading
      for (const stream of results) {
        const readingData = {
          value: stream.value,
          syncDate: stream.timestamp
        };
        await syncReading(readingData, mapping, pool);
      }
      console.log(`${mapping.slug}: Synced ${results.length} new readings`);
    }
    return console.log(`${mapping.slug}: DONE`);
  } finally {
    // Close the pool exactly once, after all queries of this mapping have settled
    pool.end();
  }
};

// Mappings are synchronized concurrently; a failure in one is logged and does not stop the others
iotileConfig.mappings.forEach(mapping => {
  syncMapping(mapping).catch(err => {
    const reason = err && err.message ? err.message : err;
    console.error(`${mapping.slug}: Synchronization failed: ${reason}`);
  });
});