Skip to content

Commit

Permalink
exec_profile: make LBP optional
Browse files Browse the repository at this point in the history
After this commit, we are finally consistent with cpp-driver.
Now, if user did not specify an LBP for given execution profile,
the default LBP (session-wide LBP) will be assigned to the profile.
  • Loading branch information
muzarski committed Oct 21, 2024
1 parent 61768fc commit 00a272e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
33 changes: 18 additions & 15 deletions scylla-rust-wrapper/src/exec_profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::time::Duration;
use scylla::execution_profile::{
ExecutionProfile, ExecutionProfileBuilder, ExecutionProfileHandle,
};
use scylla::load_balancing::LatencyAwarenessBuilder;
use scylla::load_balancing::{LatencyAwarenessBuilder, LoadBalancingPolicy};
use scylla::retry_policy::RetryPolicy;
use scylla::speculative_execution::SimpleSpeculativeExecutionPolicy;
use scylla::statement::Consistency;
Expand All @@ -31,27 +31,30 @@ use crate::types::{
#[derive(Clone, Debug)]
pub struct CassExecProfile {
inner: ExecutionProfileBuilder,
load_balancing_kind: LoadBalancingKind,
load_balancing_kind: Option<LoadBalancingKind>,
load_balancing_config: LoadBalancingConfig,
}

impl CassExecProfile {
fn new() -> Self {
Self {
inner: ExecutionProfile::builder(),
load_balancing_kind: LoadBalancingKind::RoundRobin,
load_balancing_kind: None,
load_balancing_config: Default::default(),
}
}

pub(crate) async fn build(self) -> ExecutionProfile {
self.inner
.load_balancing_policy(
self.load_balancing_config
.build(self.load_balancing_kind)
.await,
)
.build()
pub(crate) async fn build(
self,
cluster_default_lbp: Arc<dyn LoadBalancingPolicy>,
) -> ExecutionProfile {
let load_balacing = if let Some(load_balancing_kind) = self.load_balancing_kind {
self.load_balancing_config.build(load_balancing_kind).await
} else {
cluster_default_lbp
};

self.inner.load_balancing_policy(load_balacing).build()
}
}

Expand Down Expand Up @@ -346,7 +349,7 @@ pub unsafe extern "C" fn cass_execution_profile_set_load_balance_dc_aware_n(
let profile_builder = ptr_to_ref_mut(profile);

set_load_balance_dc_aware_n(
|load_balancing_kind| profile_builder.load_balancing_kind = load_balancing_kind,
|load_balancing_kind| profile_builder.load_balancing_kind = Some(load_balancing_kind),
local_dc,
local_dc_length,
used_hosts_per_remote_dc,
Expand All @@ -359,7 +362,7 @@ pub unsafe extern "C" fn cass_execution_profile_set_load_balance_round_robin(
profile: *mut CassExecProfile,
) -> CassError {
let profile_builder = ptr_to_ref_mut(profile);
profile_builder.load_balancing_kind = LoadBalancingKind::RoundRobin;
profile_builder.load_balancing_kind = Some(LoadBalancingKind::RoundRobin);

CassError::CASS_OK
}
Expand Down Expand Up @@ -479,7 +482,7 @@ mod tests {
/* Test valid configurations */
let profile = ptr_to_ref(profile_raw);
{
assert_matches!(profile.load_balancing_kind, LoadBalancingKind::RoundRobin);
assert_matches!(profile.load_balancing_kind, None);
assert!(profile.load_balancing_config.token_awareness_enabled);
assert!(!profile.load_balancing_config.latency_awareness_enabled);
}
Expand Down Expand Up @@ -508,7 +511,7 @@ mod tests {

let load_balancing_kind = &profile.load_balancing_kind;
match load_balancing_kind {
LoadBalancingKind::DcAware { local_dc } => {
Some(LoadBalancingKind::DcAware { local_dc }) => {
assert_eq!(local_dc, "eu")
}
_ => panic!("Expected preferred dc"),
Expand Down
7 changes: 4 additions & 3 deletions scylla-rust-wrapper/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,14 @@ impl CassSessionInner {
"Already connecting, closing, or connected".msg(),
));
}

let (mut session_builder, default_lbp) = session_builder_and_default_lbp_fut.await;

let mut exec_profile_map = HashMap::with_capacity(exec_profile_builder_map.len());
for (name, builder) in exec_profile_builder_map {
exec_profile_map.insert(name, builder.build().await.into_handle());
exec_profile_map.insert(name, builder.build(default_lbp.clone()).await.into_handle());
}

// TODO: pass default_lbp to exec profiles above.
let (mut session_builder, _default_lbp) = session_builder_and_default_lbp_fut.await;
if let Some(keyspace) = keyspace {
session_builder = session_builder.use_keyspace(keyspace, false);
}
Expand Down

0 comments on commit 00a272e

Please sign in to comment.