Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shard aware batching - add Session::shard_for_statement & Batch::enforce_target_node #738

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 59 additions & 16 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::frame::types::LegacyConsistency;
use crate::history;
use crate::history::HistoryListener;
use crate::retry_policy::RetryPolicy;
use crate::routing;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -898,15 +899,7 @@ impl Session {
.as_ref()
.map(|pk| prepared.get_partitioner_name().hash(pk));

let statement_info = RoutingInfo {
consistency: prepared
.get_consistency()
.unwrap_or(self.default_execution_profile_handle.access().consistency),
serial_consistency: prepared.get_serial_consistency(),
token,
keyspace: prepared.get_keyspace_name(),
is_confirmed_lwt: prepared.is_confirmed_lwt(),
};
let statement_info = self.routing_info(prepared, token);

let span =
RequestSpan::new_prepared(partition_key.as_ref(), token, serialized_values.size());
Expand Down Expand Up @@ -1814,13 +1807,63 @@ impl Session {
prepared: &PreparedStatement,
serialized_values: &SerializedValues,
) -> Result<Option<Token>, QueryError> {
match self.calculate_partition_key(prepared, serialized_values) {
Ok(Some(partition_key)) => {
let partitioner_name = prepared.get_partitioner_name();
Ok(Some(partitioner_name.hash(&partition_key)))
}
Ok(None) => Ok(None),
Err(err) => Err(err),
Ok(self
.calculate_partition_key(prepared, serialized_values)?
.map(|partition_key| prepared.get_partitioner_name().hash(&partition_key)))
Ten0 marked this conversation as resolved.
Show resolved Hide resolved
}

/// Get the first node/shard that the load balancer would target if running this query
///
Ten0 marked this conversation as resolved.
Show resolved Hide resolved
/// This may help constituting shard-aware batches
pub fn first_shard_for_statement(
&self,
prepared: &PreparedStatement,
serialized_values: &SerializedValues,
) -> Result<Option<(Arc<Node>, Option<routing::Shard>)>, QueryError> {
wprzytula marked this conversation as resolved.
Show resolved Hide resolved
let token = match self.calculate_token(prepared, serialized_values)? {
Some(token) => token,
None => return Ok(None),
};
let routing_info = self.routing_info(prepared, Some(token));
let cluster_data = self.cluster.get_data();
let execution_profile = prepared
.config
.execution_profile_handle
.as_ref()
.unwrap_or_else(|| self.get_default_execution_profile_handle())
.access();
let mut query_plan = load_balancing::Plan::new(
&*execution_profile.load_balancing_policy,
&routing_info,
&cluster_data,
);
// We can't return the full iterator here because the iterator borrows from local variables.
// In order to achieve that, two designs would be possible:
// - Construct a self-referential struct and implement iterator on it via e.g. Ouroboros
// - Take a closure as a parameter that will take the local iterator and return anything, and
// this function would return directly what the closure returns
// Most likely though, people would use this for some kind of shard-awareness optimization for batching,
// and are consequently not interested in subsequent nodes.
// Until then, let's just expose this, as it is simpler
Ok(query_plan.next().map(move |node| {
let token = node.sharder().map(|sharder| sharder.shard_of(token));
(node.clone(), token)
}))
}

fn routing_info<'p>(
&self,
prepared: &'p PreparedStatement,
token: Option<Token>,
) -> RoutingInfo<'p> {
RoutingInfo {
consistency: prepared
.get_consistency()
.unwrap_or(self.default_execution_profile_handle.access().consistency),
serial_consistency: prepared.get_serial_consistency(),
token,
keyspace: prepared.get_keyspace_name(),
is_confirmed_lwt: prepared.is_confirmed_lwt(),
}
Ten0 marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down