Skip to content

Commit

Permalink
feat(sdk): fix client tls connections (#2223)
Browse files Browse the repository at this point in the history
Co-authored-by: Lukasz Klimek <[email protected]>
  • Loading branch information
QuantumExplorer and lklimek authored Oct 8, 2024
1 parent 5e996ed commit 5aed75a
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 45 deletions.
17 changes: 9 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions packages/rs-dapi-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@ dump = ["mocks"]
offline-testing = []

[dependencies]
backon = "0.5"
dapi-grpc = { path = "../dapi-grpc" }
backon = { version = "1.2", default-features = false, features = [
"tokio-sleep",
] }
dapi-grpc = { path = "../dapi-grpc", features = [
"core",
"platform",
"client",
], default-features = false }
futures = "0.3.28"
http-serde = { version = "2.1", optional = true }
rand = { version = "0.8.5", features = ["small_rng"] }
Expand Down
12 changes: 7 additions & 5 deletions packages/rs-dapi-client/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,21 @@ impl ConnectionPool {
/// * `prefix` - Prefix for the item in the pool. Used to distinguish between Core and Platform clients.
/// * `uri` - URI of the node.
/// * `settings` - Applied request settings.
pub fn get_or_create(
pub fn get_or_create<E>(
&self,
prefix: PoolPrefix,
uri: &Uri,
settings: Option<&AppliedRequestSettings>,
create: impl FnOnce() -> PoolItem,
) -> PoolItem {
create: impl FnOnce() -> Result<PoolItem, E>,
) -> Result<PoolItem, E> {
if let Some(cli) = self.get(prefix, uri, settings) {
return cli;
return Ok(cli);
}

let cli = create();
self.put(uri, settings, cli.clone());
if let Ok(cli) = &cli {
self.put(uri, settings, cli.clone());
}
cli
}

Expand Down
10 changes: 8 additions & 2 deletions packages/rs-dapi-client/src/dapi_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,13 @@ impl DapiRequestExecutor for DapiClient {
address.uri().clone(),
&applied_settings,
&pool,
);
)
.map_err(|e| {
DapiClientError::<<R::Client as TransportClient>::Error>::Transport(
e,
address.clone(),
)
})?;

let response = transport_request
.execute_transport(&mut transport_client, &applied_settings)
Expand Down Expand Up @@ -250,7 +256,7 @@ impl DapiRequestExecutor for DapiClient {
// Start the routine with retry policy applied:
// We allow let_and_return because `result` is used later if dump feature is enabled
let result = routine
.retry(&retry_settings)
.retry(retry_settings)
.notify(|error, duration| {
tracing::warn!(
?error,
Expand Down
4 changes: 2 additions & 2 deletions packages/rs-dapi-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ pub trait TransportClient: Send + Sized {
type Error: CanRetry + Send + Debug + Mockable;

/// Build client using node's url.
fn with_uri(uri: Uri, pool: &ConnectionPool) -> Self;
fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, Self::Error>;

/// Build client using node's url and [AppliedRequestSettings].
fn with_uri_and_settings(
uri: Uri,
settings: &AppliedRequestSettings,
pool: &ConnectionPool,
) -> Self;
) -> Result<Self, Self::Error>;
}
90 changes: 66 additions & 24 deletions packages/rs-dapi-client/src/transport/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{request_settings::AppliedRequestSettings, RequestSettings};
use dapi_grpc::core::v0::core_client::CoreClient;
use dapi_grpc::core::v0::{self as core_proto};
use dapi_grpc::platform::v0::{self as platform_proto, platform_client::PlatformClient};
use dapi_grpc::tonic::transport::Uri;
use dapi_grpc::tonic::transport::{ClientTlsConfig, Uri};
use dapi_grpc::tonic::Streaming;
use dapi_grpc::tonic::{transport::Channel, IntoRequest};
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
Expand All @@ -18,59 +18,101 @@ pub type PlatformGrpcClient = PlatformClient<Channel>;
/// Core Client using gRPC transport.
pub type CoreGrpcClient = CoreClient<Channel>;

fn create_channel(uri: Uri, settings: Option<&AppliedRequestSettings>) -> Channel {
let mut builder = Channel::builder(uri);
fn create_channel(
uri: Uri,
settings: Option<&AppliedRequestSettings>,
) -> Result<Channel, dapi_grpc::tonic::transport::Error> {
let mut builder = Channel::builder(uri).tls_config(
ClientTlsConfig::new()
.with_native_roots()
.with_webpki_roots()
.assume_http2(true),
)?;

if let Some(settings) = settings {
if let Some(timeout) = settings.connect_timeout {
builder = builder.connect_timeout(timeout);
}
}

builder.connect_lazy()
Ok(builder.connect_lazy())
}

impl TransportClient for PlatformGrpcClient {
type Error = dapi_grpc::tonic::Status;

fn with_uri(uri: Uri, pool: &ConnectionPool) -> Self {
pool.get_or_create(PoolPrefix::Platform, &uri, None, || {
Self::new(create_channel(uri.clone(), None)).into()
})
.into()
fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, Self::Error> {
Ok(pool
.get_or_create(PoolPrefix::Platform, &uri, None, || {
match create_channel(uri.clone(), None) {
Ok(channel) => Ok(Self::new(channel).into()),
Err(e) => Err(dapi_grpc::tonic::Status::failed_precondition(format!(
"Channel creation failed: {}",
e
))),
}
})?
.into())
}

fn with_uri_and_settings(
uri: Uri,
settings: &AppliedRequestSettings,
pool: &ConnectionPool,
) -> Self {
pool.get_or_create(PoolPrefix::Platform, &uri, Some(settings), || {
Self::new(create_channel(uri.clone(), Some(settings))).into()
})
.into()
) -> Result<Self, Self::Error> {
Ok(pool
.get_or_create(
PoolPrefix::Platform,
&uri,
Some(settings),
|| match create_channel(uri.clone(), Some(settings)) {
Ok(channel) => Ok(Self::new(channel).into()),
Err(e) => Err(dapi_grpc::tonic::Status::failed_precondition(format!(
"Channel creation failed: {}",
e
))),
},
)?
.into())
}
}

impl TransportClient for CoreGrpcClient {
type Error = dapi_grpc::tonic::Status;

fn with_uri(uri: Uri, pool: &ConnectionPool) -> Self {
pool.get_or_create(PoolPrefix::Core, &uri, None, || {
Self::new(create_channel(uri.clone(), None)).into()
})
.into()
fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, Self::Error> {
Ok(pool
.get_or_create(PoolPrefix::Core, &uri, None, || {
match create_channel(uri.clone(), None) {
Ok(channel) => Ok(Self::new(channel).into()),
Err(e) => Err(dapi_grpc::tonic::Status::failed_precondition(format!(
"Channel creation failed: {}",
e
))),
}
})?
.into())
}

fn with_uri_and_settings(
uri: Uri,
settings: &AppliedRequestSettings,
pool: &ConnectionPool,
) -> Self {
pool.get_or_create(PoolPrefix::Core, &uri, Some(settings), || {
Self::new(create_channel(uri.clone(), Some(settings))).into()
})
.into()
) -> Result<Self, Self::Error> {
Ok(pool
.get_or_create(
PoolPrefix::Core,
&uri,
Some(settings),
|| match create_channel(uri.clone(), Some(settings)) {
Ok(channel) => Ok(Self::new(channel).into()),
Err(e) => Err(dapi_grpc::tonic::Status::failed_precondition(format!(
"Channel creation failed: {}",
e
))),
},
)?
.into())
}
}

Expand Down
13 changes: 11 additions & 2 deletions packages/rs-sdk/src/platform/types/evonode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::{FutureExt, TryFutureExt};
use rs_dapi_client::transport::{
AppliedRequestSettings, PlatformGrpcClient, TransportClient, TransportRequest,
};
use rs_dapi_client::{Address, ConnectionPool, RequestSettings};
use rs_dapi_client::{Address, ConnectionPool, DapiClientError, RequestSettings};
#[cfg(feature = "mocks")]
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
Expand Down Expand Up @@ -74,7 +74,16 @@ impl TransportRequest for EvoNode {
// We also create a new client to use with this request, so that the user does not need to
// reconfigure SDK to use a single node.
let pool = ConnectionPool::new(1);
let mut client = Self::Client::with_uri_and_settings(uri.clone(), settings, &pool);
// We create a new client with the given URI and settings
let client_result = Self::Client::with_uri_and_settings(uri.clone(), settings, &pool);

// Handle the result manually to create a proper error response
let mut client = match client_result {
Ok(client) => client,
Err(e) => {
return async { Err(e) }.boxed();
}
};
let mut grpc_request = GetStatusRequest {
version: Some(get_status_request::Version::V0(GetStatusRequestV0 {})),
}
Expand Down

0 comments on commit 5aed75a

Please sign in to comment.