diff --git a/bridge/cmd/guardiand/adminserver.go b/bridge/cmd/guardiand/adminserver.go index 997831e60e..aace7430da 100644 --- a/bridge/cmd/guardiand/adminserver.go +++ b/bridge/cmd/guardiand/adminserver.go @@ -132,7 +132,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no return &nodev1.InjectGovernanceVAAResponse{Digest: digest.Bytes()}, nil } -func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, hl *publicrpc.RawHeartbeatConns, db *db.Database) (supervisor.Runnable, error) { +func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, hl *publicrpc.RawHeartbeatConns, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) { // Delete existing UNIX socket, if present. fi, err := os.Stat(socketPath) if err == nil { @@ -166,7 +166,7 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- logger: logger.Named("adminservice"), } - publicrpcService := publicrpc.NewPublicrpcServer(logger, hl, db) + publicrpcService := publicrpc.NewPublicrpcServer(logger, hl, db, gst) grpcServer := grpc.NewServer() nodev1.RegisterNodePrivilegedServer(grpcServer, nodeService) diff --git a/bridge/cmd/guardiand/bridge.go b/bridge/cmd/guardiand/bridge.go index 693f172bbe..54a7fbc8c8 100644 --- a/bridge/cmd/guardiand/bridge.go +++ b/bridge/cmd/guardiand/bridge.go @@ -375,7 +375,7 @@ func runBridge(cmd *cobra.Command, args []string) { injectC := make(chan *vaa.VAA) // Guardian set state managed by processor - gst := &common.GuardianSetState{} + gst := common.NewGuardianSetState() // Load p2p private key var priv crypto.PrivKey @@ -394,13 +394,13 @@ func runBridge(cmd *cobra.Command, args []string) { // subscriber channel multiplexing for public gPRC streams rawHeartbeatListeners := publicrpc.HeartbeatStreamMultiplexer(logger) - publicrpcService, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners, db) + publicrpcService, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners, db, gst) if err != nil { log.Fatal("failed to create publicrpc service socket", zap.Error(err)) } // local admin service socket - adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, rawHeartbeatListeners, db) + adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, rawHeartbeatListeners, db, gst) if err != nil { logger.Fatal("failed to create admin service socket", zap.Error(err)) } diff --git a/bridge/cmd/guardiand/publicrpc.go b/bridge/cmd/guardiand/publicrpc.go index 6abf723a96..c0e11f8dad 100644 --- a/bridge/cmd/guardiand/publicrpc.go +++ b/bridge/cmd/guardiand/publicrpc.go @@ -2,6 +2,7 @@ package guardiand import ( "fmt" + "github.com/certusone/wormhole/bridge/pkg/common" "github.com/certusone/wormhole/bridge/pkg/db" publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1" "github.com/certusone/wormhole/bridge/pkg/publicrpc" @@ -11,7 +12,7 @@ import ( "net" ) -func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns, db *db.Database) (supervisor.Runnable, error) { +func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) { l, err := net.Listen("tcp", listenAddr) if err != nil { return nil, fmt.Errorf("failed to listen: %w", err) @@ -19,7 +20,7 @@ func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicr logger.Info("publicrpc server listening", zap.String("addr", l.Addr().String())) - rpcServer := publicrpc.NewPublicrpcServer(logger, hl, db) + rpcServer := publicrpc.NewPublicrpcServer(logger, hl, db, gst) grpcServer := grpc.NewServer() publicrpcv1.RegisterPublicrpcServer(grpcServer, rpcServer) diff --git a/bridge/pkg/common/guardianset.go b/bridge/pkg/common/guardianset.go index de76fc641d..ed08ece996 100644 --- a/bridge/pkg/common/guardianset.go +++ b/bridge/pkg/common/guardianset.go @@ -1,10 +1,13 @@ package common import ( + gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" "github.com/ethereum/go-ethereum/common" "sync" ) +// MaxGuardianCount specifies the maximum number of guardians supported by on-chain contracts. +// // Matching constants: // - MAX_LEN_GUARDIAN_KEYS in Solana contract (limited by transaction size - 19 is the maximum amount possible) // @@ -29,7 +32,7 @@ func (g *GuardianSet) KeysAsHexStrings() []string { return r } -// Get a given address index from the guardian set. Returns (-1, false) +// KeyIndex returns a given address index from the guardian set. Returns (-1, false) // if the address wasn't found and (addr, true) otherwise. func (g *GuardianSet) KeyIndex(addr common.Address) (int, bool) { for n, k := range g.Keys { @@ -44,6 +47,16 @@ func (g *GuardianSet) KeyIndex(addr common.Address) (int, bool) { type GuardianSetState struct { mu sync.Mutex current *GuardianSet + + // Last heartbeat message received per guardian. Maintained + // across guardian set updates - these values don't change. + lastHeartbeat map[common.Address]*gossipv1.Heartbeat +} + +func NewGuardianSetState() *GuardianSetState { + return &GuardianSetState{ + lastHeartbeat: map[common.Address]*gossipv1.Heartbeat{}, + } } func (st *GuardianSetState) Set(set *GuardianSet) { @@ -59,3 +72,18 @@ func (st *GuardianSetState) Get() *GuardianSet { return st.current } + +// LastHeartbeat returns the most recent heartbeat message received for +// a given guardian node, or nil if none have been received. +func (st *GuardianSetState) LastHeartbeat(addr common.Address) *gossipv1.Heartbeat { + st.mu.Lock() + defer st.mu.Unlock() + return st.lastHeartbeat[addr] +} + +// SetHeartBeat stores a verified heartbeat observed by a given guardian. +func (st *GuardianSetState) SetHeartBeat(addr common.Address, hb *gossipv1.Heartbeat) { + st.mu.Lock() + defer st.mu.Unlock() + st.lastHeartbeat[addr] = hb +} diff --git a/bridge/pkg/p2p/p2p.go b/bridge/pkg/p2p/p2p.go index 419dbbf7ba..dc4f94f971 100644 --- a/bridge/pkg/p2p/p2p.go +++ b/bridge/pkg/p2p/p2p.go @@ -301,7 +301,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat zap.String("from", envelope.GetFrom().String())) break } - if heartbeat, err := processSignedHeartbeat(s, gs); err != nil { + if heartbeat, err := processSignedHeartbeat(s, gs, gst); err != nil { p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() logger.Warn("invalid signed heartbeat received", zap.Error(err), @@ -330,7 +330,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat } } -func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.GuardianSet) (*gossipv1.Heartbeat, error) { +func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.GuardianSet, gst *bridge_common.GuardianSetState) (*gossipv1.Heartbeat, error) { envelopeAddr := common.BytesToAddress(s.GuardianAddr) idx, ok := gs.KeyIndex(envelopeAddr) if !ok { @@ -357,5 +357,8 @@ func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.Guard return nil, fmt.Errorf("failed to unmarshal heartbeat: %w", err) } + // Store verified heartbeat in global guardian set state. + gst.SetHeartBeat(signerAddr, &h) + return &h, nil } diff --git a/bridge/pkg/publicrpc/publicrpcserver.go b/bridge/pkg/publicrpc/publicrpcserver.go index 2d15889b59..2ddba85907 100644 --- a/bridge/pkg/publicrpc/publicrpcserver.go +++ b/bridge/pkg/publicrpc/publicrpcserver.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "fmt" + "github.com/certusone/wormhole/bridge/pkg/common" "github.com/certusone/wormhole/bridge/pkg/db" gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1" @@ -19,16 +20,41 @@ type PublicrpcServer struct { rawHeartbeatListeners *RawHeartbeatConns logger *zap.Logger db *db.Database + gst *common.GuardianSetState } -func NewPublicrpcServer(logger *zap.Logger, rawHeartbeatListeners *RawHeartbeatConns, db *db.Database) *PublicrpcServer { +func NewPublicrpcServer( + logger *zap.Logger, + rawHeartbeatListeners *RawHeartbeatConns, + db *db.Database, + gst *common.GuardianSetState, +) *PublicrpcServer { return &PublicrpcServer{ rawHeartbeatListeners: rawHeartbeatListeners, logger: logger.Named("publicrpcserver"), db: db, + gst: gst, } } +func (s *PublicrpcServer) GetLastHeartbeats(ctx context.Context, req *publicrpcv1.GetLastHeartbeatRequest) (*publicrpcv1.GetLastHeartbeatResponse, error) { + gs := s.gst.Get() + if gs == nil { + return nil, status.Error(codes.Unavailable, "guardian set not fetched from chain yet") + } + + resp := &publicrpcv1.GetLastHeartbeatResponse{ + RawHeartbeats: make(map[string]*gossipv1.Heartbeat), + } + + for _, addr := range gs.Keys { + hb := s.gst.LastHeartbeat(addr) + resp.RawHeartbeats[addr.Hex()] = hb + } + + return resp, nil +} + func (s *PublicrpcServer) GetRawHeartbeats(req *publicrpcv1.GetRawHeartbeatsRequest, stream publicrpcv1.Publicrpc_GetRawHeartbeatsServer) error { s.logger.Info("gRPC heartbeat stream opened by client") diff --git a/proto/publicrpc/v1/publicrpc.proto b/proto/publicrpc/v1/publicrpc.proto index 5883b1318e..1fdeea7a42 100644 --- a/proto/publicrpc/v1/publicrpc.proto +++ b/proto/publicrpc/v1/publicrpc.proto @@ -36,6 +36,15 @@ service Publicrpc { }; }; + // GetLastHeartbeats returns the last heartbeat received for each guardian node in the + // node's active guardian set. Heartbeats received by nodes not in the guardian set are ignored. + // The heartbeat value is null if no heartbeat has yet been received. + rpc GetLastHeartbeats (GetLastHeartbeatRequest) returns (GetLastHeartbeatResponse) { + option (google.api.http) = { + get: "/v1/heartbeats" + }; + } + rpc GetSignedVAA (GetSignedVAARequest) returns (GetSignedVAAResponse) { option (google.api.http) = { get: "/v1/signed_vaa/{message_id.emitter_chain}/{message_id.emitter_address}/{message_id.sequence}" @@ -53,3 +62,11 @@ message GetSignedVAARequest { message GetSignedVAAResponse { bytes vaa_bytes = 1; } + +message GetLastHeartbeatRequest { +} + +message GetLastHeartbeatResponse { + // Mapping of hex-encoded guardian addresses to raw heartbeat messages. + map raw_heartbeats = 1; +}