-
Notifications
You must be signed in to change notification settings - Fork 77
/
01.创建存储快照数据的库表并导入数据.txt
89 lines (84 loc) · 5.53 KB
/
01.创建存储快照数据的库表并导入数据.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Authenticate the current session before any database operation below.
login(userId="admin", password="123456")
/**
part1: create database and table
*/
// Create the partitioned snapshot table tbName inside the TSDB database dbName.
//   dbName: database path; a database already existing at this path is dropped first.
//   tbName: name of the partitioned table to create.
// The database uses a COMPO scheme: VALUE over 2020.01.01..2021.01.01 combined
// with HASH on SYMBOL using 30 as the hash argument.
def createDfsTb(dbName, tbName){
    // Drop any previous database at this path so the schema below is applied to a fresh one.
    if(existsDatabase(dbName)){
        dropDatabase(dbName)
    }
    byDate = database(, VALUE, 2020.01.01..2021.01.01)
    bySymbol = database(, HASH, [SYMBOL, 30])
    snapshotDb = database(dbName, COMPO, [byDate, bySymbol], engine="TSDB")
    // Column names and types are kept in two parallel vectors (same order, 42 entries each).
    // The four Bid* and four Offer* level columns are array-vector typed (DOUBLE[] / INT[]).
    colNames = ["SecurityID", "DateTime", "PreClosePx", "OpenPx", "HighPx", "LowPx", "LastPx",
        "TotalVolumeTrade", "TotalValueTrade", "InstrumentStatus",
        "BidPrice", "BidOrderQty", "BidNumOrders", "BidOrders",
        "OfferPrice", "OfferOrderQty", "OfferNumOrders", "OfferOrders",
        "NumTrades", "IOPV", "TotalBidQty", "TotalOfferQty",
        "WeightedAvgBidPx", "WeightedAvgOfferPx", "TotalBidNumber", "TotalOfferNumber",
        "BidTradeMaxDuration", "OfferTradeMaxDuration", "NumBidOrders", "NumOfferOrders",
        "WithdrawBuyNumber", "WithdrawBuyAmount", "WithdrawBuyMoney",
        "WithdrawSellNumber", "WithdrawSellAmount", "WithdrawSellMoney",
        "ETFBuyNumber", "ETFBuyAmount", "ETFBuyMoney",
        "ETFSellNumber", "ETFSellAmount", "ETFSellMoney"]
    colTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE,
        INT, DOUBLE, SYMBOL,
        DOUBLE[], INT[], INT[], INT[],
        DOUBLE[], INT[], INT[], INT[],
        INT, DOUBLE, INT, INT,
        DOUBLE, DOUBLE, INT, INT,
        INT, INT, INT, INT,
        INT, INT, DOUBLE,
        INT, INT, DOUBLE,
        INT, INT, DOUBLE,
        INT, INT, DOUBLE]
    emptySchema = table(1:0, colNames, colTypes)
    // Partition on DateTime and SecurityID, sort on SecurityID then DateTime,
    // delta-compress DateTime, and keep all duplicate rows.
    createPartitionedTable(dbHandle=snapshotDb, table=emptySchema, tableName=tbName, partitionColumns=`DateTime`SecurityID, compressMethods={DateTime:"delta"}, sortColumns=`SecurityID`DateTime, keepDuplicates=ALL)
}
// Database path and table name used by the rest of this script.
dbName = "dfs://SH_TSDB_snapshot_ArrayVector"
tbName = "snapshot"
// Build the database and the empty partitioned table.
createDfsTb(dbName, tbName)
/**
part2: load data
*/
// Convert a wide snapshot table into the array-vector layout.
//   t: table holding the scalar snapshot columns plus the numbered level columns
//      BidPrice0..9, BidOrderQty0..9, BidNumOrders0..9, BidOrders0..49 and the
//      matching Offer* columns.
// Returns a table in which SecurityID is rendered as a string left-padded with "0"
// to 6 characters, and each numbered column family is packed into one array-vector
// column (BidPrice, BidOrderQty, BidNumOrders, BidOrders, OfferPrice, OfferOrderQty,
// OfferNumOrders, OfferOrders). All other columns are passed through unchanged.
def transform(t){
    packed = select lpad(string(SecurityID), 6, "0") as SecurityID,
        DateTime, PreClosePx, OpenPx, HighPx, LowPx, LastPx,
        TotalVolumeTrade, TotalValueTrade, InstrumentStatus,
        fixedLengthArrayVector(BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9) as BidPrice,
        fixedLengthArrayVector(BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9) as BidOrderQty,
        fixedLengthArrayVector(BidNumOrders0, BidNumOrders1, BidNumOrders2, BidNumOrders3, BidNumOrders4, BidNumOrders5, BidNumOrders6, BidNumOrders7, BidNumOrders8, BidNumOrders9) as BidNumOrders,
        fixedLengthArrayVector(BidOrders0, BidOrders1, BidOrders2, BidOrders3, BidOrders4, BidOrders5, BidOrders6, BidOrders7, BidOrders8, BidOrders9,
            BidOrders10, BidOrders11, BidOrders12, BidOrders13, BidOrders14, BidOrders15, BidOrders16, BidOrders17, BidOrders18, BidOrders19,
            BidOrders20, BidOrders21, BidOrders22, BidOrders23, BidOrders24, BidOrders25, BidOrders26, BidOrders27, BidOrders28, BidOrders29,
            BidOrders30, BidOrders31, BidOrders32, BidOrders33, BidOrders34, BidOrders35, BidOrders36, BidOrders37, BidOrders38, BidOrders39,
            BidOrders40, BidOrders41, BidOrders42, BidOrders43, BidOrders44, BidOrders45, BidOrders46, BidOrders47, BidOrders48, BidOrders49) as BidOrders,
        fixedLengthArrayVector(OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9) as OfferPrice,
        fixedLengthArrayVector(OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9) as OfferOrderQty,
        fixedLengthArrayVector(OfferNumOrders0, OfferNumOrders1, OfferNumOrders2, OfferNumOrders3, OfferNumOrders4, OfferNumOrders5, OfferNumOrders6, OfferNumOrders7, OfferNumOrders8, OfferNumOrders9) as OfferNumOrders,
        fixedLengthArrayVector(OfferOrders0, OfferOrders1, OfferOrders2, OfferOrders3, OfferOrders4, OfferOrders5, OfferOrders6, OfferOrders7, OfferOrders8, OfferOrders9,
            OfferOrders10, OfferOrders11, OfferOrders12, OfferOrders13, OfferOrders14, OfferOrders15, OfferOrders16, OfferOrders17, OfferOrders18, OfferOrders19,
            OfferOrders20, OfferOrders21, OfferOrders22, OfferOrders23, OfferOrders24, OfferOrders25, OfferOrders26, OfferOrders27, OfferOrders28, OfferOrders29,
            OfferOrders30, OfferOrders31, OfferOrders32, OfferOrders33, OfferOrders34, OfferOrders35, OfferOrders36, OfferOrders37, OfferOrders38, OfferOrders39,
            OfferOrders40, OfferOrders41, OfferOrders42, OfferOrders43, OfferOrders44, OfferOrders45, OfferOrders46, OfferOrders47, OfferOrders48, OfferOrders49) as OfferOrders,
        NumTrades, IOPV, TotalBidQty, TotalOfferQty,
        WeightedAvgBidPx, WeightedAvgOfferPx, TotalBidNumber, TotalOfferNumber,
        BidTradeMaxDuration, OfferTradeMaxDuration, NumBidOrders, NumOfferOrders,
        WithdrawBuyNumber, WithdrawBuyAmount, WithdrawBuyMoney,
        WithdrawSellNumber, WithdrawSellAmount, WithdrawSellMoney,
        ETFBuyNumber, ETFBuyAmount, ETFBuyMoney,
        ETFSellNumber, ETFSellAmount, ETFSellMoney
        from t
    return packed
}
// Location of the snapshot CSV to import.
csvDataPath = "/home/v2/下载/data/20211201snapshot_30stocks.csv"
// Load the CSV into the partitioned table, passing every batch through transform()
// so rows arrive in the array-vector layout of the target schema.
// NOTE(review): the file name suggests data dated 2021.12.01, which lies outside the
// 2020.01.01..2021.01.01 VALUE range declared in createDfsTb — confirm the server is
// configured to add new VALUE partitions automatically.
loadTextEx(
    dbHandle=database(dbName),
    tableName=tbName,
    partitionColumns=["DateTime", "SecurityID"],
    filename=csvDataPath,
    transform=transform
)
// Sanity check: number of loaded rows per trading date.
select count(*) from loadTable(dbName, tbName) group by date(DateTime) as TradeDate