diff --git a/Cargo.toml b/Cargo.toml
index a05c5f9f6..847401cf9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -95,7 +95,7 @@ futures-util = "0.3.30"
 testdir = "0.9.1"

 [features]
-default = ["fs-store", "net_protocol"]
+default = ["fs-store", "net_protocol", "formats-collection"]
 downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
 net_protocol = ["downloader", "dep:futures-util"]
 fs-store = ["dep:reflink-copy", "redb", "dep:tempfile"]
@@ -112,6 +112,8 @@ rpc = [
   "dep:walkdir",
   "downloader",
 ]
+formats = []
+formats-collection = ["formats"]

 example-iroh = [
   "dep:clap",
@@ -127,6 +129,7 @@ rustdoc-args = ["--cfg", "iroh_docsrs"]

 [[example]]
 name = "provide-bytes"
+required-features = ["formats-collection"]

 [[example]]
 name = "fetch-fsm"
diff --git a/examples/hello-world-fetch.rs b/examples/hello-world-fetch.rs
index 7a6c61407..e0afaae71 100644
--- a/examples/hello-world-fetch.rs
+++ b/examples/hello-world-fetch.rs
@@ -66,13 +66,13 @@ async fn main() -> Result<()> {
         "'Hello World' example expects to fetch a single blob, but the ticket indicates a collection.",
     );

-    // `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress
+    // `download` returns a stream of `DownloadProgressEvent`s. You can iterate through these updates to get progress
     // on the state of your download.
     let download_stream = blobs_client
         .download(ticket.hash(), ticket.node_addr().clone())
         .await?;

-    // You can also just `await` the stream, which will poll the `DownloadProgress` stream for you.
+    // You can also just `await` the stream, which will poll the `DownloadProgressEvent` stream for you.
     let outcome = download_stream.await.context("unable to download hash")?;

     println!(
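Consumer code is touched only by the event type's new name. For illustration, a minimal fetch loop that drives the stream by hand instead of awaiting it might look like the following sketch; it assumes the example's `blobs_client` and `ticket` from the surrounding code and a `StreamExt` import, none of which are part of this patch:

    use futures_lite::StreamExt;
    use iroh_blobs::rpc::client::blobs::DownloadProgressEvent;

    let mut stream = blobs_client
        .download(ticket.hash(), ticket.node_addr().clone())
        .await?;
    while let Some(event) = stream.next().await {
        match event? {
            // the remote reported a blob of `size` bytes
            DownloadProgressEvent::Found { size, .. } => println!("found blob ({size} bytes)"),
            // transfer progress for the current blob, as an absolute offset
            DownloadProgressEvent::Progress { offset, .. } => println!("at {offset} bytes"),
            // always the last message on success
            DownloadProgressEvent::AllDone(stats) => {
                println!("done, {} bytes read", stats.bytes_read);
                break;
            }
            // always the last message on failure
            DownloadProgressEvent::Abort(err) => anyhow::bail!("download aborted: {err}"),
            _ => {}
        }
    }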
diff --git a/examples/local-swarm-discovery.rs b/examples/local-swarm-discovery.rs
index 7305b7755..aec54e54d 100644
--- a/examples/local-swarm-discovery.rs
+++ b/examples/local-swarm-discovery.rs
@@ -140,13 +140,14 @@ mod progress {
         ProgressStyle,
     };
     use iroh_blobs::{
-        get::{db::DownloadProgress, progress::BlobProgress, Stats},
+        get::Stats,
+        rpc::client::blobs::{BlobProgressEvent, DownloadProgressEvent},
         Hash,
     };

     pub async fn show_download_progress(
         hash: Hash,
-        mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
+        mut stream: impl Stream<Item = Result<DownloadProgressEvent>> + Unpin,
     ) -> Result<()> {
         eprintln!("Fetching: {}", hash);
         let mp = MultiProgress::new();
@@ -157,7 +158,7 @@ mod progress {
         let mut seq = false;
         while let Some(x) = stream.next().await {
             match x? {
-                DownloadProgress::InitialState(state) => {
+                DownloadProgressEvent::InitialState(state) => {
                     if state.connected {
                         op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
                     }
@@ -177,9 +178,9 @@ mod progress {
                         ip.set_length(size.value());
                         ip.reset();
                         match blob.progress {
-                            BlobProgress::Pending => {}
-                            BlobProgress::Progressing(offset) => ip.set_position(offset),
-                            BlobProgress::Done => ip.finish_and_clear(),
+                            BlobProgressEvent::Pending => {}
+                            BlobProgressEvent::Progressing(offset) => ip.set_position(offset),
+                            BlobProgressEvent::Done => ip.finish_and_clear(),
                         }
                         if !seq {
                             op.finish_and_clear();
                         }
                     }
                 }
@@ -187,11 +188,11 @@ mod progress {
-                DownloadProgress::FoundLocal { .. } => {}
-                DownloadProgress::Connected => {
+                DownloadProgressEvent::FoundLocal { .. } => {}
+                DownloadProgressEvent::Connected => {
                     op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
                 }
-                DownloadProgress::FoundHashSeq { children, .. } => {
+                DownloadProgressEvent::FoundHashSeq { children, .. } => {
                     op.set_message(format!(
                         "{} Downloading {} blob(s)\n",
                         style("[3/3]").bold().dim(),
@@ -201,7 +202,7 @@ mod progress {
                     op.reset();
                     seq = true;
                 }
-                DownloadProgress::Found { size, child, .. } => {
+                DownloadProgressEvent::Found { size, child, .. } => {
                     if seq {
                         op.set_position(child.into());
                     } else {
@@ -210,13 +211,13 @@ mod progress {
                     ip.set_length(size);
                     ip.reset();
                 }
-                DownloadProgress::Progress { offset, .. } => {
+                DownloadProgressEvent::Progress { offset, .. } => {
                     ip.set_position(offset);
                 }
-                DownloadProgress::Done { .. } => {
+                DownloadProgressEvent::Done { .. } => {
                     ip.finish_and_clear();
                 }
-                DownloadProgress::AllDone(Stats {
+                DownloadProgressEvent::AllDone(Stats {
                     bytes_read,
                     elapsed,
                     ..
@@ -230,7 +231,7 @@ mod progress {
                     );
                     break;
                 }
-                DownloadProgress::Abort(e) => {
+                DownloadProgressEvent::Abort(e) => {
                     bail!("download aborted: {}", e);
                 }
             }
diff --git a/src/cli.rs b/src/cli.rs
index 29a594dba..b829520f1 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -19,11 +19,10 @@ use iroh::{NodeAddr, PublicKey, RelayUrl};
 use tokio::io::AsyncWriteExt;

 use crate::{
-    get::{db::DownloadProgress, progress::BlobProgress, Stats},
-    net_protocol::DownloadMode,
-    provider::AddProgress,
+    get::Stats,
     rpc::client::blobs::{
-        self, BlobInfo, BlobStatus, CollectionInfo, DownloadOptions, IncompleteBlobInfo, WrapOption,
+        self, AddProgressEvent, BlobInfo, BlobProgressEvent, BlobStatus, CollectionInfo,
+        DownloadMode, DownloadOptions, DownloadProgressEvent, IncompleteBlobInfo, WrapOption,
     },
     store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ReportLevel, ValidateProgress},
     ticket::BlobTicket,
@@ -895,29 +894,29 @@ pub struct ProvideResponseEntry {
     pub hash: Hash,
 }

-/// Combines the [`AddProgress`] outputs from a [`Stream`] into a single tuple.
+/// Combines the [`AddProgressEvent`] outputs from a [`Stream`] into a single tuple.
 pub async fn aggregate_add_response(
-    mut stream: impl Stream<Item = Result<AddProgress>> + Unpin,
+    mut stream: impl Stream<Item = Result<AddProgressEvent>> + Unpin,
 ) -> Result<(Hash, BlobFormat, Vec<ProvideResponseEntry>)> {
     let mut hash_and_format = None;
     let mut collections = BTreeMap::<u64, (String, u64, Option<Hash>)>::new();
     let mut mp = Some(ProvideProgressState::new());
     while let Some(item) = stream.next().await {
         match item? {
-            AddProgress::Found { name, id, size } => {
+            AddProgressEvent::Found { name, id, size } => {
                 tracing::trace!("Found({id},{name},{size})");
                 if let Some(mp) = mp.as_mut() {
                     mp.found(name.clone(), id, size);
                 }
                 collections.insert(id, (name, size, None));
             }
-            AddProgress::Progress { id, offset } => {
+            AddProgressEvent::Progress { id, offset } => {
                 tracing::trace!("Progress({id}, {offset})");
                 if let Some(mp) = mp.as_mut() {
                     mp.progress(id, offset);
                 }
             }
-            AddProgress::Done { hash, id } => {
+            AddProgressEvent::Done { hash, id } => {
                 tracing::trace!("Done({id},{hash:?})");
                 if let Some(mp) = mp.as_mut() {
                     mp.done(id, hash);
@@ -931,7 +930,7 @@ pub async fn aggregate_add_response(
                     }
                 }
             }
-            AddProgress::AllDone { hash, format, .. } => {
+            AddProgressEvent::AllDone { hash, format, .. } => {
                 tracing::trace!("AllDone({hash:?})");
                 if let Some(mp) = mp.take() {
                     mp.all_done();
@@ -939,7 +938,7 @@ pub async fn aggregate_add_response(
                 hash_and_format = Some(HashAndFormat { hash, format });
                 break;
             }
-            AddProgress::Abort(e) => {
+            AddProgressEvent::Abort(e) => {
                 if let Some(mp) = mp.take() {
                     mp.error();
                 }
@@ -1032,7 +1031,7 @@ impl ProvideProgressState {
 /// Displays the download progress for a given stream.
 pub async fn show_download_progress(
     hash: Hash,
-    mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
+    mut stream: impl Stream<Item = Result<DownloadProgressEvent>> + Unpin,
 ) -> Result<()> {
     eprintln!("Fetching: {}", hash);
     let mp = MultiProgress::new();
@@ -1043,7 +1042,7 @@ pub async fn show_download_progress(
     let mut seq = false;
     while let Some(x) = stream.next().await {
         match x? {
-            DownloadProgress::InitialState(state) => {
+            DownloadProgressEvent::InitialState(state) => {
                 if state.connected {
                     op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
                 }
@@ -1063,9 +1062,9 @@ pub async fn show_download_progress(
                     ip.set_length(size.value());
                     ip.reset();
                     match blob.progress {
-                        BlobProgress::Pending => {}
-                        BlobProgress::Progressing(offset) => ip.set_position(offset),
-                        BlobProgress::Done => ip.finish_and_clear(),
+                        BlobProgressEvent::Pending => {}
+                        BlobProgressEvent::Progressing(offset) => ip.set_position(offset),
+                        BlobProgressEvent::Done => ip.finish_and_clear(),
                     }
                     if !seq {
                         op.finish_and_clear();
                     }
                 }
             }
@@ -1073,11 +1072,11 @@ pub async fn show_download_progress(
-            DownloadProgress::FoundLocal { .. } => {}
-            DownloadProgress::Connected => {
+            DownloadProgressEvent::FoundLocal { .. } => {}
+            DownloadProgressEvent::Connected => {
                 op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
             }
-            DownloadProgress::FoundHashSeq { children, .. } => {
+            DownloadProgressEvent::FoundHashSeq { children, .. } => {
                 op.set_message(format!(
                     "{} Downloading {} blob(s)\n",
                     style("[3/3]").bold().dim(),
@@ -1087,7 +1086,7 @@ pub async fn show_download_progress(
                 op.reset();
                 seq = true;
             }
-            DownloadProgress::Found { size, child, .. } => {
+            DownloadProgressEvent::Found { size, child, .. } => {
                 if seq {
                     op.set_position(child.into());
                 } else {
@@ -1096,13 +1095,13 @@ pub async fn show_download_progress(
                 ip.set_length(size);
                 ip.reset();
             }
-            DownloadProgress::Progress { offset, .. } => {
+            DownloadProgressEvent::Progress { offset, .. } => {
                 ip.set_position(offset);
             }
-            DownloadProgress::Done { .. } => {
+            DownloadProgressEvent::Done { .. } => {
                 ip.finish_and_clear();
             }
-            DownloadProgress::AllDone(Stats {
+            DownloadProgressEvent::AllDone(Stats {
                 bytes_read,
                 elapsed,
                 ..
@@ -1116,7 +1115,7 @@ pub async fn show_download_progress(
                 );
                 break;
             }
-            DownloadProgress::Abort(e) => {
+            DownloadProgressEvent::Abort(e) => {
                 bail!("download aborted: {}", e);
             }
         }
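The renamed helper keeps its shape: it folds the `AddProgressEvent` stream into a final `(hash, format, entries)` triple. A usage sketch, assuming a `stream` already obtained from an add operation (the stream variable is hypothetical):

    use iroh_blobs::cli::aggregate_add_response;

    // collapse the event stream into its final result
    let (hash, format, entries) = aggregate_add_response(stream).await?;
    println!("added {hash} ({format:?}), {} entries", entries.len());
    for entry in entries {
        println!("  {}", entry.hash);
    }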
diff --git a/src/downloader.rs b/src/downloader.rs
index a147131ae..8c593be4a 100644
--- a/src/downloader.rs
+++ b/src/downloader.rs
@@ -55,7 +55,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
 use tracing::{debug, error, error_span, trace, warn, Instrument};

 use crate::{
-    get::{db::DownloadProgress, Stats},
+    get::{progress::DownloadProgressEvent, Stats},
     metrics::Metrics,
     store::Store,
     util::{local_pool::LocalPoolHandle, progress::ProgressSender},
@@ -797,7 +797,7 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
             if let Some(sender) = handlers.on_progress {
                 self.progress_tracker.unsubscribe(&kind, &sender);
                 sender
-                    .send(DownloadProgress::Abort(serde_error::Error::new(
+                    .send(DownloadProgressEvent::Abort(serde_error::Error::new(
                         &*anyhow::Error::from(DownloadError::Cancelled),
                     )))
                     .await
diff --git a/src/downloader/get.rs b/src/downloader/get.rs
index c4a17d18c..eb333f8bf 100644
--- a/src/downloader/get.rs
+++ b/src/downloader/get.rs
@@ -7,20 +7,20 @@
 use iroh::endpoint;

 use super::{progress::BroadcastProgressSender, DownloadKind, FailureAction, GetStartFut, Getter};
 use crate::{
-    get::{db::get_to_db_in_steps, error::GetError},
-    store::Store,
+    get::Error,
+    store::{get_to_db_in_steps, FetchState, FetchStateNeedsConn, Store},
 };

-impl From<GetError> for FailureAction {
-    fn from(e: GetError) -> Self {
+impl From<Error> for FailureAction {
+    fn from(e: Error) -> Self {
         match e {
-            e @ GetError::NotFound(_) => FailureAction::AbortRequest(e.into()),
-            e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
-            e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
-            e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
-            e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e.into()),
+            e @ Error::NotFound(_) => FailureAction::AbortRequest(e.into()),
+            e @ Error::RemoteReset(_) => FailureAction::RetryLater(e.into()),
+            e @ Error::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
+            e @ Error::Io(_) => FailureAction::RetryLater(e.into()),
+            e @ Error::BadRequest(_) => FailureAction::AbortRequest(e.into()),
             // TODO: what do we want to do on local failures?
-            e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
+            e @ Error::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
         }
     }
 }
@@ -34,7 +34,7 @@ pub(crate) struct IoGetter {

 impl Getter for IoGetter {
     type Connection = endpoint::Connection;
-    type NeedsConn = crate::get::db::GetStateNeedsConn;
+    type NeedsConn = FetchStateNeedsConn;

     fn get(
         &mut self,
@@ -45,10 +45,8 @@ impl Getter for IoGetter {
         async move {
             match get_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
                 Err(err) => Err(err.into()),
-                Ok(crate::get::db::GetState::Complete(stats)) => {
-                    Ok(super::GetOutput::Complete(stats))
-                }
-                Ok(crate::get::db::GetState::NeedsConn(needs_conn)) => {
+                Ok(FetchState::Complete(stats)) => Ok(super::GetOutput::Complete(stats)),
+                Ok(FetchState::NeedsConn(needs_conn)) => {
                     Ok(super::GetOutput::NeedsConn(needs_conn))
                 }
             }
@@ -57,7 +55,7 @@ impl Getter for IoGetter {
     }
 }

-impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
+impl super::NeedsConn<endpoint::Connection> for FetchStateNeedsConn {
     fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
         async move {
             let res = self.proceed(conn).await;
@@ -73,7 +71,7 @@ impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
 }

 #[cfg(feature = "metrics")]
-fn track_metrics(res: &Result<Stats, GetError>) {
+fn track_metrics(res: &Result<Stats, Error>) {
     use iroh_metrics::{inc, inc_by};

     use crate::metrics::Metrics;
@@ -90,7 +88,7 @@ fn track_metrics(res: &Result<Stats, Error>) {
             inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64);
         }
         Err(e) => match &e {
-            GetError::NotFound(_) => inc!(Metrics, downloads_notfound),
+            Error::NotFound(_) => inc!(Metrics, downloads_notfound),
             _ => inc!(Metrics, downloads_error),
         },
     }
diff --git a/src/downloader/progress.rs b/src/downloader/progress.rs
index 9b0372976..a9122cdf3 100644
--- a/src/downloader/progress.rs
+++ b/src/downloader/progress.rs
@@ -11,21 +11,21 @@ use parking_lot::Mutex;
 use super::DownloadKind;
 use crate::{
-    get::{db::DownloadProgress, progress::TransferState},
+    get::progress::{DownloadProgressEvent, TransferState},
     util::progress::{AsyncChannelProgressSender, IdGenerator, ProgressSendError, ProgressSender},
 };

 /// The channel that can be used to subscribe to progress updates.
-pub type ProgressSubscriber = AsyncChannelProgressSender<DownloadProgress>;
+pub type ProgressSubscriber = AsyncChannelProgressSender<DownloadProgressEvent>;

 /// Track the progress of downloads.
 ///
 /// This struct allows to create [`ProgressSender`] structs to be passed to
-/// [`crate::get::db::get_to_db`]. Each progress sender can be subscribed to by any number of
+/// [`crate::store::get_to_db`]. Each progress sender can be subscribed to by any number of
 /// [`ProgressSubscriber`] channel senders, which will receive each progress update (if they have
 /// capacity). Additionally, the [`ProgressTracker`] maintains a [`TransferState`] for each
 /// transfer, applying each progress update to update this state. When subscribing to an already
-/// running transfer, the subscriber will receive a [`DownloadProgress::InitialState`] message
+/// running transfer, the subscriber will receive a [`DownloadProgressEvent::InitialState`] message
 /// containing the state at the time of the subscription, and then receive all further progress
 /// events directly.
 #[derive(Debug, Default)]
@@ -101,8 +101,8 @@ struct Inner {
 }

 impl Inner {
-    fn subscribe(&mut self, subscriber: ProgressSubscriber) -> DownloadProgress {
-        let msg = DownloadProgress::InitialState(self.state.clone());
+    fn subscribe(&mut self, subscriber: ProgressSubscriber) -> DownloadProgressEvent {
+        let msg = DownloadProgressEvent::InitialState(self.state.clone());
         self.subscribers.push(subscriber);
         msg
     }
@@ -111,7 +111,7 @@ impl Inner {
         self.subscribers.retain(|s| !s.same_channel(sender));
     }

-    fn on_progress(&mut self, progress: DownloadProgress) {
+    fn on_progress(&mut self, progress: DownloadProgressEvent) {
         self.state.on_progress(progress);
     }
 }
@@ -129,7 +129,7 @@ impl IdGenerator for BroadcastProgressSender {
 }

 impl ProgressSender for BroadcastProgressSender {
-    type Msg = DownloadProgress;
+    type Msg = DownloadProgressEvent;

     async fn send(&self, msg: Self::Msg) -> Result<(), ProgressSendError> {
         // making sure that the lock is not held across an await point.
diff --git a/src/downloader/test.rs b/src/downloader/test.rs
index 1fab8ff8e..694604ec0 100644
--- a/src/downloader/test.rs
+++ b/src/downloader/test.rs
@@ -10,10 +10,7 @@ use iroh::SecretKey;

 use super::*;
 use crate::{
-    get::{
-        db::BlobId,
-        progress::{BlobProgress, TransferState},
-    },
+    get::progress::{BlobId, BlobProgressEvent, DownloadProgressEvent, TransferState},
     util::{
         local_pool::LocalPool,
         progress::{AsyncChannelProgressSender, IdGenerator},
@@ -258,7 +255,7 @@ async fn concurrent_progress() {
             start_rx.await.unwrap();
             let id = progress.new_id();
             progress
-                .send(DownloadProgress::Found {
+                .send(DownloadProgressEvent::Found {
                     id,
                     child: BlobId::Root,
                     hash,
@@ -267,7 +264,10 @@ async fn concurrent_progress() {
                 .await
                 .unwrap();
             done_rx.await.unwrap();
-            progress.send(DownloadProgress::Done { id }).await.unwrap();
+            progress
+                .send(DownloadProgressEvent::Done { id })
+                .await
+                .unwrap();
             Ok(Stats::default())
         }
         .boxed()
@@ -296,7 +296,7 @@ async fn concurrent_progress() {
     let prog0_b = prog_b_rx.recv().await.unwrap();
     assert!(matches!(
         prog0_b,
-        DownloadProgress::InitialState(state) if state.root.hash == hash && state.root.progress == BlobProgress::Pending,
+        DownloadProgressEvent::InitialState(state) if state.root.hash == hash && state.root.progress == BlobProgressEvent::Pending,
     ));

     start_tx.send(()).unwrap();

     let prog1_a = prog_a_rx.recv().await.unwrap();
     let prog1_b = prog_b_rx.recv().await.unwrap();
     assert!(
-        matches!(prog1_a, DownloadProgress::Found { hash: found_hash, size: 100, ..} if found_hash == hash)
+        matches!(prog1_a, DownloadProgressEvent::Found { hash: found_hash, size: 100, ..} if found_hash == hash)
     );
     assert!(
-        matches!(prog1_b, DownloadProgress::Found { hash: found_hash, size: 100, ..} if found_hash == hash)
+        matches!(prog1_b, DownloadProgressEvent::Found { hash: found_hash, size: 100, ..} if found_hash == hash)
     );

     state_a.on_progress(prog1_a);
@@ -320,7 +320,7 @@ async fn concurrent_progress() {
     let handle_c = downloader.queue(req).await;

     let prog1_c = prog_c_rx.recv().await.unwrap();
-    assert!(matches!(&prog1_c, DownloadProgress::InitialState(state) if state == &state_a));
+    assert!(matches!(&prog1_c, DownloadProgressEvent::InitialState(state) if state == &state_a));
     state_c.on_progress(prog1_c);

     done_tx.send(()).unwrap();
@@ -338,9 +338,9 @@ async fn concurrent_progress() {
     assert_eq!(prog_b.len(), 1);
     assert_eq!(prog_c.len(), 1);

-    assert!(matches!(prog_a[0], DownloadProgress::Done { .. }));
-    assert!(matches!(prog_b[0], DownloadProgress::Done { .. }));
-    assert!(matches!(prog_c[0], DownloadProgress::Done { .. }));
+    assert!(matches!(prog_a[0], DownloadProgressEvent::Done { .. }));
+    assert!(matches!(prog_b[0], DownloadProgressEvent::Done { .. }));
+    assert!(matches!(prog_c[0], DownloadProgressEvent::Done { .. }));

     for p in prog_a {
         state_a.on_progress(p);
diff --git a/src/downloader/test/getter.rs b/src/downloader/test/getter.rs
index 0ea200caa..1ac24de60 100644
--- a/src/downloader/test/getter.rs
+++ b/src/downloader/test/getter.rs
@@ -36,14 +36,14 @@ impl Getter for TestingGetter {
     // since for testing we don't need a real connection, just keep track of what peer is the
     // request being sent to
     type Connection = NodeId;
-    type NeedsConn = GetStateNeedsConn;
+    type NeedsConn = FetchStateNeedsConn;

     fn get(
         &mut self,
         kind: DownloadKind,
         progress_sender: BroadcastProgressSender,
     ) -> GetStartFut<Self::NeedsConn> {
-        std::future::ready(Ok(downloader::GetOutput::NeedsConn(GetStateNeedsConn(
+        std::future::ready(Ok(downloader::GetOutput::NeedsConn(FetchStateNeedsConn(
             self.clone(),
             kind,
             progress_sender,
@@ -53,11 +53,11 @@ impl Getter for TestingGetter {
 }

 #[derive(Debug)]
-pub(super) struct GetStateNeedsConn(TestingGetter, DownloadKind, BroadcastProgressSender);
+pub(super) struct FetchStateNeedsConn(TestingGetter, DownloadKind, BroadcastProgressSender);

-impl downloader::NeedsConn<NodeId> for GetStateNeedsConn {
+impl downloader::NeedsConn<NodeId> for FetchStateNeedsConn {
     fn proceed(self, peer: NodeId) -> super::GetProceedFut {
-        let GetStateNeedsConn(getter, kind, progress_sender) = self;
+        let FetchStateNeedsConn(getter, kind, progress_sender) = self;
         let mut inner = getter.0.write();
         inner.request_history.push((kind, peer));
         let request_duration = inner.request_duration;
diff --git a/src/format.rs b/src/format.rs
index 2ccb8f3ba..59616ecae 100644
--- a/src/format.rs
+++ b/src/format.rs
@@ -13,4 +13,5 @@
 //! n-1 items, where n is the number of blobs in the HashSeq.
 //!
 //! [postcard]: https://docs.rs/postcard/latest/postcard/
+#[cfg(feature = "formats-collection")]
 pub mod collection;
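Anyone building with `default-features = false` now loses `iroh_blobs::format::collection` unless `formats-collection` (or the umbrella `formats`) is enabled explicitly. Inside this crate the guard is exactly the line above; a downstream crate forwarding the flag would guard its own use the same way (the consumer-side `collections` feature name here is hypothetical):

    // in the consumer crate, compiled only when its `collections` feature
    // (which enables `iroh-blobs/formats-collection` in Cargo.toml) is active
    #[cfg(feature = "collections")]
    use iroh_blobs::format::collection::Collection;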
diff --git a/src/get.rs b/src/get.rs
index 7c1190f34..0c344c97d 100644
--- a/src/get.rs
+++ b/src/get.rs
@@ -1,6 +1,6 @@
 //! The client side API
 //!
-//! To get data, create a connection using [iroh-net] or use any quinn
+//! To get data, create a connection using [iroh] or use any [iroh-quinn]
 //! connection that was obtained in another way.
 //!
 //! Create a request describing the data you want to get.
@@ -11,9 +11,9 @@
 //! For some states you have to provide additional arguments when calling next,
 //! or you can choose to finish early.
 //!
-//! [iroh-net]: https://docs.rs/iroh-net
+//! [iroh]: https://docs.rs/iroh
+//! [iroh-quinn]: https://docs.rs/iroh-quinn
 use std::{
-    error::Error,
     fmt::{self, Debug},
     time::{Duration, Instant},
 };
@@ -30,9 +30,9 @@ use crate::{
     Hash, IROH_BLOCK_SIZE,
 };

-pub mod db;
-pub mod error;
-pub mod progress;
+mod error;
+pub use error::Error;
+pub(crate) mod progress;
 pub mod request;

 /// Stats about the transfer.
@@ -882,64 +882,3 @@ pub mod fsm {
         ranges_iter: RangesIter<N>,
     }
 }
-
-/// Error when processing a response
-#[derive(thiserror::Error, Debug)]
-pub enum GetResponseError {
-    /// Error when opening a stream
-    #[error("connection: {0}")]
-    Connection(#[from] endpoint::ConnectionError),
-    /// Error when writing the handshake or request to the stream
-    #[error("write: {0}")]
-    Write(#[from] endpoint::WriteError),
-    /// Error when reading from the stream
-    #[error("read: {0}")]
-    Read(#[from] endpoint::ReadError),
-    /// Error when decoding, e.g. hash mismatch
-    #[error("decode: {0}")]
-    Decode(bao_tree::io::DecodeError),
-    /// A generic error
-    #[error("generic: {0}")]
-    Generic(anyhow::Error),
-}
-
-impl From<postcard::Error> for GetResponseError {
-    fn from(cause: postcard::Error) -> Self {
-        Self::Generic(cause.into())
-    }
-}
-
-impl From<bao_tree::io::DecodeError> for GetResponseError {
-    fn from(cause: bao_tree::io::DecodeError) -> Self {
-        match cause {
-            bao_tree::io::DecodeError::Io(cause) => {
-                // try to downcast to specific quinn errors
-                if let Some(source) = cause.source() {
-                    if let Some(error) = source.downcast_ref::<endpoint::ConnectionError>() {
-                        return Self::Connection(error.clone());
-                    }
-                    if let Some(error) = source.downcast_ref::<endpoint::ReadError>() {
-                        return Self::Read(error.clone());
-                    }
-                    if let Some(error) = source.downcast_ref::<endpoint::WriteError>() {
-                        return Self::Write(error.clone());
-                    }
-                }
-                Self::Generic(cause.into())
-            }
-            _ => Self::Decode(cause),
-        }
-    }
-}
-
-impl From<anyhow::Error> for GetResponseError {
-    fn from(cause: anyhow::Error) -> Self {
-        Self::Generic(cause)
-    }
-}
-
-impl From<GetResponseError> for std::io::Error {
-    fn from(cause: GetResponseError) -> Self {
-        Self::new(std::io::ErrorKind::Other, cause)
-    }
-}
diff --git a/src/get/error.rs b/src/get/error.rs
index 95f6beefc..91e63885e 100644
--- a/src/get/error.rs
+++ b/src/get/error.rs
@@ -1,12 +1,11 @@
 //! Error returned from get operations
-
 use iroh::endpoint;

 use crate::util::progress::ProgressSendError;

-/// Failures for a get operation
+/// Failures for a fetch operation
 #[derive(Debug, thiserror::Error)]
-pub enum GetError {
+pub enum Error {
     /// Hash not found.
     #[error("Hash not found")]
     NotFound(#[source] anyhow::Error),
@@ -29,13 +28,13 @@
     LocalFailure(#[source] anyhow::Error),
 }

-impl From<ProgressSendError> for GetError {
+impl From<ProgressSendError> for Error {
     fn from(value: ProgressSendError) -> Self {
         Self::LocalFailure(value.into())
     }
 }

-impl From<endpoint::ConnectionError> for GetError {
+impl From<endpoint::ConnectionError> for Error {
     fn from(value: endpoint::ConnectionError) -> Self {
         // explicit match just to be sure we are taking everything into account
         use endpoint::ConnectionError;
@@ -43,137 +42,137 @@
         match value {
             e @ ConnectionError::VersionMismatch => {
                 // > The peer doesn't implement any supported version
                 // unsupported version is likely a long time error, so this peer is not usable
-                GetError::NoncompliantNode(e.into())
+                Error::NoncompliantNode(e.into())
             }
             e @ ConnectionError::TransportError(_) => {
                 // > The peer violated the QUIC specification as understood by this implementation
                 // bad peer we don't want to keep around
-                GetError::NoncompliantNode(e.into())
+                Error::NoncompliantNode(e.into())
             }
             e @ ConnectionError::ConnectionClosed(_) => {
                 // > The peer's QUIC stack aborted the connection automatically
                 // peer might be disconnecting or otherwise unavailable, drop it
-                GetError::Io(e.into())
+                Error::Io(e.into())
             }
             e @ ConnectionError::ApplicationClosed(_) => {
                 // > The peer closed the connection
                 // peer might be disconnecting or otherwise unavailable, drop it
-                GetError::Io(e.into())
+                Error::Io(e.into())
             }
             e @ ConnectionError::Reset => {
                 // > The peer is unable to continue processing this connection, usually due to having restarted
-                GetError::RemoteReset(e.into())
+                Error::RemoteReset(e.into())
             }
             e @ ConnectionError::TimedOut => {
                 // > Communication with the peer has lapsed for longer than the negotiated idle timeout
-                GetError::Io(e.into())
+                Error::Io(e.into())
             }
             e @ ConnectionError::LocallyClosed => {
                 // > The local application closed the connection
                 // TODO(@divma): don't see how this is reachable but let's just not use the peer
-                GetError::Io(e.into())
+                Error::Io(e.into())
             }
             e @ quinn::ConnectionError::CidsExhausted => {
                 // > The connection could not be created because not enough of the CID space
                 // > is available
-                GetError::Io(e.into())
+                Error::Io(e.into())
             }
         }
     }
 }

-impl From<endpoint::ReadError> for GetError {
+impl From<endpoint::ReadError> for Error {
     fn from(value: endpoint::ReadError) -> Self {
         use endpoint::ReadError;
         match value {
-            e @ ReadError::Reset(_) => GetError::RemoteReset(e.into()),
+            e @ ReadError::Reset(_) => Error::RemoteReset(e.into()),
             ReadError::ConnectionLost(conn_error) => conn_error.into(),
             ReadError::ClosedStream
             | ReadError::IllegalOrderedRead
             | ReadError::ZeroRttRejected => {
                 // all these errors indicate the peer is not usable at this moment
-                GetError::Io(value.into())
+                Error::Io(value.into())
             }
         }
     }
 }

-impl From<quinn::ClosedStream> for GetError {
+impl From<quinn::ClosedStream> for Error {
     fn from(value: quinn::ClosedStream) -> Self {
-        GetError::Io(value.into())
+        Error::Io(value.into())
     }
 }

-impl From<endpoint::WriteError> for GetError {
+impl From<endpoint::WriteError> for Error {
     fn from(value: endpoint::WriteError) -> Self {
         use endpoint::WriteError;
         match value {
-            e @ WriteError::Stopped(_) => GetError::RemoteReset(e.into()),
+            e @ WriteError::Stopped(_) => Error::RemoteReset(e.into()),
             WriteError::ConnectionLost(conn_error) => conn_error.into(),
             WriteError::ClosedStream | WriteError::ZeroRttRejected => {
                 // all these errors indicate the peer is not usable at this moment
-                GetError::Io(value.into())
+                Error::Io(value.into())
             }
         }
     }
 }

-impl From<crate::get::fsm::ConnectedNextError> for GetError {
+impl From<crate::get::fsm::ConnectedNextError> for Error {
     fn from(value: crate::get::fsm::ConnectedNextError) -> Self {
         use crate::get::fsm::ConnectedNextError::*;
         match value {
             e @ PostcardSer(_) => {
                 // serialization errors indicate something wrong with the request itself
-                GetError::BadRequest(e.into())
+                Error::BadRequest(e.into())
             }
             e @ RequestTooBig => {
                 // request will never be sent, drop it
-                GetError::BadRequest(e.into())
+                Error::BadRequest(e.into())
             }
             Write(e) => e.into(),
             Closed(e) => e.into(),
             e @ Io(_) => {
                 // io errors are likely recoverable
-                GetError::Io(e.into())
+                Error::Io(e.into())
             }
         }
     }
 }

-impl From<crate::get::fsm::AtBlobHeaderNextError> for GetError {
+impl From<crate::get::fsm::AtBlobHeaderNextError> for Error {
     fn from(value: crate::get::fsm::AtBlobHeaderNextError) -> Self {
         use crate::get::fsm::AtBlobHeaderNextError::*;
         match value {
             e @ NotFound => {
                 // > This indicates that the provider does not have the requested data.
                 // peer might have the data later, simply retry it
-                GetError::NotFound(e.into())
+                Error::NotFound(e.into())
             }
             Read(e) => e.into(),
             e @ Io(_) => {
                 // io errors are likely recoverable
-                GetError::Io(e.into())
+                Error::Io(e.into())
             }
         }
     }
 }

-impl From<crate::get::fsm::DecodeError> for GetError {
+impl From<crate::get::fsm::DecodeError> for Error {
     fn from(value: crate::get::fsm::DecodeError) -> Self {
         use crate::get::fsm::DecodeError::*;

         match value {
-            e @ NotFound => GetError::NotFound(e.into()),
-            e @ ParentNotFound(_) => GetError::NotFound(e.into()),
-            e @ LeafNotFound(_) => GetError::NotFound(e.into()),
+            e @ NotFound => Error::NotFound(e.into()),
+            e @ ParentNotFound(_) => Error::NotFound(e.into()),
+            e @ LeafNotFound(_) => Error::NotFound(e.into()),
             e @ ParentHashMismatch(_) => {
                 // TODO(@divma): did the peer sent wrong data? is it corrupted? did we sent a wrong
                 // request?
-                GetError::NoncompliantNode(e.into())
+                Error::NoncompliantNode(e.into())
             }
             e @ LeafHashMismatch(_) => {
                 // TODO(@divma): did the peer sent wrong data? is it corrupted? did we sent a wrong
                 // request?
-                GetError::NoncompliantNode(e.into())
+                Error::NoncompliantNode(e.into())
             }
             Read(e) => e.into(),
             Io(e) => e.into(),
@@ -181,10 +180,10 @@
         }
     }
 }

-impl From<std::io::Error> for GetError {
+impl From<std::io::Error> for Error {
     fn from(value: std::io::Error) -> Self {
         // generally consider io errors recoverable
         // we might want to revisit this at some point
-        GetError::Io(value.into())
+        Error::Io(value.into())
     }
 }
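The variant set is unchanged by the rename, so downstream error handling ports mechanically. For reference, a retry classifier that mirrors the `From<Error> for FailureAction` mapping in `src/downloader/get.rs` above:

    use iroh_blobs::get::Error;

    /// Whether a fetch is worth retrying later, following the
    /// FailureAction mapping used by the downloader.
    fn is_retryable(err: &Error) -> bool {
        match err {
            // transient: the peer reset the stream or an I/O error occurred
            Error::RemoteReset(_) | Error::Io(_) => true,
            // permanent for this request or this peer
            Error::NotFound(_)
            | Error::NoncompliantNode(_)
            | Error::BadRequest(_)
            | Error::LocalFailure(_) => false,
        }
    }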
diff --git a/src/get/progress.rs b/src/get/progress.rs
index d4025e5c3..121cd69ca 100644
--- a/src/get/progress.rs
+++ b/src/get/progress.rs
@@ -5,7 +5,7 @@ use std::{collections::HashMap, num::NonZeroU64};
 use serde::{Deserialize, Serialize};
 use tracing::warn;

-use super::db::{BlobId, DownloadProgress};
+use super::Stats;
 use crate::{protocol::RangeSpec, store::BaoBlobSize, Hash};

 /// The identifier for progress events.
@@ -48,7 +48,7 @@ pub struct BlobState {
     /// received the size from the remote.
     pub size: Option<BaoBlobSize>,
     /// The current state of the blob transfer.
-    pub progress: BlobProgress,
+    pub progress: BlobProgressEvent,
     /// Ranges already available locally at the time of starting the transfer.
     pub local_ranges: Option<RangeSpec>,
     /// Number of children (only applies to hashseqs, None for raw blobs).
@@ -57,7 +57,7 @@

 /// Progress state for a single blob
 #[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
-pub enum BlobProgress {
+pub enum BlobProgressEvent {
     /// Download is pending
     #[default]
     Pending,
@@ -75,7 +75,7 @@ impl BlobState {
             size: None,
             local_ranges: None,
             child_count: None,
-            progress: BlobProgress::default(),
+            progress: BlobProgressEvent::default(),
         }
     }
 }
@@ -120,13 +120,13 @@ impl TransferState {
         self.get_blob_mut(&blob_id)
     }

-    /// Update the state with a new [`DownloadProgress`] event for this transfer.
-    pub fn on_progress(&mut self, event: DownloadProgress) {
+    /// Update the state with a new [`DownloadProgressEvent`] for this transfer.
+    pub fn on_progress(&mut self, event: DownloadProgressEvent) {
         match event {
-            DownloadProgress::InitialState(s) => {
+            DownloadProgressEvent::InitialState(s) => {
                 *self = s;
             }
-            DownloadProgress::FoundLocal {
+            DownloadProgressEvent::FoundLocal {
                 child,
                 hash,
                 size,
@@ -136,8 +136,8 @@ impl TransferState {
                 blob.size = Some(size);
                 blob.local_ranges = Some(valid_ranges);
             }
-            DownloadProgress::Connected => self.connected = true,
-            DownloadProgress::Found {
+            DownloadProgressEvent::Connected => self.connected = true,
+            DownloadProgressEvent::Found {
                 id: progress_id,
                 child: blob_id,
                 hash,
@@ -151,11 +151,11 @@ impl TransferState {
                     // Otherwise, keep the existing verified size.
                     value @ Some(BaoBlobSize::Verified(_)) => value,
                 };
-                blob.progress = BlobProgress::Progressing(0);
+                blob.progress = BlobProgressEvent::Progressing(0);
                 self.progress_id_to_blob.insert(progress_id, blob_id);
                 self.current = Some(blob_id);
             }
-            DownloadProgress::FoundHashSeq { hash, children } => {
+            DownloadProgressEvent::FoundHashSeq { hash, children } => {
                 if hash == self.root.hash {
                     self.root.child_count = Some(children);
                 } else {
@@ -164,22 +164,109 @@ impl TransferState {
                     warn!("Received `FoundHashSeq` event for a hash which is not the download's root hash.")
                 }
             }
-            DownloadProgress::Progress { id, offset } => {
+            DownloadProgressEvent::Progress { id, offset } => {
                 if let Some(blob) = self.get_by_progress_id(id) {
-                    blob.progress = BlobProgress::Progressing(offset);
+                    blob.progress = BlobProgressEvent::Progressing(offset);
                 } else {
                     warn!(%id, "Received `Progress` event for unknown progress id.")
                 }
             }
-            DownloadProgress::Done { id } => {
+            DownloadProgressEvent::Done { id } => {
                 if let Some(blob) = self.get_by_progress_id(id) {
-                    blob.progress = BlobProgress::Done;
+                    blob.progress = BlobProgressEvent::Done;
                     self.progress_id_to_blob.remove(&id);
                 } else {
                     warn!(%id, "Received `Done` event for unknown progress id.")
                 }
             }
-            DownloadProgress::AllDone(_) | DownloadProgress::Abort(_) => {}
+            DownloadProgressEvent::AllDone(_) | DownloadProgressEvent::Abort(_) => {}
         }
     }
 }
+
+/// Progress updates for the get operation.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum DownloadProgressEvent {
+    /// Initial state if subscribing to a running or queued transfer.
+    InitialState(TransferState),
+    /// Data was found locally.
+    FoundLocal {
+        /// child offset
+        child: BlobId,
+        /// The hash of the entry.
+        hash: Hash,
+        /// The size of the entry in bytes.
+        size: BaoBlobSize,
+        /// The ranges that are available locally.
+        valid_ranges: RangeSpec,
+    },
+    /// A new connection was established.
+    Connected,
+    /// An item was found with hash `hash`, from now on referred to via `id`.
+    Found {
+        /// A new unique progress id for this entry.
+        id: u64,
+        /// Identifier for this blob within this download.
+        ///
+        /// Will always be [`BlobId::Root`] unless a hashseq is downloaded, in which case this
+        /// allows identifying the children by their offset in the hashseq.
+        child: BlobId,
+        /// The hash of the entry.
+        hash: Hash,
+        /// The size of the entry in bytes.
+        size: u64,
+    },
+    /// A hash sequence was found with hash `hash`.
+    FoundHashSeq {
+        /// The hash of the entry.
+        hash: Hash,
+        /// Number of children in the collection, if known.
+        children: u64,
+    },
+    /// We got progress ingesting item `id`.
+    Progress {
+        /// The unique id of the entry.
+        id: u64,
+        /// The offset of the progress, in bytes.
+        offset: u64,
+    },
+    /// We are done with `id`.
+    Done {
+        /// The unique id of the entry.
+        id: u64,
+    },
+    /// All operations finished.
+    ///
+    /// This will be the last message in the stream.
+    AllDone(Stats),
+    /// We got an error and need to abort.
+    ///
+    /// This will be the last message in the stream.
+    Abort(serde_error::Error),
+}
+
+/// The id of a blob in a transfer
+#[derive(
+    Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, std::hash::Hash, Serialize, Deserialize,
+)]
+pub enum BlobId {
+    /// The root blob (child id 0)
+    Root,
+    /// A child blob (child id > 0)
+    Child(NonZeroU64),
+}
+
+impl BlobId {
+    pub(crate) fn from_offset(id: u64) -> Self {
+        NonZeroU64::new(id).map(Self::Child).unwrap_or(Self::Root)
+    }
+}
+
+impl From<BlobId> for u64 {
+    fn from(value: BlobId) -> Self {
+        match value {
+            BlobId::Root => 0,
+            BlobId::Child(id) => id.into(),
+        }
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 7091ad795..18f4021a6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -32,7 +32,7 @@ pub mod cli;
 #[cfg(feature = "downloader")]
 pub mod downloader;
-pub mod export;
+#[cfg(feature = "formats")]
 pub mod format;
 pub mod get;
 pub mod hashseq;
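With `BlobId`, `TransferState`, and the event enum consolidated in `get::progress` (and re-exported from `rpc::client::blobs`, as the client diff below shows), a subscriber can keep its own copy of the transfer state the same way the `ProgressTracker` does internally. A small sketch:

    use iroh_blobs::rpc::client::blobs::{DownloadProgressEvent, TransferState};

    /// Mirror a transfer locally by applying each received event in order.
    fn apply_events(
        state: &mut TransferState,
        events: impl IntoIterator<Item = DownloadProgressEvent>,
    ) {
        for event in events {
            state.on_progress(event);
        }
    }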
diff --git a/src/net_protocol.rs b/src/net_protocol.rs
index ca62c146b..1cfb24f66 100644
--- a/src/net_protocol.rs
+++ b/src/net_protocol.rs
@@ -1,26 +1,22 @@
 //! Adaptation of `iroh-blobs` as an `iroh` protocol.
-
-// TODO: reduce API surface and add documentation
-#![allow(missing_docs)]
-
+//!
+//! A blobs protocol handler wraps a store, so you must first create a store.
+//!
+//! The entry point to create a blobs protocol handler is [`Blobs::builder`].
 use std::{collections::BTreeSet, fmt::Debug, ops::DerefMut, sync::Arc};

 use anyhow::{bail, Result};
 use futures_lite::future::Boxed as BoxedFuture;
 use futures_util::future::BoxFuture;
-use iroh::{endpoint::Connecting, protocol::ProtocolHandler, Endpoint, NodeAddr};
-use serde::{Deserialize, Serialize};
+use iroh::{endpoint::Connecting, protocol::ProtocolHandler, Endpoint};
 use tracing::debug;

 use crate::{
     downloader::Downloader,
     provider::EventSender,
-    store::GcConfig,
-    util::{
-        local_pool::{self, LocalPoolHandle},
-        SetTagOption,
-    },
-    BlobFormat, Hash,
+    store::{GcConfig, Store},
+    util::local_pool::{self, LocalPoolHandle},
+    Hash,
 };

 /// A callback that blobs can ask about a set of hashes that should not be garbage collected.
@@ -50,9 +46,10 @@ pub(crate) struct BlobsInner<S> {
     pub(crate) endpoint: Endpoint,
     gc_state: std::sync::Mutex<GcState>,
     #[cfg(feature = "rpc")]
-    pub(crate) batches: tokio::sync::Mutex<BlobBatches>,
+    pub(crate) batches: tokio::sync::Mutex<batches::BlobBatches>,
 }

+/// Blobs protocol handler.
 #[derive(Debug, Clone)]
 pub struct Blobs<S> {
     pub(crate) inner: Arc<BlobsInner<S>>,
     #[cfg(feature = "rpc")]
     pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
 }

-/// Keeps track of all the currently active batch operations of the blobs api.
-#[cfg(feature = "rpc")]
-#[derive(Debug, Default)]
-pub(crate) struct BlobBatches {
-    /// Currently active batches
-    batches: std::collections::BTreeMap<BatchId, BlobBatch>,
-    /// Used to generate new batch ids.
-    max: u64,
-}
+pub(crate) mod batches {
+    use serde::{Deserialize, Serialize};

-/// A single batch of blob operations
-#[cfg(feature = "rpc")]
-#[derive(Debug, Default)]
-struct BlobBatch {
-    /// The tags in this batch.
-    tags: std::collections::BTreeMap<crate::HashAndFormat, Vec<crate::TempTag>>,
-}
+    /// Newtype for a batch id
+    #[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
+    pub struct BatchId(pub u64);

-#[cfg(feature = "rpc")]
-impl BlobBatches {
-    /// Create a new unique batch id.
-    pub fn create(&mut self) -> BatchId {
-        let id = self.max;
-        self.max += 1;
-        BatchId(id)
+    /// Keeps track of all the currently active batch operations of the blobs API.
+    #[cfg(feature = "rpc")]
+    #[derive(Debug, Default)]
+    pub(crate) struct BlobBatches {
+        /// Currently active batches
+        batches: std::collections::BTreeMap<BatchId, BlobBatch>,
+        /// Used to generate new batch ids.
+        max: u64,
     }

-    /// Store a temp tag in a batch identified by a batch id.
-    pub fn store(&mut self, batch: BatchId, tt: crate::TempTag) {
-        let entry = self.batches.entry(batch).or_default();
-        entry.tags.entry(tt.hash_and_format()).or_default().push(tt);
+    /// A single batch of blob operations
+    #[cfg(feature = "rpc")]
+    #[derive(Debug, Default)]
+    struct BlobBatch {
+        /// The tags in this batch.
+        tags: std::collections::BTreeMap<crate::HashAndFormat, Vec<crate::TempTag>>,
     }

-    /// Remove a tag from a batch.
-    pub fn remove_one(&mut self, batch: BatchId, content: &crate::HashAndFormat) -> Result<()> {
-        if let Some(batch) = self.batches.get_mut(&batch) {
-            if let Some(tags) = batch.tags.get_mut(content) {
-                tags.pop();
-                if tags.is_empty() {
-                    batch.tags.remove(content);
+    #[cfg(feature = "rpc")]
+    impl BlobBatches {
+        /// Create a new unique batch id.
+        pub fn create(&mut self) -> BatchId {
+            let id = self.max;
+            self.max += 1;
+            BatchId(id)
+        }
+
+        /// Store a temp tag in a batch identified by a batch id.
+        pub fn store(&mut self, batch: BatchId, tt: crate::TempTag) {
+            let entry = self.batches.entry(batch).or_default();
+            entry.tags.entry(tt.hash_and_format()).or_default().push(tt);
+        }
+
+        /// Remove a tag from a batch.
+        pub fn remove_one(
+            &mut self,
+            batch: BatchId,
+            content: &crate::HashAndFormat,
+        ) -> anyhow::Result<()> {
+            if let Some(batch) = self.batches.get_mut(&batch) {
+                if let Some(tags) = batch.tags.get_mut(content) {
+                    tags.pop();
+                    if tags.is_empty() {
+                        batch.tags.remove(content);
+                    }
+                    return Ok(());
                 }
-                return Ok(());
             }
+            // this can happen if we try to upgrade a tag from an expired batch
+            anyhow::bail!("tag not found in batch");
         }
-        // this can happen if we try to upgrade a tag from an expired batch
-        anyhow::bail!("tag not found in batch");
-    }

-    /// Remove an entire batch.
-    pub fn remove(&mut self, batch: BatchId) {
-        self.batches.remove(&batch);
+        /// Remove an entire batch.
+        pub fn remove(&mut self, batch: BatchId) {
+            self.batches.remove(&batch);
+        }
     }
 }
@@ -121,7 +130,7 @@ pub struct Builder<S> {
     events: Option<EventSender>,
 }

-impl<S: crate::store::Store> Builder<S> {
+impl<S: Store> Builder<S> {
     /// Set the event sender for the blobs protocol.
     pub fn events(mut self, value: EventSender) -> Self {
         self.events = Some(value);
@@ -168,7 +177,11 @@ impl<S: Store> Blobs<S> {
     }
 }

-impl<S: crate::store::Store> Blobs<S> {
+impl<S: Store> Blobs<S> {
+    /// Create a new Blobs protocol handler.
+    ///
+    /// This is the low-level constructor that allows you to customize
+    /// everything. If you don't need that, consider using [`Blobs::builder`].
     pub fn new(
         store: S,
         rt: LocalPoolHandle,
@@ -192,22 +205,27 @@ impl<S: Store> Blobs<S> {
         }
     }

+    /// Get the store.
     pub fn store(&self) -> &S {
         &self.inner.store
     }

+    /// Get the event sender.
     pub fn events(&self) -> &EventSender {
         &self.inner.events
     }

+    /// Get the local pool handle.
     pub fn rt(&self) -> &LocalPoolHandle {
         &self.inner.rt
     }

+    /// Get the downloader.
     pub fn downloader(&self) -> &Downloader {
         &self.inner.downloader
     }

+    /// Get the endpoint.
     pub fn endpoint(&self) -> &Endpoint {
         &self.inner.endpoint
     }
@@ -255,7 +273,7 @@ impl<S: Store> Blobs<S> {
     }
 }

-impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
+impl<S: Store> ProtocolHandler for Blobs<S> {
     fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
         let db = self.store().clone();
         let events = self.events().clone();
@@ -274,42 +292,3 @@ impl<S: Store> ProtocolHandler for Blobs<S> {
         })
     }
 }
-
-/// A request to the node to download and share the data specified by the hash.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct BlobDownloadRequest {
-    /// This mandatory field contains the hash of the data to download and share.
-    pub hash: Hash,
-    /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as
-    /// well.
-    pub format: BlobFormat,
-    /// This mandatory field specifies the nodes to download the data from.
-    ///
-    /// If set to more than a single node, they will all be tried. If `mode` is set to
-    /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
-    /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
-    /// if the concurrency limits permit.
-    pub nodes: Vec<NodeAddr>,
-    /// Optional tag to tag the data with.
-    pub tag: SetTagOption,
-    /// Whether to directly start the download or add it to the download queue.
-    pub mode: DownloadMode,
-}
-
-/// Set the mode for whether to directly start the download or add it to the download queue.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum DownloadMode {
-    /// Start the download right away.
-    ///
-    /// No concurrency limits or queuing will be applied. It is up to the user to manage download
-    /// concurrency.
-    Direct,
-    /// Queue the download.
-    ///
-    /// The download queue will be processed in-order, while respecting the downloader concurrency limits.
-    Queued,
-}
-
-/// Newtype for a batch id
-#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
-pub struct BatchId(pub u64);
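The new module docs point at [`Blobs::builder`] as the entry point; in practice the handler is registered on an iroh `Router`. A rough wiring sketch follows — `Blobs::memory()`, the exact `build` signature, and the `iroh_blobs::ALPN` re-export are assumptions carried over from neighboring versions of this crate, so treat it as pseudocode and check the current API:

    use iroh::{protocol::Router, Endpoint};
    use iroh_blobs::net_protocol::Blobs;

    async fn spawn_blobs_node() -> anyhow::Result<Router> {
        let endpoint = Endpoint::builder().bind().await?;
        // assumption: an in-memory store; fs-backed stores wire up the same way
        let blobs = Blobs::memory().build(&endpoint);
        let router = Router::builder(endpoint)
            .accept(iroh_blobs::ALPN, blobs.clone())
            .spawn()
            .await?;
        Ok(router)
    }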
diff --git a/src/provider.rs b/src/provider.rs
index 9ae9c02fc..ea63ac13c 100644
--- a/src/provider.rs
+++ b/src/provider.rs
@@ -12,7 +12,6 @@ use iroh_io::{
     stats::{SliceReaderStats, StreamWriterStats, TrackingSliceReader, TrackingStreamWriter},
     AsyncSliceReader, AsyncStreamWriter, TokioStreamWriter,
 };
-use serde::{Deserialize, Serialize};
 use tracing::{debug, debug_span, info, trace, warn};
 use tracing_futures::Instrument;

@@ -118,71 +117,6 @@ pub struct TransferStats {
     pub duration: Duration,
 }

-/// Progress updates for the add operation.
-#[derive(Debug, Serialize, Deserialize)]
-pub enum AddProgress {
-    /// An item was found with name `name`, from now on referred to via `id`
-    Found {
-        /// A new unique id for this entry.
-        id: u64,
-        /// The name of the entry.
-        name: String,
-        /// The size of the entry in bytes.
-        size: u64,
-    },
-    /// We got progress ingesting item `id`.
-    Progress {
-        /// The unique id of the entry.
-        id: u64,
-        /// The offset of the progress, in bytes.
-        offset: u64,
-    },
-    /// We are done with `id`, and the hash is `hash`.
-    Done {
-        /// The unique id of the entry.
-        id: u64,
-        /// The hash of the entry.
-        hash: Hash,
-    },
-    /// We are done with the whole operation.
-    AllDone {
-        /// The hash of the created data.
-        hash: Hash,
-        /// The format of the added data.
-        format: BlobFormat,
-        /// The tag of the added data.
-        tag: Tag,
-    },
-    /// We got an error and need to abort.
-    ///
-    /// This will be the last message in the stream.
-    Abort(serde_error::Error),
-}
-
-/// Progress updates for the batch add operation.
-#[derive(Debug, Serialize, Deserialize)]
-pub enum BatchAddPathProgress {
-    /// An item was found with the given size
-    Found {
-        /// The size of the entry in bytes.
-        size: u64,
-    },
-    /// We got progress ingesting the item.
-    Progress {
-        /// The offset of the progress, in bytes.
-        offset: u64,
-    },
-    /// We are done, and the hash is `hash`.
-    Done {
-        /// The hash of the entry.
-        hash: Hash,
-    },
-    /// We got an error and need to abort.
-    ///
-    /// This will be the last message in the stream.
-    Abort(serde_error::Error),
-}
-
 /// Read the request from the getter.
 ///
 /// Will fail if there is an error while reading, if the reader
diff --git a/src/rpc.rs b/src/rpc.rs
index e1739b52c..09d4be154 100644
--- a/src/rpc.rs
+++ b/src/rpc.rs
@@ -8,7 +8,10 @@ use std::{

 use anyhow::anyhow;
 use client::{
-    blobs::{self, BlobInfo, BlobStatus, DownloadMode, IncompleteBlobInfo, MemClient, WrapOption},
+    blobs::{
+        self, AddProgressEvent, BatchAddPathProgressEvent, BlobInfo, BlobStatus, DownloadMode,
+        IncompleteBlobInfo, MemClient, WrapOption,
+    },
     tags::TagInfo,
     MemConnector,
 };
@@ -23,10 +26,10 @@ use proto::{
         AddPathRequest, AddPathResponse, AddStreamRequest, AddStreamResponse, AddStreamUpdate,
         BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse,
         BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest,
-        BatchUpdate, BlobStatusRequest, BlobStatusResponse, ConsistencyCheckRequest,
-        CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadResponse,
-        ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest, ReadAtRequest,
-        ReadAtResponse, ValidateRequest,
+        BatchUpdate, BlobDownloadRequest, BlobStatusRequest, BlobStatusResponse,
+        ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteRequest,
+        DownloadResponse, ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest,
+        ReadAtRequest, ReadAtResponse, ValidateRequest,
     },
     tags::{
         CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest,
@@ -43,15 +46,13 @@ use tracing::{debug, warn};

 use crate::{
     downloader::{DownloadRequest, Downloader},
-    export::ExportProgress,
     format::collection::Collection,
-    get::{
-        db::{DownloadProgress, GetState},
-        Stats,
+    get::{progress::DownloadProgressEvent, Stats},
+    net_protocol::{Blobs, BlobsInner},
+    store::{
+        ConsistencyCheckProgress, ExportProgress, FetchState, ImportProgress, MapEntry,
+        ValidateProgress,
     },
-    net_protocol::{BlobDownloadRequest, Blobs, BlobsInner},
-    provider::{AddProgress, BatchAddPathProgress},
-    store::{ConsistencyCheckProgress, ImportProgress, MapEntry, ValidateProgress},
     util::{
         local_pool::LocalPoolHandle,
         progress::{AsyncChannelProgressSender, ProgressSender},
@@ -124,7 +125,7 @@ impl<D: crate::store::Store> Handler<D> {
     #[cfg(feature = "rpc")]
     pub(crate) async fn batches(
         &self,
-    ) -> tokio::sync::MutexGuard<'_, crate::net_protocol::BlobBatches> {
+    ) -> tokio::sync::MutexGuard<'_, crate::net_protocol::batches::BlobBatches> {
         self.0.batches.lock().await
     }
@@ -376,7 +377,9 @@ impl<D: crate::store::Store> Handler<D> {
         let rt = self.rt().clone();
         rt.spawn_detached(|| async move {
             if let Err(e) = self.blob_add_from_path0(msg, tx).await {
-                tx2.send(AddProgress::Abort(RpcError::new(&*e))).await.ok();
+                tx2.send(AddProgressEvent::Abort(RpcError::new(&*e)))
+                    .await
+                    .ok();
             }
         });
         rx.map(AddPathResponse)
@@ -437,7 +440,7 @@ impl<D: crate::store::Store> Handler<D> {
                 .await
             {
                 progress
-                    .send(DownloadProgress::Abort(RpcError::new(&*err)))
+                    .send(DownloadProgressEvent::Abort(RpcError::new(&*err)))
                    .await
                    .ok();
             }
@@ -451,7 +454,7 @@ impl<D: crate::store::Store> Handler<D> {
         let progress = AsyncChannelProgressSender::new(tx);
         let rt = self.rt().clone();
         rt.spawn_detached(move || async move {
-            let res = crate::export::export(
+            let res = crate::store::export(
                 self.store(),
                 msg.hash,
                 msg.path,
@@ -474,7 +477,7 @@ impl<D: crate::store::Store> Handler<D> {
     async fn blob_add_from_path0(
         self,
         msg: AddPathRequest,
-        progress: async_channel::Sender<AddProgress>,
+        progress: async_channel::Sender<AddProgressEvent>,
     ) -> anyhow::Result<()> {
         use std::collections::BTreeMap;
@@ -491,12 +494,12 @@ impl<D: crate::store::Store> Handler<D> {
             }
             ImportProgress::Size { id, size } => {
                 let name = names.lock().unwrap().remove(&id)?;
-                Some(AddProgress::Found { id, name, size })
+                Some(AddProgressEvent::Found { id, name, size })
             }
             ImportProgress::OutboardProgress { id, offset } => {
-                Some(AddProgress::Progress { id, offset })
+                Some(AddProgressEvent::Progress { id, offset })
             }
-            ImportProgress::OutboardDone { hash, id } => Some(AddProgress::Done { hash, id }),
+            ImportProgress::OutboardDone { hash, id } => Some(AddProgressEvent::Done { hash, id }),
             _ => None,
         });
         let AddPathRequest {
@@ -581,7 +584,7 @@ impl<D: crate::store::Store> Handler<D> {
             SetTagOption::Auto => blobs.store().create_tag(*hash_and_format).await?,
         };
         progress
-            .send(AddProgress::AllDone {
+            .send(AddProgressEvent::AllDone {
                 hash,
                 format,
                 tag: tag.clone(),
@@ -625,7 +628,7 @@ impl<D: crate::store::Store> Handler<D> {
         let this = self.clone();
         self.rt().spawn_detached(|| async move {
             if let Err(e) = this.batch_add_from_path0(msg, tx).await {
-                tx2.send(BatchAddPathProgress::Abort(RpcError::new(&*e)))
+                tx2.send(BatchAddPathProgressEvent::Abort(RpcError::new(&*e)))
                     .await
                     .ok();
             }
@@ -670,16 +673,18 @@ impl<D: crate::store::Store> Handler<D> {
     async fn batch_add_from_path0(
         self,
         msg: BatchAddPathRequest,
-        progress: async_channel::Sender<BatchAddPathProgress>,
+        progress: async_channel::Sender<BatchAddPathProgressEvent>,
     ) -> anyhow::Result<()> {
         let progress = AsyncChannelProgressSender::new(progress);
         // convert import progress to provide progress
         let import_progress = progress.clone().with_filter_map(move |x| match x {
-            ImportProgress::Size { size, .. } => Some(BatchAddPathProgress::Found { size }),
+            ImportProgress::Size { size, .. } => Some(BatchAddPathProgressEvent::Found { size }),
             ImportProgress::OutboardProgress { offset, .. } => {
-                Some(BatchAddPathProgress::Progress { offset })
+                Some(BatchAddPathProgressEvent::Progress { offset })
+            }
+            ImportProgress::OutboardDone { hash, .. } => {
+                Some(BatchAddPathProgressEvent::Done { hash })
             }
-            ImportProgress::OutboardDone { hash, .. } => Some(BatchAddPathProgress::Done { hash }),
             _ => None,
         });
         let BatchAddPathRequest {
@@ -703,7 +708,9 @@ impl<D: crate::store::Store> Handler<D> {
         let hash = *tag.hash();
         blobs.batches().await.store(batch, tag);

-        progress.send(BatchAddPathProgress::Done { hash }).await?;
+        progress
+            .send(BatchAddPathProgressEvent::Done { hash })
+            .await?;
         Ok(())
     }
@@ -717,7 +724,9 @@ impl<D: crate::store::Store> Handler<D> {

         self.rt().spawn_detached(|| async move {
             if let Err(err) = this.blob_add_stream0(msg, stream, tx.clone()).await {
-                tx.send(AddProgress::Abort(RpcError::new(&*err))).await.ok();
+                tx.send(AddProgressEvent::Abort(RpcError::new(&*err)))
+                    .await
+                    .ok();
             }
         });

@@ -728,7 +737,7 @@ impl<D: crate::store::Store> Handler<D> {
         self,
         msg: AddStreamRequest,
         stream: impl Stream<Item = AddStreamUpdate> + Send + Unpin + 'static,
-        progress: async_channel::Sender<AddProgress>,
+        progress: async_channel::Sender<AddProgressEvent>,
     ) -> anyhow::Result<()> {
         let progress = AsyncChannelProgressSender::new(progress);
@@ -747,12 +756,12 @@ impl<D: crate::store::Store> Handler<D> {
             }
             ImportProgress::Size { id, size } => {
                 let name = name_cache.lock().unwrap().take()?;
-                Some(AddProgress::Found { id, name, size })
+                Some(AddProgressEvent::Found { id, name, size })
             }
             ImportProgress::OutboardProgress { id, offset } => {
-                Some(AddProgress::Progress { id, offset })
+                Some(AddProgressEvent::Progress { id, offset })
             }
-            ImportProgress::OutboardDone { hash, id } => Some(AddProgress::Done { hash, id }),
+            ImportProgress::OutboardDone { hash, id } => Some(AddProgressEvent::Done { hash, id }),
             _ => None,
         });
         let blobs = self;
@@ -773,7 +782,7 @@ impl<D: crate::store::Store> Handler<D> {
             SetTagOption::Auto => blobs.store().create_tag(hash_and_format).await?,
         };
         progress
-            .send(AddProgress::AllDone { hash, tag, format })
+            .send(AddProgressEvent::AllDone { hash, tag, format })
             .await?;
         Ok(())
     }
@@ -934,7 +943,7 @@ impl<D: crate::store::Store> Handler<D> {
         &self,
         endpoint: Endpoint,
         req: BlobDownloadRequest,
-        progress: AsyncChannelProgressSender<DownloadProgress>,
+        progress: AsyncChannelProgressSender<DownloadProgressEvent>,
     ) -> anyhow::Result<()> {
         let BlobDownloadRequest {
             hash,
@@ -956,7 +965,10 @@ impl<D: crate::store::Store> Handler<D> {
             }
         };

-        progress.send(DownloadProgress::AllDone(stats)).await.ok();
+        progress
+            .send(DownloadProgressEvent::AllDone(stats))
+            .await
+            .ok();
         match tag {
             SetTagOption::Named(tag) => {
                 self.store().set_tag(tag, Some(hash_and_format)).await?;
@@ -975,7 +987,7 @@ impl<D: crate::store::Store> Handler<D> {
         endpoint: Endpoint,
         hash_and_format: HashAndFormat,
         nodes: Vec<NodeAddr>,
-        progress: AsyncChannelProgressSender<DownloadProgress>,
+        progress: AsyncChannelProgressSender<DownloadProgressEvent>,
     ) -> anyhow::Result<Stats> {
         /// Name used for logging when new node addresses are added from gossip.
         const BLOB_DOWNLOAD_SOURCE_NAME: &str = "blob_download";
@@ -1003,21 +1015,21 @@ impl<D: crate::store::Store> Handler<D> {
         endpoint: Endpoint,
         hash_and_format: HashAndFormat,
         nodes: Vec<NodeAddr>,
-        progress: AsyncChannelProgressSender<DownloadProgress>,
+        progress: AsyncChannelProgressSender<DownloadProgressEvent>,
     ) -> anyhow::Result<Stats> {
         let mut last_err = None;
         let mut remaining_nodes = nodes.len();
         let mut nodes_iter = nodes.into_iter();
         'outer: loop {
-            match crate::get::db::get_to_db_in_steps(
+            match crate::store::get_to_db_in_steps(
                 self.store().clone(),
                 hash_and_format,
                 progress.clone(),
             )
             .await?
             {
-                GetState::Complete(stats) => return Ok(stats),
-                GetState::NeedsConn(needs_conn) => {
+                FetchState::Complete(stats) => return Ok(stats),
+                FetchState::NeedsConn(needs_conn) => {
                     let (conn, node_id) = 'inner: loop {
                         match nodes_iter.next() {
                             None => break 'outer,
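`download_blob_from_nodes` above shows the new two-step fetch: `get_to_db_in_steps` first satisfies what it can from the local store and only asks for a connection when data is actually missing. Condensed, the pattern looks like this sketch (it assumes a store `db`, an `endpoint`, a single `node_addr`, and a `progress` sender already in scope):

    use iroh_blobs::store::{get_to_db_in_steps, FetchState};

    let stats = match get_to_db_in_steps(db.clone(), hash_and_format, progress).await? {
        // everything was already local; no dial happened
        FetchState::Complete(stats) => stats,
        // data is missing: dial the node and resume the same fetch
        FetchState::NeedsConn(needs_conn) => {
            let conn = endpoint.connect(node_addr, iroh_blobs::protocol::ALPN).await?;
            needs_conn.proceed(conn).await?
        }
    };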
     pub fn new(
-        stream: (impl Stream<Item = Result<impl Into<BytesDownloadProgress>, impl Into<anyhow::Error>>>
+        stream: (impl Stream<Item = Result<impl Into<DownloadProgressEvent>, impl Into<anyhow::Error>>>
             + Send
             + Unpin
             + 'static),
@@ -705,10 +708,10 @@ impl DownloadProgress {
                 Ok(item) => {
                     let item = item.into();
                     match &item {
-                        BytesDownloadProgress::FoundLocal { size, .. } => {
+                        DownloadProgressEvent::FoundLocal { size, .. } => {
                             local_size.fetch_add(size.value(), Ordering::Relaxed);
                         }
-                        BytesDownloadProgress::Found { size, .. } => {
+                        DownloadProgressEvent::Found { size, .. } => {
                             network_size.fetch_add(*size, Ordering::Relaxed);
                         }
                         _ => {}
@@ -736,7 +739,7 @@ impl DownloadProgress {
 }
 
 impl Stream for DownloadProgress {
-    type Item = Result<BytesDownloadProgress>;
+    type Item = Result<DownloadProgressEvent>;
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         Pin::new(&mut self.stream).poll_next(cx)
     }
@@ -754,7 +757,7 @@ impl Future for DownloadProgress {
                 }
                 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
                 Poll::Ready(Some(Ok(msg))) => match msg {
-                    BytesDownloadProgress::AllDone(stats) => {
+                    DownloadProgressEvent::AllDone(stats) => {
                         let outcome = DownloadOutcome {
                             local_size: self.current_local_size.load(Ordering::Relaxed),
                             downloaded_size: self.current_network_size.load(Ordering::Relaxed),
@@ -762,7 +765,7 @@ impl Future for DownloadProgress {
                         };
                         return Poll::Ready(Ok(outcome));
                     }
-                    BytesDownloadProgress::Abort(err) => {
+                    DownloadProgressEvent::Abort(err) => {
                         return Poll::Ready(Err(err.into()));
                     }
                     _ => {}
@@ -859,6 +862,20 @@ impl Future for ExportProgress {
     }
 }
 
+/// Set the mode for whether to directly start the download or add it to the download queue.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum DownloadMode {
+    /// Start the download right away.
+    ///
+    /// No concurrency limits or queuing will be applied. It is up to the user to manage download
+    /// concurrency.
+    Direct,
+    /// Queue the download.
+    ///
+    /// The download queue will be processed in-order, while respecting the downloader concurrency limits.
+    Queued,
+}
+
 /// Data reader for a single blob.
 ///
 /// Implements [`AsyncRead`].
@@ -1848,10 +1865,10 @@ mod tests {
 
         while let Some(progress) = stream.next().await {
             match progress? {
-                crate::provider::AddProgress::AllDone { hash, .. } => {
+                AddProgressEvent::AllDone { hash, .. } => {
                     return Ok(hash);
                 }
-                crate::provider::AddProgress::Abort(e) => {
+                AddProgressEvent::Abort(e) => {
                     anyhow::bail!("Error while adding data: {e}");
                 }
                 _ => {}
diff --git a/src/rpc/client/blobs/batch.rs b/src/rpc/client/blobs/batch.rs
index b82f17837..dda57593f 100644
--- a/src/rpc/client/blobs/batch.rs
+++ b/src/rpc/client/blobs/batch.rs
@@ -17,15 +17,17 @@ use tracing::{debug, warn};
 use super::WrapOption;
 use crate::{
     format::collection::Collection,
-    net_protocol::BatchId,
-    provider::BatchAddPathProgress,
-    rpc::proto::{
-        blobs::{
-            BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse,
-            BatchAddStreamUpdate, BatchCreateTempTagRequest, BatchUpdate,
+    net_protocol::batches::BatchId,
+    rpc::{
+        client::blobs::BatchAddPathProgressEvent,
+        proto::{
+            blobs::{
+                BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse,
+                BatchAddStreamUpdate, BatchCreateTempTagRequest, BatchUpdate,
+            },
+            tags::{self, SyncMode},
+            RpcService,
         },
-        tags::{self, SyncMode},
-        RpcService,
     },
     store::ImportMode,
     util::{SetTagOption, TagDrop},
@@ -266,13 +268,13 @@ where
         let mut res_size = None;
         while let Some(item) = stream.next().await {
             match item?.0 {
-                BatchAddPathProgress::Abort(cause) => {
+                BatchAddPathProgressEvent::Abort(cause) => {
                     Err(cause)?;
                 }
-                BatchAddPathProgress::Done { hash } => {
+                BatchAddPathProgressEvent::Done { hash } => {
                     res_hash = Some(hash);
                 }
-                BatchAddPathProgress::Found { size } => {
+                BatchAddPathProgressEvent::Found { size } => {
                     res_size = Some(size);
                 }
                 _ => {}
diff --git a/src/rpc/proto/blobs.rs b/src/rpc/proto/blobs.rs
index 75fdad1c7..22e5f0188 100644
--- a/src/rpc/proto/blobs.rs
+++ b/src/rpc/proto/blobs.rs
@@ -2,21 +2,22 @@ use std::path::PathBuf;
 
 use bytes::Bytes;
+use iroh::NodeAddr;
 use nested_enum_utils::enum_conversions;
 use quic_rpc_derive::rpc_requests;
 use serde::{Deserialize, Serialize};
 
 use super::{RpcError, RpcResult, RpcService};
+pub use crate::get::progress::DownloadProgressEvent;
 use crate::{
-    export::ExportProgress,
     format::collection::Collection,
-    get::db::DownloadProgress,
-    net_protocol::{BatchId, BlobDownloadRequest},
-    provider::{AddProgress, BatchAddPathProgress},
-    rpc::client::blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, ReadAtLen, WrapOption},
+    net_protocol::batches::BatchId,
+    rpc::client::blobs::{
+        BlobInfo, BlobStatus, DownloadMode, IncompleteBlobInfo, ReadAtLen, WrapOption,
+    },
     store::{
-        BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ImportMode,
-        ValidateProgress,
+        BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ExportProgress,
+        ImportMode, ValidateProgress,
     },
     util::SetTagOption,
     BlobFormat, Hash, HashAndFormat, Tag,
@@ -87,7 +88,7 @@ pub enum Response {
 
 /// A request to the node to provide the data at the given path
 ///
-/// Will produce a stream of [`AddProgress`] messages.
+/// Will produce a stream of [`AddProgressEvent`] messages.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct AddPathRequest {
     /// The path to the data to provide.
@@ -105,13 +106,13 @@ pub struct AddPathRequest {
     pub wrap: WrapOption,
 }
 
-/// Wrapper around [`AddProgress`].
+/// Wrapper around [`AddProgressEvent`].
 #[derive(Debug, Serialize, Deserialize, derive_more::Into)]
-pub struct AddPathResponse(pub AddProgress);
+pub struct AddPathResponse(pub AddProgressEvent);
 
 /// Progress response for [`BlobDownloadRequest`]
 #[derive(Debug, Clone, Serialize, Deserialize, derive_more::From, derive_more::Into)]
-pub struct DownloadResponse(pub DownloadProgress);
+pub struct DownloadResponse(pub DownloadProgressEvent);
 
 /// A request to the node to download and share the data specified by the hash.
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -202,9 +203,9 @@ pub enum AddStreamUpdate {
     Abort,
 }
 
-/// Wrapper around [`AddProgress`].
+/// Wrapper around [`AddProgressEvent`].
 #[derive(Debug, Serialize, Deserialize, derive_more::Into)]
-pub struct AddStreamResponse(pub AddProgress);
+pub struct AddStreamResponse(pub AddProgressEvent);
 
 /// Delete a blob
 #[derive(Debug, Serialize, Deserialize)]
@@ -291,7 +292,7 @@ pub enum BatchAddStreamUpdate {
     Abort,
 }
 
-/// Wrapper around [`AddProgress`].
+/// Wrapper around [`AddProgressEvent`].
 #[allow(missing_docs)]
 #[derive(Debug, Serialize, Deserialize)]
 pub enum BatchAddStreamResponse {
@@ -315,4 +316,90 @@ pub struct BatchAddPathRequest {
 
 /// Response to a batch add path request
 #[derive(Serialize, Deserialize, Debug)]
-pub struct BatchAddPathResponse(pub BatchAddPathProgress);
+pub struct BatchAddPathResponse(pub BatchAddPathProgressEvent);
+
+/// A request to the node to download and share the data specified by the hash.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct BlobDownloadRequest {
+    /// This mandatory field contains the hash of the data to download and share.
+    pub hash: Hash,
+    /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as
+    /// well.
+    pub format: BlobFormat,
+    /// This mandatory field specifies the nodes to download the data from.
+    ///
+    /// If set to more than a single node, they will all be tried. If `mode` is set to
+    /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
+    /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
+    /// if the concurrency limits permit.
+    pub nodes: Vec<NodeAddr>,
+    /// Optional tag to tag the data with.
+    pub tag: SetTagOption,
+    /// Whether to directly start the download or add it to the download queue.
+    pub mode: DownloadMode,
+}
+
+/// Progress updates for the add operation.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum AddProgressEvent {
+    /// An item was found with name `name`, from now on referred to via `id`
+    Found {
+        /// A new unique id for this entry.
+        id: u64,
+        /// The name of the entry.
+        name: String,
+        /// The size of the entry in bytes.
+        size: u64,
+    },
+    /// We got progress ingesting item `id`.
+    Progress {
+        /// The unique id of the entry.
+        id: u64,
+        /// The offset of the progress, in bytes.
+        offset: u64,
+    },
+    /// We are done with `id`, and the hash is `hash`.
+    Done {
+        /// The unique id of the entry.
+        id: u64,
+        /// The hash of the entry.
+        hash: Hash,
+    },
+    /// We are done with the whole operation.
+    AllDone {
+        /// The hash of the created data.
+        hash: Hash,
+        /// The format of the added data.
+        format: BlobFormat,
+        /// The tag of the added data.
+        tag: Tag,
+    },
+    /// We got an error and need to abort.
+    ///
+    /// This will be the last message in the stream.
+    Abort(serde_error::Error),
+}
+
+/// Progress updates for the batch add operation.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum BatchAddPathProgressEvent {
+    /// An item was found with the given size
+    Found {
+        /// The size of the entry in bytes.
+        size: u64,
+    },
+    /// We got progress ingesting the item.
+    Progress {
+        /// The offset of the progress, in bytes.
+        offset: u64,
+    },
+    /// We are done, and the hash is `hash`.
+    Done {
+        /// The hash of the entry.
+        hash: Hash,
+    },
+    /// We got an error and need to abort.
+    ///
+    /// This will be the last message in the stream.
+    Abort(serde_error::Error),
+}
diff --git a/src/rpc/proto/tags.rs b/src/rpc/proto/tags.rs
index 54d35f625..4ba6353aa 100644
--- a/src/rpc/proto/tags.rs
+++ b/src/rpc/proto/tags.rs
@@ -4,7 +4,7 @@ use quic_rpc_derive::rpc_requests;
 use serde::{Deserialize, Serialize};
 
 use super::{RpcResult, RpcService};
-use crate::{net_protocol::BatchId, rpc::client::tags::TagInfo, HashAndFormat, Tag};
+use crate::{net_protocol::batches::BatchId, rpc::client::tags::TagInfo, HashAndFormat, Tag};
 
 #[allow(missing_docs)]
 #[derive(strum::Display, Debug, Serialize, Deserialize)]
diff --git a/src/store.rs b/src/store.rs
index 3030d55b3..81562f5d9 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -10,6 +10,15 @@ pub mod readonly_mem;
 #[cfg(feature = "fs-store")]
 pub mod fs;
 
+mod get_to_db;
+pub use get_to_db::{
+    blob_info, get_to_db, get_to_db_in_steps, valid_ranges, FetchState, FetchStateNeedsConn,
+};
+mod export;
+#[cfg(feature = "formats-collection")]
+pub use export::export_collection;
+pub use export::{export, export_blob, ExportProgress};
+
 mod traits;
 use tracing::warn;
 pub use traits::*;
diff --git a/src/export.rs b/src/store/export.rs
similarity index 95%
rename from src/export.rs
rename to src/store/export.rs
index 3576edf1d..2f7286600 100644
--- a/src/export.rs
+++ b/src/store/export.rs
@@ -8,7 +8,6 @@ use serde::{Deserialize, Serialize};
 use tracing::trace;
 
 use crate::{
-    format::collection::Collection,
     store::{BaoBlobSize, ExportFormat, ExportMode, MapEntry, Store as BaoStore},
     util::progress::{IdGenerator, ProgressSender},
     Hash,
@@ -32,11 +31,13 @@ pub async fn export<D: BaoStore>(
 ) -> anyhow::Result<()> {
     match format {
         ExportFormat::Blob => export_blob(db, hash, outpath, mode, progress).await,
+        #[cfg(feature = "formats-collection")]
         ExportFormat::Collection => export_collection(db, hash, outpath, mode, progress).await,
     }
 }
 
 /// Export all entries of a collection, recursively, to files on the local filesystem.
+#[cfg(feature = "formats-collection")]
 pub async fn export_collection<D: BaoStore>(
     db: &D,
     hash: Hash,
@@ -45,7 +46,7 @@ pub async fn export_collection<D: BaoStore>(
     progress: impl ProgressSender<Msg = ExportProgress> + IdGenerator,
 ) -> anyhow::Result<()> {
     tokio::fs::create_dir_all(&outpath).await?;
-    let collection = Collection::load_db(db, &hash).await?;
+    let collection = crate::format::collection::Collection::load_db(db, &hash).await?;
     for (name, hash) in collection.into_iter() {
         #[allow(clippy::needless_borrow)]
         let path = outpath.join(pathbuf_from_name(&name));
@@ -126,6 +127,7 @@ pub enum ExportProgress {
     Abort(serde_error::Error),
 }
 
+#[cfg(feature = "formats-collection")]
 fn pathbuf_from_name(name: &str) -> PathBuf {
     let mut path = PathBuf::new();
     for part in name.split('/') {
diff --git a/src/get/db.rs b/src/store/get_to_db.rs
similarity index 77%
rename from src/get/db.rs
rename to src/store/get_to_db.rs
index 783bbabc5..f90764136 100644
--- a/src/get/db.rs
+++ b/src/store/get_to_db.rs
@@ -1,6 +1,6 @@
 //! Functions that use the iroh-blobs protocol in conjunction with a bao store.
-use std::{future::Future, io, num::NonZeroU64, pin::Pin};
+use std::{future::Future, io, pin::Pin};
 
 use anyhow::anyhow;
 use bao_tree::{ChunkNum, ChunkRanges};
@@ -11,17 +11,15 @@ use genawaiter::{
 };
 use iroh::endpoint::Connection;
 use iroh_io::AsyncSliceReader;
-use serde::{Deserialize, Serialize};
 use tokio::sync::oneshot;
 use tracing::trace;
 
 use crate::{
     get::{
         self,
-        error::GetError,
         fsm::{AtBlobHeader, AtEndBlob, ConnectedNext, EndBlobNext},
-        progress::TransferState,
-        Stats,
+        progress::{BlobId, DownloadProgressEvent},
+        Error, Stats,
     },
     hashseq::parse_hash_seq,
     protocol::{GetRequest, RangeSpec, RangeSpecSeq},
@@ -33,16 +31,16 @@ use crate::{
     BlobFormat, Hash, HashAndFormat,
 };
 
-type GetGenerator = Gen<Yield, (), Pin<Box<dyn Future<Output = Result<Stats, GetError>>>>>;
-type GetFuture = Pin<Box<dyn Future<Output = Result<Stats, GetError>> + 'static>>;
+type GetGenerator = Gen<Yield, (), Pin<Box<dyn Future<Output = Result<Stats, Error>>>>>;
+type GetFuture = Pin<Box<dyn Future<Output = Result<Stats, Error>> + 'static>>;
 
 /// Get a blob or collection into a store.
 ///
 /// This considers data that is already in the store, and will only request
 /// the remaining data.
 ///
-/// Progress is reported as [`DownloadProgress`] through a [`ProgressSender`]. Note that the
-/// [`DownloadProgress::AllDone`] event is not emitted from here, but left to an upper layer to send,
+/// Progress is reported as [`DownloadProgressEvent`] through a [`ProgressSender`]. Note that the
+/// [`DownloadProgressEvent::AllDone`] event is not emitted from here, but left to an upper layer to send,
 /// if desired.
 pub async fn get_to_db<
     D: BaoStore,
@@ -52,12 +50,12 @@ pub async fn get_to_db<
     db: &D,
     get_conn: C,
     hash_and_format: &HashAndFormat,
-    progress_sender: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
-) -> Result<Stats, GetError> {
+    progress_sender: impl ProgressSender<Msg = DownloadProgressEvent> + IdGenerator,
+) -> Result<Stats, Error> {
     match get_to_db_in_steps(db.clone(), *hash_and_format, progress_sender).await? {
-        GetState::Complete(res) => Ok(res),
-        GetState::NeedsConn(state) => {
-            let conn = get_conn().await.map_err(GetError::Io)?;
+        FetchState::Complete(res) => Ok(res),
+        FetchState::NeedsConn(state) => {
+            let conn = get_conn().await.map_err(Error::Io)?;
             state.proceed(conn).await
         }
     }
@@ -65,22 +63,22 @@ pub async fn get_to_db<
 }
 
 /// Get a blob or collection into a store, yielding if a connection is needed.
 ///
-/// This checks a get request against a local store, and returns [`GetState`],
+/// This checks a get request against a local store, and returns [`FetchState`],
 /// which is either `Complete` in case the requested data is fully available in the local store, or
 /// `NeedsConn`, once a connection is needed to proceed downloading the missing data.
 ///
-/// In the latter case, call [`GetStateNeedsConn::proceed`] with a connection to a provider to
+/// In the latter case, call [`FetchStateNeedsConn::proceed`] with a connection to a provider to
 /// proceed with the download.
 ///
 /// Progress reporting works in the same way as documented in [`get_to_db`].
 pub async fn get_to_db_in_steps<
     D: BaoStore,
-    P: ProgressSender<Msg = DownloadProgress> + IdGenerator,
+    P: ProgressSender<Msg = DownloadProgressEvent> + IdGenerator,
 >(
     db: D,
     hash_and_format: HashAndFormat,
     progress_sender: P,
-) -> Result<GetState, GetError> {
+) -> Result<FetchState, Error> {
     let mut gen: GetGenerator = genawaiter::rc::Gen::new(move |co| {
         let fut = async move { producer(co, &db, &hash_and_format, progress_sender).await };
         let fut: GetFuture = Box::pin(fut);
@@ -88,21 +86,21 @@ pub async fn get_to_db_in_steps<
     });
     match gen.async_resume().await {
         GeneratorState::Yielded(Yield::NeedConn(reply)) => {
-            Ok(GetState::NeedsConn(GetStateNeedsConn(gen, reply)))
+            Ok(FetchState::NeedsConn(FetchStateNeedsConn(gen, reply)))
         }
-        GeneratorState::Complete(res) => res.map(GetState::Complete),
+        GeneratorState::Complete(res) => res.map(FetchState::Complete),
     }
 }
 
 /// Intermediary state returned from [`get_to_db_in_steps`] for a download request that needs a
 /// connection to proceed.
 #[derive(derive_more::Debug)]
-#[debug("GetStateNeedsConn")]
-pub struct GetStateNeedsConn(GetGenerator, oneshot::Sender<Connection>);
+#[debug("FetchStateNeedsConn")]
+pub struct FetchStateNeedsConn(GetGenerator, oneshot::Sender<Connection>);
 
-impl GetStateNeedsConn {
+impl FetchStateNeedsConn {
     /// Proceed with the download by providing a connection to a provider.
-    pub async fn proceed(mut self, conn: Connection) -> Result<Stats, GetError> {
+    pub async fn proceed(mut self, conn: Connection) -> Result<Stats, Error> {
         self.1.send(conn).expect("receiver is not dropped");
         match self.0.async_resume().await {
             GeneratorState::Yielded(y) => match y {
@@ -115,15 +113,15 @@ impl GetStateNeedsConn {
 
 /// Output of [`get_to_db_in_steps`].
 #[derive(Debug)]
-pub enum GetState {
+pub enum FetchState {
     /// The requested data is completely available in the local store, no network requests are
     /// needed.
     Complete(Stats),
     /// The requested data is not fully available in the local store, we need a connection to
     /// proceed.
     ///
-    /// Once a connection is available, call [`GetStateNeedsConn::proceed`] to continue.
-    NeedsConn(GetStateNeedsConn),
+    /// Once a connection is available, call [`FetchStateNeedsConn::proceed`] to continue.
+    NeedsConn(FetchStateNeedsConn),
 }
 
 struct GetCo(Co<Yield>);
@@ -144,8 +142,8 @@ async fn producer<D: BaoStore>(
     co: Co<Yield>,
     db: &D,
     hash_and_format: &HashAndFormat,
-    progress: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
-) -> Result<Stats, GetError> {
+    progress: impl ProgressSender<Msg = DownloadProgressEvent> + IdGenerator,
+) -> Result<Stats, Error> {
     let HashAndFormat { hash, format } = hash_and_format;
     let co = GetCo(co);
     match format {
@@ -162,13 +160,13 @@ async fn get_blob<D: BaoStore>(
     db: &D,
     co: GetCo,
     hash: &Hash,
-    progress: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
-) -> Result<Stats, GetError> {
+    progress: impl ProgressSender<Msg = DownloadProgressEvent> + IdGenerator,
+) -> Result<Stats, Error> {
     let end = match db.get_mut(hash).await? {
         Some(entry) if entry.is_complete() => {
             tracing::info!("already got entire blob");
             progress
-                .send(DownloadProgress::FoundLocal {
+                .send(DownloadProgressEvent::FoundLocal {
                     child: BlobId::Root,
                     hash: *hash,
                     size: entry.size(),
@@ -184,7 +182,7 @@ async fn get_blob<D: BaoStore>(
                 .ok()
                 .unwrap_or_else(ChunkRanges::all);
             progress
-                .send(DownloadProgress::FoundLocal {
+                .send(DownloadProgressEvent::FoundLocal {
                     child: BlobId::Root,
                     hash: *hash,
                     size: entry.size(),
@@ -201,7 +199,7 @@ async fn get_blob<D: BaoStore>(
             let connected = request.next().await?;
             // next step. we have requested a single hash, so this must be StartRoot
             let ConnectedNext::StartRoot(start) = connected.next().await? else {
-                return Err(GetError::NoncompliantNode(anyhow!("expected StartRoot")));
+                return Err(Error::NoncompliantNode(anyhow!("expected StartRoot")));
             };
             // move to the header
             let header = start.next();
@@ -217,7 +215,7 @@ async fn get_blob<D: BaoStore>(
             let connected = request.next().await?;
             // next step. we have requested a single hash, so this must be StartRoot
             let ConnectedNext::StartRoot(start) = connected.next().await? else {
-                return Err(GetError::NoncompliantNode(anyhow!("expected StartRoot")));
+                return Err(Error::NoncompliantNode(anyhow!("expected StartRoot")));
             };
             // move to the header
             let header = start.next();
@@ -228,7 +226,7 @@ async fn get_blob<D: BaoStore>(
 
     // we have requested a single hash, so we must be at closing
     let EndBlobNext::Closing(end) = end.next() else {
-        return Err(GetError::NoncompliantNode(anyhow!("expected StartRoot")));
+        return Err(Error::NoncompliantNode(anyhow!("expected Closing")));
     };
     // this closes the bidi stream. Do something with the stats?
     let stats = end.next().await?;
@@ -263,8 +261,8 @@ pub async fn valid_ranges<D: MapMut>(entry: &D::EntryMut) -> anyhow::Result<Chu
 async fn get_blob_inner<D: BaoStore>(
     db: &D,
     at_header: AtBlobHeader,
-    sender: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
-) -> Result<AtEndBlob, GetError> {
+    sender: impl ProgressSender<Msg = DownloadProgressEvent> + IdGenerator,
+) -> Result<AtEndBlob, Error> {
     // read the size. The size we get here is not verified, but since we use
     // it for the tree traversal we are guaranteed not to get more than size.
     let (at_content, size) = at_header.next().await?;
@@ -277,7 +275,7 @@ async fn get_blob_inner<D: BaoStore>(
     // allocate a new id for progress reports for this transfer
     let id = sender.new_id();
     sender
-        .send(DownloadProgress::Found {
+        .send(DownloadProgressEvent::Found {
             id,
             hash,
             size,
@@ -289,7 +287,7 @@ async fn get_blob_inner<D: BaoStore>(
         // if try send fails it means that the receiver has been dropped.
        // in that case we want to abort the write_all_with_outboard.
         sender2
-            .try_send(DownloadProgress::Progress { id, offset })
+            .try_send(DownloadProgressEvent::Progress { id, offset })
             .inspect_err(|_| {
                 tracing::info!("aborting download of {}", hash);
             })?;
@@ -303,7 +301,7 @@ async fn get_blob_inner<D: BaoStore>(
     drop(bw);
     db.insert_complete(entry).await?;
     // notify that we are done
-    sender.send(DownloadProgress::Done { id }).await?;
+    sender.send(DownloadProgressEvent::Done { id }).await?;
     Ok(end)
 }
 
@@ -315,8 +313,8 @@ async fn get_blob_inner_partial<D: BaoStore>(
     db: &D,
     at_header: AtBlobHeader,
     entry: D::EntryMut,
-    sender: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
-) -> Result<AtEndBlob, GetError> {
+    sender: impl ProgressSender<Msg = DownloadProgressEvent> + IdGenerator,
+) -> Result<AtEndBlob, Error> {
     // read the size. The size we get here is not verified, but since we use
     // it for the tree traversal we are guaranteed not to get more than size.
     let (at_content, size) = at_header.next().await?;
@@ -327,7 +325,7 @@ async fn get_blob_inner_partial<D: BaoStore>(
     let hash = at_content.hash();
     let child_offset = at_content.offset();
     sender
-        .send(DownloadProgress::Found {
+        .send(DownloadProgressEvent::Found {
             id,
             hash,
             size,
@@ -339,7 +337,7 @@ async fn get_blob_inner_partial<D: BaoStore>(
         // if try send fails it means that the receiver has been dropped.
         // in that case we want to abort the write_all_with_outboard.
         sender2
-            .try_send(DownloadProgress::Progress { id, offset })
+            .try_send(DownloadProgressEvent::Progress { id, offset })
             .inspect_err(|_| {
                 tracing::info!("aborting download of {}", hash);
             })?;
@@ -357,7 +355,7 @@ async fn get_blob_inner_partial<D: BaoStore>(
     // data. We can't re-check this here since that would be very expensive.
     db.insert_complete(entry).await?;
     // notify that we are done
-    sender.send(DownloadProgress::Done { id }).await?;
+    sender.send(DownloadProgressEvent::Done { id }).await?;
     Ok(at_end)
 }
 
@@ -396,15 +394,15 @@ async fn get_hash_seq<D: BaoStore>(
     db: &D,
     co: GetCo,
     root_hash: &Hash,
-    sender: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
-) -> Result<Stats, GetError> {
+    sender: impl ProgressSender<Msg = DownloadProgressEvent> + IdGenerator,
+) -> Result<Stats, Error> {
     use tracing::info as log;
     let finishing = match db.get_mut(root_hash).await? {
         Some(entry) if entry.is_complete() => {
             log!("already got collection - doing partial download");
             // send info that we have the hashseq itself entirely
             sender
-                .send(DownloadProgress::FoundLocal {
+                .send(DownloadProgressEvent::FoundLocal {
                     child: BlobId::Root,
                     hash: *root_hash,
                     size: entry.size(),
@@ -414,10 +412,10 @@ async fn get_hash_seq<D: BaoStore>(
             // got the collection
             let reader = entry.data_reader().await?;
             let (mut hash_seq, children) = parse_hash_seq(reader).await.map_err(|err| {
-                GetError::NoncompliantNode(anyhow!("Failed to parse downloaded HashSeq: {err}"))
+                Error::NoncompliantNode(anyhow!("Failed to parse downloaded HashSeq: {err}"))
             })?;
             sender
-                .send(DownloadProgress::FoundHashSeq {
+                .send(DownloadProgressEvent::FoundHashSeq {
                     hash: *root_hash,
                     children,
                 })
@@ -431,7 +429,7 @@ async fn get_hash_seq<D: BaoStore>(
             for (i, info) in missing_info.iter().enumerate() {
                 if let Some(size) = info.size() {
                     sender
-                        .send(DownloadProgress::FoundLocal {
+                        .send(DownloadProgressEvent::FoundLocal {
                             child: BlobId::from_offset((i as u64) + 1),
                             hash: children[i],
                             size,
@@ -460,7 +458,7 @@ async fn get_hash_seq<D: BaoStore>(
             log!("connected");
             // we have not requested the root, so this must be StartChild
             let ConnectedNext::StartChild(start) = connected.next().await? else {
-                return Err(GetError::NoncompliantNode(anyhow!("expected StartChild")));
+                return Err(Error::NoncompliantNode(anyhow!("expected StartChild")));
             };
             let mut next = EndBlobNext::MoreChildren(start);
             // read all the children
@@ -470,7 +468,7 @@ async fn get_hash_seq<D: BaoStore>(
                     EndBlobNext::Closing(finish) => break finish,
                 };
                 let child_offset = usize::try_from(start.child_offset())
-                    .map_err(|_| GetError::NoncompliantNode(anyhow!("child offset too large")))?;
+                    .map_err(|_| Error::NoncompliantNode(anyhow!("child offset too large")))?;
                 let (child_hash, info) =
                     match (children.get(child_offset), missing_info.get(child_offset)) {
                         (Some(blob), Some(info)) => (*blob, info),
@@ -488,7 +486,7 @@ async fn get_hash_seq<D: BaoStore>(
                         get_blob_inner_partial(db, header, entry.clone(), sender.clone()).await?
                     }
                     BlobInfo::Complete { .. } => {
-                        return Err(GetError::NoncompliantNode(anyhow!(
+                        return Err(Error::NoncompliantNode(anyhow!(
                             "got data we have not requested"
                         )));
                     }
@@ -505,7 +503,7 @@ async fn get_hash_seq<D: BaoStore>(
             let connected = request.next().await?;
             // next step. we have requested a single hash, so this must be StartRoot
             let ConnectedNext::StartRoot(start) = connected.next().await? else {
-                return Err(GetError::NoncompliantNode(anyhow!("expected StartRoot")));
+                return Err(Error::NoncompliantNode(anyhow!("expected StartRoot")));
             };
             // move to the header
             let header = start.next();
@@ -515,13 +513,13 @@ async fn get_hash_seq<D: BaoStore>(
             let entry = db
                 .get(root_hash)
                 .await?
-                .ok_or_else(|| GetError::LocalFailure(anyhow!("just downloaded but not in db")))?;
+                .ok_or_else(|| Error::LocalFailure(anyhow!("just downloaded but not in db")))?;
             let reader = entry.data_reader().await?;
             let (mut collection, count) = parse_hash_seq(reader).await.map_err(|err| {
-                GetError::NoncompliantNode(anyhow!("Failed to parse downloaded HashSeq: {err}"))
+                Error::NoncompliantNode(anyhow!("Failed to parse downloaded HashSeq: {err}"))
             })?;
             sender
-                .send(DownloadProgress::FoundHashSeq {
+                .send(DownloadProgressEvent::FoundHashSeq {
                     hash: *root_hash,
                     children: count,
                 })
@@ -538,7 +536,7 @@ async fn get_hash_seq<D: BaoStore>(
                     EndBlobNext::Closing(finish) => break finish,
                 };
                 let child_offset = usize::try_from(start.child_offset())
-                    .map_err(|_| GetError::NoncompliantNode(anyhow!("child offset too large")))?;
+                    .map_err(|_| Error::NoncompliantNode(anyhow!("child offset too large")))?;
                 let child_hash = match children.get(child_offset) {
                     Some(blob) => *blob,
@@ -608,91 +606,3 @@ impl<D: BaoStore> BlobInfo<D> {
         }
     }
 }
-
-/// Progress updates for the get operation.
-// TODO: Move to super::progress
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum DownloadProgress {
-    /// Initial state if subscribing to a running or queued transfer.
-    InitialState(TransferState),
-    /// Data was found locally.
-    FoundLocal {
-        /// child offset
-        child: BlobId,
-        /// The hash of the entry.
-        hash: Hash,
-        /// The size of the entry in bytes.
-        size: BaoBlobSize,
-        /// The ranges that are available locally.
-        valid_ranges: RangeSpec,
-    },
-    /// A new connection was established.
-    Connected,
-    /// An item was found with hash `hash`, from now on referred to via `id`.
-    Found {
-        /// A new unique progress id for this entry.
-        id: u64,
-        /// Identifier for this blob within this download.
-        ///
-        /// Will always be [`BlobId::Root`] unless a hashseq is downloaded, in which case this
-        /// allows to identify the children by their offset in the hashseq.
-        child: BlobId,
-        /// The hash of the entry.
-        hash: Hash,
-        /// The size of the entry in bytes.
-        size: u64,
-    },
-    /// An item was found with hash `hash`, from now on referred to via `id`.
-    FoundHashSeq {
-        /// The name of the entry.
-        hash: Hash,
-        /// Number of children in the collection, if known.
-        children: u64,
-    },
-    /// We got progress ingesting item `id`.
-    Progress {
-        /// The unique id of the entry.
-        id: u64,
-        /// The offset of the progress, in bytes.
-        offset: u64,
-    },
-    /// We are done with `id`.
-    Done {
-        /// The unique id of the entry.
-        id: u64,
-    },
-    /// All operations finished.
-    ///
-    /// This will be the last message in the stream.
-    AllDone(Stats),
-    /// We got an error and need to abort.
-    ///
-    /// This will be the last message in the stream.
-    Abort(serde_error::Error),
-}
-
-/// The id of a blob in a transfer
-#[derive(
-    Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, std::hash::Hash, Serialize, Deserialize,
-)]
-pub enum BlobId {
-    /// The root blob (child id 0)
-    Root,
-    /// A child blob (child id > 0)
-    Child(NonZeroU64),
-}
-
-impl BlobId {
-    fn from_offset(id: u64) -> Self {
-        NonZeroU64::new(id).map(Self::Child).unwrap_or(Self::Root)
-    }
-}
-
-impl From<BlobId> for u64 {
-    fn from(value: BlobId) -> Self {
-        match value {
-            BlobId::Root => 0,
-            BlobId::Child(id) => id.into(),
-        }
-    }
-}
diff --git a/src/store/traits.rs b/src/store/traits.rs
index e7982ebbd..d40fc0926 100644
--- a/src/store/traits.rs
+++ b/src/store/traits.rs
@@ -819,31 +819,10 @@ pub enum ExportFormat {
     /// destination path.
/// /// If the blob cannot be parsed as a collection, the operation will fail. + #[cfg(feature = "formats-collection")] Collection, } -#[allow(missing_docs)] -#[derive(Debug)] -pub enum ExportProgress { - /// Starting to export to a file - /// - /// This will be the first message for an id - Start { - id: u64, - hash: Hash, - path: PathBuf, - stable: bool, - }, - /// Progress when copying the file to the target - /// - /// This will be omitted if the store can move the file or use copy on write - /// - /// There will be multiple of these messages for an id - Progress { id: u64, offset: u64 }, - /// Done exporting - Done { id: u64 }, -} - /// Level for generic validation messages #[derive( Debug, Clone, Copy, derive_more::Display, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq,
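
Some usage sketches for the renamed API surface follow. First, the client-side `AddProgress` and `DownloadProgress` wrappers above implement both `Stream` (yielding the renamed events) and `Future` (resolving to an outcome), so callers can pick either style. A minimal sketch, not part of the patch: `BlobsClient` is a hypothetical alias for whatever concrete blobs RPC client type is in scope, and everything else relies only on items this diff touches.

    use anyhow::Result;
    use futures_lite::StreamExt;
    use iroh::NodeAddr;
    use iroh_blobs::{rpc::client::blobs::DownloadProgressEvent, Hash};

    async fn fetch(blobs_client: &BlobsClient, hash: Hash, addr: NodeAddr) -> Result<()> {
        // Style 1: await the wrapper as a Future. It polls the
        // DownloadProgressEvent stream internally and resolves to a
        // DownloadOutcome once AllDone (or Abort) arrives.
        let outcome = blobs_client.download(hash, addr.clone()).await?.await?;
        println!("downloaded {} new bytes", outcome.downloaded_size);

        // Style 2: drive the same wrapper as a Stream of events.
        let mut progress = blobs_client.download(hash, addr).await?;
        while let Some(event) = progress.next().await {
            match event? {
                DownloadProgressEvent::Found { size, .. } => println!("found: {size} bytes"),
                DownloadProgressEvent::AllDone(stats) => {
                    println!("done, read {} bytes", stats.bytes_read);
                    break;
                }
                DownloadProgressEvent::Abort(err) => anyhow::bail!("download aborted: {err}"),
                _ => {}
            }
        }
        Ok(())
    }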
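The relocated `DownloadMode` is chosen per request. A hedged sketch of the difference between the two variants, assuming a `DownloadOptions` struct mirroring the `BlobDownloadRequest` fields above and a `download_with_opts` client method:

    use iroh::NodeAddr;
    use iroh_blobs::{
        rpc::client::blobs::{DownloadMode, DownloadOptions},
        util::SetTagOption,
        BlobFormat, Hash,
    };

    async fn fetch_queued(
        blobs_client: &BlobsClient, // hypothetical alias, as in the first sketch
        hash: Hash,
        provider_addr: NodeAddr,
    ) -> anyhow::Result<()> {
        let opts = DownloadOptions {
            format: BlobFormat::Raw,
            // Direct dials the listed nodes sequentially and applies no queuing
            // or concurrency limits; Queued hands the request to the downloader,
            // which may dial providers in parallel within its concurrency limits.
            nodes: vec![provider_addr],
            tag: SetTagOption::Auto,
            mode: DownloadMode::Queued,
        };
        let outcome = blobs_client.download_with_opts(hash, opts).await?.await?;
        println!("fetched {} new bytes", outcome.downloaded_size);
        Ok(())
    }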
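The `FetchState` returned by `store::get_to_db_in_steps` splits a download into a local check and an optional network step, which is exactly the shape of the loop at the top of this diff. A sketch of the pattern, using only items the store.rs hunk re-exports; `Store as BaoStore` follows the alias already used in store/export.rs:

    use iroh::endpoint::Connection;
    use iroh_blobs::{
        get::{progress::DownloadProgressEvent, Stats},
        store::{get_to_db_in_steps, FetchState, Store as BaoStore},
        util::progress::{IdGenerator, ProgressSender},
        HashAndFormat,
    };

    async fn fetch_local_first<D: BaoStore>(
        db: D,
        content: HashAndFormat,
        progress: impl ProgressSender<Msg = DownloadProgressEvent> + IdGenerator,
        dial: impl std::future::Future<Output = anyhow::Result<Connection>>,
    ) -> anyhow::Result<Stats> {
        match get_to_db_in_steps(db, content, progress).await? {
            // Everything requested is already in the store: no network at all.
            FetchState::Complete(stats) => Ok(stats),
            // Something is missing: only now do we pay for a connection.
            FetchState::NeedsConn(needs_conn) => {
                let conn = dial.await?;
                Ok(needs_conn.proceed(conn).await?)
            }
        }
    }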
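Finally, collection export is now feature-gated: `ExportFormat::Collection` and `export_collection` only exist when `formats-collection` is enabled (it is on by default per the Cargo.toml hunk). A sketch of the relocated entry points; `IgnoreProgressSender` is assumed here to be a no-op progress sink from `util::progress`, and the `cfg` mirrors the gate inside store/export.rs rather than anything a downstream crate would normally write:

    use std::path::PathBuf;

    use iroh_blobs::{
        store::{export, ExportFormat, ExportMode, ExportProgress, Store as BaoStore},
        util::progress::IgnoreProgressSender,
        Hash,
    };

    async fn export_blob_then_collection<D: BaoStore>(
        db: &D,
        hash: Hash,
        out: PathBuf,
    ) -> anyhow::Result<()> {
        let progress = IgnoreProgressSender::<ExportProgress>::default();

        // Plain blob export is always available.
        export(db, hash, out.clone(), ExportFormat::Blob, ExportMode::Copy, progress.clone()).await?;

        // Collection export only compiles with the formats-collection feature.
        #[cfg(feature = "formats-collection")]
        export(db, hash, out, ExportFormat::Collection, ExportMode::Copy, progress).await?;

        Ok(())
    }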