Skip to content

Commit

Permalink
Debug
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain committed Oct 11, 2024
1 parent 6a5e029 commit 3dad581
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 38 deletions.
72 changes: 44 additions & 28 deletions beacon-chain/execution/engine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"k8s.io/utils/strings/slices"
)

var (
Expand Down Expand Up @@ -114,7 +113,7 @@ type Reconstructor interface {
ReconstructFullBellatrixBlockBatch(
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) ([]blocks.VerifiedROBlob, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, indices [6]bool) ([]blocks.VerifiedROBlob, error)
}

// EngineCaller defines a client that can interact with an Ethereum
Expand Down Expand Up @@ -306,6 +305,8 @@ func (s *Service) ExchangeCapabilities(ctx context.Context) ([]string, error) {
result := &pb.ExchangeCapabilities{}
err := s.rpcClient.CallContext(ctx, &result, ExchangeCapabilities, supportedEngineEndpoints)

log.Info(result.SupportedMethods)

var unsupported []string
for _, s1 := range supportedEngineEndpoints {
supported := false
Expand Down Expand Up @@ -493,12 +494,13 @@ func (s *Service) HeaderByNumber(ctx context.Context, number *big.Int) (*types.H
func (s *Service) GetBlobs(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProof, error) {
ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobs")
defer span.End()
s.capabilitiesLock.RLock()
if !slices.Contains(s.capabilities, GetBlobsV1) {
s.capabilitiesLock.RUnlock()
return nil, nil
}
s.capabilitiesLock.RUnlock()
// TODO: Engine get capabilities is broken
//s.capabilitiesLock.RLock()
//if !slices.Contains(s.capabilities, GetBlobsV1) {
// s.capabilitiesLock.RUnlock()
// return nil, nil
//}
//s.capabilitiesLock.RUnlock()

result := make([]*pb.BlobAndProof, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV1, versionedHashes)
Expand Down Expand Up @@ -536,17 +538,25 @@ func (s *Service) ReconstructFullBellatrixBlockBatch(
// ReconstructBlobSidecars reconstructs the blob sidecars for a given beacon block.
// It retrieves the KZG commitments from the block body, fetches the associated blobs and proofs,
// and constructs the corresponding verified read-only blob sidecars.
func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) ([]blocks.VerifiedROBlob, error) {
func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, exists [6]bool) ([]blocks.VerifiedROBlob, error) {
blockBody := block.Block().Body()

// Get KZG commitments from the block body
kzgCommitments, err := blockBody.BlobKzgCommitments()
if err != nil {
return nil, errors.Wrap(err, "could not get blob KZG commitments")
}

// Initialize KZG hashes and retrieve blobs
kzgHashes := make([]common.Hash, len(kzgCommitments))
// Collect KZG hashes for non-existing blobs
var kzgHashes []common.Hash
for i, commitment := range kzgCommitments {
if !exists[i] {
kzgHashes = append(kzgHashes, primitives.ConvertKzgCommitmentToVersionedHash(commitment))
}
}
if len(kzgHashes) == 0 {
return nil, nil
}

// Fetch blobs from EL
blobs, err := s.GetBlobs(ctx, kzgHashes)
if err != nil {
return nil, errors.Wrap(err, "could not get blobs")
Expand All @@ -555,51 +565,57 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
return nil, nil
}

// Get the block header and its hash tree root
header, err := block.Header()
if err != nil {
return nil, errors.Wrap(err, "could not get header")
}
// Loop through the blobs and reconstruct blob sidecars
verifiedBlobs := make([]blocks.VerifiedROBlob, 0, len(blobs))
for index, blob := range blobs {

// Reconstruct verify blob sidecars
var verifiedBlobs []blocks.VerifiedROBlob
for i, blobIndex := 0, 0; i < len(kzgCommitments); i++ {
if exists[i] {
continue
}

blob := blobs[blobIndex]
blobIndex++
if blob == nil {
continue
}

// Get the Merkle proof for the KZG commitment
proof, err := blocks.MerkleProofKZGCommitment(blockBody, index)
proof, err := blocks.MerkleProofKZGCommitment(blockBody, i)
if err != nil {
log.WithError(err).Error("could not get Merkle proof for KZG commitment")
log.WithError(err).WithField("index", i).Error("failed to get Merkle proof for KZG commitment")
continue
}

// Create the BlobSidecar object
sidecar := &ethpb.BlobSidecar{
Index: uint64(index),
Index: uint64(i),
Blob: blob.Blob,
KzgCommitment: kzgCommitments[index],
KzgCommitment: kzgCommitments[i],
KzgProof: blob.KzgProof,
SignedBlockHeader: header,
CommitmentInclusionProof: proof,
}

// Create a read-only blob with the header root
roBlob, err := blocks.NewROBlobWithRoot(sidecar, blockRoot)
if err != nil {
log.WithError(err).Error("could not create RO blob with root")
log.WithError(err).WithField("index", i).Error("failed to create RO blob with root")
continue
}

// Verify the sidecar KZG proof
v := s.blobVerifier(roBlob, verification.ELMemPoolRequirements)
if err := v.SidecarKzgProofVerified(); err != nil {
log.WithError(err).Error("could not verify KZG proof for sidecar")
log.WithError(err).WithField("index", i).Error("failed to verify KZG proof for sidecar")
continue
}

verifiedBlob, err := v.VerifiedROBlob()
if err != nil {
log.WithError(err).Error("could not verify RO blob")
log.WithError(err).WithField("index", i).Error("failed to verify RO blob")
continue
}

verifiedBlobs = append(verifiedBlobs, verifiedBlob)
}

Expand Down
9 changes: 9 additions & 0 deletions beacon-chain/execution/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/network"
"github.com/prysmaticlabs/prysm/v5/network/authorization"
)
Expand Down Expand Up @@ -115,3 +116,11 @@ func WithJwtId(jwtId string) Option {
return nil
}
}

// WithVerifierWaiter gives the sync package direct access to the verifier waiter.
func WithVerifierWaiter(v *verification.InitializerWaiter) Option {
return func(s *Service) error {
s.verifierWaiter = v
return nil
}
}
15 changes: 15 additions & 0 deletions beacon-chain/execution/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/container/trie"
contracts "github.com/prysmaticlabs/prysm/v5/contracts/deposit"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
Expand Down Expand Up @@ -158,6 +159,7 @@ type Service struct {
preGenesisState state.BeaconState
capabilities []string
capabilitiesLock sync.RWMutex
verifierWaiter *verification.InitializerWaiter
blobVerifier verification.NewBlobVerifier
}

Expand Down Expand Up @@ -233,6 +235,13 @@ func (s *Service) Start() {
}
}

v, err := s.verifierWaiter.WaitForInitializer(s.ctx)
if err != nil {
log.WithError(err).Error("Could not get verification initializer")
return
}
s.blobVerifier = newBlobVerifierFromInitializer(v)

s.isRunning = true

// Poll the execution client connection and fallback if errors occur.
Expand Down Expand Up @@ -890,3 +899,9 @@ func (s *Service) migrateOldDepositTree(eth1DataInDB *ethpb.ETH1ChainData) error
func (s *Service) removeStartupState() {
s.cfg.finalizedStateAtStartup = nil
}

func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier {
return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier {
return ini.NewBlobVerifier(b, reqs)
}
}
1 change: 1 addition & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ func (b *BeaconNode) registerPOWChainService() error {
execution.WithBeaconNodeStatsUpdater(bs),
execution.WithFinalizedStateAtStartup(b.finalizedStateAtStartUp),
execution.WithJwtId(b.cliCtx.String(flags.JwtId.Name)),
execution.WithVerifierWaiter(b.verifyInitWaiter),
)
web3Service, err := execution.NewService(b.ctx, opts...)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/sync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ var (
Help: "Count the number of times blobs have been recovered from the execution layer.",
},
)

blobSidecarHitRate = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "blob_sidecar_hit_rate",
Help: "The rate of blob sidecars that were already in the database.",
},
)
)

func (s *Service) updateMetrics() {
Expand Down
30 changes: 23 additions & 7 deletions beacon-chain/sync/subscriber_beacon_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,39 @@ func (s *Service) attemptBlobSaveAndBroadcast(ctx context.Context, block interfa
return
}

indices, err := s.cfg.blobStorage.Indices(blockRoot)
if err != nil {
log.WithError(err).Error("Failed to retrieve indices for block")
return
}

// Reconstruct blob sidecars from the EL
blobSidecars, err := s.cfg.executionReconstructor.ReconstructBlobSidecars(ctx, block, blockRoot)
blobSidecars, err := s.cfg.executionReconstructor.ReconstructBlobSidecars(ctx, block, blockRoot, indices)
if err != nil {
log.WithError(err).Error("Failed to reconstruct blob sidecars")
return
}
if blobSidecars == nil {
if len(blobSidecars) == 0 {
return
}

// Get indices of already existing blobs in the database to avoid duplicates
indices, err := s.cfg.blobStorage.Indices(blockRoot)
// Refresh indices as new blobs may have been added to the db
indices, err = s.cfg.blobStorage.Indices(blockRoot)
if err != nil {
log.WithError(err).Error("Failed to retrieve indices for block")
return
}

// Iterate through the reconstructed sidecars, save them to the chain, and broadcast them to peers
for i, sidecar := range blobSidecars {
if indices[i] {
kzgs, err := block.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to get blob KZG commitments")
return
}
total := len(kzgs)
hit := 0
for _, sidecar := range blobSidecars {
if indices[sidecar.Index] {
continue // Skip if the blob already exists in the database
}

Expand All @@ -107,8 +120,11 @@ func (s *Service) attemptBlobSaveAndBroadcast(ctx context.Context, block interfa

fields := blobFields(sidecar.ROBlob)
fields["sinceSlotStartTime"] = s.cfg.clock.Now().Sub(startTime)
log.WithFields(fields).Debug("Successfully received and processed blob sidecar from EL")
log.WithFields(fields).Debug("Processed blob sidecar from EL")
hit++
}
rate := float64(hit) / float64(total)
blobSidecarHitRate.Set(rate)
}

// WriteInvalidBlockToDisk as a block ssz. Writes to temp directory.
Expand Down
8 changes: 5 additions & 3 deletions proto/engine/v1/json_marshal_unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,8 +831,8 @@ func (b BlobBundleJSON) ToProto() *BlobsBundle {
}

type BlobAndProofJson struct {
Blob hexutil.Bytes `json:"blobs"`
KzgProof hexutil.Bytes `json:"proofs"`
Blob hexutil.Bytes `json:"blob"`
KzgProof hexutil.Bytes `json:"proof"`
}

// MarshalJSON --
Expand Down Expand Up @@ -1128,7 +1128,7 @@ func RecastHexutilByteSlice(h []hexutil.Bytes) [][]byte {

// UnmarshalJSON implements the json unmarshaler interface for BlobAndProof.
func (b *BlobAndProof) UnmarshalJSON(enc []byte) error {
dec := BlobAndProofJson{}
var dec *BlobAndProofJson
if err := json.Unmarshal(enc, &dec); err != nil {
return err
}
Expand All @@ -1140,6 +1140,8 @@ func (b *BlobAndProof) UnmarshalJSON(enc []byte) error {
proof := make([]byte, fieldparams.BLSPubkeyLength)
copy(proof, dec.KzgProof)
b.KzgProof = proof

return nil
}

type ExchangeCapabilitiesJSON struct {
Expand Down

0 comments on commit 3dad581

Please sign in to comment.