Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return Query Error for non existent DC #825

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions scylla-cql/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ pub enum QueryError {
#[error("Request timeout: {0}")]
RequestTimeout(String),

/// Empty Query Plan
#[error("Load balancing policy returned empty query plan. It can happen when the driver is provided with non-existing datacenter name")]
EmptyQueryPlan,

/// Address translation failed
#[error("Address translation failed: {0}")]
TranslationError(#[from] TranslationError),
Expand Down Expand Up @@ -404,6 +408,10 @@ pub enum NewSessionError {
#[error("Client timeout: {0}")]
RequestTimeout(String),

/// Empty Query Plan
#[error("Load balancing policy returned empty query plan. It can happen when the driver is provided with non-existing datacenter name")]
EmptyQueryPlan,

/// Address translation failed
#[error("Address translation failed: {0}")]
TranslationError(#[from] TranslationError),
Expand Down Expand Up @@ -482,6 +490,7 @@ impl From<QueryError> for NewSessionError {
QueryError::UnableToAllocStreamId => NewSessionError::UnableToAllocStreamId,
QueryError::RequestTimeout(msg) => NewSessionError::RequestTimeout(msg),
QueryError::TranslationError(e) => NewSessionError::TranslationError(e),
QueryError::EmptyQueryPlan => NewSessionError::EmptyQueryPlan,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2414,6 +2414,7 @@ mod latency_awareness {
| QueryError::IoError(_)
| QueryError::ProtocolError(_)
| QueryError::TimeoutError
| QueryError::EmptyQueryPlan
| QueryError::RequestTimeout(_) => true,
}
}
Expand Down
7 changes: 7 additions & 0 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1666,7 +1666,10 @@ impl Session {
.consistency_set_on_statement
.unwrap_or(execution_profile.consistency);

let mut query_plan_is_empty = true;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wanted to do a peekable check to ensure the iterator is not empty like:

query_plan.peekable()

but it caused this lifetime error:

error: higher-ranked lifetime error
   --> scylla/src/transport/session_test.rs:409:5
    |
409 | /     tokio::spawn(async move {
410 | |         let values = (
411 | |             (1_i32, 2_i32, "abc"),
412 | |             (),
...   |
415 | |         session_clone.batch(&batch, values).await.unwrap();
416 | |     })
    | |______^
    |
    = note: could not prove `[async block@scylla/src/transport/session_test.rs:409:18: 416:6]: std::marker::Send`
    ```
    So, I just defaulted to doing this more simple check


'nodes_in_plan: for node in query_plan {
query_plan_is_empty = false;
let span = trace_span!("Executing query", node = %node.address);
'same_node_retries: loop {
trace!(parent: &span, "Execution started");
Expand Down Expand Up @@ -1769,6 +1772,10 @@ impl Session {
}
}

if query_plan_is_empty {
return Some(Err(QueryError::EmptyQueryPlan));
}

last_error.map(Result::Err)
}

Expand Down
28 changes: 28 additions & 0 deletions scylla/src/transport/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate as scylla;
use crate::batch::{Batch, BatchStatement};
use crate::frame::response::result::Row;
use crate::frame::value::ValueList;
use crate::load_balancing::DefaultPolicy;
use crate::prepared_statement::PreparedStatement;
use crate::query::Query;
use crate::retry_policy::{QueryInfo, RetryDecision, RetryPolicy, RetrySession};
Expand Down Expand Up @@ -2857,3 +2858,30 @@ async fn test_manual_primary_key_computation() {
.await;
}
}

#[tokio::test]
async fn test_non_existent_dc_return_correct_error() {
let ks = unique_keyspace_name();

let dc = "non existent dc";
let default_policy = DefaultPolicy::builder()
.prefer_datacenter(dc.to_string())
.build();

let profile = ExecutionProfile::builder()
.load_balancing_policy(default_policy)
.build();

let handle = profile.into_handle();

let session: Session = create_new_session_builder()
.default_execution_profile_handle(handle)
.build()
.await
.expect("cannot create session");

let ks_stmt = format!("CREATE KEYSPACE IF NOT EXISTS {} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks);
let query_result = session.query(ks_stmt, &[]).await;

assert_matches!(query_result.unwrap_err(), QueryError::EmptyQueryPlan)
}