Skip to content

Commit 048538f

Browse files
committed
Add garbage collection to blobs
also add ability to add excepmtions for gc before gc is started.
1 parent a24171c commit 048538f

File tree

1 file changed

+66
-53
lines changed

1 file changed

+66
-53
lines changed

src/net_protocol.rs

Lines changed: 66 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
#![allow(missing_docs)]
55

66
use std::{
7-
collections::BTreeMap,
7+
collections::{BTreeMap, BTreeSet},
88
fmt::Debug,
9+
ops::DerefMut,
910
sync::{Arc, OnceLock},
1011
};
1112

12-
use anyhow::{anyhow, Result};
13+
use anyhow::{anyhow, bail, Result};
1314
use futures_lite::future::Boxed as BoxedFuture;
15+
use futures_util::future::BoxFuture;
1416
use iroh_base::hash::{BlobFormat, Hash};
1517
use iroh_net::{endpoint::Connecting, Endpoint, NodeAddr};
1618
use iroh_router::ProtocolHandler;
@@ -24,27 +26,32 @@ use crate::{
2426
Stats,
2527
},
2628
provider::EventSender,
29+
store::GcConfig,
2730
util::{
28-
local_pool::LocalPoolHandle,
31+
local_pool::{self, LocalPoolHandle},
2932
progress::{AsyncChannelProgressSender, ProgressSender},
3033
SetTagOption,
3134
},
3235
HashAndFormat, TempTag,
3336
};
3437

35-
// pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
36-
//
37-
// #[derive(derive_more::Debug)]
38-
// enum GcState {
39-
// Initial(#[debug(skip)] Vec<ProtectCb>),
40-
// Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
41-
// }
42-
//
43-
// impl Default for GcState {
44-
// fn default() -> Self {
45-
// Self::Initial(Vec::new())
46-
// }
47-
// }
38+
/// A callback that blobs can ask about a set of hashes that should not be garbage collected.
39+
pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
40+
41+
/// The state of the gc loop.
42+
#[derive(derive_more::Debug)]
43+
enum GcState {
44+
// Gc loop is not yet running. Other protcols can add protect callbacks
45+
Initial(#[debug(skip)] Vec<ProtectCb>),
46+
// Gc loop is running. No more protect callbacks can be added.
47+
Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
48+
}
49+
50+
impl Default for GcState {
51+
fn default() -> Self {
52+
Self::Initial(Vec::new())
53+
}
54+
}
4855

4956
#[derive(Debug)]
5057
pub struct Blobs<S> {
@@ -54,6 +61,7 @@ pub struct Blobs<S> {
5461
downloader: Downloader,
5562
batches: tokio::sync::Mutex<BlobBatches>,
5663
endpoint: Endpoint,
64+
gc_state: Arc<std::sync::Mutex<GcState>>,
5765
#[cfg(feature = "rpc")]
5866
pub(crate) rpc_handler: Arc<OnceLock<crate::rpc::RpcHandler>>,
5967
}
@@ -185,6 +193,7 @@ impl<S: crate::store::Store> Blobs<S> {
185193
downloader,
186194
endpoint,
187195
batches: Default::default(),
196+
gc_state: Default::default(),
188197
#[cfg(feature = "rpc")]
189198
rpc_handler: Arc::new(OnceLock::new()),
190199
}
@@ -206,43 +215,47 @@ impl<S: crate::store::Store> Blobs<S> {
206215
&self.endpoint
207216
}
208217

209-
// pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
210-
// let mut state = self.gc_state.lock().unwrap();
211-
// match &mut *state {
212-
// GcState::Initial(cbs) => {
213-
// cbs.push(cb);
214-
// }
215-
// GcState::Started(_) => {
216-
// anyhow::bail!("cannot add protected blobs after gc has started");
217-
// }
218-
// }
219-
// Ok(())
220-
// }
221-
//
222-
// pub fn start_gc(&self, config: GcConfig) -> Result<()> {
223-
// let mut state = self.gc_state.lock().unwrap();
224-
// let protected = match state.deref_mut() {
225-
// GcState::Initial(items) => std::mem::take(items),
226-
// GcState::Started(_) => anyhow::bail!("gc already started"),
227-
// };
228-
// let protected = Arc::new(protected);
229-
// let protected_cb = move || {
230-
// let protected = protected.clone();
231-
// async move {
232-
// let mut set = BTreeSet::new();
233-
// for cb in protected.iter() {
234-
// cb(&mut set).await;
235-
// }
236-
// set
237-
// }
238-
// };
239-
// let store = self.store.clone();
240-
// let run = self
241-
// .rt
242-
// .spawn(move || async move { store.gc_run(config, protected_cb).await });
243-
// *state = GcState::Started(Some(run));
244-
// Ok(())
245-
// }
218+
/// Add a callback that will be called before the garbage collector runs.
219+
///
220+
/// This can only be called before the garbage collector has started, otherwise it will return an error.
221+
pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
222+
let mut state = self.gc_state.lock().unwrap();
223+
match &mut *state {
224+
GcState::Initial(cbs) => {
225+
cbs.push(cb);
226+
}
227+
GcState::Started(_) => {
228+
anyhow::bail!("cannot add protected blobs after gc has started");
229+
}
230+
}
231+
Ok(())
232+
}
233+
234+
/// Start garbage collection with the given settings.
235+
pub fn start_gc(&self, config: GcConfig) -> Result<()> {
236+
let mut state = self.gc_state.lock().unwrap();
237+
let protected = match state.deref_mut() {
238+
GcState::Initial(items) => std::mem::take(items),
239+
GcState::Started(_) => bail!("gc already started"),
240+
};
241+
let protected = Arc::new(protected);
242+
let protected_cb = move || {
243+
let protected = protected.clone();
244+
async move {
245+
let mut set = BTreeSet::new();
246+
for cb in protected.iter() {
247+
cb(&mut set).await;
248+
}
249+
set
250+
}
251+
};
252+
let store = self.store.clone();
253+
let run = self
254+
.rt
255+
.spawn(move || async move { store.gc_run(config, protected_cb).await });
256+
*state = GcState::Started(Some(run));
257+
Ok(())
258+
}
246259

247260
pub(crate) async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> {
248261
self.batches.lock().await

0 commit comments

Comments
 (0)