Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pivot updator for optimism #7586

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,22 @@ public void Setup()
_blockCacheService = new BlockCacheService();
_beaconSyncStrategy = Substitute.For<IBeaconSyncStrategy>();
_metadataDb = new MemDb();

PivotUpdator pivotUpdator = new(
_blockTree,
_syncModeSelector,
_syncPeerPool,
_syncConfig,
_blockCacheService,
_beaconSyncStrategy,
_metadataDb,
LimboLogs.Instance
);
}

[Test]
public void TrySetFreshPivot_SavesFinalizedHashInDb()
public void TrySetFreshPivot_saves_FinalizedHash_in_db()
{
PivotUpdator pivotUpdator = new(
_blockTree!,
_syncModeSelector!,
_syncPeerPool!,
_syncConfig!,
_blockCacheService!,
_beaconSyncStrategy!,
_metadataDb!,
LimboLogs.Instance
);

SyncModeChangedEventArgs args = new(SyncMode.FastSync, SyncMode.UpdatingPivot);
Hash256 expectedFinalizedHash = _externalPeerBlockTree!.HeadHash;
long expectedPivotBlockNumber = _externalPeerBlockTree!.Head!.Number;
Expand All @@ -91,5 +91,35 @@ public void TrySetFreshPivot_SavesFinalizedHashInDb()
storedFinalizedHash.Should().Be(expectedFinalizedHash);
expectedPivotBlockNumber.Should().Be(storedPivotBlockNumber);
}

// [Test]
// public void TrySetFreshPivot_for_optimism_saves_HeadBlockHash_in_db()
// {
// UnsafePivotUpdator unsafePivotUpdator = new(
// _blockTree!,
// _syncModeSelector!,
// _syncPeerPool!,
// _syncConfig!,
// _blockCacheService!,
// _beaconSyncStrategy!,
// _metadataDb!,
// LimboLogs.Instance
// );
//
// SyncModeChangedEventArgs args = new(SyncMode.FastSync, SyncMode.UpdatingPivot);
// Hash256 expectedHeadBlockHash = _externalPeerBlockTree!.HeadHash;
// long expectedPivotBlockNumber = _externalPeerBlockTree!.Head!.Number;
// _beaconSyncStrategy!.GetHeadBlockHash().Returns(expectedHeadBlockHash);
//
// _syncModeSelector!.Changed += Raise.EventWith(args);
//
// byte[] storedData = _metadataDb!.Get(MetadataDbKeys.UpdatedPivotData)!;
// RlpStream pivotStream = new(storedData!);
// long storedPivotBlockNumber = pivotStream.DecodeLong();
// Hash256 storedHeadBlockHash = pivotStream.DecodeKeccak()!;
//
// storedHeadBlockHash.Should().Be(expectedHeadBlockHash);
// expectedPivotBlockNumber.Should().Be(storedPivotBlockNumber);
// }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ public class BlockCacheService : IBlockCacheService
{
public ConcurrentDictionary<Hash256AsKey, Block> BlockCache { get; } = new();
public Hash256? FinalizedHash { get; set; }
public Hash256? HeadBlockHash { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public async Task<ResultWrapper<ForkchoiceUpdatedV1Result>> Handle(ForkchoiceSta
{
_peerRefresher.RefreshPeers(newHeadBlock!.Hash!, newHeadBlock.ParentHash!, forkchoiceState.FinalizedBlockHash);
_blockCacheService.FinalizedHash = forkchoiceState.FinalizedBlockHash;
_blockCacheService.HeadBlockHash = forkchoiceState.HeadBlockHash;
_mergeSyncController.StopBeaconModeControl();

// Debug as already output in Received ForkChoice
Expand Down Expand Up @@ -326,6 +327,7 @@ private void StartNewBeaconHeaderSync(ForkchoiceStateV1 forkchoiceState, BlockHe
_beaconPivot.ProcessDestination = blockHeader;
_peerRefresher.RefreshPeers(blockHeader.Hash!, blockHeader.ParentHash!, forkchoiceState.FinalizedBlockHash);
_blockCacheService.FinalizedHash = forkchoiceState.FinalizedBlockHash;
_blockCacheService.HeadBlockHash = forkchoiceState.HeadBlockHash;

if (_logger.IsInfo) _logger.Info($"Start a new sync process, Request: {requestStr}.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ public interface IBlockCacheService
{
public ConcurrentDictionary<Hash256AsKey, Block> BlockCache { get; }
Hash256? FinalizedHash { get; set; }
Hash256? HeadBlockHash { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ lowestInsertedBeaconHeader is not null &&
{
return _blockCacheService.FinalizedHash;
}

public Hash256? GetHeadBlockHash()
{
return _blockCacheService.HeadBlockHash;
}
}

public interface IMergeSyncController
Expand Down
111 changes: 62 additions & 49 deletions src/Nethermind/Nethermind.Merge.Plugin/Synchronization/PivotUpdator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ public class PivotUpdator
{
private readonly IBlockTree _blockTree;
private readonly ISyncModeSelector _syncModeSelector;
private readonly ISyncPeerPool _syncPeerPool;
protected readonly ISyncPeerPool _syncPeerPool;
private readonly ISyncConfig _syncConfig;
private readonly IBlockCacheService _blockCacheService;
private readonly IBeaconSyncStrategy _beaconSyncStrategy;
protected readonly IBeaconSyncStrategy _beaconSyncStrategy;
private readonly IDb _metadataDb;
private readonly ILogger _logger;
protected readonly ILogger _logger;

private readonly CancellationTokenSource _cancellation = new();

Expand Down Expand Up @@ -125,74 +125,69 @@ private async void OnSyncModeChanged(object? sender, SyncModeChangedEventArgs sy

private async Task<bool> TrySetFreshPivot(CancellationToken cancellationToken)
{
Hash256? finalizedBlockHash = TryGetFinalizedBlockHashFromCl();
Hash256? potentialPivotBlockHash = await TryGetPotentialPivotBlockHash(cancellationToken);

if (finalizedBlockHash is null || finalizedBlockHash == Keccak.Zero)
if (potentialPivotBlockHash is null || potentialPivotBlockHash == Keccak.Zero)
{
return false;
}

long? finalizedBlockNumber = TryGetFinalizedBlockNumberFromBlockCache(finalizedBlockHash);
finalizedBlockNumber ??= TryGetFinalizedBlockNumberFromBlockTree(finalizedBlockHash);
finalizedBlockNumber ??= await TryGetFinalizedBlockNumberFromPeers(finalizedBlockHash, cancellationToken);
long? potentialPivotBlockNumber = TryGetPotentialPivotBlockNumberFromBlockCache(potentialPivotBlockHash);
potentialPivotBlockNumber ??= TryGetPotentialPivotBlockNumberFromBlockTree(potentialPivotBlockHash);
potentialPivotBlockNumber ??= await TryGetPotentialPivotBlockNumberFromPeers(potentialPivotBlockHash, cancellationToken);

return finalizedBlockNumber is not null && TryOverwritePivot(finalizedBlockHash, (long)finalizedBlockNumber);
return potentialPivotBlockNumber is not null && TryOverwritePivot(potentialPivotBlockHash, (long)potentialPivotBlockNumber);
}

private Hash256? TryGetFinalizedBlockHashFromCl()
protected virtual Task<Hash256?> TryGetPotentialPivotBlockHash(CancellationToken cancellationToken)
{
// getting finalized block hash as it is safe, because can't be reorganized
Hash256? finalizedBlockHash = _beaconSyncStrategy.GetFinalizedHash();

if (finalizedBlockHash is null || finalizedBlockHash == Keccak.Zero)
{
if (_logger.IsInfo && (_maxAttempts - _attemptsLeft) % 10 == 0) _logger.Info($"Waiting for Forkchoice message from Consensus Layer to set fresh pivot block [{_maxAttempts - _attemptsLeft}s]");

return null;
PrintWaitingForMessageFromCl();
return Task.FromResult<Hash256?>(null);
}

if (_alreadyAnnouncedNewPivotHash != finalizedBlockHash)
{
if (_logger.IsInfo) _logger.Info($"Potential new pivot block hash: {finalizedBlockHash}");
_alreadyAnnouncedNewPivotHash = finalizedBlockHash;
}

return finalizedBlockHash;
UpdateAndPrintPotentialNewPivot(finalizedBlockHash);
return Task.FromResult<Hash256?>(finalizedBlockHash);
}

private long? TryGetFinalizedBlockNumberFromBlockCache(Hash256 finalizedBlockHash)
private long? TryGetPotentialPivotBlockNumberFromBlockCache(Hash256 potentialPivotBlockHash)
{
if (_logger.IsDebug) _logger.Debug("Looking for pivot block in block cache");
if (_blockCacheService.BlockCache.TryGetValue(finalizedBlockHash, out Block? finalizedBlock))
if (_blockCacheService.BlockCache.TryGetValue(potentialPivotBlockHash, out Block? potentialPivotBlock))
{
if (HeaderValidator.ValidateHash(finalizedBlock.Header))
if (HeaderValidator.ValidateHash(potentialPivotBlock.Header))
{
if (_logger.IsDebug) _logger.Debug("Found pivot block in block cache");
return finalizedBlock.Header.Number;
return potentialPivotBlock.Header.Number;
}
if (_logger.IsDebug) _logger.Debug($"Hash of header found in block cache is {finalizedBlock.Header.Hash} when expecting {finalizedBlockHash}");
if (_logger.IsDebug) _logger.Debug($"Hash of header found in block cache is {potentialPivotBlock.Header.Hash} when expecting {potentialPivotBlockHash}");
}

return null;
}

private long? TryGetFinalizedBlockNumberFromBlockTree(Hash256 finalizedBlockHash)
private long? TryGetPotentialPivotBlockNumberFromBlockTree(Hash256 potentialPivotBlockHash)
{
if (_logger.IsDebug) _logger.Debug("Looking for header of pivot block in blockTree");
BlockHeader? finalizedHeader = _blockTree.FindHeader(finalizedBlockHash, BlockTreeLookupOptions.DoNotCreateLevelIfMissing);
if (finalizedHeader is not null)
BlockHeader? potentialPivotBlock = _blockTree.FindHeader(potentialPivotBlockHash, BlockTreeLookupOptions.DoNotCreateLevelIfMissing);
if (potentialPivotBlock is not null)
{
if (HeaderValidator.ValidateHash(finalizedHeader))
if (HeaderValidator.ValidateHash(potentialPivotBlock))
{
if (_logger.IsDebug) _logger.Debug("Found header of pivot block in block tree");
return finalizedHeader.Number;
return potentialPivotBlock.Number;
}
if (_logger.IsDebug) _logger.Debug($"Hash of header found in block tree is {finalizedHeader.Hash} when expecting {finalizedBlockHash}");
if (_logger.IsDebug) _logger.Debug($"Hash of header found in block tree is {potentialPivotBlock.Hash} when expecting {potentialPivotBlockHash}");
}

return null;
}

private async Task<long?> TryGetFinalizedBlockNumberFromPeers(Hash256 finalizedBlockHash, CancellationToken cancellationToken)
private async Task<long?> TryGetPotentialPivotBlockNumberFromPeers(Hash256 potentialPivotBlockHash, CancellationToken cancellationToken)
{
foreach (PeerInfo peer in _syncPeerPool.InitializedPeers)
{
Expand All @@ -202,50 +197,50 @@ private async Task<bool> TrySetFreshPivot(CancellationToken cancellationToken)
}
try
{
if (_logger.IsInfo) _logger.Info($"Asking peer {peer.SyncPeer.Node.ClientId} for header of pivot block {finalizedBlockHash}");
BlockHeader? finalizedHeader = await peer.SyncPeer.GetHeadBlockHeader(finalizedBlockHash, cancellationToken);
if (finalizedHeader is not null)
if (_logger.IsInfo) _logger.Info($"Asking peer {peer.SyncPeer.Node.ClientId} for header of pivot block {potentialPivotBlockHash}");
BlockHeader? potentialPivotBlock = await peer.SyncPeer.GetHeadBlockHeader(potentialPivotBlockHash, cancellationToken);
if (potentialPivotBlock is not null)
{
if (HeaderValidator.ValidateHash(finalizedHeader))
if (HeaderValidator.ValidateHash(potentialPivotBlock))
{
if (_logger.IsInfo) _logger.Info($"Received header of pivot block from peer {peer.SyncPeer.Node.ClientId}");
return finalizedHeader.Number;
return potentialPivotBlock.Number;
}
if (_logger.IsInfo) _logger.Info($"Hash of header received from peer {peer.SyncPeer.Node.ClientId} is {finalizedHeader.Hash} when expecting {finalizedBlockHash}");
if (_logger.IsInfo) _logger.Info($"Hash of header received from peer {peer.SyncPeer.Node.ClientId} is {potentialPivotBlock.Hash} when expecting {potentialPivotBlockHash}");
}
}
catch (Exception exception) when (exception is TimeoutException or OperationCanceledException)
{
if (_logger.IsInfo) _logger.Info($"Peer {peer.SyncPeer.Node.ClientId} didn't respond to request for header of pivot block {finalizedBlockHash}");
if (_logger.IsInfo) _logger.Info($"Peer {peer.SyncPeer.Node.ClientId} didn't respond to request for header of pivot block {potentialPivotBlockHash}");
if (_logger.IsDebug) _logger.Debug($"Exception in GetHeadBlockHeader request to peer {peer.SyncPeer.Node.ClientId}. {exception}");
}
}

if (_logger.IsInfo && (_maxAttempts - _attemptsLeft) % 10 == 0) _logger.Info($"Potential new pivot block hash: {finalizedBlockHash}. Waiting for pivot block header [{_maxAttempts - _attemptsLeft}s]");
PrintPotentialNewPivotAndWaiting(potentialPivotBlockHash.ToString());
return null;
}

private bool TryOverwritePivot(Hash256 finalizedBlockHash, long finalizedBlockNumber)
private bool TryOverwritePivot(Hash256 potentialPivotBlockHash, long potentialPivotBlockNumber)
{
long targetBlock = _beaconSyncStrategy.GetTargetBlockHeight() ?? 0;
bool isCloseToHead = targetBlock <= finalizedBlockNumber || (targetBlock - finalizedBlockNumber) < Constants.MaxDistanceFromHead;
bool newPivotHigherThanOld = finalizedBlockNumber > _syncConfig.PivotNumberParsed;
bool isCloseToHead = targetBlock <= potentialPivotBlockNumber || (targetBlock - potentialPivotBlockNumber) < Constants.MaxDistanceFromHead;
bool newPivotHigherThanOld = potentialPivotBlockNumber > _syncConfig.PivotNumberParsed;

if (isCloseToHead && newPivotHigherThanOld)
{
UpdateConfigValues(finalizedBlockHash, finalizedBlockNumber);
UpdateConfigValues(potentialPivotBlockHash, potentialPivotBlockNumber);

RlpStream pivotData = new(38); //1 byte (prefix) + 4 bytes (long) + 1 byte (prefix) + 32 bytes (Keccak)
pivotData.Encode(finalizedBlockNumber);
pivotData.Encode(finalizedBlockHash);
pivotData.Encode(potentialPivotBlockNumber);
pivotData.Encode(potentialPivotBlockHash);
_metadataDb.Set(MetadataDbKeys.UpdatedPivotData, pivotData.Data.ToArray()!);

if (_logger.IsInfo) _logger.Info($"New pivot block has been set based on ForkChoiceUpdate from CL. Pivot block number: {finalizedBlockNumber}, hash: {finalizedBlockHash}");
if (_logger.IsInfo) _logger.Info($"New pivot block has been set based on ForkChoiceUpdate from CL. Pivot block number: {potentialPivotBlockNumber}, hash: {potentialPivotBlockHash}");
return true;
}

if (!isCloseToHead && _logger.IsInfo) _logger.Info($"Pivot block from Consensus Layer too far from head. PivotBlockNumber: {finalizedBlockNumber}, TargetBlockNumber: {targetBlock}, difference: {targetBlock - finalizedBlockNumber} blocks. Max difference allowed: {Constants.MaxDistanceFromHead}");
if (!newPivotHigherThanOld && _logger.IsInfo) _logger.Info($"Pivot block from Consensus Layer isn't higher than pivot from initial config. New PivotBlockNumber: {finalizedBlockNumber}, old: {_syncConfig.PivotNumber}");
if (!isCloseToHead && _logger.IsInfo) _logger.Info($"Pivot block from Consensus Layer too far from head. PivotBlockNumber: {potentialPivotBlockNumber}, TargetBlockNumber: {targetBlock}, difference: {targetBlock - potentialPivotBlockNumber} blocks. Max difference allowed: {Constants.MaxDistanceFromHead}");
if (!newPivotHigherThanOld && _logger.IsInfo) _logger.Info($"Pivot block from Consensus Layer isn't higher than pivot from initial config. New PivotBlockNumber: {potentialPivotBlockNumber}, old: {_syncConfig.PivotNumber}");
return false;
}

Expand All @@ -256,4 +251,22 @@ private void UpdateConfigValues(Hash256 finalizedBlockHash, long finalizedBlockN
_syncConfig.MaxAttemptsToUpdatePivot = 0;
}

protected void PrintWaitingForMessageFromCl()
{
if (_logger.IsInfo && (_maxAttempts - _attemptsLeft) % 10 == 0) _logger.Info($"Waiting for Forkchoice message from Consensus Layer to set fresh pivot block [{_maxAttempts - _attemptsLeft}s]");
}

protected void PrintPotentialNewPivotAndWaiting(string potentialPivotBlockHash)
{
if (_logger.IsInfo && (_maxAttempts - _attemptsLeft) % 10 == 0) _logger.Info($"Potential new pivot block: {potentialPivotBlockHash}. Waiting for pivot block header [{_maxAttempts - _attemptsLeft}s]");
}

protected void UpdateAndPrintPotentialNewPivot(Hash256 finalizedBlockHash)
{
if (_alreadyAnnouncedNewPivotHash != finalizedBlockHash)
{
if (_logger.IsInfo) _logger.Info($"Potential new pivot block hash: {finalizedBlockHash}");
_alreadyAnnouncedNewPivotHash = finalizedBlockHash;
}
}
}
Loading
Loading