-
Notifications
You must be signed in to change notification settings - Fork 77
/
metacode_derived_features.txt
112 lines (101 loc) · 4.84 KB
/
metacode_derived_features.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
/**
metacode_derived_features.txt
Script to aggregate data
DolphinDB Inc.
DolphinDB server version: 2.00.6 2022.05.09
Last modification time: 2022.08.31
*/
/**
Attention:
1. The developer need to import level2 snapshot data into the database in advance
2. There is one place in the script that need to be modified according to the environment
*/
//login account
login("admin", "123456")
clearAllCache()
undef(all)
go
/**
part1: Load data from database
modified location 1: dbName and tableName
*/
stockList=`601318`600519`600036`600276`601166`600030`600887`600016`601328`601288`600000`600585`601398`600031`601668`600048
dbName = "dfs://SH_TSDB_snapshot_MultiColumn"
tableName = "snapshot"
snapshot = loadTable(dbName, tableName)
/**
part2: define functions
*/
def logReturn(s){
return log(s)-log(prev(s))
}
def realizedVolatility(s){
return sqrt(sum2(s))
}
//与pandas中的group by agg功能相同,传入字典(key为列名,value为functions),批量生成元编程代码
def createAggMetaCode(aggDict){
metaCode = []
metaCodeColName = []
for(colName in aggDict.keys()){
for(funcName in aggDict[colName])
{
metaCode.append!(sqlCol(colName, funcByName(funcName), colName + `_ + funcName$STRING))
metaCodeColName.append!(colName + `_ + funcName$STRING)
}
}
return metaCode, metaCodeColName$STRING
}
/**
part3: feature engineering
*/
features = {
"DateTime":[`count]
}
for( i in 0..9)
{
features["Wap"+i] = [`sum, `mean, `std]
features["LogReturn"+i] = [`sum, `realizedVolatility, `mean, `std]
features["LogReturnOffer"+i] = [`sum, `realizedVolatility, `mean, `std]
features["LogReturnBid"+i] = [`sum, `realizedVolatility, `mean, `std]
}
features["WapBalance"] = [`sum, `mean, `std]
features["PriceSpread"] = [`sum, `mean, `std]
features["BidSpread"] = [`sum, `mean, `std]
features["OfferSpread"] = [`sum, `mean, `std]
features["TotalVolume"] = [`sum, `mean, `std]
features["VolumeImbalance"] = [`sum, `mean, `std]
aggMetaCode, metaCodeColName = createAggMetaCode(features)
/**
part4: define aggregate function
*/
defg featureEngineering(DateTime, BidPrice, BidOrderQty, OfferPrice, OfferOrderQty, aggMetaCode){
wap = (BidPrice * OfferOrderQty + BidOrderQty * OfferPrice) \ (BidOrderQty + OfferOrderQty)
wapBalance = abs(wap[0] - wap[1])
priceSpread = (OfferPrice[0] - BidPrice[0]) \ ((OfferPrice[0] + BidPrice[0]) \ 2)
BidSpread = BidPrice[0] - BidPrice[1]
OfferSpread = OfferPrice[0] - OfferPrice[1]
totalVolume = OfferOrderQty.rowSum() + BidOrderQty.rowSum()
volumeImbalance = abs(OfferOrderQty.rowSum() - BidOrderQty.rowSum())
LogReturnWap = logReturn(wap)
LogReturnOffer = logReturn(OfferPrice)
LogReturnBid = logReturn(BidPrice)
subTable = table(DateTime as `DateTime, BidPrice, BidOrderQty, OfferPrice, OfferOrderQty, wap, wapBalance, priceSpread, BidSpread, OfferSpread, totalVolume, volumeImbalance, LogReturnWap, LogReturnOffer, LogReturnBid)
colNum = 0..9$STRING
colName = `DateTime <- (`BidPrice + colNum) <- (`BidOrderQty + colNum) <- (`OfferPrice + colNum) <- (`OfferOrderQty + colNum) <- (`Wap + colNum) <- `WapBalance`PriceSpread`BidSpread`OfferSpread`TotalVolume`VolumeImbalance <- (`LogReturn + colNum) <- (`LogReturnOffer + colNum) <- (`LogReturnBid + colNum)
subTable.rename!(colName)
subTable['BarDateTime'] = bar(subTable['DateTime'], 10m)
result = sql(select = aggMetaCode, from = subTable).eval().matrix()
result150 = sql(select = aggMetaCode, from = subTable, where = <time(DateTime) >= (time(BarDateTime) + 150*1000) >).eval().matrix()
result300 = sql(select = aggMetaCode, from = subTable, where = <time(DateTime) >= (time(BarDateTime) + 300*1000) >).eval().matrix()
result450 = sql(select = aggMetaCode, from = subTable, where = <time(DateTime) >= (time(BarDateTime) + 450*1000) >).eval().matrix()
return concatMatrix([result, result150, result300, result450])
}
/**
part5: meta conditions and run with timer
*/
whereConditions = [<date(DateTime) between 2021.01.03 : 2021.12.31>, <SecurityID in stockList>, <(time(DateTime) between 09:30:00.000 : 11:29:59.999) or (time(DateTime) between 13:00:00.000 : 14:56:59.999)>]
timer result = sql(select = sqlColAlias(<featureEngineering(DateTime,
matrix(BidPrice0,BidPrice1,BidPrice2,BidPrice3,BidPrice4,BidPrice5,BidPrice6,BidPrice7,BidPrice8,BidPrice9),
matrix(BidOrderQty0,BidOrderQty1,BidOrderQty2,BidOrderQty3,BidOrderQty4,BidOrderQty5,BidOrderQty6,BidOrderQty7,BidOrderQty8,BidOrderQty9),
matrix(OfferPrice0,OfferPrice1,OfferPrice2,OfferPrice3,OfferPrice4,OfferPrice5,OfferPrice6,OfferPrice7,OfferPrice8,OfferPrice9),
matrix(OfferOrderQty0,OfferOrderQty1,OfferOrderQty2,OfferOrderQty3,OfferOrderQty4,OfferOrderQty5,OfferOrderQty6,OfferOrderQty7,OfferOrderQty8,OfferOrderQty9), aggMetaCode)>, metaCodeColName <- (metaCodeColName+"_150") <- (metaCodeColName+"_300") <- (metaCodeColName+"_450")), from = snapshot, where = whereConditions, groupBy = [<SecurityID>, <bar(DateTime, 10m) as DateTime>]).eval()