Skip to content

Commit

Permalink
Make other tools use batches
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Aug 22, 2024
1 parent d2171d3 commit 59d9801
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 16 deletions.
35 changes: 34 additions & 1 deletion fly/cmd/heartbeats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func main() {

// Inbound observations
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 20000)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 20000)
Expand Down Expand Up @@ -338,7 +339,7 @@ func main() {
select {
case <-rootCtx.Done():
return
case o := <-obsvC:
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
spl := strings.Split(o.Msg.MessageId, "/")
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(o.Msg.Addr))
Expand Down Expand Up @@ -367,6 +368,37 @@ func main() {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
}
gossipLock.Unlock()
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
spl := strings.Split(o.MessageId, "/")
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(batch.Msg.Addr))
idx := guardianIndexMap[strings.ToLower(addr)]
if knownEmitters[emitter] {
gossipCounter[idx][GSM_tbObservation]++
gossipCounter[totalsRow][GSM_tbObservation]++
}
if handleObsv(uint(idx)) {
obsvRateTable.ResetRows()
for i := 0; i < numGuardians; i++ {
obsvRateTable.AppendRow(table.Row{i, obsvRateRows[int(i)].guardianName, obsvRateRows[int(i)].obsvCount, obsvRateRows[uint(i)].percents[0], obsvRateRows[uint(i)].percents[1], obsvRateRows[uint(i)].percents[2], obsvRateRows[uint(i)].percents[3], obsvRateRows[uint(i)].percents[4], obsvRateRows[uint(i)].percents[5], obsvRateRows[uint(i)].percents[6], obsvRateRows[uint(i)].percents[7], obsvRateRows[uint(i)].percents[8], obsvRateRows[uint(i)].percents[9]})
}
}
gossipCounter[idx][GSM_signedObservation]++
gossipCounter[totalsRow][GSM_signedObservation]++

if *loadTesting {
uniqueObs[hex.EncodeToString(o.Hash)] = struct{}{}
gossipCounter[uniqueRow][GSM_signedObservation] = len(uniqueObs)
}

gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
}
gossipLock.Unlock()
}
}
}
}()
Expand Down Expand Up @@ -568,6 +600,7 @@ func main() {
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
p2p.WithObservationRequestListener(obsvReqC),
p2p.WithChainGovernorConfigListener(govConfigC),
Expand Down
18 changes: 17 additions & 1 deletion fly/cmd/observation_stats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func main() {

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)

// Guardian set state managed by processor
gst := common.NewGuardianSetState(nil)
Expand All @@ -92,7 +93,7 @@ func main() {
select {
case <-rootCtx.Done():
return
case o := <-obsvC:
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
if o.Msg.MessageId[:3] != "26/" && o.Msg.MessageId[:2] != "7/" {
ga := eth_common.BytesToAddress(o.Msg.Addr).String()
// logger.Warn("observation", zap.String("id",o.MessageId), zap.String("addr",ga))
Expand All @@ -104,6 +105,20 @@ func main() {
}
logger.Warn("status", zap.String("id", o.Msg.MessageId), zap.Any("msg", obsvByHash[o.Msg.MessageId]))
}
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
if o.MessageId[:3] != "26/" && o.MessageId[:2] != "7/" {
ga := eth_common.BytesToAddress(batch.Msg.Addr).String()
// logger.Warn("observation", zap.String("id",o.MessageId), zap.String("addr",ga))
if _, ok := obsvByHash[o.MessageId]; !ok {
obsvByHash[o.MessageId] = map[string]time.Time{}
}
if _, ok := obsvByHash[o.MessageId][ga]; !ok {
obsvByHash[o.MessageId][ga] = time.Now()
}
logger.Warn("status", zap.String("id", o.MessageId), zap.Any("msg", obsvByHash[o.MessageId]))
}
}
}
}
}()
Expand All @@ -127,6 +142,7 @@ func main() {
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
)
if err != nil {
logger.Fatal("Failed to create RunParams", zap.Error(err))
Expand Down
26 changes: 25 additions & 1 deletion fly/cmd/prom_gossip/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func main() {

// Inbound observations
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 20000)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 20000)
Expand Down Expand Up @@ -247,7 +248,7 @@ func main() {
afterCount := len(uniqueObs)
logger.Info("Cleaned up unique observations cache", zap.Int("beforeCount", beforeCount), zap.Int("afterCount", afterCount), zap.Int("cleanedUpCount", beforeCount-afterCount))
timer.Reset(delay)
case o := <-obsvC:
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
gossipByType.WithLabelValues("observation").Inc()
spl := strings.Split(o.Msg.MessageId, "/")
chain, err := parseChainID(spl[0])
Expand All @@ -270,6 +271,28 @@ func main() {
uniqueObservationsCounter.Inc()
}
uniqueObs[hash] = time.Now()
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
gossipByType.WithLabelValues("observation").Inc()
spl := strings.Split(o.MessageId, "/")
chain, err := parseChainID(spl[0])
if err != nil {
chain = vaa.ChainIDUnset
}
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(batch.Msg.Addr))
idx := guardianIndexMap[strings.ToLower(addr)]
name := guardianIndexToNameMap[idx]
observationsByGuardianPerChain.WithLabelValues(name, chain.String()).Inc()
if knownEmitters[emitter] {
tbObservationsByGuardianPerChain.WithLabelValues(name, chain.String()).Inc()
}
hash := hex.EncodeToString(o.Hash)
if _, exists := uniqueObs[hash]; exists {
uniqueObservationsCounter.Inc()
}
uniqueObs[hash] = time.Now()
}
}
}
}()
Expand Down Expand Up @@ -406,6 +429,7 @@ func main() {
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
p2p.WithObservationRequestListener(obsvReqC),
p2p.WithChainGovernorConfigListener(govConfigC),
Expand Down
6 changes: 5 additions & 1 deletion fly/cmd/track_gossip/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func main() {

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
Expand Down Expand Up @@ -127,8 +128,10 @@ func main() {
select {
case <-rootCtx.Done():
return
case <-obsvC:
case <-obsvC: // TODO: Rip out this code once we cut over to batching.
numObs++
case batch := <-batchObsvC:
numObs += len(batch.Msg.Observations)
case <-signedInC:
numSigned++
case <-obsvReqC:
Expand Down Expand Up @@ -191,6 +194,7 @@ func main() {
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
p2p.WithObservationRequestListener(obsvReqC),
p2p.WithChainGovernorConfigListener(govConfigC),
Expand Down
26 changes: 19 additions & 7 deletions fly/cmd/track_pyth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func main() {

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)

// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
Expand Down Expand Up @@ -159,8 +160,18 @@ func main() {
select {
case <-rootCtx.Done():
return
case m := <-obsvC:
handleObservation(logger, gs, m.Msg)
case m := <-obsvC: // TODO: Rip out this code once we cut over to batching.
obs := &gossipv1.Observation{
Hash: m.Msg.Hash,
Signature: m.Msg.Signature,
TxHash: m.Msg.TxHash,
MessageId: m.Msg.MessageId,
}
handleObservation(logger, gs, m.Msg.Addr, obs)
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
handleObservation(logger, gs, batch.Msg.Addr, o)
}
}
}
}()
Expand Down Expand Up @@ -225,6 +236,7 @@ func main() {
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
)
if err != nil {
Expand Down Expand Up @@ -350,7 +362,7 @@ func handleSignedVAAWithQuorum(logger *zap.Logger, gs common.GuardianSet, m *gos
}
}

func handleObservation(logger *zap.Logger, gs common.GuardianSet, m *gossipv1.SignedObservation) {
func handleObservation(logger *zap.Logger, gs common.GuardianSet, addr []byte, m *gossipv1.Observation) {
hash := hex.EncodeToString(m.Hash)

// Verify the Guardian's signature. This verifies that m.Signature matches m.Hash and recovers
Expand All @@ -360,20 +372,20 @@ func handleObservation(logger *zap.Logger, gs common.GuardianSet, m *gossipv1.Si
logger.Warn("failed to verify signature on observation",
zap.String("digest", hash),
zap.String("signature", hex.EncodeToString(m.Signature)),
zap.String("addr", hex.EncodeToString(m.Addr)),
zap.String("addr", hex.EncodeToString(addr)),
zap.Error(err))
return
}

// Verify that m.Addr matches the public key that signed m.Hash.
their_addr := eth_common.BytesToAddress(m.Addr)
their_addr := eth_common.BytesToAddress(addr)
signer_pk := eth_common.BytesToAddress(eth_crypto.Keccak256(pk[1:])[12:])

if their_addr != signer_pk {
logger.Warn("invalid observation - address does not match pubkey",
zap.String("digest", hash),
zap.String("signature", hex.EncodeToString(m.Signature)),
zap.String("addr", hex.EncodeToString(m.Addr)),
zap.String("addr", hex.EncodeToString(addr)),
zap.String("pk", signer_pk.Hex()))
return
}
Expand All @@ -398,7 +410,7 @@ func handleObservation(logger *zap.Logger, gs common.GuardianSet, m *gossipv1.Si
logger.Debug("received observation",
zap.String("digest", hash),
zap.String("signature", hex.EncodeToString(m.Signature)),
zap.String("addr", hex.EncodeToString(m.Addr)),
zap.String("addr", hex.EncodeToString(addr)),
zap.String("txhash", hex.EncodeToString(m.TxHash)),
zap.String("txhash_b58", base58.Encode(m.TxHash)),
zap.String("message_id", m.MessageId),
Expand Down
4 changes: 1 addition & 3 deletions fly/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
cloud.google.com/go/pubsub v1.33.0
firebase.google.com/go v3.13.0+incompatible
github.com/buger/goterm v1.0.4
github.com/certusone/wormhole/node v0.0.0-20240507160701-1c858d355869
github.com/certusone/wormhole/node v0.0.0-20240820223034-c60e755908cf
github.com/eiannone/keyboard v0.0.0-20220611211555-0d226195f203
github.com/ethereum/go-ethereum v1.10.26
github.com/ipfs/go-log/v2 v2.5.1
Expand Down Expand Up @@ -279,5 +279,3 @@ replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alp
replace github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240507160701-1c858d355869 => github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240820223034-c60e755908cf

replace github.com/wormhole-foundation/wormchain v0.0.0-00010101000000-000000000000 => github.com/wormhole-foundation/wormhole/wormchain v0.0.0-20221107132100-09459fcf9e67

replace github.com/certusone/wormhole/node v0.0.0-20240507160701-1c858d355869 => github.com/wormhole-foundation/wormhole/node v0.0.0-20240820223034-c60e755908cf
4 changes: 2 additions & 2 deletions fly/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
github.com/certusone/wormhole/node v0.0.0-20240820223034-c60e755908cf h1:VTANDF76BwrGS9JfF/Pktdt3QuNM8O+7MnTBpzZOvuE=
github.com/certusone/wormhole/node v0.0.0-20240820223034-c60e755908cf/go.mod h1:nynfZl9OpSSvYpolu16jbLRrTia5MeArcBNDP7B3Twc=
github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk=
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
Expand Down Expand Up @@ -1038,8 +1040,6 @@ github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSD
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/wormhole-foundation/wormhole/node v0.0.0-20240820223034-c60e755908cf h1:Erv1URRpPZVNdLzPV1M7KFPAMEyVCFIVJaK55XCQqCk=
github.com/wormhole-foundation/wormhole/node v0.0.0-20240820223034-c60e755908cf/go.mod h1:nynfZl9OpSSvYpolu16jbLRrTia5MeArcBNDP7B3Twc=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240820223034-c60e755908cf h1:QDnA8nGXraYb0jNlQez6cmu3CHHQsh66IbCB66QFGw8=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240820223034-c60e755908cf/go.mod h1:pE/jYet19kY4P3V6mE2+01zvEfxdyBqv6L6HsnSa5uc=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
Expand Down

0 comments on commit 59d9801

Please sign in to comment.