From d1e0d8c9f10173d7ac77dc83fe6d9bacf7224349 Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Thu, 10 Oct 2024 15:49:05 +0300 Subject: [PATCH] reffactoring --- src/lib.rs | 21 ++-- src/providers/fastnear/fetchers.rs | 15 +++ src/providers/fastnear/mod.rs | 2 +- src/providers/fastnear/types.rs | 4 +- src/providers/s3/client.rs | 148 +++++++++++++++++++++++++++ src/providers/s3/fetchers.rs | 158 +---------------------------- src/providers/s3/mod.rs | 19 +--- src/providers/s3/types.rs | 25 ++++- 8 files changed, 207 insertions(+), 185 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ff46685..bf246a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -280,11 +280,17 @@ extern crate derive_builder; pub use near_indexer_primitives; pub use aws_credential_types::Credentials; -pub use providers::fastnear::client as fastnear_client; + +mod providers; + +pub use providers::fastnear; +pub use providers::s3; + +pub use providers::fastnear::client::FastNearClient; pub use providers::fastnear::types::{FastNearConfig, FastNearConfigBuilder}; -pub use providers::s3::client as s3_client; + +pub use providers::s3::client::LakeS3Client; pub use providers::s3::types::{LakeConfig, LakeConfigBuilder}; -pub mod providers; pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework"; @@ -317,11 +323,10 @@ pub fn streamer>( let (sender, receiver) = tokio::sync::mpsc::channel(config.blocks_preload_pool_size()); match config { providers::NearLakeFrameworkConfig::Lake(config) => { - (tokio::spawn(providers::s3::start(sender, config)), receiver) + (tokio::spawn(s3::start(sender, config)), receiver) + } + providers::NearLakeFrameworkConfig::FastNear(config) => { + (tokio::spawn(fastnear::start(sender, config)), receiver) } - providers::NearLakeFrameworkConfig::FastNear(config) => ( - tokio::spawn(providers::fastnear::start(sender, config)), - receiver, - ), } } diff --git a/src/providers/fastnear/fetchers.rs b/src/providers/fastnear/fetchers.rs index 25fb813..dd8fd18 100644 --- a/src/providers/fastnear/fetchers.rs +++ b/src/providers/fastnear/fetchers.rs @@ -21,6 +21,21 @@ pub async fn fetch_optimistic_block( .expect("Failed to fetch optimistic block") } +/// Fetches the optimistic block from the fastenar +/// This function is used to fetch the optimistic block by height +/// This function will be using endpoint `/v0/block_opt/:block_height` +/// This would be waiting some time until the optimistic block is available +/// Returns `near_indexer_primitives::StreamerMessage` +pub async fn fetch_optimistic_block_by_height( + client: &FastNearClient, + block_height: types::BlockHeight, +) -> near_indexer_primitives::StreamerMessage { + client + .fetch_until_success(&format!("/v0/block_opt/{}", block_height)) + .await + .expect("Failed to fetch optimistic block") +} + /// Fetches the genesis block from the fastenar /// Returns `near_indexer_primitives::StreamerMessage` pub async fn fetch_first_block( diff --git a/src/providers/fastnear/mod.rs b/src/providers/fastnear/mod.rs index 234c2e5..cad5431 100644 --- a/src/providers/fastnear/mod.rs +++ b/src/providers/fastnear/mod.rs @@ -12,7 +12,7 @@ pub async fn start( blocks_sink: tokio::sync::mpsc::Sender, config: types::FastNearConfig, ) -> anyhow::Result<()> { - let client = config.client(); + let client = config.client().await; let max_num_threads = config.num_threads; let next_sink_block = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(config.start_block_height)); diff --git a/src/providers/fastnear/types.rs b/src/providers/fastnear/types.rs index 3dcc47c..a4674cb 100644 --- a/src/providers/fastnear/types.rs +++ b/src/providers/fastnear/types.rs @@ -32,8 +32,8 @@ pub struct FastNearConfig { } impl FastNearConfig { - pub fn client(&self) -> crate::fastnear_client::FastNearClient { - crate::fastnear_client::FastNearClient::new(self.endpoint.clone()) + pub async fn client(&self) -> super::client::FastNearClient { + super::client::FastNearClient::new(self.endpoint.clone()) } } diff --git a/src/providers/s3/client.rs b/src/providers/s3/client.rs index 6d718b4..70cf91e 100644 --- a/src/providers/s3/client.rs +++ b/src/providers/s3/client.rs @@ -80,3 +80,151 @@ pub trait S3Client: Send + Sync { start_after_prefix: &str, ) -> Result, ListCommonPrefixesError>; } + +#[derive(Clone, Debug)] +pub struct LakeS3Client { + s3: aws_sdk_s3::Client, +} + +impl LakeS3Client { + pub fn new(s3: aws_sdk_s3::Client) -> Self { + Self { s3 } + } + + pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self { + let s3_client = aws_sdk_s3::Client::from_conf(config); + + Self { s3: s3_client } + } +} + +#[async_trait] +impl S3Client for LakeS3Client { + async fn get_object_bytes( + &self, + bucket: &str, + prefix: &str, + ) -> Result, GetObjectBytesError> { + let object = self + .s3 + .get_object() + .bucket(bucket) + .key(prefix) + .request_payer(aws_sdk_s3::types::RequestPayer::Requester) + .send() + .await?; + + let bytes = object.body.collect().await?.into_bytes().to_vec(); + + Ok(bytes) + } + + async fn list_common_prefixes( + &self, + bucket: &str, + start_after_prefix: &str, + ) -> Result, ListCommonPrefixesError> { + let response = self + .s3 + .list_objects_v2() + .max_keys(1000) // 1000 is the default and max value for this parameter + .delimiter("/".to_string()) + .start_after(start_after_prefix) + .request_payer(aws_sdk_s3::types::RequestPayer::Requester) + .bucket(bucket) + .send() + .await?; + + let prefixes = match response.common_prefixes { + None => vec![], + Some(common_prefixes) => common_prefixes + .into_iter() + .filter_map(|common_prefix| common_prefix.prefix) + .collect::>() + .into_iter() + .filter_map(|prefix_string| prefix_string.split('/').next().map(String::from)) + .collect(), + }; + + Ok(prefixes) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use std::sync::Arc; + + use crate::providers::s3::fetchers::fetch_streamer_message; + use async_trait::async_trait; + + #[derive(Clone, Debug)] + pub struct LakeS3Client {} + + #[async_trait] + impl S3Client for LakeS3Client { + async fn get_object_bytes( + &self, + _bucket: &str, + prefix: &str, + ) -> Result, GetObjectBytesError> { + let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix); + tokio::fs::read(path) + .await + .map_err(|e| GetObjectBytesError(Arc::new(e))) + } + + async fn list_common_prefixes( + &self, + _bucket: &str, + _start_after: &str, + ) -> Result, ListCommonPrefixesError> { + Ok(Vec::new()) + } + } + + #[tokio::test] + async fn deserializes_meta_transactions() { + let lake_client = LakeS3Client {}; + let streamer_message = + fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765) + .await + .unwrap(); + + let delegate_action = &streamer_message.shards[0] + .chunk + .as_ref() + .unwrap() + .transactions[0] + .transaction + .actions[0]; + + assert_eq!( + serde_json::to_value(delegate_action).unwrap(), + serde_json::json!({ + "Delegate": { + "delegate_action": { + "sender_id": "test.near", + "receiver_id": "test.near", + "actions": [ + { + "AddKey": { + "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg", + "access_key": { + "nonce": 0, + "permission": "FullAccess" + } + } + } + ], + "nonce": 879546, + "max_block_height": 100, + "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib" + }, + "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS" + } + }) + ); + } +} diff --git a/src/providers/s3/fetchers.rs b/src/providers/s3/fetchers.rs index 4028188..0d360f9 100644 --- a/src/providers/s3/fetchers.rs +++ b/src/providers/s3/fetchers.rs @@ -1,80 +1,6 @@ use std::str::FromStr; -use async_trait::async_trait; - -use super::{ - client::{GetObjectBytesError, ListCommonPrefixesError, S3Client}, - types, -}; - -#[derive(Clone, Debug)] -pub struct LakeS3Client { - s3: aws_sdk_s3::Client, -} - -impl LakeS3Client { - pub fn new(s3: aws_sdk_s3::Client) -> Self { - Self { s3 } - } - - pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self { - let s3_client = aws_sdk_s3::Client::from_conf(config); - - Self { s3: s3_client } - } -} - -#[async_trait] -impl S3Client for LakeS3Client { - async fn get_object_bytes( - &self, - bucket: &str, - prefix: &str, - ) -> Result, GetObjectBytesError> { - let object = self - .s3 - .get_object() - .bucket(bucket) - .key(prefix) - .request_payer(aws_sdk_s3::types::RequestPayer::Requester) - .send() - .await?; - - let bytes = object.body.collect().await?.into_bytes().to_vec(); - - Ok(bytes) - } - - async fn list_common_prefixes( - &self, - bucket: &str, - start_after_prefix: &str, - ) -> Result, ListCommonPrefixesError> { - let response = self - .s3 - .list_objects_v2() - .max_keys(1000) // 1000 is the default and max value for this parameter - .delimiter("/".to_string()) - .start_after(start_after_prefix) - .request_payer(aws_sdk_s3::types::RequestPayer::Requester) - .bucket(bucket) - .send() - .await?; - - let prefixes = match response.common_prefixes { - None => vec![], - Some(common_prefixes) => common_prefixes - .into_iter() - .filter_map(|common_prefix| common_prefix.prefix) - .collect::>() - .into_iter() - .filter_map(|prefix_string| prefix_string.split('/').next().map(String::from)) - .collect(), - }; - - Ok(prefixes) - } -} +use super::{client::S3Client, types}; /// Queries the list of the objects in the bucket, grouped by "/" delimiter. /// Returns the list of block heights that can be fetched @@ -103,8 +29,8 @@ pub async fn list_block_heights( /// By the given block height gets the objects: /// - block.json /// - shard_N.json -/// Reads the content of the objects and parses as a JSON. -/// Returns the result in `near_indexer_primitives::StreamerMessage` +/// Reads the content of the objects and parses as a JSON. +/// Returns the result in `near_indexer_primitives::StreamerMessage` pub async fn fetch_streamer_message( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, @@ -254,81 +180,3 @@ pub async fn fetch_shard_or_retry( } } } - -#[cfg(test)] -mod test { - use super::*; - - use std::sync::Arc; - - use async_trait::async_trait; - - #[derive(Clone, Debug)] - pub struct LakeS3Client {} - - #[async_trait] - impl S3Client for LakeS3Client { - async fn get_object_bytes( - &self, - _bucket: &str, - prefix: &str, - ) -> Result, GetObjectBytesError> { - let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix); - tokio::fs::read(path) - .await - .map_err(|e| GetObjectBytesError(Arc::new(e))) - } - - async fn list_common_prefixes( - &self, - _bucket: &str, - _start_after: &str, - ) -> Result, ListCommonPrefixesError> { - Ok(Vec::new()) - } - } - - #[tokio::test] - async fn deserializes_meta_transactions() { - let lake_client = LakeS3Client {}; - let streamer_message = - fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765) - .await - .unwrap(); - - let delegate_action = &streamer_message.shards[0] - .chunk - .as_ref() - .unwrap() - .transactions[0] - .transaction - .actions[0]; - - assert_eq!( - serde_json::to_value(delegate_action).unwrap(), - serde_json::json!({ - "Delegate": { - "delegate_action": { - "sender_id": "test.near", - "receiver_id": "test.near", - "actions": [ - { - "AddKey": { - "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg", - "access_key": { - "nonce": 0, - "permission": "FullAccess" - } - } - } - ], - "nonce": 879546, - "max_block_height": 100, - "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib" - }, - "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS" - } - }) - ); - } -} diff --git a/src/providers/s3/mod.rs b/src/providers/s3/mod.rs index aeeb2bc..071ed00 100644 --- a/src/providers/s3/mod.rs +++ b/src/providers/s3/mod.rs @@ -103,18 +103,7 @@ pub(crate) async fn start( ) -> anyhow::Result<()> { let mut start_from_block_height = config.start_block_height; - let lake_s3_client: Box = if let Some(s3_client) = config.s3_client { - s3_client - } else if let Some(config) = config.s3_config { - Box::new(fetchers::LakeS3Client::from_conf(config)) - } else { - let aws_config = aws_config::from_env().load().await; - let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) - .region(aws_types::region::Region::new(config.s3_region_name)) - .build(); - - Box::new(fetchers::LakeS3Client::from_conf(s3_config)) - }; + let lake_s3_client = config.client().await; let mut last_processed_block_hash: Option = None; @@ -127,7 +116,7 @@ pub(crate) async fn start( // We require to stream blocks consistently, so we need to try to load the block again. let pending_block_heights = stream_block_heights( - &*lake_s3_client, + &lake_s3_client, &config.s3_bucket_name, start_from_block_height, ); @@ -150,7 +139,7 @@ pub(crate) async fn start( .into_iter() .map(|block_height| { fetchers::fetch_streamer_message( - &*lake_s3_client, + &lake_s3_client, &config.s3_bucket_name, block_height, ) @@ -251,7 +240,7 @@ pub(crate) async fn start( .into_iter() .map(|block_height| { fetchers::fetch_streamer_message( - &*lake_s3_client, + &lake_s3_client, &config.s3_bucket_name, block_height, ) diff --git a/src/providers/s3/types.rs b/src/providers/s3/types.rs index da76a94..d1c61e1 100644 --- a/src/providers/s3/types.rs +++ b/src/providers/s3/types.rs @@ -1,4 +1,4 @@ -use super::client::{GetObjectBytesError, ListCommonPrefixesError, S3Client}; +use super::client::{GetObjectBytesError, ListCommonPrefixesError}; /// Type alias represents the block height pub type BlockHeight = u64; @@ -61,11 +61,28 @@ pub struct LakeConfig { /// /// This field is mutually exclusive with [LakeConfigBuilder::s3_config]. #[builder(setter(strip_option, custom), default)] - pub(crate) s3_client: Option>, + pub(crate) s3_client: Option, #[builder(default = "100")] pub(crate) blocks_preload_pool_size: usize, } +impl LakeConfig { + pub async fn client(&self) -> super::client::LakeS3Client { + if let Some(s3_client) = self.s3_client.clone() { + s3_client + } else if let Some(config) = self.s3_config.clone() { + super::client::LakeS3Client::from_conf(config) + } else { + let aws_config = aws_config::from_env().load().await; + let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) + .region(aws_types::region::Region::new(self.s3_region_name.clone())) + .build(); + + super::client::LakeS3Client::from_conf(s3_config) + } + } +} + impl LakeConfigBuilder { fn validate(&self) -> Result<(), String> { if self.s3_config.is_some() && self.s3_client.is_some() { @@ -75,9 +92,9 @@ impl LakeConfigBuilder { Ok(()) } - pub fn s3_client(self, s3_client: T) -> Self { + pub fn s3_client(self, s3_client: super::client::LakeS3Client) -> Self { Self { - s3_client: Some(Some(Box::new(s3_client))), + s3_client: Some(Some(s3_client)), ..self } }