diff --git a/bridge/pkg/solana/client.go b/bridge/pkg/solana/client.go index 6b21b13661..b59974aa70 100644 --- a/bridge/pkg/solana/client.go +++ b/bridge/pkg/solana/client.go @@ -26,6 +26,8 @@ type SolanaWatcher struct { rpcUrl string commitment rpc.CommitmentType messageEvent chan *common.MessagePublication + logger *zap.Logger + rpcClient *rpc.Client } var ( @@ -100,6 +102,7 @@ func NewSolanaWatcher( wsUrl: wsUrl, rpcUrl: rpcUrl, messageEvent: messageEvents, commitment: commitment, + rpcClient: rpc.New(rpcUrl), } } @@ -110,8 +113,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { BridgeAddress: bridgeAddr, }) - rpcClient := rpc.New(s.rpcUrl) - logger := supervisor.Logger(ctx) + s.logger = supervisor.Logger(ctx) errC := make(chan error) var lastSlot uint64 @@ -128,7 +130,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) defer cancel() start := time.Now() - slot, err := rpcClient.GetSlot(rCtx, s.commitment) + slot, err := s.rpcClient.GetSlot(rCtx, s.commitment) queryLatency.WithLabelValues("get_slot", string(s.commitment)).Observe(time.Since(start).Seconds()) if err != nil { solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_slot_error").Inc() @@ -144,7 +146,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { Height: int64(slot), BridgeAddress: bridgeAddr, }) - logger.Info("fetched current Solana height", + s.logger.Info("fetched current Solana height", zap.String("commitment", string(s.commitment)), zap.Uint64("slot", slot), zap.Uint64("lastSlot", lastSlot), @@ -160,7 +162,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { rCtx, cancel = context.WithTimeout(ctx, rpcTimeout) defer cancel() start = time.Now() - slots, err := rpcClient.GetConfirmedBlocks(rCtx, rangeStart, &rangeEnd, s.commitment) + slots, err := s.rpcClient.GetConfirmedBlocks(rCtx, rangeStart, &rangeEnd, s.commitment) queryLatency.WithLabelValues("get_confirmed_blocks", string(s.commitment)).Observe(time.Since(start).Seconds()) if err != nil { solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_blocks_error").Inc() @@ -168,7 +170,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { return } - logger.Info("fetched slots in range", + s.logger.Info("fetched slots in range", zap.Uint64("from", rangeStart), zap.Uint64("to", rangeEnd), zap.Duration("took", time.Since(start)), zap.String("commitment", string(s.commitment))) @@ -181,7 +183,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { continue } - go s.fetchBlock(ctx, logger, s.commitment, rpcClient, slot) + go s.fetchBlock(ctx, slot) } lastSlot = slot @@ -197,38 +199,38 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { } } -func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, commitment rpc.CommitmentType, rpcClient *rpc.Client, slot uint64) { - logger.Debug("requesting block", zap.Uint64("slot", slot), zap.String("commitment", string(commitment))) +func (s *SolanaWatcher) fetchBlock(ctx context.Context, slot uint64) { + s.logger.Debug("requesting block", zap.Uint64("slot", slot), zap.String("commitment", string(s.commitment))) rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) defer cancel() start := time.Now() rewards := false - out, err := rpcClient.GetConfirmedBlockWithOpts(rCtx, slot, &rpc.GetConfirmedBlockOpts{ + out, err := s.rpcClient.GetConfirmedBlockWithOpts(rCtx, slot, &rpc.GetConfirmedBlockOpts{ Encoding: "json", TransactionDetails: "full", Rewards: &rewards, - Commitment: commitment, + Commitment: s.commitment, }) - queryLatency.WithLabelValues("get_confirmed_block", string(commitment)).Observe(time.Since(start).Seconds()) + queryLatency.WithLabelValues("get_confirmed_block", string(s.commitment)).Observe(time.Since(start).Seconds()) if err != nil { solanaConnectionErrors.WithLabelValues("get_confirmed_block_error").Inc() - logger.Error("failed to request block", zap.Error(err), zap.Uint64("slot", slot), - zap.String("commitment", string(commitment))) + s.logger.Error("failed to request block", zap.Error(err), zap.Uint64("slot", slot), + zap.String("commitment", string(s.commitment))) return } if out == nil { - logger.Error("nil response when requesting block", zap.Error(err), zap.Uint64("slot", slot), - zap.String("commitment", string(commitment))) + s.logger.Error("nil response when requesting block", zap.Error(err), zap.Uint64("slot", slot), + zap.String("commitment", string(s.commitment))) return } - logger.Info("fetched block", + s.logger.Info("fetched block", zap.Uint64("slot", slot), zap.Int("num_tx", len(out.Transactions)), zap.Duration("took", time.Since(start)), - zap.String("commitment", string(commitment))) + zap.String("commitment", string(s.commitment))) OUTER: for _, tx := range out.Transactions { @@ -243,20 +245,20 @@ OUTER: continue } - logger.Info("found Wormhole transaction", + s.logger.Info("found Wormhole transaction", zap.Stringer("signature", signature), zap.Uint64("slot", slot), - zap.String("commitment", string(commitment))) + zap.String("commitment", string(s.commitment))) // Find top-level instructions for _, inst := range tx.Transaction.Message.Instructions { - found, err := s.processInstruction(ctx, logger, commitment, rpcClient, slot, inst, programIndex, tx) + found, err := s.processInstruction(ctx, slot, inst, programIndex, tx) if err != nil { - logger.Error("malformed Wormhole instruction", + s.logger.Error("malformed Wormhole instruction", zap.Error(err), zap.Stringer("signature", signature), zap.Uint64("slot", slot), - zap.String("commitment", string(commitment)), + zap.String("commitment", string(s.commitment)), zap.Binary("data", inst.Data)) continue OUTER } @@ -269,43 +271,43 @@ OUTER: rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) defer cancel() start := time.Now() - tr, err := rpcClient.GetConfirmedTransactionWithOpts(rCtx, signature, &rpc.GetTransactionOpts{ + tr, err := s.rpcClient.GetConfirmedTransactionWithOpts(rCtx, signature, &rpc.GetTransactionOpts{ Encoding: "json", - Commitment: commitment, + Commitment: s.commitment, }) - queryLatency.WithLabelValues("get_confirmed_transaction", string(commitment)).Observe(time.Since(start).Seconds()) + queryLatency.WithLabelValues("get_confirmed_transaction", string(s.commitment)).Observe(time.Since(start).Seconds()) if err != nil { solanaConnectionErrors.WithLabelValues("get_confirmed_transaction_error").Inc() - logger.Error("failed to request transaction", + s.logger.Error("failed to request transaction", zap.Error(err), zap.Uint64("slot", slot), - zap.String("commitment", string(commitment)), + zap.String("commitment", string(s.commitment)), zap.Stringer("signature", signature)) return } - logger.Info("fetched transaction", + s.logger.Info("fetched transaction", zap.Uint64("slot", slot), - zap.String("commitment", string(commitment)), + zap.String("commitment", string(s.commitment)), zap.Stringer("signature", signature), zap.Duration("took", time.Since(start))) for _, inner := range tr.Meta.InnerInstructions { for _, inst := range inner.Instructions { - _, err := s.processInstruction(ctx, logger, commitment, rpcClient, slot, inst, programIndex, tx) + _, err := s.processInstruction(ctx, slot, inst, programIndex, tx) if err != nil { - logger.Error("malformed Wormhole instruction", + s.logger.Error("malformed Wormhole instruction", zap.Error(err), zap.Stringer("signature", signature), zap.Uint64("slot", slot), - zap.String("commitment", string(commitment))) + zap.String("commitment", string(s.commitment))) } } } } } -func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, commitment rpc.CommitmentType, rpcClient *rpc.Client, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx rpc.TransactionWithMeta) (bool, error) { +func (s *SolanaWatcher) processInstruction(ctx context.Context, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx rpc.TransactionWithMeta) (bool, error) { if inst.ProgramIDIndex != programIndex { return false, nil } @@ -325,49 +327,49 @@ func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logg return false, fmt.Errorf("failed to deserialize instruction data: %w", err) } - logger.Info("post message data", zap.Any("deserialized_data", data)) + s.logger.Info("post message data", zap.Any("deserialized_data", data)) level, err := data.ConsistencyLevel.Commitment() if err != nil { return false, fmt.Errorf("failed to determine commitment: %w", err) } - if level != commitment { + if level != s.commitment { return true, nil } // The second account in a well-formed Wormhole instruction is the VAA program account. acc := tx.Transaction.Message.AccountKeys[inst.Accounts[1]] - go s.fetchMessageAccount(ctx, logger, acc, rpcClient, commitment, slot) + go s.fetchMessageAccount(ctx, acc, slot) return true, nil } -func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, rpcClient *rpc.Client, commitment rpc.CommitmentType, slot uint64) { +func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, acc solana.PublicKey, slot uint64) { // Fetching account rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) defer cancel() start := time.Now() - info, err := rpcClient.GetAccountInfoWithOpts(rCtx, acc, &rpc.GetAccountInfoOpts{ + info, err := s.rpcClient.GetAccountInfoWithOpts(rCtx, acc, &rpc.GetAccountInfoOpts{ Encoding: solana.EncodingBase64, - Commitment: commitment, + Commitment: s.commitment, }) - queryLatency.WithLabelValues("get_account_info", string(commitment)).Observe(time.Since(start).Seconds()) + queryLatency.WithLabelValues("get_account_info", string(s.commitment)).Observe(time.Since(start).Seconds()) if err != nil { solanaConnectionErrors.WithLabelValues("get_account_info_error").Inc() - logger.Error("failed to request account", + s.logger.Error("failed to request account", zap.Error(err), zap.Uint64("slot", slot), - zap.String("commitment", string(commitment)), + zap.String("commitment", string(s.commitment)), zap.Stringer("account", acc)) return } if !info.Value.Owner.Equals(s.bridge) { solanaConnectionErrors.WithLabelValues("account_owner_mismatch").Inc() - logger.Error("account has invalid owner", + s.logger.Error("account has invalid owner", zap.Uint64("slot", slot), - zap.String("commitment", string(commitment)), + zap.String("commitment", string(s.commitment)), zap.Stringer("account", acc), zap.Stringer("unexpected_owner", info.Value.Owner)) return @@ -376,27 +378,27 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log data := info.Value.Data.GetBinary() if string(data[:3]) != "msg" { solanaConnectionErrors.WithLabelValues("bad_account_data").Inc() - logger.Error("account is not a message account", + s.logger.Error("account is not a message account", zap.Uint64("slot", slot), - zap.String("commitment", string(commitment)), + zap.String("commitment", string(s.commitment)), zap.Stringer("account", acc)) return } - logger.Info("found valid VAA account", + s.logger.Info("found valid VAA account", zap.Uint64("slot", slot), - zap.String("commitment", string(commitment)), + zap.String("commitment", string(s.commitment)), zap.Stringer("account", acc), zap.Binary("data", data)) - s.processMessageAccount(logger, data, acc) + s.processMessageAccount(data, acc) } -func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, acc solana.PublicKey) { +func (s *SolanaWatcher) processMessageAccount(data []byte, acc solana.PublicKey) { proposal, err := ParseTransferOutProposal(data) if err != nil { solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc() - logger.Error( + s.logger.Error( "failed to parse transfer proposal", zap.Stringer("account", acc), zap.Binary("data", data), @@ -420,7 +422,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a solanaMessagesConfirmed.Inc() - logger.Info("message observed", + s.logger.Info("message observed", zap.Stringer("account", acc), zap.Time("timestamp", observation.Timestamp), zap.Uint32("nonce", observation.Nonce),