Skip to content

Seanaye/persistence trait #86

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iroh-blobs"
version = "0.34.1"
version = "0.35.1"
edition = "2021"
readme = "README.md"
description = "blob and collection transfer support for iroh"
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
#![deny(missing_docs, rustdoc::broken_intra_doc_links)]
#![recursion_limit = "256"]
#![cfg_attr(iroh_docsrs, feature(doc_auto_cfg))]
#![feature(async_closure)]

#[cfg(feature = "cli")]
pub mod cli;
16 changes: 12 additions & 4 deletions src/net_protocol.rs
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ use std::{
collections::BTreeSet,
fmt::Debug,
ops::{Deref, DerefMut},
path::PathBuf,
sync::Arc,
};

@@ -20,7 +21,7 @@ use tracing::debug;
use crate::{
downloader::{ConcurrencyLimits, Downloader, RetryConfig},
provider::EventSender,
store::GcConfig,
store::{fs::Persistence, GcConfig},
util::{
local_pool::{self, LocalPool, LocalPoolHandle},
SetTagOption,
@@ -221,12 +222,19 @@ impl Blobs<crate::store::mem::Store> {
}
}

impl Blobs<crate::store::fs::Store> {
impl<T> Blobs<crate::store::fs::Store<T>>
where
T: Persistence,
{
/// Load a persistent Blobs protocol handler from a path.
pub async fn persistent(
path: impl AsRef<std::path::Path>,
) -> anyhow::Result<Builder<crate::store::fs::Store>> {
Ok(Self::builder(crate::store::fs::Store::load(path).await?))
db_path: PathBuf,
backend: T,
) -> anyhow::Result<Builder<crate::store::fs::Store<T>>> {
Ok(Self::builder(
crate::store::fs::Store::load_with_backend(path, db_path, backend).await?,
))
}
}

4 changes: 3 additions & 1 deletion src/rpc/client/blobs.rs
Original file line number Diff line number Diff line change
@@ -1028,6 +1028,7 @@ mod tests {
net_protocol::Blobs,
provider::{CustomEventSender, EventSender},
rpc::client::{blobs, tags},
store::fs::FileSystemPersistence,
};

type RpcClient = quic_rpc::RpcClient<RpcService>;
@@ -1113,7 +1114,8 @@ mod tests {
/// Creates a new node with persistent storage
pub async fn persistent(
path: impl AsRef<Path>,
) -> anyhow::Result<Builder<crate::store::fs::Store>> {
) -> anyhow::Result<Builder<crate::store::fs::Store<FileSystemPersistence>>>
{
Ok(Builder {
store: crate::store::fs::Store::load(path).await?,
events: Default::default(),
260 changes: 165 additions & 95 deletions src/store/bao_file.rs

Large diffs are not rendered by default.

542 changes: 405 additions & 137 deletions src/store/fs.rs

Large diffs are not rendered by default.

27 changes: 18 additions & 9 deletions src/store/fs/test_support.rs
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ use redb::ReadableTable;
use super::{
tables::{ReadableTables, Tables},
ActorError, ActorMessage, ActorResult, ActorState, DataLocation, EntryState, FilterPredicate,
OutboardLocation, OuterResult, Store, StoreInner,
OutboardLocation, OuterResult, Persistence, Store, StoreInner,
};
use crate::{
store::{mutable_mem_storage::SizeInfo, DbIter},
@@ -46,10 +46,13 @@ pub enum EntryData {
},
}

impl Store {
impl<T> Store<T>
where
T: Persistence,
{
/// Get the complete state of an entry, both in memory and in redb.
#[cfg(test)]
pub(crate) async fn entry_state(&self, hash: Hash) -> io::Result<EntryStateResponse> {
pub(crate) async fn entry_state(&self, hash: Hash) -> io::Result<EntryStateResponse<T::File>> {
Ok(self.0.entry_state(hash).await?)
}

@@ -102,9 +105,12 @@ impl Store {
}
}

impl StoreInner {
impl<T> StoreInner<T>
where
T: Persistence,
{
#[cfg(test)]
async fn entry_state(&self, hash: Hash) -> OuterResult<EntryStateResponse> {
async fn entry_state(&self, hash: Hash) -> OuterResult<EntryStateResponse<T::File>> {
let (tx, rx) = oneshot::channel();
self.tx.send(ActorMessage::EntryState { hash, tx }).await?;
Ok(rx.await??)
@@ -145,12 +151,15 @@ impl StoreInner {

#[cfg(test)]
#[derive(Debug)]
pub(crate) struct EntryStateResponse {
pub mem: Option<crate::store::bao_file::BaoFileHandle>,
pub(crate) struct EntryStateResponse<T> {
pub mem: Option<crate::store::bao_file::BaoFileHandle<T>>,
pub db: Option<EntryState<Vec<u8>>>,
}

impl ActorState {
impl<T> ActorState<T>
where
T: Persistence,
{
pub(super) fn get_full_entry_state(
&mut self,
tables: &impl ReadableTables,
@@ -297,7 +306,7 @@ impl ActorState {
&mut self,
tables: &impl ReadableTables,
hash: Hash,
) -> ActorResult<EntryStateResponse> {
) -> ActorResult<EntryStateResponse<T::File>> {
let mem = self.handles.get(&hash).and_then(|weak| weak.upgrade());
let db = match tables.blobs().get(hash)? {
Some(entry) => Some({
10 changes: 7 additions & 3 deletions src/store/fs/tests.rs
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ pub fn to_stream(
.boxed()
}

async fn create_test_db() -> (tempfile::TempDir, Store) {
async fn create_test_db() -> (tempfile::TempDir, Store<FileSystemPersistence>) {
let _ = tracing_subscriber::fmt::try_init();
let testdir = tempfile::tempdir().unwrap();
let db_path = testdir.path().join("db.redb");
@@ -59,7 +59,9 @@ async fn create_test_db() -> (tempfile::TempDir, Store) {
batch: Default::default(),
inline: Default::default(),
};
let db = Store::new(db_path, options).await.unwrap();
let db = Store::new(db_path, options, FileSystemPersistence)
.await
.unwrap();
(testdir, db)
}

@@ -788,7 +790,9 @@ async fn actor_store_smoke() {
batch: Default::default(),
inline: Default::default(),
};
let db = Store::new(db_path, options).await.unwrap();
let db = Store::new(db_path, options, FileSystemPersistence)
.await
.unwrap();
db.dump().await.unwrap();
let data = random_test_data(1024 * 1024);
#[allow(clippy::single_range_in_vec_init)]
7 changes: 5 additions & 2 deletions src/store/fs/validate.rs
Original file line number Diff line number Diff line change
@@ -5,14 +5,17 @@ use redb::ReadableTable;

use super::{
raw_outboard_size, tables::Tables, ActorResult, ActorState, DataLocation, EntryState, Hash,
OutboardLocation,
OutboardLocation, Persistence,
};
use crate::{
store::{fs::tables::BaoFilePart, ConsistencyCheckProgress, ReportLevel},
util::progress::BoxedProgressSender,
};

impl ActorState {
impl<T> ActorState<T>
where
T: Persistence,
{
//! This performs a full consistency check. Eventually it will also validate
//! file content again, but that part is not yet implemented.
//!
3 changes: 2 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
@@ -19,9 +19,10 @@ pub mod fs;
pub mod io;
mod mem_or_file;
pub mod progress;
pub use mem_or_file::MemOrFile;
pub use mem_or_file::{FileAndSize, MemOrFile};
mod sparse_mem_file;
pub use sparse_mem_file::SparseMemFile;
pub mod callback_lock;
pub mod local_pool;

#[cfg(test)]
77 changes: 77 additions & 0 deletions src/util/callback_lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//! This module defines a wrapper around a [`tokio::sync::RwLock`] that runs a callback
//! After any write operation occurs
use std::future::Future;

/// A wrapper over a [`tokio::sync::RwLock`] that executes a callback function after
/// the write guard is dropped
#[derive(derive_more::Debug)]
pub struct CallbackLock<T, F> {
inner: tokio::sync::RwLock<T>,
#[debug(skip)]
callback: F,
}

/// the wrapper type over a [tokio::sync::RwLockWriteGuard]
#[derive(Debug)]
pub struct CallbackLockWriteGuard<'a, T, F: Fn(&T)> {
inner: tokio::sync::RwLockWriteGuard<'a, T>,
callback: &'a F,
}

impl<T, F: Fn(&T)> std::ops::Deref for CallbackLockWriteGuard<'_, T, F> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<T, F: Fn(&T)> std::ops::DerefMut for CallbackLockWriteGuard<'_, T, F> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl<T, F: Fn(&T)> Drop for CallbackLockWriteGuard<'_, T, F> {
fn drop(&mut self) {
(self.callback)(&*self.inner);
}
}

impl<T, F> CallbackLock<T, F>
where
F: Fn(&T),
{
/// create a new instance of the lock from a value
/// and the callback to evaluate when a write guard is dropped
pub fn new(val: T, callback: F) -> Self {
CallbackLock {
inner: tokio::sync::RwLock::new(val),
callback,
}
}

/// return an instance of the write guard
pub async fn write(&self) -> CallbackLockWriteGuard<'_, T, F> {
let guard = self.inner.write().await;

CallbackLockWriteGuard {
inner: guard,
callback: &self.callback,
}
}

/// return the [tokio::sync::RwLockReadGuard]
/// this will not invoke the callback
pub fn read(&self) -> impl Future<Output = tokio::sync::RwLockReadGuard<'_, T>> {
self.inner.read()
}

/// try to synchronously acquire a read lock
pub fn try_read(
&self,
) -> Result<tokio::sync::RwLockReadGuard<'_, T>, tokio::sync::TryLockError> {
self.inner.try_read()
}
}
51 changes: 48 additions & 3 deletions src/util/mem_or_file.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fs::File, io};
use std::{fs::File, future::Future, io};

use bao_tree::io::sync::{ReadAt, Size};
use bytes::Bytes;
@@ -14,17 +14,62 @@ pub enum MemOrFile<M, F> {
File(F),
}

/// A struct which represents a handle to some file which
/// is _not_ in memory and its size
#[derive(derive_more::Debug)]
pub struct FileAndSize<T> {
/// the generic file type
pub file: T,
/// the size in bytes of the file
pub size: u64,
}

impl<T> FileAndSize<T> {
/// map the type of file asynchronously.
/// This is analogous to [Option::map]
pub fn map_async<U, F>(
self,
f: F,
) -> impl Future<Output = FileAndSize<U::Output>> + use<U, F, T> + 'static
where
F: FnOnce(T) -> U + Send + 'static,
T: 'static,
U: Future + Send + 'static,
U::Output: Send + 'static,
{
let FileAndSize { file, size } = self;
async move {
FileAndSize {
file: f(file).await,
size,
}
}
}
}

impl<T, U> FileAndSize<Result<T, U>> {
/// factor out the error from inside the [FileAndSize]
/// this is analogous to [Option::transpose]
pub fn transpose(self) -> Result<FileAndSize<T>, U> {
let FileAndSize { file, size } = self;
match file {
Ok(t) => Ok(FileAndSize { file: t, size }),
Err(e) => Err(e),
}
}
}

/// Helper methods for a common way to use MemOrFile, where the memory part is something
/// like a slice, and the file part is a tuple consisiting of path or file and size.
impl<M, F> MemOrFile<M, (F, u64)>
impl<M, F> MemOrFile<M, FileAndSize<F>>
where
M: AsRef<[u8]>,
{
/// Get the size of the MemOrFile
pub fn size(&self) -> u64 {
match self {
MemOrFile::Mem(mem) => mem.as_ref().len() as u64,
MemOrFile::File((_, size)) => *size,
MemOrFile::File(FileAndSize { file: _, size }) => *size,
}
}
}
6 changes: 3 additions & 3 deletions tests/gc.rs
Original file line number Diff line number Diff line change
@@ -22,8 +22,8 @@ use iroh_blobs::{
net_protocol::Blobs,
rpc::client::{blobs, tags},
store::{
bao_tree, BaoBatchWriter, ConsistencyCheckProgress, EntryStatus, GcConfig, MapEntryMut,
MapMut, ReportLevel, Store,
bao_tree, fs::FileSystemPersistence, BaoBatchWriter, ConsistencyCheckProgress, EntryStatus,
GcConfig, MapEntryMut, MapMut, ReportLevel, Store,
},
util::{
progress::{AsyncChannelProgressSender, ProgressSender as _},
@@ -127,7 +127,7 @@ async fn persistent_node(
path: PathBuf,
gc_period: Duration,
) -> (
Node<iroh_blobs::store::fs::Store>,
Node<iroh_blobs::store::fs::Store<FileSystemPersistence>>,
async_channel::Receiver<()>,
) {
let store = iroh_blobs::store::fs::Store::load(path).await.unwrap();
4 changes: 2 additions & 2 deletions tests/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![cfg(feature = "test")]
use std::{net::SocketAddr, path::PathBuf, vec};

use iroh_blobs::net_protocol::Blobs;
use iroh_blobs::{net_protocol::Blobs, store::fs::FileSystemPersistence};
use quic_rpc::client::QuinnConnector;
use tempfile::TempDir;
use testresult::TestResult;
@@ -14,7 +14,7 @@ type BlobsClient = iroh_blobs::rpc::client::blobs::Client<QC>;
#[derive(Debug)]
pub struct Node {
pub router: iroh::protocol::Router,
pub blobs: Blobs<iroh_blobs::store::fs::Store>,
pub blobs: Blobs<iroh_blobs::store::fs::Store<FileSystemPersistence>>,
pub rpc_task: AbortOnDropHandle<()>,
}

3 changes: 2 additions & 1 deletion tests/tags.rs
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ use iroh_blobs::{
client::tags::{self, TagInfo},
proto::RpcService,
},
store::fs::FileSystemPersistence,
Hash, HashAndFormat,
};
use testresult::TestResult;
@@ -142,7 +143,7 @@ async fn tags_smoke_mem() -> TestResult<()> {
async fn tags_smoke_fs() -> TestResult<()> {
let td = tempfile::tempdir()?;
let endpoint = Endpoint::builder().bind().await?;
let blobs = Blobs::persistent(td.path().join("blobs.db"))
let blobs = Blobs::persistent(td.path(), td.path().join("blobs.db"), FileSystemPersistence)
.await?
.build(&endpoint);
let client = blobs.client();