diff --git a/packages/taiko-client/cmd/flags/prover.go b/packages/taiko-client/cmd/flags/prover.go
index d9c13f1ce1..2de5bbc142 100644
--- a/packages/taiko-client/cmd/flags/prover.go
+++ b/packages/taiko-client/cmd/flags/prover.go
@@ -212,7 +212,7 @@ var (
 		Category: proverCategory,
 		EnvVars:  []string{"PROVER_ZKVM_BATCH_SIZE"},
 	}
-	ForceProveInterval = &cli.DurationFlag{
+	ForceBatchProvingInterval = &cli.DurationFlag{
 		Name: "prover.forceBatchProvingInterval",
 		Usage: "Time interval to prove blocks even the number of pending proof do not exceed prover.batchSize, " +
 			"this flag only works post Ontake fork",
@@ -254,5 +254,5 @@ var ProverFlags = MergeFlags(CommonFlags, []cli.Flag{
 	RaikoZKVMHostEndpoint,
 	SGXBatchSize,
 	ZKVMBatchSize,
-	ForceProveInterval,
+	ForceBatchProvingInterval,
 }, TxmgrFlags)
diff --git a/packages/taiko-client/prover/config.go b/packages/taiko-client/prover/config.go
index 552ecfac23..cab66a38db 100644
--- a/packages/taiko-client/prover/config.go
+++ b/packages/taiko-client/prover/config.go
@@ -62,7 +62,7 @@ type Config struct {
 	PrivateTxmgrConfigs *txmgr.CLIConfig
 	SGXProofBufferSize  uint64
 	ZKVMProofBufferSize uint64
-	ForceProveInterval  time.Duration
+	ForceBatchProvingInterval time.Duration
 }
 
 // NewConfigFromCliContext creates a new config instance from command line flags.
@@ -186,8 +186,8 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
 			l1ProverPrivKey,
 			c,
 		),
-		SGXProofBufferSize:  c.Uint64(flags.SGXBatchSize.Name),
-		ZKVMProofBufferSize: c.Uint64(flags.ZKVMBatchSize.Name),
-		ForceProveInterval:  c.Duration(flags.ForceProveInterval.Name),
+		SGXProofBufferSize:        c.Uint64(flags.SGXBatchSize.Name),
+		ZKVMProofBufferSize:       c.Uint64(flags.ZKVMBatchSize.Name),
+		ForceBatchProvingInterval: c.Duration(flags.ForceBatchProvingInterval.Name),
 	}, nil
 }
diff --git a/packages/taiko-client/prover/init.go b/packages/taiko-client/prover/init.go
index dc2e4cb40e..61faf0f66f 100644
--- a/packages/taiko-client/prover/init.go
+++ b/packages/taiko-client/prover/init.go
@@ -159,6 +159,7 @@ func (p *Prover) initProofSubmitters(
 			p.IsGuardianProver(),
 			p.cfg.GuardianProofSubmissionDelay,
 			bufferSize,
+			p.cfg.ForceBatchProvingInterval,
 		); err != nil {
 			return err
 		}
diff --git a/packages/taiko-client/prover/proof_submitter/interface.go b/packages/taiko-client/prover/proof_submitter/interface.go
index 26971fc858..b60ec9c391 100644
--- a/packages/taiko-client/prover/proof_submitter/interface.go
+++ b/packages/taiko-client/prover/proof_submitter/interface.go
@@ -19,6 +19,7 @@ type Submitter interface {
 	Producer() proofProducer.ProofProducer
 	Tier() uint16
 	BufferSize() uint64
+	AggregationEnabled() bool
 }
 
 // Contester is the interface for contesting proofs of the L2 blocks.
diff --git a/packages/taiko-client/prover/proof_submitter/proof_buffer.go b/packages/taiko-client/prover/proof_submitter/proof_buffer.go
index b21e992aa3..8f2d6d0cb7 100644
--- a/packages/taiko-client/prover/proof_submitter/proof_buffer.go
+++ b/packages/taiko-client/prover/proof_submitter/proof_buffer.go
@@ -3,6 +3,7 @@ package submitter
 import (
 	"errors"
 	"sync"
+	"time"
 
 	producer "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_producer"
 )
@@ -14,16 +15,19 @@ var (
 )
 
 // ProofBuffer caches all single proof with a fixed size.
 type ProofBuffer struct {
-	MaxLength uint64
-	buffer    []*producer.ProofWithHeader
-	mutex     sync.RWMutex
+	MaxLength     uint64
+	buffer        []*producer.ProofWithHeader
+	lastUpdatedAt time.Time
+	isAggregating bool
+	mutex         sync.RWMutex
 }
 
 // NewProofBuffer creates a new ProofBuffer instance.
 func NewProofBuffer(maxLength uint64) *ProofBuffer {
 	return &ProofBuffer{
-		buffer:    make([]*producer.ProofWithHeader, 0, maxLength),
-		MaxLength: maxLength,
+		buffer:        make([]*producer.ProofWithHeader, 0, maxLength),
+		lastUpdatedAt: time.Now(),
+		MaxLength:     maxLength,
 	}
 }
 
@@ -37,6 +41,7 @@ func (pb *ProofBuffer) Write(item *producer.ProofWithHeader) (int, error) {
 	}
 
 	pb.buffer = append(pb.buffer, item)
+	pb.lastUpdatedAt = time.Now()
 	return len(pb.buffer), nil
 }
 
@@ -65,11 +70,14 @@ func (pb *ProofBuffer) Len() int {
 	return len(pb.buffer)
 }
 
-// Clear clears all buffer.
-func (pb *ProofBuffer) Clear() {
-	pb.mutex.Lock()
-	defer pb.mutex.Unlock()
-	pb.buffer = pb.buffer[:0]
+// LastUpdatedAt returns the last updated time of the buffer.
+func (pb *ProofBuffer) LastUpdatedAt() time.Time {
+	return pb.lastUpdatedAt
+}
+
+// UpdateLastUpdatedAt updates the last updated time of the buffer.
+func (pb *ProofBuffer) UpdateLastUpdatedAt() {
+	pb.lastUpdatedAt = time.Now()
 }
 
 // ClearItems clears items that has given block ids in the buffer.
@@ -94,5 +102,21 @@ func (pb *ProofBuffer) ClearItems(blockIDs ...uint64) int {
 	}
 
 	pb.buffer = newBuffer
+	pb.isAggregating = false
 	return clearedCount
 }
+
+// MarkAggregating marks the proofs in this buffer as being aggregated.
+func (pb *ProofBuffer) MarkAggregating() {
+	pb.isAggregating = true
+}
+
+// IsAggregating returns whether the proofs in this buffer are being aggregated.
+func (pb *ProofBuffer) IsAggregating() bool {
+	return pb.isAggregating
+}
+
+// Enabled returns whether the buffer is enabled.
+func (pb *ProofBuffer) Enabled() bool {
+	return pb.MaxLength > 1
+}
diff --git a/packages/taiko-client/prover/proof_submitter/proof_submitter.go b/packages/taiko-client/prover/proof_submitter/proof_submitter.go
index ba1cb8bad0..60648aeda3 100644
--- a/packages/taiko-client/prover/proof_submitter/proof_submitter.go
+++ b/packages/taiko-client/prover/proof_submitter/proof_submitter.go
@@ -53,7 +53,8 @@ type ProofSubmitter struct {
 	isGuardian      bool
 	submissionDelay time.Duration
 	// Batch proof related
-	proofBuffer *ProofBuffer
+	proofBuffer               *ProofBuffer
+	forceBatchProvingInterval time.Duration
 }
 
 // NewProofSubmitter creates a new ProofSubmitter instance.
@@ -74,6 +75,7 @@ func NewProofSubmitter(
 	isGuardian bool,
 	submissionDelay time.Duration,
 	proofBufferSize uint64,
+	forceBatchProvingInterval time.Duration,
 ) (*ProofSubmitter, error) {
 	anchorValidator, err := validator.New(taikoL2Address, rpcClient.L2.ChainID, rpcClient)
 	if err != nil {
@@ -81,22 +83,23 @@ func NewProofSubmitter(
 	}
 
 	return &ProofSubmitter{
-		rpc:               rpcClient,
-		proofProducer:     proofProducer,
-		resultCh:          resultCh,
-		batchResultCh:     batchResultCh,
-		aggregationNotify: aggregationNotify,
-		anchorValidator:   anchorValidator,
-		txBuilder:         builder,
-		sender:            transaction.NewSender(rpcClient, txmgr, privateTxmgr, proverSetAddress, gasLimit),
-		proverAddress:     txmgr.From(),
-		proverSetAddress:  proverSetAddress,
-		taikoL2Address:    taikoL2Address,
-		graffiti:          rpc.StringToBytes32(graffiti),
-		tiers:             tiers,
-		isGuardian:        isGuardian,
-		submissionDelay:   submissionDelay,
-		proofBuffer:       NewProofBuffer(proofBufferSize),
+		rpc:                       rpcClient,
+		proofProducer:             proofProducer,
+		resultCh:                  resultCh,
+		batchResultCh:             batchResultCh,
+		aggregationNotify:         aggregationNotify,
+		anchorValidator:           anchorValidator,
+		txBuilder:                 builder,
+		sender:                    transaction.NewSender(rpcClient, txmgr, privateTxmgr, proverSetAddress, gasLimit),
+		proverAddress:             txmgr.From(),
+		proverSetAddress:          proverSetAddress,
+		taikoL2Address:            taikoL2Address,
+		graffiti:                  rpc.StringToBytes32(graffiti),
+		tiers:                     tiers,
+		isGuardian:                isGuardian,
+		submissionDelay:           submissionDelay,
+		proofBuffer:               NewProofBuffer(proofBufferSize),
+		forceBatchProvingInterval: forceBatchProvingInterval,
 	}, nil
 }
 
@@ -143,7 +146,7 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoBl
 		Graffiti:      common.Bytes2Hex(s.graffiti[:]),
 		GasUsed:       header.GasUsed,
 		ParentGasUsed: parent.GasUsed(),
-		Compressed:    s.proofBuffer.MaxLength > 1,
+		Compressed:    s.proofBuffer.Enabled(),
 	}
 
 	// If the prover set address is provided, we use that address as the prover on chain.
@@ -159,9 +162,9 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoBl
 			log.Error("Failed to request proof, context is canceled", "blockID", opts.BlockID, "error", ctx.Err())
 			return nil
 		}
-		// Check if the proof buffer is full
-		if s.proofBuffer.MaxLength > 1 && s.proofBuffer.MaxLength == uint64(s.proofBuffer.Len()) {
-			log.Debug("Buffer is full now", "blockID", meta.GetBlockID())
+		// Check if the proof buffer is full.
+		if s.proofBuffer.Enabled() && uint64(s.proofBuffer.Len()) >= s.proofBuffer.MaxLength {
+			log.Warn("Proof buffer is full now", "blockID", meta.GetBlockID())
 			return errBufferOverflow
 		}
 
 		// Check if there is a need to generate proof
@@ -198,21 +201,30 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoBl
 			}
 			return fmt.Errorf("failed to request proof (id: %d): %w", meta.GetBlockID(), err)
 		}
-		if meta.IsOntakeBlock() && s.proofBuffer.MaxLength > 1 {
+		if meta.IsOntakeBlock() && s.proofBuffer.Enabled() {
 			bufferSize, err := s.proofBuffer.Write(result)
 			if err != nil {
-				return fmt.Errorf("failed to add proof into buffer (id: %d)(current buffer size: %d): %w",
+				return fmt.Errorf(
+					"failed to add proof into buffer (id: %d) (current buffer size: %d): %w",
 					meta.GetBlockID(),
 					bufferSize,
 					err,
 				)
 			}
-			log.Debug("Succeed to generate proof",
+			log.Info(
+				"Proof generated",
 				"blockID", meta.GetBlockID(),
 				"bufferSize", bufferSize,
+				"maxBufferSize", s.proofBuffer.MaxLength,
+				"bufferIsAggregating", s.proofBuffer.IsAggregating(),
+				"bufferLastUpdatedAt", s.proofBuffer.lastUpdatedAt,
 			)
-			if s.proofBuffer.MaxLength == uint64(bufferSize) {
+			// Check if we need to aggregate proofs.
+			if !s.proofBuffer.IsAggregating() &&
+				(uint64(bufferSize) >= s.proofBuffer.MaxLength ||
+					time.Since(s.proofBuffer.lastUpdatedAt) > s.forceBatchProvingInterval) {
 				s.aggregationNotify <- s.Tier()
+				s.proofBuffer.MarkAggregating()
 			}
 		} else {
 			s.resultCh <- result
@@ -344,7 +356,8 @@ func (s *ProofSubmitter) BatchSubmitProofs(ctx context.Context, batchProof *proo
 	)
 	var (
 		invalidBlockIDs     []uint64
-		latestProvenBlockID = big.NewInt(0)
+		latestProvenBlockID = common.Big0
+		uint64BlockIDs      []uint64
 	)
 	if len(batchProof.Proofs) == 0 {
 		return proofProducer.ErrInvalidLength
@@ -369,19 +382,20 @@ func (s *ProofSubmitter) BatchSubmitProofs(ctx context.Context, batchProof *proo
 		return err
 	}
 	for i, proof := range batchProof.Proofs {
+		uint64BlockIDs = append(uint64BlockIDs, proof.BlockID.Uint64())
 		// Check if this proof is still needed to be submitted.
 		ok, err := s.sender.ValidateProof(ctx, proof, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId))
 		if err != nil {
 			return err
 		}
 
 		if !ok {
-			log.Error("a valid proof for block is already submitted", "blockId", proof.BlockID)
+			log.Error("A valid proof for block is already submitted", "blockId", proof.BlockID)
 			invalidBlockIDs = append(invalidBlockIDs, proof.BlockID.Uint64())
 			continue
 		}
 
 		if proofStatus[i].IsSubmitted && !proofStatus[i].Invalid {
-			log.Error("a valid proof for block is already submitted", "blockId", proof.BlockID)
+			log.Error("A valid proof for block is already submitted", "blockId", proof.BlockID)
 			invalidBlockIDs = append(invalidBlockIDs, proof.BlockID.Uint64())
 			continue
 		}
@@ -389,7 +403,8 @@ func (s *ProofSubmitter) BatchSubmitProofs(ctx context.Context, batchProof *proo
 		// Get the corresponding L2 block.
 		block, err := s.rpc.L2.BlockByHash(ctx, proof.Header.Hash())
 		if err != nil {
-			log.Error("failed to get L2 block with given hash",
+			log.Error(
+				"Failed to get L2 block with given hash",
 				"hash", proof.Header.Hash(),
 				"error", err,
 			)
@@ -415,7 +430,7 @@ func (s *ProofSubmitter) BatchSubmitProofs(ctx context.Context, batchProof *proo
 	}
 
 	if len(invalidBlockIDs) > 0 {
-		log.Warn("Detected invalid proofs", "blockIds", invalidBlockIDs)
+		log.Warn("Invalid proofs in batch", "blockIds", invalidBlockIDs)
 		s.proofBuffer.ClearItems(invalidBlockIDs...)
 		return ErrInvalidProof
 	}
 
@@ -435,7 +450,9 @@ func (s *ProofSubmitter) BatchSubmitProofs(ctx context.Context, batchProof *proo
 	metrics.ProverSentProofCounter.Add(float64(len(batchProof.BlockIDs)))
 	metrics.ProverLatestProvenBlockIDGauge.Set(float64(latestProvenBlockID.Uint64()))
 
-	s.proofBuffer.Clear()
+	s.proofBuffer.ClearItems(uint64BlockIDs...)
+	// Each time we submit a batch proof, update the buffer's last updated time.
+	s.proofBuffer.UpdateLastUpdatedAt()
 
 	return nil
 }
@@ -511,3 +528,8 @@ func (s *ProofSubmitter) Tier() uint16 {
 func (s *ProofSubmitter) BufferSize() uint64 {
 	return s.proofBuffer.MaxLength
 }
+
+// AggregationEnabled returns whether the proof submitter's aggregation feature is enabled.
+func (s *ProofSubmitter) AggregationEnabled() bool {
+	return s.proofBuffer.Enabled()
+}
diff --git a/packages/taiko-client/prover/proof_submitter/proof_submitter_test.go b/packages/taiko-client/prover/proof_submitter/proof_submitter_test.go
index 3a7cbde8a1..be076f01ef 100644
--- a/packages/taiko-client/prover/proof_submitter/proof_submitter_test.go
+++ b/packages/taiko-client/prover/proof_submitter/proof_submitter_test.go
@@ -100,6 +100,7 @@ func (s *ProofSubmitterTestSuite) SetupTest() {
 		false,
 		0*time.Second,
 		0,
+		30*time.Minute,
 	)
 	s.Nil(err)
 	s.contester = NewProofContester(
@@ -199,6 +200,7 @@ func (s *ProofSubmitterTestSuite) TestGetRandomBumpedSubmissionDelay() {
 		false,
 		time.Duration(0),
 		0,
+		30*time.Minute,
 	)
 	s.Nil(err)
 
@@ -223,6 +225,7 @@ func (s *ProofSubmitterTestSuite) TestGetRandomBumpedSubmissionDelay() {
 		false,
 		1*time.Hour,
 		0,
+		30*time.Minute,
 	)
 	s.Nil(err)
 	delay, err = submitter2.getRandomBumpedSubmissionDelay(time.Now())
diff --git a/packages/taiko-client/prover/prover.go b/packages/taiko-client/prover/prover.go
index 237a7f4493..a157a59bc6 100644
--- a/packages/taiko-client/prover/prover.go
+++ b/packages/taiko-client/prover/prover.go
@@ -279,15 +279,7 @@ func (p *Prover) eventLoop() {
 			default:
 			}
 		}
 
-	// reqAggregation requests performing a aggregate operation, won't block
-	// if we are already aggregating.
-	reqAggregation := func() {
-		select {
-		// 0 means aggregating all tier proofs
-		case p.aggregationNotify <- 0:
-		default:
-		}
-	}
+	// Call reqProving() right away to catch up with the latest state.
 	reqProving()
 
@@ -297,9 +289,6 @@ func (p *Prover) eventLoop() {
 	forceProvingTicker := time.NewTicker(15 * time.Second)
 	defer forceProvingTicker.Stop()
 
-	forceAggregatingTicker := time.NewTicker(p.cfg.ForceProveInterval)
-	defer forceAggregatingTicker.Stop()
-
 	// Channels
 	chBufferSize := p.protocolConfigs.BlockMaxProposals
 	blockProposedCh := make(chan *bindings.TaikoL1ClientBlockProposed, chBufferSize)
@@ -347,9 +336,7 @@ func (p *Prover) eventLoop() {
 				log.Error("Prove new blocks error", "error", err)
 			}
 		case tier := <-p.aggregationNotify:
-			p.withRetry(func() error {
-				return p.aggregateOp(tier)
-			})
+			p.withRetry(func() error { return p.aggregateOp(tier) })
 		case e := <-blockVerifiedCh:
 			p.blockVerifiedHandler.Handle(encoding.BlockVerifiedEventToV2(e))
 		case e := <-transitionProvedCh:
@@ -389,8 +376,6 @@ func (p *Prover) eventLoop() {
 			reqProving()
 		case <-forceProvingTicker.C:
 			reqProving()
-		case <-forceAggregatingTicker.C:
-			reqAggregation()
 		}
 	}
 }
@@ -422,10 +407,10 @@ func (p *Prover) aggregateOp(tier uint16) error {
 	g, gCtx := errgroup.WithContext(p.ctx)
 	for _, submitter := range p.proofSubmitters {
 		g.Go(func() error {
-			if submitter.BufferSize() > 1 &&
-				(tier == 0 || submitter.Tier() == tier) {
+			if submitter.AggregationEnabled() && submitter.Tier() == tier {
 				if err := submitter.AggregateProofs(gCtx); err != nil {
-					log.Error("Failed to aggregate proofs",
+					log.Error(
+						"Failed to aggregate proofs",
 						"error", err,
 						"tier", submitter.Tier(),
 					)
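Taken together, the proof_buffer.go and proof_submitter.go hunks above replace the old global force-aggregation ticker with a per-buffer trigger: aggregation is requested when the buffer fills up, or when prover.forceBatchProvingInterval has elapsed since the buffer's last update, and the isAggregating latch suppresses duplicate notifications until ClearItems resets it. Below is a minimal, self-contained Go sketch of that trigger logic, assuming a hypothetical demoBuffer type; it is an illustration only, not the taiko-client implementation.

```go
package main

import (
	"fmt"
	"time"
)

// demoBuffer is a hypothetical stand-in for the submitter.ProofBuffer in the
// diff above; the field names mirror the diff, but this type is illustrative.
type demoBuffer struct {
	maxLength     uint64
	items         []string // stands in for []*producer.ProofWithHeader
	lastUpdatedAt time.Time
	isAggregating bool
}

// write appends an item and refreshes lastUpdatedAt, like ProofBuffer.Write.
func (b *demoBuffer) write(item string) int {
	b.items = append(b.items, item)
	b.lastUpdatedAt = time.Now()
	return len(b.items)
}

// shouldAggregate mirrors the trigger added in RequestProof: aggregate once
// the buffer is full, or once forceInterval has elapsed since the last
// update, but never while a previous aggregation is still pending.
func (b *demoBuffer) shouldAggregate(forceInterval time.Duration) bool {
	if b.isAggregating {
		return false
	}
	return uint64(len(b.items)) >= b.maxLength ||
		time.Since(b.lastUpdatedAt) > forceInterval
}

func main() {
	const forceInterval = 30 * time.Minute // e.g. --prover.forceBatchProvingInterval
	b := &demoBuffer{maxLength: 4, lastUpdatedAt: time.Now()}

	// One proof arrives: the buffer is not full and the interval has not
	// elapsed, so no aggregation is requested yet.
	b.write("proof-1")
	fmt.Println(b.shouldAggregate(forceInterval)) // false

	// Backdate the buffer so its last update is older than the force
	// interval; the time-based branch fires even though the buffer is not full.
	b.lastUpdatedAt = time.Now().Add(-time.Hour)
	fmt.Println(b.shouldAggregate(forceInterval)) // true

	// Once aggregation has been notified, the isAggregating latch suppresses
	// further notifications until the buffer is cleared (ClearItems in the diff).
	b.isAggregating = true
	fmt.Println(b.shouldAggregate(forceInterval)) // false
}
```

Note that in the diff the check runs immediately after ProofBuffer.Write, which itself refreshes lastUpdatedAt; the sketch backdates the timestamp so the time-based branch can be exercised in isolation.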