From 4a89fec2542752f876a8e422aecdc9fe2e4a423f Mon Sep 17 00:00:00 2001 From: Dan Kaplun Date: Sat, 5 May 2018 22:10:17 -0400 Subject: [PATCH 1/4] Add compare_and_swap, compare_exchange, compare_exchange_weak --- src/lib.rs | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++ tests/atom.rs | 67 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 3197876..ea74ff6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -126,6 +126,14 @@ where self.inner.load(order).is_null() } + #[inline] + fn inner_into_raw(val: Option

) -> *mut () { + match val { + Some(val) => val.into_raw(), + None => ptr::null_mut(), + } + } + #[inline] unsafe fn inner_from_raw(ptr: *mut ()) -> Option

{ if !ptr.is_null() { @@ -136,6 +144,78 @@ where } } +impl Atom

+where + P: IntoRawPtr + FromRawPtr + Deref, +{ + /// Stores a value into the pointer if the current value is the same as the current value. + /// + /// The return value is always the previous value. If it is equal to current, then the value was updated. + /// + /// compare_and_swap also takes an Ordering argument which describes the memory ordering of this operation. + pub fn compare_and_swap( + &self, + current: Option<&P>, + new: Option

, + order: Ordering, + ) -> Result, (Option

, *mut P)> { + let pcurrent = Self::inner_as_ptr(current); + let pnew = Self::inner_into_raw(new); + let pprev = self.inner.compare_and_swap(pcurrent, pnew, order); + if pprev == pcurrent { + Ok(unsafe { Self::inner_from_raw(pprev) }) + } else { + Err((unsafe { Self::inner_from_raw(pnew) }, pprev as *mut P)) + } + } + + /// Stores a value into the pointer if the current value is the same as the `current` value. + /// + /// The return value is a result indicating whether the new value was written and containing the previous value. On success this value is guaranteed to be equal to `current`. + /// + /// `compare_exchange` takes two `Ordering` arguments to describe the memory ordering of this operation. The first describes the required ordering if the operation succeeds while the second describes the required ordering when the operation fails. The failure ordering can't be `Release` or `AcqRel` and must be equivalent or weaker than the success ordering. + pub fn compare_exchange( + &self, + current: Option<&P>, + new: Option

, + success: Ordering, + failure: Ordering, + ) -> Result, (Option

, *mut P)> { + let pnew = Self::inner_into_raw(new); + self.inner + .compare_exchange(Self::inner_as_ptr(current), pnew, success, failure) + .map(|pprev| unsafe { Self::inner_from_raw(pprev) }) + .map_err(|pprev| (unsafe { Self::inner_from_raw(pnew) }, pprev as *mut P)) + } + + /// Stores a value into the pointer if the current value is the same as the `current` value. + /// + /// Unlike `compare_exchange`, this function is allowed to spuriously fail even when the comparison succeeds, which can result in more efficient code on some platforms. The return value is a result indicating whether the new value was written and containing the previous value. + /// + /// `compare_exchange_weak` takes two `Ordering` arguments to describe the memory ordering of this operation. The first describes the required ordering if the operation succeeds while the second describes the required ordering when the operation fails. The failure ordering can't be `Release` or `AcqRel` and must be equivalent or weaker than the success ordering. + pub fn compare_exchange_weak( + &self, + current: Option<&P>, + new: Option

, + success: Ordering, + failure: Ordering, + ) -> Result, (Option

, *mut P)> { + let pnew = Self::inner_into_raw(new); + self.inner + .compare_exchange_weak(Self::inner_as_ptr(current), pnew, success, failure) + .map(|pprev| unsafe { Self::inner_from_raw(pprev) }) + .map_err(|pprev| (unsafe { Self::inner_from_raw(pnew) }, pprev as *mut P)) + } + + #[inline] + fn inner_as_ptr(val: Option<&P>) -> *mut () { + match val { + Some(val) => &**val as *const _ as *mut (), + None => ptr::null_mut(), + } + } +} + impl

Drop for Atom

where P: IntoRawPtr + FromRawPtr, diff --git a/tests/atom.rs b/tests/atom.rs index 48dfaef..957407f 100644 --- a/tests/atom.rs +++ b/tests/atom.rs @@ -15,6 +15,7 @@ extern crate atom; use atom::*; +use std::collections::HashSet; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::*; @@ -45,6 +46,72 @@ fn set_if_none() { ); } +#[test] +fn compare_and_swap() { + cas_test_helper(|a, cas_val, next_val| a.compare_and_swap(cas_val, next_val, Ordering::SeqCst)); +} + +#[test] +fn compare_exchange() { + cas_test_helper(|a, cas_val, next_val| { + a.compare_exchange(cas_val, next_val, Ordering::SeqCst, Ordering::SeqCst) + }); +} + +#[test] +fn compare_exchange_weak() { + cas_test_helper(|a, cas_val, next_val| { + a.compare_exchange_weak(cas_val, next_val, Ordering::SeqCst, Ordering::SeqCst) + }); +} + +fn cas_test_helper( + cas: fn(&Atom>, Option<&Arc>, Option>) + -> Result>, (Option>, *mut Arc)>, +) { + let cur_val = Arc::new("current".to_owned()); + let next_val = Arc::new("next".to_owned()); + let other_val = Arc::new("other".to_owned()); + + let a = Arc::new(Atom::new(cur_val.clone())); + + let num_threads = 10; + let cas_thread = num_threads / 2; + let pprevs: Vec> = (0..num_threads) + .map(|i| { + let a = a.clone(); + let cur_val = cur_val.clone(); + let next_val = next_val.clone(); + let other_val = other_val.clone(); + thread::spawn(move || { + let cas_val = Some(if i == cas_thread { + &cur_val + } else { + &other_val + }); + match cas(&a, cas_val, Some(next_val.clone())) { + Ok(prev) => { + let prev = prev.unwrap(); + assert!(Arc::ptr_eq(&prev, &cur_val)); + assert!(!Arc::ptr_eq(&prev, &next_val)); + Ok(prev.into_raw() as usize) + } + Err((_, pprev)) => Err(pprev as usize), + } + }) + }) + .map(|handle| handle.join().unwrap()) + .collect(); + assert_eq!(pprevs.iter().filter(|pprev| pprev.is_ok()).count(), 1); + let uniq_pprevs: HashSet<_> = pprevs + .into_iter() + .map(|pprev| pprev.unwrap_or_else(|pprev| pprev) as *mut _) + .collect(); + assert!(uniq_pprevs.contains(&cur_val.into_raw())); + assert!(!uniq_pprevs.contains(&other_val.into_raw())); + assert_eq!(a.take(Ordering::Relaxed), Some(next_val)); +} + #[derive(Clone)] struct Canary(Arc); From f84ac2b628cf1539dd9aa8699b6c0393dc168c42 Mon Sep 17 00:00:00 2001 From: Dan Kaplun Date: Mon, 14 May 2018 07:30:57 -0400 Subject: [PATCH 2/4] Add line breaks to docs --- src/lib.rs | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ea74ff6..02d85b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -148,11 +148,14 @@ impl Atom

where P: IntoRawPtr + FromRawPtr + Deref, { - /// Stores a value into the pointer if the current value is the same as the current value. + /// Stores a value into the pointer if the current value is the same as the + /// current value. /// - /// The return value is always the previous value. If it is equal to current, then the value was updated. + /// The return value is always the previous value. If it is equal to + /// current, then the value was updated. /// - /// compare_and_swap also takes an Ordering argument which describes the memory ordering of this operation. + /// compare_and_swap also takes an Ordering argument which describes the + /// memory ordering of this operation. pub fn compare_and_swap( &self, current: Option<&P>, @@ -169,11 +172,19 @@ where } } - /// Stores a value into the pointer if the current value is the same as the `current` value. + /// Stores a value into the pointer if the current value is the same as the + /// `current` value. /// - /// The return value is a result indicating whether the new value was written and containing the previous value. On success this value is guaranteed to be equal to `current`. + /// The return value is a result indicating whether the new value was + /// written and containing the previous value. On success this value is + /// guaranteed to be equal to `current`. /// - /// `compare_exchange` takes two `Ordering` arguments to describe the memory ordering of this operation. The first describes the required ordering if the operation succeeds while the second describes the required ordering when the operation fails. The failure ordering can't be `Release` or `AcqRel` and must be equivalent or weaker than the success ordering. + /// `compare_exchange` takes two `Ordering` arguments to describe the + /// memory ordering of this operation. The first describes the required + /// ordering if the operation succeeds while the second describes the + /// required ordering when the operation fails. The failure ordering can't + /// be `Release` or `AcqRel` and must be equivalent or weaker than the + /// success ordering. pub fn compare_exchange( &self, current: Option<&P>, @@ -188,11 +199,20 @@ where .map_err(|pprev| (unsafe { Self::inner_from_raw(pnew) }, pprev as *mut P)) } - /// Stores a value into the pointer if the current value is the same as the `current` value. + /// Stores a value into the pointer if the current value is the same as the + /// `current` value. /// - /// Unlike `compare_exchange`, this function is allowed to spuriously fail even when the comparison succeeds, which can result in more efficient code on some platforms. The return value is a result indicating whether the new value was written and containing the previous value. + /// Unlike `compare_exchange`, this function is allowed to spuriously fail + /// even when the comparison succeeds, which can result in more efficient + /// code on some platforms. The return value is a result indicating whether + /// the new value was written and containing the previous value. /// - /// `compare_exchange_weak` takes two `Ordering` arguments to describe the memory ordering of this operation. The first describes the required ordering if the operation succeeds while the second describes the required ordering when the operation fails. The failure ordering can't be `Release` or `AcqRel` and must be equivalent or weaker than the success ordering. + /// `compare_exchange_weak` takes two `Ordering` arguments to describe the + /// memory ordering of this operation. The first describes the required + /// ordering if the operation succeeds while the second describes the + /// required ordering when the operation fails. The failure ordering can't + /// be `Release` or `AcqRel` and must be equivalent or weaker than the + /// success ordering. pub fn compare_exchange_weak( &self, current: Option<&P>, From 75690755b4fe18c6902fbebc92d9c5b83ad14555 Mon Sep 17 00:00:00 2001 From: Dan Kaplun Date: Tue, 15 May 2018 21:03:13 -0400 Subject: [PATCH 3/4] Improve compare_and_swap documentation --- src/lib.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 02d85b7..83219e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -148,14 +148,16 @@ impl Atom

where P: IntoRawPtr + FromRawPtr + Deref, { - /// Stores a value into the pointer if the current value is the same as the - /// current value. + /// Stores `new` in the Atom if `current` has the same raw pointer + /// representation as the currently stored value. /// - /// The return value is always the previous value. If it is equal to - /// current, then the value was updated. + /// On success, the Atom's previous value is returned. On failure, `new` is + /// returned together with a raw pointer to the Atom's current unchanged + /// value, which is **not safe to dereference**, especially if the Atom is + /// accessed from multiple threads. /// - /// compare_and_swap also takes an Ordering argument which describes the - /// memory ordering of this operation. + /// `compare_and_swap` also takes an `Ordering` argument which describes + /// the memory ordering of this operation. pub fn compare_and_swap( &self, current: Option<&P>, From 01b29f66c560332a4083915f3cb80d2a695da87f Mon Sep 17 00:00:00 2001 From: Dan Kaplun Date: Tue, 15 May 2018 21:03:50 -0400 Subject: [PATCH 4/4] Add some deterministic tests for CAS ops --- tests/atom.rs | 68 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 58 insertions(+), 10 deletions(-) diff --git a/tests/atom.rs b/tests/atom.rs index 957407f..6ddeb39 100644 --- a/tests/atom.rs +++ b/tests/atom.rs @@ -47,28 +47,76 @@ fn set_if_none() { } #[test] -fn compare_and_swap() { - cas_test_helper(|a, cas_val, next_val| a.compare_and_swap(cas_val, next_val, Ordering::SeqCst)); +fn compare_and_swap_basics() { + cas_test_basics_helper(|a, cas_val, next_val| { + a.compare_and_swap(cas_val, next_val, Ordering::SeqCst) + }); } #[test] -fn compare_exchange() { - cas_test_helper(|a, cas_val, next_val| { +fn compare_exchange_basics() { + cas_test_basics_helper(|a, cas_val, next_val| { a.compare_exchange(cas_val, next_val, Ordering::SeqCst, Ordering::SeqCst) }); } #[test] -fn compare_exchange_weak() { - cas_test_helper(|a, cas_val, next_val| { +fn compare_exchange_weak_basics() { + cas_test_basics_helper(|a, cas_val, next_val| { a.compare_exchange_weak(cas_val, next_val, Ordering::SeqCst, Ordering::SeqCst) }); } -fn cas_test_helper( - cas: fn(&Atom>, Option<&Arc>, Option>) - -> Result>, (Option>, *mut Arc)>, -) { +#[test] +fn compare_and_swap_threads() { + cas_test_threads_helper(|a, cas_val, next_val| { + a.compare_and_swap(cas_val, next_val, Ordering::SeqCst) + }); +} + +#[test] +fn compare_exchange_threads() { + cas_test_threads_helper(|a, cas_val, next_val| { + a.compare_exchange(cas_val, next_val, Ordering::SeqCst, Ordering::SeqCst) + }); +} + +#[test] +fn compare_exchange_weak_threads() { + cas_test_threads_helper(|a, cas_val, next_val| { + a.compare_exchange_weak(cas_val, next_val, Ordering::SeqCst, Ordering::SeqCst) + }); +} + +type TestCASFn = fn(&Atom>, Option<&Arc>, Option>) + -> Result>, (Option>, *mut Arc)>; + +fn cas_test_basics_helper(cas: TestCASFn) { + let cur_val = Arc::new("123".to_owned()); + let mut next_val = Arc::new("456".to_owned()); + let other_val = Arc::new("1927447".to_owned()); + + let a = Atom::new(cur_val.clone()); + + let pcur = IntoRawPtr::into_raw(cur_val.clone()); + let pnext = IntoRawPtr::into_raw(next_val.clone()); + + for attempt in vec![None, Some(&other_val), Some(&Arc::new("wow".to_owned()))] { + let res = cas(&a, attempt, Some(next_val.clone())).unwrap_err(); + next_val = res.0.unwrap(); + assert_eq!(res.1, pcur as *mut _); + } + + let res = cas(&a, Some(&cur_val), Some(next_val.clone())); + assert_eq!(res, Ok(Some(cur_val))); + + for attempt in vec![None, Some(&other_val), Some(&Arc::new("wow".to_owned()))] { + let res = cas(&a, attempt, None).unwrap_err(); + assert_eq!(res, (None, pnext as *mut _)); + } +} + +fn cas_test_threads_helper(cas: TestCASFn) { let cur_val = Arc::new("current".to_owned()); let next_val = Arc::new("next".to_owned()); let other_val = Arc::new("other".to_owned());