Skip to content

feat: add back gc protect callback #94

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: Frando/fix-deletion
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 56 additions & 4 deletions src/store/fs/gc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::collections::HashSet;
use std::{collections::HashSet, pin::Pin, sync::Arc};

use bao_tree::ChunkRanges;
use genawaiter::sync::{Co, Gen};
use n0_future::{Stream, StreamExt};
use tracing::{debug, error, warn};
use tracing::{debug, error, info, warn};

use crate::{api::Store, Hash, HashAndFormat};

Expand Down Expand Up @@ -130,14 +130,52 @@ fn gc_sweep<'a>(
})
}

#[derive(Debug, Clone)]
/// Configuration for garbage collection.
#[derive(derive_more::Debug, Clone)]
pub struct GcConfig {
/// Interval in which to run garbage collection.
pub interval: std::time::Duration,
/// Optional callback to manually add protected blobs.
///
/// The callback is called before each garbage collection run. It gets a `&mut HashSet<Hash>`
/// and returns a future that returns [`ProtectOutcome`]. All hashes that are added to the
/// [`HashSet`] will be protected from garbage collection during this run.
///
/// In normal operation, return [`ProtectOutcome::Continue`] from the callback. If you return
/// [`ProtectOutcome::Abort`], the garbage collection run will be aborted.Use this if your
/// source of hashes to protect returned an error, and thus garbage collection should be skipped
/// completely to not unintentionally delete blobs that should be protected.
#[debug("ProtectCallback")]
pub add_protected: Option<ProtectCb>,
}

/// Returned from [`ProtectCb`].
///
/// See [`GcConfig::add_protected] for details.
#[derive(Debug)]
pub enum ProtectOutcome {
/// Continue with the garbage collection run.
Continue,
/// Abort the garbage collection run.
Abort,
}

/// The type of the garbage collection callback.
///
/// See [`GcConfig::add_protected] for details.
pub type ProtectCb = Arc<
dyn for<'a> Fn(
&'a mut HashSet<Hash>,
)
-> Pin<Box<dyn std::future::Future<Output = ProtectOutcome> + Send + Sync + 'a>>
+ Send
+ Sync
+ 'static,
>;

pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api::Result<()> {
debug!(externally_protected = live.len(), "gc: start");
{
live.clear();
store.clear_protected().await?;
let mut stream = gc_mark(store, live);
while let Some(ev) = stream.next().await {
Expand All @@ -155,6 +193,7 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api:
}
}
}
debug!(total_protected = live.len(), "gc: sweep");
{
let mut stream = gc_sweep(store, live);
while let Some(ev) = stream.next().await {
Expand All @@ -172,14 +211,26 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api:
}
}
}
debug!("gc: done");

Ok(())
}

pub async fn run_gc(store: Store, config: GcConfig) {
debug!("gc enabled with interval {:?}", config.interval);
let mut live = HashSet::new();
loop {
live.clear();
tokio::time::sleep(config.interval).await;
if let Some(ref cb) = config.add_protected {
match (cb)(&mut live).await {
ProtectOutcome::Continue => {}
ProtectOutcome::Abort => {
info!("abort gc run: protect callback indicated abort");
continue;
}
}
}
if let Err(e) = gc_run_once(&store, &mut live).await {
error!("error during gc run: {e}");
break;
Expand Down Expand Up @@ -288,6 +339,7 @@ mod tests {
assert!(!data_path.exists());
assert!(!outboard_path.exists());
}
live.clear();
// create a large partial file and check that the data and outboard file as well as
// the sizes and bitfield files are deleted by gc
{
Expand Down
3 changes: 2 additions & 1 deletion src/store/fs/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::{
time::Duration,
};

use super::{gc::GcConfig, meta::raw_outboard_size, temp_name};
pub use super::gc::{GcConfig, ProtectCb, ProtectOutcome};
use super::{meta::raw_outboard_size, temp_name};
use crate::Hash;

/// Options for directories used by the file store.
Expand Down
Loading