Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 378fc0f

Browse files
authoredOct 29, 2024··
refactor: move ProtocolHandler impl
1 parent f87c3bf commit 378fc0f

File tree

5 files changed

+408
-20
lines changed

5 files changed

+408
-20
lines changed
 

‎Cargo.lock

Lines changed: 84 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ iroh-base = { version = "0.27.0", features = ["redb"] }
2828
iroh-io = { version = "0.6.0", features = ["stats"] }
2929
iroh-metrics = { version = "0.27.0", default-features = false }
3030
iroh-net = { version = "0.27.0" }
31+
iroh-router = "0.27.0"
3132
num_cpus = "1.15.0"
3233
oneshot = "0.1.8"
3334
parking_lot = { version = "0.12.1", optional = true }
@@ -113,3 +114,9 @@ debug-assertions = false
113114
opt-level = 3
114115
panic = 'abort'
115116
incremental = false
117+
118+
[patch.crates-io]
119+
iroh-router = { git = "https://github.com/n0-computer/iroh", branch = "main" }
120+
iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" }
121+
iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" }
122+
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }

‎deny.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,8 @@ license-files = [
3434
ignore = [
3535
"RUSTSEC-2024-0370", # unmaintained, no upgrade available
3636
]
37+
38+
[sources]
39+
allow-git = [
40+
"https://github.com/n0-computer/iroh.git",
41+
]

‎src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ pub mod format;
3535
pub mod get;
3636
pub mod hashseq;
3737
pub mod metrics;
38+
#[cfg(feature = "downloader")]
39+
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "downloader")))]
40+
pub mod net_protocol;
3841
pub mod protocol;
3942
pub mod provider;
4043
pub mod store;

‎src/net_protocol.rs

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
//! Adaptation of `iroh-blobs` as an `iroh` protocol.
2+
3+
// TODO: reduce API surface and add documentation
4+
#![allow(missing_docs)]
5+
6+
use std::{collections::BTreeMap, sync::Arc};
7+
8+
use anyhow::{anyhow, Result};
9+
use futures_lite::future::Boxed as BoxedFuture;
10+
use iroh_base::hash::{BlobFormat, Hash};
11+
use iroh_net::{endpoint::Connecting, Endpoint, NodeAddr};
12+
use iroh_router::ProtocolHandler;
13+
use serde::{Deserialize, Serialize};
14+
use tracing::{debug, warn};
15+
16+
use crate::{
17+
downloader::{DownloadRequest, Downloader},
18+
get::{
19+
db::{DownloadProgress, GetState},
20+
Stats,
21+
},
22+
provider::EventSender,
23+
util::{
24+
local_pool::LocalPoolHandle,
25+
progress::{AsyncChannelProgressSender, ProgressSender},
26+
SetTagOption,
27+
},
28+
HashAndFormat, TempTag,
29+
};
30+
31+
#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
32+
pub struct BatchId(u64);
33+
34+
/// A request to the node to download and share the data specified by the hash.
35+
#[derive(Debug, Clone, Serialize, Deserialize)]
36+
pub struct BlobDownloadRequest {
37+
/// This mandatory field contains the hash of the data to download and share.
38+
pub hash: Hash,
39+
/// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as
40+
/// well.
41+
pub format: BlobFormat,
42+
/// This mandatory field specifies the nodes to download the data from.
43+
///
44+
/// If set to more than a single node, they will all be tried. If `mode` is set to
45+
/// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
46+
/// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
47+
/// if the concurrency limits permit.
48+
pub nodes: Vec<NodeAddr>,
49+
/// Optional tag to tag the data with.
50+
pub tag: SetTagOption,
51+
/// Whether to directly start the download or add it to the download queue.
52+
pub mode: DownloadMode,
53+
}
54+
55+
/// Set the mode for whether to directly start the download or add it to the download queue.
56+
#[derive(Debug, Clone, Serialize, Deserialize)]
57+
pub enum DownloadMode {
58+
/// Start the download right away.
59+
///
60+
/// No concurrency limits or queuing will be applied. It is up to the user to manage download
61+
/// concurrency.
62+
Direct,
63+
/// Queue the download.
64+
///
65+
/// The download queue will be processed in-order, while respecting the downloader concurrency limits.
66+
Queued,
67+
}
68+
69+
#[derive(Debug)]
70+
pub struct Blobs<S> {
71+
rt: LocalPoolHandle,
72+
store: S,
73+
events: EventSender,
74+
downloader: Downloader,
75+
batches: tokio::sync::Mutex<BlobBatches>,
76+
}
77+
78+
/// Name used for logging when new node addresses are added from gossip.
79+
const BLOB_DOWNLOAD_SOURCE_NAME: &str = "blob_download";
80+
81+
/// Keeps track of all the currently active batch operations of the blobs api.
82+
#[derive(Debug, Default)]
83+
pub struct BlobBatches {
84+
/// Currently active batches
85+
batches: BTreeMap<BatchId, BlobBatch>,
86+
/// Used to generate new batch ids.
87+
max: u64,
88+
}
89+
90+
/// A single batch of blob operations
91+
#[derive(Debug, Default)]
92+
struct BlobBatch {
93+
/// The tags in this batch.
94+
tags: BTreeMap<HashAndFormat, Vec<TempTag>>,
95+
}
96+
97+
impl BlobBatches {
98+
/// Create a new unique batch id.
99+
pub fn create(&mut self) -> BatchId {
100+
let id = self.max;
101+
self.max += 1;
102+
BatchId(id)
103+
}
104+
105+
/// Store a temp tag in a batch identified by a batch id.
106+
pub fn store(&mut self, batch: BatchId, tt: TempTag) {
107+
let entry = self.batches.entry(batch).or_default();
108+
entry.tags.entry(tt.hash_and_format()).or_default().push(tt);
109+
}
110+
111+
/// Remove a tag from a batch.
112+
pub fn remove_one(&mut self, batch: BatchId, content: &HashAndFormat) -> Result<()> {
113+
if let Some(batch) = self.batches.get_mut(&batch) {
114+
if let Some(tags) = batch.tags.get_mut(content) {
115+
tags.pop();
116+
if tags.is_empty() {
117+
batch.tags.remove(content);
118+
}
119+
return Ok(());
120+
}
121+
}
122+
// this can happen if we try to upgrade a tag from an expired batch
123+
anyhow::bail!("tag not found in batch");
124+
}
125+
126+
/// Remove an entire batch.
127+
pub fn remove(&mut self, batch: BatchId) {
128+
self.batches.remove(&batch);
129+
}
130+
}
131+
132+
impl<S: crate::store::Store> Blobs<S> {
133+
pub fn new_with_events(
134+
store: S,
135+
rt: LocalPoolHandle,
136+
events: EventSender,
137+
downloader: Downloader,
138+
) -> Self {
139+
Self {
140+
rt,
141+
store,
142+
events,
143+
downloader,
144+
batches: Default::default(),
145+
}
146+
}
147+
148+
pub fn store(&self) -> &S {
149+
&self.store
150+
}
151+
152+
pub async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> {
153+
self.batches.lock().await
154+
}
155+
156+
pub async fn download(
157+
&self,
158+
endpoint: Endpoint,
159+
req: BlobDownloadRequest,
160+
progress: AsyncChannelProgressSender<DownloadProgress>,
161+
) -> Result<()> {
162+
let BlobDownloadRequest {
163+
hash,
164+
format,
165+
nodes,
166+
tag,
167+
mode,
168+
} = req;
169+
let hash_and_format = HashAndFormat { hash, format };
170+
let temp_tag = self.store.temp_tag(hash_and_format);
171+
let stats = match mode {
172+
DownloadMode::Queued => {
173+
self.download_queued(endpoint, hash_and_format, nodes, progress.clone())
174+
.await?
175+
}
176+
DownloadMode::Direct => {
177+
self.download_direct_from_nodes(endpoint, hash_and_format, nodes, progress.clone())
178+
.await?
179+
}
180+
};
181+
182+
progress.send(DownloadProgress::AllDone(stats)).await.ok();
183+
match tag {
184+
SetTagOption::Named(tag) => {
185+
self.store.set_tag(tag, Some(hash_and_format)).await?;
186+
}
187+
SetTagOption::Auto => {
188+
self.store.create_tag(hash_and_format).await?;
189+
}
190+
}
191+
drop(temp_tag);
192+
193+
Ok(())
194+
}
195+
196+
async fn download_queued(
197+
&self,
198+
endpoint: Endpoint,
199+
hash_and_format: HashAndFormat,
200+
nodes: Vec<NodeAddr>,
201+
progress: AsyncChannelProgressSender<DownloadProgress>,
202+
) -> Result<Stats> {
203+
let mut node_ids = Vec::with_capacity(nodes.len());
204+
let mut any_added = false;
205+
for node in nodes {
206+
node_ids.push(node.node_id);
207+
if !node.info.is_empty() {
208+
endpoint.add_node_addr_with_source(node, BLOB_DOWNLOAD_SOURCE_NAME)?;
209+
any_added = true;
210+
}
211+
}
212+
let can_download = !node_ids.is_empty() && (any_added || endpoint.discovery().is_some());
213+
anyhow::ensure!(can_download, "no way to reach a node for download");
214+
let req = DownloadRequest::new(hash_and_format, node_ids).progress_sender(progress);
215+
let handle = self.downloader.queue(req).await;
216+
let stats = handle.await?;
217+
Ok(stats)
218+
}
219+
220+
#[tracing::instrument("download_direct", skip_all, fields(hash=%hash_and_format.hash.fmt_short()))]
221+
async fn download_direct_from_nodes(
222+
&self,
223+
endpoint: Endpoint,
224+
hash_and_format: HashAndFormat,
225+
nodes: Vec<NodeAddr>,
226+
progress: AsyncChannelProgressSender<DownloadProgress>,
227+
) -> Result<Stats> {
228+
let mut last_err = None;
229+
let mut remaining_nodes = nodes.len();
230+
let mut nodes_iter = nodes.into_iter();
231+
'outer: loop {
232+
match crate::get::db::get_to_db_in_steps(
233+
self.store.clone(),
234+
hash_and_format,
235+
progress.clone(),
236+
)
237+
.await?
238+
{
239+
GetState::Complete(stats) => return Ok(stats),
240+
GetState::NeedsConn(needs_conn) => {
241+
let (conn, node_id) = 'inner: loop {
242+
match nodes_iter.next() {
243+
None => break 'outer,
244+
Some(node) => {
245+
remaining_nodes -= 1;
246+
let node_id = node.node_id;
247+
if node_id == endpoint.node_id() {
248+
debug!(
249+
?remaining_nodes,
250+
"skip node {} (it is the node id of ourselves)",
251+
node_id.fmt_short()
252+
);
253+
continue 'inner;
254+
}
255+
match endpoint.connect(node, crate::protocol::ALPN).await {
256+
Ok(conn) => break 'inner (conn, node_id),
257+
Err(err) => {
258+
debug!(
259+
?remaining_nodes,
260+
"failed to connect to {}: {err}",
261+
node_id.fmt_short()
262+
);
263+
continue 'inner;
264+
}
265+
}
266+
}
267+
}
268+
};
269+
match needs_conn.proceed(conn).await {
270+
Ok(stats) => return Ok(stats),
271+
Err(err) => {
272+
warn!(
273+
?remaining_nodes,
274+
"failed to download from {}: {err}",
275+
node_id.fmt_short()
276+
);
277+
last_err = Some(err);
278+
}
279+
}
280+
}
281+
}
282+
}
283+
match last_err {
284+
Some(err) => Err(err.into()),
285+
None => Err(anyhow!("No nodes to download from provided")),
286+
}
287+
}
288+
}
289+
290+
impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
291+
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
292+
Box::pin(async move {
293+
crate::provider::handle_connection(
294+
conn.await?,
295+
self.store.clone(),
296+
self.events.clone(),
297+
self.rt.clone(),
298+
)
299+
.await;
300+
Ok(())
301+
})
302+
}
303+
304+
fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
305+
Box::pin(async move {
306+
self.store.shutdown().await;
307+
})
308+
}
309+
}

0 commit comments

Comments
 (0)
Please sign in to comment.