Skip to content

Commit 60be4ff

Browse files
rklaehnFrando
andauthored
feat!: Allow configuring the downloader when creating a blobs protocol handler (#76)
## Description Allow configuring the downloader when creating a blobs protocol handler we don't allow passing in the entire downloader, but all config options. ## Breaking Changes `iroh_blobs::downloader::Downloader` now takes a `config: Config ` argument instead of `concurrency_limits: ConcurrencyLimits, retry_config: RetryConfig`. `Config` has two public field `concurrency_limits` and `retry_config`, so the migration is straightforward. Otherwise there's only additions (a new method `downloader_config` on the `Blobs` constructor). ## Notes & open questions ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. --------- Co-authored-by: Frando <[email protected]>
1 parent 8a975ec commit 60be4ff

File tree

4 files changed

+98
-57
lines changed

4 files changed

+98
-57
lines changed

src/downloader.rs

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ pub enum GetOutput<N> {
140140
}
141141

142142
/// Concurrency limits for the [`Downloader`].
143-
#[derive(Debug)]
143+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
144144
pub struct ConcurrencyLimits {
145145
/// Maximum number of requests the service performs concurrently.
146146
pub max_concurrent_requests: usize,
@@ -192,7 +192,7 @@ impl ConcurrencyLimits {
192192
}
193193

194194
/// Configuration for retry behavior of the [`Downloader`].
195-
#[derive(Debug)]
195+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
196196
pub struct RetryConfig {
197197
/// Maximum number of retry attempts for a node that failed to dial or failed with IO errors.
198198
pub max_retries_per_node: u32,
@@ -324,13 +324,29 @@ impl Future for DownloadHandle {
324324
}
325325
}
326326

327+
/// All numerical config options for the downloader.
328+
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
329+
pub struct Config {
330+
/// Concurrency limits for the downloader.
331+
pub concurrency: ConcurrencyLimits,
332+
/// Retry configuration for the downloader.
333+
pub retry: RetryConfig,
334+
}
335+
327336
/// Handle for the download services.
328-
#[derive(Clone, Debug)]
337+
#[derive(Debug, Clone)]
329338
pub struct Downloader {
339+
inner: Arc<Inner>,
340+
}
341+
342+
#[derive(Debug)]
343+
struct Inner {
330344
/// Next id to use for a download intent.
331-
next_id: Arc<AtomicU64>,
345+
next_id: AtomicU64,
332346
/// Channel to communicate with the service.
333347
msg_tx: mpsc::Sender<Message>,
348+
/// Configuration for the downloader.
349+
config: Arc<Config>,
334350
metrics: Arc<Metrics>,
335351
}
336352

@@ -340,54 +356,48 @@ impl Downloader {
340356
where
341357
S: Store,
342358
{
343-
Self::with_config(store, endpoint, rt, Default::default(), Default::default())
359+
Self::with_config(store, endpoint, rt, Default::default())
344360
}
345361

346362
/// Create a new Downloader with custom [`ConcurrencyLimits`] and [`RetryConfig`].
347-
pub fn with_config<S>(
348-
store: S,
349-
endpoint: Endpoint,
350-
rt: LocalPoolHandle,
351-
concurrency_limits: ConcurrencyLimits,
352-
retry_config: RetryConfig,
353-
) -> Self
363+
pub fn with_config<S>(store: S, endpoint: Endpoint, rt: LocalPoolHandle, config: Config) -> Self
354364
where
355365
S: Store,
356366
{
357367
let metrics = Arc::new(Metrics::default());
368+
let metrics2 = metrics.clone();
358369
let me = endpoint.node_id().fmt_short();
359370
let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY);
360371
let dialer = Dialer::new(endpoint);
361-
362-
let metrics_clone = metrics.clone();
372+
let config = Arc::new(config);
373+
let config2 = config.clone();
363374
let create_future = move || {
364375
let getter = get::IoGetter {
365376
store: store.clone(),
366377
};
367-
368-
let service = Service::new(
369-
getter,
370-
dialer,
371-
concurrency_limits,
372-
retry_config,
373-
msg_rx,
374-
metrics_clone,
375-
);
376-
378+
let service = Service::new(getter, dialer, config2, msg_rx, metrics2);
377379
service.run().instrument(error_span!("downloader", %me))
378380
};
379381
rt.spawn_detached(create_future);
380382
Self {
381-
next_id: Arc::new(AtomicU64::new(0)),
382-
msg_tx,
383-
metrics,
383+
inner: Arc::new(Inner {
384+
next_id: AtomicU64::new(0),
385+
msg_tx,
386+
config,
387+
metrics,
388+
}),
384389
}
385390
}
386391

392+
/// Get the current configuration.
393+
pub fn config(&self) -> &Config {
394+
&self.inner.config
395+
}
396+
387397
/// Queue a download.
388398
pub async fn queue(&self, request: DownloadRequest) -> DownloadHandle {
389399
let kind = request.kind;
390-
let intent_id = IntentId(self.next_id.fetch_add(1, Ordering::SeqCst));
400+
let intent_id = IntentId(self.inner.next_id.fetch_add(1, Ordering::SeqCst));
391401
let (sender, receiver) = oneshot::channel();
392402
let handle = DownloadHandle {
393403
id: intent_id,
@@ -401,7 +411,7 @@ impl Downloader {
401411
};
402412
// if this fails polling the handle will fail as well since the sender side of the oneshot
403413
// will be dropped
404-
if let Err(send_err) = self.msg_tx.send(msg).await {
414+
if let Err(send_err) = self.inner.msg_tx.send(msg).await {
405415
let msg = send_err.0;
406416
debug!(?msg, "download not sent");
407417
}
@@ -417,7 +427,7 @@ impl Downloader {
417427
receiver: _,
418428
} = handle;
419429
let msg = Message::CancelIntent { id, kind };
420-
if let Err(send_err) = self.msg_tx.send(msg).await {
430+
if let Err(send_err) = self.inner.msg_tx.send(msg).await {
421431
let msg = send_err.0;
422432
debug!(?msg, "cancel not sent");
423433
}
@@ -429,15 +439,15 @@ impl Downloader {
429439
/// downloads. Use [`Self::queue`] to queue a download.
430440
pub async fn nodes_have(&mut self, hash: Hash, nodes: Vec<NodeId>) {
431441
let msg = Message::NodesHave { hash, nodes };
432-
if let Err(send_err) = self.msg_tx.send(msg).await {
442+
if let Err(send_err) = self.inner.msg_tx.send(msg).await {
433443
let msg = send_err.0;
434444
debug!(?msg, "nodes have not been sent")
435445
}
436446
}
437447

438448
/// Returns the metrics collected for this downloader.
439449
pub fn metrics(&self) -> &Arc<Metrics> {
440-
&self.metrics
450+
&self.inner.metrics
441451
}
442452
}
443453

@@ -586,17 +596,16 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
586596
fn new(
587597
getter: G,
588598
dialer: D,
589-
concurrency_limits: ConcurrencyLimits,
590-
retry_config: RetryConfig,
599+
config: Arc<Config>,
591600
msg_rx: mpsc::Receiver<Message>,
592601
metrics: Arc<Metrics>,
593602
) -> Self {
594603
Service {
595604
getter,
596605
dialer,
597606
msg_rx,
598-
concurrency_limits,
599-
retry_config,
607+
concurrency_limits: config.concurrency,
608+
retry_config: config.retry,
600609
connected_nodes: Default::default(),
601610
retry_node_state: Default::default(),
602611
providers: Default::default(),

src/downloader/test.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,24 +49,25 @@ impl Downloader {
4949

5050
let lp = LocalPool::default();
5151
let metrics_clone = metrics.clone();
52+
let config = Arc::new(Config {
53+
concurrency: concurrency_limits,
54+
retry: retry_config,
55+
});
56+
let config2 = config.clone();
5257
lp.spawn_detached(move || async move {
5358
// we want to see the logs of the service
54-
let service = Service::new(
55-
getter,
56-
dialer,
57-
concurrency_limits,
58-
retry_config,
59-
msg_rx,
60-
metrics_clone,
61-
);
59+
let service = Service::new(getter, dialer, config2, msg_rx, metrics_clone);
6260
service.run().await
6361
});
6462

6563
(
6664
Downloader {
67-
next_id: Arc::new(AtomicU64::new(0)),
68-
msg_tx,
69-
metrics,
65+
inner: Arc::new(Inner {
66+
next_id: AtomicU64::new(0),
67+
msg_tx,
68+
config,
69+
metrics,
70+
}),
7071
},
7172
lp,
7273
)

src/net_protocol.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize};
1818
use tracing::debug;
1919

2020
use crate::{
21-
downloader::{ConcurrencyLimits, Downloader, RetryConfig},
21+
downloader::{self, ConcurrencyLimits, Downloader, RetryConfig},
2222
metrics::Metrics,
2323
provider::EventSender,
2424
store::GcConfig,
@@ -148,9 +148,8 @@ impl BlobBatches {
148148
pub struct Builder<S> {
149149
store: S,
150150
events: Option<EventSender>,
151+
downloader_config: Option<crate::downloader::Config>,
151152
rt: Option<LocalPoolHandle>,
152-
concurrency_limits: Option<ConcurrencyLimits>,
153-
retry_config: Option<RetryConfig>,
154153
}
155154

156155
impl<S: crate::store::Store> Builder<S> {
@@ -166,15 +165,23 @@ impl<S: crate::store::Store> Builder<S> {
166165
self
167166
}
168167

168+
/// Set custom downloader config
169+
pub fn downloader_config(mut self, downloader_config: downloader::Config) -> Self {
170+
self.downloader_config = Some(downloader_config);
171+
self
172+
}
173+
169174
/// Set custom [`ConcurrencyLimits`] to use.
170175
pub fn concurrency_limits(mut self, concurrency_limits: ConcurrencyLimits) -> Self {
171-
self.concurrency_limits = Some(concurrency_limits);
176+
let downloader_config = self.downloader_config.get_or_insert_with(Default::default);
177+
downloader_config.concurrency = concurrency_limits;
172178
self
173179
}
174180

175181
/// Set a custom [`RetryConfig`] to use.
176182
pub fn retry_config(mut self, retry_config: RetryConfig) -> Self {
177-
self.retry_config = Some(retry_config);
183+
let downloader_config = self.downloader_config.get_or_insert_with(Default::default);
184+
downloader_config.retry = retry_config;
178185
self
179186
}
180187

@@ -185,12 +192,12 @@ impl<S: crate::store::Store> Builder<S> {
185192
.rt
186193
.map(Rt::Handle)
187194
.unwrap_or_else(|| Rt::Owned(LocalPool::default()));
195+
let downloader_config = self.downloader_config.unwrap_or_default();
188196
let downloader = Downloader::with_config(
189197
self.store.clone(),
190198
endpoint.clone(),
191199
rt.clone(),
192-
self.concurrency_limits.unwrap_or_default(),
193-
self.retry_config.unwrap_or_default(),
200+
downloader_config,
194201
);
195202
Blobs::new(
196203
self.store,
@@ -208,9 +215,8 @@ impl<S> Blobs<S> {
208215
Builder {
209216
store,
210217
events: None,
218+
downloader_config: None,
211219
rt: None,
212-
concurrency_limits: None,
213-
retry_config: None,
214220
}
215221
}
216222
}

tests/rpc.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#![cfg(feature = "test")]
22
use std::{net::SocketAddr, path::PathBuf, vec};
33

4-
use iroh_blobs::net_protocol::Blobs;
4+
use iroh_blobs::{downloader, net_protocol::Blobs};
55
use quic_rpc::client::QuinnConnector;
66
use tempfile::TempDir;
77
use testresult::TestResult;
@@ -84,3 +84,28 @@ async fn quinn_rpc_large() -> TestResult<()> {
8484
assert_eq!(data, &data2[..]);
8585
Ok(())
8686
}
87+
88+
#[tokio::test]
89+
async fn downloader_config() -> TestResult<()> {
90+
let _ = tracing_subscriber::fmt::try_init();
91+
let endpoint = iroh::Endpoint::builder().bind().await?;
92+
let store = iroh_blobs::store::mem::Store::default();
93+
let expected = downloader::Config {
94+
concurrency: downloader::ConcurrencyLimits {
95+
max_concurrent_requests: usize::MAX,
96+
max_concurrent_requests_per_node: usize::MAX,
97+
max_open_connections: usize::MAX,
98+
max_concurrent_dials_per_hash: usize::MAX,
99+
},
100+
retry: downloader::RetryConfig {
101+
max_retries_per_node: u32::MAX,
102+
initial_retry_delay: std::time::Duration::from_secs(1),
103+
},
104+
};
105+
let blobs = Blobs::builder(store)
106+
.downloader_config(expected)
107+
.build(&endpoint);
108+
let actual = blobs.downloader().config();
109+
assert_eq!(&expected, actual);
110+
Ok(())
111+
}

0 commit comments

Comments
 (0)