Skip to content

Commit

Permalink
only truncate the WAL past a certain size threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Jul 16, 2024
1 parent ff885aa commit 77dbeff
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 45 deletions.
124 changes: 82 additions & 42 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
api::peer::parallel_sync,
transport::Transport,
};
use camino::Utf8Path;
use corro_types::{
actor::{Actor, ActorId},
agent::{Agent, Bookie, SplitPool},
Expand Down Expand Up @@ -352,36 +353,36 @@ pub async fn handle_notifications(
}
}

/// We keep a write-ahead-log, which under write-pressure can grow to
/// multiple gigabytes and needs periodic truncation. We don't want
/// to schedule this task too often since it locks the whole DB.
// TODO: can we get around the lock somehow?
fn wal_checkpoint(conn: &rusqlite::Connection) -> eyre::Result<()> {
    debug!("handling db_cleanup (WAL truncation)");
    let start = Instant::now();

    // Temporarily raise busy_timeout so the checkpoint has a chance to
    // acquire the exclusive lock on a busy database; the caller's original
    // value is restored below.
    let orig: u64 = conn.pragma_query_value(None, "busy_timeout", |row| row.get(0))?;
    conn.pragma_update(None, "busy_timeout", 60000)?;

    // wal_checkpoint(TRUNCATE) returns non-zero in its first column when the
    // checkpoint could not complete because the database was busy.
    let busy: bool = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE);", [], |row| row.get(0))?;
    if busy {
        warn!("could not truncate sqlite WAL, database busy");
        counter!("corro.db.wal.truncate.busy").increment(1);
    } else {
        debug!("successfully truncated sqlite WAL!");
        histogram!("corro.db.wal.truncate.seconds").record(start.elapsed().as_secs_f64());
    }

    // Best-effort restore of the original busy_timeout; a failure here is
    // deliberately ignored since the checkpoint itself already succeeded.
    _ = conn.pragma_update(None, "busy_timeout", orig);

    Ok(())
}

/// If the number of unused free pages is above the provided limit,
/// This function continuously runs an incremental_vacuum
/// until it is below the limit
///
async fn vacuum_db(pool: SplitPool, lim: u64) -> eyre::Result<()> {
async fn vacuum_db(pool: &SplitPool, lim: u64) -> eyre::Result<()> {
let mut freelist: u64 = {
let conn = pool.read().await?;

Expand Down Expand Up @@ -433,31 +434,70 @@ async fn vacuum_db(pool: SplitPool, lim: u64) -> eyre::Result<()> {
}

let conn = pool.write_low().await?;
_ = conn.pragma_update(None, "busy_timeout", busy_timeout)?;
_ = conn.pragma_update(None, "cache_size", cache_size)?;
conn.pragma_update(None, "busy_timeout", busy_timeout)?;
conn.pragma_update(None, "cache_size", cache_size)?;

Ok::<_, eyre::Report>(())
}

/// See `db_cleanup` and `vacuum_db`
pub fn spawn_handle_vacuum(pool: SplitPool) {
pub fn spawn_handle_db_maintenance(agent: &Agent) {
let mut wal_path = agent.config().db.path.clone();
wal_path.set_extension(format!("{}-wal", wal_path.extension().unwrap_or_default()));

let pool = agent.pool().clone();

tokio::spawn(async move {
// large sleep right at the start to give node time to sync
const MAX_DB_FREE_PAGES: u64 = 10000;
const TRUNCATE_WAL_THRESHOLD: u64 = 5 * 1024 * 1024 * 1024;

// try to initially truncate the WAL
match wal_checkpoint_over_threshold(wal_path.as_path(), &pool, TRUNCATE_WAL_THRESHOLD).await
{
Ok(truncated) if truncated => {
info!("initially truncated WAL");
}
Err(e) => {
error!("could not initially truncate WAL: {e}");
}
_ => {}
}

// large sleep right at the start to give node time to sync
sleep(Duration::from_secs(60)).await;

let mut vacuum_interval = tokio::time::interval(Duration::from_secs(60 * 15));
let mut vacuum_interval = tokio::time::interval(Duration::from_secs(60 * 5));

const MAX_DB_FREE_PAGES: u64 = 10000;

loop {
vacuum_interval.tick().await;
if let Err(e) = vacuum_db(pool.clone(), MAX_DB_FREE_PAGES).await {
if let Err(e) = vacuum_db(&pool, MAX_DB_FREE_PAGES).await {
error!("could not check freelist and vacuum: {e}");
}

if let Err(e) =
wal_checkpoint_over_threshold(wal_path.as_path(), &pool, TRUNCATE_WAL_THRESHOLD)
.await
{
error!("could not wal_checkpoint truncate: {e}");
}
}
});
}

/// Truncates the WAL via [`wal_checkpoint`] only when the WAL file at
/// `wal_path` has grown beyond `threshold` bytes.
///
/// Returns `Ok(true)` if a truncation was attempted, `Ok(false)` if the WAL
/// was still under the threshold. Errors if the WAL metadata can't be read
/// or the checkpoint itself fails.
async fn wal_checkpoint_over_threshold(
    wal_path: &Utf8Path,
    pool: &SplitPool,
    threshold: u64,
) -> eyre::Result<bool> {
    let wal_size = wal_path.metadata()?.len();
    if wal_size <= threshold {
        return Ok(false);
    }

    // checkpointing locks the whole DB, so go through the low-priority
    // write connection and keep the blocking work off the async runtime
    let conn = pool.write_low().await?;
    block_in_place(|| wal_checkpoint(&conn))?;
    Ok(true)
}

/// Handle incoming emptyset received during syncs
///
#[allow(dead_code)]
Expand Down Expand Up @@ -996,22 +1036,22 @@ mod tests {
use tokio::sync::Semaphore;
use tokio::time::timeout;

#[test]
fn ensure_truncate_works() -> eyre::Result<()> {
    let tmpdir = tempfile::tempdir()?;

    let conn = rusqlite::Connection::open(tmpdir.path().join("db.sqlite"))?;
    // Set a recognizable busy_timeout so we can verify wal_checkpoint
    // restores the caller's original value after temporarily raising it.
    let pragma_value = 12345u64;
    conn.pragma_update(None, "busy_timeout", pragma_value)?;

    wal_checkpoint(&conn)?;
    assert_eq!(
        conn.pragma_query_value(None, "busy_timeout", |row| row.get::<_, u64>(0))?,
        pragma_value
    );

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn ensure_vacuum_works() -> eyre::Result<()> {
Expand Down Expand Up @@ -1058,7 +1098,7 @@ mod tests {
assert!(freelist > 1000);
}

timeout(Duration::from_secs(2), vacuum_db(pool.clone(), 1000)).await??;
timeout(Duration::from_secs(2), vacuum_db(&pool, 1000)).await??;

let conn = pool.read().await?;
assert!(
Expand Down
4 changes: 2 additions & 2 deletions crates/corro-agent/src/agent/run_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Instant;

use crate::{
agent::{
handlers::{self, spawn_handle_vacuum},
handlers::{self, spawn_handle_db_maintenance},
metrics, setup, util, AgentOptions,
},
broadcast::runtime_loop,
Expand Down Expand Up @@ -119,7 +119,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
notifications_rx,
));

spawn_handle_vacuum(agent.pool().clone());
spawn_handle_db_maintenance(&agent);

let bookie = Bookie::new_with_registry(Default::default(), lock_registry);
{
Expand Down
1 change: 0 additions & 1 deletion crates/corro-types/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ pub fn setup_conn(conn: &Connection) -> Result<(), rusqlite::Error> {
r#"
PRAGMA journal_mode = WAL;
PRAGMA journal_size_limit = 1073741824;
PRAGMA wal_autocheckpoint = 524288;
PRAGMA synchronous = NORMAL;
PRAGMA recursive_triggers = ON;
"#,
Expand Down

0 comments on commit 77dbeff

Please sign in to comment.