Skip to content

Commit e4273da

Browse files
committed
back to the lazy client
1 parent eeddbaa commit e4273da

9 files changed

+237
-185
lines changed

examples/custom-protocol.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ async fn main() -> Result<()> {
9191
let local_pool = LocalPool::default();
9292
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
9393
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
94-
let blobs_client = blobs.spawn_rpc();
94+
let blobs_client = blobs.client();
9595

9696
// Build our custom protocol handler. The `builder` exposes access to various subsystems in the
9797
// iroh node. In our case, we need a blobs client and the endpoint.
@@ -122,7 +122,7 @@ async fn main() -> Result<()> {
122122

123123
// Print out our query results.
124124
for hash in hashes {
125-
read_and_print(&blobs_client, hash).await?;
125+
read_and_print(blobs_client, hash).await?;
126126
}
127127
}
128128
}

examples/hello-world-fetch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ async fn main() -> Result<()> {
4242
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
4343
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
4444
let node = builder.spawn().await?;
45-
let blobs_client = blobs.spawn_rpc();
45+
let blobs_client = blobs.client();
4646

4747
println!("fetching hash: {}", ticket.hash());
4848
println!("node id: {}", node.endpoint().node_id());

examples/hello-world-provide.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ async fn main() -> anyhow::Result<()> {
2828
let local_pool = LocalPool::default();
2929
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
3030
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
31-
let blobs_client = blobs.spawn_rpc();
31+
let blobs_client = blobs.client();
3232
let node = builder.spawn().await?;
3333

3434
// add some data and remember the hash

examples/local-swarm-discovery.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async fn main() -> anyhow::Result<()> {
7878
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
7979
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
8080
let node = builder.spawn().await?;
81-
let blobs_client = blobs.spawn_rpc();
81+
let blobs_client = blobs.client();
8282

8383
match &cli.command {
8484
Commands::Accept { path } => {

examples/transfer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ async fn main() -> Result<()> {
2626
.spawn()
2727
.await?;
2828

29-
let blobs = blobs.spawn_rpc();
29+
let blobs = blobs.client();
3030

3131
let args = std::env::args().collect::<Vec<_>>();
3232
match &args.iter().map(String::as_str).collect::<Vec<_>>()[..] {

src/net_protocol.rs

Lines changed: 19 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,22 @@
55

66
use std::{collections::BTreeSet, fmt::Debug, ops::DerefMut, sync::Arc};
77

8-
use anyhow::{anyhow, bail, Result};
8+
use anyhow::{bail, Result};
99
use futures_lite::future::Boxed as BoxedFuture;
1010
use futures_util::future::BoxFuture;
1111
use iroh::{endpoint::Connecting, protocol::ProtocolHandler, Endpoint, NodeAddr};
1212
use iroh_base::hash::{BlobFormat, Hash};
1313
use serde::{Deserialize, Serialize};
14-
use tracing::{debug, warn};
14+
use tracing::debug;
1515

1616
use crate::{
17-
downloader::{DownloadRequest, Downloader},
18-
get::{
19-
db::{DownloadProgress, GetState},
20-
Stats,
21-
},
17+
downloader::Downloader,
2218
provider::EventSender,
2319
store::GcConfig,
2420
util::{
2521
local_pool::{self, LocalPoolHandle},
26-
progress::{AsyncChannelProgressSender, ProgressSender},
2722
SetTagOption,
2823
},
29-
HashAndFormat,
3024
};
3125

3226
/// A callback that blobs can ask about a set of hashes that should not be garbage collected.
@@ -48,20 +42,22 @@ impl Default for GcState {
4842
}
4943

5044
#[derive(Debug)]
51-
struct BlobsInner<S> {
52-
rt: LocalPoolHandle,
45+
pub(crate) struct BlobsInner<S> {
46+
pub(crate) rt: LocalPoolHandle,
5347
pub(crate) store: S,
5448
events: EventSender,
55-
downloader: Downloader,
56-
endpoint: Endpoint,
49+
pub(crate) downloader: Downloader,
50+
pub(crate) endpoint: Endpoint,
5751
gc_state: std::sync::Mutex<GcState>,
5852
#[cfg(feature = "rpc")]
59-
batches: tokio::sync::Mutex<BlobBatches>,
53+
pub(crate) batches: tokio::sync::Mutex<BlobBatches>,
6054
}
6155

6256
#[derive(Debug, Clone)]
6357
pub struct Blobs<S> {
64-
inner: Arc<BlobsInner<S>>,
58+
pub(crate) inner: Arc<BlobsInner<S>>,
59+
#[cfg(feature = "rpc")]
60+
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
6561
}
6662

6763
/// Keeps track of all the currently active batch operations of the blobs api.
@@ -79,7 +75,7 @@ pub(crate) struct BlobBatches {
7975
#[derive(Debug, Default)]
8076
struct BlobBatch {
8177
/// The tags in this batch.
82-
tags: std::collections::BTreeMap<HashAndFormat, Vec<crate::TempTag>>,
78+
tags: std::collections::BTreeMap<iroh::hash::HashAndFormat, Vec<crate::TempTag>>,
8379
}
8480

8581
#[cfg(feature = "rpc")]
@@ -98,7 +94,11 @@ impl BlobBatches {
9894
}
9995

10096
/// Remove a tag from a batch.
101-
pub fn remove_one(&mut self, batch: BatchId, content: &HashAndFormat) -> Result<()> {
97+
pub fn remove_one(
98+
&mut self,
99+
batch: BatchId,
100+
content: &iroh::hash::HashAndFormat,
101+
) -> Result<()> {
102102
if let Some(batch) = self.batches.get_mut(&batch) {
103103
if let Some(tags) = batch.tags.get_mut(content) {
104104
tags.pop();
@@ -191,6 +191,8 @@ impl<S: crate::store::Store> Blobs<S> {
191191
batches: Default::default(),
192192
gc_state: Default::default(),
193193
}),
194+
#[cfg(feature = "rpc")]
195+
rpc_handler: Default::default(),
194196
}
195197
}
196198

@@ -255,147 +257,6 @@ impl<S: crate::store::Store> Blobs<S> {
255257
*state = GcState::Started(Some(run));
256258
Ok(())
257259
}
258-
259-
#[cfg(feature = "rpc")]
260-
pub(crate) async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> {
261-
self.inner.batches.lock().await
262-
}
263-
264-
pub(crate) async fn download(
265-
&self,
266-
endpoint: Endpoint,
267-
req: BlobDownloadRequest,
268-
progress: AsyncChannelProgressSender<DownloadProgress>,
269-
) -> Result<()> {
270-
let BlobDownloadRequest {
271-
hash,
272-
format,
273-
nodes,
274-
tag,
275-
mode,
276-
} = req;
277-
let hash_and_format = HashAndFormat { hash, format };
278-
let temp_tag = self.store().temp_tag(hash_and_format);
279-
let stats = match mode {
280-
DownloadMode::Queued => {
281-
self.download_queued(endpoint, hash_and_format, nodes, progress.clone())
282-
.await?
283-
}
284-
DownloadMode::Direct => {
285-
self.download_direct_from_nodes(endpoint, hash_and_format, nodes, progress.clone())
286-
.await?
287-
}
288-
};
289-
290-
progress.send(DownloadProgress::AllDone(stats)).await.ok();
291-
match tag {
292-
SetTagOption::Named(tag) => {
293-
self.store().set_tag(tag, Some(hash_and_format)).await?;
294-
}
295-
SetTagOption::Auto => {
296-
self.store().create_tag(hash_and_format).await?;
297-
}
298-
}
299-
drop(temp_tag);
300-
301-
Ok(())
302-
}
303-
304-
async fn download_queued(
305-
&self,
306-
endpoint: Endpoint,
307-
hash_and_format: HashAndFormat,
308-
nodes: Vec<NodeAddr>,
309-
progress: AsyncChannelProgressSender<DownloadProgress>,
310-
) -> Result<Stats> {
311-
/// Name used for logging when new node addresses are added from gossip.
312-
const BLOB_DOWNLOAD_SOURCE_NAME: &str = "blob_download";
313-
314-
let mut node_ids = Vec::with_capacity(nodes.len());
315-
let mut any_added = false;
316-
for node in nodes {
317-
node_ids.push(node.node_id);
318-
if !node.info.is_empty() {
319-
endpoint.add_node_addr_with_source(node, BLOB_DOWNLOAD_SOURCE_NAME)?;
320-
any_added = true;
321-
}
322-
}
323-
let can_download = !node_ids.is_empty() && (any_added || endpoint.discovery().is_some());
324-
anyhow::ensure!(can_download, "no way to reach a node for download");
325-
let req = DownloadRequest::new(hash_and_format, node_ids).progress_sender(progress);
326-
let handle = self.downloader().queue(req).await;
327-
let stats = handle.await?;
328-
Ok(stats)
329-
}
330-
331-
#[tracing::instrument("download_direct", skip_all, fields(hash=%hash_and_format.hash.fmt_short()))]
332-
async fn download_direct_from_nodes(
333-
&self,
334-
endpoint: Endpoint,
335-
hash_and_format: HashAndFormat,
336-
nodes: Vec<NodeAddr>,
337-
progress: AsyncChannelProgressSender<DownloadProgress>,
338-
) -> Result<Stats> {
339-
let mut last_err = None;
340-
let mut remaining_nodes = nodes.len();
341-
let mut nodes_iter = nodes.into_iter();
342-
'outer: loop {
343-
match crate::get::db::get_to_db_in_steps(
344-
self.store().clone(),
345-
hash_and_format,
346-
progress.clone(),
347-
)
348-
.await?
349-
{
350-
GetState::Complete(stats) => return Ok(stats),
351-
GetState::NeedsConn(needs_conn) => {
352-
let (conn, node_id) = 'inner: loop {
353-
match nodes_iter.next() {
354-
None => break 'outer,
355-
Some(node) => {
356-
remaining_nodes -= 1;
357-
let node_id = node.node_id;
358-
if node_id == endpoint.node_id() {
359-
debug!(
360-
?remaining_nodes,
361-
"skip node {} (it is the node id of ourselves)",
362-
node_id.fmt_short()
363-
);
364-
continue 'inner;
365-
}
366-
match endpoint.connect(node, crate::protocol::ALPN).await {
367-
Ok(conn) => break 'inner (conn, node_id),
368-
Err(err) => {
369-
debug!(
370-
?remaining_nodes,
371-
"failed to connect to {}: {err}",
372-
node_id.fmt_short()
373-
);
374-
continue 'inner;
375-
}
376-
}
377-
}
378-
}
379-
};
380-
match needs_conn.proceed(conn).await {
381-
Ok(stats) => return Ok(stats),
382-
Err(err) => {
383-
warn!(
384-
?remaining_nodes,
385-
"failed to download from {}: {err}",
386-
node_id.fmt_short()
387-
);
388-
last_err = Some(err);
389-
}
390-
}
391-
}
392-
}
393-
}
394-
match last_err {
395-
Some(err) => Err(err.into()),
396-
None => Err(anyhow!("No nodes to download from provided")),
397-
}
398-
}
399260
}
400261

401262
impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {

0 commit comments

Comments
 (0)