forked from dolphindb/Tutorials_CN
-
Notifications
You must be signed in to change notification settings - Fork 0
/
csi_1000.dos
84 lines (75 loc) · 3.81 KB
/
csi_1000.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use ops
//清理流数据环境
def clearEnv(){
try{
unsubscribeAll()
go
clearTablePersistence(objByName("trade"))
undef("trade", SHARED)
undef("tradeAmountIndex", SHARED)
undef("data")
undef("weightDict")
dropAggregator("csEngine")
dropAggregator("rsEngine")
dropAggregator("tsEngine")
}catch(err){
print err
}
}
//选取并创建1000个股票的权重因子
def createWeightDict(constituentCsvPath){
return dict(loadText(constituentCsvPath).SecurityID, take(0.001, 1000))
}
//流数据订阅
def prepareSub(weightDict){
//创建流数据订阅的共享表
tradeTemp = streamTable(2000:0, `TradeTime`SecurityID`TradePrice`TradeQty`TradeBSFlag, [TIMESTAMP, SYMBOL,DOUBLE,DOUBLE,SYMBOL])
try{ enableTableShareAndPersistence(table=tradeTemp, tableName="trade", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) }
catch(ex){ print(ex) }
undef("tradeTemp")
tsEngineDummy = table(1:0, `TradeTime`SecurityID`SellTradeAmount`BuyTradeAmount, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE])
rsEngineDummy = table(1:0, `SecurityID`TradeTime`SellTradeAmount`BuyTradeAmount, [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE])
//创建存储结果的表
share(keyedTable(`Time, 2000:0, `Time`SellTradeAmount`BuyTradeAmount`UpdateTime, [TIMESTAMP, DOUBLE, DOUBLE, TIMESTAMP]), "tradeAmountIndex")
go
//创建横截面引擎
csEngine = createCrossSectionalEngine(name="csEngine", metrics=<[wsum(SellTradeAmount, weightDict[SecurityID]), wsum(BuyTradeAmount, weightDict[SecurityID]), now()]>, dummyTable=rsEngineDummy, outputTable=objByName("tradeAmountIndex"), keyColumn=`SecurityID, triggeringPattern="keyCount", triggeringInterval=1000, useSystemTime=false, timeColumn=`TradeTime, lastBatchOnly=false)
//创建响应式状态引擎
rsEngine = createReactiveStateEngine(name="rsEngine", metrics=<[cummax(TradeTime), cumsum(SellTradeAmount), cumsum(BuyTradeAmount)]>, dummyTable=tsEngineDummy, outputTable=csEngine, keyColumn=`SecurityID)
//创建时间序列引擎
tsEngine = createTimeSeriesEngine(name="tsEngine", windowSize=60000, step=60000, metrics=<[sum(iif(TradeBSFlag=="0", 1, 0)*TradeQty*TradePrice), sum(iif(TradeBSFlag=="1", 1, 0)*TradeQty*TradePrice)]>, dummyTable=objByName("trade"), outputTable=rsEngine, timeColumn=`TradeTime, keyColumn=`SecurityID, useWindowStartTime=true, fill=[0, 0], forceTriggerTime=100)
//订阅流数据表
subscribeTable(tableName="trade", actionName="act_tsEngine", offset=0, handler=append!{tsEngine}, msgAsTable=true, batchSize=10000, throttle=0.001)
}
//加载数据
def loadData(dataCsvPath) {
schemaTb=extractTextSchema(dataCsvPath)
update schemaTb set type=`SYMBOL where name=`TradeBSFlag
return loadText(dataCsvPath,, schemaTb)
}
//提交回放任务
def replayJob(data, weightDict){
filteredData = select TradeTime, SecurityID, TradePrice, TradeQty, TradeBSFlag from data where SecurityID in weightDict order by TradeTime
submitJob("replayJob", "replay at the maximum speed", replay{filteredData, objByName("trade")})
}
//访问database需要权限,连接datanode后执行一次login()即可
login("admin", "123456")
clearEnv()
weightDict = createWeightDict("/your/path/to/constituent.csv")
prepareSub(weightDict)
data = loadData("/your/path/to/data.csv")
replayJob(data, weightDict)
/*
* 结果查询:
select top 10 Time, SellTradeAmount, BuyTradeAmount, UpdateTime from tradeAmountIndex where UpdateTime >= datetimeAdd(now(),-10s)
select top 1000 Time, SellTradeAmount, BuyTradeAmount, UpdateTime from tradeAmountIndex
*/
/*
* 常用监控函数
getRecentJobs()//查询最近的任务
cancelJob("replayJob202209150003") //取消任务,jobID通过getRecentJobs()获取
getStreamingStat().subWorkers //查询订阅状态
getStreamingStat().pubConns //发布链接情况
getStreamingStat().pubTables //发布表
getStreamingStat().subConns //订阅链接数
*/