Skip to content

Commit

Permalink
move bridge address validation to SK persistance
Browse files Browse the repository at this point in the history
  • Loading branch information
perekopskiy committed Oct 7, 2024
1 parent aff2e98 commit cd015b6
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 50 deletions.
8 changes: 6 additions & 2 deletions core/node/consensus/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,9 @@ impl StateKeeperRunner {
self.pool.0.clone(),
Some(ethabi::Address::repeat_byte(11)),
5,
);
)
.await
.unwrap();

let io = ExternalIO::new(
self.pool.0.clone(),
Expand Down Expand Up @@ -678,7 +680,9 @@ impl StateKeeperRunner {
self.pool.0.clone(),
Some(ethabi::Address::repeat_byte(11)),
5,
);
)
.await
.unwrap();
let tree_writes_persistence = TreeWritesPersistence::new(self.pool.0.clone());

let io = ExternalIO::new(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use anyhow::Context as _;
use zksync_dal::{Core, CoreDal};
use zksync_db_connection::connection_pool::ConnectionPool;
use zksync_node_framework_derive::FromContext;
use zksync_state_keeper::{
io::seal_logic::l2_block_seal_subtasks::L2BlockSealProcess, L2BlockSealerTask, OutputHandler,
StateKeeperPersistence, TreeWritesPersistence,
};
use zksync_types::{Address, ProtocolVersionId};
use zksync_types::Address;

use crate::{
implementations::resources::{
Expand Down Expand Up @@ -89,38 +87,6 @@ impl OutputHandlerLayer {
self.protective_reads_persistence_enabled = protective_reads_persistence_enabled;
self
}

async fn validate_l2_legacy_shared_bridge_addr(
&self,
pool: &ConnectionPool<Core>,
) -> Result<(), WiringError> {
let mut connection = pool.connection().await.context("Get DB connection")?;

if let Some(l2_block) = connection
.blocks_dal()
.get_earliest_l2_block_number()
.await
.context("failed to load earliest l2 block number")?
{
let header = connection
.blocks_dal()
.get_l2_block_header(l2_block)
.await
.context("failed to load L2 block header")?
.context("missing L2 block header")?;
let protocol_version = header
.protocol_version
.unwrap_or_else(ProtocolVersionId::last_potentially_undefined);

if protocol_version.is_pre_gateway() && self.l2_legacy_shared_bridge_addr.is_none() {
return Err(WiringError::Configuration(
"Missing `l2_legacy_shared_bridge_addr` for chain that was initialized before gateway upgrade".to_string()
));
}
}

Ok(())
}
}

#[async_trait::async_trait]
Expand All @@ -140,14 +106,13 @@ impl WiringLayer for OutputHandlerLayer {
.get_custom(L2BlockSealProcess::subtasks_len())
.await
.context("Get master pool")?;
self.validate_l2_legacy_shared_bridge_addr(&persistence_pool)
.await?;

let (mut persistence, l2_block_sealer) = StateKeeperPersistence::new(
persistence_pool.clone(),
self.l2_legacy_shared_bridge_addr,
self.l2_block_seal_queue_capacity,
);
)
.await?;
if self.pre_insert_txs {
persistence = persistence.with_tx_insertion();
}
Expand Down
4 changes: 3 additions & 1 deletion core/node/node_sync/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ impl StateKeeperHandles {

let sync_state = SyncState::default();
let (persistence, l2_block_sealer) =
StateKeeperPersistence::new(pool.clone(), Some(Address::repeat_byte(1)), 5);
StateKeeperPersistence::new(pool.clone(), Some(Address::repeat_byte(1)), 5)
.await
.unwrap();
let tree_writes_persistence = TreeWritesPersistence::new(pool.clone());
let output_handler = OutputHandler::new(Box::new(persistence.with_tx_insertion()))
.with_handler(Box::new(tree_writes_persistence))
Expand Down
58 changes: 50 additions & 8 deletions core/node/state_keeper/src/io/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_trait::async_trait;
use tokio::sync::{mpsc, oneshot};
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_shared_metrics::{BlockStage, APP_METRICS};
use zksync_types::{writes::TreeWrite, Address};
use zksync_types::{writes::TreeWrite, Address, ProtocolVersionId};
use zksync_utils::u256_to_h256;

use crate::{
Expand Down Expand Up @@ -41,13 +41,47 @@ pub struct StateKeeperPersistence {
impl StateKeeperPersistence {
const SHUTDOWN_MSG: &'static str = "L2 block sealer unexpectedly shut down";

async fn validate_l2_legacy_shared_bridge_addr(
pool: &ConnectionPool<Core>,
l2_legacy_shared_bridge_addr: Option<Address>,
) -> anyhow::Result<()> {
let mut connection = pool.connection().await.context("Get DB connection")?;

if let Some(l2_block) = connection
.blocks_dal()
.get_earliest_l2_block_number()
.await
.context("failed to load earliest l2 block number")?
{
let header = connection
.blocks_dal()
.get_l2_block_header(l2_block)
.await
.context("failed to load L2 block header")?
.context("missing L2 block header")?;
let protocol_version = header
.protocol_version
.unwrap_or_else(ProtocolVersionId::last_potentially_undefined);

if protocol_version.is_pre_gateway() && l2_legacy_shared_bridge_addr.is_none() {
anyhow::bail!(
"Missing `l2_legacy_shared_bridge_addr` for chain that was initialized before gateway upgrade".to_string()
);
}
}

Ok(())
}

/// Creates a sealer that will use the provided Postgres connection and will have the specified
/// `command_capacity` for unprocessed sealing commands.
pub fn new(
pub async fn new(
pool: ConnectionPool<Core>,
l2_legacy_shared_bridge_addr: Option<Address>,
mut command_capacity: usize,
) -> (Self, L2BlockSealerTask) {
) -> anyhow::Result<(Self, L2BlockSealerTask)> {
Self::validate_l2_legacy_shared_bridge_addr(&pool, l2_legacy_shared_bridge_addr).await?;

let is_sync = command_capacity == 0;
command_capacity = command_capacity.max(1);

Expand All @@ -67,7 +101,7 @@ impl StateKeeperPersistence {
latest_completion_receiver: None,
is_sync,
};
(this, sealer)
Ok((this, sealer))
}

pub fn with_tx_insertion(mut self) -> Self {
Expand Down Expand Up @@ -396,7 +430,9 @@ mod tests {
pool.clone(),
Some(Address::default()),
l2_block_sealer_capacity,
);
)
.await
.unwrap();
let mut output_handler = OutputHandler::new(Box::new(persistence))
.with_handler(Box::new(TreeWritesPersistence::new(pool.clone())));
tokio::spawn(l2_block_sealer.run());
Expand Down Expand Up @@ -530,7 +566,9 @@ mod tests {
drop(storage);

let (mut persistence, l2_block_sealer) =
StateKeeperPersistence::new(pool.clone(), Some(Address::default()), 1);
StateKeeperPersistence::new(pool.clone(), Some(Address::default()), 1)
.await
.unwrap();
persistence = persistence.with_tx_insertion().without_protective_reads();
let mut output_handler = OutputHandler::new(Box::new(persistence));
tokio::spawn(l2_block_sealer.run());
Expand Down Expand Up @@ -569,7 +607,9 @@ mod tests {
async fn l2_block_sealer_handle_blocking() {
let pool = ConnectionPool::constrained_test_pool(1).await;
let (mut persistence, mut sealer) =
StateKeeperPersistence::new(pool, Some(Address::default()), 1);
StateKeeperPersistence::new(pool, Some(Address::default()), 1)
.await
.unwrap();

// The first command should be successfully submitted immediately.
let mut updates_manager = create_updates_manager();
Expand Down Expand Up @@ -620,7 +660,9 @@ mod tests {
async fn l2_block_sealer_handle_parallel_processing() {
let pool = ConnectionPool::constrained_test_pool(1).await;
let (mut persistence, mut sealer) =
StateKeeperPersistence::new(pool, Some(Address::default()), 5);
StateKeeperPersistence::new(pool, Some(Address::default()), 5)
.await
.unwrap();

// 5 L2 block sealing commands can be submitted without blocking.
let mut updates_manager = create_updates_manager();
Expand Down
4 changes: 3 additions & 1 deletion core/node/state_keeper/src/io/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,9 @@ async fn l2_block_processing_after_snapshot_recovery(commitment_mode: L1BatchCom
);

let (mut persistence, l2_block_sealer) =
StateKeeperPersistence::new(connection_pool.clone(), Some(Address::default()), 0);
StateKeeperPersistence::new(connection_pool.clone(), Some(Address::default()), 0)
.await
.unwrap();
tokio::spawn(l2_block_sealer.run());
persistence.handle_l2_block(&updates).await.unwrap();

Expand Down

0 comments on commit cd015b6

Please sign in to comment.