-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
83 lines (69 loc) · 2.15 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
var config = require('./config/config');
var AWS = require('aws-sdk');
var pg = require('pg');
// AWS Kinesis
// Credentials come from the local config module; the region is fixed here.
// They are registered as SDK-wide defaults before the Kinesis client is built.
const awsSettings = {
  accessKeyId: config.aws_access_key,
  secretAccessKey: config.aws_secret_key,
  region: "ap-southeast-1"
};
AWS.config.update(awsSettings);

// Kinesis service client, pinned to the 2013-12-02 API version.
const kinesisOptions = {apiVersion: '2013-12-02'};
const kinesis = new AWS.Kinesis(kinesisOptions);
// PostgreSQL connection
const Client = pg.Client;
const client = new Client(config.getDBConfig());

// This client stays connected for the lifetime of the process, so the
// connection can be lost while it is idle. An 'error' event with no listener
// is thrown by the EventEmitter; log it and exit non-zero instead of carrying
// on without a connection.
client.on('error', function (err) {
  console.error('PostgreSQL connection error:', err);
  process.exit(1);
});

// Use the callback form so a failed connection attempt is reported explicitly
// rather than being left as an unhandled failure.
client.connect(function (err) {
  if (err) {
    console.error('Unable to connect to PostgreSQL:', err);
    process.exit(1);
  }
});
// PostgreSQL notification -> Kinesis forwarding

/**
 * Build the change event published to the stream from a decoded
 * notification payload.
 *
 * @param {*} pl - Value returned by JSON.parse for the notification payload.
 * @returns {Object|null} An object with `table`, `timestamp`, `operation` and
 *   `payload` keys, or null when `pl` is not an object carrying a `table` key
 *   together with one of INSERT, UPDATE or DELETE.
 */
function buildChangeEvent(pl) {
  // typeof null is "object" and `"table" in null` throws, so rule it out first.
  if (pl === null || typeof pl !== "object" || !("table" in pl)) {
    return null;
  }

  const data = {
    table: "table_" + config.app_db_name + '_' + pl.table,
    timestamp: Date.now()
  };

  if ("INSERT" in pl) {
    data.operation = "insert";
    data.payload = pl.INSERT;
  } else if ("UPDATE" in pl) {
    data.operation = "update";
    data.payload = pl.UPDATE;
  } else if ("DELETE" in pl) {
    data.operation = "delete";
    // For deletes the notification value is wrapped as { id: ... }.
    data.payload = {
      id: pl.DELETE
    };
  } else {
    return null;
  }

  return data;
}

/**
 * Handle a notification from the PostgreSQL client: decode the JSON payload
 * of `table_update` messages and publish the resulting change event to the
 * configured Kinesis stream.
 *
 * @param {Object} msg - Notification message; `name`, `channel` and `payload`
 *   are read from it.
 */
client.on('notification', function (msg) {
  console.log("*========*");
  if (msg.name !== 'notification' || msg.channel !== 'table_update') {
    return;
  }

  // An empty or malformed payload makes JSON.parse throw; an exception thrown
  // from an event listener would otherwise go uncaught.
  let pl;
  try {
    pl = JSON.parse(msg.payload);
  } catch (err) {
    console.error("table_update: payload is not valid JSON", err);
    console.log("-========-");
    return;
  }
  console.log("*========*");

  const data = buildChangeEvent(pl);
  if (data) {
    // NOTE(review): a constant partition key presumably keeps every record on
    // one shard to preserve ordering — confirm this is intentional.
    const recordData = [{
      Data: JSON.stringify(data),
      PartitionKey: 'partition-1'
    }];

    kinesis.putRecords({
      Records: recordData,
      StreamName: config.stream_name
    }, function (err, result) {
      if (err) {
        console.error(err);
      } else if (result && result.FailedRecordCount > 0) {
        // PutRecords reports per-record failures inside a successful response.
        console.error(
          "Kinesis rejected " + result.FailedRecordCount + " record(s)",
          result.Records
        );
      } else {
        console.log(recordData);
      }
    });
  }
  console.log("-========-");
});
// Subscribe to the table_update channel. The callback makes a failed LISTEN
// visible: without the subscription no notifications are delivered, so log
// the error and exit non-zero.
client.query("LISTEN table_update", function (err) {
  if (err) {
    console.error("LISTEN table_update failed:", err);
    process.exit(1);
  }
});