From 77dbefff5bf9b91787704023bd8d8c5d54cda992 Mon Sep 17 00:00:00 2001 From: Jerome Gravel-Niquet Date: Mon, 15 Jul 2024 22:40:49 -0400 Subject: [PATCH] only truncate the WAL past a certain size threshold --- crates/corro-agent/src/agent/handlers.rs | 124 +++++++++++++++-------- crates/corro-agent/src/agent/run_root.rs | 4 +- crates/corro-types/src/sqlite.rs | 1 - 3 files changed, 84 insertions(+), 45 deletions(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 4cfe016e..a2fa83de 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -18,6 +18,7 @@ use crate::{ api::peer::parallel_sync, transport::Transport, }; +use camino::Utf8Path; use corro_types::{ actor::{Actor, ActorId}, agent::{Agent, Bookie, SplitPool}, @@ -352,36 +353,36 @@ pub async fn handle_notifications( } } -// /// We keep a write-ahead-log, which under write-pressure can grow to -// /// multiple gigabytes and needs periodic truncation. We don't want -// /// to schedule this task too often since it locks the whole DB. -// // TODO: can we get around the lock somehow? -// fn wal_checkpoint(conn: &rusqlite::Connection) -> eyre::Result<()> { -// debug!("handling db_cleanup (WAL truncation)"); -// let start = Instant::now(); +/// We keep a write-ahead-log, which under write-pressure can grow to +/// multiple gigabytes and needs periodic truncation. We don't want +/// to schedule this task too often since it locks the whole DB. +// TODO: can we get around the lock somehow? 
+fn wal_checkpoint(conn: &rusqlite::Connection) -> eyre::Result<()> { + debug!("handling db_cleanup (WAL truncation)"); + let start = Instant::now(); -// let orig: u64 = conn.pragma_query_value(None, "busy_timeout", |row| row.get(0))?; -// conn.pragma_update(None, "busy_timeout", 60000)?; + let orig: u64 = conn.pragma_query_value(None, "busy_timeout", |row| row.get(0))?; + conn.pragma_update(None, "busy_timeout", 60000)?; -// let busy: bool = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE);", [], |row| row.get(0))?; -// if busy { -// warn!("could not truncate sqlite WAL, database busy"); -// counter!("corro.db.wal.truncate.busy").increment(1); -// } else { -// debug!("successfully truncated sqlite WAL!"); -// histogram!("corro.db.wal.truncate.seconds").record(start.elapsed().as_secs_f64()); -// } + let busy: bool = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE);", [], |row| row.get(0))?; + if busy { + warn!("could not truncate sqlite WAL, database busy"); + counter!("corro.db.wal.truncate.busy").increment(1); + } else { + debug!("successfully truncated sqlite WAL!"); + histogram!("corro.db.wal.truncate.seconds").record(start.elapsed().as_secs_f64()); + } -// _ = conn.pragma_update(None, "busy_timeout", orig); + _ = conn.pragma_update(None, "busy_timeout", orig); -// Ok::<_, eyre::Report>(()) -// } + Ok::<_, eyre::Report>(()) +} /// If the number of unused free pages is above the provided limit, /// This function continously runs an incremental_vacuum /// until it is below the limit /// -async fn vacuum_db(pool: SplitPool, lim: u64) -> eyre::Result<()> { +async fn vacuum_db(pool: &SplitPool, lim: u64) -> eyre::Result<()> { let mut freelist: u64 = { let conn = pool.read().await?; @@ -433,31 +434,70 @@ async fn vacuum_db(pool: SplitPool, lim: u64) -> eyre::Result<()> { } let conn = pool.write_low().await?; - _ = conn.pragma_update(None, "busy_timeout", busy_timeout)?; - _ = conn.pragma_update(None, "cache_size", cache_size)?; + conn.pragma_update(None, 
 "busy_timeout", busy_timeout)?; + conn.pragma_update(None, "cache_size", cache_size)?; Ok::<_, eyre::Report>(()) } /// See `db_cleanup` and `vacuum_db` -pub fn spawn_handle_vacuum(pool: SplitPool) { +pub fn spawn_handle_db_maintenance(agent: &Agent) { + let mut wal_path = agent.config().db.path.clone(); + wal_path.set_extension(format!("{}-wal", wal_path.extension().unwrap_or_default())); + + let pool = agent.pool().clone(); + tokio::spawn(async move { - // large sleep right at the start to give node time to sync - const MAX_DB_FREE_PAGES: u64 = 10000; + const TRUNCATE_WAL_THRESHOLD: u64 = 5 * 1024 * 1024 * 1024; + // try to initially truncate the WAL + match wal_checkpoint_over_threshold(wal_path.as_path(), &pool, TRUNCATE_WAL_THRESHOLD).await + { + Ok(truncated) if truncated => { + info!("initially truncated WAL"); + } + Err(e) => { + error!("could not initially truncate WAL: {e}"); + } + _ => {} + } + + // large sleep right at the start to give node time to sync sleep(Duration::from_secs(60)).await; - let mut vacuum_interval = tokio::time::interval(Duration::from_secs(60 * 15)); + let mut vacuum_interval = tokio::time::interval(Duration::from_secs(60 * 5)); + + const MAX_DB_FREE_PAGES: u64 = 10000; loop { vacuum_interval.tick().await; - if let Err(e) = vacuum_db(pool.clone(), MAX_DB_FREE_PAGES).await { + if let Err(e) = vacuum_db(&pool, MAX_DB_FREE_PAGES).await { error!("could not check freelist and vacuum: {e}"); } + + if let Err(e) = + wal_checkpoint_over_threshold(wal_path.as_path(), &pool, TRUNCATE_WAL_THRESHOLD) + .await + { + error!("could not wal_checkpoint truncate: {e}"); + } } }); } +async fn wal_checkpoint_over_threshold( + wal_path: &Utf8Path, + pool: &SplitPool, + threshold: u64, +) -> eyre::Result<bool> { + let should_truncate = wal_path.metadata()?.len() > threshold; + if should_truncate { + let conn = pool.write_low().await?; + block_in_place(|| wal_checkpoint(&conn))?; + } + Ok(should_truncate) +} + /// Handle incoming emptyset received during syncs 
///_ #[allow(dead_code)] @@ -996,22 +1036,22 @@ mod tests { use tokio::sync::Semaphore; use tokio::time::timeout; - // #[test] - // fn ensure_truncate_works() -> eyre::Result<()> { - // let tmpdir = tempfile::tempdir()?; + #[test] + fn ensure_truncate_works() -> eyre::Result<()> { + let tmpdir = tempfile::tempdir()?; - // let conn = rusqlite::Connection::open(tmpdir.path().join("db.sqlite"))?; - // let pragma_value = 12345u64; - // conn.pragma_update(None, "busy_timeout", pragma_value)?; + let conn = rusqlite::Connection::open(tmpdir.path().join("db.sqlite"))?; + let pragma_value = 12345u64; + conn.pragma_update(None, "busy_timeout", pragma_value)?; - // wal_checkpoint(&conn)?; - // assert_eq!( - // conn.pragma_query_value(None, "busy_timeout", |row| row.get::<_, u64>(0))?, - // pragma_value - // ); + wal_checkpoint(&conn)?; + assert_eq!( + conn.pragma_query_value(None, "busy_timeout", |row| row.get::<_, u64>(0))?, + pragma_value + ); - // Ok(()) - // } + Ok(()) + } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn ensure_vacuum_works() -> eyre::Result<()> { @@ -1058,7 +1098,7 @@ mod tests { assert!(freelist > 1000); } - timeout(Duration::from_secs(2), vacuum_db(pool.clone(), 1000)).await??; + timeout(Duration::from_secs(2), vacuum_db(&pool, 1000)).await??; let conn = pool.read().await?; assert!( diff --git a/crates/corro-agent/src/agent/run_root.rs b/crates/corro-agent/src/agent/run_root.rs index 0f62f983..c6b9730e 100644 --- a/crates/corro-agent/src/agent/run_root.rs +++ b/crates/corro-agent/src/agent/run_root.rs @@ -4,7 +4,7 @@ use std::time::Instant; use crate::{ agent::{ - handlers::{self, spawn_handle_vacuum}, + handlers::{self, spawn_handle_db_maintenance}, metrics, setup, util, AgentOptions, }, broadcast::runtime_loop, @@ -119,7 +119,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul notifications_rx, )); - spawn_handle_vacuum(agent.pool().clone()); + spawn_handle_db_maintenance(&agent); let bookie = 
Bookie::new_with_registry(Default::default(), lock_registry); { diff --git a/crates/corro-types/src/sqlite.rs b/crates/corro-types/src/sqlite.rs index 2544b902..dcfc140c 100644 --- a/crates/corro-types/src/sqlite.rs +++ b/crates/corro-types/src/sqlite.rs @@ -114,7 +114,6 @@ pub fn setup_conn(conn: &Connection) -> Result<(), rusqlite::Error> { r#" PRAGMA journal_mode = WAL; PRAGMA journal_size_limit = 1073741824; - PRAGMA wal_autocheckpoint = 524288; PRAGMA synchronous = NORMAL; PRAGMA recursive_triggers = ON; "#,