Skip to content

Commit

Permalink
node/pkg/ethereum: poll for guardian set changes
Browse files Browse the repository at this point in the history
The new set of Eth contracts no longer emits an event.

Change-Id: I6c3654c88960b08b5548ed72cf09e555b079ef3a
  • Loading branch information
Leo committed Aug 10, 2021
1 parent 08e70a5 commit 212e04a
Showing 1 changed file with 47 additions and 75 deletions.
122 changes: 47 additions & 75 deletions bridge/pkg/ethereum/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ var (
Name: "wormhole_eth_messages_confirmed_total",
Help: "Total number of Eth messages verified (post-confirmation)",
}, []string{"eth_network"})
guardianSetChangesConfirmed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_eth_guardian_set_changes_confirmed_total",
Help: "Total number of guardian set changes verified (we only see confirmed ones to begin with)",
}, []string{"eth_network"})
currentEthHeight = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "wormhole_eth_current_height",
Expand Down Expand Up @@ -155,34 +150,54 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
return fmt.Errorf("failed to subscribe to message publication events: %w", err)
}

// Subscribe to guardian set changes
guardianSetC := make(chan *abi.AbiGuardianSetAdded, 2)
guardianSetEvent, err := f.WatchGuardianSetAdded(&bind.WatchOpts{Context: timeout}, guardianSetC, nil)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return fmt.Errorf("failed to subscribe to guardian set events: %w", err)
}
// 0 is a valid guardian set, so we need a nil value here
var currentGuardianSet *uint32

// Get initial validator set.
timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
defer cancel()
idx, gs, err := FetchCurrentGuardianSet(timeout, e.url, e.bridge)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return fmt.Errorf("failed requesting guardian set from Ethereum: %w", err)
}
logger.Info("initial guardian set fetched",
zap.Any("value", gs), zap.Uint32("index", idx),
zap.String("eth_network", e.networkName))

if e.setChan != nil {
e.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: idx,
// Poll for guardian set.
go func() {
t := time.NewTicker(15 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
msm := time.Now()
logger.Info("fetching guardian set")
timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
idx, gs, err := fetchCurrentGuardianSet(timeout, caller)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
logger.Error("failed requesting guardian set",
zap.Error(err), zap.String("eth_network", e.networkName))
cancel()
continue
}

queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())

cancel()

if currentGuardianSet != nil && *currentGuardianSet == idx {
continue
}

logger.Info("updated guardian set found",
zap.Any("value", gs), zap.Uint32("index", idx),
zap.String("eth_network", e.networkName))

currentGuardianSet = &idx

if e.setChan != nil {
e.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: idx,
}
}
}
}
}
}()

errC := make(chan error)
go func() {
Expand All @@ -195,11 +210,6 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
errC <- fmt.Errorf("error while processing message publication subscription: %w", err)
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return
case err := <-guardianSetEvent.Err():
ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc()
errC <- fmt.Errorf("error while processing guardian set subscription: %w", err)
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return
case ev := <-messageC:
// Request timestamp for block
msm := time.Now()
Expand Down Expand Up @@ -237,34 +247,6 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
height: ev.Raw.BlockNumber,
}
e.pendingMu.Unlock()
case ev := <-guardianSetC:
logger.Info("guardian set has changed, fetching new value",
zap.Uint32("new_index", ev.Index), zap.String("eth_network", e.networkName))

guardianSetChangesConfirmed.WithLabelValues(e.networkName).Inc()

msm := time.Now()
timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
gs, err := caller.GetGuardianSet(&bind.CallOpts{Context: timeout}, ev.Index)
cancel()
queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
if err != nil {
// We failed to process the guardian set update and are now out of sync with the chain.
// Recover by crashing the runnable, which causes the guardian set to be re-fetched.
errC <- fmt.Errorf("error requesting new guardian set value for %d: %w", ev.Index, err)
return
}

logger.Info("new guardian set fetched",
zap.Any("value", gs), zap.Uint32("index", ev.Index),
zap.String("eth_network", e.networkName))

if e.setChan != nil {
e.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: ev.Index,
}
}
}
}
}()
Expand Down Expand Up @@ -338,17 +320,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
}

// Fetch the current guardian set ID and guardian set from the chain.
func FetchCurrentGuardianSet(ctx context.Context, rpcURL string, bridgeContract eth_common.Address) (uint32, *abi.StructsGuardianSet, error) {
c, err := ethclient.DialContext(ctx, rpcURL)
if err != nil {
return 0, nil, fmt.Errorf("dialing eth client failed: %w", err)
}

caller, err := abi.NewAbiCaller(bridgeContract, c)
if err != nil {
panic(err)
}

func fetchCurrentGuardianSet(ctx context.Context, caller *abi.AbiCaller) (uint32, *abi.StructsGuardianSet, error) {
opts := &bind.CallOpts{Context: ctx}

currentIndex, err := caller.GetCurrentGuardianSetIndex(opts)
Expand Down

0 comments on commit 212e04a

Please sign in to comment.