Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 09562ce

Browse files
committedDec 6, 2024·
Remove the lazy part.
The lazy handler kept a reference to Blobs alive. This caused both the task and the blobs to never be dropped. To solve this you can just split the inner part in 2 parts, one that has the handle and one that has the logic. But that is not nice. I think it is best for the mem rpc handler to exist completely separately, especially given that rpc is a non-default feature.
1 parent e851bcc commit 09562ce

File tree

4 files changed

+24
-23
lines changed

4 files changed

+24
-23
lines changed
 

‎src/net_protocol.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ struct BlobsInner<S> {
6262
#[derive(Debug, Clone)]
6363
pub struct Blobs<S> {
6464
inner: Arc<BlobsInner<S>>,
65-
#[cfg(feature = "rpc")]
66-
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
6765
}
6866

6967
/// Keeps track of all the currently active batch operations of the blobs api.
@@ -193,8 +191,6 @@ impl<S: crate::store::Store> Blobs<S> {
193191
batches: Default::default(),
194192
gc_state: Default::default(),
195193
}),
196-
#[cfg(feature = "rpc")]
197-
rpc_handler: Default::default(),
198194
}
199195
}
200196

‎src/rpc.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22
33
use std::{
44
io,
5+
ops::Deref,
56
sync::{Arc, Mutex},
67
};
78

89
use anyhow::anyhow;
910
use client::{
10-
blobs::{self, BlobInfo, BlobStatus, IncompleteBlobInfo, WrapOption},
11+
blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, MemClient, WrapOption},
1112
tags::TagInfo,
1213
MemConnector,
1314
};
@@ -62,13 +63,8 @@ const RPC_BLOB_GET_CHANNEL_CAP: usize = 2;
6263

6364
impl<D: crate::store::Store> Blobs<D> {
6465
/// Get a client for the blobs protocol
65-
pub fn client(&self) -> blobs::MemClient {
66-
let client = self
67-
.rpc_handler
68-
.get_or_init(|| RpcHandler::new(self))
69-
.client
70-
.clone();
71-
blobs::Client::new(client)
66+
pub fn client(&self) -> RpcHandler {
67+
RpcHandler::new(self)
7268
}
7369

7470
/// Handle an RPC request
@@ -874,20 +870,34 @@ impl<D: crate::store::Store> Blobs<D> {
874870
}
875871
}
876872

873+
/// A rpc handler for the blobs rpc protocol
874+
///
875+
/// This struct contains both a task that handles rpc requests and a client
876+
/// that can be used to send rpc requests. Dropping it will stop the handler task,
877+
/// so you need to put it somewhere where it will be kept alive.
877878
#[derive(Debug)]
878-
pub(crate) struct RpcHandler {
879+
pub struct RpcHandler {
879880
/// Client to hand out
880-
client: RpcClient<RpcService, MemConnector>,
881+
client: MemClient,
881882
/// Handler task
882883
_handler: AbortOnDropHandle<()>,
883884
}
884885

886+
impl Deref for RpcHandler {
887+
type Target = MemClient;
888+
889+
fn deref(&self) -> &Self::Target {
890+
&self.client
891+
}
892+
}
893+
885894
impl RpcHandler {
886895
fn new<D: crate::store::Store>(blobs: &Blobs<D>) -> Self {
887896
let blobs = blobs.clone();
888897
let (listener, connector) = quic_rpc::transport::flume::channel(1);
889898
let listener = RpcServer::new(listener);
890899
let client = RpcClient::new(connector);
900+
let client = MemClient::new(client);
891901
let _handler = listener
892902
.spawn_accept_loop(move |req, chan| blobs.clone().handle_rpc_request(req, chan));
893903
Self { client, _handler }

‎tests/blobs.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,7 @@ async fn blobs_gc_protected() -> TestResult<()> {
3232
let pool = LocalPool::default();
3333
let endpoint = Endpoint::builder().bind().await?;
3434
let blobs = Blobs::memory().build(pool.handle(), &endpoint);
35-
let client: iroh_blobs::rpc::client::blobs::Client<
36-
quic_rpc::transport::flume::FlumeConnector<
37-
iroh_blobs::rpc::proto::Response,
38-
iroh_blobs::rpc::proto::Request,
39-
>,
40-
> = blobs.clone().client();
35+
let client = blobs.clone().client();
4136
let h1 = client.add_bytes(b"test".to_vec()).await?;
4237
let protected = Arc::new(Mutex::new(Vec::new()));
4338
blobs.add_protected(Box::new({

‎tests/gc.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use iroh::{protocol::Router, Endpoint, NodeAddr, NodeId};
2020
use iroh_blobs::{
2121
hashseq::HashSeq,
2222
net_protocol::Blobs,
23-
rpc::client::{blobs, tags},
23+
rpc::{client::tags, RpcHandler},
2424
store::{
2525
bao_tree, BaoBatchWriter, ConsistencyCheckProgress, EntryStatus, GcConfig, MapEntryMut,
2626
MapMut, ReportLevel, Store,
@@ -66,8 +66,8 @@ impl<S: Store> Node<S> {
6666
}
6767

6868
/// Returns an in-memory blobs client
69-
pub fn blobs(&self) -> blobs::MemClient {
70-
self.blobs.clone().client()
69+
pub fn blobs(&self) -> RpcHandler {
70+
self.blobs.client()
7171
}
7272

7373
/// Returns an in-memory tags client

0 commit comments

Comments
 (0)
Please sign in to comment.