diff --git a/Cargo.lock b/Cargo.lock index ca1acf205..8d2af62fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3133,9 +3133,9 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.15.0" +version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e131f594054d27d077162815db3b5e9ddd76a28fbb9091b68095971e75c286" +checksum = "dc623a188942fc875926f7baeb2cb08ed4288b64f29072656eb051e360ee7623" dependencies = [ "anyhow", "derive_more", @@ -3149,6 +3149,7 @@ dependencies = [ "serde", "slab", "tokio", + "tokio-util", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 3324f1f5e..c27f50da3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ parking_lot = { version = "0.12.1", optional = true } pin-project = "1.1.5" portable-atomic = { version = "1", optional = true } postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quic-rpc = { version = "0.15.0", optional = true } +quic-rpc = { version = "0.15.1", optional = true } quic-rpc-derive = { version = "0.15.0", optional = true } quinn = { package = "iroh-quinn", version = "0.12", features = ["ring"] } rand = "0.8" @@ -131,3 +131,4 @@ iroh-router = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" } + diff --git a/deny.toml b/deny.toml index f5669dbf3..050bf2b46 100644 --- a/deny.toml +++ b/deny.toml @@ -34,6 +34,7 @@ license-files = [ [advisories] ignore = [ "RUSTSEC-2024-0370", # unmaintained, no upgrade available + "RUSTSEC-2024-0384", # unmaintained, no upgrade available ] [sources] diff --git a/src/downloader.rs b/src/downloader.rs index 90f3cd0b5..b9cd2df19 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -645,7 +645,6 @@ impl, D: Dialer> Service { } /// Handle receiving a [`Message`]. - /// // This is called in the actor loop, and only async because subscribing to an existing transfer // sends the initial state. async fn handle_message(&mut self, msg: Message) { diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 68b88f82d..9caff14e1 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -3,7 +3,10 @@ // TODO: reduce API surface and add documentation #![allow(missing_docs)] -use std::{collections::BTreeMap, sync::Arc}; +use std::{ + collections::BTreeMap, + sync::{Arc, OnceLock}, +}; use anyhow::{anyhow, Result}; use futures_lite::future::Boxed as BoxedFuture; @@ -36,6 +39,8 @@ pub struct Blobs { downloader: Downloader, batches: tokio::sync::Mutex, endpoint: Endpoint, + #[cfg(feature = "rpc")] + pub(crate) rpc_handler: Arc>, } /// Name used for logging when new node addresses are added from gossip. @@ -107,6 +112,8 @@ impl Blobs { downloader, endpoint, batches: Default::default(), + #[cfg(feature = "rpc")] + rpc_handler: Arc::new(OnceLock::new()), } } diff --git a/src/protocol.rs b/src/protocol.rs index 9f24b7217..da0995f5f 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -148,7 +148,8 @@ //! # use bao_tree::{ChunkNum, ChunkRanges}; //! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq}; //! # let hash: iroh_blobs::Hash = [0; 32].into(); -//! let ranges = &ChunkRanges::from(..ChunkNum(10)) | &ChunkRanges::from(ChunkNum(100)..ChunkNum(110)); +//! let ranges = +//! &ChunkRanges::from(..ChunkNum(10)) | &ChunkRanges::from(ChunkNum(100)..ChunkNum(110)); //! let spec = RangeSpecSeq::from_ranges([ranges]); //! let request = GetRequest::new(hash, spec); //! ``` @@ -236,8 +237,8 @@ //! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq}; //! # let hash: iroh_blobs::Hash = [0; 32].into(); //! let spec = RangeSpecSeq::from_ranges_infinite([ -//! ChunkRanges::all(), // the collection itself -//! ChunkRanges::from(..ChunkNum(1)), // the first chunk of each child +//! ChunkRanges::all(), // the collection itself +//! ChunkRanges::from(..ChunkNum(1)), // the first chunk of each child //! ]); //! let request = GetRequest::new(hash, spec); //! ``` @@ -252,9 +253,9 @@ //! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq}; //! # let hash: iroh_blobs::Hash = [0; 32].into(); //! let spec = RangeSpecSeq::from_ranges([ -//! ChunkRanges::empty(), // we don't need the collection itself -//! ChunkRanges::empty(), // we don't need the first child either -//! ChunkRanges::all(), // we need the second child completely +//! ChunkRanges::empty(), // we don't need the collection itself +//! ChunkRanges::empty(), // we don't need the first child either +//! ChunkRanges::all(), // we need the second child completely //! ]); //! let request = GetRequest::new(hash, spec); //! ``` diff --git a/src/rpc.rs b/src/rpc.rs index 457691d12..04f9d00bc 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -7,8 +7,9 @@ use std::{ use anyhow::anyhow; use client::{ - blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, WrapOption}, + blobs::{self, BlobInfo, BlobStatus, IncompleteBlobInfo, WrapOption}, tags::TagInfo, + MemConnector, }; use futures_buffered::BufferedStreamExt; use futures_lite::StreamExt; @@ -32,7 +33,11 @@ use proto::{ }, Request, RpcError, RpcResult, RpcService, }; -use quic_rpc::server::{ChannelTypes, RpcChannel, RpcServerError}; +use quic_rpc::{ + server::{ChannelTypes, RpcChannel, RpcServerError}, + RpcClient, RpcServer, +}; +use tokio_util::task::AbortOnDropHandle; use crate::{ export::ExportProgress, @@ -56,6 +61,16 @@ const RPC_BLOB_GET_CHUNK_SIZE: usize = 1024 * 64; const RPC_BLOB_GET_CHANNEL_CAP: usize = 2; impl Blobs { + /// Get a client for the blobs protocol + pub fn client(self: Arc) -> blobs::MemClient { + let client = self + .rpc_handler + .get_or_init(|| RpcHandler::new(&self)) + .client + .clone(); + blobs::Client::new(client) + } + /// Handle an RPC request pub async fn handle_rpc_request( self: Arc, @@ -871,3 +886,23 @@ impl Blobs { Ok(CreateCollectionResponse { hash, tag }) } } + +#[derive(Debug)] +pub(crate) struct RpcHandler { + /// Client to hand out + client: RpcClient, + /// Handler task + _handler: AbortOnDropHandle<()>, +} + +impl RpcHandler { + fn new(blobs: &Arc>) -> Self { + let blobs = blobs.clone(); + let (listener, connector) = quic_rpc::transport::flume::channel(1); + let listener = RpcServer::new(listener); + let client = RpcClient::new(connector); + let _handler = listener + .spawn_accept_loop(move |req, chan| blobs.clone().handle_rpc_request(req, chan)); + Self { client, _handler } + } +} diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 4b11fdc19..a2450f496 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -1,10 +1,15 @@ //! Iroh blobs and tags client use anyhow::Result; use futures_util::{Stream, StreamExt}; +use quic_rpc::transport::flume::FlumeConnector; pub mod blobs; pub mod tags; +/// Type alias for a memory-backed client. +pub(crate) type MemConnector = + FlumeConnector; + fn flatten( s: impl Stream, E2>>, ) -> impl Stream> diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index 36e37b61c..64c4d7056 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -111,6 +111,9 @@ pub struct Client> { pub(super) rpc: RpcClient, } +/// Type alias for a memory-backed client. +pub type MemClient = Client; + impl Client where C: Connector, @@ -120,6 +123,11 @@ where Self { rpc } } + /// Get a tags client. + pub fn tags(&self) -> tags::Client { + tags::Client::new(self.rpc.clone()) + } + /// Check if a blob is completely stored on the node. /// /// Note that this will return false for blobs that are partially stored on diff --git a/src/util/fs.rs b/src/util/fs.rs index 068ebadc9..6095bc768 100644 --- a/src/util/fs.rs +++ b/src/util/fs.rs @@ -179,7 +179,6 @@ pub struct PathContent { } /// Walks the directory to get the total size and number of files in directory or file -/// // TODO: possible combine with `scan_dir` pub fn path_content_info(path: impl AsRef) -> anyhow::Result { path_content_info0(path)