From 9ec6451a531151dbaaad14cf7f10ac125fe5aa73 Mon Sep 17 00:00:00 2001
From: Sean Aye
Date: Fri, 25 Apr 2025 23:20:40 -0400
Subject: [PATCH 01/15] begin replacing fs with interface

replace fs::metadata
---
 src/store/fs.rs              | 72 +++++++++++++++++++++++++++++-------
 src/store/fs/test_support.rs |  2 +-
 2 files changed, 59 insertions(+), 15 deletions(-)

diff --git a/src/store/fs.rs b/src/store/fs.rs
index 0628dc183..53054a3fe 100644
--- a/src/store/fs.rs
+++ b/src/store/fs.rs
@@ -84,7 +84,7 @@ use iroh_io::AsyncSliceReader;
 use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
 use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
-use tokio::io::AsyncWriteExt;
+use tokio::{io::AsyncWriteExt, runtime::Handle};
 use tracing::trace_span;
 mod tables;
 #[doc(hidden)]
@@ -441,6 +441,30 @@ pub(crate) enum ImportSource {
     Memory(#[debug(skip)] Bytes),
 }
 
+/// trait which defines the backend persistence layer
+/// for this store. e.g. filesystem, s3 etc
+pub trait Persistence: Clone {
+    /// the error type that is returned for the persistence layer
+    type Err;
+
+    /// return the size of the file in bytes if it can be found/read
+    /// otherwise return a [Self::Err]
+    fn size(&self, path: &Path) -> impl Future<Output = Result<u64, Self::Err>>;
+}
+
+/// A persistence layer that writes to the local file system
+#[derive(Debug, Clone, Copy)]
+pub struct FileSystemPersistence;
+
+impl Persistence for FileSystemPersistence {
+    type Err = io::Error;
+
+    fn size(&self, path: &Path) -> impl Future<Output = Result<u64, Self::Err>> {
+        let res = std::fs::metadata(path).map(|m| m.len());
+        async move { res }
+    }
+}
+
 impl ImportSource {
     fn content(&self) -> MemOrFile<&[u8], &Path> {
         match self {
@@ -450,10 +474,10 @@ impl ImportSource {
         }
     }
 
-    fn len(&self) -> io::Result<u64> {
+    async fn len<T: Persistence>(&self, fs: &T) -> Result<u64, T::Err> {
         match self {
-            Self::TempFile(path) => std::fs::metadata(path).map(|m| m.len()),
-            Self::External(path) => std::fs::metadata(path).map(|m| m.len()),
+            Self::TempFile(path) => fs.size(path).await,
+            Self::External(path) => fs.size(path).await,
             Self::Memory(data) => Ok(data.len() as u64),
         }
     }
@@ -711,7 +735,7 @@ pub(crate) type FilterPredicate<K, V> =
 /// Storage that is using a redb database for small files and files for
 /// large files.
 #[derive(Debug, Clone)]
-pub struct Store(Arc<StoreInner>);
+pub struct Store<T = FileSystemPersistence>(Arc<StoreInner<T>>);
 
 impl Store {
     /// Load or create a new store.
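The trait starts deliberately small: at this point in the series a backend only has to answer `size` queries, and the store still does everything else through `std::fs`. To illustrate what the seam buys, here is a minimal sketch of a second backend; `MemBackend` and its path-to-bytes map are hypothetical and not part of this series:

    use std::{
        collections::HashMap,
        future::Future,
        path::{Path, PathBuf},
        sync::{Arc, Mutex},
    };

    /// Hypothetical in-memory backend, for illustration only.
    #[derive(Debug, Clone, Default)]
    pub struct MemBackend {
        files: Arc<Mutex<HashMap<PathBuf, Vec<u8>>>>,
    }

    impl Persistence for MemBackend {
        type Err = std::io::Error;

        fn size(&self, path: &Path) -> impl Future<Output = Result<u64, Self::Err>> {
            // Resolve the lookup synchronously, then return a ready future,
            // mirroring the FileSystemPersistence impl above.
            let res = self
                .files
                .lock()
                .unwrap()
                .get(path)
                .map(|data| data.len() as u64)
                .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "no such path"));
            async move { res }
        }
    }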
@@ -758,11 +782,12 @@ impl Store {
 }
 
 #[derive(Debug)]
-struct StoreInner {
+struct StoreInner<T> {
     tx: async_channel::Sender<ActorMessage>,
     temp: Arc<RwLock<TempCounterMap>>,
     handle: Option<std::thread::JoinHandle<()>>,
     path_options: Arc<PathOptions>,
+    fs: T,
 }
 
 impl TagDrop for RwLock<TempCounterMap> {
@@ -777,8 +802,23 @@ impl TagCounter for RwLock<TempCounterMap> {
     }
 }
 
-impl StoreInner {
+impl StoreInner<FileSystemPersistence> {
     fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
+        Self::new_sync_with_backend(path, options, rt, FileSystemPersistence)
+    }
+}
+
+impl<T> StoreInner<T>
+where
+    T: Persistence,
+    OuterError: From<T::Err>,
+{
+    fn new_sync_with_backend(
+        path: PathBuf,
+        options: Options,
+        rt: tokio::runtime::Handle,
+        fs: T,
+    ) -> io::Result<Self> {
         tracing::trace!(
             "creating data directory: {}",
             options.path.data_path.display()
@@ -811,6 +851,7 @@ impl StoreInner {
             temp,
             handle: Some(handle),
             path_options: Arc::new(options.path),
+            fs,
         })
     }
 
@@ -977,10 +1018,13 @@ impl StoreInner {
                 .into());
         }
         let parent = target.parent().ok_or_else(|| {
-            OuterError::from(io::Error::new(
-                io::ErrorKind::InvalidInput,
-                "target path has no parent directory",
-            ))
+            OuterError::Inner(
+                io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    "target path has no parent directory",
+                )
+                .into(),
+            )
         })?;
         std::fs::create_dir_all(parent)?;
         let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
@@ -1069,7 +1113,7 @@ impl StoreInner {
         let file = match mode {
             ImportMode::TryReference => ImportSource::External(path),
             ImportMode::Copy => {
-                if std::fs::metadata(&path)?.len() < 16 * 1024 {
+                if Handle::current().block_on(self.fs.size(&path))? < 16 * 1024 {
                     // we don't know if the data will be inlined since we don't
                     // have the inline options here. But still for such a small file
                     // it does not seem worth it do to the temp file ceremony.
@@ -1108,7 +1152,7 @@ impl StoreInner {
         id: u64,
         progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
     ) -> OuterResult<(TempTag, u64)> {
-        let data_size = file.len()?;
+        let data_size = Handle::current().block_on(file.len(&self.fs))?;
         tracing::debug!("finalize_import_sync {:?} {}", file, data_size);
         progress.blocking_send(ImportProgress::Size {
             id,
@@ -1161,7 +1205,7 @@ impl StoreInner {
     }
 }
 
-impl Drop for StoreInner {
+impl<T> Drop for StoreInner<T> {
     fn drop(&mut self) {
         if let Some(handle) = self.handle.take() {
             self.tx
diff --git a/src/store/fs/test_support.rs b/src/store/fs/test_support.rs
index 9cc62bb86..b1c5ce8bd 100644
--- a/src/store/fs/test_support.rs
+++ b/src/store/fs/test_support.rs
@@ -102,7 +102,7 @@ impl Store {
     }
 }
 
-impl StoreInner {
+impl StoreInner<FileSystemPersistence> {
     #[cfg(test)]
     async fn entry_state(&self, hash: Hash) -> OuterResult<EntryStateResponse> {
         let (tx, rx) = oneshot::channel();

From e8648c729006c6465706ea442cf9e45897c71dfd Mon Sep 17 00:00:00 2001
From: Sean Aye
Date: Fri, 25 Apr 2025 23:36:31 -0400
Subject: [PATCH 02/15] add read method to interface

---
 src/store/fs.rs              | 49 ++++++++++++++++++++++++++++--------
 src/store/fs/test_support.rs |  2 +-
 src/store/fs/validate.rs     |  2 +-
 3 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/src/store/fs.rs b/src/store/fs.rs
index 53054a3fe..3a7b489ee 100644
--- a/src/store/fs.rs
+++ b/src/store/fs.rs
@@ -450,6 +450,11 @@ pub trait Persistence: Clone {
     /// return the size of the file in bytes if it can be found/read
     /// otherwise return a [Self::Err]
     fn size(&self, path: &Path) -> impl Future<Output = Result<u64, Self::Err>>;
+
+    /// read the contents of the file at the path
+    /// returning the bytes of the file in the success case
+    /// and [Self::Err] in the error case
+    fn read(&self, path: &Path) -> impl Future<Output = Result<Vec<u8>, Self::Err>>;
 }
 
 /// A persistence layer that writes to the local
file system @@ -463,6 +468,11 @@ impl Persistence for FileSystemPersistence { let res = std::fs::metadata(path).map(|m| m.len()); async move { res } } + + fn read(&self, path: &Path) -> impl Future, Self::Err>> { + let res = std::fs::read(path); + async move { res } + } } impl ImportSource { @@ -1117,7 +1127,7 @@ where // we don't know if the data will be inlined since we don't // have the inline options here. But still for such a small file // it does not seem worth it do to the temp file ceremony. - let data = std::fs::read(&path)?; + let data = Handle::current().block_on(self.fs.read(&path))?; ImportSource::Memory(data.into()) } else { let temp_path = self.temp_file_name(); @@ -1216,7 +1226,7 @@ impl Drop for StoreInner { } } -struct ActorState { +struct ActorState { handles: BTreeMap, protected: BTreeSet, temp: Arc>, @@ -1224,15 +1234,16 @@ struct ActorState { create_options: Arc, options: Options, rt: tokio::runtime::Handle, + fs: T, } /// The actor for the redb store. /// /// It is split into the database and the rest of the state to allow for split /// borrows in the message handlers. -struct Actor { +struct Actor { db: redb::Database, - state: ActorState, + state: ActorState, } /// Error type for message handler functions of the redb actor. @@ -1586,12 +1597,13 @@ pub(super) async fn gc_sweep_task( Ok(()) } -impl Actor { - fn new( +impl Actor { + fn new_with_backend( path: &Path, options: Options, temp: Arc>, rt: tokio::runtime::Handle, + fs: T, ) -> ActorResult<(Self, async_channel::Sender)> { let db = match redb::Database::create(path) { Ok(db) => db, @@ -1635,11 +1647,23 @@ impl Actor { options, create_options: Arc::new(create_options), rt, + fs, }, }, tx, )) } +} + +impl Actor { + fn new( + path: &Path, + options: Options, + temp: Arc>, + rt: tokio::runtime::Handle, + ) -> ActorResult<(Self, async_channel::Sender)> { + Self::new_with_backend(path, options, temp, rt, FileSystemPersistence) + } async fn run_batched(mut self) -> ActorResult<()> { let mut msgs = PeekableFlumeReceiver::new(self.state.msgs_rx.clone()); @@ -1723,7 +1747,11 @@ impl Actor { } } -impl ActorState { +impl ActorState +where + T: Persistence, + ActorError: From, +{ fn entry_status( &mut self, tables: &impl ReadableTables, @@ -1914,7 +1942,8 @@ impl ActorState { "reading external data to inline it: {}", external_path.display() ); - let data = Bytes::from(std::fs::read(&external_path)?); + let data = + Bytes::from(Handle::current().block_on(self.fs.read(&external_path))?); DataLocation::Inline(data) } else { DataLocation::External(vec![external_path], data_size) @@ -2167,7 +2196,7 @@ impl ActorState { // inline if size <= self.options.inline.max_data_inlined { let path = self.options.path.owned_data_path(&hash); - let data = std::fs::read(&path)?; + let data = Handle::current().block_on(self.fs.read(&path))?; tables.delete_after_commit.insert(hash, [BaoFilePart::Data]); tables.inline_data.insert(hash, data.as_slice())?; (DataLocation::Inline(()), size, true) @@ -2202,7 +2231,7 @@ impl ActorState { if outboard_size <= self.options.inline.max_outboard_inlined => { let path = self.options.path.owned_outboard_path(&hash); - let outboard = std::fs::read(&path)?; + let outboard = Handle::current().block_on(self.fs.read(&path))?; tables .delete_after_commit .insert(hash, [BaoFilePart::Outboard]); diff --git a/src/store/fs/test_support.rs b/src/store/fs/test_support.rs index b1c5ce8bd..f8087e493 100644 --- a/src/store/fs/test_support.rs +++ b/src/store/fs/test_support.rs @@ -150,7 +150,7 @@ pub(crate) struct 
EntryStateResponse { pub db: Option>>, } -impl ActorState { +impl ActorState { pub(super) fn get_full_entry_state( &mut self, tables: &impl ReadableTables, diff --git a/src/store/fs/validate.rs b/src/store/fs/validate.rs index ae1870471..b42916742 100644 --- a/src/store/fs/validate.rs +++ b/src/store/fs/validate.rs @@ -12,7 +12,7 @@ use crate::{ util::progress::BoxedProgressSender, }; -impl ActorState { +impl ActorState { //! This performs a full consistency check. Eventually it will also validate //! file content again, but that part is not yet implemented. //! From 53240c2d1ab7de432f2a60aceb2cd91ce2363371 Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Fri, 25 Apr 2025 23:51:43 -0400 Subject: [PATCH 03/15] add create dir all to interface --- src/store/fs.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index 3a7b489ee..50547ccef 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -455,6 +455,9 @@ pub trait Persistence: Clone { /// returning the bytes of the file in the success case /// and [Self::Err] in the error case fn read(&self, path: &Path) -> impl Future, Self::Err>>; + + /// recursively ensure that the input path exists + fn create_dir_all(&self, path: &Path) -> impl Future>; } /// A persistence layer that writes to the local file system @@ -473,6 +476,11 @@ impl Persistence for FileSystemPersistence { let res = std::fs::read(path); async move { res } } + + fn create_dir_all(&self, path: &Path) -> impl Future> { + let res = std::fs::create_dir_all(path); + async move { res } + } } impl ImportSource { @@ -822,6 +830,7 @@ impl StoreInner where T: Persistence, OuterError: From, + io::Error: From, { fn new_sync_with_backend( path: PathBuf, @@ -833,17 +842,17 @@ where "creating data directory: {}", options.path.data_path.display() ); - std::fs::create_dir_all(&options.path.data_path)?; + rt.block_on(fs.create_dir_all(&options.path.data_path))?; tracing::trace!( "creating temp directory: {}", options.path.temp_path.display() ); - std::fs::create_dir_all(&options.path.temp_path)?; + rt.block_on(fs.create_dir_all(&options.path.temp_path))?; tracing::trace!( "creating parent directory for db file{}", path.parent().unwrap().display() ); - std::fs::create_dir_all(path.parent().unwrap())?; + rt.block_on(fs.create_dir_all(path.parent().unwrap()))?; let temp: Arc> = Default::default(); let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt.clone())?; let handle = std::thread::Builder::new() @@ -1036,7 +1045,7 @@ where .into(), ) })?; - std::fs::create_dir_all(parent)?; + Handle::current().block_on(self.fs.create_dir_all(parent))?; let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash)); let (tx, rx) = oneshot::channel(); self.tx From 4d4d8b6af4fae30e35f7fed534b02b97f0b6c049 Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Sun, 27 Apr 2025 11:10:25 -0400 Subject: [PATCH 04/15] update fs trait impl --- src/store/fs.rs | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index 50547ccef..3282046af 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -443,10 +443,14 @@ pub(crate) enum ImportSource { /// trait which defines the backend persistence layer /// for this store. e.g. 
filesystem, s3 etc -pub trait Persistence: Clone { +pub trait Persistence { /// the error type that is returned for the persistence layer type Err; + /// the type which represents a file which was read from the persistence + /// layer + type File; + /// return the size of the file in bytes if it can be found/read /// otherwise return a [Self::Err] fn size(&self, path: &Path) -> impl Future>; @@ -458,6 +462,9 @@ pub trait Persistence: Clone { /// recursively ensure that the input path exists fn create_dir_all(&self, path: &Path) -> impl Future>; + + /// read and return the file at the input path + fn open(&self, path: &Path) -> impl Future>; } /// A persistence layer that writes to the local file system @@ -466,20 +473,22 @@ pub struct FileSystemPersistence; impl Persistence for FileSystemPersistence { type Err = io::Error; + type File = tokio::fs::File; - fn size(&self, path: &Path) -> impl Future> { - let res = std::fs::metadata(path).map(|m| m.len()); - async move { res } + async fn size(&self, path: &Path) -> Result { + tokio::fs::metadata(path).await.map(|m| m.len()) } fn read(&self, path: &Path) -> impl Future, Self::Err>> { - let res = std::fs::read(path); - async move { res } + tokio::fs::read(path) } fn create_dir_all(&self, path: &Path) -> impl Future> { - let res = std::fs::create_dir_all(path); - async move { res } + tokio::fs::create_dir_all(path) + } + + fn open(&self, path: &Path) -> impl Future> { + tokio::fs::File::open(path) } } From ec911ab94311066f97e1e298c1f158227fc9adb9 Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Sun, 27 Apr 2025 12:03:49 -0400 Subject: [PATCH 05/15] begin to pass around generic file type --- src/store/bao_file.rs | 132 ++++++++++++++++++++++++++++------------ src/store/fs.rs | 26 +++++--- src/util.rs | 2 +- src/util/mem_or_file.rs | 14 ++++- 4 files changed, 121 insertions(+), 53 deletions(-) diff --git a/src/store/bao_file.rs b/src/store/bao_file.rs index c06328669..65b28e27f 100644 --- a/src/store/bao_file.rs +++ b/src/store/bao_file.rs @@ -32,7 +32,7 @@ use iroh_io::AsyncSliceReader; use super::mutable_mem_storage::{MutableMemStorage, SizeInfo}; use crate::{ store::BaoBatchWriter, - util::{get_limited_slice, MemOrFile, SparseMemFile}, + util::{callback_lock::CallbackLock, get_limited_slice, FileAndSize, MemOrFile, SparseMemFile}, Hash, IROH_BLOCK_SIZE, }; @@ -81,22 +81,37 @@ struct DataPaths { /// /// For the memory variant, it does reading in a zero copy way, since storage /// is already a `Bytes`. -#[derive(Default, derive_more::Debug)] -pub struct CompleteStorage { +#[derive(derive_more::Debug)] +#[debug(bound(T: Debug))] +pub struct CompleteStorage { /// data part, which can be in memory or on disk. #[debug("{:?}", data.as_ref().map_mem(|x| x.len()))] - pub data: MemOrFile, + pub data: MemOrFile>, /// outboard part, which can be in memory or on disk. #[debug("{:?}", outboard.as_ref().map_mem(|x| x.len()))] - pub outboard: MemOrFile, + pub outboard: MemOrFile>, } -impl CompleteStorage { +impl Default for CompleteStorage { + fn default() -> Self { + Self { + data: Default::default(), + outboard: Default::default(), + } + } +} + +impl CompleteStorage +where + T: bao_tree::io::sync::ReadAt, +{ /// Read from the data file at the given offset, until end of file or max bytes. 
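// For example (sketch, not part of the patch): with inline data both parts
// live in memory, and `read_data_at` below serves reads straight from the
// `Bytes`, never touching a file handle:
//
//     let storage: CompleteStorage<std::fs::File> = CompleteStorage {
//         data: MemOrFile::Mem(Bytes::from_static(b"hello")),
//         outboard: MemOrFile::Mem(Bytes::new()),
//     };
//     assert_eq!(&storage.read_data_at(0, 5)[..], b"hello");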
pub fn read_data_at(&self, offset: u64, len: usize) -> Bytes { match &self.data { MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len), - MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(), + MemOrFile::File(FileAndSize { file, size: _ }) => { + read_to_end(file, offset, len).unwrap() + } } } @@ -104,7 +119,9 @@ impl CompleteStorage { pub fn read_outboard_at(&self, offset: u64, len: usize) -> Bytes { match &self.outboard { MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len), - MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(), + MemOrFile::File(FileAndSize { file, size: _ }) => { + read_to_end(file, offset, len).unwrap() + } } } @@ -112,7 +129,7 @@ impl CompleteStorage { pub fn data_size(&self) -> u64 { match &self.data { MemOrFile::Mem(mem) => mem.len() as u64, - MemOrFile::File((_file, size)) => *size, + MemOrFile::File(FileAndSize { file: _, size }) => *size, } } @@ -120,7 +137,7 @@ impl CompleteStorage { pub fn outboard_size(&self) -> u64 { match &self.outboard { MemOrFile::Mem(mem) => mem.len() as u64, - MemOrFile::File((_file, size)) => *size, + MemOrFile::File(FileAndSize { file: _, size }) => *size, } } } @@ -244,7 +261,7 @@ impl FileStorage { /// The storage for a bao file. This can be either in memory or on disk. #[derive(Debug)] -pub(crate) enum BaoFileStorage { +pub(crate) enum BaoFileStorage { /// The entry is incomplete and in memory. /// /// Since it is incomplete, it must be writeable. @@ -261,16 +278,16 @@ pub(crate) enum BaoFileStorage { /// (memory or file). /// /// Writing to this is a no-op, since it is already complete. - Complete(CompleteStorage), + Complete(CompleteStorage), } -impl Default for BaoFileStorage { +impl Default for BaoFileStorage { fn default() -> Self { BaoFileStorage::Complete(Default::default()) } } -impl BaoFileStorage { +impl BaoFileStorage { /// Take the storage out, leaving an empty storage in its place. /// /// Be careful to put something back in its place, or you will lose data. @@ -310,11 +327,11 @@ impl BaoFileStorage { /// A weak reference to a bao file handle. #[derive(Debug, Clone)] -pub struct BaoFileHandleWeak(Weak); +pub struct BaoFileHandleWeak(Weak>); -impl BaoFileHandleWeak { +impl BaoFileHandleWeak { /// Upgrade to a strong reference if possible. - pub fn upgrade(&self) -> Option { + pub fn upgrade(&self) -> Option> { self.0.upgrade().map(BaoFileHandle) } @@ -326,15 +343,29 @@ impl BaoFileHandleWeak { /// The inner part of a bao file handle. #[derive(Debug)] -pub struct BaoFileHandleInner { - pub(crate) storage: RwLock, +pub struct BaoFileHandleInner { + pub(crate) storage: RwLock>, config: Arc, hash: Hash, } /// A cheaply cloneable handle to a bao file, including the hash and the configuration. -#[derive(Debug, Clone, derive_more::Deref)] -pub struct BaoFileHandle(Arc); +#[derive(Debug)] +pub struct BaoFileHandle(Arc>); + +impl Deref for BaoFileHandle { + type Target = Arc>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Clone for BaoFileHandle { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} pub(crate) type CreateCb = Arc io::Result<()> + Send + Sync>; @@ -375,13 +406,18 @@ impl BaoFileConfig { /// A reader for a bao file, reading just the data. 
#[derive(Debug)] -pub struct DataReader(Option); +pub struct DataReader(Option>); -async fn with_storage(opt: &mut Option, no_io: P, f: F) -> io::Result +async fn with_storage( + opt: &mut Option>, + no_io: P, + f: F, +) -> io::Result where - P: Fn(&BaoFileStorage) -> bool + Send + 'static, - F: FnOnce(&BaoFileStorage) -> io::Result + Send + 'static, + P: Fn(&BaoFileStorage) -> bool + Send + 'static, + F: FnOnce(&BaoFileStorage) -> io::Result + Send + 'static, T: Send + 'static, + H: Send + Sync + 'static, { let handle = opt .take() @@ -410,7 +446,10 @@ where res } -impl AsyncSliceReader for DataReader { +impl AsyncSliceReader for DataReader +where + T: Send + Sync + bao_tree::io::sync::ReadAt + 'static, +{ async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { with_storage( &mut self.0, @@ -440,9 +479,12 @@ impl AsyncSliceReader for DataReader { /// A reader for the outboard part of a bao file. #[derive(Debug)] -pub struct OutboardReader(Option); +pub struct OutboardReader(Option>); -impl AsyncSliceReader for OutboardReader { +impl AsyncSliceReader for OutboardReader +where + T: Send + Sync + bao_tree::io::sync::ReadAt + 'static, +{ async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { with_storage( &mut self.0, @@ -476,7 +518,10 @@ enum HandleChange { // later: size verified } -impl BaoFileHandle { +impl BaoFileHandle +where + T: bao_tree::io::sync::ReadAt, +{ /// Create a new bao file handle. /// /// This will create a new file handle with an empty memory storage. @@ -509,8 +554,8 @@ impl BaoFileHandle { pub fn new_complete( config: Arc, hash: Hash, - data: MemOrFile, - outboard: MemOrFile, + data: MemOrFile>, + outboard: MemOrFile>, ) -> Self { let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard }); Self(Arc::new(BaoFileHandleInner { @@ -525,7 +570,7 @@ impl BaoFileHandle { #[cfg(feature = "fs-store")] pub(crate) fn transform( &self, - f: impl FnOnce(BaoFileStorage) -> io::Result, + f: impl FnOnce(BaoFileStorage) -> io::Result>, ) -> io::Result<()> { let mut lock = self.storage.write().unwrap(); let storage = lock.take(); @@ -545,7 +590,7 @@ impl BaoFileHandle { /// /// Caution: this is a reader for the unvalidated data file. Reading this /// can produce data that does not match the hash. - pub fn data_reader(&self) -> DataReader { + pub fn data_reader(&self) -> DataReader { DataReader(Some(self.clone())) } @@ -553,7 +598,7 @@ impl BaoFileHandle { /// /// The outboard file is used to validate the data file. It is not guaranteed /// to be complete. - pub fn outboard_reader(&self) -> OutboardReader { + pub fn outboard_reader(&self) -> OutboardReader { OutboardReader(Some(self.clone())) } @@ -567,7 +612,7 @@ impl BaoFileHandle { } /// The outboard for the file. - pub fn outboard(&self) -> io::Result> { + pub fn outboard(&self) -> io::Result>> { let root = self.hash.into(); let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE); let outboard = self.outboard_reader(); @@ -584,7 +629,7 @@ impl BaoFileHandle { } /// Create a new writer from the handle. - pub fn writer(&self) -> BaoFileWriter { + pub fn writer(&self) -> BaoFileWriter { BaoFileWriter(Some(self.clone())) } @@ -625,7 +670,7 @@ impl BaoFileHandle { } /// Downgrade to a weak reference. - pub fn downgrade(&self) -> BaoFileHandleWeak { + pub fn downgrade(&self) -> BaoFileHandleWeak { BaoFileHandleWeak(Arc::downgrade(&self.0)) } } @@ -676,9 +721,12 @@ impl MutableMemStorage { /// It is a BaoFileHandle wrapped in an Option, so that we can take it out /// in the future. 
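// Usage sketch (not part of the patch): the readers above and the writer
// below all use the same Option dance; each call takes the handle out, runs
// the blocking IO where that is allowed, and restores the handle, so one
// reader can serve many reads (requires `iroh_io::AsyncSliceReader` in scope):
//
//     let mut reader = handle.data_reader();
//     let first = reader.read_at(0, 1024).await?;   // handle taken and restored
//     let rest = reader.read_at(1024, 1024).await?; // still usable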
#[derive(Debug)] -pub struct BaoFileWriter(Option); +pub struct BaoFileWriter(Option>); -impl BaoBatchWriter for BaoFileWriter { +impl BaoBatchWriter for BaoFileWriter +where + T: Send + Sync + bao_tree::io::sync::ReadAt + 'static, +{ async fn write_batch(&mut self, size: u64, batch: Vec) -> std::io::Result<()> { let Some(handle) = self.0.take() else { return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy")); @@ -828,7 +876,11 @@ pub mod test_support { (outboard.root.into(), chunk_ranges, encoded) } - pub async fn validate(handle: &BaoFileHandle, original: &[u8], ranges: &[Range]) { + pub async fn validate( + handle: &BaoFileHandle, + original: &[u8], + ranges: &[Range], + ) { let mut r = handle.data_reader(); for range in ranges { let start = range.start; diff --git a/src/store/fs.rs b/src/store/fs.rs index 3282046af..097cb81bb 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -449,7 +449,7 @@ pub trait Persistence { /// the type which represents a file which was read from the persistence /// layer - type File; + type File: std::io::Read; /// return the size of the file in bytes if it can be found/read /// otherwise return a [Self::Err] @@ -473,7 +473,7 @@ pub struct FileSystemPersistence; impl Persistence for FileSystemPersistence { type Err = io::Error; - type File = tokio::fs::File; + type File = std::fs::File; async fn size(&self, path: &Path) -> Result { tokio::fs::metadata(path).await.map(|m| m.len()) @@ -487,8 +487,9 @@ impl Persistence for FileSystemPersistence { tokio::fs::create_dir_all(path) } - fn open(&self, path: &Path) -> impl Future> { - tokio::fs::File::open(path) + async fn open(&self, path: &Path) -> Result { + let file = tokio::fs::File::open(path).await?; + Ok(file.into_std().await) } } @@ -1191,7 +1192,7 @@ where MemOrFile::File(path) => { let span = trace_span!("outboard.compute", path = %path.display()); let _guard = span.enter(); - let file = std::fs::File::open(path)?; + let file = Handle::current().block_on(self.fs.open(path))?; compute_outboard(file, data_size, move |offset| { Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?) })? @@ -1805,7 +1806,7 @@ where data_location, outboard_location, } => { - let data = load_data(tables, &self.options.path, data_location, &hash)?; + let data = load_data(tables, &self.options.path, data_location, &hash, &self.fs)?; let outboard = load_outboard( tables, &self.options.path, @@ -2049,7 +2050,8 @@ where outboard_location, .. } => { - let data = load_data(tables, &self.options.path, data_location, &hash)?; + let data = + load_data(tables, &self.options.path, data_location, &hash, &self.fs)?; let outboard = load_outboard( tables, &self.options.path, @@ -2601,12 +2603,16 @@ fn dump(tables: &impl ReadableTables) -> ActorResult<()> { Ok(()) } -fn load_data( +fn load_data( tables: &impl ReadableTables, options: &PathOptions, location: DataLocation<(), u64>, hash: &Hash, -) -> ActorResult> { + fs: &T, +) -> ActorResult> +where + T: Persistence, +{ Ok(match location { DataLocation::Inline(()) => { let Some(data) = tables.inline_data().get(hash)? 
else { @@ -2619,7 +2625,7 @@ fn load_data( } DataLocation::Owned(data_size) => { let path = options.owned_data_path(hash); - let Ok(file) = std::fs::File::open(&path) else { + let Ok(file) = Handle::current().block_on(fs.open(&path)) else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("file not found: {}", path.display()), diff --git a/src/util.rs b/src/util.rs index fcf3115bf..8ec4e48b6 100644 --- a/src/util.rs +++ b/src/util.rs @@ -19,7 +19,7 @@ pub mod fs; pub mod io; mod mem_or_file; pub mod progress; -pub use mem_or_file::MemOrFile; +pub use mem_or_file::{FileAndSize, MemOrFile}; mod sparse_mem_file; pub use sparse_mem_file::SparseMemFile; pub mod local_pool; diff --git a/src/util/mem_or_file.rs b/src/util/mem_or_file.rs index d929a19c9..e12a7c28b 100644 --- a/src/util/mem_or_file.rs +++ b/src/util/mem_or_file.rs @@ -14,9 +14,19 @@ pub enum MemOrFile { File(F), } +/// A struct which represents a handle to some file which +/// is _not_ in memory and its size +#[derive(derive_more::Debug)] +pub struct FileAndSize { + /// the generic file type + pub file: T, + /// the size in bytes of the file + pub size: u64, +} + /// Helper methods for a common way to use MemOrFile, where the memory part is something /// like a slice, and the file part is a tuple consisiting of path or file and size. -impl MemOrFile +impl MemOrFile> where M: AsRef<[u8]>, { @@ -24,7 +34,7 @@ where pub fn size(&self) -> u64 { match self { MemOrFile::Mem(mem) => mem.as_ref().len() as u64, - MemOrFile::File((_, size)) => *size, + MemOrFile::File(FileAndSize { file: _, size }) => *size, } } } From 4bbf0de1b591f116ff9e9877ca35240f0a919e29 Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Sun, 27 Apr 2025 12:18:47 -0400 Subject: [PATCH 06/15] continue to make file generic in store --- src/store/fs.rs | 65 +++++++++++++++++++++--------------- src/store/fs/test_support.rs | 14 +++++--- src/store/fs/validate.rs | 7 ++-- 3 files changed, 53 insertions(+), 33 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index 097cb81bb..5d5e11ea3 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -512,9 +512,9 @@ impl ImportSource { } /// Use BaoFileHandle as the entry type for the map. -pub type Entry = BaoFileHandle; +pub type Entry = BaoFileHandle; -impl super::MapEntry for Entry { +impl super::MapEntry for Entry { fn hash(&self) -> Hash { self.hash() } @@ -530,7 +530,7 @@ impl super::MapEntry for Entry { } async fn outboard(&self) -> io::Result { - self.outboard() + BaoFileHandle::outboard(self) } async fn data_reader(&self) -> io::Result { @@ -538,7 +538,7 @@ impl super::MapEntry for Entry { } } -impl super::MapEntryMut for Entry { +impl super::MapEntryMut for Entry { async fn batch_writer(&self) -> io::Result { Ok(self.writer()) } @@ -572,12 +572,12 @@ pub(crate) struct Export { } #[derive(derive_more::Debug)] -pub(crate) enum ActorMessage { +pub(crate) enum ActorMessage { // Query method: get a file handle for a hash, if it exists. // This will produce a file handle even for entries that are not yet in redb at all. Get { hash: Hash, - tx: oneshot::Sender>>, + tx: oneshot::Sender>>>, }, /// Query method: get the rough entry status for a hash. Just complete, partial or not found. EntryStatus { @@ -609,7 +609,7 @@ pub(crate) enum ActorMessage { /// will be created, but not yet written to redb. GetOrCreate { hash: Hash, - tx: oneshot::Sender>, + tx: oneshot::Sender>>, }, /// Modification method: inline size was exceeded for a partial entry. /// If the entry is complete, this is a no-op. 
If the entry is partial and in @@ -617,7 +617,7 @@ pub(crate) enum ActorMessage { OnMemSizeExceeded { hash: Hash }, /// Modification method: marks a partial entry as complete. /// Calling this on a complete entry is a no-op. - OnComplete { handle: BaoFileHandle }, + OnComplete { handle: BaoFileHandle }, /// Modification method: import data into a redb store /// /// At this point the size, hash and outboard must already be known. @@ -718,7 +718,7 @@ pub(crate) enum ActorMessage { Shutdown { tx: Option> }, } -impl ActorMessage { +impl ActorMessage { fn category(&self) -> MessageCategory { match self { Self::Get { .. } @@ -810,8 +810,8 @@ impl Store { } #[derive(Debug)] -struct StoreInner { - tx: async_channel::Sender, +struct StoreInner { + tx: async_channel::Sender>, temp: Arc>, handle: Option>, path_options: Arc, @@ -1019,7 +1019,7 @@ where Ok(rx.recv()??) } - async fn complete(&self, entry: Entry) -> OuterResult<()> { + async fn complete(&self, entry: Entry) -> OuterResult<()> { self.tx .send(ActorMessage::OnComplete { handle: entry }) .await?; @@ -1245,11 +1245,11 @@ impl Drop for StoreInner { } } -struct ActorState { - handles: BTreeMap, +struct ActorState { + handles: BTreeMap>, protected: BTreeSet, temp: Arc>, - msgs_rx: async_channel::Receiver, + msgs_rx: async_channel::Receiver>, create_options: Arc, options: Options, rt: tokio::runtime::Handle, @@ -1323,8 +1323,8 @@ pub(crate) enum OuterError { JoinTask(#[from] tokio::task::JoinError), } -impl From> for OuterError { - fn from(_e: async_channel::SendError) -> Self { +impl From>> for OuterError { + fn from(_e: async_channel::SendError>) -> Self { OuterError::Send } } @@ -1616,14 +1616,17 @@ pub(super) async fn gc_sweep_task( Ok(()) } -impl Actor { +impl Actor +where + T: Persistence, +{ fn new_with_backend( path: &Path, options: Options, temp: Arc>, rt: tokio::runtime::Handle, fs: T, - ) -> ActorResult<(Self, async_channel::Sender)> { + ) -> ActorResult<(Self, async_channel::Sender>)> { let db = match redb::Database::create(path) { Ok(db) => db, Err(DatabaseError::UpgradeRequired(1)) => { @@ -1790,7 +1793,7 @@ where &mut self, tables: &impl ReadableTables, hash: Hash, - ) -> ActorResult> { + ) -> ActorResult>> { if let Some(handle) = self.handles.get(&hash).and_then(|weak| weak.upgrade()) { return Ok(Some(handle)); } @@ -2036,7 +2039,7 @@ where &mut self, tables: &impl ReadableTables, hash: Hash, - ) -> ActorResult { + ) -> ActorResult> { self.protected.insert(hash); if let Some(handle) = self.handles.get(&hash).and_then(|x| x.upgrade()) { return Ok(handle); @@ -2347,7 +2350,11 @@ where Ok(()) } - fn on_complete(&mut self, tables: &mut Tables, entry: BaoFileHandle) -> ActorResult<()> { + fn on_complete( + &mut self, + tables: &mut Tables, + entry: BaoFileHandle, + ) -> ActorResult<()> { let hash = entry.hash(); let mut info = None; tracing::trace!("on_complete({})", hash.to_hex()); @@ -2417,7 +2424,11 @@ where Ok(()) } - fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> { + fn handle_toplevel( + &mut self, + db: &redb::Database, + msg: ActorMessage, + ) -> ActorResult<()> { match msg { ActorMessage::UpdateInlineOptions { inline_options, @@ -2451,8 +2462,8 @@ where fn handle_readonly( &mut self, tables: &impl ReadableTables, - msg: ActorMessage, - ) -> ActorResult> { + msg: ActorMessage, + ) -> ActorResult>> { match msg { ActorMessage::Get { hash, tx } => { let res = self.get(tables, hash); @@ -2498,8 +2509,8 @@ where fn handle_readwrite( &mut self, tables: &mut Tables, - msg: ActorMessage, - ) 
-> ActorResult> { + msg: ActorMessage, + ) -> ActorResult>> { match msg { ActorMessage::Import { cmd, tx } => { let res = self.import(tables, cmd); diff --git a/src/store/fs/test_support.rs b/src/store/fs/test_support.rs index f8087e493..8cdb030a9 100644 --- a/src/store/fs/test_support.rs +++ b/src/store/fs/test_support.rs @@ -12,7 +12,7 @@ use redb::ReadableTable; use super::{ tables::{ReadableTables, Tables}, ActorError, ActorMessage, ActorResult, ActorState, DataLocation, EntryState, FilterPredicate, - OutboardLocation, OuterResult, Store, StoreInner, + OutboardLocation, OuterResult, Persistence, Store, StoreInner, }; use crate::{ store::{mutable_mem_storage::SizeInfo, DbIter}, @@ -102,7 +102,10 @@ impl Store { } } -impl StoreInner { +impl StoreInner +where + T: Persistence, +{ #[cfg(test)] async fn entry_state(&self, hash: Hash) -> OuterResult { let (tx, rx) = oneshot::channel(); @@ -146,11 +149,14 @@ impl StoreInner { #[cfg(test)] #[derive(Debug)] pub(crate) struct EntryStateResponse { - pub mem: Option, + pub mem: Option>, pub db: Option>>, } -impl ActorState { +impl ActorState +where + T: Persistence, +{ pub(super) fn get_full_entry_state( &mut self, tables: &impl ReadableTables, diff --git a/src/store/fs/validate.rs b/src/store/fs/validate.rs index b42916742..253afe377 100644 --- a/src/store/fs/validate.rs +++ b/src/store/fs/validate.rs @@ -5,14 +5,17 @@ use redb::ReadableTable; use super::{ raw_outboard_size, tables::Tables, ActorResult, ActorState, DataLocation, EntryState, Hash, - OutboardLocation, + OutboardLocation, Persistence, }; use crate::{ store::{fs::tables::BaoFilePart, ConsistencyCheckProgress, ReportLevel}, util::progress::BoxedProgressSender, }; -impl ActorState { +impl ActorState +where + T: Persistence, +{ //! This performs a full consistency check. Eventually it will also validate //! file content again, but that part is not yet implemented. //! From 184f9803d05b441a3066dc0c3138def88516406e Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Sun, 27 Apr 2025 14:36:07 -0400 Subject: [PATCH 07/15] add method to convert from std::fs::File --- src/store/bao_file.rs | 5 +- src/store/fs.rs | 173 ++++++++++++++++++++++++++--------- src/store/fs/test_support.rs | 15 +-- src/util/mem_or_file.rs | 33 +++++++ 4 files changed, 175 insertions(+), 51 deletions(-) diff --git a/src/store/bao_file.rs b/src/store/bao_file.rs index 65b28e27f..a20bfd2c1 100644 --- a/src/store/bao_file.rs +++ b/src/store/bao_file.rs @@ -82,13 +82,12 @@ struct DataPaths { /// For the memory variant, it does reading in a zero copy way, since storage /// is already a `Bytes`. #[derive(derive_more::Debug)] -#[debug(bound(T: Debug))] pub struct CompleteStorage { /// data part, which can be in memory or on disk. - #[debug("{:?}", data.as_ref().map_mem(|x| x.len()))] + #[debug("{:?}", data.as_ref().map_mem(|x| x.len()).map_file(|f| f.size))] pub data: MemOrFile>, /// outboard part, which can be in memory or on disk. 
- #[debug("{:?}", outboard.as_ref().map_mem(|x| x.len()))] + #[debug("{:?}", outboard.as_ref().map_mem(|x| x.len()).map_file(|f| f.size))] pub outboard: MemOrFile>, } diff --git a/src/store/fs.rs b/src/store/fs.rs index 5d5e11ea3..ddad06071 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -117,7 +117,7 @@ use crate::{ BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError, ProgressSender, }, - raw_outboard_size, MemOrFile, TagCounter, TagDrop, + raw_outboard_size, FileAndSize, MemOrFile, TagCounter, TagDrop, }, BlobFormat, Hash, HashAndFormat, Tag, TempTag, }; @@ -443,13 +443,13 @@ pub(crate) enum ImportSource { /// trait which defines the backend persistence layer /// for this store. e.g. filesystem, s3 etc -pub trait Persistence { +pub trait Persistence: Send + Sync + Clone + std::fmt::Debug + 'static { /// the error type that is returned for the persistence layer - type Err; + type Err: Into; /// the type which represents a file which was read from the persistence /// layer - type File: std::io::Read; + type File: IrohFile + std::io::Read + std::fmt::Debug; /// return the size of the file in bytes if it can be found/read /// otherwise return a [Self::Err] @@ -465,6 +465,14 @@ pub trait Persistence { /// read and return the file at the input path fn open(&self, path: &Path) -> impl Future>; + + /// convert from a [std::fs::File] into this persistence layer [Self::File] type. + /// This is called when converting from a partial file (which exists on disk) + /// into a complete file (which exists where ever your implementation wants it to) + fn convert_std_file( + &self, + file: std::fs::File, + ) -> impl Future>; } /// A persistence layer that writes to the local file system @@ -491,6 +499,10 @@ impl Persistence for FileSystemPersistence { let file = tokio::fs::File::open(path).await?; Ok(file.into_std().await) } + + async fn convert_std_file(&self, file: std::fs::File) -> Result { + Ok(file) + } } impl ImportSource { @@ -514,7 +526,11 @@ impl ImportSource { /// Use BaoFileHandle as the entry type for the map. pub type Entry = BaoFileHandle; -impl super::MapEntry for Entry { +/// a trait which defines the interface which any [Persistence::File] type must adhere to +pub trait IrohFile: bao_tree::io::sync::ReadAt + Send + Sync + 'static {} +impl IrohFile for T where T: bao_tree::io::sync::ReadAt + Send + Sync + 'static {} + +impl super::MapEntry for Entry { fn hash(&self) -> Hash { self.hash() } @@ -538,7 +554,7 @@ impl super::MapEntry for Entry { } } -impl super::MapEntryMut for Entry { +impl super::MapEntryMut for Entry { async fn batch_writer(&self) -> io::Result { Ok(self.writer()) } @@ -589,7 +605,7 @@ pub(crate) enum ActorMessage { /// This is everything we got about the entry, including the actual inline outboard and data. EntryState { hash: Hash, - tx: oneshot::Sender>, + tx: oneshot::Sender>>, }, /// Query method: get the full entry state for a hash. GetFullEntryState { @@ -763,7 +779,7 @@ pub(crate) type FilterPredicate = /// Storage that is using a redb database for small files and files for /// large files. #[derive(Debug, Clone)] -pub struct Store(Arc>); +pub struct Store(Arc>); impl Store { /// Load or create a new store. 
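At this point the backend's `File` type is what flows into `BaoFileHandle`, so anything readable at an offset can act as a "file". As a sketch, a hypothetical in-memory handle could satisfy the `IrohFile` bound and the extra `std::io::Read` requirement like this (assuming `bao_tree::io::sync::ReadAt` keeps its positioned-io style `read_at(&self, offset, buf)` signature):

    /// Hypothetical in-memory file handle, for illustration only.
    #[derive(Debug, Clone, Default)]
    pub struct MemFile {
        data: bytes::Bytes,
        pos: usize,
    }

    impl bao_tree::io::sync::ReadAt for MemFile {
        fn read_at(&self, offset: u64, buf: &mut [u8]) -> std::io::Result<usize> {
            // Clamp the requested window to the underlying buffer.
            let start = offset.min(self.data.len() as u64) as usize;
            let n = buf.len().min(self.data.len() - start);
            buf[..n].copy_from_slice(&self.data[start..start + n]);
            Ok(n)
        }
    }

    impl std::io::Read for MemFile {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            // Sequential reads just advance a cursor over the same data.
            let n = bao_tree::io::sync::ReadAt::read_at(self, self.pos as u64, buf)?;
            self.pos += n;
            Ok(n)
        }
    }

`MemFile` is `Send + Sync + 'static`, so the blanket `impl IrohFile for T` above covers it automatically.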
@@ -864,7 +880,7 @@ where ); rt.block_on(fs.create_dir_all(path.parent().unwrap()))?; let temp: Arc> = Default::default(); - let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt.clone())?; + let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt.clone(), fs.clone())?; let handle = std::thread::Builder::new() .name("redb-actor".to_string()) .spawn(move || { @@ -884,13 +900,13 @@ where }) } - pub async fn get(&self, hash: Hash) -> OuterResult> { + pub async fn get(&self, hash: Hash) -> OuterResult>> { let (tx, rx) = oneshot::channel(); self.tx.send(ActorMessage::Get { hash, tx }).await?; Ok(rx.await??) } - async fn get_or_create(&self, hash: Hash) -> OuterResult { + async fn get_or_create(&self, hash: Hash) -> OuterResult> { let (tx, rx) = oneshot::channel(); self.tx.send(ActorMessage::GetOrCreate { hash, tx }).await?; Ok(rx.await??) @@ -1234,7 +1250,7 @@ where } } -impl Drop for StoreInner { +impl Drop for StoreInner { fn drop(&mut self) { if let Some(handle) = self.handle.take() { self.tx @@ -1260,7 +1276,7 @@ struct ActorState { /// /// It is split into the database and the rest of the state to allow for split /// borrows in the message handlers. -struct Actor { +struct Actor { db: redb::Database, state: ActorState, } @@ -1349,16 +1365,26 @@ impl From for io::Error { } } -impl super::Map for Store { - type Entry = Entry; +impl super::Map for Store +where + T: Persistence, + OuterError: From, + io::Error: From, +{ + type Entry = Entry; async fn get(&self, hash: &Hash) -> io::Result> { Ok(self.0.get(*hash).await?) } } -impl super::MapMut for Store { - type EntryMut = Entry; +impl super::MapMut for Store +where + T: Persistence, + OuterError: From, + io::Error: From, +{ + type EntryMut = Entry; async fn get_or_create(&self, hash: Hash, _size: u64) -> io::Result { Ok(self.0.get_or_create(hash).await?) 
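With these bounds in place, code written against the store traits stays backend-agnostic; only the error-conversion bounds travel with it. A small usage sketch (the helper is hypothetical, mirroring the bounds on the `super::Map` impl above):

    /// Check whether a blob is present, for any persistence backend whose
    /// errors convert into the store's error types.
    async fn have_blob<T>(store: &Store<T>, hash: Hash) -> io::Result<bool>
    where
        T: Persistence,
        OuterError: From<T::Err>,
        io::Error: From<T::Err>,
    {
        use crate::store::Map;
        Ok(store.get(&hash).await?.is_some())
    }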
@@ -1677,14 +1703,18 @@ where } } -impl Actor { +impl Actor +where + T: Persistence, +{ fn new( path: &Path, options: Options, temp: Arc>, rt: tokio::runtime::Handle, - ) -> ActorResult<(Self, async_channel::Sender)> { - Self::new_with_backend(path, options, temp, rt, FileSystemPersistence) + fs: T, + ) -> ActorResult<(Self, async_channel::Sender>)> { + Self::new_with_backend(path, options, temp, rt, fs) } async fn run_batched(mut self) -> ActorResult<()> { @@ -1772,7 +1802,6 @@ impl Actor { impl ActorState where T: Persistence, - ActorError: From, { fn entry_status( &mut self, @@ -1816,6 +1845,7 @@ where outboard_location, data.size(), &hash, + &self.fs, )?; BaoFileHandle::new_complete(config, hash, data, outboard) } @@ -1964,8 +1994,11 @@ where "reading external data to inline it: {}", external_path.display() ); - let data = - Bytes::from(Handle::current().block_on(self.fs.read(&external_path))?); + let data = Bytes::from( + Handle::current() + .block_on(self.fs.read(&external_path)) + .map_err(|e| ActorError::Io(e.into()))?, + ); DataLocation::Inline(data) } else { DataLocation::External(vec![external_path], data_size) @@ -2061,6 +2094,7 @@ where outboard_location, data.size(), &hash, + &self.fs, )?; tracing::debug!("creating complete entry for {}", hash.to_hex()); BaoFileHandle::new_complete(self.create_options.clone(), hash, data, outboard) @@ -2219,7 +2253,9 @@ where // inline if size <= self.options.inline.max_data_inlined { let path = self.options.path.owned_data_path(&hash); - let data = Handle::current().block_on(self.fs.read(&path))?; + let data = Handle::current() + .block_on(self.fs.read(&path)) + .map_err(|e| ActorError::Io(e.into()))?; tables.delete_after_commit.insert(hash, [BaoFilePart::Data]); tables.inline_data.insert(hash, data.as_slice())?; (DataLocation::Inline(()), size, true) @@ -2254,7 +2290,9 @@ where if outboard_size <= self.options.inline.max_outboard_inlined => { let path = self.options.path.owned_outboard_path(&hash); - let outboard = Handle::current().block_on(self.fs.read(&path))?; + let outboard = Handle::current() + .block_on(self.fs.read(&path)) + .map_err(|e| ActorError::Io(e.into()))?; tables .delete_after_commit .insert(hash, [BaoFilePart::Outboard]); @@ -2360,13 +2398,14 @@ where tracing::trace!("on_complete({})", hash.to_hex()); entry.transform(|state| { tracing::trace!("on_complete transform {:?}", state); - let entry = match complete_storage( + let entry = match Handle::current().block_on(complete_storage( state, &hash, &self.options.path, &self.options.inline, tables.delete_after_commit, - )? { + &self.fs, + ))? 
{ Ok(entry) => { // store the info so we can insert it into the db later info = Some(( @@ -2620,7 +2659,7 @@ fn load_data( location: DataLocation<(), u64>, hash: &Hash, fs: &T, -) -> ActorResult> +) -> ActorResult>> where T: Persistence, { @@ -2643,7 +2682,10 @@ where ) .into()); }; - MemOrFile::File((file, data_size)) + MemOrFile::File(FileAndSize { + file, + size: data_size, + }) } DataLocation::External(paths, data_size) => { if paths.is_empty() { @@ -2652,25 +2694,29 @@ where )); } let path = &paths[0]; - let Ok(file) = std::fs::File::open(path) else { + let Ok(file) = Handle::current().block_on(fs.open(path)) else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("external file not found: {}", path.display()), ) .into()); }; - MemOrFile::File((file, data_size)) + MemOrFile::File(FileAndSize { + file, + size: data_size, + }) } }) } -fn load_outboard( +fn load_outboard( tables: &impl ReadableTables, options: &PathOptions, location: OutboardLocation, size: u64, hash: &Hash, -) -> ActorResult> { + fs: &T, +) -> ActorResult>> { Ok(match location { OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()), OutboardLocation::Inline(_) => { @@ -2685,26 +2731,33 @@ fn load_outboard( OutboardLocation::Owned => { let outboard_size = raw_outboard_size(size); let path = options.owned_outboard_path(hash); - let Ok(file) = std::fs::File::open(&path) else { + let Ok(file) = Handle::current().block_on(fs.open(&path)) else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("file not found: {} size={}", path.display(), outboard_size), ) .into()); }; - MemOrFile::File((file, outboard_size)) + MemOrFile::File(FileAndSize { + file, + size: outboard_size, + }) } }) } /// Take a possibly incomplete storage and turn it into complete -fn complete_storage( - storage: BaoFileStorage, +async fn complete_storage( + storage: BaoFileStorage, hash: &Hash, path_options: &PathOptions, inline_options: &InlineOptions, delete_after_commit: &mut DeleteSet, -) -> ActorResult> { + fs: &T, +) -> ActorResult, CompleteStorage>> +where + T: Persistence, +{ let (data, outboard, _sizes) = match storage { BaoFileStorage::Complete(c) => return Ok(Err(c)), BaoFileStorage::IncompleteMem(storage) => { @@ -2747,9 +2800,27 @@ fn complete_storage( MemOrFile::Mem(data) => { let path = path_options.owned_data_path(hash); let file = overwrite_and_sync(&path, &data)?; - MemOrFile::File((file, data_size)) + MemOrFile::File( + FileAndSize { + file, + size: data_size, + } + .map_async(|f| fs.convert_std_file(f)) + .await + .transpose() + .map_err(|e| ActorError::Io(e.into()))?, + ) } - MemOrFile::File(data) => MemOrFile::File((data, data_size)), + MemOrFile::File(data) => MemOrFile::File( + FileAndSize { + file: data, + size: data_size, + } + .map_async(|f| fs.convert_std_file(f)) + .await + .transpose() + .map_err(|e| ActorError::Io(e.into()))?, + ), } }; // inline outboard if needed, or write to file if needed @@ -2774,9 +2845,27 @@ fn complete_storage( MemOrFile::Mem(outboard) => { let path = path_options.owned_outboard_path(hash); let file = overwrite_and_sync(&path, &outboard)?; - MemOrFile::File((file, outboard_size)) + MemOrFile::File( + FileAndSize { + file, + size: outboard_size, + } + .map_async(|f| fs.convert_std_file(f)) + .await + .transpose() + .map_err(|e| ActorError::Io(e.into()))?, + ) } - MemOrFile::File(outboard) => MemOrFile::File((outboard, outboard_size)), + MemOrFile::File(outboard) => MemOrFile::File( + FileAndSize { + file: outboard, + size: outboard_size, + } + .map_async(|f| 
fs.convert_std_file(f)) + .await + .transpose() + .map_err(|e| ActorError::Io(e.into()))?, + ), } }; // mark sizes for deletion after commit in any case - a complete entry diff --git a/src/store/fs/test_support.rs b/src/store/fs/test_support.rs index 8cdb030a9..07d718a52 100644 --- a/src/store/fs/test_support.rs +++ b/src/store/fs/test_support.rs @@ -46,10 +46,13 @@ pub enum EntryData { }, } -impl Store { +impl Store +where + T: Persistence, +{ /// Get the complete state of an entry, both in memory and in redb. #[cfg(test)] - pub(crate) async fn entry_state(&self, hash: Hash) -> io::Result { + pub(crate) async fn entry_state(&self, hash: Hash) -> io::Result> { Ok(self.0.entry_state(hash).await?) } @@ -107,7 +110,7 @@ where T: Persistence, { #[cfg(test)] - async fn entry_state(&self, hash: Hash) -> OuterResult { + async fn entry_state(&self, hash: Hash) -> OuterResult> { let (tx, rx) = oneshot::channel(); self.tx.send(ActorMessage::EntryState { hash, tx }).await?; Ok(rx.await??) @@ -148,8 +151,8 @@ where #[cfg(test)] #[derive(Debug)] -pub(crate) struct EntryStateResponse { - pub mem: Option>, +pub(crate) struct EntryStateResponse { + pub mem: Option>, pub db: Option>>, } @@ -303,7 +306,7 @@ where &mut self, tables: &impl ReadableTables, hash: Hash, - ) -> ActorResult { + ) -> ActorResult> { let mem = self.handles.get(&hash).and_then(|weak| weak.upgrade()); let db = match tables.blobs().get(hash)? { Some(entry) => Some({ diff --git a/src/util/mem_or_file.rs b/src/util/mem_or_file.rs index e12a7c28b..5b32cf990 100644 --- a/src/util/mem_or_file.rs +++ b/src/util/mem_or_file.rs @@ -24,6 +24,39 @@ pub struct FileAndSize { pub size: u64, } +impl FileAndSize { + /// map the type of file asynchronously. + /// This is analogous to [Option::map] + pub fn map_async( + self, + f: F, + ) -> impl Future> + use + 'static + where + F: FnOnce(T) -> U + Send + 'static, + T: 'static, + U: Future + Send + 'static, + U::Output: Send + 'static, + { + let FileAndSize { file, size } = self; + FileAndSize { + file: f(file).await, + size, + } + } +} + +impl FileAndSize> { + /// factor out the error from inside the [FileAndSize] + /// this is analogous to [Option::transpose] + pub fn transpose(self) -> Result, U> { + let FileAndSize { file, size } = self; + match file { + Ok(t) => Ok(FileAndSize { file: t, size }), + Err(e) => Err(e), + } + } +} + /// Helper methods for a common way to use MemOrFile, where the memory part is something /// like a slice, and the file part is a tuple consisiting of path or file and size. impl MemOrFile> From 98e89921821e73f7331e12bb4c5047619fb880a1 Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Sun, 27 Apr 2025 16:04:35 -0400 Subject: [PATCH 08/15] add block for --- src/store/fs.rs | 195 +++++++++++++++++++++++++--------------- src/util/mem_or_file.rs | 10 ++- 2 files changed, 129 insertions(+), 76 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index ddad06071..fe8a7f027 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -84,7 +84,7 @@ use iroh_io::AsyncSliceReader; use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError}; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; -use tokio::{io::AsyncWriteExt, runtime::Handle}; +use tokio::io::AsyncWriteExt; use tracing::trace_span; mod tables; #[doc(hidden)] @@ -445,7 +445,7 @@ pub(crate) enum ImportSource { /// for this store. e.g. 
filesystem, s3 etc pub trait Persistence: Send + Sync + Clone + std::fmt::Debug + 'static { /// the error type that is returned for the persistence layer - type Err: Into; + type Err: Into + Send + 'static; /// the type which represents a file which was read from the persistence /// layer @@ -453,26 +453,35 @@ pub trait Persistence: Send + Sync + Clone + std::fmt::Debug + 'static { /// return the size of the file in bytes if it can be found/read /// otherwise return a [Self::Err] - fn size(&self, path: &Path) -> impl Future>; + fn size(&self, path: &Path) -> impl Future> + Send + 'static; /// read the contents of the file at the path /// returning the bytes of the file in the success case /// and [Self::Err] in the error case - fn read(&self, path: &Path) -> impl Future, Self::Err>>; + fn read( + &self, + path: &Path, + ) -> impl Future, Self::Err>> + Send + 'static; /// recursively ensure that the input path exists - fn create_dir_all(&self, path: &Path) -> impl Future>; + fn create_dir_all( + &self, + path: &Path, + ) -> impl Future> + Send + 'static; /// read and return the file at the input path - fn open(&self, path: &Path) -> impl Future>; + fn open<'a>( + &'a self, + path: &'a Path, + ) -> impl Future> + Send + 'static; /// convert from a [std::fs::File] into this persistence layer [Self::File] type. /// This is called when converting from a partial file (which exists on disk) /// into a complete file (which exists where ever your implementation wants it to) fn convert_std_file( - &self, + self: Arc, file: std::fs::File, - ) -> impl Future>; + ) -> impl Future> + Send + 'static; } /// A persistence layer that writes to the local file system @@ -483,24 +492,31 @@ impl Persistence for FileSystemPersistence { type Err = io::Error; type File = std::fs::File; - async fn size(&self, path: &Path) -> Result { - tokio::fs::metadata(path).await.map(|m| m.len()) + fn size(&self, path: &Path) -> impl Future> + 'static { + let fut = tokio::fs::metadata(path.to_owned()); + async move { fut.await.map(|m| m.len()) } } - fn read(&self, path: &Path) -> impl Future, Self::Err>> { - tokio::fs::read(path) + fn read(&self, path: &Path) -> impl Future, Self::Err>> + 'static { + tokio::fs::read(path.to_owned()) } - fn create_dir_all(&self, path: &Path) -> impl Future> { - tokio::fs::create_dir_all(path) + fn create_dir_all(&self, path: &Path) -> impl Future> + 'static { + tokio::fs::create_dir_all(path.to_owned()) } - async fn open(&self, path: &Path) -> Result { - let file = tokio::fs::File::open(path).await?; - Ok(file.into_std().await) + fn open(&self, path: &Path) -> impl Future> + 'static { + let fut = tokio::fs::File::open(path.to_owned()); + async move { + let file = fut.await?; + Ok(file.into_std().await) + } } - async fn convert_std_file(&self, file: std::fs::File) -> Result { + async fn convert_std_file( + self: Arc, + file: std::fs::File, + ) -> Result { Ok(file) } } @@ -514,11 +530,29 @@ impl ImportSource { } } - async fn len(&self, fs: &T) -> Result { - match self { - Self::TempFile(path) => fs.size(path).await, - Self::External(path) => fs.size(path).await, - Self::Memory(data) => Ok(data.len() as u64), + fn len<'a, T: Persistence>( + &'a self, + fs: &'a T, + ) -> impl Future> + 'static { + enum Either { + Left(u64), + Right(T), + } + + let output = match self { + Self::TempFile(path) | Self::External(path) => { + let fut: std::pin::Pin< + Box::Err>> + Send + 'static>, + > = Box::pin(fs.size(path)); + Either::Right(fut) + } + Self::Memory(data) => Either::Left(data.len() as u64), + }; + async 
move { + match output { + Either::Left(size) => Ok(size), + Either::Right(fut) => fut.await, + } } } } @@ -1071,7 +1105,7 @@ where .into(), ) })?; - Handle::current().block_on(self.fs.create_dir_all(parent))?; + block_for(self.fs.create_dir_all(parent))?; let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash)); let (tx, rx) = oneshot::channel(); self.tx @@ -1158,11 +1192,11 @@ where let file = match mode { ImportMode::TryReference => ImportSource::External(path), ImportMode::Copy => { - if Handle::current().block_on(self.fs.size(&path))? < 16 * 1024 { + if block_for(self.fs.size(&path))? < 16 * 1024 { // we don't know if the data will be inlined since we don't // have the inline options here. But still for such a small file // it does not seem worth it do to the temp file ceremony. - let data = Handle::current().block_on(self.fs.read(&path))?; + let data = block_for(self.fs.read(&path))?; ImportSource::Memory(data.into()) } else { let temp_path = self.temp_file_name(); @@ -1197,7 +1231,7 @@ where id: u64, progress: impl ProgressSender + IdGenerator, ) -> OuterResult<(TempTag, u64)> { - let data_size = Handle::current().block_on(file.len(&self.fs))?; + let data_size = block_for(file.len(&self.fs))?; tracing::debug!("finalize_import_sync {:?} {}", file, data_size); progress.blocking_send(ImportProgress::Size { id, @@ -1208,7 +1242,7 @@ where MemOrFile::File(path) => { let span = trace_span!("outboard.compute", path = %path.display()); let _guard = span.enter(); - let file = Handle::current().block_on(self.fs.open(path))?; + let file = block_for(self.fs.open(path))?; compute_outboard(file, data_size, move |offset| { Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?) })? @@ -1269,7 +1303,7 @@ struct ActorState { create_options: Arc, options: Options, rt: tokio::runtime::Handle, - fs: T, + fs: Arc, } /// The actor for the redb store. @@ -1695,7 +1729,7 @@ where options, create_options: Arc::new(create_options), rt, - fs, + fs: Arc::new(fs), }, }, tx, @@ -1838,14 +1872,14 @@ where data_location, outboard_location, } => { - let data = load_data(tables, &self.options.path, data_location, &hash, &self.fs)?; + let data = load_data(tables, &self.options.path, data_location, &hash, &*self.fs)?; let outboard = load_outboard( tables, &self.options.path, outboard_location, data.size(), &hash, - &self.fs, + &*self.fs, )?; BaoFileHandle::new_complete(config, hash, data, outboard) } @@ -1995,8 +2029,7 @@ where external_path.display() ); let data = Bytes::from( - Handle::current() - .block_on(self.fs.read(&external_path)) + block_for(self.fs.read(&external_path)) .map_err(|e| ActorError::Io(e.into()))?, ); DataLocation::Inline(data) @@ -2087,14 +2120,14 @@ where .. 
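The added `Send + 'static` bounds are what make these futures shippable: each method clones what it needs (`path.to_owned()`) before constructing the future, so nothing borrows `self` or `path` across an `.await`. The same pattern applies to any custom backend; a sketch of the shape, outside the trait:

    use std::{future::Future, path::Path};

    /// Sketch of the owned-future pattern the trait now requires; the future
    /// captures an owned PathBuf and is therefore independent of the borrow.
    fn size_example(path: &Path) -> impl Future<Output = std::io::Result<u64>> + Send + 'static {
        let path = path.to_owned(); // detach from the &Path before the async block
        async move { tokio::fs::metadata(path).await.map(|m| m.len()) }
    }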
} => { let data = - load_data(tables, &self.options.path, data_location, &hash, &self.fs)?; + load_data(tables, &self.options.path, data_location, &hash, &*self.fs)?; let outboard = load_outboard( tables, &self.options.path, outboard_location, data.size(), &hash, - &self.fs, + &*self.fs, )?; tracing::debug!("creating complete entry for {}", hash.to_hex()); BaoFileHandle::new_complete(self.create_options.clone(), hash, data, outboard) @@ -2253,8 +2286,7 @@ where // inline if size <= self.options.inline.max_data_inlined { let path = self.options.path.owned_data_path(&hash); - let data = Handle::current() - .block_on(self.fs.read(&path)) + let data = block_for(self.fs.read(&path)) .map_err(|e| ActorError::Io(e.into()))?; tables.delete_after_commit.insert(hash, [BaoFilePart::Data]); tables.inline_data.insert(hash, data.as_slice())?; @@ -2290,8 +2322,7 @@ where if outboard_size <= self.options.inline.max_outboard_inlined => { let path = self.options.path.owned_outboard_path(&hash); - let outboard = Handle::current() - .block_on(self.fs.read(&path)) + let outboard = block_for(self.fs.read(&path)) .map_err(|e| ActorError::Io(e.into()))?; tables .delete_after_commit @@ -2398,14 +2429,14 @@ where tracing::trace!("on_complete({})", hash.to_hex()); entry.transform(|state| { tracing::trace!("on_complete transform {:?}", state); - let entry = match Handle::current().block_on(complete_storage( + let entry = match complete_storage( state, &hash, &self.options.path, &self.options.inline, tables.delete_after_commit, - &self.fs, - ))? { + self.fs.clone(), + )? { Ok(entry) => { // store the info so we can insert it into the db later info = Some(( @@ -2653,6 +2684,21 @@ fn dump(tables: &impl ReadableTables) -> ActorResult<()> { Ok(()) } +fn block_for(fut: F) -> F::Output +where + F: Future + Send + 'static, + F::Output: Send, +{ + let (tx, rx) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + let res = fut.await; + tx.send(res) + .map_err(|_| format!("Error sending {}", std::any::type_name::())) + .expect("rx cannot be dropped yet"); + }); + rx.blocking_recv().expect("The sender cannot be dropped") +} + fn load_data( tables: &impl ReadableTables, options: &PathOptions, @@ -2675,7 +2721,7 @@ where } DataLocation::Owned(data_size) => { let path = options.owned_data_path(hash); - let Ok(file) = Handle::current().block_on(fs.open(&path)) else { + let Ok(file) = block_for(fs.open(&path)) else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("file not found: {}", path.display()), @@ -2694,7 +2740,7 @@ where )); } let path = &paths[0]; - let Ok(file) = Handle::current().block_on(fs.open(path)) else { + let Ok(file) = block_for(fs.open(path)) else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("external file not found: {}", path.display()), @@ -2731,7 +2777,7 @@ fn load_outboard( OutboardLocation::Owned => { let outboard_size = raw_outboard_size(size); let path = options.owned_outboard_path(hash); - let Ok(file) = Handle::current().block_on(fs.open(&path)) else { + let Ok(file) = block_for(fs.open(&path)) else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("file not found: {} size={}", path.display(), outboard_size), @@ -2747,13 +2793,13 @@ fn load_outboard( } /// Take a possibly incomplete storage and turn it into complete -async fn complete_storage( +fn complete_storage( storage: BaoFileStorage, hash: &Hash, path_options: &PathOptions, inline_options: &InlineOptions, delete_after_commit: &mut DeleteSet, - fs: &T, + fs: Arc, ) -> ActorResult, 
CompleteStorage>> where T: Persistence, @@ -2796,28 +2842,31 @@ where } else { // protect the data from previous deletions delete_after_commit.remove(*hash, [BaoFilePart::Data]); + let fs_2 = fs.clone(); match data { MemOrFile::Mem(data) => { let path = path_options.owned_data_path(hash); let file = overwrite_and_sync(&path, &data)?; MemOrFile::File( - FileAndSize { - file, - size: data_size, - } - .map_async(|f| fs.convert_std_file(f)) - .await + block_for( + FileAndSize { + file, + size: data_size, + } + .map_async(move |f| fs_2.convert_std_file(f)), + ) .transpose() .map_err(|e| ActorError::Io(e.into()))?, ) } MemOrFile::File(data) => MemOrFile::File( - FileAndSize { - file: data, - size: data_size, - } - .map_async(|f| fs.convert_std_file(f)) - .await + block_for( + FileAndSize { + file: data, + size: data_size, + } + .map_async(move |f| fs_2.convert_std_file(f)), + ) .transpose() .map_err(|e| ActorError::Io(e.into()))?, ), @@ -2846,23 +2895,25 @@ where let path = path_options.owned_outboard_path(hash); let file = overwrite_and_sync(&path, &outboard)?; MemOrFile::File( - FileAndSize { - file, - size: outboard_size, - } - .map_async(|f| fs.convert_std_file(f)) - .await + block_for( + FileAndSize { + file, + size: outboard_size, + } + .map_async(move |f| fs.convert_std_file(f)), + ) .transpose() .map_err(|e| ActorError::Io(e.into()))?, ) } MemOrFile::File(outboard) => MemOrFile::File( - FileAndSize { - file: outboard, - size: outboard_size, - } - .map_async(|f| fs.convert_std_file(f)) - .await + block_for( + FileAndSize { + file: outboard, + size: outboard_size, + } + .map_async(|f| fs.convert_std_file(f)), + ) .transpose() .map_err(|e| ActorError::Io(e.into()))?, ), diff --git a/src/util/mem_or_file.rs b/src/util/mem_or_file.rs index 5b32cf990..93aa27697 100644 --- a/src/util/mem_or_file.rs +++ b/src/util/mem_or_file.rs @@ -1,4 +1,4 @@ -use std::{fs::File, io}; +use std::{fs::File, future::Future, io}; use bao_tree::io::sync::{ReadAt, Size}; use bytes::Bytes; @@ -38,9 +38,11 @@ impl FileAndSize { U::Output: Send + 'static, { let FileAndSize { file, size } = self; - FileAndSize { - file: f(file).await, - size, + async move { + FileAndSize { + file: f(file).await, + size, + } } } } From dacb7b42dd875ab2293a650610daa85893b26db5 Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Mon, 28 Apr 2025 18:32:36 -0400 Subject: [PATCH 09/15] fix some panics --- src/store/fs.rs | 42 +++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index fe8a7f027..856db3b27 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -1778,7 +1778,7 @@ where tokio::select! { msg = msgs.recv() => { if let Some(msg) = msg { - if let Err(msg) = self.state.handle_readonly(&tables, msg)? { + if let Err(msg) = self.state.handle_readonly(&tables, msg).await? { msgs.push_back(msg).expect("just recv'd"); break; } @@ -1807,7 +1807,7 @@ where tokio::select! { msg = msgs.recv() => { if let Some(msg) = msg { - if let Err(msg) = self.state.handle_readwrite(&mut tables, msg)? { + if let Err(msg) = self.state.handle_readwrite(&mut tables, msg).await? 
{ msgs.push_back(msg).expect("just recv'd"); break; } @@ -1852,7 +1852,7 @@ where Ok(status) } - fn get( + async fn get( &mut self, tables: &impl ReadableTables, hash: Hash, @@ -1872,7 +1872,8 @@ where data_location, outboard_location, } => { - let data = load_data(tables, &self.options.path, data_location, &hash, &*self.fs)?; + let data = + load_data(tables, &self.options.path, data_location, &hash, &*self.fs).await?; let outboard = load_outboard( tables, &self.options.path, @@ -1880,7 +1881,8 @@ where data.size(), &hash, &*self.fs, - )?; + ) + .await?; BaoFileHandle::new_complete(config, hash, data, outboard) } EntryState::Partial { .. } => BaoFileHandle::incomplete_file(config, hash)?, @@ -2101,7 +2103,7 @@ where Ok((tag, data_size)) } - fn get_or_create( + async fn get_or_create( &mut self, tables: &impl ReadableTables, hash: Hash, @@ -2120,7 +2122,8 @@ where .. } => { let data = - load_data(tables, &self.options.path, data_location, &hash, &*self.fs)?; + load_data(tables, &self.options.path, data_location, &hash, &*self.fs) + .await?; let outboard = load_outboard( tables, &self.options.path, @@ -2128,7 +2131,8 @@ where data.size(), &hash, &*self.fs, - )?; + ) + .await?; tracing::debug!("creating complete entry for {}", hash.to_hex()); BaoFileHandle::new_complete(self.create_options.clone(), hash, data, outboard) } @@ -2529,18 +2533,18 @@ where Ok(()) } - fn handle_readonly( + async fn handle_readonly( &mut self, tables: &impl ReadableTables, msg: ActorMessage, ) -> ActorResult>> { match msg { ActorMessage::Get { hash, tx } => { - let res = self.get(tables, hash); + let res = self.get(tables, hash).await; tx.send(res).ok(); } ActorMessage::GetOrCreate { hash, tx } => { - let res = self.get_or_create(tables, hash); + let res = self.get_or_create(tables, hash).await; tx.send(res).ok(); } ActorMessage::EntryStatus { hash, tx } => { @@ -2576,9 +2580,9 @@ where Ok(Ok(())) } - fn handle_readwrite( + async fn handle_readwrite( &mut self, - tables: &mut Tables, + tables: &mut Tables<'_>, msg: ActorMessage, ) -> ActorResult>> { match msg { @@ -2631,7 +2635,7 @@ where } msg => { // try to handle it as readonly - if let Err(msg) = self.handle_readonly(tables, msg)? { + if let Err(msg) = self.handle_readonly(tables, msg).await? 
{ return Ok(Err(msg)); } } @@ -2699,7 +2703,7 @@ where rx.blocking_recv().expect("The sender cannot be dropped") } -fn load_data( +async fn load_data( tables: &impl ReadableTables, options: &PathOptions, location: DataLocation<(), u64>, @@ -2721,7 +2725,7 @@ where } DataLocation::Owned(data_size) => { let path = options.owned_data_path(hash); - let Ok(file) = block_for(fs.open(&path)) else { + let Ok(file) = fs.open(&path).await else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("file not found: {}", path.display()), @@ -2740,7 +2744,7 @@ where )); } let path = &paths[0]; - let Ok(file) = block_for(fs.open(path)) else { + let Ok(file) = fs.open(path).await else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("external file not found: {}", path.display()), @@ -2755,7 +2759,7 @@ where }) } -fn load_outboard( +async fn load_outboard( tables: &impl ReadableTables, options: &PathOptions, location: OutboardLocation, @@ -2777,7 +2781,7 @@ fn load_outboard( OutboardLocation::Owned => { let outboard_size = raw_outboard_size(size); let path = options.owned_outboard_path(hash); - let Ok(file) = block_for(fs.open(&path)) else { + let Ok(file) = fs.open(&path).await else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("file not found: {} size={}", path.display(), outboard_size), From e9901d94831fc2bb5bf4c858d5d2b614c413ae12 Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Mon, 28 Apr 2025 20:34:46 -0400 Subject: [PATCH 10/15] fix all tests --- src/store/bao_file.rs | 52 ++++++++--------------- src/store/fs.rs | 95 ++++++++++++++++++++++++------------------- 2 files changed, 71 insertions(+), 76 deletions(-) diff --git a/src/store/bao_file.rs b/src/store/bao_file.rs index a20bfd2c1..fbffeac21 100644 --- a/src/store/bao_file.rs +++ b/src/store/bao_file.rs @@ -14,7 +14,7 @@ use std::{ io, ops::{Deref, DerefMut}, path::{Path, PathBuf}, - sync::{Arc, RwLock, Weak}, + sync::{Arc, Weak}, }; use bao_tree::{ @@ -343,7 +343,7 @@ impl BaoFileHandleWeak { /// The inner part of a bao file handle. #[derive(Debug)] pub struct BaoFileHandleInner { - pub(crate) storage: RwLock>, + pub(crate) storage: tokio::sync::RwLock>, config: Arc, hash: Hash, } @@ -432,15 +432,9 @@ where return res; } }; - // otherwise, we have to spawn a task. - let (handle, res) = tokio::task::spawn_blocking(move || { - let storage = handle.storage.read().unwrap(); - let res = f(storage.deref()); - drop(storage); - (handle, res) - }) - .await - .expect("spawn_blocking failed"); + let storage_guard = handle.storage.read().await; + let res = f(storage_guard.deref()); + drop(storage_guard); *opt = Some(handle); res } @@ -528,7 +522,7 @@ where pub fn incomplete_mem(config: Arc, hash: Hash) -> Self { let storage = BaoFileStorage::incomplete_mem(); Self(Arc::new(BaoFileHandleInner { - storage: RwLock::new(storage), + storage: tokio::sync::RwLock::new(storage), config, hash, })) @@ -543,7 +537,7 @@ where sizes: create_read_write(&paths.sizes)?, }); Ok(Self(Arc::new(BaoFileHandleInner { - storage: RwLock::new(storage), + storage: tokio::sync::RwLock::new(storage), config, hash, }))) @@ -558,7 +552,7 @@ where ) -> Self { let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard }); Self(Arc::new(BaoFileHandleInner { - storage: RwLock::new(storage), + storage: tokio::sync::RwLock::new(storage), config, hash, })) @@ -567,20 +561,20 @@ where /// Transform the storage in place. If the transform fails, the storage will /// be an immutable empty storage. 
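// A minimal usage sketch (hypothetical helper name) of the block_for bridge
// introduced earlier in this series, assuming it sits in src/store/fs.rs next
// to block_for and that the Persistence futures carry the Send + 'static
// bounds the trait declares by this point (T::Err still exists here; it is
// removed later in the series). oneshot's blocking_recv() panics when called
// from inside an async context, which is why this part of the series makes
// the actor handlers async and awaits fs.open(..) directly instead of
// routing those calls through block_for.
fn size_blocking<T: Persistence>(fs: &T, path: &Path) -> Result<u64, T::Err> {
    // spawns the future onto the runtime, then parks this non-async thread
    // on the oneshot receiver until the result arrives
    block_for(fs.size(path))
}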
#[cfg(feature = "fs-store")] - pub(crate) fn transform( + pub(crate) async fn transform( &self, - f: impl FnOnce(BaoFileStorage) -> io::Result>, + f: impl std::ops::AsyncFnOnce(BaoFileStorage) -> io::Result>, ) -> io::Result<()> { - let mut lock = self.storage.write().unwrap(); + let mut lock = self.storage.write().await; let storage = lock.take(); - *lock = f(storage)?; + *lock = f(storage).await?; Ok(()) } /// True if the file is complete. pub fn is_complete(&self) -> bool { matches!( - self.storage.read().unwrap().deref(), + self.storage.try_read().unwrap().deref(), BaoFileStorage::Complete(_) ) } @@ -603,7 +597,7 @@ where /// The most precise known total size of the data file. pub fn current_size(&self) -> io::Result { - match self.storage.read().unwrap().deref() { + match self.storage.try_read().unwrap().deref() { BaoFileStorage::Complete(mem) => Ok(mem.data_size()), BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()), BaoFileStorage::IncompleteFile(file) => file.current_size(), @@ -633,8 +627,8 @@ where } /// This is the synchronous impl for writing a batch. - fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result { - let mut storage = self.storage.write().unwrap(); + async fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result { + let mut storage = self.storage.write().await; match storage.deref_mut() { BaoFileStorage::IncompleteMem(mem) => { // check if we need to switch to file mode, otherwise write to memory @@ -730,12 +724,7 @@ where let Some(handle) = self.0.take() else { return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy")); }; - let (handle, change) = tokio::task::spawn_blocking(move || { - let change = handle.write_batch(size, &batch); - (handle, change) - }) - .await - .expect("spawn_blocking failed"); + let change = handle.write_batch(size, &batch).await; match change? 
{ HandleChange::None => {} HandleChange::MemToFile => { @@ -752,12 +741,7 @@ where let Some(handle) = self.0.take() else { return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy")); }; - let (handle, res) = tokio::task::spawn_blocking(move || { - let res = handle.storage.write().unwrap().sync_all(); - (handle, res) - }) - .await - .expect("spawn_blocking failed"); + let res = handle.storage.write().await.sync_all(); self.0 = Some(handle); res } diff --git a/src/store/fs.rs b/src/store/fs.rs index 856db3b27..d80bc2179 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -1105,7 +1105,7 @@ where .into(), ) })?; - block_for(self.fs.create_dir_all(parent))?; + self.fs.create_dir_all(parent).await?; let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash)); let (tx, rx) = oneshot::channel(); self.tx @@ -2006,7 +2006,11 @@ where Ok(()) } - fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> { + async fn import( + &mut self, + tables: &mut Tables<'_>, + cmd: Import, + ) -> ActorResult<(TempTag, u64)> { let Import { content_id, source: file, @@ -2031,7 +2035,9 @@ where external_path.display() ); let data = Bytes::from( - block_for(self.fs.read(&external_path)) + self.fs + .read(&external_path) + .await .map_err(|e| ActorError::Io(e.into()))?, ); DataLocation::Inline(data) @@ -2423,41 +2429,47 @@ where Ok(()) } - fn on_complete( + async fn on_complete( &mut self, - tables: &mut Tables, + tables: &mut Tables<'_>, entry: BaoFileHandle, ) -> ActorResult<()> { let hash = entry.hash(); let mut info = None; tracing::trace!("on_complete({})", hash.to_hex()); - entry.transform(|state| { - tracing::trace!("on_complete transform {:?}", state); - let entry = match complete_storage( - state, - &hash, - &self.options.path, - &self.options.inline, - tables.delete_after_commit, - self.fs.clone(), - )? { - Ok(entry) => { - // store the info so we can insert it into the db later - info = Some(( - entry.data_size(), - entry.data.mem().cloned(), - entry.outboard_size(), - entry.outboard.mem().cloned(), - )); - entry - } - Err(entry) => { - // the entry was already complete, nothing to do - entry - } - }; - Ok(BaoFileStorage::Complete(entry)) - })?; + entry + // TODO: this errors on edition 2024, it should be changed to + // an async closure as they are now stable + .transform(|state| async { + tracing::trace!("on_complete transform {:?}", state); + let entry = match complete_storage( + state, + &hash, + &self.options.path, + &self.options.inline, + tables.delete_after_commit, + self.fs.clone(), + ) + .await? 
+ { + Ok(entry) => { + // store the info so we can insert it into the db later + info = Some(( + entry.data_size(), + entry.data.mem().cloned(), + entry.outboard_size(), + entry.outboard.mem().cloned(), + )); + entry + } + Err(entry) => { + // the entry was already complete, nothing to do + entry + } + }; + Ok(BaoFileStorage::Complete(entry)) + }) + .await?; if let Some((data_size, data, outboard_size, outboard)) = info { let data_location = if data.is_some() { DataLocation::Inline(()) @@ -2587,7 +2599,7 @@ where ) -> ActorResult>> { match msg { ActorMessage::Import { cmd, tx } => { - let res = self.import(tables, cmd); + let res = self.import(tables, cmd).await; tx.send(res).ok(); } ActorMessage::SetTag { tag, value, tx } => { @@ -2615,7 +2627,7 @@ where tx.send(res).ok(); } ActorMessage::OnComplete { handle } => { - let res = self.on_complete(tables, handle); + let res = self.on_complete(tables, handle).await; res.ok(); } ActorMessage::Export { cmd, tx } => { @@ -2797,7 +2809,7 @@ async fn load_outboard( } /// Take a possibly incomplete storage and turn it into complete -fn complete_storage( +async fn complete_storage( storage: BaoFileStorage, hash: &Hash, path_options: &PathOptions, @@ -2864,13 +2876,12 @@ where ) } MemOrFile::File(data) => MemOrFile::File( - block_for( - FileAndSize { - file: data, - size: data_size, - } - .map_async(move |f| fs_2.convert_std_file(f)), - ) + FileAndSize { + file: data, + size: data_size, + } + .map_async(move |f| fs_2.convert_std_file(f)) + .await .transpose() .map_err(|e| ActorError::Io(e.into()))?, ), From f33f1e9def0255d3e6223a6e91266230041b2888 Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Tue, 29 Apr 2025 22:28:12 -0400 Subject: [PATCH 11/15] Add callback lock --- src/util.rs | 1 + src/util/callback_lock.rs | 77 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 src/util/callback_lock.rs diff --git a/src/util.rs b/src/util.rs index 8ec4e48b6..c5c4c94a4 100644 --- a/src/util.rs +++ b/src/util.rs @@ -22,6 +22,7 @@ pub mod progress; pub use mem_or_file::{FileAndSize, MemOrFile}; mod sparse_mem_file; pub use sparse_mem_file::SparseMemFile; +pub mod callback_lock; pub mod local_pool; #[cfg(test)] diff --git a/src/util/callback_lock.rs b/src/util/callback_lock.rs new file mode 100644 index 000000000..41fba994a --- /dev/null +++ b/src/util/callback_lock.rs @@ -0,0 +1,77 @@ +//! This module defines a wrapper around a [`tokio::sync::RwLock`] that runs a callback +//! 
After any write operation occurs + +use std::future::Future; + +/// A wrapper over a [`tokio::sync::RwLock`] that executes a callback function after +/// the write guard is dropped +#[derive(derive_more::Debug)] +pub struct CallbackLock { + inner: tokio::sync::RwLock, + #[debug(skip)] + callback: F, +} + +/// the wrapper type over a [tokio::sync::RwLockWriteGuard] +#[derive(Debug)] +pub struct CallbackLockWriteGuard<'a, T, F: Fn(&T)> { + inner: tokio::sync::RwLockWriteGuard<'a, T>, + callback: &'a F, +} + +impl std::ops::Deref for CallbackLockWriteGuard<'_, T, F> { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::ops::DerefMut for CallbackLockWriteGuard<'_, T, F> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Drop for CallbackLockWriteGuard<'_, T, F> { + fn drop(&mut self) { + (self.callback)(&*self.inner); + } +} + +impl CallbackLock +where + F: Fn(&T), +{ + /// create a new instance of the lock from a value + /// and the callback to evaluate when a write guard is dropped + pub fn new(val: T, callback: F) -> Self { + CallbackLock { + inner: tokio::sync::RwLock::new(val), + callback, + } + } + + /// return an instance of the write guard + pub async fn write(&self) -> CallbackLockWriteGuard<'_, T, F> { + let guard = self.inner.write().await; + + CallbackLockWriteGuard { + inner: guard, + callback: &self.callback, + } + } + + /// return the [tokio::sync::RwLockReadGuard] + /// this will not invoke the callback + pub fn read(&self) -> impl Future> { + self.inner.read() + } + + /// try to synchronously acquire a read lock + pub fn try_read( + &self, + ) -> Result, tokio::sync::TryLockError> { + self.inner.try_read() + } +} From 4f3e0daf33f5784af02b45ca4f4d59b22c5d6600 Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Tue, 29 Apr 2025 22:28:12 -0400 Subject: [PATCH 12/15] use callback lock to watch internal state --- src/store/bao_file.rs | 89 ++++++++++++++++++++++++++++++------------- 1 file changed, 62 insertions(+), 27 deletions(-) diff --git a/src/store/bao_file.rs b/src/store/bao_file.rs index fbffeac21..cbe388687 100644 --- a/src/store/bao_file.rs +++ b/src/store/bao_file.rs @@ -340,12 +340,17 @@ impl BaoFileHandleWeak { } } +/// a type alias which represents the callback which is executed after +/// the write guard is dropped +type AfterLockWriteCb = Box) + Send + Sync + 'static>; + /// The inner part of a bao file handle. #[derive(Debug)] pub struct BaoFileHandleInner { - pub(crate) storage: tokio::sync::RwLock>, + pub(crate) storage: CallbackLock, AfterLockWriteCb>, config: Arc, hash: Hash, + rx: tokio::sync::watch::Receiver, } /// A cheaply cloneable handle to a bao file, including the hash and the configuration. 
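// For illustration, a minimal standalone sketch of the CallbackLock + watch
// pairing that this patch wires into BaoFileHandleInner: every dropped write
// guard re-publishes a snapshot that synchronous readers can borrow() later.
// The import path and the tokio `rt`/`macros` features are assumptions; the
// lock API is exactly the one added in the previous patch.
use iroh_blobs::util::callback_lock::CallbackLock;

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::watch::channel(0usize);
    let lock = CallbackLock::new(Vec::<u8>::new(), move |v: &Vec<u8>| {
        // runs each time a write guard is dropped
        let _ = tx.send(v.len());
    });
    {
        let mut guard = lock.write().await;
        guard.extend_from_slice(b"hello");
    } // the guard drops here and the callback publishes the new length
    assert_eq!(*rx.borrow(), 5);
    assert_eq!(lock.read().await.len(), 5); // reads never fire the callback
}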
@@ -511,21 +516,55 @@ enum HandleChange { // later: size verified } +/// struct which stores simple metadata about the [BaoFileHandle] in a way that is +/// accessible in synchronous function calls +#[derive(Debug)] +struct StorageMeta { + complete: bool, + size: Result, +} + +impl StorageMeta { + fn new(storage: &BaoFileStorage) -> Self { + let size = match storage { + BaoFileStorage::Complete(mem) => Ok(mem.data_size()), + BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()), + BaoFileStorage::IncompleteFile(file) => file.current_size(), + }; + StorageMeta { + complete: matches!(storage, BaoFileStorage::Complete(_)), + size, + } + } +} + impl BaoFileHandle where T: bao_tree::io::sync::ReadAt, { + /// internal helper function to initialize a new instance of self + fn new_inner(storage: BaoFileStorage, config: Arc, hash: Hash) -> Self { + let (tx, rx) = tokio::sync::watch::channel(StorageMeta::new(&storage)); + Self(Arc::new(BaoFileHandleInner { + storage: CallbackLock::new( + storage, + Box::new(move |storage: &BaoFileStorage| { + let _ = tx.send(StorageMeta::new(storage)); + }), + ), + config, + hash, + rx, + })) + } + /// Create a new bao file handle. /// /// This will create a new file handle with an empty memory storage. /// Since there are very likely to be many of these, we use an arc rwlock pub fn incomplete_mem(config: Arc, hash: Hash) -> Self { let storage = BaoFileStorage::incomplete_mem(); - Self(Arc::new(BaoFileHandleInner { - storage: tokio::sync::RwLock::new(storage), - config, - hash, - })) + Self::new_inner(storage, config, hash) } /// Create a new bao file handle with a partial file. @@ -536,11 +575,7 @@ where outboard: create_read_write(&paths.outboard)?, sizes: create_read_write(&paths.sizes)?, }); - Ok(Self(Arc::new(BaoFileHandleInner { - storage: tokio::sync::RwLock::new(storage), - config, - hash, - }))) + Ok(Self::new_inner(storage, config, hash)) } /// Create a new complete bao file handle. @@ -551,11 +586,7 @@ where outboard: MemOrFile>, ) -> Self { let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard }); - Self(Arc::new(BaoFileHandleInner { - storage: tokio::sync::RwLock::new(storage), - config, - hash, - })) + Self::new_inner(storage, config, hash) } /// Transform the storage in place. If the transform fails, the storage will @@ -573,10 +604,7 @@ where /// True if the file is complete. pub fn is_complete(&self) -> bool { - matches!( - self.storage.try_read().unwrap().deref(), - BaoFileStorage::Complete(_) - ) + self.rx.borrow().deref().complete } /// An AsyncSliceReader for the data file. @@ -596,18 +624,25 @@ where } /// The most precise known total size of the data file. - pub fn current_size(&self) -> io::Result { - match self.storage.try_read().unwrap().deref() { - BaoFileStorage::Complete(mem) => Ok(mem.data_size()), - BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()), - BaoFileStorage::IncompleteFile(file) => file.current_size(), - } + pub fn current_size(&self) -> Result { + self.rx + .borrow() + .size + .as_ref() + // NB: we return the io::ErrorKind here + // because io::Error is !Clone + .map_err(|e| e.kind()) + .copied() } /// The outboard for the file. 
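// A small sketch of the watch-channel property the new_inner constructor
// above relies on: the channel always holds exactly the latest snapshot, so
// the receiver stored in BaoFileHandleInner can be consulted synchronously at
// any later point (as is_complete and current_size now do) without awaiting.
fn main() {
    let (tx, rx) = tokio::sync::watch::channel("incomplete");
    tx.send("complete").ok();
    tx.send("complete, verified").ok();
    // borrow() never blocks, and only the most recently sent value survives
    assert_eq!(*rx.borrow(), "complete, verified");
}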
pub fn outboard(&self) -> io::Result>> { let root = self.hash.into(); - let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE); + let tree = BaoTree::new( + self.current_size() + .map_err(|kind| io::Error::new(kind, "an io error has occurred"))?, + IROH_BLOCK_SIZE, + ); let outboard = self.outboard_reader(); Ok(PreOrderOutboard { root, From ef8c7f0743e5de5bdc6ad3056cb6c9c1dcdb18c0 Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Thu, 1 May 2025 17:38:34 -0400 Subject: [PATCH 13/15] bump: semver --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 526486e43..c387222a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2086,7 +2086,7 @@ dependencies = [ [[package]] name = "iroh-blobs" -version = "0.34.1" +version = "0.35.1" dependencies = [ "anyhow", "async-channel", diff --git a/Cargo.toml b/Cargo.toml index 345f56d13..c1e48c2e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iroh-blobs" -version = "0.34.1" +version = "0.35.1" edition = "2021" readme = "README.md" description = "blob and collection transfer support for iroh" From ee4d8e4a193ef2d14bf9e59551e3f5a22c7a4381 Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Thu, 1 May 2025 22:10:04 -0400 Subject: [PATCH 14/15] fix build remove comment --- src/lib.rs | 1 + src/store/fs.rs | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index fc9e397bb..3c4db795f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ #![deny(missing_docs, rustdoc::broken_intra_doc_links)] #![recursion_limit = "256"] #![cfg_attr(iroh_docsrs, feature(doc_auto_cfg))] +#![feature(async_closure)] #[cfg(feature = "cli")] pub mod cli; diff --git a/src/store/fs.rs b/src/store/fs.rs index d80bc2179..1ff44e111 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -2438,9 +2438,7 @@ where let mut info = None; tracing::trace!("on_complete({})", hash.to_hex()); entry - // TODO: this errors on edition 2024, it should be changed to - // an async closure as they are now stable - .transform(|state| async { + .transform(async |state| { tracing::trace!("on_complete transform {:?}", state); let entry = match complete_storage( state, From b7c3bf2a5b3c3fd9fce307f2fdc84d82863174bf Mon Sep 17 00:00:00 2001 From: Sean Aye Date: Thu, 1 May 2025 22:10:04 -0400 Subject: [PATCH 15/15] clean up store generics --- src/net_protocol.rs | 16 +++-- src/rpc/client/blobs.rs | 4 +- src/store/fs.rs | 125 +++++++++++++++++++++------------------- src/store/fs/tests.rs | 10 +++- tests/gc.rs | 6 +- tests/rpc.rs | 4 +- tests/tags.rs | 3 +- 7 files changed, 95 insertions(+), 73 deletions(-) diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 9fc8ba7ee..fa4a72113 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -7,6 +7,7 @@ use std::{ collections::BTreeSet, fmt::Debug, ops::{Deref, DerefMut}, + path::PathBuf, sync::Arc, }; @@ -20,7 +21,7 @@ use tracing::debug; use crate::{ downloader::{ConcurrencyLimits, Downloader, RetryConfig}, provider::EventSender, - store::GcConfig, + store::{fs::Persistence, GcConfig}, util::{ local_pool::{self, LocalPool, LocalPoolHandle}, SetTagOption, @@ -221,12 +222,19 @@ impl Blobs { } } -impl Blobs { +impl Blobs> +where + T: Persistence, +{ /// Load a persistent Blobs protocol handler from a path. 
pub async fn persistent( path: impl AsRef, - ) -> anyhow::Result> { - Ok(Self::builder(crate::store::fs::Store::load(path).await?)) + db_path: PathBuf, + backend: T, + ) -> anyhow::Result>> { + Ok(Self::builder( + crate::store::fs::Store::load_with_backend(path, db_path, backend).await?, + )) } } diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index e5f5acee6..b213fefcf 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -1028,6 +1028,7 @@ mod tests { net_protocol::Blobs, provider::{CustomEventSender, EventSender}, rpc::client::{blobs, tags}, + store::fs::FileSystemPersistence, }; type RpcClient = quic_rpc::RpcClient; @@ -1113,7 +1114,8 @@ mod tests { /// Creates a new node with persistent storage pub async fn persistent( path: impl AsRef, - ) -> anyhow::Result> { + ) -> anyhow::Result>> + { Ok(Builder { store: crate::store::fs::Store::load(path).await?, events: Default::default(), diff --git a/src/store/fs.rs b/src/store/fs.rs index 1ff44e111..eb11d67f6 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -443,37 +443,34 @@ pub(crate) enum ImportSource { /// trait which defines the backend persistence layer /// for this store. e.g. filesystem, s3 etc -pub trait Persistence: Send + Sync + Clone + std::fmt::Debug + 'static { - /// the error type that is returned for the persistence layer - type Err: Into + Send + 'static; - +pub trait Persistence: Send + Sync + Clone + 'static { /// the type which represents a file which was read from the persistence /// layer type File: IrohFile + std::io::Read + std::fmt::Debug; /// return the size of the file in bytes if it can be found/read - /// otherwise return a [Self::Err] - fn size(&self, path: &Path) -> impl Future> + Send + 'static; + /// otherwise return a [io::Error] + fn size(&self, path: &Path) -> impl Future> + Send + 'static; /// read the contents of the file at the path /// returning the bytes of the file in the success case - /// and [Self::Err] in the error case + /// and [io::Error] in the error case fn read( &self, path: &Path, - ) -> impl Future, Self::Err>> + Send + 'static; + ) -> impl Future, io::Error>> + Send + 'static; /// recursively ensure that the input path exists fn create_dir_all( &self, path: &Path, - ) -> impl Future> + Send + 'static; + ) -> impl Future> + Send + 'static; /// read and return the file at the input path fn open<'a>( &'a self, path: &'a Path, - ) -> impl Future> + Send + 'static; + ) -> impl Future> + Send + 'static; /// convert from a [std::fs::File] into this persistence layer [Self::File] type. 
/// This is called when converting from a partial file (which exists on disk) @@ -481,7 +478,7 @@ pub trait Persistence: Send + Sync + Clone + std::fmt::Debug + 'static { fn convert_std_file( self: Arc, file: std::fs::File, - ) -> impl Future> + Send + 'static; + ) -> impl Future> + Send + 'static; } /// A persistence layer that writes to the local file system @@ -489,23 +486,22 @@ pub trait Persistence: Send + Sync + Clone + std::fmt::Debug + 'static { pub struct FileSystemPersistence; impl Persistence for FileSystemPersistence { - type Err = io::Error; type File = std::fs::File; - fn size(&self, path: &Path) -> impl Future> + 'static { + fn size(&self, path: &Path) -> impl Future> + 'static { let fut = tokio::fs::metadata(path.to_owned()); async move { fut.await.map(|m| m.len()) } } - fn read(&self, path: &Path) -> impl Future, Self::Err>> + 'static { + fn read(&self, path: &Path) -> impl Future, io::Error>> + 'static { tokio::fs::read(path.to_owned()) } - fn create_dir_all(&self, path: &Path) -> impl Future> + 'static { + fn create_dir_all(&self, path: &Path) -> impl Future> + 'static { tokio::fs::create_dir_all(path.to_owned()) } - fn open(&self, path: &Path) -> impl Future> + 'static { + fn open(&self, path: &Path) -> impl Future> + 'static { let fut = tokio::fs::File::open(path.to_owned()); async move { let file = fut.await?; @@ -516,7 +512,7 @@ impl Persistence for FileSystemPersistence { async fn convert_std_file( self: Arc, file: std::fs::File, - ) -> Result { + ) -> Result { Ok(file) } } @@ -533,7 +529,7 @@ impl ImportSource { fn len<'a, T: Persistence>( &'a self, fs: &'a T, - ) -> impl Future> + 'static { + ) -> impl Future> + 'static { enum Either { Left(u64), Right(T), @@ -542,7 +538,7 @@ impl ImportSource { let output = match self { Self::TempFile(path) | Self::External(path) => { let fut: std::pin::Pin< - Box::Err>> + Send + 'static>, + Box> + Send + 'static>, > = Box::pin(fs.size(path)); Either::Right(fut) } @@ -813,28 +809,45 @@ pub(crate) type FilterPredicate = /// Storage that is using a redb database for small files and files for /// large files. #[derive(Debug, Clone)] -pub struct Store(Arc>); +pub struct Store(Arc>); -impl Store { - /// Load or create a new store. - pub async fn load(root: impl AsRef) -> io::Result { +impl Store { + /// load a new instance of a file system backed store at the given path + pub fn load(root: impl AsRef) -> impl Future> { let path = root.as_ref(); let db_path = path.join("blobs.db"); + + Store::load_with_backend(root, db_path, FileSystemPersistence) + } +} + +impl Store +where + T: Persistence, +{ + /// Load or create a new store. + pub async fn load_with_backend( + root: impl AsRef, + db_path: PathBuf, + backend: T, + ) -> io::Result { let options = Options { - path: PathOptions::new(path), + path: PathOptions::new(root.as_ref()), inline: Default::default(), batch: Default::default(), }; - Self::new(db_path, options).await + Self::new(db_path, options, backend).await } /// Create a new store with custom options. 
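// To illustrate what the backend abstraction buys, a hypothetical Persistence
// impl that wraps FileSystemPersistence and counts read() calls. A sketch
// only, assuming it sits in src/store/fs.rs where Path, io, Arc and Future
// are in scope; Debug is derived because the Store impls below still require
// T: Persistence + std::fmt::Debug for the full store API.
#[derive(Debug, Clone)]
struct CountingPersistence {
    inner: FileSystemPersistence,
    reads: Arc<std::sync::atomic::AtomicU64>,
}

impl Persistence for CountingPersistence {
    type File = <FileSystemPersistence as Persistence>::File;

    fn size(&self, path: &Path) -> impl Future<Output = io::Result<u64>> + Send + 'static {
        self.inner.size(path)
    }

    fn read(&self, path: &Path) -> impl Future<Output = io::Result<Vec<u8>>> + Send + 'static {
        // counted eagerly, i.e. when the future is created rather than awaited
        self.reads.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        self.inner.read(path)
    }

    fn create_dir_all(&self, path: &Path) -> impl Future<Output = io::Result<()>> + Send + 'static {
        self.inner.create_dir_all(path)
    }

    fn open<'a>(&'a self, path: &'a Path) -> impl Future<Output = io::Result<Self::File>> + Send + 'static {
        self.inner.open(path)
    }

    fn convert_std_file(
        self: Arc<Self>,
        file: std::fs::File,
    ) -> impl Future<Output = io::Result<Self::File>> + Send + 'static {
        Arc::new(self.inner).convert_std_file(file)
    }
}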
- pub async fn new(path: PathBuf, options: Options) -> io::Result { + pub async fn new(path: PathBuf, options: Options, backend: T) -> io::Result { // spawn_blocking because StoreInner::new creates directories let rt = tokio::runtime::Handle::try_current() .map_err(|_| io::Error::new(io::ErrorKind::Other, "no tokio runtime"))?; - let inner = - tokio::task::spawn_blocking(move || StoreInner::new_sync(path, options, rt)).await??; + let inner = tokio::task::spawn_blocking(move || { + StoreInner::new_sync_with_backend(path, options, rt, backend) + }) + .await??; Ok(Self(Arc::new(inner))) } @@ -880,17 +893,9 @@ impl TagCounter for RwLock { } } -impl StoreInner { - fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result { - Self::new_sync_with_backend(path, options, rt, FileSystemPersistence) - } -} - impl StoreInner where T: Persistence, - OuterError: From, - io::Error: From, { fn new_sync_with_backend( path: PathBuf, @@ -1402,8 +1407,6 @@ impl From for io::Error { impl super::Map for Store where T: Persistence, - OuterError: From, - io::Error: From, { type Entry = Entry; @@ -1415,8 +1418,6 @@ where impl super::MapMut for Store where T: Persistence, - OuterError: From, - io::Error: From, { type EntryMut = Entry; @@ -1441,7 +1442,10 @@ where } } -impl super::ReadableStore for Store { +impl super::ReadableStore for Store +where + T: Persistence, +{ async fn blobs(&self) -> io::Result> { Ok(Box::new(self.0.blobs().await?.into_iter())) } @@ -1482,7 +1486,10 @@ impl super::ReadableStore for Store { } } -impl super::Store for Store { +impl super::Store for Store +where + T: Persistence + std::fmt::Debug, +{ async fn import_file( &self, path: PathBuf, @@ -1646,11 +1653,14 @@ impl super::Store for Store { } } -pub(super) async fn gc_sweep_task( - store: &Store, +pub(super) async fn gc_sweep_task( + store: &Store, live: &BTreeSet, co: &Co, -) -> anyhow::Result<()> { +) -> anyhow::Result<()> +where + T: Persistence, +{ let blobs = store.blobs().await?.chain(store.partial_blobs().await?); let mut count = 0; let mut batch = Vec::new(); @@ -2034,12 +2044,8 @@ where "reading external data to inline it: {}", external_path.display() ); - let data = Bytes::from( - self.fs - .read(&external_path) - .await - .map_err(|e| ActorError::Io(e.into()))?, - ); + let data = + Bytes::from(self.fs.read(&external_path).await.map_err(ActorError::Io)?); DataLocation::Inline(data) } else { DataLocation::External(vec![external_path], data_size) @@ -2296,8 +2302,8 @@ where // inline if size <= self.options.inline.max_data_inlined { let path = self.options.path.owned_data_path(&hash); - let data = block_for(self.fs.read(&path)) - .map_err(|e| ActorError::Io(e.into()))?; + let data = + block_for(self.fs.read(&path)).map_err(ActorError::Io)?; tables.delete_after_commit.insert(hash, [BaoFilePart::Data]); tables.inline_data.insert(hash, data.as_slice())?; (DataLocation::Inline(()), size, true) @@ -2332,8 +2338,9 @@ where if outboard_size <= self.options.inline.max_outboard_inlined => { let path = self.options.path.owned_outboard_path(&hash); - let outboard = block_for(self.fs.read(&path)) - .map_err(|e| ActorError::Io(e.into()))?; + let outboard = + block_for(self.fs.read(&path)).map_err(ActorError::Io)?; + tables .delete_after_commit .insert(hash, [BaoFilePart::Outboard]); @@ -2870,7 +2877,7 @@ where .map_async(move |f| fs_2.convert_std_file(f)), ) .transpose() - .map_err(|e| ActorError::Io(e.into()))?, + .map_err(ActorError::Io)?, ) } MemOrFile::File(data) => MemOrFile::File( @@ -2881,7 +2888,7 @@ 
where .map_async(move |f| fs_2.convert_std_file(f)) .await .transpose() - .map_err(|e| ActorError::Io(e.into()))?, + .map_err(ActorError::Io)?, ), } }; @@ -2916,7 +2923,7 @@ where .map_async(move |f| fs.convert_std_file(f)), ) .transpose() - .map_err(|e| ActorError::Io(e.into()))?, + .map_err(ActorError::Io)?, ) } MemOrFile::File(outboard) => MemOrFile::File( @@ -2928,7 +2935,7 @@ where .map_async(|f| fs.convert_std_file(f)), ) .transpose() - .map_err(|e| ActorError::Io(e.into()))?, + .map_err(ActorError::Io)?, ), } }; diff --git a/src/store/fs/tests.rs b/src/store/fs/tests.rs index 85540eb89..a503335a9 100644 --- a/src/store/fs/tests.rs +++ b/src/store/fs/tests.rs @@ -50,7 +50,7 @@ pub fn to_stream( .boxed() } -async fn create_test_db() -> (tempfile::TempDir, Store) { +async fn create_test_db() -> (tempfile::TempDir, Store) { let _ = tracing_subscriber::fmt::try_init(); let testdir = tempfile::tempdir().unwrap(); let db_path = testdir.path().join("db.redb"); @@ -59,7 +59,9 @@ async fn create_test_db() -> (tempfile::TempDir, Store) { batch: Default::default(), inline: Default::default(), }; - let db = Store::new(db_path, options).await.unwrap(); + let db = Store::new(db_path, options, FileSystemPersistence) + .await + .unwrap(); (testdir, db) } @@ -788,7 +790,9 @@ async fn actor_store_smoke() { batch: Default::default(), inline: Default::default(), }; - let db = Store::new(db_path, options).await.unwrap(); + let db = Store::new(db_path, options, FileSystemPersistence) + .await + .unwrap(); db.dump().await.unwrap(); let data = random_test_data(1024 * 1024); #[allow(clippy::single_range_in_vec_init)] diff --git a/tests/gc.rs b/tests/gc.rs index dcf76b4ef..d3b1ba017 100644 --- a/tests/gc.rs +++ b/tests/gc.rs @@ -22,8 +22,8 @@ use iroh_blobs::{ net_protocol::Blobs, rpc::client::{blobs, tags}, store::{ - bao_tree, BaoBatchWriter, ConsistencyCheckProgress, EntryStatus, GcConfig, MapEntryMut, - MapMut, ReportLevel, Store, + bao_tree, fs::FileSystemPersistence, BaoBatchWriter, ConsistencyCheckProgress, EntryStatus, + GcConfig, MapEntryMut, MapMut, ReportLevel, Store, }, util::{ progress::{AsyncChannelProgressSender, ProgressSender as _}, @@ -127,7 +127,7 @@ async fn persistent_node( path: PathBuf, gc_period: Duration, ) -> ( - Node, + Node>, async_channel::Receiver<()>, ) { let store = iroh_blobs::store::fs::Store::load(path).await.unwrap(); diff --git a/tests/rpc.rs b/tests/rpc.rs index 7dc12e7b2..a18eb1398 100644 --- a/tests/rpc.rs +++ b/tests/rpc.rs @@ -1,7 +1,7 @@ #![cfg(feature = "test")] use std::{net::SocketAddr, path::PathBuf, vec}; -use iroh_blobs::net_protocol::Blobs; +use iroh_blobs::{net_protocol::Blobs, store::fs::FileSystemPersistence}; use quic_rpc::client::QuinnConnector; use tempfile::TempDir; use testresult::TestResult; @@ -14,7 +14,7 @@ type BlobsClient = iroh_blobs::rpc::client::blobs::Client; #[derive(Debug)] pub struct Node { pub router: iroh::protocol::Router, - pub blobs: Blobs, + pub blobs: Blobs>, pub rpc_task: AbortOnDropHandle<()>, } diff --git a/tests/tags.rs b/tests/tags.rs index 8a4af2d54..13ad3d50a 100644 --- a/tests/tags.rs +++ b/tests/tags.rs @@ -8,6 +8,7 @@ use iroh_blobs::{ client::tags::{self, TagInfo}, proto::RpcService, }, + store::fs::FileSystemPersistence, Hash, HashAndFormat, }; use testresult::TestResult; @@ -142,7 +143,7 @@ async fn tags_smoke_mem() -> TestResult<()> { async fn tags_smoke_fs() -> TestResult<()> { let td = tempfile::tempdir()?; let endpoint = Endpoint::builder().bind().await?; - let blobs = Blobs::persistent(td.path().join("blobs.db")) + 
let blobs = Blobs::persistent(td.path(), td.path().join("blobs.db"), FileSystemPersistence) .await? .build(&endpoint); let client = blobs.client();
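
Taken together, a short usage sketch of the reworked constructors as they
stand at the end of this series. It mirrors the tags test above; the endpoint
and tempdir setup are borrowed from the existing tests, and the exact import
paths are assumptions.

use iroh::Endpoint;
use iroh_blobs::{
    net_protocol::Blobs,
    store::fs::{FileSystemPersistence, Store},
};

async fn demo() -> anyhow::Result<()> {
    let td = tempfile::tempdir()?;
    let endpoint = Endpoint::builder().bind().await?;

    // the one-argument entry point is kept for the filesystem backend
    let _store = Store::load(td.path()).await?;

    // the generic entry point takes the root dir, the redb path and any backend
    let blobs = Blobs::persistent(
        td.path(),
        td.path().join("blobs.db"),
        FileSystemPersistence,
    )
    .await?
    .build(&endpoint);
    let _client = blobs.client();
    Ok(())
}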