From 3dad5812e2caa57ae7360762f01acd287d20e7b9 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Thu, 10 Oct 2024 06:55:51 -0700 Subject: [PATCH] Debug --- beacon-chain/execution/engine_client.go | 72 +++++++++++-------- beacon-chain/execution/options.go | 9 +++ beacon-chain/execution/service.go | 15 ++++ beacon-chain/node/node.go | 1 + beacon-chain/sync/metrics.go | 7 ++ beacon-chain/sync/subscriber_beacon_blocks.go | 30 ++++++-- proto/engine/v1/json_marshal_unmarshal.go | 8 ++- 7 files changed, 104 insertions(+), 38 deletions(-) diff --git a/beacon-chain/execution/engine_client.go b/beacon-chain/execution/engine_client.go index 31d1b58c161c..e90d186eb901 100644 --- a/beacon-chain/execution/engine_client.go +++ b/beacon-chain/execution/engine_client.go @@ -29,7 +29,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/time/slots" "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" - "k8s.io/utils/strings/slices" ) var ( @@ -114,7 +113,7 @@ type Reconstructor interface { ReconstructFullBellatrixBlockBatch( ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock, ) ([]interfaces.SignedBeaconBlock, error) - ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) ([]blocks.VerifiedROBlob, error) + ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, indices [6]bool) ([]blocks.VerifiedROBlob, error) } // EngineCaller defines a client that can interact with an Ethereum @@ -306,6 +305,8 @@ func (s *Service) ExchangeCapabilities(ctx context.Context) ([]string, error) { result := &pb.ExchangeCapabilities{} err := s.rpcClient.CallContext(ctx, &result, ExchangeCapabilities, supportedEngineEndpoints) + log.Info(result.SupportedMethods) + var unsupported []string for _, s1 := range supportedEngineEndpoints { supported := false @@ -493,12 +494,13 @@ func (s *Service) HeaderByNumber(ctx context.Context, number *big.Int) (*types.H func (s *Service) GetBlobs(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProof, error) { ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobs") defer span.End() - s.capabilitiesLock.RLock() - if !slices.Contains(s.capabilities, GetBlobsV1) { - s.capabilitiesLock.RUnlock() - return nil, nil - } - s.capabilitiesLock.RUnlock() + // TODO: Engine get capabilities is broken + //s.capabilitiesLock.RLock() + //if !slices.Contains(s.capabilities, GetBlobsV1) { + // s.capabilitiesLock.RUnlock() + // return nil, nil + //} + //s.capabilitiesLock.RUnlock() result := make([]*pb.BlobAndProof, len(versionedHashes)) err := s.rpcClient.CallContext(ctx, &result, GetBlobsV1, versionedHashes) @@ -536,17 +538,25 @@ func (s *Service) ReconstructFullBellatrixBlockBatch( // ReconstructBlobSidecars reconstructs the blob sidecars for a given beacon block. // It retrieves the KZG commitments from the block body, fetches the associated blobs and proofs, // and constructs the corresponding verified read-only blob sidecars. -func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) ([]blocks.VerifiedROBlob, error) { +func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, exists [6]bool) ([]blocks.VerifiedROBlob, error) { blockBody := block.Block().Body() - - // Get KZG commitments from the block body kzgCommitments, err := blockBody.BlobKzgCommitments() if err != nil { return nil, errors.Wrap(err, "could not get blob KZG commitments") } - // Initialize KZG hashes and retrieve blobs - kzgHashes := make([]common.Hash, len(kzgCommitments)) + // Collect KZG hashes for non-existing blobs + var kzgHashes []common.Hash + for i, commitment := range kzgCommitments { + if !exists[i] { + kzgHashes = append(kzgHashes, primitives.ConvertKzgCommitmentToVersionedHash(commitment)) + } + } + if len(kzgHashes) == 0 { + return nil, nil + } + + // Fetch blobs from EL blobs, err := s.GetBlobs(ctx, kzgHashes) if err != nil { return nil, errors.Wrap(err, "could not get blobs") @@ -555,51 +565,57 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces. return nil, nil } - // Get the block header and its hash tree root header, err := block.Header() if err != nil { return nil, errors.Wrap(err, "could not get header") } - // Loop through the blobs and reconstruct blob sidecars - verifiedBlobs := make([]blocks.VerifiedROBlob, 0, len(blobs)) - for index, blob := range blobs { + + // Reconstruct verify blob sidecars + var verifiedBlobs []blocks.VerifiedROBlob + for i, blobIndex := 0, 0; i < len(kzgCommitments); i++ { + if exists[i] { + continue + } + + blob := blobs[blobIndex] + blobIndex++ if blob == nil { continue } - // Get the Merkle proof for the KZG commitment - proof, err := blocks.MerkleProofKZGCommitment(blockBody, index) + proof, err := blocks.MerkleProofKZGCommitment(blockBody, i) if err != nil { - log.WithError(err).Error("could not get Merkle proof for KZG commitment") + log.WithError(err).WithField("index", i).Error("failed to get Merkle proof for KZG commitment") continue } - - // Create the BlobSidecar object sidecar := ðpb.BlobSidecar{ - Index: uint64(index), + Index: uint64(i), Blob: blob.Blob, - KzgCommitment: kzgCommitments[index], + KzgCommitment: kzgCommitments[i], KzgProof: blob.KzgProof, SignedBlockHeader: header, CommitmentInclusionProof: proof, } - // Create a read-only blob with the header root roBlob, err := blocks.NewROBlobWithRoot(sidecar, blockRoot) if err != nil { - log.WithError(err).Error("could not create RO blob with root") + log.WithError(err).WithField("index", i).Error("failed to create RO blob with root") continue } + + // Verify the sidecar KZG proof v := s.blobVerifier(roBlob, verification.ELMemPoolRequirements) if err := v.SidecarKzgProofVerified(); err != nil { - log.WithError(err).Error("could not verify KZG proof for sidecar") + log.WithError(err).WithField("index", i).Error("failed to verify KZG proof for sidecar") continue } + verifiedBlob, err := v.VerifiedROBlob() if err != nil { - log.WithError(err).Error("could not verify RO blob") + log.WithError(err).WithField("index", i).Error("failed to verify RO blob") continue } + verifiedBlobs = append(verifiedBlobs, verifiedBlob) } diff --git a/beacon-chain/execution/options.go b/beacon-chain/execution/options.go index edc616bcc533..028b7f0c1c38 100644 --- a/beacon-chain/execution/options.go +++ b/beacon-chain/execution/options.go @@ -7,6 +7,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/db" "github.com/prysmaticlabs/prysm/v5/beacon-chain/state" "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v5/network" "github.com/prysmaticlabs/prysm/v5/network/authorization" ) @@ -115,3 +116,11 @@ func WithJwtId(jwtId string) Option { return nil } } + +// WithVerifierWaiter gives the sync package direct access to the verifier waiter. +func WithVerifierWaiter(v *verification.InitializerWaiter) Option { + return func(s *Service) error { + s.verifierWaiter = v + return nil + } +} diff --git a/beacon-chain/execution/service.go b/beacon-chain/execution/service.go index 5f8818f6099f..597d70e48efd 100644 --- a/beacon-chain/execution/service.go +++ b/beacon-chain/execution/service.go @@ -31,6 +31,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen" "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v5/config/params" + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/container/trie" contracts "github.com/prysmaticlabs/prysm/v5/contracts/deposit" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" @@ -158,6 +159,7 @@ type Service struct { preGenesisState state.BeaconState capabilities []string capabilitiesLock sync.RWMutex + verifierWaiter *verification.InitializerWaiter blobVerifier verification.NewBlobVerifier } @@ -233,6 +235,13 @@ func (s *Service) Start() { } } + v, err := s.verifierWaiter.WaitForInitializer(s.ctx) + if err != nil { + log.WithError(err).Error("Could not get verification initializer") + return + } + s.blobVerifier = newBlobVerifierFromInitializer(v) + s.isRunning = true // Poll the execution client connection and fallback if errors occur. @@ -890,3 +899,9 @@ func (s *Service) migrateOldDepositTree(eth1DataInDB *ethpb.ETH1ChainData) error func (s *Service) removeStartupState() { s.cfg.finalizedStateAtStartup = nil } + +func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier { + return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return ini.NewBlobVerifier(b, reqs) + } +} diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index e059375adecc..fce4a4e56afd 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -796,6 +796,7 @@ func (b *BeaconNode) registerPOWChainService() error { execution.WithBeaconNodeStatsUpdater(bs), execution.WithFinalizedStateAtStartup(b.finalizedStateAtStartUp), execution.WithJwtId(b.cliCtx.String(flags.JwtId.Name)), + execution.WithVerifierWaiter(b.verifyInitWaiter), ) web3Service, err := execution.NewService(b.ctx, opts...) if err != nil { diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index 14f38c98214d..0f1f07577fa6 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -177,6 +177,13 @@ var ( Help: "Count the number of times blobs have been recovered from the execution layer.", }, ) + + blobSidecarHitRate = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "blob_sidecar_hit_rate", + Help: "The rate of blob sidecars that were already in the database.", + }, + ) ) func (s *Service) updateMetrics() { diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index 549bd705faf8..09be1bd32094 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -72,26 +72,39 @@ func (s *Service) attemptBlobSaveAndBroadcast(ctx context.Context, block interfa return } + indices, err := s.cfg.blobStorage.Indices(blockRoot) + if err != nil { + log.WithError(err).Error("Failed to retrieve indices for block") + return + } + // Reconstruct blob sidecars from the EL - blobSidecars, err := s.cfg.executionReconstructor.ReconstructBlobSidecars(ctx, block, blockRoot) + blobSidecars, err := s.cfg.executionReconstructor.ReconstructBlobSidecars(ctx, block, blockRoot, indices) if err != nil { log.WithError(err).Error("Failed to reconstruct blob sidecars") return } - if blobSidecars == nil { + if len(blobSidecars) == 0 { return } - // Get indices of already existing blobs in the database to avoid duplicates - indices, err := s.cfg.blobStorage.Indices(blockRoot) + // Refresh indices as new blobs may have been added to the db + indices, err = s.cfg.blobStorage.Indices(blockRoot) if err != nil { log.WithError(err).Error("Failed to retrieve indices for block") return } // Iterate through the reconstructed sidecars, save them to the chain, and broadcast them to peers - for i, sidecar := range blobSidecars { - if indices[i] { + kzgs, err := block.Block().Body().BlobKzgCommitments() + if err != nil { + log.WithError(err).Error("Failed to get blob KZG commitments") + return + } + total := len(kzgs) + hit := 0 + for _, sidecar := range blobSidecars { + if indices[sidecar.Index] { continue // Skip if the blob already exists in the database } @@ -107,8 +120,11 @@ func (s *Service) attemptBlobSaveAndBroadcast(ctx context.Context, block interfa fields := blobFields(sidecar.ROBlob) fields["sinceSlotStartTime"] = s.cfg.clock.Now().Sub(startTime) - log.WithFields(fields).Debug("Successfully received and processed blob sidecar from EL") + log.WithFields(fields).Debug("Processed blob sidecar from EL") + hit++ } + rate := float64(hit) / float64(total) + blobSidecarHitRate.Set(rate) } // WriteInvalidBlockToDisk as a block ssz. Writes to temp directory. diff --git a/proto/engine/v1/json_marshal_unmarshal.go b/proto/engine/v1/json_marshal_unmarshal.go index 03c35b0de652..c97247f2ab97 100644 --- a/proto/engine/v1/json_marshal_unmarshal.go +++ b/proto/engine/v1/json_marshal_unmarshal.go @@ -831,8 +831,8 @@ func (b BlobBundleJSON) ToProto() *BlobsBundle { } type BlobAndProofJson struct { - Blob hexutil.Bytes `json:"blobs"` - KzgProof hexutil.Bytes `json:"proofs"` + Blob hexutil.Bytes `json:"blob"` + KzgProof hexutil.Bytes `json:"proof"` } // MarshalJSON -- @@ -1128,7 +1128,7 @@ func RecastHexutilByteSlice(h []hexutil.Bytes) [][]byte { // UnmarshalJSON implements the json unmarshaler interface for BlobAndProof. func (b *BlobAndProof) UnmarshalJSON(enc []byte) error { - dec := BlobAndProofJson{} + var dec *BlobAndProofJson if err := json.Unmarshal(enc, &dec); err != nil { return err } @@ -1140,6 +1140,8 @@ func (b *BlobAndProof) UnmarshalJSON(enc []byte) error { proof := make([]byte, fieldparams.BLSPubkeyLength) copy(proof, dec.KzgProof) b.KzgProof = proof + + return nil } type ExchangeCapabilitiesJSON struct {