diff --git a/Cargo.lock b/Cargo.lock index ca2aa06e..7d3d7b1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -829,6 +829,12 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "bytemuck" version = "1.14.0" @@ -886,6 +892,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc" +[[package]] +name = "camino" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c" +dependencies = [ + "serde", +] + [[package]] name = "cap-fs-ext" version = "1.0.15" @@ -949,6 +964,28 @@ dependencies = [ "winx 0.35.1", ] +[[package]] +name = "cargo-platform" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cfa25e60aea747ec7e1124f238816749faa93759c6ff5b31f1ccdda137f4479" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cc" version = "1.0.83" @@ -1536,6 +1573,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -2462,6 +2508,15 @@ dependencies = [ "libc", ] +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "maplit" version = "1.0.2" @@ -2591,6 +2646,30 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "moka" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8dc65d4615c08c8a13d91fd404b5a2a4485ba35b4091e3315cf8798d280c2f29" +dependencies = [ + "async-lock", + "async-trait", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "futures-util", + "once_cell", + "parking_lot", + "quanta", + "rustc_version", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "multimap" version = "0.8.3" @@ -3147,6 +3226,33 @@ dependencies = [ "unicase", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998" +dependencies = [ + "bitflags 1.3.2", + "memchr", + "unicase", +] + +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -3201,6 +3307,15 @@ dependencies = [ "rand_core", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "rayon" version = "1.7.0" @@ -3517,6 +3632,15 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.22" @@ -3570,6 +3694,9 @@ name = "semver" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" +dependencies = [ + "serde", +] [[package]] name = "serde" @@ -3717,6 +3844,21 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark 0.9.3", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.9" @@ -3802,6 +3944,7 @@ dependencies = [ "libsql-client", "memmap", "mimalloc", + "moka", "nix", "once_cell", "parking_lot", @@ -3934,6 +4077,12 @@ dependencies = [ "winx 0.36.2", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tar" version = "0.4.40" @@ -4384,6 +4533,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "triomphe" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f" + [[package]] name = "try-lock" version = "0.2.4" @@ -4562,6 +4717,16 @@ dependencies = [ "libc", ] +[[package]] +name = "walkdir" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -5294,7 +5459,7 @@ dependencies = [ "id-arena", "indexmap 1.9.3", "log", - "pulldown-cmark", + "pulldown-cmark 0.8.0", "unicode-xid", "url", ] diff --git a/sqld/Cargo.toml b/sqld/Cargo.toml index 16cabbdb..1c134fe3 100644 --- a/sqld/Cargo.toml +++ b/sqld/Cargo.toml @@ -69,6 +69,7 @@ chrono = { version = "0.4.26", features = ["serde"] } hyper-rustls = { git = "https://github.com/rustls/hyper-rustls.git", rev = "163b3f5" } rustls-pemfile = "1.0.3" rustls = "0.21.7" +moka = { version = "0.12.0", features = ["future"] } [dev-dependencies] proptest = "1.0.0" diff --git a/sqld/src/connection/write_proxy.rs b/sqld/src/connection/write_proxy.rs index b4208fd7..406ae5d3 100644 --- a/sqld/src/connection/write_proxy.rs +++ b/sqld/src/connection/write_proxy.rs @@ -25,7 +25,6 @@ use crate::rpc::proxy::rpc::{DisconnectMessage, ExecuteResults}; use crate::rpc::NAMESPACE_METADATA_KEY; use crate::stats::Stats; use crate::{Result, DEFAULT_AUTO_CHECKPOINT}; - use super::config::DatabaseConfigStore; use super::libsql::LibSqlConnection; use super::program::DescribeResult; diff --git a/sqld/src/error.rs b/sqld/src/error.rs index 611ed47b..fc74d278 100644 --- a/sqld/src/error.rs +++ b/sqld/src/error.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use axum::response::IntoResponse; use hyper::StatusCode; use tonic::metadata::errors::InvalidMetadataValueBytes; @@ -79,6 +81,9 @@ pub enum Error { ConflictingRestoreParameters, #[error("failed to fork database: {0}")] Fork(#[from] ForkError), + // This is for errors returned by moka + #[error(transparent)] + Ref(#[from] Arc), } trait ResponseError: std::error::Error { @@ -92,6 +97,12 @@ trait ResponseError: std::error::Error { impl ResponseError for Error {} impl IntoResponse for Error { + fn into_response(self) -> axum::response::Response { + (&self).into_response() + } +} + +impl IntoResponse for &Error { fn into_response(self) -> axum::response::Response { use Error::*; @@ -129,6 +140,7 @@ impl IntoResponse for Error { LoadDumpExistingDb => self.format_err(StatusCode::BAD_REQUEST), ConflictingRestoreParameters => self.format_err(StatusCode::BAD_REQUEST), Fork(e) => e.into_response(), + Ref(this) => this.as_ref().into_response(), } } } @@ -169,7 +181,7 @@ pub enum LoadDumpError { impl ResponseError for LoadDumpError {} -impl IntoResponse for LoadDumpError { +impl IntoResponse for &LoadDumpError { fn into_response(self) -> axum::response::Response { use LoadDumpError::*; @@ -187,7 +199,7 @@ impl IntoResponse for LoadDumpError { impl ResponseError for ForkError {} -impl IntoResponse for ForkError { +impl IntoResponse for &ForkError { fn into_response(self) -> axum::response::Response { match self { ForkError::Internal(_) diff --git a/sqld/src/namespace/mod.rs b/sqld/src/namespace/mod.rs index 41157373..e2377e81 100644 --- a/sqld/src/namespace/mod.rs +++ b/sqld/src/namespace/mod.rs @@ -1,18 +1,17 @@ -use std::collections::hash_map::Entry; -use std::collections::HashMap; use std::fmt; use std::path::{Path, PathBuf}; use std::sync::{Arc, Weak}; use std::time::Duration; use anyhow::{bail, Context as _}; -use async_lock::{RwLock, RwLockUpgradableReadGuard}; +use async_lock::RwLock; use bottomless::replicator::Options; use bytes::Bytes; use chrono::NaiveDateTime; use enclose::enclose; +use futures::TryFutureExt; use futures_core::future::BoxFuture; -use futures_core::Stream; +use futures_core::{Stream, Future}; use hyper::Uri; use rusqlite::ErrorCode; use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS; @@ -22,6 +21,7 @@ use tokio::task::{block_in_place, JoinSet}; use tokio_util::io::StreamReader; use tonic::transport::Channel; use uuid::Uuid; +use moka::future::Cache; use crate::connection::config::DatabaseConfigStore; use crate::connection::libsql::{open_db, LibSqlDbFactory}; @@ -249,8 +249,11 @@ impl Clone for NamespaceStore { } } +type NamespaceEntry = Arc>>>; + struct NamespaceStoreInner { - store: RwLock>>, + // store: RwLock>>, + store: Cache>, /// The namespace factory, to create new namespaces. make_namespace: M, allow_lazy_creation: bool, @@ -258,9 +261,22 @@ struct NamespaceStoreInner { impl NamespaceStore { pub fn new(make_namespace: M, allow_lazy_creation: bool) -> Self { + let store = Cache::>::builder() + .async_eviction_listener(|name, ns, _| Box::pin(async move { + tracing::info!("namespace `{name}` deallocated"); + // shutdown namespace + if let Some(ns) = ns.write().await.take() { + if let Err(e) = ns.destroy().await { + tracing::error!("error deallocating `{name}`: {e}") + } + } + })) + // TODO(marin): configurable capacity + .max_capacity(25) + .build(); Self { inner: Arc::new(NamespaceStoreInner { - store: Default::default(), + store, make_namespace, allow_lazy_creation, }), @@ -268,14 +284,15 @@ impl NamespaceStore { } pub async fn destroy(&self, namespace: NamespaceName) -> crate::Result<()> { - let mut lock = self.inner.store.write().await; - if let Some(ns) = lock.remove(&namespace) { + if let Some(ns) = self.inner.store.remove(&namespace).await { // FIXME: when destroying, we are waiting for all the tasks associated with the // allocation to finnish, which create a lot of contention on the lock. Need to use a // conccurent hashmap to deal with this issue. // deallocate in-memory resources - ns.destroy().await?; + if let Some(ns) = ns.write().await.take() { + ns.destroy().await?; + } } // destroy on-disk database and backups @@ -294,8 +311,14 @@ impl NamespaceStore { namespace: NamespaceName, restore_option: RestoreOption, ) -> anyhow::Result<()> { - let mut lock = self.inner.store.write().await; - if let Some(ns) = lock.remove(&namespace) { + // The process for reseting is as follow: + // - get a lock on the namespace entry, if the entry exists, then it's a lock on the entry, + // if it doesn't exist, insert an empty entry and take a lock on it + // - destroy the old namespace + // - create a new namespace and insert it in the held lock + let entry = self.inner.store.get_with(namespace.clone(), async { Default::default() }).await; + let mut lock = entry.write().await; + if let Some(ns) = lock.take() { // FIXME: when destroying, we are waiting for all the tasks associated with the // allocation to finnish, which create a lot of contention on the lock. Need to use a // conccurent hashmap to deal with this issue. @@ -319,7 +342,8 @@ impl NamespaceStore { self.make_reset_cb(), ) .await?; - lock.insert(namespace, ns); + + lock.replace(ns); Ok(()) } @@ -344,66 +368,92 @@ impl NamespaceStore { } pub async fn fork(&self, from: NamespaceName, to: NamespaceName) -> crate::Result<()> { - let mut lock = self.inner.store.write().await; - if lock.contains_key(&to) { - return Err(crate::error::Error::NamespaceAlreadyExist( - to.as_str().to_string(), - )); + let to_entry = self.inner.store.get_with(to.clone(), async { Default::default() }).await; + let mut to_lock = to_entry.write().await; + if to_lock.is_some() { + return Err(crate::error::Error::NamespaceAlreadyExist(to.to_string())); } // check that the source namespace exists - let from_ns = match lock.entry(from.clone()) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => { - // we just want to load the namespace into memory, so we refuse creation. + let from_entry = self.inner.store.try_get_with(from.clone(), async { + let ns = self + .inner + .make_namespace + .create( + from.clone(), + RestoreOption::Latest, + false, + self.make_reset_cb(), + ) + .await?; + tracing::info!("loaded namespace: `{to}`"); + Ok::<_, crate::error::Error>(Arc::new(RwLock::new(Some(ns)))) + }).await + // find how to deal with Arc + .unwrap(); + + let from_lock = from_entry.read().await; + let Some(from_ns) = &*from_lock else { + return Err(crate::error::Error::NamespaceDoesntExist(to.to_string())); + }; + + let to_ns = self + .inner + .make_namespace + .fork(from_ns, to.clone(), self.make_reset_cb()) + .await?; + + to_lock.replace(to_ns); + + Ok(()) + } + + pub async fn with(&self, namespace: NamespaceName, f: Fun) -> crate::Result + where + Fun: FnOnce(&Namespace) -> R + 'static, + { + let init = { + let namespace = namespace.clone(); + async move { let ns = self .inner .make_namespace .create( - from.clone(), + namespace.clone(), RestoreOption::Latest, - false, + self.inner.allow_lazy_creation, self.make_reset_cb(), ) .await?; - e.insert(ns) - } + tracing::info!("loaded namespace: `{namespace}`"); + + Ok(Some(ns)) + } }; - let forked = self - .inner - .make_namespace - .fork(from_ns, to.clone(), self.make_reset_cb()) - .await?; - lock.insert(to.clone(), forked); + let f = { + let name = namespace.clone(); + move |ns: NamespaceEntry| async move { + let lock = ns.read().await; + match &*lock { + Some(ns) => Ok(f(ns)), + // the namespace was taken out of the entry + None => Err(Error::NamespaceDoesntExist(name.to_string())), + } + } + }; - Ok(()) + self.with_lock_or_init(namespace, f, init).await? } - pub async fn with(&self, namespace: NamespaceName, f: Fun) -> crate::Result + async fn with_lock_or_init(&self, namespace: NamespaceName, f: Fun, init: Init) -> crate::Result where - Fun: FnOnce(&Namespace) -> R, + Fun: FnOnce(NamespaceEntry) -> Fut, + Fut: Future, + Init: Future>>>, { - let lock = self.inner.store.upgradable_read().await; - if let Some(ns) = lock.get(&namespace) { - Ok(f(ns)) - } else { - let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await; - let ns = self - .inner - .make_namespace - .create( - namespace.clone(), - RestoreOption::Latest, - self.inner.allow_lazy_creation, - self.make_reset_cb(), - ) - .await?; - let ret = f(&ns); - tracing::info!("loaded namespace: `{namespace}`"); - lock.insert(namespace, ns); - Ok(ret) - } + let ns = self.inner.store.try_get_with(namespace.clone(), init.map_ok(|ns| Arc::new(RwLock::new(ns)))).await?; + Ok(f(ns).await) } pub async fn create( @@ -411,29 +461,61 @@ impl NamespaceStore { namespace: NamespaceName, restore_option: RestoreOption, ) -> crate::Result<()> { - let lock = self.inner.store.upgradable_read().await; - if lock.contains_key(&namespace) { - return Err(crate::error::Error::NamespaceAlreadyExist( - namespace.as_str().to_owned(), - )); - } - - let ns = self - .inner - .make_namespace - .create( - namespace.clone(), - restore_option, - true, - self.make_reset_cb(), - ) - .await?; + let name = namespace.clone(); + let init = async { + let ns = self + .inner + .make_namespace + .create( + name.clone(), + RestoreOption::Latest, + false, + self.make_reset_cb(), + ) + .await; + match ns { + // the namespace already exist, load it, and let the `f` function fail + Ok(ns) => { + tracing::info!("loaded namespace: `{name}`"); + Ok(Some(ns)) + }, + // return an empty slot to put the new namespace in + Err(Error::NamespaceDoesntExist(_)) => Ok(None), + Err(e) => Err(e), + } + }; - let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await; - tracing::info!("loaded namespace: `{namespace}`"); - lock.insert(namespace, ns); + let f = { + let name = namespace.clone(); + move |ns: NamespaceEntry| { + let ns = ns.clone(); + let name = name.clone(); + async move { + let mut lock = ns.write().await; + if lock.is_some() { + return Err(Error::NamespaceAlreadyExist(name.to_string())); + } + let ns = self + .inner + .make_namespace + .create( + name.clone(), + restore_option, + true, + self.make_reset_cb(), + ) + .await?; + + tracing::info!("loaded namespace: `{name}`"); + + lock.replace(ns); + + Ok(()) + } + } + }; - Ok(()) + self.with_lock_or_init(namespace, f, init).await? } pub(crate) async fn stats(&self, namespace: NamespaceName) -> crate::Result> {