Skip to content

Commit

Permalink
Add initial EchoDB benchmark implementation (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobiemh authored Feb 20, 2025
1 parent 48cb76e commit 3b6daa8
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ jobs:
database: dry
enabled: true
description: Dry
# EchoDB
- name: echodb
database: echodb
enabled: true
description: EchoDB
# Fjall
- name: fjall
database: fjall
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ jobs:
database: dry
enabled: true
description: Dry
# EchoDB
- name: echodb
database: echodb
enabled: true
description: EchoDB
# Fjall
- name: fjall
database: fjall
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ jobs:
database: dry
enabled: true
description: Dry
# EchoDB
- name: echodb
database: echodb
enabled: true
description: EchoDB
# Fjall
- name: fjall
database: fjall
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ readme = "README.md"
default = [
"arangodb",
"dragonfly",
"echodb",
"fjall",
"keydb",
"lmdb",
Expand All @@ -26,6 +27,7 @@ default = [
]
arangodb = ["dep:arangors"]
dragonfly = ["dep:redis"]
echodb = ["dep:echodb"]
keydb = ["dep:redis"]
fjall = ["dep:fjall"]
lmdb = ["dep:heed"]
Expand Down Expand Up @@ -59,6 +61,7 @@ chrono = "0.4.39"
clap = { version = "4.5.26", features = ["derive", "string", "env", "color"] }
csv = "1.3.1"
dashmap = "6.1.0"
echodb = { version = "0.8.0", optional = true }
env_logger = "0.11.6"
fjall = { version = "2.5.0", optional = true }
flatten-json-object = "0.6.1"
Expand Down
14 changes: 14 additions & 0 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub(crate) enum Database {
Arangodb,
#[cfg(feature = "dragonfly")]
Dragonfly,
#[cfg(feature = "echodb")]
Echodb,
#[cfg(feature = "fjall")]
Fjall,
#[cfg(feature = "keydb")]
Expand Down Expand Up @@ -144,6 +146,18 @@ impl Database {
)
.await
}
#[cfg(feature = "echodb")]
Database::Echodb => {
benchmark
.run::<_, DefaultDialect, _>(
crate::echodb::EchoDBClientProvider::setup(kt, vp.columns(), benchmark)
.await?,
kp,
vp,
scans,
)
.await
}
#[cfg(feature = "fjall")]
Database::Fjall => {
benchmark
Expand Down
185 changes: 185 additions & 0 deletions src/echodb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
#![cfg(feature = "echodb")]

use crate::benchmark::NOT_SUPPORTED_ERROR;
use crate::engine::{BenchmarkClient, BenchmarkEngine};
use crate::valueprovider::Columns;
use crate::{Benchmark, KeyType, Projection, Scan};
use anyhow::{bail, Result};
use echodb::{new, Database};
use serde_json::Value;
use std::hint::black_box;
use std::sync::Arc;
use std::time::Duration;

type Key = Vec<u8>;
type Val = Vec<u8>;

/// Provides benchmark clients that all share a single in-memory EchoDB datastore.
pub(crate) struct EchoDBClientProvider(Arc<Database<Key, Val>>);

impl BenchmarkEngine<EchoDBClient> for EchoDBClientProvider {
    /// No startup delay is required for an embedded in-memory datastore
    fn wait_timeout(&self) -> Option<Duration> {
        None
    }
    /// Initiates a new datastore benchmarking engine
    async fn setup(_: KeyType, _columns: Columns, _options: &Benchmark) -> Result<Self> {
        // Initialise an empty in-memory datastore and wrap it for sharing
        let db = new();
        Ok(Self(Arc::new(db)))
    }
    /// Creates a new client for this benchmarking engine
    async fn create_client(&self) -> Result<EchoDBClient> {
        // Every client operates on the same shared datastore handle
        let db = Arc::clone(&self.0);
        Ok(EchoDBClient {
            db,
        })
    }
}

/// A single benchmark client holding a shared handle to the EchoDB datastore.
pub(crate) struct EchoDBClient {
    // Shared in-memory database; cloning the Arc in the provider is cheap
    db: Arc<Database<Key, Val>>,
}

impl BenchmarkClient for EchoDBClient {
    /// Creates a record keyed by the native-endian bytes of a u32.
    async fn create_u32(&self, key: u32, val: Value) -> Result<()> {
        self.create_bytes(Vec::from(key.to_ne_bytes()), val).await
    }

    /// Creates a record keyed by the UTF-8 bytes of a string.
    async fn create_string(&self, key: String, val: Value) -> Result<()> {
        self.create_bytes(Vec::from(key), val).await
    }

    /// Reads the record for a u32 key.
    async fn read_u32(&self, key: u32) -> Result<()> {
        self.read_bytes(Vec::from(key.to_ne_bytes())).await
    }

    /// Reads the record for a string key.
    async fn read_string(&self, key: String) -> Result<()> {
        self.read_bytes(Vec::from(key)).await
    }

    /// Updates the record for a u32 key.
    async fn update_u32(&self, key: u32, val: Value) -> Result<()> {
        self.update_bytes(Vec::from(key.to_ne_bytes()), val).await
    }

    /// Updates the record for a string key.
    async fn update_string(&self, key: String, val: Value) -> Result<()> {
        self.update_bytes(Vec::from(key), val).await
    }

    /// Deletes the record for a u32 key.
    async fn delete_u32(&self, key: u32) -> Result<()> {
        self.delete_bytes(Vec::from(key.to_ne_bytes())).await
    }

    /// Deletes the record for a string key.
    async fn delete_string(&self, key: String) -> Result<()> {
        self.delete_bytes(Vec::from(key)).await
    }

    /// Scans the keyspace; key type does not affect the byte-range scan.
    async fn scan_u32(&self, scan: &Scan) -> Result<usize> {
        self.scan_bytes(scan).await
    }

    /// Scans the keyspace; key type does not affect the byte-range scan.
    async fn scan_string(&self, scan: &Scan) -> Result<usize> {
        self.scan_bytes(scan).await
    }
}

impl EchoDBClient {
    /// Serialises a value and writes it under the given key, committing the
    /// transaction. Create and update are the same operation for EchoDB, so
    /// both delegate to this helper to avoid duplicated logic.
    async fn set_bytes(&self, key: Vec<u8>, val: Value) -> Result<()> {
        // Serialise the value
        let val = bincode::serialize(&val)?;
        // Create a new writeable transaction
        let mut txn = self.db.begin(true).await;
        // Store the serialised value under the key
        txn.set(key, val)?;
        // Commit the changes
        txn.commit()?;
        Ok(())
    }

    /// Inserts a new key-value record.
    async fn create_bytes(&self, key: Vec<u8>, val: Value) -> Result<()> {
        self.set_bytes(key, val).await
    }

    /// Reads the record for a key, asserting that it exists.
    async fn read_bytes(&self, key: Vec<u8>) -> Result<()> {
        // Create a new read-only transaction
        let txn = self.db.begin(false).await;
        // Fetch the value for the key
        let res = txn.get(key)?;
        // Check the value exists
        assert!(res.is_some());
        // Pass the raw bytes through black_box so the
        // read cannot be optimised away by the compiler
        black_box(res.unwrap());
        // All ok
        Ok(())
    }

    /// Overwrites the record for a key.
    async fn update_bytes(&self, key: Vec<u8>, val: Value) -> Result<()> {
        self.set_bytes(key, val).await
    }

    /// Deletes the record for a key, committing the transaction.
    async fn delete_bytes(&self, key: Vec<u8>) -> Result<()> {
        // Create a new writeable transaction
        let mut txn = self.db.begin(true).await;
        // Remove the key
        txn.del(key)?;
        // Commit the changes
        txn.commit()?;
        Ok(())
    }

    /// Scans the keyspace according to the projection, offset and limit in
    /// `scan`, returning the number of matched entries. Bails with
    /// NOT_SUPPORTED_ERROR when a filter condition is requested.
    async fn scan_bytes(&self, scan: &Scan) -> Result<usize> {
        // Conditional scans are not supported
        if scan.condition.is_some() {
            bail!(NOT_SUPPORTED_ERROR);
        }
        // Extract parameters
        let s = scan.start.unwrap_or(0);
        let l = scan.limit.unwrap_or(usize::MAX);
        let p = scan.projection()?;
        // Create a new read-only transaction
        let txn = self.db.begin(false).await;
        // NOTE(review): keys lexicographically >= [255] (for example a
        // native-endian u32 key whose first byte is 0xFF) fall outside this
        // range and are skipped — confirm this bound is acceptable here.
        let beg = [0u8].to_vec();
        let end = [255u8].to_vec();
        // Perform the relevant projection scan type
        match p {
            Projection::Id => {
                // Scan the desired range of keys. We use a for loop to
                // iterate over the results, while calling black_box
                // internally. This is necessary as an iterator with
                // `filter_map` or `map` is optimised out by the compiler
                // when calling `count` at the end.
                let mut count = 0;
                for v in txn.keys(beg..end, s + l)?.into_iter().skip(s).take(l) {
                    black_box(v);
                    count += 1;
                }
                Ok(count)
            }
            Projection::Full => {
                // Scan the desired range of keys and values, counting the
                // entries with the same black_box pattern as above
                let mut count = 0;
                for v in txn.scan(beg..end, s + l)?.into_iter().skip(s).take(l) {
                    black_box(v.1);
                    count += 1;
                }
                Ok(count)
            }
            Projection::Count => {
                // Only the number of entries is needed, so the
                // values are never materialised or inspected
                Ok(txn
                    .keys(beg..end, s + l)?
                    .iter()
                    .skip(s) // Skip the first `offset` entries
                    .take(l) // Take the next `limit` entries
                    .count())
            }
        }
    }
}
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod valueprovider;
mod arangodb;
mod dragonfly;
mod dry;
mod echodb;
mod fjall;
mod keydb;
mod lmdb;
Expand Down

0 comments on commit 3b6daa8

Please sign in to comment.