Skip to content

Commit

Permalink
future: cass_future_wait_timed
Browse files Browse the repository at this point in the history
  • Loading branch information
muzarski committed Jul 31, 2024
1 parent 3989ab2 commit 203587d
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 9 deletions.
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,9 @@ The driver inherits almost all the features of C/C++ and Rust drivers, such as:
<tr>
<td colspan=2 align="center" style="font-weight:bold">Future</td>
</tr>
<tr>
<td>cass_future_wait_timed</td>
<td rowspan="3">Unimplemented</td>
</tr>
<tr>
<td>cass_future_coordinator</td>
<td>Unimplemented</td>
</tr>
<tr>
<td colspan=2 align="center" style="font-weight:bold">Collection</td>
Expand Down
82 changes: 82 additions & 0 deletions scylla-rust-wrapper/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::mem;
use std::os::raw::c_void;
use std::sync::{Arc, Condvar, Mutex};
use tokio::task::JoinHandle;
use tokio::time::Duration;

pub enum CassResultValue {
Empty,
Expand Down Expand Up @@ -124,6 +125,43 @@ impl CassFuture {
f(&mut guard)
}

pub(self) fn with_waited_result_timed<T>(
&self,
f: impl FnOnce(&mut CassFutureResult) -> T,
timeout_duration: Duration,
) -> Result<T, ()> {
self.with_waited_state_timed(|s| f(s.value.as_mut().unwrap()), timeout_duration)
}

pub(self) fn with_waited_state_timed<T>(
&self,
f: impl FnOnce(&mut CassFutureState) -> T,
timeout_duration: Duration,
) -> Result<T, ()> {
let mut guard = self.state.lock().unwrap();
let handle = guard.join_handle.take();
if let Some(handle) = handle {
mem::drop(guard);
// Need to wrap it with async{} block, so the timeout is lazily executed inside the runtime.
// See mention about panics: https://docs.rs/tokio/latest/tokio/time/fn.timeout.html.
let timed = async { tokio::time::timeout(timeout_duration, handle).await };
RUNTIME.block_on(timed).map_err(|_| ())?.unwrap();
guard = self.state.lock().unwrap();
} else {
let wait_result = self
.wait_for_value
.wait_timeout_while(guard, timeout_duration, |state| state.value.is_none())
.unwrap();
let timeout_result = wait_result.1;
if timeout_result.timed_out() {
return Err(());
}
guard = wait_result.0;
}

Ok(f(&mut guard))
}

pub fn set_callback(&self, cb: CassFutureCallback, data: *mut c_void) -> CassError {
let mut lock = self.state.lock().unwrap();
if lock.callback.is_some() {
Expand Down Expand Up @@ -167,6 +205,16 @@ pub unsafe extern "C" fn cass_future_wait(future_raw: *const CassFuture) {
ptr_to_ref(future_raw).with_waited_result(|_| ());
}

#[no_mangle]
pub unsafe extern "C" fn cass_future_wait_timed(
future_raw: *const CassFuture,
timeout_us: cass_duration_t,
) -> cass_bool_t {
ptr_to_ref(future_raw)
.with_waited_result_timed(|_| (), Duration::from_micros(timeout_us))
.is_ok() as cass_bool_t
}

#[no_mangle]
pub unsafe extern "C" fn cass_future_ready(future_raw: *const CassFuture) -> cass_bool_t {
let state_guard = ptr_to_ref(future_raw).state.lock().unwrap();
Expand Down Expand Up @@ -308,4 +356,38 @@ mod tests {
cass_future_free(cass_fut);
}
}

// This test makes sure that the future resolves even if timeout happens.
//
// Tokio's runtime documentation states that Runtime::spawn starts
// running a future immediately, even if the future was not await'ed before.
#[test]
#[ntest::timeout(200)]
fn cass_future_resolves_after_timeout() {
const ERROR_MSG: &str = "NOBODY EXPECTED SPANISH INQUISITION";
const HUNDRED_MILLIS_IN_MICROS: u64 = 100 * 1000;
let fut = async move {
tokio::time::sleep(Duration::from_micros(HUNDRED_MILLIS_IN_MICROS)).await;
Err((CassError::CASS_OK, ERROR_MSG.into()))
};
let cass_fut = CassFuture::make_raw(fut);

unsafe {
// This should timeout on tokio::time::timeout.
let timed_result = cass_future_wait_timed(cass_fut, HUNDRED_MILLIS_IN_MICROS / 5);
assert_eq!(0, timed_result);

// This should timeout on cond variable (previous call consumed JoinHandle).
let timed_result = cass_future_wait_timed(cass_fut, HUNDRED_MILLIS_IN_MICROS / 5);
assert_eq!(0, timed_result);

// Verify that future eventually resolves, even though timeouts occurred before.
let mut message: *const c_char = std::ptr::null();
let mut msg_len: size_t = 0;
cass_future_error_message(cass_fut, &mut message, &mut msg_len);
assert_eq!(ptr_to_cstr_n(message, msg_len), Some(ERROR_MSG));

cass_future_free(cass_fut);
}
}
}
5 changes: 0 additions & 5 deletions src/testing_unimplemented.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,6 @@ CASS_EXPORT const CassNode*
cass_future_coordinator(CassFuture* future){
throw std::runtime_error("UNIMPLEMENTED cass_future_coordinator\n");
}
CASS_EXPORT cass_bool_t
cass_future_wait_timed(CassFuture* future,
cass_duration_t timeout_us){
throw std::runtime_error("UNIMPLEMENTED cass_future_wait_timed\n");
}
CASS_EXPORT const CassValue*
cass_index_meta_field_by_name(const CassIndexMeta* index_meta,
const char* name){
Expand Down

0 comments on commit 203587d

Please sign in to comment.