Skip to content

Commit

Permalink
Add callback for fail-rs (#50)
Browse files Browse the repository at this point in the history
I want to add a new kind of action for fail-rs, which can help TiKV be able to
stop at some statement and call one callback function that set by tests. This
feature will help use more convenient to deal with some tests which need to
wait for a while, because we can pass a callback to fail_point to notify
test-thread to avoid waiting.

Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace authored Mar 27, 2020
1 parent e75d98a commit 2cf1175
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 0 deletions.
60 changes: 60 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,39 @@

use std::collections::HashMap;
use std::env::VarError;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, TryLockError};
use std::time::{Duration, Instant};
use std::{env, thread};

#[derive(Clone)]
struct SyncCallback(Arc<dyn Fn() + Send + Sync>);

impl Debug for SyncCallback {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("SyncCallback()")
}
}

impl PartialEq for SyncCallback {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.0, &other.0)
}
}

impl SyncCallback {
fn new(f: impl Fn() + Send + Sync + 'static) -> SyncCallback {
SyncCallback(Arc::new(f))
}

fn run(&self) {
let callback = &self.0;
callback();
}
}

/// Supported tasks.
#[derive(Clone, Debug, PartialEq)]
enum Task {
Expand All @@ -251,6 +278,8 @@ enum Task {
Yield,
/// Busy waiting for some milliseconds.
Delay(u64),
/// Call callback function.
Callback(SyncCallback),
}

#[derive(Debug)]
Expand Down Expand Up @@ -285,6 +314,15 @@ impl Action {
}
}

fn from_callback(f: impl Fn() + Send + Sync + 'static) -> Action {
let task = Task::Callback(SyncCallback::new(f));
Action {
task,
freq: 1.0,
count: None,
}
}

fn get_task(&self) -> Option<Task> {
use rand::Rng;

Expand Down Expand Up @@ -459,6 +497,9 @@ impl FailPoint {
let timeout = Duration::from_millis(t);
while timer.elapsed() < timeout {}
}
Task::Callback(f) => {
f.run();
}
}
None
}
Expand Down Expand Up @@ -628,6 +669,25 @@ pub fn cfg<S: Into<String>>(name: S, actions: &str) -> Result<(), String> {
set(&mut registry, name.into(), actions)
}

/// Configure the actions for a fail point at runtime.
///
/// Each fail point can be configured by a callback. Process will call this callback function
/// when it meet this fail-point.
pub fn cfg_callback<S, F>(name: S, f: F) -> Result<(), String>
where
S: Into<String>,
F: Fn() + Send + Sync + 'static,
{
let mut registry = REGISTRY.registry.write().unwrap();
let p = registry
.entry(name.into())
.or_insert_with(|| Arc::new(FailPoint::new()));
let action = Action::from_callback(f);
let actions = vec![action];
p.set_actions("callback", actions);
Ok(())
}

/// Remove a fail point.
///
/// If the fail point doesn't exist, nothing will happen.
Expand Down
22 changes: 22 additions & 0 deletions tests/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::*;
use std::time::*;
use std::*;
Expand Down Expand Up @@ -135,6 +136,27 @@ fn test_yield() {
f();
}

#[test]
#[cfg_attr(not(feature = "failpoints"), ignore)]
fn test_callback() {
let f1 = || {
fail_point!("cb");
};
let f2 = || {
fail_point!("cb");
};

let counter = Arc::new(AtomicUsize::new(0));
let counter2 = counter.clone();
fail::cfg_callback("cb", move || {
counter2.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
f1();
f2();
assert_eq!(2, counter.load(Ordering::SeqCst));
}

#[test]
#[cfg_attr(not(feature = "failpoints"), ignore)]
fn test_delay() {
Expand Down

0 comments on commit 2cf1175

Please sign in to comment.