Commit 1f12a1a

memtable/skiplist: add a purpose-built skiplist
1 parent 3dc981b commit 1f12a1a

6 files changed: +1230 -4 lines

Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -47,6 +47,7 @@ fs_extra = "1.3.0"
 nanoid = "0.4.0"
 rand = "0.9.0"
 test-log = "0.2.16"
+quickcheck = "1.0.3"

 # half 2.5.0 has MSRV 1.81
 half = "=2.4.0"

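The only dependency change is quickcheck coming in as a dev-dependency, presumably to property-test the new skiplist (the test module itself is not part of this excerpt). A hedged sketch of the kind of property it makes cheap to state — the module path, the Default impl, and the trait bounds on keys are assumptions; only the "insert returns Err on duplicates" behaviour is visible further down in this commit:

// Hypothetical property test; not part of this commit.
use quickcheck::quickcheck;
use std::collections::BTreeSet;

use crate::memtable::skiplist::SkipMap; // assumed path

quickcheck! {
    // Each key is accepted exactly once: the first insert succeeds,
    // any repeat of the same key is rejected.
    fn insert_rejects_duplicates(keys: Vec<u32>) -> bool {
        let map: SkipMap<u32, u32> = SkipMap::default();
        let mut seen = BTreeSet::new();
        keys.into_iter().all(|k| {
            let first_time = seen.insert(k);
            map.insert(k, 0).is_ok() == first_time
        })
    }
}
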
src/memtable/mod.rs

Lines changed: 10 additions & 4 deletions

@@ -5,18 +5,20 @@
 use crate::key::InternalKey;
 use crate::segment::block::ItemSize;
 use crate::value::{InternalValue, SeqNo, UserValue, ValueType};
-use crossbeam_skiplist::SkipMap;
 use std::ops::RangeBounds;
 use std::sync::atomic::{AtomicU32, AtomicU64};

+#[allow(unsafe_code)]
+mod skiplist;
+
 /// The memtable serves as an intermediary, ephemeral, sorted storage for new items
 ///
 /// When the Memtable exceeds some size, it should be flushed to a disk segment.
 #[derive(Default)]
 pub struct Memtable {
     /// The actual content, stored in a lock-free skiplist.
     #[doc(hidden)]
-    pub items: SkipMap<InternalKey, UserValue>,
+    pub items: skiplist::SkipMap<InternalKey, UserValue>,

     /// Approximate active memtable size.
     ///
@@ -32,7 +34,7 @@ pub struct Memtable {
 impl Memtable {
     /// Clears the memtable.
     pub fn clear(&mut self) {
-        self.items.clear();
+        self.items = Default::default();
         self.highest_seqno = AtomicU64::new(0);
         self.approximate_size
             .store(0, std::sync::atomic::Ordering::Release);
@@ -131,7 +133,11 @@ impl Memtable {
             .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);

         let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
-        self.items.insert(key, item.value);
+        // TODO(ajwerner): Decide what we want to do here. The panic is sort of
+        // extreme, but also seems right given the invariants.
+        if let Err((key, _value)) = self.items.insert(key, item.value) {
+            panic!("duplicate insert of {key:?} into memtable")
+        }

         self.highest_seqno
             .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);

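Unlike crossbeam_skiplist, where insert would silently shadow an existing key, the purpose-built map hands a rejected pair back to the caller, and the memtable treats that as a broken invariant and panics. A small sketch of the contract the call site above relies on (only the Err((key, value)) shape is from the diff; the Ok payload and the concrete key/value types are placeholders):

// Illustrative only: u64 keys/values stand in for InternalKey/UserValue.
fn demo(map: &skiplist::SkipMap<u64, u64>) {
    // First insert of a key succeeds.
    assert!(map.insert(1, 10).is_ok());

    // Re-inserting the same key does not overwrite; the rejected pair comes back.
    match map.insert(1, 20) {
        Ok(_) => unreachable!("duplicates are never overwritten"),
        Err((key, value)) => assert_eq!((key, value), (1, 20)),
    }
}
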
src/memtable/skiplist/arena.rs

Lines changed: 116 additions & 0 deletions (new file)

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use std::{
    alloc::Layout,
    mem::offset_of,
    sync::{
        atomic::{AtomicPtr, AtomicUsize, Ordering},
        Mutex,
    },
};

// DEFAULT_BUFFER_SIZE needs to be at least big enough for one fully-aligned node
// for the crate to work correctly. Anything larger than that will work.
//
// TODO: Justify this size.
const DEFAULT_BUFFER_SIZE: usize = (32 << 10) - size_of::<AtomicUsize>();

impl<const BUFFER_SIZE: usize> Default for Arenas<BUFFER_SIZE> {
    fn default() -> Self {
        Self::new()
    }
}

unsafe impl<const N: usize> Send for Arenas<N> {}
unsafe impl<const N: usize> Sync for Arenas<N> {}

pub(crate) struct Arenas<const BUFFER_SIZE: usize = DEFAULT_BUFFER_SIZE> {
    // The current set of arenas.
    arenas: Mutex<Vec<*mut Buffer<BUFFER_SIZE>>>,
    // Cache of the currently open arena. It'll be the last item in the buffers
    // vec. This atomic is only ever written while holding the buffers Mutex.
    open_arena: AtomicPtr<Buffer<BUFFER_SIZE>>,
}

impl<const BUFFER_SIZE: usize> Arenas<BUFFER_SIZE> {
    pub(crate) fn new() -> Self {
        Self {
            arenas: Default::default(),
            open_arena: AtomicPtr::default(),
        }
    }
}

impl<const BUFFER_SIZE: usize> Arenas<BUFFER_SIZE> {
    pub(crate) fn alloc(&self, layout: Layout) -> *mut u8 {
        loop {
            let buffer_tail = self.open_arena.load(Ordering::Acquire);
            if !buffer_tail.is_null() {
                if let Some(offset) = try_alloc(buffer_tail, layout) {
                    return offset;
                }
            }
            let mut buffers = self.arenas.lock().unwrap();
            let buffer = buffers.last().unwrap_or(&std::ptr::null_mut());
            if *buffer != buffer_tail {
                // Lost the race with somebody else.
                continue;
            }
            let new_buffer: Box<Buffer<BUFFER_SIZE>> = Box::new(Buffer::default());
            let new_buffer = Box::into_raw(new_buffer);
            self.open_arena.store(new_buffer, Ordering::Release);
            buffers.push(new_buffer);
        }
    }
}

struct Buffer<const N: usize> {
    offset: AtomicUsize,
    data: [u8; N],
}

impl<const N: usize> Default for Buffer<N> {
    fn default() -> Self {
        Self {
            offset: Default::default(),
            data: [0; N],
        }
    }
}

impl<const N: usize> Drop for Arenas<N> {
    fn drop(&mut self) {
        let mut buffers = self.arenas.lock().unwrap();
        for buffer in buffers.drain(..) {
            drop(unsafe { Box::from_raw(buffer) })
        }
    }
}

fn try_alloc<const N: usize>(buf: *mut Buffer<N>, layout: Layout) -> Option<*mut u8> {
    let mut cur_offset = unsafe { &(*buf).offset }.load(Ordering::Relaxed);
    loop {
        let buf_start = unsafe { buf.byte_add(offset_of!(Buffer<N>, data)) as *mut u8 };
        let free_start = unsafe { buf_start.byte_add(cur_offset) };
        let start_addr = unsafe { free_start.byte_add(free_start.align_offset(layout.align())) };
        let new_offset = ((start_addr as usize) + layout.size()) - (buf_start as usize);
        if new_offset > N {
            return None;
        }

        // Note that we can get away with using relaxed ordering here because we're not
        // asserting anything about the contents of the buffer. We're just trying to
        // allocate a new node.
        match unsafe { &(*buf).offset }.compare_exchange(
            cur_offset,
            new_offset,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(_offset) => return Some(start_addr),
            Err(offset) => cur_offset = offset,
        }
    }
}

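alloc is a lock-free bump allocator: try_alloc rounds the current offset up to the layout's alignment and claims the slot with a compare_exchange, and when the open buffer is full, alloc appends a fresh fixed-size buffer under the mutex and retries. A hedged usage sketch — the Node payload is made up; the only real API used is the Arenas::alloc(Layout) -> *mut u8 defined above:

// Illustrative caller; nothing allocated here is ever freed individually —
// all buffers are released together when the `Arenas` is dropped.
use std::alloc::Layout;

#[repr(C)]
struct Node {
    key_len: u32,
    value_len: u32,
}

fn allocate_node(arenas: &Arenas) -> *mut Node {
    // The returned pointer is aligned for `layout` and points into one of the
    // fixed-size buffers owned by `arenas`.
    let layout = Layout::new::<Node>();
    arenas.alloc(layout).cast::<Node>()
}
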
src/memtable/skiplist/mod.rs

Lines changed: 30 additions & 0 deletions (new file)

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

// This implementation was heavily inspired by:
// * https://github.com/andy-kimball/arenaskl/tree/f7010085
// * https://github.com/crossbeam-rs/crossbeam/tree/983d56b6/crossbeam-skiplist

//! This module is a purpose-built concurrent skiplist intended for use
//! by the memtable.
//!
//! Due to the requirements of the memtable, there are a number of notable
//! features it lacks:
//! * Updates
//! * Deletes
//! * Overwrites
//!
//! The main reasons for its existence are that it
//! * provides concurrent reads and inserts, and
//! * batches memory allocations.
//!
//! Prior to this implementation, crossbeam_skiplist was used.

mod arena;
mod skipmap;

pub use skipmap::SkipMap;

#[cfg(test)]
mod test;

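Because the map only grows and insert takes a shared reference, one instance can be shared across threads — which is what lets readers scan the memtable while writers keep inserting. A hedged sketch of that shape (Send/Sync for the map, the &self insert signature, and the concrete key/value types are assumptions; the read/iteration APIs live in skipmap.rs, which is not shown in this excerpt):

// Hypothetical concurrency sketch; each thread uses a disjoint key range so
// the no-duplicates invariant holds and every insert returns Ok.
use std::sync::Arc;
use std::thread;

fn concurrent_inserts(map: Arc<SkipMap<u64, u64>>) {
    let handles: Vec<_> = (0..4u64)
        .map(|t| {
            let map = Arc::clone(&map);
            thread::spawn(move || {
                for i in 0..1_000u64 {
                    map.insert(t * 1_000 + i, i).expect("keys are unique");
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
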