Skip to content

Commit

Permalink
node: add GetLastHeartbeats RPC call
Browse files Browse the repository at this point in the history
This aggregates verified guardian heartbeats server-side so they
can be fetched via unary calls.

Change-Id: I8458b139bb5d75f87ed700b50684a5ff8ca594fa
  • Loading branch information
Leo committed Aug 4, 2021
1 parent 952a9d9 commit 82731c2
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 11 deletions.
4 changes: 2 additions & 2 deletions bridge/cmd/guardiand/adminserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
return &nodev1.InjectGovernanceVAAResponse{Digest: digest.Bytes()}, nil
}

func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, hl *publicrpc.RawHeartbeatConns, db *db.Database) (supervisor.Runnable, error) {
func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, hl *publicrpc.RawHeartbeatConns, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
// Delete existing UNIX socket, if present.
fi, err := os.Stat(socketPath)
if err == nil {
Expand Down Expand Up @@ -166,7 +166,7 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<-
logger: logger.Named("adminservice"),
}

publicrpcService := publicrpc.NewPublicrpcServer(logger, hl, db)
publicrpcService := publicrpc.NewPublicrpcServer(logger, hl, db, gst)

grpcServer := grpc.NewServer()
nodev1.RegisterNodePrivilegedServer(grpcServer, nodeService)
Expand Down
6 changes: 3 additions & 3 deletions bridge/cmd/guardiand/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func runBridge(cmd *cobra.Command, args []string) {
injectC := make(chan *vaa.VAA)

// Guardian set state managed by processor
gst := &common.GuardianSetState{}
gst := common.NewGuardianSetState()

// Load p2p private key
var priv crypto.PrivKey
Expand All @@ -394,13 +394,13 @@ func runBridge(cmd *cobra.Command, args []string) {

// subscriber channel multiplexing for public gPRC streams
rawHeartbeatListeners := publicrpc.HeartbeatStreamMultiplexer(logger)
publicrpcService, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners, db)
publicrpcService, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners, db, gst)
if err != nil {
log.Fatal("failed to create publicrpc service socket", zap.Error(err))
}

// local admin service socket
adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, rawHeartbeatListeners, db)
adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, rawHeartbeatListeners, db, gst)
if err != nil {
logger.Fatal("failed to create admin service socket", zap.Error(err))
}
Expand Down
5 changes: 3 additions & 2 deletions bridge/cmd/guardiand/publicrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package guardiand

import (
"fmt"
"github.com/certusone/wormhole/bridge/pkg/common"
"github.com/certusone/wormhole/bridge/pkg/db"
publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1"
"github.com/certusone/wormhole/bridge/pkg/publicrpc"
Expand All @@ -11,15 +12,15 @@ import (
"net"
)

func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns, db *db.Database) (supervisor.Runnable, error) {
func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
l, err := net.Listen("tcp", listenAddr)
if err != nil {
return nil, fmt.Errorf("failed to listen: %w", err)
}

logger.Info("publicrpc server listening", zap.String("addr", l.Addr().String()))

rpcServer := publicrpc.NewPublicrpcServer(logger, hl, db)
rpcServer := publicrpc.NewPublicrpcServer(logger, hl, db, gst)
grpcServer := grpc.NewServer()
publicrpcv1.RegisterPublicrpcServer(grpcServer, rpcServer)

Expand Down
30 changes: 29 additions & 1 deletion bridge/pkg/common/guardianset.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package common

import (
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
"github.com/ethereum/go-ethereum/common"
"sync"
)

// MaxGuardianCount specifies the maximum number of guardians supported by on-chain contracts.
//
// Matching constants:
// - MAX_LEN_GUARDIAN_KEYS in Solana contract (limited by transaction size - 19 is the maximum amount possible)
//
Expand All @@ -29,7 +32,7 @@ func (g *GuardianSet) KeysAsHexStrings() []string {
return r
}

// Get a given address index from the guardian set. Returns (-1, false)
// KeyIndex returns a given address index from the guardian set. Returns (-1, false)
// if the address wasn't found and (addr, true) otherwise.
func (g *GuardianSet) KeyIndex(addr common.Address) (int, bool) {
for n, k := range g.Keys {
Expand All @@ -44,6 +47,16 @@ func (g *GuardianSet) KeyIndex(addr common.Address) (int, bool) {
type GuardianSetState struct {
mu sync.Mutex
current *GuardianSet

// Last heartbeat message received per guardian. Maintained
// across guardian set updates - these values don't change.
lastHeartbeat map[common.Address]*gossipv1.Heartbeat
}

func NewGuardianSetState() *GuardianSetState {
return &GuardianSetState{
lastHeartbeat: map[common.Address]*gossipv1.Heartbeat{},
}
}

func (st *GuardianSetState) Set(set *GuardianSet) {
Expand All @@ -59,3 +72,18 @@ func (st *GuardianSetState) Get() *GuardianSet {

return st.current
}

// LastHeartbeat returns the most recent heartbeat message received for
// a given guardian node, or nil if none have been received.
func (st *GuardianSetState) LastHeartbeat(addr common.Address) *gossipv1.Heartbeat {
st.mu.Lock()
defer st.mu.Unlock()
return st.lastHeartbeat[addr]
}

// SetHeartBeat stores a verified heartbeat observed by a given guardian.
func (st *GuardianSetState) SetHeartBeat(addr common.Address, hb *gossipv1.Heartbeat) {
st.mu.Lock()
defer st.mu.Unlock()
st.lastHeartbeat[addr] = hb
}
7 changes: 5 additions & 2 deletions bridge/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat
zap.String("from", envelope.GetFrom().String()))
break
}
if heartbeat, err := processSignedHeartbeat(s, gs); err != nil {
if heartbeat, err := processSignedHeartbeat(s, gs, gst); err != nil {
p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
logger.Warn("invalid signed heartbeat received",
zap.Error(err),
Expand Down Expand Up @@ -330,7 +330,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat
}
}

func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.GuardianSet) (*gossipv1.Heartbeat, error) {
func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.GuardianSet, gst *bridge_common.GuardianSetState) (*gossipv1.Heartbeat, error) {
envelopeAddr := common.BytesToAddress(s.GuardianAddr)
idx, ok := gs.KeyIndex(envelopeAddr)
if !ok {
Expand All @@ -357,5 +357,8 @@ func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.Guard
return nil, fmt.Errorf("failed to unmarshal heartbeat: %w", err)
}

// Store verified heartbeat in global guardian set state.
gst.SetHeartBeat(signerAddr, &h)

return &h, nil
}
28 changes: 27 additions & 1 deletion bridge/pkg/publicrpc/publicrpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"github.com/certusone/wormhole/bridge/pkg/common"
"github.com/certusone/wormhole/bridge/pkg/db"
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1"
Expand All @@ -19,16 +20,41 @@ type PublicrpcServer struct {
rawHeartbeatListeners *RawHeartbeatConns
logger *zap.Logger
db *db.Database
gst *common.GuardianSetState
}

func NewPublicrpcServer(logger *zap.Logger, rawHeartbeatListeners *RawHeartbeatConns, db *db.Database) *PublicrpcServer {
func NewPublicrpcServer(
logger *zap.Logger,
rawHeartbeatListeners *RawHeartbeatConns,
db *db.Database,
gst *common.GuardianSetState,
) *PublicrpcServer {
return &PublicrpcServer{
rawHeartbeatListeners: rawHeartbeatListeners,
logger: logger.Named("publicrpcserver"),
db: db,
gst: gst,
}
}

func (s *PublicrpcServer) GetLastHeartbeats(ctx context.Context, req *publicrpcv1.GetLastHeartbeatRequest) (*publicrpcv1.GetLastHeartbeatResponse, error) {
gs := s.gst.Get()
if gs == nil {
return nil, status.Error(codes.Unavailable, "guardian set not fetched from chain yet")
}

resp := &publicrpcv1.GetLastHeartbeatResponse{
RawHeartbeats: make(map[string]*gossipv1.Heartbeat),
}

for _, addr := range gs.Keys {
hb := s.gst.LastHeartbeat(addr)
resp.RawHeartbeats[addr.Hex()] = hb
}

return resp, nil
}

func (s *PublicrpcServer) GetRawHeartbeats(req *publicrpcv1.GetRawHeartbeatsRequest, stream publicrpcv1.Publicrpc_GetRawHeartbeatsServer) error {
s.logger.Info("gRPC heartbeat stream opened by client")

Expand Down
17 changes: 17 additions & 0 deletions proto/publicrpc/v1/publicrpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ service Publicrpc {
};
};

// GetLastHeartbeats returns the last heartbeat received for each guardian node in the
// node's active guardian set. Heartbeats received by nodes not in the guardian set are ignored.
// The heartbeat value is null if no heartbeat has yet been received.
rpc GetLastHeartbeats (GetLastHeartbeatRequest) returns (GetLastHeartbeatResponse) {
option (google.api.http) = {
get: "/v1/heartbeats"
};
}

rpc GetSignedVAA (GetSignedVAARequest) returns (GetSignedVAAResponse) {
option (google.api.http) = {
get: "/v1/signed_vaa/{message_id.emitter_chain}/{message_id.emitter_address}/{message_id.sequence}"
Expand All @@ -53,3 +62,11 @@ message GetSignedVAARequest {
message GetSignedVAAResponse {
bytes vaa_bytes = 1;
}

message GetLastHeartbeatRequest {
}

message GetLastHeartbeatResponse {
// Mapping of hex-encoded guardian addresses to raw heartbeat messages.
map<string, gossip.v1.Heartbeat> raw_heartbeats = 1;
}

0 comments on commit 82731c2

Please sign in to comment.