Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exec_profile: Use cluster lbp for exec profiles by default #197

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions scylla-rust-wrapper/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,27 @@ const DRIVER_VERSION: &str = env!("CARGO_PKG_VERSION");
pub(crate) struct LoadBalancingConfig {
pub(crate) token_awareness_enabled: bool,
pub(crate) token_aware_shuffling_replicas_enabled: bool,
pub(crate) dc_awareness: Option<DcAwareness>,
pub(crate) load_balancing_kind: Option<LoadBalancingKind>,
pub(crate) latency_awareness_enabled: bool,
pub(crate) latency_awareness_builder: LatencyAwarenessBuilder,
}
impl LoadBalancingConfig {
// This is `async` to prevent running this function from beyond tokio context,
// as it results in panic due to DefaultPolicyBuilder::build() spawning a tokio task.
pub(crate) async fn build(self) -> Arc<dyn LoadBalancingPolicy> {
let load_balancing_kind = self
.load_balancing_kind
// Round robin is chosen by default for cluster wide LBP.
.unwrap_or(LoadBalancingKind::RoundRobin);

let mut builder = DefaultPolicyBuilder::new().token_aware(self.token_awareness_enabled);
if self.token_awareness_enabled {
// Cpp-driver enables shuffling replicas only if token aware routing is enabled.
builder =
builder.enable_shuffling_replicas(self.token_aware_shuffling_replicas_enabled);
}
if let Some(dc_awareness) = self.dc_awareness.as_ref() {
builder = builder
.prefer_datacenter(dc_awareness.local_dc.clone())
.permit_dc_failover(true)
if let LoadBalancingKind::DcAware { local_dc } = load_balancing_kind {
builder = builder.prefer_datacenter(local_dc).permit_dc_failover(true)
}
if self.latency_awareness_enabled {
builder = builder.latency_awareness(self.latency_awareness_builder);
Expand All @@ -81,16 +84,17 @@ impl Default for LoadBalancingConfig {
Self {
token_awareness_enabled: true,
token_aware_shuffling_replicas_enabled: true,
dc_awareness: None,
load_balancing_kind: None,
latency_awareness_enabled: false,
latency_awareness_builder: Default::default(),
}
}
}

#[derive(Clone, Debug)]
pub(crate) struct DcAwareness {
pub(crate) local_dc: String,
pub(crate) enum LoadBalancingKind {
RoundRobin,
DcAware { local_dc: String },
}

#[derive(Clone)]
Expand Down Expand Up @@ -457,7 +461,7 @@ pub unsafe extern "C" fn cass_cluster_set_credentials_n(
#[no_mangle]
pub unsafe extern "C" fn cass_cluster_set_load_balance_round_robin(cluster_raw: *mut CassCluster) {
let cluster = ptr_to_ref_mut(cluster_raw);
cluster.load_balancing_config.dc_awareness = None;
cluster.load_balancing_config.load_balancing_kind = Some(LoadBalancingKind::RoundRobin);
}

#[no_mangle]
Expand Down Expand Up @@ -496,7 +500,7 @@ pub(crate) unsafe fn set_load_balance_dc_aware_n(
.unwrap()
.to_string();

load_balancing_config.dc_awareness = Some(DcAwareness { local_dc });
load_balancing_config.load_balancing_kind = Some(LoadBalancingKind::DcAware { local_dc });

CassError::CASS_OK
}
Expand Down Expand Up @@ -851,7 +855,7 @@ mod tests {
/* Test valid configurations */
let cluster = ptr_to_ref(cluster_raw);
{
assert_matches!(cluster.load_balancing_config.dc_awareness, None);
assert_matches!(cluster.load_balancing_config.load_balancing_kind, None);
assert!(cluster.load_balancing_config.token_awareness_enabled);
assert!(!cluster.load_balancing_config.latency_awareness_enabled);
}
Expand All @@ -878,8 +882,13 @@ mod tests {
40,
);

let dc_awareness = cluster.load_balancing_config.dc_awareness.as_ref().unwrap();
assert_eq!(dc_awareness.local_dc, "eu");
let load_balancing_kind = &cluster.load_balancing_config.load_balancing_kind;
match load_balancing_kind {
Some(LoadBalancingKind::DcAware { local_dc }) => {
assert_eq!(local_dc, "eu")
}
_ => panic!("Expected preferred dc"),
}
assert!(!cluster.load_balancing_config.token_awareness_enabled);
assert!(cluster.load_balancing_config.latency_awareness_enabled);
}
Expand Down
32 changes: 23 additions & 9 deletions scylla-rust-wrapper/src/exec_profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::argconv::{free_boxed, ptr_to_cstr_n, ptr_to_ref, ptr_to_ref_mut, strl
use crate::batch::CassBatch;
use crate::cass_error::CassError;
use crate::cass_types::CassConsistency;
use crate::cluster::{set_load_balance_dc_aware_n, LoadBalancingConfig};
use crate::cluster::{set_load_balance_dc_aware_n, LoadBalancingConfig, LoadBalancingKind};
use crate::retry_policy::CassRetryPolicy;
use crate::retry_policy::RetryPolicy::{
DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, FallthroughRetryPolicy,
Expand All @@ -42,10 +42,19 @@ impl CassExecProfile {
}
}

pub(crate) async fn build(self) -> ExecutionProfile {
self.inner
.load_balancing_policy(self.load_balancing_config.build().await)
.build()
pub(crate) async fn build(
self,
cluster_default_profile: &ExecutionProfile,
) -> ExecutionProfile {
let load_balacing = if self.load_balancing_config.load_balancing_kind.is_some() {
self.load_balancing_config.build().await
} else {
// If load balancing config does not have LB kind defined,
// we make use of cluster's LBP.
cluster_default_profile.get_load_balancing_policy().clone()
};

self.inner.load_balancing_policy(load_balacing).build()
}
}

Expand Down Expand Up @@ -353,7 +362,7 @@ pub unsafe extern "C" fn cass_execution_profile_set_load_balance_round_robin(
profile: *mut CassExecProfile,
) -> CassError {
let profile_builder = ptr_to_ref_mut(profile);
profile_builder.load_balancing_config.dc_awareness = None;
profile_builder.load_balancing_config.load_balancing_kind = Some(LoadBalancingKind::RoundRobin);

CassError::CASS_OK
}
Expand Down Expand Up @@ -473,7 +482,7 @@ mod tests {
/* Test valid configurations */
let profile = ptr_to_ref(profile_raw);
{
assert_matches!(profile.load_balancing_config.dc_awareness, None);
assert_matches!(profile.load_balancing_config.load_balancing_kind, None);
assert!(profile.load_balancing_config.token_awareness_enabled);
assert!(!profile.load_balancing_config.latency_awareness_enabled);
}
Expand All @@ -500,8 +509,13 @@ mod tests {
40,
);

let dc_awareness = profile.load_balancing_config.dc_awareness.as_ref().unwrap();
assert_eq!(dc_awareness.local_dc, "eu");
let load_balancing_kind = &profile.load_balancing_config.load_balancing_kind;
match load_balancing_kind {
Some(LoadBalancingKind::DcAware { local_dc }) => {
assert_eq!(local_dc, "eu")
}
_ => panic!("Expected preferred dc"),
}
assert!(!profile.load_balancing_config.token_awareness_enabled);
assert!(profile.load_balancing_config.latency_awareness_enabled);
}
Expand Down
10 changes: 8 additions & 2 deletions scylla-rust-wrapper/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,18 @@ impl CassSessionInner {
"Already connecting, closing, or connected".msg(),
));
}

let mut session_builder = session_builder_fut.await;
let default_profile = session_builder
.config
.default_execution_profile_handle
.to_profile();

let mut exec_profile_map = HashMap::with_capacity(exec_profile_builder_map.len());
for (name, builder) in exec_profile_builder_map {
exec_profile_map.insert(name, builder.build().await.into_handle());
exec_profile_map.insert(name, builder.build(&default_profile).await.into_handle());
}

let mut session_builder = session_builder_fut.await;
if let Some(keyspace) = keyspace {
session_builder = session_builder.use_keyspace(keyspace, false);
}
Expand Down