Skip to content

Commit

Permalink
Add support for batch observations
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jun 11, 2024
1 parent 7280db4 commit 11b8cb0
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 3,025 deletions.
46 changes: 39 additions & 7 deletions fly/cmd/heartbeats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ func main() {

const (
GSM_signedObservation GossipMsgType = iota
GSM_signedObservationInBatch
GSM_signedObservationBatch
GSM_tbObservation
GSM_signedHeartbeat
GSM_signedVaaWithQuorum
Expand All @@ -253,6 +255,9 @@ func main() {
// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)

// Inbound observations
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 20000)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 20000)

Expand Down Expand Up @@ -320,7 +325,7 @@ func main() {

gossipMsgTable := table.NewWriter()
gossipMsgTable.SetOutputMirror(os.Stdout)
gossipMsgTable.AppendHeader(table.Row{"#", "Guardian", "Obsv", "TB_OBsv", "HB", "VAA", "Obsv_Req", "Chain_Gov_Cfg", "Chain_Gov_Status"})
gossipMsgTable.AppendHeader(table.Row{"#", "Guardian", "Obsv", "ObsvInB", "ObsvB", "TB_OBsv", "HB", "VAA", "Obsv_Req", "Chain_Gov_Cfg", "Chain_Gov_Status"})
gossipMsgTable.SetStyle(table.StyleColoredDark)

obsvRateTable := table.NewWriter()
Expand Down Expand Up @@ -391,6 +396,7 @@ func main() {
// Just count observations
go func() {
uniqueObs := map[string]struct{}{}
uniqueObsInBatch := map[string]struct{}{}
for {
select {
case <-rootCtx.Done():
Expand Down Expand Up @@ -421,7 +427,32 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
case b := <-batchObsvC:
// Peg the batch count.
addr := "0x" + string(hex.EncodeToString(b.Msg.Addr))
idx := guardianIndexMap[strings.ToLower(addr)]
gossipCounter[idx][GSM_signedObservationBatch]++
gossipCounter[totalsRow][GSM_signedObservationBatch]++

// Add these to the observations in batches.
gossipCounter[idx][GSM_signedObservationInBatch] += len(b.Msg.Observations)
gossipCounter[totalsRow][GSM_signedObservationInBatch] += len(b.Msg.Observations)

// Walk through and peg the unique observation count.
if *loadTesting {
for _, o := range b.Msg.Observations {
uniqueObsInBatch[hex.EncodeToString(o.Hash)] = struct{}{}
}
gossipCounter[uniqueRow][GSM_signedObservationInBatch] = len(uniqueObsInBatch)
}

gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand All @@ -442,7 +473,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand Down Expand Up @@ -474,7 +505,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand Down Expand Up @@ -543,7 +574,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
if activeTable == 0 {
Expand Down Expand Up @@ -578,7 +609,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand All @@ -599,7 +630,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand All @@ -620,6 +651,7 @@ func main() {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
batchObsvC,
obsvReqC,
nil,
sendC,
Expand Down
Loading

0 comments on commit 11b8cb0

Please sign in to comment.