Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stage_bor_heimdall commits partial progress #12097

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 53 additions & 54 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,75 +765,74 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log

heimdallClient := engine.(*bor.Bor).HeimdallClient

return db.Update(ctx, func(tx kv.RwTx) error {
if reset {
if err := reset2.ResetBorHeimdall(ctx, tx); err != nil {
return err
}
return nil
var tx kv.RwTx
if reset {
if err := reset2.ResetBorHeimdall(ctx, tx, db); err != nil {
return err
}
if unwind > 0 {
sn, borSn, agg, _ := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()

stageState := stage(sync, tx, nil, stages.BorHeimdall)

snapshotsMaxBlock := borSn.BlocksAvailable()
if unwind <= snapshotsMaxBlock {
return fmt.Errorf("cannot unwind past snapshots max block: %d", snapshotsMaxBlock)
}

if unwind > stageState.BlockNumber {
return fmt.Errorf("cannot unwind to a point beyond stage: %d", stageState.BlockNumber)
}
return nil
}
if unwind > 0 {
sn, borSn, agg, _ := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()

unwindState := sync.NewUnwindState(stages.BorHeimdall, stageState.BlockNumber-unwind, stageState.BlockNumber, true, false)
cfg := stagedsync.StageBorHeimdallCfg(db, nil, miningState, *chainConfig, nil, nil, nil, nil, nil, nil, false, unwindTypes)
if err := stagedsync.BorHeimdallUnwind(unwindState, ctx, stageState, tx, cfg); err != nil {
return err
}
stageState := stage(sync, tx, db, stages.BorHeimdall)

stageProgress, err := stages.GetStageProgress(tx, stages.BorHeimdall)
if err != nil {
return fmt.Errorf("re-read bor heimdall progress: %w", err)
}
snapshotsMaxBlock := borSn.BlocksAvailable()
if unwind <= snapshotsMaxBlock {
return fmt.Errorf("cannot unwind past snapshots max block: %d", snapshotsMaxBlock)
}

logger.Info("progress", "bor heimdall", stageProgress)
return nil
if unwind > stageState.BlockNumber {
return fmt.Errorf("cannot unwind to a point beyond stage: %d", stageState.BlockNumber)
}

sn, borSn, agg, _ := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
blockReader, _ := blocksIO(db, logger)
var (
snapDb kv.RwDB
recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot]
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address]
)
if bor, ok := engine.(*bor.Bor); ok {
snapDb = bor.DB
recents = bor.Recents
signatures = bor.Signatures
}
cfg := stagedsync.StageBorHeimdallCfg(db, snapDb, miningState, *chainConfig, heimdallClient, blockReader, nil, nil, recents, signatures, false, unwindTypes)

stageState := stage(sync, tx, nil, stages.BorHeimdall)
if err := stagedsync.BorHeimdallForward(stageState, sync, ctx, tx, cfg, logger); err != nil {
unwindState := sync.NewUnwindState(stages.BorHeimdall, stageState.BlockNumber-unwind, stageState.BlockNumber, true, false)
cfg := stagedsync.StageBorHeimdallCfg(db, nil, miningState, *chainConfig, nil, nil, nil, nil, nil, nil, false, unwindTypes)
if err := stagedsync.BorHeimdallUnwind(unwindState, ctx, stageState, tx, cfg); err != nil {
return err
}

stageProgress, err := stages.GetStageProgress(tx, stages.BorHeimdall)
stageProgress, err := stagedsync.BorHeimdallStageProgress(tx, cfg)
if err != nil {
return fmt.Errorf("re-read bor heimdall progress: %w", err)
}

logger.Info("progress", "bor heimdall", stageProgress)
return nil
})
}

sn, borSn, agg, _ := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
blockReader, _ := blocksIO(db, logger)
var (
snapDb kv.RwDB
recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot]
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address]
)
if bor, ok := engine.(*bor.Bor); ok {
snapDb = bor.DB
recents = bor.Recents
signatures = bor.Signatures
}
cfg := stagedsync.StageBorHeimdallCfg(db, snapDb, miningState, *chainConfig, heimdallClient, blockReader, nil, nil, recents, signatures, false, unwindTypes)

stageState := stage(sync, tx, db, stages.BorHeimdall)
if err := stagedsync.BorHeimdallForward(stageState, sync, ctx, tx, cfg, logger); err != nil {
return err
}

stageProgress, err := stagedsync.BorHeimdallStageProgress(tx, cfg)
if err != nil {
return fmt.Errorf("re-read bor heimdall progress: %w", err)
}

logger.Info("progress", "bor heimdall", stageProgress)
return nil
}

func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error {
Expand Down
19 changes: 17 additions & 2 deletions core/rawdb/rawdbreset/reset_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,16 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator, br services.Full

return nil
}
func ResetBorHeimdall(ctx context.Context, tx kv.RwTx) error {
func ResetBorHeimdall(ctx context.Context, tx kv.RwTx, db kv.RwDB) error {
useExternalTx := tx != nil
if !useExternalTx {
var err error
tx, err = db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
if err := tx.ClearBucket(kv.BorEventNums); err != nil {
return err
}
Expand All @@ -109,7 +118,13 @@ func ResetBorHeimdall(ctx context.Context, tx kv.RwTx) error {
if err := tx.ClearBucket(kv.BorSpans); err != nil {
return err
}
return clearStageProgress(tx, stages.BorHeimdall)
if err := clearStageProgress(tx, stages.BorHeimdall); err != nil {
return err
}
if !useExternalTx {
return tx.Commit()
}
return nil
}

func ResetPolygonSync(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator, br services.FullBlockReader, bw *blockio.BlockWriter, dirs datadir.Dirs, cc chain.Config, logger log.Logger) error {
Expand Down
11 changes: 7 additions & 4 deletions eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64
workersCount = 1
}

prevStageProgress, err := senderStageProgress(txc.Tx, cfg.db)
prevStageProgress, err := stageProgress(txc.Tx, cfg.db, stages.Senders)
if err != nil {
return err
}
Expand Down Expand Up @@ -226,15 +226,15 @@ func unwindExec3(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx contex
return nil
}

func senderStageProgress(tx kv.Tx, db kv.RoDB) (prevStageProgress uint64, err error) {
func stageProgress(tx kv.Tx, db kv.RoDB, stage stages.SyncStage) (prevStageProgress uint64, err error) {
if tx != nil {
prevStageProgress, err = stages.GetStageProgress(tx, stages.Senders)
prevStageProgress, err = stages.GetStageProgress(tx, stage)
if err != nil {
return prevStageProgress, err
}
} else {
if err = db.View(context.Background(), func(tx kv.Tx) error {
prevStageProgress, err = stages.GetStageProgress(tx, stages.Senders)
prevStageProgress, err = stages.GetStageProgress(tx, stage)
if err != nil {
return err
}
Expand All @@ -245,6 +245,9 @@ func senderStageProgress(tx kv.Tx, db kv.RoDB) (prevStageProgress uint64, err er
}
return prevStageProgress, nil
}
func BorHeimdallStageProgress(tx kv.Tx, cfg BorHeimdallCfg) (prevStageProgress uint64, err error) {
return stageProgress(tx, cfg.db, stages.BorHeimdall)
}

// ================ Erigon3 End ================

Expand Down
2 changes: 1 addition & 1 deletion migrations/clear_bor_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var ClearBorTables = Migration{
return err
}

if err := reset2.ResetBorHeimdall(context.Background(), tx); err != nil {
if err := reset2.ResetBorHeimdall(context.Background(), tx, db); err != nil {
return err
}

Expand Down
Loading