From 7c0e1d13a8fe941c26f9c056ae2a94ea5347881c Mon Sep 17 00:00:00 2001 From: Mark Logan <103447440+mystenmark@users.noreply.github.com> Date: Tue, 24 Sep 2024 16:25:32 -0700 Subject: [PATCH] Support concurrent threads locking the same transaction (#19526) (#19531) Fix crash when two threads concurrently lock the same transaction. The new test case fails reliably if the fix is not present. --- .../src/execution_cache/object_locks.rs | 62 ++++++++-------- .../unit_tests/writeback_cache_tests.rs | 73 +++++++++++++++++++ 2 files changed, 102 insertions(+), 33 deletions(-) diff --git a/crates/sui-core/src/execution_cache/object_locks.rs b/crates/sui-core/src/execution_cache/object_locks.rs index ed27ebb1907fe..002ff6a2904a3 100644 --- a/crates/sui-core/src/execution_cache/object_locks.rs +++ b/crates/sui-core/src/execution_cache/object_locks.rs @@ -9,10 +9,12 @@ use sui_types::error::{SuiError, SuiResult, UserInputError}; use sui_types::object::Object; use sui_types::storage::ObjectStore; use sui_types::transaction::VerifiedSignedTransaction; -use tracing::{debug, info, instrument, trace}; +use tracing::{debug, error, info, instrument, trace}; use super::writeback_cache::WritebackCache; +type RefCount = usize; + pub(super) struct ObjectLocks { // When acquire transaction locks, lock entries are briefly inserted into this map. The map // exists to provide atomic test-and-set operations on the locks. After all locks have been inserted @@ -23,7 +25,7 @@ pub(super) struct ObjectLocks { // those objects. Therefore we do a db read for each object we are locking. // // TODO: find a strategy to allow us to avoid db reads for each object. 
- locked_transactions: DashMap<ObjectRef, LockDetails>, + locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>, } impl ObjectLocks { @@ -38,29 +40,10 @@ impl ObjectLocks { obj_ref: &ObjectRef, epoch_store: &AuthorityPerEpochStore, ) -> SuiResult<Option<LockDetails>> { - match self.locked_transactions.entry(*obj_ref) { - DashMapEntry::Vacant(vacant) => { - let tables = epoch_store.tables()?; - let lock = tables.get_locked_transaction(obj_ref)?; - if let Some(lock_details) = lock { - vacant.insert(lock_details); - } - Ok(lock) - } - DashMapEntry::Occupied(occupied) => { - if cfg!(debug_assertions) { - if let Some(lock_details) = epoch_store - .tables() - .unwrap() - .get_locked_transaction(obj_ref) - .unwrap() - { - assert_eq!(*occupied.get(), lock_details); - } - } - Ok(Some(*occupied.get())) - } - } + // We don't consult the in-memory state here. We are only interested in state that + // has been committed to the db. This is because in memory state is reverted + // if the transaction is not successfully locked. + epoch_store.tables()?.get_locked_transaction(obj_ref) } /// Attempts to atomically test-and-set a transaction lock on an object. @@ -96,15 +79,18 @@ impl ObjectLocks { let tables = epoch_store.tables()?; if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? 
{ trace!("read lock from db: {:?}", lock_details); - vacant.insert(lock_details); + vacant.insert((1, lock_details)); lock_details } else { trace!("set lock: {:?}", new_lock); - vacant.insert(new_lock); + vacant.insert((1, new_lock)); new_lock } } - DashMapEntry::Occupied(occupied) => *occupied.get(), + DashMapEntry::Occupied(mut occupied) => { + occupied.get_mut().0 += 1; + occupied.get().1 + } }; if prev_lock != new_lock { @@ -156,14 +142,24 @@ impl ObjectLocks { fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) { for (obj_ref, lock) in locks { let entry = self.locked_transactions.entry(*obj_ref); - let occupied = match entry { - DashMapEntry::Vacant(_) => panic!("lock must exist"), + let mut occupied = match entry { + DashMapEntry::Vacant(_) => { + if cfg!(debug_assertions) { + panic!("lock must exist"); + } else { + error!(?obj_ref, "lock should exist"); + } + continue; + } DashMapEntry::Occupied(occupied) => occupied, }; - if occupied.get() == lock { - trace!("clearing lock: {:?}", lock); - occupied.remove(); + if occupied.get().1 == *lock { + occupied.get_mut().0 -= 1; + if occupied.get().0 == 0 { + trace!("clearing lock: {:?}", lock); + occupied.remove(); + } } else { // this is impossible because the only case in which we overwrite a // lock is when the lock is from a previous epoch. 
but we are holding diff --git a/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs b/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs index 4dd4894c244d8..7321a5a18af06 100644 --- a/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs +++ b/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs @@ -1145,6 +1145,79 @@ async fn test_concurrent_lockers() { } } +#[tokio::test(flavor = "multi_thread", worker_threads = 8)] +async fn test_concurrent_lockers_same_tx() { + telemetry_subscribers::init_for_testing(); + + let mut s = Scenario::new(None, Arc::new(AtomicU32::new(0))).await; + let cache = s.cache.clone(); + let mut txns = Vec::new(); + + for i in 0..1000 { + let a = i * 4; + let b = i * 4 + 1; + s.with_created(&[a, b]); + s.do_tx().await; + + let a_ref = s.obj_ref(a); + let b_ref = s.obj_ref(b); + + let tx1 = s.take_outputs(); + + let tx1 = s.make_signed_transaction(&tx1.transaction); + + txns.push((tx1, a_ref, b_ref)); + } + + let barrier = Arc::new(tokio::sync::Barrier::new(2)); + + let t1 = { + let txns = txns.clone(); + let cache = cache.clone(); + let barrier = barrier.clone(); + let epoch_store = s.epoch_store.clone(); + tokio::task::spawn(async move { + let mut results = Vec::new(); + for (tx1, a_ref, b_ref) in txns { + results.push( + cache + .acquire_transaction_locks(&epoch_store, &[a_ref, b_ref], tx1) + .await, + ); + barrier.wait().await; + } + results + }) + }; + + let t2 = { + let txns = txns.clone(); + let cache = cache.clone(); + let barrier = barrier.clone(); + let epoch_store = s.epoch_store.clone(); + tokio::task::spawn(async move { + let mut results = Vec::new(); + for (tx1, a_ref, b_ref) in txns { + results.push( + cache + .acquire_transaction_locks(&epoch_store, &[a_ref, b_ref], tx1) + .await, + ); + barrier.wait().await; + } + results + }) + }; + + let results1 = t1.await.unwrap(); + let results2 = t2.await.unwrap(); + + for (r1, r2) in 
results1.into_iter().zip(results2) { + assert!(r1.is_ok()); + assert!(r2.is_ok()); + } +} + #[tokio::test] async fn latest_object_cache_race_test() { let authority = TestAuthorityBuilder::new().build().await;