Skip to content

Commit

Permalink
feat(platform)!: withdrawal automatic retries after core rejection (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
QuantumExplorer authored Sep 30, 2024
1 parent c8317da commit 404d6d7
Show file tree
Hide file tree
Showing 78 changed files with 1,585 additions and 327 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,14 @@ where
Error::Execution(ExecutionError::UpdateValidatorProposedAppVersionError(e))
})?; // This is a system error

// Rebroadcast expired withdrawals if they exist
self.rebroadcast_expired_withdrawal_documents(
&block_info,
&last_committed_platform_state,
transaction,
platform_version,
)?;

// Mark all previously broadcasted and chainlocked withdrawals as complete
// only when we are on a new core height
if block_state_info.core_chain_locked_height() != last_block_core_height {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use crate::platform_types::platform::Platform;
use dpp::version::PlatformVersion;
use dpp::version::ProtocolVersion;
use drive::drive::identity::withdrawals::paths::{
get_withdrawal_root_path, WITHDRAWAL_TRANSACTIONS_SUM_AMOUNT_TREE_KEY,
get_withdrawal_root_path, WITHDRAWAL_TRANSACTIONS_BROADCASTED_KEY,
WITHDRAWAL_TRANSACTIONS_SUM_AMOUNT_TREE_KEY,
};
use drive::grovedb::{Element, Transaction};

Expand Down Expand Up @@ -68,6 +69,14 @@ impl<C> Platform<C> {
None,
&platform_version.drive,
)?;
self.drive.grove_insert_if_not_exists(
(&path).into(),
&WITHDRAWAL_TRANSACTIONS_BROADCASTED_KEY,
Element::empty_tree(),
Some(transaction),
None,
&platform_version.drive,
)?;
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ pub(in crate::execution) mod cleanup_expired_locks_of_withdrawal_amounts;
pub(in crate::execution) mod dequeue_and_build_unsigned_withdrawal_transactions;
pub(in crate::execution) mod fetch_transactions_block_inclusion_status;
pub(in crate::execution) mod pool_withdrawals_into_transactions_queue;
pub(in crate::execution) mod rebroadcast_expired_withdrawal_documents;
pub(in crate::execution) mod update_broadcasted_withdrawal_statuses;
Original file line number Diff line number Diff line change
Expand Up @@ -44,125 +44,8 @@ where
);
return Ok(());
}
let documents = self.drive.fetch_oldest_withdrawal_documents_by_status(
withdrawals_contract::WithdrawalStatus::QUEUED.into(),
platform_version
.system_limits
.withdrawal_transactions_per_block_limit,
transaction,
platform_version,
)?;

if documents.is_empty() {
return Ok(());
}

// Only take documents up to the withdrawal amount
let current_withdrawal_limit = self
.drive
.calculate_current_withdrawal_limit(transaction, platform_version)?;

// Only process documents up to the current withdrawal limit.
let mut total_withdrawal_amount = 0u64;

// Iterate over the documents and accumulate their withdrawal amounts.
let mut documents_to_process = vec![];
for document in documents {
// Get the withdrawal amount from the document properties.
let amount: u64 = document
.properties()
.get_integer(withdrawal::properties::AMOUNT)?;

// Check if adding this amount would exceed the current withdrawal limit.
let potential_total_withdrawal_amount =
total_withdrawal_amount.checked_add(amount).ok_or_else(|| {
Error::Execution(ExecutionError::Overflow(
"overflow in total withdrawal amount",
))
})?;

if potential_total_withdrawal_amount > current_withdrawal_limit {
// If adding this withdrawal would exceed the limit, stop processing further.
break;
}

total_withdrawal_amount = potential_total_withdrawal_amount;

// Add this document to the list of documents to be processed.
documents_to_process.push(document);
}

if documents_to_process.is_empty() {
return Ok(());
}

let start_transaction_index = self
.drive
.fetch_next_withdrawal_transaction_index(transaction, platform_version)?;

let (withdrawal_transactions, total_amount) = self
.build_untied_withdrawal_transactions_from_documents(
&mut documents_to_process,
start_transaction_index,
block_info,
platform_version,
)?;

let withdrawal_transactions_count = withdrawal_transactions.len();

let mut drive_operations = vec![];

self.drive
.add_enqueue_untied_withdrawal_transaction_operations(
withdrawal_transactions,
total_amount,
&mut drive_operations,
platform_version,
)?;

let end_transaction_index = start_transaction_index + withdrawal_transactions_count as u64;

self.drive
.add_update_next_withdrawal_transaction_index_operation(
end_transaction_index,
&mut drive_operations,
platform_version,
)?;

tracing::debug!(
"Pooled {} withdrawal documents into {} transactions with indices from {} to {}",
documents_to_process.len(),
withdrawal_transactions_count,
start_transaction_index,
end_transaction_index,
);

let withdrawals_contract = self.drive.cache.system_data_contracts.load_withdrawals();

self.drive.add_update_multiple_documents_operations(
&documents_to_process,
&withdrawals_contract,
withdrawals_contract
.document_type_for_name(withdrawal::NAME)
.map_err(|_| {
Error::Execution(ExecutionError::CorruptedCodeExecution(
"Can't fetch withdrawal data contract",
))
})?,
&mut drive_operations,
&platform_version.drive,
)?;

self.drive.apply_drive_operations(
drive_operations,
true,
block_info,
transaction,
platform_version,
None,
)?;

Ok(())
// Just use the v1 as to not duplicate code
self.pool_withdrawals_into_transactions_queue_v1(block_info, transaction, platform_version)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::error::execution::ExecutionError;
use crate::error::Error;
use crate::platform_types::platform::Platform;

use crate::rpc::core::CoreRPCLike;
use dpp::block::block_info::BlockInfo;

use crate::platform_types::platform_state::PlatformState;
use dpp::version::PlatformVersion;
use drive::grovedb::Transaction;

mod v0;
mod v1;

impl<C> Platform<C>
where
C: CoreRPCLike,
{
/// Rebroadcasts expired withdrawal documents if any exist.
///
/// This function attempts to rebroadcast expired withdrawal documents by checking if there are
/// any documents with the status `EXPIRED`. It updates the status of such documents to
/// `BROADCASTED`, increments their revision, and reschedules them for broadcasting.
///
/// # Parameters
/// - `block_info`: Information about the current block (e.g., timestamp).
/// - `transaction`: The transaction within which the rebroadcast should be executed.
/// - `platform_version`: The version of the platform, used to determine the correct method implementation.
///
/// # Returns
/// - `Ok(())` if the rebroadcast process succeeds without issues.
/// - `Err(ExecutionError::UnknownVersionMismatch)` if the platform version is unsupported.
pub fn rebroadcast_expired_withdrawal_documents(
&self,
block_info: &BlockInfo,
last_committed_platform_state: &PlatformState,
transaction: &Transaction,
platform_version: &PlatformVersion,
) -> Result<(), Error> {
match platform_version
.drive_abci
.methods
.withdrawals
.rebroadcast_expired_withdrawal_documents
{
0 => self.rebroadcast_expired_withdrawal_documents_v0(
block_info,
last_committed_platform_state,
transaction,
platform_version,
),
1 => self.rebroadcast_expired_withdrawal_documents_v1(
block_info,
transaction,
platform_version,
),
version => Err(Error::Execution(ExecutionError::UnknownVersionMismatch {
method: "rebroadcast_expired_withdrawal_documents".to_string(),
known_versions: vec![0, 1],
received: version,
})),
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::error::Error;
use crate::platform_types::platform_state::v0::PlatformStateV0Methods;
use crate::platform_types::platform_state::PlatformState;
use crate::{platform_types::platform::Platform, rpc::core::CoreRPCLike};
use dpp::block::block_info::BlockInfo;
use dpp::version::PlatformVersion;
use drive::grovedb::Transaction;

impl<C> Platform<C>
where
C: CoreRPCLike,
{
pub(super) fn rebroadcast_expired_withdrawal_documents_v0(
&self,
block_info: &BlockInfo,
last_committed_platform_state: &PlatformState,
transaction: &Transaction,
platform_version: &PlatformVersion,
) -> Result<(), Error> {
// Currently Core only supports using the first 2 quorums (out of 24 for mainnet).
// For us, we just use the latest quorum to be extra safe.
let Some(position_of_current_quorum) =
last_committed_platform_state.current_validator_set_position_in_list_by_most_recent()
else {
tracing::warn!("Current quorum not in current validator set, not making withdrawals");
return Ok(());
};
if position_of_current_quorum != 0 {
tracing::debug!(
"Current quorum is not most recent, it is in position {}, not making withdrawals",
position_of_current_quorum
);
return Ok(());
}
// Version 1 changes on Version 0, by not having the Core 2 Quorum limit.
// Hence we can just use the v1 here after the extra logic of v0
self.rebroadcast_expired_withdrawal_documents_v1(block_info, transaction, platform_version)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use dpp::block::block_info::BlockInfo;
use dpp::data_contract::accessors::v0::DataContractV0Getters;
use dpp::data_contracts::withdrawals_contract::WithdrawalStatus;
use dpp::document::document_methods::DocumentMethodsV0;
use dpp::document::{DocumentV0Getters, DocumentV0Setters};
use dpp::platform_value::btreemap_extensions::BTreeValueMapHelper;

use dpp::system_data_contracts::withdrawals_contract::v1::document_types::withdrawal;
use dpp::version::PlatformVersion;
use std::collections::BTreeSet;

use crate::{
error::{execution::ExecutionError, Error},
platform_types::platform::Platform,
rpc::core::CoreRPCLike,
};
use dpp::withdrawal::WithdrawalTransactionIndex;
use drive::grovedb::Transaction;
use drive::util::batch::DriveOperation;

impl<C> Platform<C>
where
C: CoreRPCLike,
{
/// Version 1 changes on Version 0, by not having the Core 2 Quorum limit.
/// We should switch to Version 1 once Core has fixed the issue
pub(super) fn rebroadcast_expired_withdrawal_documents_v1(
&self,
block_info: &BlockInfo,
transaction: &Transaction,
platform_version: &PlatformVersion,
) -> Result<(), Error> {
let expired_withdrawal_documents_to_retry_signing =
self.drive.fetch_oldest_withdrawal_documents_by_status(
WithdrawalStatus::EXPIRED.into(),
platform_version
.system_limits
.retry_signing_expired_withdrawal_documents_per_block_limit,
transaction.into(),
platform_version,
)?;

if expired_withdrawal_documents_to_retry_signing.is_empty() {
return Ok(());
}

// Collecting unique withdrawal indices of expired documents
let expired_withdrawal_indices: Vec<WithdrawalTransactionIndex> =
expired_withdrawal_documents_to_retry_signing
.iter()
.map(|document| {
document
.properties()
.get_optional_u64(withdrawal::properties::TRANSACTION_INDEX)?
.ok_or(Error::Execution(ExecutionError::CorruptedDriveResponse(
"Can't get transaction index from withdrawal document".to_string(),
)))
})
.collect::<Result<BTreeSet<WithdrawalTransactionIndex>, Error>>()?
.into_iter()
.collect();

let mut drive_operations: Vec<DriveOperation> = vec![];

// Collecting only documents that have been updated
let mut documents_to_update = Vec::new();

for mut document in expired_withdrawal_documents_to_retry_signing {
document.set_u8(
withdrawal::properties::STATUS,
WithdrawalStatus::BROADCASTED as u8,
);

document.set_updated_at(Some(block_info.time_ms));

document.increment_revision().map_err(Error::Protocol)?;

documents_to_update.push(document);
}

if documents_to_update.is_empty() {
return Ok(());
}

self.drive
.move_broadcasted_withdrawal_transactions_back_to_queue_operations(
expired_withdrawal_indices,
&mut drive_operations,
platform_version,
)?;

let withdrawals_contract = self.drive.cache.system_data_contracts.load_withdrawals();

self.drive.add_update_multiple_documents_operations(
&documents_to_update,
&withdrawals_contract,
withdrawals_contract
.document_type_for_name(withdrawal::NAME)
.map_err(|_| {
Error::Execution(ExecutionError::CorruptedCodeExecution(
"Can't fetch withdrawal data contract",
))
})?,
&mut drive_operations,
&platform_version.drive,
)?;

self.drive.apply_drive_operations(
drive_operations,
true,
block_info,
transaction.into(),
platform_version,
None,
)?;

Ok(())
}
}
Loading

0 comments on commit 404d6d7

Please sign in to comment.