Skip to content

refactor: Consolidate the blobs API #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -95,7 +95,7 @@ futures-util = "0.3.30"
testdir = "0.9.1"

[features]
default = ["fs-store", "net_protocol"]
default = ["fs-store", "net_protocol", "formats-collection"]
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
net_protocol = ["downloader", "dep:futures-util"]
fs-store = ["dep:reflink-copy", "redb", "dep:tempfile"]
@@ -112,6 +112,8 @@ rpc = [
"dep:walkdir",
"downloader",
]
formats = []
formats-collection = ["formats"]

example-iroh = [
"dep:clap",
@@ -127,6 +129,7 @@ rustdoc-args = ["--cfg", "iroh_docsrs"]

[[example]]
name = "provide-bytes"
required-features = ["formats-collection"]

[[example]]
name = "fetch-fsm"
4 changes: 2 additions & 2 deletions examples/hello-world-fetch.rs
Original file line number Diff line number Diff line change
@@ -66,13 +66,13 @@ async fn main() -> Result<()> {
"'Hello World' example expects to fetch a single blob, but the ticket indicates a collection.",
);

// `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress
// `download` returns a stream of `DownloadProgressEvent`. You can iterate through these updates to get progress
// on the state of your download.
let download_stream = blobs_client
.download(ticket.hash(), ticket.node_addr().clone())
.await?;

// You can also just `await` the stream, which will poll the `DownloadProgress` stream for you.
// You can also just `await` the stream, which will poll the `DownloadProgressEvent` stream for you.
let outcome = download_stream.await.context("unable to download hash")?;

println!(
29 changes: 15 additions & 14 deletions examples/local-swarm-discovery.rs
Original file line number Diff line number Diff line change
@@ -140,13 +140,14 @@ mod progress {
ProgressStyle,
};
use iroh_blobs::{
get::{db::DownloadProgress, progress::BlobProgress, Stats},
get::Stats,
rpc::client::blobs::{BlobProgressEvent, DownloadProgressEvent},
Hash,
};

pub async fn show_download_progress(
hash: Hash,
mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
mut stream: impl Stream<Item = Result<DownloadProgressEvent>> + Unpin,
) -> Result<()> {
eprintln!("Fetching: {}", hash);
let mp = MultiProgress::new();
@@ -157,7 +158,7 @@ mod progress {
let mut seq = false;
while let Some(x) = stream.next().await {
match x? {
DownloadProgress::InitialState(state) => {
DownloadProgressEvent::InitialState(state) => {
if state.connected {
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
}
@@ -177,21 +178,21 @@ mod progress {
ip.set_length(size.value());
ip.reset();
match blob.progress {
BlobProgress::Pending => {}
BlobProgress::Progressing(offset) => ip.set_position(offset),
BlobProgress::Done => ip.finish_and_clear(),
BlobProgressEvent::Pending => {}
BlobProgressEvent::Progressing(offset) => ip.set_position(offset),
BlobProgressEvent::Done => ip.finish_and_clear(),
}
if !seq {
op.finish_and_clear();
}
}
}
}
DownloadProgress::FoundLocal { .. } => {}
DownloadProgress::Connected => {
DownloadProgressEvent::FoundLocal { .. } => {}
DownloadProgressEvent::Connected => {
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
}
DownloadProgress::FoundHashSeq { children, .. } => {
DownloadProgressEvent::FoundHashSeq { children, .. } => {
op.set_message(format!(
"{} Downloading {} blob(s)\n",
style("[3/3]").bold().dim(),
@@ -201,7 +202,7 @@ mod progress {
op.reset();
seq = true;
}
DownloadProgress::Found { size, child, .. } => {
DownloadProgressEvent::Found { size, child, .. } => {
if seq {
op.set_position(child.into());
} else {
@@ -210,13 +211,13 @@ mod progress {
ip.set_length(size);
ip.reset();
}
DownloadProgress::Progress { offset, .. } => {
DownloadProgressEvent::Progress { offset, .. } => {
ip.set_position(offset);
}
DownloadProgress::Done { .. } => {
DownloadProgressEvent::Done { .. } => {
ip.finish_and_clear();
}
DownloadProgress::AllDone(Stats {
DownloadProgressEvent::AllDone(Stats {
bytes_read,
elapsed,
..
@@ -230,7 +231,7 @@ mod progress {
);
break;
}
DownloadProgress::Abort(e) => {
DownloadProgressEvent::Abort(e) => {
bail!("download aborted: {}", e);
}
}
47 changes: 23 additions & 24 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -19,11 +19,10 @@ use iroh::{NodeAddr, PublicKey, RelayUrl};
use tokio::io::AsyncWriteExt;

use crate::{
get::{db::DownloadProgress, progress::BlobProgress, Stats},
net_protocol::DownloadMode,
provider::AddProgress,
get::Stats,
rpc::client::blobs::{
self, BlobInfo, BlobStatus, CollectionInfo, DownloadOptions, IncompleteBlobInfo, WrapOption,
self, AddProgressEvent, BlobInfo, BlobProgressEvent, BlobStatus, CollectionInfo,
DownloadMode, DownloadOptions, DownloadProgressEvent, IncompleteBlobInfo, WrapOption,
},
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ReportLevel, ValidateProgress},
ticket::BlobTicket,
@@ -895,29 +894,29 @@ pub struct ProvideResponseEntry {
pub hash: Hash,
}

/// Combines the [`AddProgress`] outputs from a [`Stream`] into a single tuple.
/// Combines the [`AddProgressEvent`] outputs from a [`Stream`] into a single tuple.
pub async fn aggregate_add_response(
mut stream: impl Stream<Item = Result<AddProgress>> + Unpin,
mut stream: impl Stream<Item = Result<AddProgressEvent>> + Unpin,
) -> Result<(Hash, BlobFormat, Vec<ProvideResponseEntry>)> {
let mut hash_and_format = None;
let mut collections = BTreeMap::<u64, (String, u64, Option<Hash>)>::new();
let mut mp = Some(ProvideProgressState::new());
while let Some(item) = stream.next().await {
match item? {
AddProgress::Found { name, id, size } => {
AddProgressEvent::Found { name, id, size } => {
tracing::trace!("Found({id},{name},{size})");
if let Some(mp) = mp.as_mut() {
mp.found(name.clone(), id, size);
}
collections.insert(id, (name, size, None));
}
AddProgress::Progress { id, offset } => {
AddProgressEvent::Progress { id, offset } => {
tracing::trace!("Progress({id}, {offset})");
if let Some(mp) = mp.as_mut() {
mp.progress(id, offset);
}
}
AddProgress::Done { hash, id } => {
AddProgressEvent::Done { hash, id } => {
tracing::trace!("Done({id},{hash:?})");
if let Some(mp) = mp.as_mut() {
mp.done(id, hash);
@@ -931,15 +930,15 @@ pub async fn aggregate_add_response(
}
}
}
AddProgress::AllDone { hash, format, .. } => {
AddProgressEvent::AllDone { hash, format, .. } => {
tracing::trace!("AllDone({hash:?})");
if let Some(mp) = mp.take() {
mp.all_done();
}
hash_and_format = Some(HashAndFormat { hash, format });
break;
}
AddProgress::Abort(e) => {
AddProgressEvent::Abort(e) => {
if let Some(mp) = mp.take() {
mp.error();
}
@@ -1032,7 +1031,7 @@ impl ProvideProgressState {
/// Displays the download progress for a given stream.
pub async fn show_download_progress(
hash: Hash,
mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
mut stream: impl Stream<Item = Result<DownloadProgressEvent>> + Unpin,
) -> Result<()> {
eprintln!("Fetching: {}", hash);
let mp = MultiProgress::new();
@@ -1043,7 +1042,7 @@ pub async fn show_download_progress(
let mut seq = false;
while let Some(x) = stream.next().await {
match x? {
DownloadProgress::InitialState(state) => {
DownloadProgressEvent::InitialState(state) => {
if state.connected {
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
}
@@ -1063,21 +1062,21 @@ pub async fn show_download_progress(
ip.set_length(size.value());
ip.reset();
match blob.progress {
BlobProgress::Pending => {}
BlobProgress::Progressing(offset) => ip.set_position(offset),
BlobProgress::Done => ip.finish_and_clear(),
BlobProgressEvent::Pending => {}
BlobProgressEvent::Progressing(offset) => ip.set_position(offset),
BlobProgressEvent::Done => ip.finish_and_clear(),
}
if !seq {
op.finish_and_clear();
}
}
}
}
DownloadProgress::FoundLocal { .. } => {}
DownloadProgress::Connected => {
DownloadProgressEvent::FoundLocal { .. } => {}
DownloadProgressEvent::Connected => {
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
}
DownloadProgress::FoundHashSeq { children, .. } => {
DownloadProgressEvent::FoundHashSeq { children, .. } => {
op.set_message(format!(
"{} Downloading {} blob(s)\n",
style("[3/3]").bold().dim(),
@@ -1087,7 +1086,7 @@ pub async fn show_download_progress(
op.reset();
seq = true;
}
DownloadProgress::Found { size, child, .. } => {
DownloadProgressEvent::Found { size, child, .. } => {
if seq {
op.set_position(child.into());
} else {
@@ -1096,13 +1095,13 @@ pub async fn show_download_progress(
ip.set_length(size);
ip.reset();
}
DownloadProgress::Progress { offset, .. } => {
DownloadProgressEvent::Progress { offset, .. } => {
ip.set_position(offset);
}
DownloadProgress::Done { .. } => {
DownloadProgressEvent::Done { .. } => {
ip.finish_and_clear();
}
DownloadProgress::AllDone(Stats {
DownloadProgressEvent::AllDone(Stats {
bytes_read,
elapsed,
..
@@ -1116,7 +1115,7 @@ pub async fn show_download_progress(
);
break;
}
DownloadProgress::Abort(e) => {
DownloadProgressEvent::Abort(e) => {
bail!("download aborted: {}", e);
}
}
4 changes: 2 additions & 2 deletions src/downloader.rs
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
use tracing::{debug, error, error_span, trace, warn, Instrument};

use crate::{
get::{db::DownloadProgress, Stats},
get::{progress::DownloadProgressEvent, Stats},
metrics::Metrics,
store::Store,
util::{local_pool::LocalPoolHandle, progress::ProgressSender},
@@ -797,7 +797,7 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
if let Some(sender) = handlers.on_progress {
self.progress_tracker.unsubscribe(&kind, &sender);
sender
.send(DownloadProgress::Abort(serde_error::Error::new(
.send(DownloadProgressEvent::Abort(serde_error::Error::new(
&*anyhow::Error::from(DownloadError::Cancelled),
)))
.await
34 changes: 16 additions & 18 deletions src/downloader/get.rs
Original file line number Diff line number Diff line change
@@ -7,20 +7,20 @@ use iroh::endpoint;

use super::{progress::BroadcastProgressSender, DownloadKind, FailureAction, GetStartFut, Getter};
use crate::{
get::{db::get_to_db_in_steps, error::GetError},
store::Store,
get::Error,
store::{get_to_db_in_steps, FetchState, FetchStateNeedsConn, Store},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so there is get and fetch now? this is confusing 😅

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

everything that needs a store is in the store module. I might even make it a fn on the store traits (see the OO or not OO question)

One reason for this is that I think it would be neat if you could hide the entire store behind a ff, but to do that you must make sure that store stuff is not all over the place, otherwise you will go crazy.

};

impl From<GetError> for FailureAction {
fn from(e: GetError) -> Self {
impl From<Error> for FailureAction {
fn from(e: Error) -> Self {
match e {
e @ GetError::NotFound(_) => FailureAction::AbortRequest(e.into()),
e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e.into()),
e @ Error::NotFound(_) => FailureAction::AbortRequest(e.into()),
e @ Error::RemoteReset(_) => FailureAction::RetryLater(e.into()),
e @ Error::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
e @ Error::Io(_) => FailureAction::RetryLater(e.into()),
e @ Error::BadRequest(_) => FailureAction::AbortRequest(e.into()),
// TODO: what do we want to do on local failures?
e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
e @ Error::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
}
}
}
@@ -34,7 +34,7 @@ pub(crate) struct IoGetter<S: Store> {

impl<S: Store> Getter for IoGetter<S> {
type Connection = endpoint::Connection;
type NeedsConn = crate::get::db::GetStateNeedsConn;
type NeedsConn = FetchStateNeedsConn;

fn get(
&mut self,
@@ -45,10 +45,8 @@ impl<S: Store> Getter for IoGetter<S> {
async move {
match get_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
Err(err) => Err(err.into()),
Ok(crate::get::db::GetState::Complete(stats)) => {
Ok(super::GetOutput::Complete(stats))
}
Ok(crate::get::db::GetState::NeedsConn(needs_conn)) => {
Ok(FetchState::Complete(stats)) => Ok(super::GetOutput::Complete(stats)),
Ok(FetchState::NeedsConn(needs_conn)) => {
Ok(super::GetOutput::NeedsConn(needs_conn))
}
}
@@ -57,7 +55,7 @@ impl<S: Store> Getter for IoGetter<S> {
}
}

impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
impl super::NeedsConn<endpoint::Connection> for FetchStateNeedsConn {
fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
async move {
let res = self.proceed(conn).await;
@@ -73,7 +71,7 @@ impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsCon
}

#[cfg(feature = "metrics")]
fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
fn track_metrics(res: &Result<crate::get::Stats, Error>) {
use iroh_metrics::{inc, inc_by};

use crate::metrics::Metrics;
@@ -90,7 +88,7 @@ fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64);
}
Err(e) => match &e {
GetError::NotFound(_) => inc!(Metrics, downloads_notfound),
Error::NotFound(_) => inc!(Metrics, downloads_notfound),
_ => inc!(Metrics, downloads_error),
},
}
Loading