Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit a3bef55

Browse files
committedJan 2, 2025·
WIP
1 parent ccacf9e commit a3bef55

File tree

18 files changed

+129
-148
lines changed

18 files changed

+129
-148
lines changed
 

‎Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ futures-util = "0.3.30"
9595
testdir = "0.9.1"
9696

9797
[features]
98-
default = ["fs-store", "net_protocol"]
98+
default = ["fs-store", "net_protocol", "formats-collection"]
9999
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
100100
net_protocol = ["downloader", "dep:futures-util"]
101101
fs-store = ["dep:reflink-copy", "redb", "dep:tempfile"]
@@ -112,6 +112,8 @@ rpc = [
112112
"dep:walkdir",
113113
"downloader",
114114
]
115+
formats = []
116+
formats-collection = ["formats"]
115117

116118
example-iroh = [
117119
"dep:clap",

‎examples/local-swarm-discovery.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,10 @@ mod progress {
140140
ProgressStyle,
141141
};
142142
use iroh_blobs::{
143-
fetch::{db::DownloadProgress, progress::BlobProgress, Stats},
143+
fetch::{
144+
progress::{BlobProgress, DownloadProgress},
145+
Stats,
146+
},
144147
Hash,
145148
};
146149

‎src/cli.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ use iroh::{NodeAddr, PublicKey, RelayUrl};
1919
use tokio::io::AsyncWriteExt;
2020

2121
use crate::{
22-
fetch::{db::DownloadProgress, progress::BlobProgress, Stats},
22+
fetch::{
23+
progress::{BlobProgress, DownloadProgress},
24+
Stats,
25+
},
2326
net_protocol::DownloadMode,
2427
provider::AddProgress,
2528
rpc::client::blobs::{

‎src/downloader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
5555
use tracing::{debug, error, error_span, trace, warn, Instrument};
5656

5757
use crate::{
58-
fetch::{db::DownloadProgress, Stats},
58+
fetch::{progress::DownloadProgress, Stats},
5959
metrics::Metrics,
6060
store::Store,
6161
util::{local_pool::LocalPoolHandle, progress::ProgressSender},

‎src/downloader/get.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use iroh::endpoint;
77

88
use super::{progress::BroadcastProgressSender, DownloadKind, FailureAction, GetStartFut, Getter};
99
use crate::{
10-
fetch::{db::fetch_to_db_in_steps, Error},
11-
store::Store,
10+
fetch::Error,
11+
store::{fetch_to_db_in_steps, FetchState, FetchStateNeedsConn, Store},
1212
};
1313

1414
impl From<Error> for FailureAction {
@@ -34,7 +34,7 @@ pub(crate) struct IoGetter<S: Store> {
3434

3535
impl<S: Store> Getter for IoGetter<S> {
3636
type Connection = endpoint::Connection;
37-
type NeedsConn = crate::fetch::db::FetchStateNeedsConn;
37+
type NeedsConn = FetchStateNeedsConn;
3838

3939
fn get(
4040
&mut self,
@@ -45,10 +45,8 @@ impl<S: Store> Getter for IoGetter<S> {
4545
async move {
4646
match fetch_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
4747
Err(err) => Err(err.into()),
48-
Ok(crate::fetch::db::FetchState::Complete(stats)) => {
49-
Ok(super::GetOutput::Complete(stats))
50-
}
51-
Ok(crate::fetch::db::FetchState::NeedsConn(needs_conn)) => {
48+
Ok(FetchState::Complete(stats)) => Ok(super::GetOutput::Complete(stats)),
49+
Ok(FetchState::NeedsConn(needs_conn)) => {
5250
Ok(super::GetOutput::NeedsConn(needs_conn))
5351
}
5452
}
@@ -57,7 +55,7 @@ impl<S: Store> Getter for IoGetter<S> {
5755
}
5856
}
5957

60-
impl super::NeedsConn<endpoint::Connection> for crate::fetch::db::FetchStateNeedsConn {
58+
impl super::NeedsConn<endpoint::Connection> for FetchStateNeedsConn {
6159
fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
6260
async move {
6361
let res = self.proceed(conn).await;

‎src/downloader/progress.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use parking_lot::Mutex;
1111

1212
use super::DownloadKind;
1313
use crate::{
14-
fetch::{db::DownloadProgress, progress::TransferState},
14+
fetch::progress::{DownloadProgress, TransferState},
1515
util::progress::{AsyncChannelProgressSender, IdGenerator, ProgressSendError, ProgressSender},
1616
};
1717

‎src/downloader/test.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@ use iroh::SecretKey;
1010

1111
use super::*;
1212
use crate::{
13-
fetch::{
14-
db::BlobId,
15-
progress::{BlobProgress, TransferState},
16-
},
13+
fetch::progress::{BlobId, BlobProgress, TransferState},
1714
util::{
1815
local_pool::LocalPool,
1916
progress::{AsyncChannelProgressSender, IdGenerator},

‎src/fetch.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use crate::{
3131

3232
mod error;
3333
pub use error::Error;
34-
pub mod db;
3534
pub mod progress;
3635
pub mod request;
3736

‎src/fetch/progress.rs

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{collections::HashMap, num::NonZeroU64};
55
use serde::{Deserialize, Serialize};
66
use tracing::warn;
77

8-
use super::db::{BlobId, DownloadProgress};
8+
use super::Stats;
99
use crate::{protocol::RangeSpec, store::BaoBlobSize, Hash};
1010

1111
/// The identifier for progress events.
@@ -183,3 +183,90 @@ impl TransferState {
183183
}
184184
}
185185
}
186+
187+
/// Progress updates for the get operation.
188+
#[derive(Debug, Clone, Serialize, Deserialize)]
189+
pub enum DownloadProgress {
190+
/// Initial state if subscribing to a running or queued transfer.
191+
InitialState(TransferState),
192+
/// Data was found locally.
193+
FoundLocal {
194+
/// child offset
195+
child: BlobId,
196+
/// The hash of the entry.
197+
hash: Hash,
198+
/// The size of the entry in bytes.
199+
size: BaoBlobSize,
200+
/// The ranges that are available locally.
201+
valid_ranges: RangeSpec,
202+
},
203+
/// A new connection was established.
204+
Connected,
205+
/// An item was found with hash `hash`, from now on referred to via `id`.
206+
Found {
207+
/// A new unique progress id for this entry.
208+
id: u64,
209+
/// Identifier for this blob within this download.
210+
///
211+
/// Will always be [`BlobId::Root`] unless a hashseq is downloaded, in which case this
212+
/// allows to identify the children by their offset in the hashseq.
213+
child: BlobId,
214+
/// The hash of the entry.
215+
hash: Hash,
216+
/// The size of the entry in bytes.
217+
size: u64,
218+
},
219+
/// An item was found with hash `hash`, from now on referred to via `id`.
220+
FoundHashSeq {
221+
/// The name of the entry.
222+
hash: Hash,
223+
/// Number of children in the collection, if known.
224+
children: u64,
225+
},
226+
/// We got progress ingesting item `id`.
227+
Progress {
228+
/// The unique id of the entry.
229+
id: u64,
230+
/// The offset of the progress, in bytes.
231+
offset: u64,
232+
},
233+
/// We are done with `id`.
234+
Done {
235+
/// The unique id of the entry.
236+
id: u64,
237+
},
238+
/// All operations finished.
239+
///
240+
/// This will be the last message in the stream.
241+
AllDone(Stats),
242+
/// We got an error and need to abort.
243+
///
244+
/// This will be the last message in the stream.
245+
Abort(serde_error::Error),
246+
}
247+
248+
/// The id of a blob in a transfer
249+
#[derive(
250+
Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, std::hash::Hash, Serialize, Deserialize,
251+
)]
252+
pub enum BlobId {
253+
/// The root blob (child id 0)
254+
Root,
255+
/// A child blob (child id > 0)
256+
Child(NonZeroU64),
257+
}
258+
259+
impl BlobId {
260+
pub(crate) fn from_offset(id: u64) -> Self {
261+
NonZeroU64::new(id).map(Self::Child).unwrap_or(Self::Root)
262+
}
263+
}
264+
265+
impl From<BlobId> for u64 {
266+
fn from(value: BlobId) -> Self {
267+
match value {
268+
BlobId::Root => 0,
269+
BlobId::Child(id) => id.into(),
270+
}
271+
}
272+
}

‎src/format.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@
1313
//! n-1 items, where n is the number of blobs in the HashSeq.
1414
//!
1515
//! [postcard]: https://docs.rs/postcard/latest/postcard/
16+
#[cfg(feature = "formats-collection")]
1617
pub mod collection;

‎src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
pub mod cli;
3333
#[cfg(feature = "downloader")]
3434
pub mod downloader;
35-
pub mod export;
3635
pub mod fetch;
36+
#[cfg(feature = "formats")]
3737
pub mod format;
3838
pub mod hashseq;
3939
pub mod metrics;

‎src/rpc.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,14 @@ use tracing::{debug, warn};
4343

4444
use crate::{
4545
downloader::{DownloadRequest, Downloader},
46-
export::ExportProgress,
47-
fetch::{
48-
db::{DownloadProgress, FetchState},
49-
Stats,
50-
},
46+
fetch::{progress::DownloadProgress, Stats},
5147
format::collection::Collection,
5248
net_protocol::{BlobDownloadRequest, Blobs, BlobsInner},
5349
provider::{AddProgress, BatchAddPathProgress},
54-
store::{ConsistencyCheckProgress, ImportProgress, MapEntry, ValidateProgress},
50+
store::{
51+
ConsistencyCheckProgress, ExportProgress, FetchState, ImportProgress, MapEntry,
52+
ValidateProgress,
53+
},
5554
util::{
5655
local_pool::LocalPoolHandle,
5756
progress::{AsyncChannelProgressSender, ProgressSender},
@@ -451,7 +450,7 @@ impl<D: crate::store::Store> Handler<D> {
451450
let progress = AsyncChannelProgressSender::new(tx);
452451
let rt = self.rt().clone();
453452
rt.spawn_detached(move || async move {
454-
let res = crate::export::export(
453+
let res = crate::store::export(
455454
self.store(),
456455
msg.hash,
457456
msg.path,
@@ -1009,7 +1008,7 @@ impl<D: crate::store::Store> Handler<D> {
10091008
let mut remaining_nodes = nodes.len();
10101009
let mut nodes_iter = nodes.into_iter();
10111010
'outer: loop {
1012-
match crate::fetch::db::fetch_to_db_in_steps(
1011+
match crate::store::fetch_to_db_in_steps(
10131012
self.store().clone(),
10141013
hash_and_format,
10151014
progress.clone(),

‎src/rpc/client/blobs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ use tracing::warn;
8383

8484
pub use crate::net_protocol::DownloadMode;
8585
use crate::{
86-
export::ExportProgress as BytesExportProgress,
87-
fetch::db::DownloadProgress as BytesDownloadProgress,
86+
fetch::progress::DownloadProgress as BytesDownloadProgress,
8887
format::collection::{Collection, SimpleStore},
8988
net_protocol::BlobDownloadRequest,
9089
rpc::proto::RpcService,
90+
store::ExportProgress as BytesExportProgress,
9191
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
9292
util::SetTagOption,
9393
BlobFormat, Hash, Tag,

‎src/rpc/proto/blobs.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize};
88

99
use super::{RpcError, RpcResult, RpcService};
1010
use crate::{
11-
export::ExportProgress,
12-
fetch::db::DownloadProgress,
1311
format::collection::Collection,
1412
net_protocol::{BatchId, BlobDownloadRequest},
1513
provider::{AddProgress, BatchAddPathProgress},
1614
rpc::client::blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, ReadAtLen, WrapOption},
15+
store::ExportProgress,
1716
store::{
1817
BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ImportMode,
1918
ValidateProgress,
@@ -111,7 +110,7 @@ pub struct AddPathResponse(pub AddProgress);
111110

112111
/// Progress response for [`BlobDownloadRequest`]
113112
#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From, derive_more::Into)]
114-
pub struct DownloadResponse(pub DownloadProgress);
113+
pub struct DownloadResponse(pub crate::fetch::progress::DownloadProgress);
115114

116115
/// A request to the node to download and share the data specified by the hash.
117116
#[derive(Debug, Clone, Serialize, Deserialize)]

‎src/store.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ pub mod readonly_mem;
1010
#[cfg(feature = "fs-store")]
1111
pub mod fs;
1212

13+
mod fetch_to_db;
14+
pub use fetch_to_db::{fetch_to_db, fetch_to_db_in_steps, FetchState, FetchStateNeedsConn};
15+
mod export;
16+
pub use export::{export, export_blob, export_collection, ExportProgress};
17+
1318
mod traits;
1419
use tracing::warn;
1520
pub use traits::*;
File renamed without changes.

‎src/fetch/db.rs renamed to ‎src/store/fetch_to_db.rs

Lines changed: 3 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Functions that use the iroh-blobs protocol in conjunction with a bao store.
22
3-
use std::{future::Future, io, num::NonZeroU64, pin::Pin};
3+
use std::{future::Future, io, pin::Pin};
44

55
use anyhow::anyhow;
66
use bao_tree::{ChunkNum, ChunkRanges};
@@ -11,17 +11,15 @@ use genawaiter::{
1111
};
1212
use iroh::endpoint::Connection;
1313
use iroh_io::AsyncSliceReader;
14-
use serde::{Deserialize, Serialize};
1514
use tokio::sync::oneshot;
1615
use tracing::trace;
1716

1817
use crate::{
1918
fetch::{
2019
self,
21-
error::Error,
2220
fsm::{AtBlobHeader, AtEndBlob, ConnectedNext, EndBlobNext},
23-
progress::TransferState,
24-
Stats,
21+
progress::{BlobId, DownloadProgress},
22+
Error, Stats,
2523
},
2624
hashseq::parse_hash_seq,
2725
protocol::{GetRequest, RangeSpec, RangeSpecSeq},
@@ -608,91 +606,3 @@ impl<D: BaoStore> BlobInfo<D> {
608606
}
609607
}
610608
}
611-
612-
/// Progress updates for the get operation.
613-
// TODO: Move to super::progress
614-
#[derive(Debug, Clone, Serialize, Deserialize)]
615-
pub enum DownloadProgress {
616-
/// Initial state if subscribing to a running or queued transfer.
617-
InitialState(TransferState),
618-
/// Data was found locally.
619-
FoundLocal {
620-
/// child offset
621-
child: BlobId,
622-
/// The hash of the entry.
623-
hash: Hash,
624-
/// The size of the entry in bytes.
625-
size: BaoBlobSize,
626-
/// The ranges that are available locally.
627-
valid_ranges: RangeSpec,
628-
},
629-
/// A new connection was established.
630-
Connected,
631-
/// An item was found with hash `hash`, from now on referred to via `id`.
632-
Found {
633-
/// A new unique progress id for this entry.
634-
id: u64,
635-
/// Identifier for this blob within this download.
636-
///
637-
/// Will always be [`BlobId::Root`] unless a hashseq is downloaded, in which case this
638-
/// allows to identify the children by their offset in the hashseq.
639-
child: BlobId,
640-
/// The hash of the entry.
641-
hash: Hash,
642-
/// The size of the entry in bytes.
643-
size: u64,
644-
},
645-
/// An item was found with hash `hash`, from now on referred to via `id`.
646-
FoundHashSeq {
647-
/// The name of the entry.
648-
hash: Hash,
649-
/// Number of children in the collection, if known.
650-
children: u64,
651-
},
652-
/// We got progress ingesting item `id`.
653-
Progress {
654-
/// The unique id of the entry.
655-
id: u64,
656-
/// The offset of the progress, in bytes.
657-
offset: u64,
658-
},
659-
/// We are done with `id`.
660-
Done {
661-
/// The unique id of the entry.
662-
id: u64,
663-
},
664-
/// All operations finished.
665-
///
666-
/// This will be the last message in the stream.
667-
AllDone(Stats),
668-
/// We got an error and need to abort.
669-
///
670-
/// This will be the last message in the stream.
671-
Abort(serde_error::Error),
672-
}
673-
674-
/// The id of a blob in a transfer
675-
#[derive(
676-
Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, std::hash::Hash, Serialize, Deserialize,
677-
)]
678-
pub enum BlobId {
679-
/// The root blob (child id 0)
680-
Root,
681-
/// A child blob (child id > 0)
682-
Child(NonZeroU64),
683-
}
684-
685-
impl BlobId {
686-
fn from_offset(id: u64) -> Self {
687-
NonZeroU64::new(id).map(Self::Child).unwrap_or(Self::Root)
688-
}
689-
}
690-
691-
impl From<BlobId> for u64 {
692-
fn from(value: BlobId) -> Self {
693-
match value {
694-
BlobId::Root => 0,
695-
BlobId::Child(id) => id.into(),
696-
}
697-
}
698-
}

‎src/store/traits.rs

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -822,28 +822,6 @@ pub enum ExportFormat {
822822
Collection,
823823
}
824824

825-
#[allow(missing_docs)]
826-
#[derive(Debug)]
827-
pub enum ExportProgress {
828-
/// Starting to export to a file
829-
///
830-
/// This will be the first message for an id
831-
Start {
832-
id: u64,
833-
hash: Hash,
834-
path: PathBuf,
835-
stable: bool,
836-
},
837-
/// Progress when copying the file to the target
838-
///
839-
/// This will be omitted if the store can move the file or use copy on write
840-
///
841-
/// There will be multiple of these messages for an id
842-
Progress { id: u64, offset: u64 },
843-
/// Done exporting
844-
Done { id: u64 },
845-
}
846-
847825
/// Level for generic validation messages
848826
#[derive(
849827
Debug, Clone, Copy, derive_more::Display, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq,

0 commit comments

Comments
 (0)
Please sign in to comment.