Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 6b094d3

Browse files
committedDec 5, 2024·
feat: update to new protocolhandler
1 parent 4c1446f commit 6b094d3

File tree

6 files changed

+81
-172
lines changed

6 files changed

+81
-172
lines changed
 

‎Cargo.lock

Lines changed: 56 additions & 89 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ futures-util = "0.3.30"
9595
testdir = "0.9.1"
9696

9797
[features]
98-
default = ["fs-store", "net_protocol"]
98+
default = ["fs-store", "net_protocol", "rpc"]
9999
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
100100
net_protocol = ["downloader", "dep:futures-util"]
101101
fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"]
@@ -184,6 +184,5 @@ panic = 'abort'
184184
incremental = false
185185

186186
[patch.crates-io]
187-
# iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" }
188-
# iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
189-
# iroh = { git = "https://github.com/n0-computer/iroh", branch = "main" }
187+
iroh-base = { git = "https://github.com/n0-computer/iroh" }
188+
iroh = { git = "https://github.com/n0-computer/iroh" }

‎src/net_protocol.rs

Lines changed: 14 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ impl Default for GcState {
4747
}
4848
}
4949

50-
#[derive(Debug)]
50+
#[derive(Debug, Clone)]
5151
pub struct Blobs<S> {
5252
rt: LocalPoolHandle,
5353
pub(crate) store: S,
5454
events: EventSender,
5555
downloader: Downloader,
5656
#[cfg(feature = "rpc")]
57-
batches: tokio::sync::Mutex<BlobBatches>,
57+
batches: Arc<tokio::sync::Mutex<BlobBatches>>,
5858
endpoint: Endpoint,
5959
gc_state: Arc<std::sync::Mutex<GcState>>,
6060
#[cfg(feature = "rpc")]
@@ -131,15 +131,15 @@ impl<S: crate::store::Store> Builder<S> {
131131

132132
/// Build the Blobs protocol handler.
133133
/// You need to provide a local pool handle and an endpoint.
134-
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Arc<Blobs<S>> {
134+
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Blobs<S> {
135135
let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone());
136-
Arc::new(Blobs::new(
136+
Blobs::new(
137137
self.store,
138138
rt.clone(),
139139
self.events.unwrap_or_default(),
140140
downloader,
141141
endpoint.clone(),
142-
))
142+
)
143143
}
144144
}
145145

@@ -391,82 +391,26 @@ impl<S: crate::store::Store> Blobs<S> {
391391
}
392392
}
393393

394-
// trait BlobsInner: Debug + Send + Sync + 'static {
395-
// fn shutdown(self: Arc<Self>) -> BoxedFuture<()>;
396-
// fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>>;
397-
// fn client(self: Arc<Self>) -> MemClient;
398-
// fn local_pool_handle(&self) -> &LocalPoolHandle;
399-
// fn downloader(&self) -> &Downloader;
400-
// }
401-
402-
// #[derive(Debug)]
403-
// struct Blobs2 {
404-
// inner: Arc<dyn BlobsInner>,
405-
// }
406-
407-
// impl Blobs2 {
408-
// fn client(&self) -> MemClient {
409-
// self.inner.clone().client()
410-
// }
411-
412-
// fn local_pool_handle(&self) -> &LocalPoolHandle {
413-
// self.inner.local_pool_handle()
414-
// }
415-
416-
// fn downloader(&self) -> &Downloader {
417-
// self.inner.downloader()
418-
// }
419-
// }
420-
421-
// impl<S: crate::store::Store> BlobsInner for Blobs<S> {
422-
// fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
423-
// ProtocolHandler::shutdown(self)
424-
// }
425-
426-
// fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
427-
// ProtocolHandler::accept(self, conn)
428-
// }
429-
430-
// fn client(self: Arc<Self>) -> MemClient {
431-
// Blobs::client(self)
432-
// }
433-
434-
// fn local_pool_handle(&self) -> &LocalPoolHandle {
435-
// self.rt()
436-
// }
437-
438-
// fn downloader(&self) -> &Downloader {
439-
// self.downloader()
440-
// }
441-
// }
442-
443-
// impl ProtocolHandler for Blobs2 {
444-
// fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
445-
// self.inner.clone().accept(conn)
446-
// }
447-
448-
// fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
449-
// self.inner.clone().shutdown()
450-
// }
451-
// }
452-
453394
impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
454-
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
395+
fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
396+
let this = self.clone();
397+
455398
Box::pin(async move {
456399
crate::provider::handle_connection(
457400
conn.await?,
458-
self.store.clone(),
459-
self.events.clone(),
460-
self.rt.clone(),
401+
this.store.clone(),
402+
this.events.clone(),
403+
this.rt.clone(),
461404
)
462405
.await;
463406
Ok(())
464407
})
465408
}
466409

467-
fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
410+
fn shutdown(&self) -> BoxedFuture<()> {
411+
let this = self.clone();
468412
Box::pin(async move {
469-
self.store.shutdown().await;
413+
this.store.shutdown().await;
470414
})
471415
}
472416
}

‎src/rpc.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ const RPC_BLOB_GET_CHANNEL_CAP: usize = 2;
6363

6464
impl<D: crate::store::Store> Blobs<D> {
6565
/// Get a client for the blobs protocol
66-
pub fn client(self: Arc<Self>) -> blobs::MemClient {
66+
pub fn client(&self) -> blobs::MemClient {
6767
let client = self
6868
.rpc_handler
6969
.get_or_init(|| RpcHandler::new(&self))
@@ -74,7 +74,7 @@ impl<D: crate::store::Store> Blobs<D> {
7474

7575
/// Handle an RPC request
7676
pub async fn handle_rpc_request<C>(
77-
self: Arc<Self>,
77+
self,
7878
msg: Request,
7979
chan: RpcChannel<RpcService, C>,
8080
) -> std::result::Result<(), RpcServerError<C>>
@@ -91,7 +91,7 @@ impl<D: crate::store::Store> Blobs<D> {
9191
}
9292

9393
#[derive(Clone)]
94-
struct Handler<S>(Arc<Blobs<S>>);
94+
struct Handler<S>(Blobs<S>);
9595

9696
impl<S> Deref for Handler<S> {
9797
type Target = Blobs<S>;
@@ -903,7 +903,7 @@ pub(crate) struct RpcHandler {
903903
}
904904

905905
impl RpcHandler {
906-
fn new<D: crate::store::Store>(blobs: &Arc<Blobs<D>>) -> Self {
906+
fn new<D: crate::store::Store>(blobs: &Blobs<D>) -> Self {
907907
let blobs = blobs.clone();
908908
let (listener, connector) = quic_rpc::transport::flume::channel(1);
909909
let listener = RpcServer::new(listener);

‎src/rpc/client/blobs.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,7 @@ mod tests {
10031003

10041004
mod node {
10051005
//! An iroh node that just has the blobs transport
1006-
use std::{path::Path, sync::Arc};
1006+
use std::path::Path;
10071007

10081008
use iroh::{protocol::Router, Endpoint, NodeAddr, NodeId};
10091009
use tokio_util::task::AbortOnDropHandle;
@@ -1068,13 +1068,13 @@ mod tests {
10681068
// Setup blobs
10691069
let downloader =
10701070
Downloader::new(store.clone(), endpoint.clone(), local_pool.handle().clone());
1071-
let blobs = Arc::new(Blobs::new(
1071+
let blobs = Blobs::new(
10721072
store.clone(),
10731073
local_pool.handle().clone(),
10741074
events,
10751075
downloader,
10761076
endpoint.clone(),
1077-
));
1077+
);
10781078
router = router.accept(crate::ALPN, blobs.clone());
10791079

10801080
// Build the router

‎tests/gc.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::{
33
io,
44
io::{Cursor, Write},
55
path::PathBuf,
6-
sync::Arc,
76
time::Duration,
87
};
98

@@ -41,7 +40,7 @@ use tokio::io::AsyncReadExt;
4140
#[derive(Debug)]
4241
pub struct Node<S> {
4342
pub router: iroh::protocol::Router,
44-
pub blobs: Arc<Blobs<S>>,
43+
pub blobs: Blobs<S>,
4544
pub store: S,
4645
pub _local_pool: LocalPool,
4746
}

0 commit comments

Comments
 (0)
Please sign in to comment.