/**
04.sendMsgToKafka.txt
Script to publish messages to Kafka with the DolphinDB Kafka Plugin
DolphinDB Inc.
DolphinDB server version: 2.00.6 2022.05.31
Storage engine: TSDB
Last modification time: 2022.07.07
*/
//log in with the administrator account
login("admin", "123456")
//create the shared stream table: messageStream
def createStreamTableFunc(){
    colName = `timestamp`source`msg
    colType = [TIMESTAMP, SYMBOL, BLOB]
    messageTemp = streamTable(1000000:0, colName, colType)
    enableTableShareAndPersistence(table=messageTemp, tableName="messageStream", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000)
    messageTemp = NULL    //release the local handle; the shared, persisted table lives on as messageStream
}
createStreamTableFunc()
go    //execute the statements above first so that later code can reference messageStream
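//optional sanity check (a sketch, for interactive sessions): the shared table
//should now be visible and empty
//select count(*) from messageStream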
//load the Kafka plugin
loadPlugin("/yourPluginsPath/kafka/PluginKafka.txt")
go
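//note: a plugin can be loaded only once per server session; calling loadPlugin
//again for an already loaded plugin raises an error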
//initialize the Kafka producer
def initKafkaProducerFunc(metadataBrokerList){
    producerCfg = dict(STRING, ANY)
    producerCfg["metadata.broker.list"] = metadataBrokerList
    return kafka::producer(producerCfg)
}
producer = initKafkaProducerFunc("localhost")
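//"localhost" implies the default broker port 9092; explicit "host:port" pairs
//(comma-separated for multiple brokers) also work, e.g. with a hypothetical
//remote broker:
//producer = initKafkaProducerFunc("192.168.1.10:9092")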
//publish messages to the Kafka topic
def sendMsgToKafkaFunc(dataType, producer, msg){
    startTime = now()
    try {
        kafka::produce(producer, "topic-message", 1, msg, true)
        cost = now() - startTime
        writeLog("[Kafka Plugin] Succeeded to send " + dataType + ": " + msg.size() + " rows, " + cost + " ms.")
    }
    catch(ex) {
        writeLog("[Kafka Plugin] Failed to send msg to Kafka with error: " + ex)
    }
}
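//optional smoke test (a sketch): reuse the handler to push one small table to
//"topic-message" and confirm connectivity in the server log
//sendMsgToKafkaFunc("test", producer, table(now() as ts, "hello" as msg))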
//register the streamFilter engine and subscribe to the stream table
def filterAndParseStreamFunc(producer){
    filter1 = dict(STRING, ANY)
    filter1["condition"] = "order"
    filter1["handler"] = sendMsgToKafkaFunc{"order", producer}
    filter2 = dict(STRING, ANY)
    filter2["condition"] = "trade"
    filter2["handler"] = sendMsgToKafkaFunc{"trade", producer}
    filter3 = dict(STRING, ANY)
    filter3["condition"] = "snapshot"
    filter3["handler"] = sendMsgToKafkaFunc{"snapshot", producer}
    schema = dict(["order", "trade", "snapshot"], [loadTable("dfs://order", "order"), loadTable("dfs://trade", "trade"), loadTable("dfs://snapshot", "snapshot")])
    engine = streamFilter(name="streamFilter", dummyTable=messageStream, filter=[filter1, filter2, filter3], msgSchema=schema)
    subscribeTable(tableName="messageStream", actionName="sendMsgToKafka", offset=-1, handler=engine, msgAsTable=true, reconnect=true)
}
filterAndParseStreamFunc(producer)
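//optional check (a sketch): getStreamingStat() returns a dictionary of tables;
//its subWorkers table should now list the sendMsgToKafka action
//getStreamingStat().subWorkers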
//replay historical data into messageStream as a background job
def replayStockMarketData(){
    timeRS = cutPoints(09:15:00.000..15:00:00.000, 100)
    orderDS = replayDS(sqlObj=<select * from loadTable("dfs://order", "order") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeRS)
    tradeDS = replayDS(sqlObj=<select * from loadTable("dfs://trade", "trade") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeRS)
    snapshotDS = replayDS(sqlObj=<select * from loadTable("dfs://snapshot", "snapshot") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeRS)
    inputDict = dict(["order", "trade", "snapshot"], [orderDS, tradeDS, snapshotDS])
    //positional replay arguments: replayRate=20000 records/s, absoluteRate=true, parallelLevel=3
    submitJob("replay", "replay stock market", replay, inputDict, messageStream, `Date, `Time, 20000, true, 3)
}
replayStockMarketData()
//getRecentJobs()    //check the status of the background replay job
//cancelJob("your jobId")    //cancel the replay job if needed
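//optional consumer-side verification (a sketch, assuming the same broker;
//"verify-group" is a hypothetical consumer group id):
/*
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost"
consumerCfg["group.id"] = "verify-group"
consumer = kafka::consumer(consumerCfg)
kafka::subscribe(consumer, ["topic-message"])
kafka::consumerPoll(consumer, 1000)    //poll one message, waiting up to 1000 ms
*/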