Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bf-tree"
version = "0.5.4"
version = "0.5.5"
edition = "2021"
license = "MIT"
description = "Bf-Tree is a modern read-write-optimized concurrent larger-than-memory range index in Rust from Microsoft Research."
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ $ cargo add bf_tree
Which will add bf_tree as a dependency to your Cargo.toml
```toml
[dependencies]
bf-tree = "0.5.3"
bf-tree = "0.5.5"
```

An example use of Bf-Tree:
Expand Down
113 changes: 91 additions & 22 deletions src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ use crate::{
nodes::{leaf_node::MiniPageNextLevel, LeafNode, INVALID_DISK_OFFSET},
nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
storage::{make_vfs, LeafStorage, PageLocation, PageTable},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::thread,
sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
sync::RwLock,
utils::{get_rng, inner_lock::ReadGuard, BfsVisitor, NodeInfo},
utils::{atomic_wait, get_rng, inner_lock::ReadGuard, BfsVisitor, NodeInfo},
wal::{LogEntry, LogEntryImpl, WriteAheadLog},
BfTree, Config, StorageBackend, WalConfig, WalReader,
};
// Used for macOS fallback sleep and tests
#[allow(unused_imports)]
use crate::sync::thread;

const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
Expand Down Expand Up @@ -117,6 +119,9 @@ pub struct CPRSnapShotMgr {
vfs: RwLock<Arc<dyn VfsImpl>>,
// Ensuring only one snapshot is in progress at a time.
snapshot_in_progress: AtomicBool,
// Counter used for atomic wait/wake signaling. Incremented when threads release slots.
// The snapshot thread waits on this value; worker threads wake it after releasing slots.
phase_waiter: AtomicU32,
}

unsafe impl Sync for CPRSnapShotMgr {}
Expand Down Expand Up @@ -239,6 +244,7 @@ impl CPRSnapShotMgr {
pause_snapshot: AtomicBool::new(false),
vfs: RwLock::new(vfs),
snapshot_in_progress: AtomicBool::new(false),
phase_waiter: AtomicU32::new(0),
}
}

Expand Down Expand Up @@ -303,32 +309,29 @@ impl CPRSnapShotMgr {
let phase_id = self.get_global_phase_id();
let version = self.get_global_version();

match phase_id {
let new_state = match phase_id {
PhaseId::Rest => {
// (REST, v) -> (PREPARE, v)
let new_state = Self::new_snapshot_state(PhaseId::Prepare.as_raw(), version);
self.global_state.store(new_state, Ordering::Release);
new_state
Self::new_snapshot_state(PhaseId::Prepare.as_raw(), version)
}
PhaseId::Prepare => {
// (PREPARE, v) -> (IN_PROGRESS, v + 1)
let new_state = Self::new_snapshot_state(PhaseId::InProgress.as_raw(), version + 1);
self.global_state.store(new_state, Ordering::Release);
new_state
Self::new_snapshot_state(PhaseId::InProgress.as_raw(), version + 1)
}
PhaseId::InProgress => {
// (IN_PROGRESS, v + 1) -> (SWEEPING, v + 1)
let new_state = Self::new_snapshot_state(PhaseId::Sweep.as_raw(), version);
self.global_state.store(new_state, Ordering::Release);
new_state
Self::new_snapshot_state(PhaseId::Sweep.as_raw(), version)
}
PhaseId::Sweep => {
// (SWEEPING, v + 1) -> (REST, v + 1)
let new_state = Self::new_snapshot_state(PhaseId::Rest.as_raw(), version);
self.global_state.store(new_state, Ordering::Release);
new_state
Self::new_snapshot_state(PhaseId::Rest.as_raw(), version)
}
}
};

// Advance the global state
self.global_state.store(new_state, Ordering::Release);

new_state
}

/// If all thread local states are either invalid or equal to the target state, return true.
Expand All @@ -344,6 +347,58 @@ impl CPRSnapShotMgr {
true
}

/// Wait for all threads in the old phase to complete their transition.
/// Uses atomic wait instead of polling with fixed sleep.
fn wait_for_phase_completion(&self, target_state: u64) {
loop {
// Check if all threads have transitioned
if self.check_if_phase_completed(target_state) {
return;
}

// Capture current waiter value before waiting
let waiter_val = self.phase_waiter.load(Ordering::Acquire);

// Double-check after loading waiter (avoid missed wake)
if self.check_if_phase_completed(target_state) {
return;
}

// Wait for notification (futex on Linux, WaitOnAddress on Windows)
atomic_wait::wait(&self.phase_waiter, waiter_val);

// Fallback short sleep for platforms where atomic_wait is a no-op (e.g., macOS)
#[cfg(target_os = "macos")]
thread::sleep(std::time::Duration::from_millis(1));
}
}

/// Wait for all threads to release their slots (all local states become INVALID).
/// Used during sweep pause when we need to drain all active threads.
fn wait_for_all_slots_released(&self) {
loop {
// Check if all slots are released
if self.check_if_phase_completed(INVALID_SNAPSHOT_STATE) {
return;
}

// Capture current waiter value before waiting
let waiter_val = self.phase_waiter.load(Ordering::Acquire);

// Double-check after loading waiter
if self.check_if_phase_completed(INVALID_SNAPSHOT_STATE) {
return;
}

// Wait for any thread to release its slot
atomic_wait::wait(&self.phase_waiter, waiter_val);

// Fallback short sleep for platforms where atomic_wait is a no-op
#[cfg(target_os = "macos")]
thread::sleep(std::time::Duration::from_millis(1));
}
}

/// Obtain an unique thread slot id for the caller thread.
/// Guarantee that any local state assigned after the global state advances to the next one,
/// will either be reversed without further action or in the new state.
Expand Down Expand Up @@ -374,13 +429,19 @@ impl CPRSnapShotMgr {
// Mrgr: all threads in phase 'x + 1' or invalid -> Execute 'x + 1' action
// T1: local state = state <- Inconsistency with global state
// Similar case for the pause_snapshot flag.
if self.get_local_state(&tid) != self.global_state.load(Ordering::Acquire)
let current_global = self.global_state.load(Ordering::Acquire);
if self.get_local_state(&tid) != current_global
|| self.pause_snapshot.load(Ordering::Acquire)
{
self.set_local_state(&tid, INVALID_SNAPSHOT_STATE);
assert!(self.thread_slots[tid]
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
.is_ok());

// Wake the snapshot thread since we released a slot
self.phase_waiter.fetch_add(1, Ordering::Release);
atomic_wait::wake_all(&self.phase_waiter);

return Err(());
} else {
let version = global_state & SNAPSHOT_STATE_VERSION_MASK;
Expand All @@ -396,9 +457,16 @@ impl CPRSnapShotMgr {
}

/// Free up the thread slot specified by the given thread slot id.
/// Always notifies waiting snapshot thread via phase_waiter.
pub fn release_thread_slot(&self, thread_slot_id: usize) {
// Clear the local state and slot
self.set_local_state(&thread_slot_id, INVALID_SNAPSHOT_STATE);
self.thread_slots[thread_slot_id].store(false, Ordering::Release);

// Always wake the snapshot thread - it will verify completion via slot scan.
// This is simpler and avoids race conditions with counter-based approaches.
self.phase_waiter.fetch_add(1, Ordering::Release);
atomic_wait::wake_all(&self.phase_waiter);
}

pub fn get_snapshot_guard(
Expand Down Expand Up @@ -590,8 +658,8 @@ impl CPRSnapShotMgr {

break;
}
// At most wasting 1 second per state transition.
thread::sleep(std::time::Duration::from_secs(1));
// Wait for all threads to release their slots (no polling)
self.wait_for_all_slots_released();

#[cfg(all(feature = "shuttle", test))]
shuttle::thread::yield_now();
Expand Down Expand Up @@ -1058,8 +1126,9 @@ impl CPRSnapShotMgr {
}
}

// At most wasting 1 second per state transition.
thread::sleep(std::time::Duration::from_secs(1));
// Wait for all threads to complete phase transition.
// Use atomic wait instead of polling with fixed sleep.
self.wait_for_phase_completion(current_global_state);

#[cfg(all(feature = "shuttle", test))]
shuttle::thread::yield_now();
Expand Down
Loading