diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 372b28bc3f..468ca39974 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -39,11 +39,11 @@ pub enum DapiClientError { } impl CanRetry for DapiClientError { - fn is_node_failure(&self) -> bool { + fn can_retry(&self) -> bool { use DapiClientError::*; match self { NoAvailableAddresses => false, - Transport(transport_error, _) => transport_error.is_node_failure(), + Transport(transport_error, _) => transport_error.can_retry(), AddressList(_) => false, #[cfg(feature = "mocks")] Mock(_) => false, @@ -233,7 +233,7 @@ impl DapiRequestExecutor for DapiClient { tracing::trace!(?response, "received {} response", response_name); } Err(error) => { - if error.is_node_failure() { + if !error.can_retry() { if applied_settings.ban_failed_address { let mut address_list = self .address_list @@ -264,12 +264,12 @@ impl DapiRequestExecutor for DapiClient { duration.as_secs_f32() ) }) - .when(|e| e.is_node_failure()) + .when(|e| e.can_retry()) .instrument(tracing::info_span!("request routine")) .await; if let Err(error) = &result { - if error.is_node_failure() { + if !error.can_retry() { tracing::error!(?error, "request failed"); } } diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index 976537097e..760d9ce2e7 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -71,9 +71,16 @@ impl DapiRequest for T { } } -/// Allows to flag the transport error variant how tolerant we are of it and whether we can -/// try to do a request again. +/// Returns true if the operation can be retried. pub trait CanRetry { + /// Returns true if the operation can be retried safely. + fn can_retry(&self) -> bool; + /// Get boolean flag that indicates if the error is retryable. - fn is_node_failure(&self) -> bool; + /// + /// Depreacted in favor of [CanRetry::can_retry]. + #[deprecated = "Use !can_retry() instead"] + fn is_node_failure(&self) -> bool { + !self.can_retry() + } } diff --git a/packages/rs-dapi-client/src/transport/grpc.rs b/packages/rs-dapi-client/src/transport/grpc.rs index d5180099d0..e32d6c70f0 100644 --- a/packages/rs-dapi-client/src/transport/grpc.rs +++ b/packages/rs-dapi-client/src/transport/grpc.rs @@ -117,11 +117,12 @@ impl TransportClient for CoreGrpcClient { } impl CanRetry for dapi_grpc::tonic::Status { - fn is_node_failure(&self) -> bool { + fn can_retry(&self) -> bool { let code = self.code(); use dapi_grpc::tonic::Code::*; - matches!( + + !matches!( code, Ok | DataLoss | Cancelled diff --git a/packages/rs-sdk/Cargo.toml b/packages/rs-sdk/Cargo.toml index ebf783f652..98f5dd4c02 100644 --- a/packages/rs-sdk/Cargo.toml +++ b/packages/rs-sdk/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] arc-swap = { version = "1.7.1" } +chrono = { version = "0.4.38" } dpp = { path = "../rs-dpp", default-features = false, features = [ "dash-sdk-features", ] } @@ -52,7 +53,6 @@ data-contracts = { path = "../data-contracts" } tokio-test = { version = "0.4.4" } clap = { version = "4.5.4", features = ["derive"] } sanitize-filename = { version = "0.5.0" } -chrono = { version = "0.4.38" } test-case = { version = "3.3.1" } [features] diff --git a/packages/rs-sdk/src/error.rs b/packages/rs-sdk/src/error.rs index ce8b3f309a..e55bda4742 100644 --- a/packages/rs-sdk/src/error.rs +++ b/packages/rs-sdk/src/error.rs @@ -5,7 +5,7 @@ use std::time::Duration; use dapi_grpc::mock::Mockable; use dpp::version::PlatformVersionError; use dpp::ProtocolError; -use rs_dapi_client::DapiClientError; +use rs_dapi_client::{CanRetry, DapiClientError}; pub use drive_proof_verifier::error::ContextProviderError; @@ -67,6 +67,10 @@ pub enum Error { /// Operation cancelled - cancel token was triggered, timeout, etc. #[error("Operation cancelled: {0}")] Cancelled(String), + + /// Remote node is stale; try another server + #[error(transparent)] + StaleNode(#[from] StaleNodeError), } impl From> for Error { @@ -80,3 +84,36 @@ impl From for Error { Self::Protocol(value.into()) } } + +impl CanRetry for Error { + fn can_retry(&self) -> bool { + matches!(self, Error::StaleNode(..) | Error::TimeoutReached(_, _)) + } +} + +/// Server returned stale metadata +#[derive(Debug, thiserror::Error)] +pub enum StaleNodeError { + /// Server returned metadata with outdated height + #[error("received height is outdated: expected {expected_height}, received {received_height}, tolerance {tolerance_blocks}; try another server")] + Height { + /// Expected height - last block height seen by the Sdk + expected_height: u64, + /// Block height received from the server + received_height: u64, + /// Tolerance - how many blocks can be behind the expected height + tolerance_blocks: u64, + }, + /// Server returned metadata with time outside of the tolerance + #[error( + "received invalid time: expected {expected_timestamp_ms}ms, received {received_timestamp_ms} ms, tolerance {tolerance_ms} ms; try another server" + )] + Time { + /// Expected time in milliseconds - is local time when the message was received + expected_timestamp_ms: u64, + /// Time received from the server in the message, in milliseconds + received_timestamp_ms: u64, + /// Tolerance in milliseconds + tolerance_ms: u64, + }, +} diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index f7f938703d..2b9268f275 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -1,6 +1,6 @@ //! [Sdk] entrypoint to Dash Platform. -use crate::error::Error; +use crate::error::{Error, StaleNodeError}; use crate::internal_cache::InternalSdkCache; use crate::mock::MockResponse; #[cfg(feature = "mocks")] @@ -36,7 +36,8 @@ use std::fmt::Debug; use std::num::NonZeroUsize; #[cfg(feature = "mocks")] use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::sync::{atomic, Arc}; use std::time::{SystemTime, UNIX_EPOCH}; #[cfg(feature = "mocks")] use tokio::sync::{Mutex, MutexGuard}; @@ -100,6 +101,21 @@ pub struct Sdk { /// Note that setting this to None can panic. context_provider: ArcSwapOption>, + /// Last seen height; used to determine if the remote node is stale. + /// + /// This is clone-able and can be shared between threads. + metadata_last_seen_height: Arc, + + /// How many blocks difference is allowed between the last height and the current height received in metadata. + /// + /// See [SdkBuilder::with_height_tolerance] for more information. + metadata_height_tolerance: Option, + + /// How many milliseconds difference is allowed between the time received in response and current local time. + /// + /// See [SdkBuilder::with_time_tolerance] for more information. + metadata_time_tolerance_ms: Option, + /// Cancellation token; once cancelled, all pending requests should be aborted. pub(crate) cancel_token: CancellationToken, @@ -115,6 +131,9 @@ impl Clone for Sdk { internal_cache: Arc::clone(&self.internal_cache), context_provider: ArcSwapOption::new(self.context_provider.load_full()), cancel_token: self.cancel_token.clone(), + metadata_last_seen_height: Arc::clone(&self.metadata_last_seen_height), + metadata_height_tolerance: self.metadata_height_tolerance, + metadata_time_tolerance_ms: self.metadata_time_tolerance_ms, #[cfg(feature = "mocks")] dump_dir: self.dump_dir.clone(), } @@ -192,7 +211,7 @@ impl Sdk { &self, request: O::Request, response: O::Response, - ) -> Result, drive_proof_verifier::Error> + ) -> Result, Error> where O::Request: Mockable, { @@ -213,31 +232,32 @@ impl Sdk { &self, request: O::Request, response: O::Response, - ) -> Result<(Option, ResponseMetadata), drive_proof_verifier::Error> + ) -> Result<(Option, ResponseMetadata), Error> where O::Request: Mockable, { - let provider = self - .context_provider() - .ok_or(drive_proof_verifier::Error::ContextProviderNotSet)?; + let (object, metadata, _proof) = self + .parse_proof_with_metadata_and_proof(request, response) + .await?; - match self.inner { - SdkInstance::Dapi { .. } => O::maybe_from_proof_with_metadata( - request, - response, - self.network, - self.version(), - &provider, - ) - .map(|(a, b, _)| (a, b)), - #[cfg(feature = "mocks")] - SdkInstance::Mock { ref mock, .. } => { - let guard = mock.lock().await; - guard - .parse_proof_with_metadata(request, response) - .map(|(a, b, _)| (a, b)) - } - } + Ok((object, metadata)) + } + + /// Verify response metadata against the current state of the SDK. + fn verify_response_metadata(&self, metadata: &ResponseMetadata) -> Result<(), Error> { + if let Some(height_tolerance) = self.metadata_height_tolerance { + verify_metadata_height( + metadata, + height_tolerance, + Arc::clone(&(self.metadata_last_seen_height)), + )?; + }; + if let Some(time_tolerance) = self.metadata_time_tolerance_ms { + let now = chrono::Utc::now().timestamp_millis() as u64; + verify_metadata_time(metadata, now, time_tolerance)?; + }; + + Ok(()) } /// Retrieve object `O` from proof contained in `request` (of type `R`) and `response`. @@ -252,7 +272,7 @@ impl Sdk { &self, request: O::Request, response: O::Response, - ) -> Result<(Option, ResponseMetadata, Proof), drive_proof_verifier::Error> + ) -> Result<(Option, ResponseMetadata, Proof), Error> where O::Request: Mockable, { @@ -260,7 +280,7 @@ impl Sdk { .context_provider() .ok_or(drive_proof_verifier::Error::ContextProviderNotSet)?; - match self.inner { + let (object, metadata, proof) = match self.inner { SdkInstance::Dapi { .. } => O::maybe_from_proof_with_metadata( request, response, @@ -273,7 +293,10 @@ impl Sdk { let guard = mock.lock().await; guard.parse_proof_with_metadata(request, response) } - } + }?; + + self.verify_response_metadata(&metadata)?; + Ok((object, metadata, proof)) } /// Return [ContextProvider] used by the SDK. @@ -544,6 +567,105 @@ impl Sdk { } } +/// If received metadata time differs from local time by more than `tolerance`, the remote node is considered stale. +/// +/// ## Parameters +/// +/// - `metadata`: Metadata of the received response +/// - `now_ms`: Current local time in milliseconds +/// - `tolerance_ms`: Tolerance in milliseconds +fn verify_metadata_time( + metadata: &ResponseMetadata, + now_ms: u64, + tolerance_ms: u64, +) -> Result<(), Error> { + let metadata_time = metadata.time_ms; + + // metadata_time - tolerance_ms <= now_ms <= metadata_time + tolerance_ms + if now_ms.abs_diff(metadata_time) > tolerance_ms { + tracing::warn!( + expected_time = now_ms, + received_time = metadata_time, + tolerance_ms, + "received response with stale time; you should retry with another server" + ); + return Err(StaleNodeError::Time { + expected_timestamp_ms: now_ms, + received_timestamp_ms: metadata_time, + tolerance_ms, + } + .into()); + } + + tracing::trace!( + expected_time = now_ms, + received_time = metadata_time, + tolerance_ms, + "received response with valid time" + ); + Ok(()) +} + +/// If current metadata height is behind previously seen height by more than `tolerance`, the remote node +/// is considered stale. +fn verify_metadata_height( + metadata: &ResponseMetadata, + tolerance: u64, + last_seen_height: Arc, +) -> Result<(), Error> { + let mut expected_height = last_seen_height.load(Ordering::Relaxed); + let received_height = metadata.height; + + // Same height, no need to update. + if received_height == expected_height { + tracing::trace!( + expected_height, + received_height, + tolerance, + "received message has the same height as previously seen" + ); + return Ok(()); + } + + // If expected_height <= tolerance, then Sdk just started, so we just assume what we got is correct. + if expected_height > tolerance && received_height < expected_height - tolerance { + tracing::warn!( + expected_height, + received_height, + tolerance, + "received message with stale height; you should retry with another server" + ); + return Err(StaleNodeError::Height { + expected_height, + received_height, + tolerance_blocks: tolerance, + } + .into()); + } + + // New height is ahead of the last seen height, so we update the last seen height. + tracing::trace!( + expected_height = expected_height, + received_height = received_height, + tolerance, + "received message with new height" + ); + while let Err(stored_height) = last_seen_height.compare_exchange( + expected_height, + received_height, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + // The value was changed to a higher value by another thread, so we need to retry. + if stored_height >= metadata.height { + break; + } + expected_height = stored_height; + } + + Ok(()) +} + #[async_trait::async_trait] impl DapiRequestExecutor for Sdk { async fn execute( @@ -605,6 +727,17 @@ pub struct SdkBuilder { /// Context provider used by the SDK. context_provider: Option>, + /// How many blocks difference is allowed between the last seen metadata height and the height received in response + /// metadata. + /// + /// See [SdkBuilder::with_height_tolerance] for more information. + metadata_height_tolerance: Option, + + /// How many milliseconds difference is allowed between the time received in response metadata and current local time. + /// + /// See [SdkBuilder::with_time_tolerance] for more information. + metadata_time_tolerance_ms: Option, + /// directory where dump files will be stored #[cfg(feature = "mocks")] dump_dir: Option, @@ -626,6 +759,8 @@ impl Default for SdkBuilder { core_user: "".to_string(), proofs: true, + metadata_height_tolerance: Some(1), + metadata_time_tolerance_ms: None, #[cfg(feature = "mocks")] data_contract_cache_size: NonZeroUsize::new(DEFAULT_CONTRACT_CACHE_SIZE) @@ -750,6 +885,41 @@ impl SdkBuilder { self } + /// Change number of blocks difference allowed between the last height and the height received in current response. + /// + /// If height received in response metadata is behind previously seen height by more than this value, the node + /// is considered stale, and the request will fail. + /// + /// If None, the height is not checked. + /// + /// Note that this feature doesn't guarantee that you are getting latest data, but it significantly decreases + /// probability of getting old data. + /// + /// This is set to `1` by default. + pub fn with_height_tolerance(mut self, tolerance: Option) -> Self { + self.metadata_height_tolerance = tolerance; + self + } + + /// How many milliseconds difference is allowed between the time received in response and current local time. + /// If the received time differs from local time by more than this value, the remote node is stale. + /// + /// If None, the time is not checked. + /// + /// This is set to `None` by default. + /// + /// Note that enabling this check can cause issues if the local time is not synchronized with the network time, + /// when the network is stalled or time between blocks increases significantly. + /// + /// Selecting a safe value for this parameter depends on maximum time between blocks mined on the network. + /// For example, if the network is configured to mine a block every maximum 3 minutes, setting this value + /// to a bit more than 6 minutes (to account for misbehaving proposers, network delays and local time + /// synchronization issues) should be safe. + pub fn with_time_tolerance(mut self, tolerance_ms: Option) -> Self { + self.metadata_time_tolerance_ms = tolerance_ms; + self + } + /// Configure directory where dumps of all requests and responses will be saved. /// Useful for debugging. /// @@ -792,9 +962,13 @@ impl SdkBuilder { proofs:self.proofs, context_provider: ArcSwapOption::new( self.context_provider.map(Arc::new)), cancel_token: self.cancel_token, + internal_cache: Default::default(), + // Note: in future, we need to securely initialize initial height during Sdk bootstrap or first request. + metadata_last_seen_height: Arc::new(atomic::AtomicU64::new(0)), + metadata_height_tolerance: self.metadata_height_tolerance, + metadata_time_tolerance_ms: self.metadata_time_tolerance_ms, #[cfg(feature = "mocks")] dump_dir: self.dump_dir, - internal_cache: Default::default(), }; // if context provider is not set correctly (is None), it means we need to fallback to core wallet if sdk.context_provider.load().is_none() { @@ -850,13 +1024,15 @@ impl SdkBuilder { mock:mock_sdk.clone(), dapi, version:self.version, - }, dump_dir: self.dump_dir.clone(), proofs:self.proofs, internal_cache: Default::default(), context_provider:ArcSwapAny::new( Some(Arc::new(context_provider))), cancel_token: self.cancel_token, + metadata_last_seen_height: Arc::new(atomic::AtomicU64::new(0)), + metadata_height_tolerance: self.metadata_height_tolerance, + metadata_time_tolerance_ms: self.metadata_time_tolerance_ms, }; let mut guard = mock_sdk.try_lock().expect("mock sdk is in use by another thread and connot be reconfigured"); guard.set_sdk(sdk.clone()); @@ -902,3 +1078,141 @@ pub fn prettify_proof(proof: &Proof) -> String { proof.quorum_type, ) } + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use dapi_grpc::platform::v0::ResponseMetadata; + use test_case::test_matrix; + + use crate::SdkBuilder; + + #[test_matrix(97..102, 100, 2, false; "valid height")] + #[test_case(103, 100, 2, true; "invalid height")] + fn test_verify_metadata_height( + expected_height: u64, + received_height: u64, + tolerance: u64, + expect_err: bool, + ) { + let metadata = ResponseMetadata { + height: received_height, + ..Default::default() + }; + + let last_seen_height = + std::sync::Arc::new(std::sync::atomic::AtomicU64::new(expected_height)); + + let result = + super::verify_metadata_height(&metadata, tolerance, Arc::clone(&last_seen_height)); + + assert_eq!(result.is_err(), expect_err); + if result.is_ok() { + assert_eq!( + last_seen_height.load(std::sync::atomic::Ordering::Relaxed), + received_height, + "previous height should be updated" + ); + } + } + + #[test] + fn cloned_sdk_verify_metadata_height() { + let sdk1 = SdkBuilder::new_mock() + .build() + .expect("mock Sdk should be created"); + + // First message verified, height 1. + let metadata = ResponseMetadata { + height: 1, + ..Default::default() + }; + + sdk1.verify_response_metadata(&metadata) + .expect("metadata should be valid"); + + assert_eq!( + sdk1.metadata_last_seen_height + .load(std::sync::atomic::Ordering::Relaxed), + metadata.height, + "initial height" + ); + + // now, we clone sdk and do two requests. + let sdk2 = sdk1.clone(); + let sdk3 = sdk1.clone(); + + // Second message verified, height 2. + let metadata = ResponseMetadata { + height: 2, + ..Default::default() + }; + sdk2.verify_response_metadata(&metadata) + .expect("metadata should be valid"); + + assert_eq!( + sdk1.metadata_last_seen_height + .load(std::sync::atomic::Ordering::Relaxed), + metadata.height, + "first sdk should see height from second sdk" + ); + assert_eq!( + sdk3.metadata_last_seen_height + .load(std::sync::atomic::Ordering::Relaxed), + metadata.height, + "third sdk should see height from second sdk" + ); + + // Third message verified, height 3. + let metadata = ResponseMetadata { + height: 3, + ..Default::default() + }; + sdk3.verify_response_metadata(&metadata) + .expect("metadata should be valid"); + + assert_eq!( + sdk1.metadata_last_seen_height + .load(std::sync::atomic::Ordering::Relaxed), + metadata.height, + "first sdk should see height from third sdk" + ); + + assert_eq!( + sdk2.metadata_last_seen_height + .load(std::sync::atomic::Ordering::Relaxed), + metadata.height, + "second sdk should see height from third sdk" + ); + + // Now, using sdk1 for height 1 again should fail, as we are already at 3, with default tolerance 1. + let metadata = ResponseMetadata { + height: 1, + ..Default::default() + }; + + sdk1.verify_response_metadata(&metadata) + .expect_err("metadata should be invalid"); + } + + #[test_matrix([90,91,100,109,110], 100, 10, false; "valid time")] + #[test_matrix([0,89,111], 100, 10, true; "invalid time")] + #[test_matrix([0,100], [0,100], 100, false; "zero time")] + #[test_matrix([99,101], 100, 0, true; "zero tolerance")] + fn test_verify_metadata_time( + received_time: u64, + now_time: u64, + tolerance: u64, + expect_err: bool, + ) { + let metadata = ResponseMetadata { + time_ms: received_time, + ..Default::default() + }; + + let result = super::verify_metadata_time(&metadata, now_time, tolerance); + + assert_eq!(result.is_err(), expect_err); + } +}