diff --git a/Cargo.lock b/Cargo.lock index 63fa032a1..ca1acf205 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3133,9 +3133,9 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.13.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b0ea1bd0b3124538bb71ed8cedbe92608fd1cf227e4f5ff53fb28746737b794" +checksum = "61e131f594054d27d077162815db3b5e9ddd76a28fbb9091b68095971e75c286" dependencies = [ "anyhow", "derive_more", @@ -3154,9 +3154,9 @@ dependencies = [ [[package]] name = "quic-rpc-derive" -version = "0.13.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94b91a3f7a42657cbfbd0c2499c1f037738eff45bb7f59c6ce3d3d9e890d141c" +checksum = "cbef4c942978f74ef296ae40d43d4375c9d730b65a582688a358108cfd5c0cf7" dependencies = [ "proc-macro2", "quic-rpc", diff --git a/Cargo.toml b/Cargo.toml index 8bf261cee..0d2753f49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,8 +37,8 @@ parking_lot = { version = "0.12.1", optional = true } pin-project = "1.1.5" portable-atomic = { version = "1", optional = true } postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quic-rpc = { version = "0.13.0", optional = true } -quic-rpc-derive = { version = "0.13.0", optional = true } +quic-rpc = { version = "0.15.0", optional = true } +quic-rpc-derive = { version = "0.15.0", optional = true } quinn = { package = "iroh-quinn", version = "0.12", features = ["ring"] } rand = "0.8" range-collections = "0.4.0" @@ -82,9 +82,6 @@ fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"] metrics = ["iroh-metrics/metrics"] redb = ["dep:redb"] rpc = ["dep:quic-rpc", "dep:quic-rpc-derive", "dep:nested_enum_utils", "dep:strum", "dep:futures-util", "dep:ref-cast", "dep:portable-atomic", "dep:walkdir", "downloader"] -ref-cast = ["dep:ref-cast"] -portable-atomic = ["dep:portable-atomic"] -walkdir = ["dep:walkdir"] [package.metadata.docs.rs] all-features = true diff --git a/deny.toml b/deny.toml index 267ca7a65..f5669dbf3 100644 --- a/deny.toml +++ b/deny.toml @@ -21,6 +21,7 @@ allow = [ "Unicode-DFS-2016", "Zlib", "MPL-2.0", # https://fossa.com/blog/open-source-software-licenses-101-mozilla-public-license-2-0/ + "Unicode-3.0" ] [[licenses.clarify]] diff --git a/src/rpc.rs b/src/rpc.rs index b764f948d..7264f8ac5 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -30,9 +30,9 @@ use proto::{ CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest, ListRequest as TagListRequest, SetRequest as TagsSetRequest, SyncMode, }, - RpcError, RpcResult, + Request, RpcError, RpcResult, RpcService, }; -use quic_rpc::server::{RpcChannel, RpcServerError}; +use quic_rpc::server::{ChannelTypes, RpcChannel, RpcServerError}; use crate::{ export::ExportProgress, @@ -57,16 +57,15 @@ const RPC_BLOB_GET_CHANNEL_CAP: usize = 2; impl<D: crate::store::Store> Blobs<D> { /// Handle an RPC request - pub async fn handle_rpc_request<S, C>( + pub async fn handle_rpc_request<C>( self: Arc<Self>, - msg: crate::rpc::proto::Request, - chan: RpcChannel<crate::rpc::proto::RpcService, C, S>, + msg: Request, + chan: RpcChannel<RpcService, C>, ) -> std::result::Result<(), RpcServerError<C>> where - S: quic_rpc::Service, - C: quic_rpc::ServiceEndpoint<S>, + C: ChannelTypes<RpcService>, { - use crate::rpc::proto::Request::*; + use Request::*; match msg { Blobs(msg) => self.handle_blobs_request(msg, chan).await, Tags(msg) => self.handle_tags_request(msg, chan).await, @@ -74,14 +73,13 @@ impl<D: crate::store::Store> Blobs<D> { } /// Handle a tags request - pub async fn handle_tags_request<S, C>( + pub async fn handle_tags_request<C>( self: Arc<Self>, msg: proto::tags::Request, - chan: RpcChannel<proto::RpcService, C, S>, + chan: RpcChannel<proto::RpcService, C>, ) -> std::result::Result<(), RpcServerError<C>> where - S: quic_rpc::Service, - C: quic_rpc::ServiceEndpoint<S>, + C: ChannelTypes<proto::RpcService>, { use proto::tags::Request::*; match msg { @@ -93,14 +91,13 @@ impl<D: crate::store::Store> Blobs<D> { } /// Handle a blobs request - pub async fn handle_blobs_request<Sv, C>( + pub async fn handle_blobs_request<C>( self: Arc<Self>, msg: proto::blobs::Request, - chan: RpcChannel<proto::RpcService, C, Sv>, + chan: RpcChannel<proto::RpcService, C>, ) -> std::result::Result<(), RpcServerError<C>> where - Sv: quic_rpc::Service, - C: quic_rpc::ServiceEndpoint<Sv>, + C: ChannelTypes<proto::RpcService>, { use proto::blobs::Request::*; match msg { diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index 21cd82d2e..1f7ce7646 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -73,8 +73,8 @@ use genawaiter::sync::{Co, Gen}; use iroh_net::NodeAddr; use portable_atomic::{AtomicU64, Ordering}; use quic_rpc::{ - client::{BoxStreamSync, BoxedServiceConnection}, - RpcClient, + client::{BoxStreamSync, BoxedConnector}, + Connector, RpcClient, }; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; @@ -87,6 +87,7 @@ use crate::{ format::collection::{Collection, SimpleStore}, get::db::DownloadProgress as BytesDownloadProgress, net_protocol::BlobDownloadRequest, + rpc::proto::RpcService, store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, util::SetTagOption, BlobFormat, Hash, Tag, @@ -105,20 +106,16 @@ use crate::rpc::proto::blobs::{ /// Iroh blobs client. #[derive(Debug, Clone)] -pub struct Client< - C = BoxedServiceConnection<crate::rpc::proto::RpcService>, - S = crate::rpc::proto::RpcService, -> { - pub(super) rpc: RpcClient<crate::rpc::proto::RpcService, C, S>, +pub struct Client<C = BoxedConnector<RpcService>> { + pub(super) rpc: RpcClient<RpcService, C>, } -impl<C, S> Client<C, S> +impl<C> Client<C> where - S: quic_rpc::Service, - C: quic_rpc::ServiceConnection<S>, + C: Connector<RpcService>, { /// Create a new client - pub fn new(rpc: RpcClient<crate::rpc::proto::RpcService, C, S>) -> Self { + pub fn new(rpc: RpcClient<RpcService, C>) -> Self { Self { rpc } } @@ -147,7 +144,7 @@ where /// A batch is a context in which temp tags are created and data is added to the node. Temp tags /// are automatically deleted when the batch is dropped, leading to the data being garbage collected /// unless a permanent tag is created for it. - pub async fn batch(&self) -> Result<Batch<C, S>> { + pub async fn batch(&self) -> Result<Batch<C>> { let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?; let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??; let rpc = self.rpc.clone(); @@ -457,15 +454,14 @@ where Ok(()) } - fn tags_client(&self) -> tags::Client<C, S> { + fn tags_client(&self) -> tags::Client<C> { tags::Client::new(self.rpc.clone()) } } -impl<C, S> SimpleStore for Client<C, S> +impl<C> SimpleStore for Client<C> where - S: quic_rpc::Service, - C: quic_rpc::ServiceConnection<S>, + C: Connector<RpcService>, { async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> { self.read_to_bytes(hash).await @@ -882,26 +878,24 @@ impl Reader { } /// todo make private again - pub async fn from_rpc_read<C, S>( - rpc: &RpcClient<crate::rpc::proto::RpcService, C, S>, + pub async fn from_rpc_read<C>( + rpc: &RpcClient<RpcService, C>, hash: Hash, ) -> anyhow::Result<Self> where - C: quic_rpc::ServiceConnection<S>, - S: quic_rpc::Service, + C: Connector<RpcService>, { Self::from_rpc_read_at(rpc, hash, 0, ReadAtLen::All).await } - async fn from_rpc_read_at<C, S>( - rpc: &RpcClient<crate::rpc::proto::RpcService, C, S>, + async fn from_rpc_read_at<C>( + rpc: &RpcClient<RpcService, C>, hash: Hash, offset: u64, len: ReadAtLen, ) -> anyhow::Result<Self> where - C: quic_rpc::ServiceConnection<S>, - S: quic_rpc::Service, + C: Connector<RpcService>, { let stream = rpc .server_streaming(ReadAtRequest { hash, offset, len }) @@ -999,20 +993,17 @@ mod tests { use std::{path::Path, sync::Arc}; use iroh_net::{NodeAddr, NodeId}; - use quic_rpc::client::BoxedServiceConnection; + use quic_rpc::transport::{Connector, Listener}; use tokio_util::task::AbortOnDropHandle; + use super::RpcService; use crate::{ provider::{CustomEventSender, EventSender}, rpc::client::{blobs, tags}, util::local_pool::LocalPool, }; - type RpcClient = quic_rpc::RpcClient< - crate::rpc::proto::RpcService, - BoxedServiceConnection<crate::rpc::proto::RpcService>, - crate::rpc::proto::RpcService, - >; + type RpcClient = quic_rpc::RpcClient<RpcService>; /// An iroh node that just has the blobs transport #[derive(Debug)] @@ -1129,10 +1120,9 @@ mod tests { let router = router.spawn().await?; // Setup RPC - let (internal_rpc, controller) = - quic_rpc::transport::flume::service_connection::<crate::rpc::proto::RpcService>(32); - let controller = quic_rpc::transport::boxed::Connection::new(controller); - let internal_rpc = quic_rpc::transport::boxed::ServerEndpoint::new(internal_rpc); + let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32); + let controller = controller.boxed(); + let internal_rpc = internal_rpc.boxed(); let internal_rpc = quic_rpc::RpcServer::new(internal_rpc); let rpc_server_task: tokio::task::JoinHandle<()> = tokio::task::spawn(async move { diff --git a/src/rpc/client/blobs/batch.rs b/src/rpc/client/blobs/batch.rs index 6b08e9268..b82f17837 100644 --- a/src/rpc/client/blobs/batch.rs +++ b/src/rpc/client/blobs/batch.rs @@ -9,7 +9,7 @@ use bytes::Bytes; use futures_buffered::BufferedStreamExt; use futures_lite::StreamExt; use futures_util::{sink::Buffer, FutureExt, SinkExt, Stream}; -use quic_rpc::{client::UpdateSink, RpcClient}; +use quic_rpc::{client::UpdateSink, Connector, RpcClient}; use tokio::io::AsyncRead; use tokio_util::io::ReaderStream; use tracing::{debug, warn}; @@ -25,6 +25,7 @@ use crate::{ BatchAddStreamUpdate, BatchCreateTempTagRequest, BatchUpdate, }, tags::{self, SyncMode}, + RpcService, }, store::ImportMode, util::{SetTagOption, TagDrop}, @@ -33,19 +34,17 @@ use crate::{ /// A scope in which blobs can be added. #[derive(derive_more::Debug)] -struct BatchInner<C, S> +struct BatchInner<C> where - C: quic_rpc::ServiceConnection<S>, - S: quic_rpc::Service, + C: Connector<RpcService>, { /// The id of the scope. batch: BatchId, /// The rpc client. - rpc: RpcClient<crate::rpc::proto::RpcService, C, S>, + rpc: RpcClient<RpcService, C>, /// The stream to send drop #[debug(skip)] - updates: - Mutex<Buffer<UpdateSink<S, C, BatchUpdate, crate::rpc::proto::RpcService>, BatchUpdate>>, + updates: Mutex<Buffer<UpdateSink<C, BatchUpdate>, BatchUpdate>>, } /// A batch for write operations. @@ -55,15 +54,13 @@ where /// It is not a transaction, so things in a batch are not atomic. Also, there is /// no isolation between batches. #[derive(derive_more::Debug)] -pub struct Batch<C, S>(Arc<BatchInner<C, S>>) +pub struct Batch<C>(Arc<BatchInner<C>>) where - C: quic_rpc::ServiceConnection<S>, - S: quic_rpc::Service; + C: Connector<RpcService>; -impl<C, S> TagDrop for BatchInner<C, S> +impl<C> TagDrop for BatchInner<C> where - C: quic_rpc::ServiceConnection<S>, - S: quic_rpc::Service, + C: Connector<RpcService>, { fn on_drop(&self, content: &HashAndFormat) { let mut updates = self.updates.lock().unwrap(); @@ -131,15 +128,14 @@ impl Default for AddReaderOpts { } } -impl<C, S> Batch<C, S> +impl<C> Batch<C> where - C: quic_rpc::ServiceConnection<S>, - S: quic_rpc::Service, + C: Connector<RpcService>, { pub(super) fn new( batch: BatchId, - rpc: RpcClient<crate::rpc::proto::RpcService, C, S>, - updates: UpdateSink<S, C, BatchUpdate, crate::rpc::proto::RpcService>, + rpc: RpcClient<RpcService, C>, + updates: UpdateSink<C, BatchUpdate>, buffer_size: usize, ) -> Self { let updates = updates.buffer(buffer_size); diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index 51825fd1c..2b7cbc04d 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -12,31 +12,30 @@ //! [`Client::delete`] can be used to delete a tag. use anyhow::Result; use futures_lite::{Stream, StreamExt}; -use quic_rpc::{client::BoxedServiceConnection, RpcClient}; +use quic_rpc::{client::BoxedConnector, Connector, RpcClient}; use serde::{Deserialize, Serialize}; use crate::{ - rpc::proto::tags::{DeleteRequest, ListRequest}, + rpc::proto::{ + tags::{DeleteRequest, ListRequest}, + RpcService, + }, BlobFormat, Hash, Tag, }; /// Iroh tags client. #[derive(Debug, Clone)] #[repr(transparent)] -pub struct Client< - C = BoxedServiceConnection<crate::rpc::proto::RpcService>, - S = crate::rpc::proto::RpcService, -> { - pub(super) rpc: RpcClient<crate::rpc::proto::RpcService, C, S>, +pub struct Client<C = BoxedConnector<RpcService>> { + pub(super) rpc: RpcClient<RpcService, C>, } -impl<C, S> Client<C, S> +impl<C> Client<C> where - C: quic_rpc::ServiceConnection<S>, - S: quic_rpc::Service, + C: Connector<RpcService>, { /// Creates a new client - pub fn new(rpc: RpcClient<crate::rpc::proto::RpcService, C, S>) -> Self { + pub fn new(rpc: RpcClient<RpcService, C>) -> Self { Self { rpc } }