Skip to content

Commit 67f0d31

Browse files
committed
Rename DownloadProgress to DownloadProgressEvent and BlobProgress to BlobProgressEvent
1 parent 3919677 commit 67f0d31

15 files changed

+253
-239
lines changed

examples/hello-world-fetch.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,13 @@ async fn main() -> Result<()> {
6666
"'Hello World' example expects to fetch a single blob, but the ticket indicates a collection.",
6767
);
6868

69-
// `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress
69+
// `download` returns a stream of `DownloadProgressEvent`. You can iterate through these updates to get progress
7070
// on the state of your download.
7171
let download_stream = blobs_client
7272
.download(ticket.hash(), ticket.node_addr().clone())
7373
.await?;
7474

75-
// You can also just `await` the stream, which will poll the `DownloadProgress` stream for you.
75+
// You can also just `await` the stream, which will poll the `DownloadProgressEvent` stream for you.
7676
let outcome = download_stream.await.context("unable to download hash")?;
7777

7878
println!(

examples/local-swarm-discovery.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -140,16 +140,14 @@ mod progress {
140140
ProgressStyle,
141141
};
142142
use iroh_blobs::{
143-
get::{
144-
progress::{BlobProgress, DownloadProgress},
145-
Stats,
146-
},
143+
get::Stats,
144+
rpc::client::blobs::{BlobProgressEvent, DownloadProgressEvent},
147145
Hash,
148146
};
149147

150148
pub async fn show_download_progress(
151149
hash: Hash,
152-
mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
150+
mut stream: impl Stream<Item = Result<DownloadProgressEvent>> + Unpin,
153151
) -> Result<()> {
154152
eprintln!("Fetching: {}", hash);
155153
let mp = MultiProgress::new();
@@ -160,7 +158,7 @@ mod progress {
160158
let mut seq = false;
161159
while let Some(x) = stream.next().await {
162160
match x? {
163-
DownloadProgress::InitialState(state) => {
161+
DownloadProgressEvent::InitialState(state) => {
164162
if state.connected {
165163
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
166164
}
@@ -180,21 +178,21 @@ mod progress {
180178
ip.set_length(size.value());
181179
ip.reset();
182180
match blob.progress {
183-
BlobProgress::Pending => {}
184-
BlobProgress::Progressing(offset) => ip.set_position(offset),
185-
BlobProgress::Done => ip.finish_and_clear(),
181+
BlobProgressEvent::Pending => {}
182+
BlobProgressEvent::Progressing(offset) => ip.set_position(offset),
183+
BlobProgressEvent::Done => ip.finish_and_clear(),
186184
}
187185
if !seq {
188186
op.finish_and_clear();
189187
}
190188
}
191189
}
192190
}
193-
DownloadProgress::FoundLocal { .. } => {}
194-
DownloadProgress::Connected => {
191+
DownloadProgressEvent::FoundLocal { .. } => {}
192+
DownloadProgressEvent::Connected => {
195193
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
196194
}
197-
DownloadProgress::FoundHashSeq { children, .. } => {
195+
DownloadProgressEvent::FoundHashSeq { children, .. } => {
198196
op.set_message(format!(
199197
"{} Downloading {} blob(s)\n",
200198
style("[3/3]").bold().dim(),
@@ -204,7 +202,7 @@ mod progress {
204202
op.reset();
205203
seq = true;
206204
}
207-
DownloadProgress::Found { size, child, .. } => {
205+
DownloadProgressEvent::Found { size, child, .. } => {
208206
if seq {
209207
op.set_position(child.into());
210208
} else {
@@ -213,13 +211,13 @@ mod progress {
213211
ip.set_length(size);
214212
ip.reset();
215213
}
216-
DownloadProgress::Progress { offset, .. } => {
214+
DownloadProgressEvent::Progress { offset, .. } => {
217215
ip.set_position(offset);
218216
}
219-
DownloadProgress::Done { .. } => {
217+
DownloadProgressEvent::Done { .. } => {
220218
ip.finish_and_clear();
221219
}
222-
DownloadProgress::AllDone(Stats {
220+
DownloadProgressEvent::AllDone(Stats {
223221
bytes_read,
224222
elapsed,
225223
..
@@ -233,7 +231,7 @@ mod progress {
233231
);
234232
break;
235233
}
236-
DownloadProgress::Abort(e) => {
234+
DownloadProgressEvent::Abort(e) => {
237235
bail!("download aborted: {}", e);
238236
}
239237
}

src/cli.rs

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,10 @@ use iroh::{NodeAddr, PublicKey, RelayUrl};
1919
use tokio::io::AsyncWriteExt;
2020

2121
use crate::{
22-
get::{
23-
progress::{BlobProgress, DownloadProgress},
24-
Stats,
25-
},
26-
provider::AddProgress,
22+
get::Stats,
2723
rpc::client::blobs::{
28-
self, BlobInfo, BlobStatus, CollectionInfo, DownloadMode, DownloadOptions,
29-
IncompleteBlobInfo, WrapOption,
24+
self, AddProgressEvent, BlobInfo, BlobProgressEvent, BlobStatus, CollectionInfo,
25+
DownloadMode, DownloadOptions, DownloadProgressEvent, IncompleteBlobInfo, WrapOption,
3026
},
3127
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ReportLevel, ValidateProgress},
3228
ticket::BlobTicket,
@@ -898,29 +894,29 @@ pub struct ProvideResponseEntry {
898894
pub hash: Hash,
899895
}
900896

901-
/// Combines the [`AddProgress`] outputs from a [`Stream`] into a single tuple.
897+
/// Combines the [`AddProgressEvent`] outputs from a [`Stream`] into a single tuple.
902898
pub async fn aggregate_add_response(
903-
mut stream: impl Stream<Item = Result<AddProgress>> + Unpin,
899+
mut stream: impl Stream<Item = Result<AddProgressEvent>> + Unpin,
904900
) -> Result<(Hash, BlobFormat, Vec<ProvideResponseEntry>)> {
905901
let mut hash_and_format = None;
906902
let mut collections = BTreeMap::<u64, (String, u64, Option<Hash>)>::new();
907903
let mut mp = Some(ProvideProgressState::new());
908904
while let Some(item) = stream.next().await {
909905
match item? {
910-
AddProgress::Found { name, id, size } => {
906+
AddProgressEvent::Found { name, id, size } => {
911907
tracing::trace!("Found({id},{name},{size})");
912908
if let Some(mp) = mp.as_mut() {
913909
mp.found(name.clone(), id, size);
914910
}
915911
collections.insert(id, (name, size, None));
916912
}
917-
AddProgress::Progress { id, offset } => {
913+
AddProgressEvent::Progress { id, offset } => {
918914
tracing::trace!("Progress({id}, {offset})");
919915
if let Some(mp) = mp.as_mut() {
920916
mp.progress(id, offset);
921917
}
922918
}
923-
AddProgress::Done { hash, id } => {
919+
AddProgressEvent::Done { hash, id } => {
924920
tracing::trace!("Done({id},{hash:?})");
925921
if let Some(mp) = mp.as_mut() {
926922
mp.done(id, hash);
@@ -934,15 +930,15 @@ pub async fn aggregate_add_response(
934930
}
935931
}
936932
}
937-
AddProgress::AllDone { hash, format, .. } => {
933+
AddProgressEvent::AllDone { hash, format, .. } => {
938934
tracing::trace!("AllDone({hash:?})");
939935
if let Some(mp) = mp.take() {
940936
mp.all_done();
941937
}
942938
hash_and_format = Some(HashAndFormat { hash, format });
943939
break;
944940
}
945-
AddProgress::Abort(e) => {
941+
AddProgressEvent::Abort(e) => {
946942
if let Some(mp) = mp.take() {
947943
mp.error();
948944
}
@@ -1035,7 +1031,7 @@ impl ProvideProgressState {
10351031
/// Displays the download progress for a given stream.
10361032
pub async fn show_download_progress(
10371033
hash: Hash,
1038-
mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
1034+
mut stream: impl Stream<Item = Result<DownloadProgressEvent>> + Unpin,
10391035
) -> Result<()> {
10401036
eprintln!("Fetching: {}", hash);
10411037
let mp = MultiProgress::new();
@@ -1046,7 +1042,7 @@ pub async fn show_download_progress(
10461042
let mut seq = false;
10471043
while let Some(x) = stream.next().await {
10481044
match x? {
1049-
DownloadProgress::InitialState(state) => {
1045+
DownloadProgressEvent::InitialState(state) => {
10501046
if state.connected {
10511047
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
10521048
}
@@ -1066,21 +1062,21 @@ pub async fn show_download_progress(
10661062
ip.set_length(size.value());
10671063
ip.reset();
10681064
match blob.progress {
1069-
BlobProgress::Pending => {}
1070-
BlobProgress::Progressing(offset) => ip.set_position(offset),
1071-
BlobProgress::Done => ip.finish_and_clear(),
1065+
BlobProgressEvent::Pending => {}
1066+
BlobProgressEvent::Progressing(offset) => ip.set_position(offset),
1067+
BlobProgressEvent::Done => ip.finish_and_clear(),
10721068
}
10731069
if !seq {
10741070
op.finish_and_clear();
10751071
}
10761072
}
10771073
}
10781074
}
1079-
DownloadProgress::FoundLocal { .. } => {}
1080-
DownloadProgress::Connected => {
1075+
DownloadProgressEvent::FoundLocal { .. } => {}
1076+
DownloadProgressEvent::Connected => {
10811077
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
10821078
}
1083-
DownloadProgress::FoundHashSeq { children, .. } => {
1079+
DownloadProgressEvent::FoundHashSeq { children, .. } => {
10841080
op.set_message(format!(
10851081
"{} Downloading {} blob(s)\n",
10861082
style("[3/3]").bold().dim(),
@@ -1090,7 +1086,7 @@ pub async fn show_download_progress(
10901086
op.reset();
10911087
seq = true;
10921088
}
1093-
DownloadProgress::Found { size, child, .. } => {
1089+
DownloadProgressEvent::Found { size, child, .. } => {
10941090
if seq {
10951091
op.set_position(child.into());
10961092
} else {
@@ -1099,13 +1095,13 @@ pub async fn show_download_progress(
10991095
ip.set_length(size);
11001096
ip.reset();
11011097
}
1102-
DownloadProgress::Progress { offset, .. } => {
1098+
DownloadProgressEvent::Progress { offset, .. } => {
11031099
ip.set_position(offset);
11041100
}
1105-
DownloadProgress::Done { .. } => {
1101+
DownloadProgressEvent::Done { .. } => {
11061102
ip.finish_and_clear();
11071103
}
1108-
DownloadProgress::AllDone(Stats {
1104+
DownloadProgressEvent::AllDone(Stats {
11091105
bytes_read,
11101106
elapsed,
11111107
..
@@ -1119,7 +1115,7 @@ pub async fn show_download_progress(
11191115
);
11201116
break;
11211117
}
1122-
DownloadProgress::Abort(e) => {
1118+
DownloadProgressEvent::Abort(e) => {
11231119
bail!("download aborted: {}", e);
11241120
}
11251121
}

src/downloader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
5555
use tracing::{debug, error, error_span, trace, warn, Instrument};
5656

5757
use crate::{
58-
get::{progress::DownloadProgress, Stats},
58+
get::{progress::DownloadProgressEvent, Stats},
5959
metrics::Metrics,
6060
store::Store,
6161
util::{local_pool::LocalPoolHandle, progress::ProgressSender},
@@ -797,7 +797,7 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
797797
if let Some(sender) = handlers.on_progress {
798798
self.progress_tracker.unsubscribe(&kind, &sender);
799799
sender
800-
.send(DownloadProgress::Abort(serde_error::Error::new(
800+
.send(DownloadProgressEvent::Abort(serde_error::Error::new(
801801
&*anyhow::Error::from(DownloadError::Cancelled),
802802
)))
803803
.await

src/downloader/progress.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ use parking_lot::Mutex;
1111

1212
use super::DownloadKind;
1313
use crate::{
14-
get::progress::{DownloadProgress, TransferState},
14+
get::progress::{DownloadProgressEvent, TransferState},
1515
util::progress::{AsyncChannelProgressSender, IdGenerator, ProgressSendError, ProgressSender},
1616
};
1717

1818
/// The channel that can be used to subscribe to progress updates.
19-
pub type ProgressSubscriber = AsyncChannelProgressSender<DownloadProgress>;
19+
pub type ProgressSubscriber = AsyncChannelProgressSender<DownloadProgressEvent>;
2020

2121
/// Track the progress of downloads.
2222
///
@@ -101,8 +101,8 @@ struct Inner {
101101
}
102102

103103
impl Inner {
104-
fn subscribe(&mut self, subscriber: ProgressSubscriber) -> DownloadProgress {
105-
let msg = DownloadProgress::InitialState(self.state.clone());
104+
fn subscribe(&mut self, subscriber: ProgressSubscriber) -> DownloadProgressEvent {
105+
let msg = DownloadProgressEvent::InitialState(self.state.clone());
106106
self.subscribers.push(subscriber);
107107
msg
108108
}
@@ -111,7 +111,7 @@ impl Inner {
111111
self.subscribers.retain(|s| !s.same_channel(sender));
112112
}
113113

114-
fn on_progress(&mut self, progress: DownloadProgress) {
114+
fn on_progress(&mut self, progress: DownloadProgressEvent) {
115115
self.state.on_progress(progress);
116116
}
117117
}
@@ -129,7 +129,7 @@ impl IdGenerator for BroadcastProgressSender {
129129
}
130130

131131
impl ProgressSender for BroadcastProgressSender {
132-
type Msg = DownloadProgress;
132+
type Msg = DownloadProgressEvent;
133133

134134
async fn send(&self, msg: Self::Msg) -> Result<(), ProgressSendError> {
135135
// making sure that the lock is not held across an await point.

src/downloader/test.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use iroh::SecretKey;
1010

1111
use super::*;
1212
use crate::{
13-
get::progress::{BlobId, BlobProgress, TransferState},
13+
get::progress::{BlobId, BlobProgressEvent, DownloadProgressEvent, TransferState},
1414
util::{
1515
local_pool::LocalPool,
1616
progress::{AsyncChannelProgressSender, IdGenerator},
@@ -255,7 +255,7 @@ async fn concurrent_progress() {
255255
start_rx.await.unwrap();
256256
let id = progress.new_id();
257257
progress
258-
.send(DownloadProgress::Found {
258+
.send(DownloadProgressEvent::Found {
259259
id,
260260
child: BlobId::Root,
261261
hash,
@@ -264,7 +264,10 @@ async fn concurrent_progress() {
264264
.await
265265
.unwrap();
266266
done_rx.await.unwrap();
267-
progress.send(DownloadProgress::Done { id }).await.unwrap();
267+
progress
268+
.send(DownloadProgressEvent::Done { id })
269+
.await
270+
.unwrap();
268271
Ok(Stats::default())
269272
}
270273
.boxed()
@@ -293,18 +296,18 @@ async fn concurrent_progress() {
293296
let prog0_b = prog_b_rx.recv().await.unwrap();
294297
assert!(matches!(
295298
prog0_b,
296-
DownloadProgress::InitialState(state) if state.root.hash == hash && state.root.progress == BlobProgress::Pending,
299+
DownloadProgressEvent::InitialState(state) if state.root.hash == hash && state.root.progress == BlobProgressEvent::Pending,
297300
));
298301

299302
start_tx.send(()).unwrap();
300303

301304
let prog1_a = prog_a_rx.recv().await.unwrap();
302305
let prog1_b = prog_b_rx.recv().await.unwrap();
303306
assert!(
304-
matches!(prog1_a, DownloadProgress::Found { hash: found_hash, size: 100, ..} if found_hash == hash)
307+
matches!(prog1_a, DownloadProgressEvent::Found { hash: found_hash, size: 100, ..} if found_hash == hash)
305308
);
306309
assert!(
307-
matches!(prog1_b, DownloadProgress::Found { hash: found_hash, size: 100, ..} if found_hash == hash)
310+
matches!(prog1_b, DownloadProgressEvent::Found { hash: found_hash, size: 100, ..} if found_hash == hash)
308311
);
309312

310313
state_a.on_progress(prog1_a);
@@ -317,7 +320,7 @@ async fn concurrent_progress() {
317320
let handle_c = downloader.queue(req).await;
318321

319322
let prog1_c = prog_c_rx.recv().await.unwrap();
320-
assert!(matches!(&prog1_c, DownloadProgress::InitialState(state) if state == &state_a));
323+
assert!(matches!(&prog1_c, DownloadProgressEvent::InitialState(state) if state == &state_a));
321324
state_c.on_progress(prog1_c);
322325

323326
done_tx.send(()).unwrap();
@@ -335,9 +338,9 @@ async fn concurrent_progress() {
335338
assert_eq!(prog_b.len(), 1);
336339
assert_eq!(prog_c.len(), 1);
337340

338-
assert!(matches!(prog_a[0], DownloadProgress::Done { .. }));
339-
assert!(matches!(prog_b[0], DownloadProgress::Done { .. }));
340-
assert!(matches!(prog_c[0], DownloadProgress::Done { .. }));
341+
assert!(matches!(prog_a[0], DownloadProgressEvent::Done { .. }));
342+
assert!(matches!(prog_b[0], DownloadProgressEvent::Done { .. }));
343+
assert!(matches!(prog_c[0], DownloadProgressEvent::Done { .. }));
341344

342345
for p in prog_a {
343346
state_a.on_progress(p);

0 commit comments

Comments
 (0)