Skip to content

Commit 80327e4

Browse files
committed
use spawn_accept_loop
1 parent 8d8fb44 commit 80327e4

File tree

6 files changed

+14
-53
lines changed

6 files changed

+14
-53
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,4 @@ iroh-router = { git = "https://github.com/n0-computer/iroh", branch = "main" }
131131
iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" }
132132
iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" }
133133
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
134+
quic-rpc = { git = "https://github.com/n0-computer/quic-rpc", branch = "main" }

src/downloader.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,6 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
645645
}
646646

647647
/// Handle receiving a [`Message`].
648-
///
649648
// This is called in the actor loop, and only async because subscribing to an existing transfer
650649
// sends the initial state.
651650
async fn handle_message(&mut self, msg: Message) {

src/protocol.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@
148148
//! # use bao_tree::{ChunkNum, ChunkRanges};
149149
//! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq};
150150
//! # let hash: iroh_blobs::Hash = [0; 32].into();
151-
//! let ranges = &ChunkRanges::from(..ChunkNum(10)) | &ChunkRanges::from(ChunkNum(100)..ChunkNum(110));
151+
//! let ranges =
152+
//! &ChunkRanges::from(..ChunkNum(10)) | &ChunkRanges::from(ChunkNum(100)..ChunkNum(110));
152153
//! let spec = RangeSpecSeq::from_ranges([ranges]);
153154
//! let request = GetRequest::new(hash, spec);
154155
//! ```
@@ -236,8 +237,8 @@
236237
//! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq};
237238
//! # let hash: iroh_blobs::Hash = [0; 32].into();
238239
//! let spec = RangeSpecSeq::from_ranges_infinite([
239-
//! ChunkRanges::all(), // the collection itself
240-
//! ChunkRanges::from(..ChunkNum(1)), // the first chunk of each child
240+
//! ChunkRanges::all(), // the collection itself
241+
//! ChunkRanges::from(..ChunkNum(1)), // the first chunk of each child
241242
//! ]);
242243
//! let request = GetRequest::new(hash, spec);
243244
//! ```
@@ -252,9 +253,9 @@
252253
//! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq};
253254
//! # let hash: iroh_blobs::Hash = [0; 32].into();
254255
//! let spec = RangeSpecSeq::from_ranges([
255-
//! ChunkRanges::empty(), // we don't need the collection itself
256-
//! ChunkRanges::empty(), // we don't need the first child either
257-
//! ChunkRanges::all(), // we need the second child completely
256+
//! ChunkRanges::empty(), // we don't need the collection itself
257+
//! ChunkRanges::empty(), // we don't need the first child either
258+
//! ChunkRanges::all(), // we need the second child completely
258259
//! ]);
259260
//! let request = GetRequest::new(hash, spec);
260261
//! ```

src/rpc.rs

Lines changed: 4 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@ use quic_rpc::{
3737
server::{ChannelTypes, RpcChannel, RpcServerError},
3838
RpcClient, RpcServer,
3939
};
40-
use tokio::task::JoinSet;
4140
use tokio_util::task::AbortOnDropHandle;
42-
use tracing::{error, warn};
4341

4442
use crate::{
4543
export::ExportProgress,
@@ -892,7 +890,7 @@ impl<D: crate::store::Store> Blobs<D> {
892890
#[derive(Debug)]
893891
pub(crate) struct RpcHandler {
894892
/// Client to hand out
895-
client: RpcClient<crate::rpc::proto::RpcService, MemConnector>,
893+
client: RpcClient<RpcService, MemConnector>,
896894
/// Handler task
897895
_handler: AbortOnDropHandle<()>,
898896
}
@@ -903,45 +901,8 @@ impl RpcHandler {
903901
let (listener, connector) = quic_rpc::transport::flume::channel(1);
904902
let listener = RpcServer::new(listener);
905903
let client = RpcClient::new(connector);
906-
let task = tokio::spawn(async move {
907-
let mut tasks = JoinSet::new();
908-
loop {
909-
tokio::select! {
910-
Some(res) = tasks.join_next(), if !tasks.is_empty() => {
911-
if let Err(e) = res {
912-
if e.is_panic() {
913-
error!("Panic handling RPC request: {e}");
914-
}
915-
}
916-
}
917-
req = listener.accept() => {
918-
let req = match req {
919-
Ok(req) => req,
920-
Err(e) => {
921-
warn!("Error accepting RPC request: {e}");
922-
continue;
923-
}
924-
};
925-
let blobs = blobs.clone();
926-
tasks.spawn(async move {
927-
let (req, client) = match req.read_first().await {
928-
Ok((req, client)) => (req, client),
929-
Err(e) => {
930-
warn!("Error reading first message: {e}");
931-
return;
932-
}
933-
};
934-
if let Err(cause) = blobs.handle_rpc_request(req, client).await {
935-
warn!("Error handling RPC request: {:?}", cause);
936-
}
937-
});
938-
}
939-
}
940-
}
941-
});
942-
Self {
943-
client,
944-
_handler: AbortOnDropHandle::new(task),
945-
}
904+
let _handler = listener
905+
.spawn_accept_loop(move |req, chan| blobs.clone().handle_rpc_request(req, chan));
906+
Self { client, _handler }
946907
}
947908
}

src/util/fs.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ pub struct PathContent {
179179
}
180180

181181
/// Walks the directory to get the total size and number of files in directory or file
182-
///
183182
// TODO: possible combine with `scan_dir`
184183
pub fn path_content_info(path: impl AsRef<Path>) -> anyhow::Result<PathContent> {
185184
path_content_info0(path)

0 commit comments

Comments
 (0)