Skip to content

Commit

Permalink
hash comparison
Browse files Browse the repository at this point in the history
  • Loading branch information
stevemilk committed Oct 7, 2024
1 parent f777c20 commit 6bf917a
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 19 deletions.
2 changes: 1 addition & 1 deletion erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2052,7 +2052,7 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
}

progress := float32(float64(100) * (float64(bytesCompleted) / float64(tLen)))

fmt.Println("[snapshots] progress", "file", torrentName, "progress", fmt.Sprintf("%.2f%%", progress))
if info, ok := downloading[torrentName]; ok {
if progress != info.progress {
info.time = time.Now()
Expand Down
133 changes: 120 additions & 13 deletions turbo/snapshotsync/freezeblocks/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,19 @@ import (
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"

"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
"github.com/erigontech/erigon-lib/chain"
"github.com/erigontech/erigon-lib/chain/snapcfg"
common2 "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/background"
"github.com/erigontech/erigon-lib/common/datadir"
"github.com/erigontech/erigon-lib/common/dbg"
"github.com/erigontech/erigon-lib/common/dir"
dir2 "github.com/erigontech/erigon-lib/common/dir"
"github.com/erigontech/erigon-lib/common/hexutility"
"github.com/erigontech/erigon-lib/diagnostics"
"github.com/erigontech/erigon-lib/downloader/downloadercfg"
"github.com/erigontech/erigon-lib/downloader/snaptype"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/log/v3"
Expand Down Expand Up @@ -223,6 +227,10 @@ type DirtySegment struct {
refcount atomic.Int32

canDelete atomic.Bool

// true means download is finished or file is generated locally
complete atomic.Bool
indexComplete atomic.Bool
}

type VisibleSegment struct {
Expand Down Expand Up @@ -477,6 +485,8 @@ type RoSnapshots struct {

// allows for pruning segments - this is the min available segment
segmentsMin atomic.Uint64

integrityCheckCh chan *DirtySegment
}

// NewRoSnapshots - opens all snapshots. But to simplify everything:
Expand All @@ -496,7 +506,8 @@ func newRoSnapshots(cfg ethconfig.BlocksFreezing, snapDir string, types []snapty
})
}

s := &RoSnapshots{dir: snapDir, cfg: cfg, segments: segs, logger: logger, types: types}
s := &RoSnapshots{dir: snapDir, cfg: cfg, segments: segs, logger: logger, types: types, integrityCheckCh: make(chan *DirtySegment, 512)}
go s.integrityCheckLoop()
s.segmentsMin.Store(segmentsMin)
s.recalcVisibleFiles()

Expand All @@ -518,6 +529,96 @@ func (s *RoSnapshots) BlocksAvailable() uint64 {

return s.idxMax.Load()
}

// integrityCheckLoop receives segments from integrityCheckCh and runs
// integrityCheck on them sequentially. It blocks while the channel is empty
// and returns once the channel has been closed and drained.
func (s *RoSnapshots) integrityCheckLoop() {
	for {
		seg, open := <-s.integrityCheckCh
		if !open {
			return
		}
		s.integrityCheck(seg)
	}
}

// integrityCheck verifies the segment file of sn and then its index files,
// setting sn.complete and sn.indexComplete once the respective checks pass.
//
// A flag that is already set is not re-checked. Any failed or errored check
// is logged as a warning and aborts the function, leaving the remaining
// flags untouched.
func (s *RoSnapshots) integrityCheck(sn *DirtySegment) {
	if !sn.complete.Load() {
		segPath := filepath.Join(s.dir, sn.FileName())
		ok, err := fileIntegrityCheck(segPath)
		switch {
		case err != nil:
			log.Warn("[snapshots] integrity check failed", "err", err, "file", segPath)
			return
		case !ok:
			log.Warn("[snapshots] file downloading is not finished", "file", segPath)
			return
		}
		sn.complete.Store(true)
	}

	if sn.indexComplete.Load() {
		return
	}

	// Every index file must pass before the segment's indexes count as complete.
	idxNames := sn.Type().IdxFileNames(sn.version, sn.from, sn.to)
	for i := range idxNames {
		idxName := idxNames[i]
		ok, err := fileIntegrityCheck(filepath.Join(s.dir, idxName))
		switch {
		case err != nil:
			log.Warn("[snapshots] index integrity check failed", "err", err, "indexFile", idxName)
			return
		case !ok:
			log.Warn("[snapshots] index file downloading is not finished", "indexFile", idxName)
			return
		}
	}
	sn.indexComplete.Store(true)
}

// fileIntegrityCheck reports whether the file at filePath matches the info
// hash stored in its sibling metainfo file, filePath + ".torrent".
//
// Results:
//   - (true, nil):  the computed hash equals the one in the .torrent file, or
//     no .torrent file exists (the file is treated as generated locally).
//   - (false, nil): the file itself does not exist, or the hashes differ.
//   - (false, err): hashing the file or loading the .torrent failed for any
//     other reason.
func fileIntegrityCheck(filePath string) (bool, error) {
	actualHash, err := fileHash(filePath)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return false, nil
		}
		return false, err
	}

	meta, err := metainfo.LoadFromFile(filePath + ".torrent")
	if err != nil {
		// A missing .torrent means the file was generated locally. The error
		// comes from opening the file and is presumably a wrapped
		// *fs.PathError, so it must be matched with errors.Is; a direct ==
		// comparison against os.ErrNotExist would not match it.
		if errors.Is(err, os.ErrNotExist) {
			return true, nil
		}
		return false, err
	}

	return bytes.Equal(meta.HashInfoBytes().Bytes(), actualHash), nil
}

func fileHash(filePath string) ([]byte, error) {
s1 := time.Now()
exists, err := dir.FileExist(filePath)
if err != nil {
return nil, err
}
if !exists {
return nil, os.ErrNotExist
}
_, fName := filepath.Split(filePath)
info := &metainfo.Info{PieceLength: downloadercfg.DefaultPieceSize, Name: fName}
if err := info.BuildFromFilePath(filePath); err != nil {
return nil, fmt.Errorf("can't get hash for %s: %w", fName, err)
}

infoBytes, err := bencode.Marshal(info)
if err != nil {
return nil, err
}
bs := bencode.Bytes(infoBytes)
hashBytes := metainfo.HashBytes(bs).Bytes()
fmt.Println("timecost:", time.Since(s1), "fileSize:", info.TotalLength()/1024/1024, "MB")
return hashBytes, nil
}

func (s *RoSnapshots) LogStat(label string) {
var m runtime.MemStats
dbg.ReadMemStats(&m)
Expand Down Expand Up @@ -791,20 +892,23 @@ func (s *RoSnapshots) rebuildSegments(fileNames []string, open bool, optimistic
sn = &DirtySegment{segType: f.Type, version: f.Version, Range: Range{f.From, f.To}, frozen: snapcfg.Seedable(s.cfg.ChainName, f)}
}

if open && sn.Decompressor == nil {
if err := sn.reopenSeg(s.dir); err != nil {
if errors.Is(err, os.ErrNotExist) {
if open {
if sn.Decompressor == nil || !sn.complete.Load() {
s.integrityCheckCh <- sn
if err := sn.reopenSeg(s.dir); err != nil {
if errors.Is(err, os.ErrNotExist) {
if optimistic {
continue
} else {
break
}
}
if optimistic {
continue
} else {
break
return err
}
}
if optimistic {
continue
} else {
return err
}
}
}

Expand All @@ -814,9 +918,12 @@ func (s *RoSnapshots) rebuildSegments(fileNames []string, open bool, optimistic
segtype.DirtySegments.Set(sn)
}

if open && sn.indexes == nil {
if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil {
return err
if open {
if sn.indexes == nil || !sn.indexComplete.Load() {
s.integrityCheckCh <- sn
if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil {
return err
}
}
}

Expand Down
59 changes: 54 additions & 5 deletions turbo/snapshotsync/freezeblocks/caplin_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type CaplinSnapshots struct {
segmentsMin atomic.Uint64
// chain cfg
beaconCfg *clparams.BeaconChainConfig

integrityCheckCh chan *DirtySegment
}

// NewCaplinSnapshots - opens all snapshots. But to simplify everything:
Expand All @@ -121,11 +123,54 @@ func NewCaplinSnapshots(cfg ethconfig.BlocksFreezing, beaconCfg *clparams.Beacon
BlobSidecars := &segments{
DirtySegments: btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false}),
}
c := &CaplinSnapshots{dir: dirs.Snap, tmpdir: dirs.Tmp, cfg: cfg, BeaconBlocks: BeaconBlocks, BlobSidecars: BlobSidecars, logger: logger, beaconCfg: beaconCfg}
c := &CaplinSnapshots{dir: dirs.Snap, tmpdir: dirs.Tmp, cfg: cfg, BeaconBlocks: BeaconBlocks,
BlobSidecars: BlobSidecars, logger: logger, beaconCfg: beaconCfg, integrityCheckCh: make(chan *DirtySegment, 1024)}

go c.integrityCheckLoop()
c.recalcVisibleFiles()
return c
}

// integrityCheckLoop consumes segments queued on integrityCheckCh and checks
// them one by one via integrityCheck. It returns only after the channel is
// closed and every queued segment has been processed.
func (s *CaplinSnapshots) integrityCheckLoop() {
	for {
		queued, more := <-s.integrityCheckCh
		if !more {
			return
		}
		s.integrityCheck(queued)
	}
}

// integrityCheck validates the segment file behind sn and, after that, each
// of its index files. sn.complete is set when the segment file passes and
// sn.indexComplete when all index files pass.
//
// Flags that are already set skip their check. The first error or
// incomplete file is logged as a warning and ends the check early.
func (s *CaplinSnapshots) integrityCheck(sn *DirtySegment) {
	if segDone := sn.complete.Load(); !segDone {
		path := filepath.Join(s.dir, sn.FileName())
		finished, checkErr := fileIntegrityCheck(path)
		if checkErr != nil {
			log.Warn("[snapshots] integrity check failed", "err", checkErr, "file", path)
			return
		}
		if !finished {
			log.Warn("[snapshots] file downloading is not finished", "file", path)
			return
		}
		sn.complete.Store(true)
	}

	if idxDone := sn.indexComplete.Load(); idxDone {
		return
	}

	// All index files have to pass; one failure leaves indexComplete unset.
	for _, name := range sn.Type().IdxFileNames(sn.version, sn.from, sn.to) {
		idxPath := filepath.Join(s.dir, name)
		finished, checkErr := fileIntegrityCheck(idxPath)
		if checkErr != nil {
			log.Warn("[snapshots] index integrity check failed", "err", checkErr, "indexFile", name)
			return
		}
		if !finished {
			log.Warn("[snapshots] index file downloading is not finished", "indexFile", name)
			return
		}
	}
	sn.indexComplete.Store(true)
}

// IndicesMax returns the current value of the idxMax atomic counter.
func (s *CaplinSnapshots) IndicesMax() uint64 {
	current := s.idxMax.Load()
	return current
}
// SegmentsMax returns the current value of the segmentsMax atomic counter.
func (s *CaplinSnapshots) SegmentsMax() uint64 {
	current := s.segmentsMax.Load()
	return current
}

Expand Down Expand Up @@ -228,7 +273,8 @@ Loop:
frozen: snapcfg.Seedable(s.cfg.ChainName, f),
}
}
if sn.Decompressor == nil {
if sn.Decompressor == nil || !sn.complete.Load() {
s.integrityCheckCh <- sn
if err := sn.reopenSeg(s.dir); err != nil {
if errors.Is(err, os.ErrNotExist) {
if optimistic {
Expand All @@ -251,7 +297,8 @@ Loop:
// then make segment available even if index open may fail
s.BeaconBlocks.DirtySegments.Set(sn)
}
if sn.indexes == nil {
if sn.indexes == nil || !sn.indexComplete.Load() {
s.integrityCheckCh <- sn
if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil {
return err
}
Expand Down Expand Up @@ -289,7 +336,8 @@ Loop:
frozen: snapcfg.Seedable(s.cfg.ChainName, f),
}
}
if sn.Decompressor == nil {
if sn.Decompressor == nil || !sn.complete.Load() {
s.integrityCheckCh <- sn
if err := sn.reopenSeg(s.dir); err != nil {
if errors.Is(err, os.ErrNotExist) {
if optimistic {
Expand All @@ -312,7 +360,8 @@ Loop:
// then make segment available even if index open may fail
s.BlobSidecars.DirtySegments.Set(sn)
}
if sn.indexes == nil {
if sn.indexes == nil || !sn.indexComplete.Load() {
s.integrityCheckCh <- sn
if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil {
return err
}
Expand Down
25 changes: 25 additions & 0 deletions turbo/snapshotsync/freezeblocks/hash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package freezeblocks

import (
"testing"
)

// TestIntegrityCheck runs fileIntegrityCheck against a hand-maintained list
// of snapshot files. The list is empty by default, so the test is a no-op
// until local paths are added; the commented entries show the expected form.
func TestIntegrityCheck(t *testing.T) {
	paths := []string{
		// "/Users/steven/erigon-data/snapshots/mainnet/v1-006500-007000-transactions.seg",
		// "/Users/steven/erigon-data/snapshots/mainnet/v1-013000-013500-headers.seg",
		// "/Users/steven/erigon-data/snapshots/mainnet/v1-020410-020420-headers.seg",
	}

	for i := range paths {
		path := paths[i]
		ok, err := fileIntegrityCheck(path)
		if err != nil {
			t.Fatal(err)
		}
		if ok {
			t.Logf("File %s is complete", path)
			continue
		}
		t.Errorf("File %s is not complete", path)
	}
}

0 comments on commit 6bf917a

Please sign in to comment.