Skip to content

Commit

Permalink
Add initial MemoDB benchmark implementation (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobiemh authored Feb 20, 2025
1 parent 3b6daa8 commit d62e24e
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ jobs:
database: map
enabled: true
description: Map
# MemoDB
- name: memodb
database: memodb
enabled: true
description: MemoDB
# MongoDB
- name: mongodb
database: mongodb
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ jobs:
database: map
enabled: true
description: Map
# MemoDB
- name: memodb
database: memodb
enabled: true
description: MemoDB
# MongoDB
- name: mongodb
database: mongodb
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ jobs:
database: map
enabled: true
description: Map
# MemoDB
- name: memodb
database: memodb
enabled: true
description: MemoDB
# MongoDB
- name: mongodb
database: mongodb
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ default = [
"fjall",
"keydb",
"lmdb",
"memodb",
"mongodb",
"mysql",
"neo4j",
Expand All @@ -31,6 +32,7 @@ echodb = ["dep:echodb"]
keydb = ["dep:redis"]
fjall = ["dep:fjall"]
lmdb = ["dep:heed"]
memodb = ["dep:memodb"]
mongodb = ["dep:mongodb"]
mysql = ["dep:mysql_async"]
neo4j = ["dep:neo4rs"]
Expand Down Expand Up @@ -69,6 +71,7 @@ futures = "0.3.31"
hdrhistogram = "7.5.4"
heed = { version = "0.21.0", optional = true }
log = "0.4.25"
memodb = { version = "0.4.0", optional = true }
mongodb = { version = "3.1.1", optional = true }
mysql_async = { version = "0.35.1", default-features = false, features = ["bigdecimal", "binlog", "derive", "frunk", "rust_decimal", "time"], optional = true }
neo4rs = { version = "0.8.0", optional = true }
Expand Down
14 changes: 14 additions & 0 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub(crate) enum Database {
Keydb,
#[cfg(feature = "lmdb")]
Lmdb,
#[cfg(feature = "memodb")]
Memodb,
#[cfg(feature = "mongodb")]
Mongodb,
#[cfg(feature = "mysql")]
Expand Down Expand Up @@ -203,6 +205,18 @@ impl Database {
)
.await
}
#[cfg(feature = "memodb")]
Database::Memodb => {
benchmark
.run::<_, DefaultDialect, _>(
crate::memodb::MemoDBClientProvider::setup(kt, vp.columns(), benchmark)
.await?,
kp,
vp,
scans,
)
.await
}
#[cfg(feature = "mongodb")]
Database::Mongodb => {
benchmark
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod fjall;
mod keydb;
mod lmdb;
mod map;
mod memodb;
mod mongodb;
mod mysql;
mod neo4j;
Expand Down
185 changes: 185 additions & 0 deletions src/memodb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
#![cfg(feature = "memodb")]

use crate::benchmark::NOT_SUPPORTED_ERROR;
use crate::engine::{BenchmarkClient, BenchmarkEngine};
use crate::valueprovider::Columns;
use crate::{Benchmark, KeyType, Projection, Scan};
use anyhow::{bail, Result};
use memodb::{new, Database};
use serde_json::Value;
use std::hint::black_box;
use std::sync::Arc;
use std::time::Duration;

type Key = Vec<u8>;
type Val = Vec<u8>;

pub(crate) struct MemoDBClientProvider(Arc<Database<Key, Val>>);

impl BenchmarkEngine<MemoDBClient> for MemoDBClientProvider {
/// The number of seconds to wait before connecting
fn wait_timeout(&self) -> Option<Duration> {
None
}
/// Initiates a new datastore benchmarking engine
async fn setup(_: KeyType, _columns: Columns, _options: &Benchmark) -> Result<Self> {
// Create the store
Ok(Self(Arc::new(new())))
}
/// Creates a new client for this benchmarking engine
async fn create_client(&self) -> Result<MemoDBClient> {
Ok(MemoDBClient {
db: self.0.clone(),
})
}
}

pub(crate) struct MemoDBClient {
db: Arc<Database<Key, Val>>,
}

impl BenchmarkClient for MemoDBClient {
async fn create_u32(&self, key: u32, val: Value) -> Result<()> {
self.create_bytes(&key.to_ne_bytes(), val).await
}

async fn create_string(&self, key: String, val: Value) -> Result<()> {
self.create_bytes(&key.into_bytes(), val).await
}

async fn read_u32(&self, key: u32) -> Result<()> {
self.read_bytes(&key.to_ne_bytes()).await
}

async fn read_string(&self, key: String) -> Result<()> {
self.read_bytes(&key.into_bytes()).await
}

async fn update_u32(&self, key: u32, val: Value) -> Result<()> {
self.update_bytes(&key.to_ne_bytes(), val).await
}

async fn update_string(&self, key: String, val: Value) -> Result<()> {
self.update_bytes(&key.into_bytes(), val).await
}

async fn delete_u32(&self, key: u32) -> Result<()> {
self.delete_bytes(&key.to_ne_bytes()).await
}

async fn delete_string(&self, key: String) -> Result<()> {
self.delete_bytes(&key.into_bytes()).await
}

async fn scan_u32(&self, scan: &Scan) -> Result<usize> {
self.scan_bytes(scan).await
}

async fn scan_string(&self, scan: &Scan) -> Result<usize> {
self.scan_bytes(scan).await
}
}

impl MemoDBClient {
async fn create_bytes(&self, key: &[u8], val: Value) -> Result<()> {
// Serialise the value
let val = bincode::serialize(&val)?;
// Create a new transaction
let mut txn = self.db.begin(true);
// Process the data
txn.set(key, val)?;
txn.commit()?;
Ok(())
}

async fn read_bytes(&self, key: &[u8]) -> Result<()> {
// Create a new transaction
let txn = self.db.begin(false);
// Process the data
let res = txn.get(key.to_vec())?;
// Check the value exists
assert!(res.is_some());
// Deserialise the value
black_box(res.unwrap());
// All ok
Ok(())
}

async fn update_bytes(&self, key: &[u8], val: Value) -> Result<()> {
// Serialise the value
let val = bincode::serialize(&val)?;
// Create a new transaction
let mut txn = self.db.begin(true);
// Process the data
txn.set(key, val)?;
txn.commit()?;
Ok(())
}

async fn delete_bytes(&self, key: &[u8]) -> Result<()> {
// Create a new transaction
let mut txn = self.db.begin(true);
// Process the data
txn.del(key)?;
txn.commit()?;
Ok(())
}

async fn scan_bytes(&self, scan: &Scan) -> Result<usize> {
// Contional scans are not supported
if scan.condition.is_some() {
bail!(NOT_SUPPORTED_ERROR);
}
// Extract parameters
let s = scan.start.unwrap_or(0);
let l = scan.limit.unwrap_or(usize::MAX);
let p = scan.projection()?;
// Create a new transaction
let txn = self.db.begin(false);
let beg = [0u8].to_vec();
let end = [255u8].to_vec();
// Perform the relevant projection scan type
match p {
Projection::Id => {
// Scan the desired range of keys
let iter = txn.keys(beg..end, s + l)?;
// Create an iterator starting at the beginning
let iter = iter.into_iter();
// We use a for loop to iterate over the results, while
// calling black_box internally. This is necessary as
// an iterator with `filter_map` or `map` is optimised
// out by the compiler when calling `count` at the end.
let mut count = 0;
for v in iter.skip(s).take(l) {
black_box(v);
count += 1;
}
Ok(count)
}
Projection::Full => {
// Scan the desired range of keys
let iter = txn.scan(beg..end, s + l)?;
// Create an iterator starting at the beginning
let iter = iter.into_iter();
// We use a for loop to iterate over the results, while
// calling black_box internally. This is necessary as
// an iterator with `filter_map` or `map` is optimised
// out by the compiler when calling `count` at the end.
let mut count = 0;
for v in iter.skip(s).take(l) {
black_box(v.1);
count += 1;
}
Ok(count)
}
Projection::Count => {
Ok(txn
.keys(beg..end, s + l)?
.iter()
.skip(s) // Skip the first `offset` entries
.take(l) // Take the next `limit` entries
.count())
}
}
}
}

0 comments on commit d62e24e

Please sign in to comment.