From 425b14cf238fc00315f1db704bb66c756f9df752 Mon Sep 17 00:00:00 2001 From: Yi Shan Date: Mon, 22 Jun 2026 22:41:01 -0700 Subject: [PATCH 1/2] Use atomic_wait when threads release their slots to replace polling --- src/snapshot.rs | 113 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 91 insertions(+), 22 deletions(-) diff --git a/src/snapshot.rs b/src/snapshot.rs index 1225302..d8d8326 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -31,13 +31,15 @@ use crate::{ nodes::{leaf_node::MiniPageNextLevel, LeafNode, INVALID_DISK_OFFSET}, nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE}, storage::{make_vfs, LeafStorage, PageLocation, PageTable}, - sync::atomic::{AtomicBool, AtomicU64, Ordering}, - sync::thread, + sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, sync::RwLock, - utils::{get_rng, inner_lock::ReadGuard, BfsVisitor, NodeInfo}, + utils::{atomic_wait, get_rng, inner_lock::ReadGuard, BfsVisitor, NodeInfo}, wal::{LogEntry, LogEntryImpl, WriteAheadLog}, BfTree, Config, StorageBackend, WalConfig, WalReader, }; +// Used for macOS fallback sleep and tests +#[allow(unused_imports)] +use crate::sync::thread; const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN"; const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END"; @@ -117,6 +119,9 @@ pub struct CPRSnapShotMgr { vfs: RwLock>, // Ensuring only one snapshot is in progress at a time. snapshot_in_progress: AtomicBool, + // Counter used for atomic wait/wake signaling. Incremented when threads release slots. + // The snapshot thread waits on this value; worker threads wake it after releasing slots. 
+ phase_waiter: AtomicU32, } unsafe impl Sync for CPRSnapShotMgr {} @@ -239,6 +244,7 @@ impl CPRSnapShotMgr { pause_snapshot: AtomicBool::new(false), vfs: RwLock::new(vfs), snapshot_in_progress: AtomicBool::new(false), + phase_waiter: AtomicU32::new(0), } } @@ -303,32 +309,29 @@ impl CPRSnapShotMgr { let phase_id = self.get_global_phase_id(); let version = self.get_global_version(); - match phase_id { + let new_state = match phase_id { PhaseId::Rest => { // (REST, v) -> (PREPARE, v) - let new_state = Self::new_snapshot_state(PhaseId::Prepare.as_raw(), version); - self.global_state.store(new_state, Ordering::Release); - new_state + Self::new_snapshot_state(PhaseId::Prepare.as_raw(), version) } PhaseId::Prepare => { // (PREPARE, v) -> (IN_PROGRESS, v + 1) - let new_state = Self::new_snapshot_state(PhaseId::InProgress.as_raw(), version + 1); - self.global_state.store(new_state, Ordering::Release); - new_state + Self::new_snapshot_state(PhaseId::InProgress.as_raw(), version + 1) } PhaseId::InProgress => { // (IN_PROGRESS, v + 1) -> (SWEEPING, v + 1) - let new_state = Self::new_snapshot_state(PhaseId::Sweep.as_raw(), version); - self.global_state.store(new_state, Ordering::Release); - new_state + Self::new_snapshot_state(PhaseId::Sweep.as_raw(), version) } PhaseId::Sweep => { // (SWEEPING, v + 1) -> (REST, v + 1) - let new_state = Self::new_snapshot_state(PhaseId::Rest.as_raw(), version); - self.global_state.store(new_state, Ordering::Release); - new_state + Self::new_snapshot_state(PhaseId::Rest.as_raw(), version) } - } + }; + + // Advance the global state + self.global_state.store(new_state, Ordering::Release); + + new_state } /// If all thread local states are either invalid or equal to the target state, return true. @@ -344,6 +347,58 @@ impl CPRSnapShotMgr { true } + /// Wait for all threads in the old phase to complete their transition. + /// Uses atomic wait instead of polling with fixed sleep. 
+ fn wait_for_phase_completion(&self, target_state: u64) { + loop { + // Check if all threads have transitioned + if self.check_if_phase_completed(target_state) { + return; + } + + // Capture current waiter value before waiting + let waiter_val = self.phase_waiter.load(Ordering::Acquire); + + // Double-check after loading waiter (avoid missed wake) + if self.check_if_phase_completed(target_state) { + return; + } + + // Wait for notification (futex on Linux, WaitOnAddress on Windows) + atomic_wait::wait(&self.phase_waiter, waiter_val); + + // Fallback short sleep for platforms where atomic_wait is a no-op (e.g., macOS) + #[cfg(target_os = "macos")] + thread::sleep(std::time::Duration::from_millis(1)); + } + } + + /// Wait for all threads to release their slots (all local states become INVALID). + /// Used during sweep pause when we need to drain all active threads. + fn wait_for_all_slots_released(&self) { + loop { + // Check if all slots are released + if self.check_if_phase_completed(INVALID_SNAPSHOT_STATE) { + return; + } + + // Capture current waiter value before waiting + let waiter_val = self.phase_waiter.load(Ordering::Acquire); + + // Double-check after loading waiter + if self.check_if_phase_completed(INVALID_SNAPSHOT_STATE) { + return; + } + + // Wait for any thread to release its slot + atomic_wait::wait(&self.phase_waiter, waiter_val); + + // Fallback short sleep for platforms where atomic_wait is a no-op + #[cfg(target_os = "macos")] + thread::sleep(std::time::Duration::from_millis(1)); + } + } + /// Obtain an unique thread slot id for the caller thread. /// Guarantee that any local state assigned after the global state advances to the next one, /// will either be reversed without further action or in the new state. @@ -374,13 +429,19 @@ impl CPRSnapShotMgr { // Mrgr: all threads in phase 'x + 1' or invalid -> Execute 'x + 1' action // T1: local state = state <- Inconsistency with global state // Similar case for the pause_snapshot flag. 
- if self.get_local_state(&tid) != self.global_state.load(Ordering::Acquire) + let current_global = self.global_state.load(Ordering::Acquire); + if self.get_local_state(&tid) != current_global || self.pause_snapshot.load(Ordering::Acquire) { self.set_local_state(&tid, INVALID_SNAPSHOT_STATE); assert!(self.thread_slots[tid] .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed) .is_ok()); + + // Wake the snapshot thread since we released a slot + self.phase_waiter.fetch_add(1, Ordering::Release); + atomic_wait::wake_all(&self.phase_waiter); + return Err(()); } else { let version = global_state & SNAPSHOT_STATE_VERSION_MASK; @@ -396,9 +457,16 @@ impl CPRSnapShotMgr { } /// Free up the thread slot specified by the given thread slot id. + /// Always notifies waiting snapshot thread via phase_waiter. pub fn release_thread_slot(&self, thread_slot_id: usize) { + // Clear the local state and slot self.set_local_state(&thread_slot_id, INVALID_SNAPSHOT_STATE); self.thread_slots[thread_slot_id].store(false, Ordering::Release); + + // Always wake the snapshot thread - it will verify completion via slot scan. + // This is simpler and avoids race conditions with counter-based approaches. + self.phase_waiter.fetch_add(1, Ordering::Release); + atomic_wait::wake_all(&self.phase_waiter); } pub fn get_snapshot_guard( @@ -590,8 +658,8 @@ impl CPRSnapShotMgr { break; } - // At most wasting 1 second per state transition. - thread::sleep(std::time::Duration::from_secs(1)); + // Wait for all threads to release their slots (no polling) + self.wait_for_all_slots_released(); #[cfg(all(feature = "shuttle", test))] shuttle::thread::yield_now(); @@ -1058,8 +1126,9 @@ impl CPRSnapShotMgr { } } - // At most wasting 1 second per state transition. - thread::sleep(std::time::Duration::from_secs(1)); + // Wait for all threads to complete phase transition. + // Use atomic wait instead of polling with fixed sleep. 
+ self.wait_for_phase_completion(current_global_state); #[cfg(all(feature = "shuttle", test))] shuttle::thread::yield_now(); From 72cbf65875fc25e2257ae37f9270a7bede04bfda Mon Sep 17 00:00:00 2001 From: Yi Shan Date: Mon, 22 Jun 2026 22:42:37 -0700 Subject: [PATCH 2/2] Bump version to 0.5.5 in Cargo.toml and README --- Cargo.toml | 2 +- README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 92049d0..ef55bda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bf-tree" -version = "0.5.4" +version = "0.5.5" edition = "2021" license = "MIT" description = "Bf-Tree is a modern read-write-optimized concurrent larger-than-memory range index in Rust from Microsoft Research." diff --git a/README.md b/README.md index afd5630..efbb15a 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ $ cargo add bf_tree Which will add bf_tree as a dependency to your Cargo.toml ```toml [dependencies] -bf-tree = "0.5.3" +bf-tree = "0.5.5" ``` An example use of Bf-Tree: