Skip to content

refactor: Consolidate the blobs API #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ futures-util = "0.3.30"
testdir = "0.9.1"

[features]
default = ["fs-store", "net_protocol"]
default = ["fs-store", "net_protocol", "formats-collection"]
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
net_protocol = ["downloader", "dep:futures-util"]
fs-store = ["dep:reflink-copy", "redb", "dep:tempfile"]
Expand All @@ -112,6 +112,8 @@ rpc = [
"dep:walkdir",
"downloader",
]
formats = []
formats-collection = ["formats"]

example-iroh = [
"dep:clap",
Expand All @@ -127,6 +129,7 @@ rustdoc-args = ["--cfg", "iroh_docsrs"]

[[example]]
name = "provide-bytes"
required-features = ["formats-collection"]

[[example]]
name = "fetch-fsm"
Expand Down
4 changes: 2 additions & 2 deletions examples/hello-world-fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ async fn main() -> Result<()> {
"'Hello World' example expects to fetch a single blob, but the ticket indicates a collection.",
);

// `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress
// `download` returns a stream of `DownloadProgressEvent`. You can iterate through these updates to get progress
// on the state of your download.
let download_stream = blobs_client
.download(ticket.hash(), ticket.node_addr().clone())
.await?;

// You can also just `await` the stream, which will poll the `DownloadProgress` stream for you.
// You can also just `await` the stream, which will poll the `DownloadProgressEvent` stream for you.
let outcome = download_stream.await.context("unable to download hash")?;

println!(
Expand Down
29 changes: 15 additions & 14 deletions examples/local-swarm-discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,14 @@ mod progress {
ProgressStyle,
};
use iroh_blobs::{
get::{db::DownloadProgress, progress::BlobProgress, Stats},
get::Stats,
rpc::client::blobs::{BlobProgressEvent, DownloadProgressEvent},
Hash,
};

pub async fn show_download_progress(
hash: Hash,
mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
mut stream: impl Stream<Item = Result<DownloadProgressEvent>> + Unpin,
) -> Result<()> {
eprintln!("Fetching: {}", hash);
let mp = MultiProgress::new();
Expand All @@ -157,7 +158,7 @@ mod progress {
let mut seq = false;
while let Some(x) = stream.next().await {
match x? {
DownloadProgress::InitialState(state) => {
DownloadProgressEvent::InitialState(state) => {
if state.connected {
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
}
Expand All @@ -177,21 +178,21 @@ mod progress {
ip.set_length(size.value());
ip.reset();
match blob.progress {
BlobProgress::Pending => {}
BlobProgress::Progressing(offset) => ip.set_position(offset),
BlobProgress::Done => ip.finish_and_clear(),
BlobProgressEvent::Pending => {}
BlobProgressEvent::Progressing(offset) => ip.set_position(offset),
BlobProgressEvent::Done => ip.finish_and_clear(),
}
if !seq {
op.finish_and_clear();
}
}
}
}
DownloadProgress::FoundLocal { .. } => {}
DownloadProgress::Connected => {
DownloadProgressEvent::FoundLocal { .. } => {}
DownloadProgressEvent::Connected => {
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
}
DownloadProgress::FoundHashSeq { children, .. } => {
DownloadProgressEvent::FoundHashSeq { children, .. } => {
op.set_message(format!(
"{} Downloading {} blob(s)\n",
style("[3/3]").bold().dim(),
Expand All @@ -201,7 +202,7 @@ mod progress {
op.reset();
seq = true;
}
DownloadProgress::Found { size, child, .. } => {
DownloadProgressEvent::Found { size, child, .. } => {
if seq {
op.set_position(child.into());
} else {
Expand All @@ -210,13 +211,13 @@ mod progress {
ip.set_length(size);
ip.reset();
}
DownloadProgress::Progress { offset, .. } => {
DownloadProgressEvent::Progress { offset, .. } => {
ip.set_position(offset);
}
DownloadProgress::Done { .. } => {
DownloadProgressEvent::Done { .. } => {
ip.finish_and_clear();
}
DownloadProgress::AllDone(Stats {
DownloadProgressEvent::AllDone(Stats {
bytes_read,
elapsed,
..
Expand All @@ -230,7 +231,7 @@ mod progress {
);
break;
}
DownloadProgress::Abort(e) => {
DownloadProgressEvent::Abort(e) => {
bail!("download aborted: {}", e);
}
}
Expand Down
47 changes: 23 additions & 24 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ use iroh::{NodeAddr, PublicKey, RelayUrl};
use tokio::io::AsyncWriteExt;

use crate::{
get::{db::DownloadProgress, progress::BlobProgress, Stats},
net_protocol::DownloadMode,
provider::AddProgress,
get::Stats,
rpc::client::blobs::{
self, BlobInfo, BlobStatus, CollectionInfo, DownloadOptions, IncompleteBlobInfo, WrapOption,
self, AddProgressEvent, BlobInfo, BlobProgressEvent, BlobStatus, CollectionInfo,
DownloadMode, DownloadOptions, DownloadProgressEvent, IncompleteBlobInfo, WrapOption,
},
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ReportLevel, ValidateProgress},
ticket::BlobTicket,
Expand Down Expand Up @@ -895,29 +894,29 @@ pub struct ProvideResponseEntry {
pub hash: Hash,
}

/// Combines the [`AddProgress`] outputs from a [`Stream`] into a single tuple.
/// Combines the [`AddProgressEvent`] outputs from a [`Stream`] into a single tuple.
pub async fn aggregate_add_response(
mut stream: impl Stream<Item = Result<AddProgress>> + Unpin,
mut stream: impl Stream<Item = Result<AddProgressEvent>> + Unpin,
) -> Result<(Hash, BlobFormat, Vec<ProvideResponseEntry>)> {
let mut hash_and_format = None;
let mut collections = BTreeMap::<u64, (String, u64, Option<Hash>)>::new();
let mut mp = Some(ProvideProgressState::new());
while let Some(item) = stream.next().await {
match item? {
AddProgress::Found { name, id, size } => {
AddProgressEvent::Found { name, id, size } => {
tracing::trace!("Found({id},{name},{size})");
if let Some(mp) = mp.as_mut() {
mp.found(name.clone(), id, size);
}
collections.insert(id, (name, size, None));
}
AddProgress::Progress { id, offset } => {
AddProgressEvent::Progress { id, offset } => {
tracing::trace!("Progress({id}, {offset})");
if let Some(mp) = mp.as_mut() {
mp.progress(id, offset);
}
}
AddProgress::Done { hash, id } => {
AddProgressEvent::Done { hash, id } => {
tracing::trace!("Done({id},{hash:?})");
if let Some(mp) = mp.as_mut() {
mp.done(id, hash);
Expand All @@ -931,15 +930,15 @@ pub async fn aggregate_add_response(
}
}
}
AddProgress::AllDone { hash, format, .. } => {
AddProgressEvent::AllDone { hash, format, .. } => {
tracing::trace!("AllDone({hash:?})");
if let Some(mp) = mp.take() {
mp.all_done();
}
hash_and_format = Some(HashAndFormat { hash, format });
break;
}
AddProgress::Abort(e) => {
AddProgressEvent::Abort(e) => {
if let Some(mp) = mp.take() {
mp.error();
}
Expand Down Expand Up @@ -1032,7 +1031,7 @@ impl ProvideProgressState {
/// Displays the download progress for a given stream.
pub async fn show_download_progress(
hash: Hash,
mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
mut stream: impl Stream<Item = Result<DownloadProgressEvent>> + Unpin,
) -> Result<()> {
eprintln!("Fetching: {}", hash);
let mp = MultiProgress::new();
Expand All @@ -1043,7 +1042,7 @@ pub async fn show_download_progress(
let mut seq = false;
while let Some(x) = stream.next().await {
match x? {
DownloadProgress::InitialState(state) => {
DownloadProgressEvent::InitialState(state) => {
if state.connected {
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
}
Expand All @@ -1063,21 +1062,21 @@ pub async fn show_download_progress(
ip.set_length(size.value());
ip.reset();
match blob.progress {
BlobProgress::Pending => {}
BlobProgress::Progressing(offset) => ip.set_position(offset),
BlobProgress::Done => ip.finish_and_clear(),
BlobProgressEvent::Pending => {}
BlobProgressEvent::Progressing(offset) => ip.set_position(offset),
BlobProgressEvent::Done => ip.finish_and_clear(),
}
if !seq {
op.finish_and_clear();
}
}
}
}
DownloadProgress::FoundLocal { .. } => {}
DownloadProgress::Connected => {
DownloadProgressEvent::FoundLocal { .. } => {}
DownloadProgressEvent::Connected => {
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
}
DownloadProgress::FoundHashSeq { children, .. } => {
DownloadProgressEvent::FoundHashSeq { children, .. } => {
op.set_message(format!(
"{} Downloading {} blob(s)\n",
style("[3/3]").bold().dim(),
Expand All @@ -1087,7 +1086,7 @@ pub async fn show_download_progress(
op.reset();
seq = true;
}
DownloadProgress::Found { size, child, .. } => {
DownloadProgressEvent::Found { size, child, .. } => {
if seq {
op.set_position(child.into());
} else {
Expand All @@ -1096,13 +1095,13 @@ pub async fn show_download_progress(
ip.set_length(size);
ip.reset();
}
DownloadProgress::Progress { offset, .. } => {
DownloadProgressEvent::Progress { offset, .. } => {
ip.set_position(offset);
}
DownloadProgress::Done { .. } => {
DownloadProgressEvent::Done { .. } => {
ip.finish_and_clear();
}
DownloadProgress::AllDone(Stats {
DownloadProgressEvent::AllDone(Stats {
bytes_read,
elapsed,
..
Expand All @@ -1116,7 +1115,7 @@ pub async fn show_download_progress(
);
break;
}
DownloadProgress::Abort(e) => {
DownloadProgressEvent::Abort(e) => {
bail!("download aborted: {}", e);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
use tracing::{debug, error, error_span, trace, warn, Instrument};

use crate::{
get::{db::DownloadProgress, Stats},
get::{progress::DownloadProgressEvent, Stats},
metrics::Metrics,
store::Store,
util::{local_pool::LocalPoolHandle, progress::ProgressSender},
Expand Down Expand Up @@ -797,7 +797,7 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
if let Some(sender) = handlers.on_progress {
self.progress_tracker.unsubscribe(&kind, &sender);
sender
.send(DownloadProgress::Abort(serde_error::Error::new(
.send(DownloadProgressEvent::Abort(serde_error::Error::new(
&*anyhow::Error::from(DownloadError::Cancelled),
)))
.await
Expand Down
34 changes: 16 additions & 18 deletions src/downloader/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ use iroh::endpoint;

use super::{progress::BroadcastProgressSender, DownloadKind, FailureAction, GetStartFut, Getter};
use crate::{
get::{db::get_to_db_in_steps, error::GetError},
store::Store,
get::Error,
store::{get_to_db_in_steps, FetchState, FetchStateNeedsConn, Store},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so there is get and fetch now? this is confusing 😅

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

everything that needs a store is in the store module. I might even make it a fn on the store traits (see the OO or not OO question)

One reason for this is that I think it would be neat if you could hide the entire store behind a ff, but to do that you must make sure that store stuff is not all over the place, otherwise you will go crazy.

};

impl From<GetError> for FailureAction {
fn from(e: GetError) -> Self {
impl From<Error> for FailureAction {
fn from(e: Error) -> Self {
match e {
e @ GetError::NotFound(_) => FailureAction::AbortRequest(e.into()),
e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e.into()),
e @ Error::NotFound(_) => FailureAction::AbortRequest(e.into()),
e @ Error::RemoteReset(_) => FailureAction::RetryLater(e.into()),
e @ Error::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
e @ Error::Io(_) => FailureAction::RetryLater(e.into()),
e @ Error::BadRequest(_) => FailureAction::AbortRequest(e.into()),
// TODO: what do we want to do on local failures?
e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
e @ Error::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
}
}
}
Expand All @@ -34,7 +34,7 @@ pub(crate) struct IoGetter<S: Store> {

impl<S: Store> Getter for IoGetter<S> {
type Connection = endpoint::Connection;
type NeedsConn = crate::get::db::GetStateNeedsConn;
type NeedsConn = FetchStateNeedsConn;

fn get(
&mut self,
Expand All @@ -45,10 +45,8 @@ impl<S: Store> Getter for IoGetter<S> {
async move {
match get_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
Err(err) => Err(err.into()),
Ok(crate::get::db::GetState::Complete(stats)) => {
Ok(super::GetOutput::Complete(stats))
}
Ok(crate::get::db::GetState::NeedsConn(needs_conn)) => {
Ok(FetchState::Complete(stats)) => Ok(super::GetOutput::Complete(stats)),
Ok(FetchState::NeedsConn(needs_conn)) => {
Ok(super::GetOutput::NeedsConn(needs_conn))
}
}
Expand All @@ -57,7 +55,7 @@ impl<S: Store> Getter for IoGetter<S> {
}
}

impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
impl super::NeedsConn<endpoint::Connection> for FetchStateNeedsConn {
fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
async move {
let res = self.proceed(conn).await;
Expand All @@ -73,7 +71,7 @@ impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsCon
}

#[cfg(feature = "metrics")]
fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
fn track_metrics(res: &Result<crate::get::Stats, Error>) {
use iroh_metrics::{inc, inc_by};

use crate::metrics::Metrics;
Expand All @@ -90,7 +88,7 @@ fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64);
}
Err(e) => match &e {
GetError::NotFound(_) => inc!(Metrics, downloads_notfound),
Error::NotFound(_) => inc!(Metrics, downloads_notfound),
_ => inc!(Metrics, downloads_error),
},
}
Expand Down
Loading
Loading