Skip to content

Commit

Permalink
add test and build fn apis
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed Sep 7, 2024
1 parent e9eb0ea commit 5dfefef
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 51 deletions.
34 changes: 34 additions & 0 deletions crates/libs/core/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,37 @@ impl ClientConnectionEventHandler for DefaultClientConnectionEventHandler {
Ok(())
}
}

/// Turns a Fn into client connection notification handler.
pub struct LambdaClientConnectionNotificationHandler<T, K>
where
T: Fn(&GatewayInformationResult) -> crate::Result<()> + 'static,
K: Fn(&GatewayInformationResult) -> crate::Result<()> + 'static,
{
f_conn: T,
f_disconn: K,
}

impl<T, K> LambdaClientConnectionNotificationHandler<T, K>
where
T: Fn(&GatewayInformationResult) -> crate::Result<()> + 'static,
K: Fn(&GatewayInformationResult) -> crate::Result<()> + 'static,
{
pub fn new(f_conn: T, f_disconn: K) -> Self {
Self { f_conn, f_disconn }
}
}

impl<T, K> ClientConnectionEventHandler for LambdaClientConnectionNotificationHandler<T, K>
where
T: Fn(&GatewayInformationResult) -> crate::Result<()> + 'static,
K: Fn(&GatewayInformationResult) -> crate::Result<()> + 'static,
{
fn on_connected(&self, info: &GatewayInformationResult) -> crate::Result<()> {
(self.f_conn)(info)
}

fn on_disconnected(&self, info: &GatewayInformationResult) -> crate::Result<()> {
(self.f_disconn)(info)
}
}
121 changes: 86 additions & 35 deletions crates/libs/core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@

use connection::{
ClientConnectionEventHandler, ClientConnectionEventHandlerBridge,
DefaultClientConnectionEventHandler,
LambdaClientConnectionNotificationHandler,
};
use mssf_com::FabricClient::{
FabricCreateLocalClient4, IFabricPropertyManagementClient2, IFabricQueryClient10,
IFabricServiceManagementClient6,
};
use notification::{
DefaultServiceNotificationEventHandler, ServiceNotificationEventHandler,
ServiceNotificationEventHandlerBridge,
FabricCreateLocalClient4, IFabricClientConnectionEventHandler,
IFabricPropertyManagementClient2, IFabricQueryClient10, IFabricServiceManagementClient6,
IFabricServiceNotificationEventHandler,
};
use notification::{LambdaServiceNotificationHandler, ServiceNotificationEventHandlerBridge};
use windows_core::Interface;

use crate::types::ClientRole;
Expand All @@ -26,20 +24,19 @@ mod notification;
pub mod query_client;
pub mod svc_mgmt_client;

// reexport
pub use connection::GatewayInformationResult;
pub use notification::{ServiceNotification, ServiceNotificationEventHandler};

#[cfg(test)]
mod tests;

// Fabric Client creation
// Creates the local client
pub fn create_local_client<T: Interface>(
service_notification_handler: Option<impl ServiceNotificationEventHandler>,
client_connection_handler: Option<impl ClientConnectionEventHandler>,
fn create_local_client_internal<T: Interface>(
service_notification_handler: Option<&IFabricServiceNotificationEventHandler>,
client_connection_handler: Option<&IFabricClientConnectionEventHandler>,
client_role: Option<ClientRole>,
) -> T {
let sn_handler =
service_notification_handler.map(|sn| ServiceNotificationEventHandlerBridge::new_com(sn));
let cc_handler =
client_connection_handler.map(|cc| ClientConnectionEventHandlerBridge::new_com(cc));
let role = client_role.unwrap_or(ClientRole::User);
assert_ne!(
role,
Expand All @@ -48,8 +45,8 @@ pub fn create_local_client<T: Interface>(
);
let raw = unsafe {
FabricCreateLocalClient4(
sn_handler.as_ref(),
cc_handler.as_ref(),
service_notification_handler,
client_connection_handler,
role.into(),
&T::IID,
)
Expand All @@ -59,13 +56,78 @@ pub fn create_local_client<T: Interface>(
unsafe { T::from_raw(raw) }
}

// Used for convenience.
pub(crate) fn create_local_client_default<T: Interface>() -> T {
create_local_client::<T>(
None::<DefaultServiceNotificationEventHandler>,
None::<DefaultClientConnectionEventHandler>,
None,
)
// Builder for FabricClient
pub struct FabricClientBuilder {
sn_handler: Option<IFabricServiceNotificationEventHandler>,
cc_handler: Option<IFabricClientConnectionEventHandler>,
client_role: ClientRole,
}

impl FabricClientBuilder {
pub fn new() -> Self {
Self {
sn_handler: None,
cc_handler: None,
client_role: ClientRole::User,
}
}

/// Configures the service notification handler.
pub fn with_service_notification_handler(
mut self,
handler: impl ServiceNotificationEventHandler,
) -> Self {
self.sn_handler = Some(ServiceNotificationEventHandlerBridge::new_com(handler));
self
}

/// Configures the service notification handler, but using a function.
pub fn with_service_notification_handler_fn<T>(self, f: T) -> Self
where
T: Fn(&ServiceNotification) -> crate::Result<()> + 'static,
{
let handler = LambdaServiceNotificationHandler::new(f);
self.with_service_notification_handler(handler)
}

/// Configures client connection handler.
pub fn with_client_connection_handler(
mut self,
handler: impl ClientConnectionEventHandler,
) -> Self {
self.cc_handler = Some(ClientConnectionEventHandlerBridge::new_com(handler));
self
}

/// Configures client connection handler, but functions.
/// f_conn and f_disconn is invoked when fabric client connects and disconnects
/// to SF cluster respectively.
pub fn with_client_connection_handler_fn<T, K>(self, f_conn: T, f_disconn: K) -> Self
where
T: Fn(&GatewayInformationResult) -> crate::Result<()> + 'static,
K: Fn(&GatewayInformationResult) -> crate::Result<()> + 'static,
{
let handler = LambdaClientConnectionNotificationHandler::new(f_conn, f_disconn);
self.with_client_connection_handler(handler)
}

/// Build the fabricclient
/// Remarks: FabricClient connect to SF cluster when
/// the first API call is triggered. Build/create of the object does not
/// establish connection.
pub fn build(self) -> FabricClient {
let c = Self::build_interface(self);
FabricClient::from_com(c)
}

/// Build the specific com interface of the fabric client.
pub fn build_interface<T: Interface>(self) -> T {
create_local_client_internal::<T>(
self.sn_handler.as_ref(),
self.cc_handler.as_ref(),
Some(self.client_role),
)
}
}

// FabricClient safe wrapper
Expand All @@ -78,18 +140,7 @@ pub struct FabricClient {
com_query_client: IFabricQueryClient10,
}

impl Default for FabricClient {
fn default() -> Self {
Self::new()
}
}

impl FabricClient {
pub fn new() -> Self {
let com = create_local_client_default::<IFabricPropertyManagementClient2>();
Self::from_com(com)
}

// Get a copy of COM object
pub fn get_com(&self) -> IFabricPropertyManagementClient2 {
self.com_property_client.clone()
Expand Down
26 changes: 21 additions & 5 deletions crates/libs/core/src/client/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,28 @@ where
}
}

// default implementation of ServiceNotificationEventHandler
pub struct DefaultServiceNotificationEventHandler {}
/// Turns a Fn into service notification handler.
pub struct LambdaServiceNotificationHandler<T>
where
T: Fn(&ServiceNotification) -> crate::Result<()> + 'static,
{
f: T,
}

impl<T> LambdaServiceNotificationHandler<T>
where
T: Fn(&ServiceNotification) -> crate::Result<()> + 'static,
{
pub fn new(f: T) -> Self {
Self { f }
}
}

impl ServiceNotificationEventHandler for DefaultServiceNotificationEventHandler {
impl<T> ServiceNotificationEventHandler for LambdaServiceNotificationHandler<T>
where
T: Fn(&ServiceNotification) -> crate::Result<()> + 'static,
{
fn on_notification(&self, notification: &ServiceNotification) -> crate::Result<()> {
tracing::debug!("Got service notification {:?}", notification);
Ok(())
(self.f)(notification)
}
}
4 changes: 2 additions & 2 deletions crates/libs/core/src/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use tokio_util::sync::CancellationToken;
use windows_core::HSTRING;

use crate::{
client::{svc_mgmt_client::PartitionKeyType, FabricClient},
client::{svc_mgmt_client::PartitionKeyType, FabricClientBuilder},
error::FabricErrorCode,
types::{NodeQueryDescription, NodeStatusFilter, PagedQueryDescription},
};

#[tokio::test]
async fn test_fabric_client() {
let c = FabricClient::new();
let c = FabricClientBuilder::new().build();
let qc = c.get_query_manager();
let timeout = Duration::from_secs(1);
let paging_status;
Expand Down
8 changes: 5 additions & 3 deletions crates/libs/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ mod tests {
pub fn new() -> $name {
return $name {
com: paste::item! {
crate::client::create_local_client_default::<mssf_com::FabricClient::[<I $name>]>()
crate::client::FabricClientBuilder::new().build_interface::<mssf_com::FabricClient::[<I $name>]>()
},
};
}
Expand Down Expand Up @@ -320,7 +320,8 @@ mod tests {
impl FabricQueryClient {
pub fn new() -> FabricQueryClient {
FabricQueryClient {
com: crate::client::create_local_client_default::<IFabricQueryClient>(),
com: crate::client::FabricClientBuilder::new()
.build_interface::<IFabricQueryClient>(),
}
}

Expand Down Expand Up @@ -510,7 +511,8 @@ mod tests {

#[test]
fn local_client_create() {
let _mgmt = crate::client::create_local_client_default::<IFabricClusterManagementClient3>();
let _mgmt = crate::client::FabricClientBuilder::new()
.build_interface::<IFabricClusterManagementClient3>();
}

#[tokio::test]
Expand Down
4 changes: 2 additions & 2 deletions crates/samples/echomain-stateful2/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use mssf_core::{
PartitionKeyType, ResolvedServiceEndpoint, ResolvedServicePartition,
ServiceEndpointRole, ServicePartitionKind,
},
FabricClient,
FabricClient, FabricClientBuilder,
},
error::FabricErrorCode,
types::{
Expand Down Expand Up @@ -233,7 +233,7 @@ impl TestClient {
// Uses fabric client to perform various actions for this service.
#[tokio::test]
async fn test_partition_info() {
let fc = FabricClient::new();
let fc = FabricClientBuilder::new().build();
let tc = TestClient::new(fc.clone());
let timeout = Duration::from_secs(1);

Expand Down
53 changes: 49 additions & 4 deletions crates/samples/echomain/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use mssf_core::{
PartitionKeyType, ResolvedServiceEndpoint, ResolvedServicePartitionInfo,
ServiceEndpointRole, ServicePartitionKind,
},
FabricClient,
FabricClient, FabricClientBuilder, GatewayInformationResult, ServiceNotification,
},
error::FabricErrorCode,
types::{
Expand All @@ -25,6 +25,8 @@ use mssf_core::{
};

static ECHO_SVC_URI: &str = "fabric:/EchoApp/EchoAppService";
static MAX_RETRY_COUNT: i32 = 5;
static RETRY_DURATION_SHORT: Duration = Duration::from_secs(1);

// Test client for echo server.
pub struct EchoTestClient {
Expand Down Expand Up @@ -114,7 +116,26 @@ impl EchoTestClient {
// Uses fabric client to perform various actions to the app.
#[tokio::test]
async fn test_fabric_client() {
let fc = FabricClient::new();
// channel for service notification
let (sn_tx, mut sn_rx) = tokio::sync::mpsc::channel::<ServiceNotification>(1);
// channel for client connection notification
let (cc_tx, mut cc_rx) = tokio::sync::mpsc::channel::<GatewayInformationResult>(1);
let fc = FabricClientBuilder::new()
.with_service_notification_handler_fn(move |notification| {
sn_tx
.blocking_send(notification.clone())
.expect("cannot send notification");
Ok(())
})
.with_client_connection_handler_fn(
move |gw| {
cc_tx.blocking_send(gw.clone()).expect("cannot send");
Ok(())
},
|_| Ok(()), // we only care about connection even in this test.
)
.build();

let ec = EchoTestClient::new(fc.clone());

let timeout = Duration::from_secs(1);
Expand All @@ -128,6 +149,10 @@ async fn test_fabric_client() {
// assert_eq!(stateless.health_state, HealthState::Ok);
assert_ne!(single.id, GUID::zeroed());

// Connection event notification should be received since we already sent a request.
let gw = cc_rx.try_recv().expect("notification not present");
assert!(!gw.node_name.is_empty());

// Get replica info
let stateless_replica = ec.get_replica(single.id).await.unwrap();

Expand Down Expand Up @@ -182,18 +207,38 @@ async fn test_fabric_client() {
if replica2.instance_id != stateless_replica.instance_id {
break;
} else {
if count > 5 {
if count > MAX_RETRY_COUNT {
panic!(
"replica id not changed after retry. original {}, new {}",
stateless_replica.instance_id, replica2.instance_id
);
}
// replica has not changed yet.
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(RETRY_DURATION_SHORT).await;
}
count += 1;
}

// check service notification is invoked because service addr is changed for
// replica removal and recreation.
for i in 0..MAX_RETRY_COUNT {
match sn_rx.try_recv() {
Ok(sn) => {
assert_eq!(sn.partition_id, single.id);
break;
}
Err(e) => {
if e == tokio::sync::mpsc::error::TryRecvError::Disconnected {
panic!("channnel should not be closed");
}
if i == MAX_RETRY_COUNT {
panic!("notification not received");
}
tokio::time::sleep(RETRY_DURATION_SHORT).await;
}
};
}

// unregisters the notification
mgmt.unregister_service_notification_filter(filter_handle, timeout, None)
.await
Expand Down

0 comments on commit 5dfefef

Please sign in to comment.