@@ -73,8 +73,8 @@ use genawaiter::sync::{Co, Gen};
73
73
use iroh_net:: NodeAddr ;
74
74
use portable_atomic:: { AtomicU64 , Ordering } ;
75
75
use quic_rpc:: {
76
- client:: { BoxStreamSync , BoxedServiceConnection } ,
77
- RpcClient ,
76
+ client:: { BoxStreamSync , BoxedConnector } ,
77
+ Connector , RpcClient ,
78
78
} ;
79
79
use serde:: { Deserialize , Serialize } ;
80
80
use tokio:: io:: { AsyncRead , AsyncReadExt , ReadBuf } ;
@@ -87,6 +87,7 @@ use crate::{
87
87
format:: collection:: { Collection , SimpleStore } ,
88
88
get:: db:: DownloadProgress as BytesDownloadProgress ,
89
89
net_protocol:: BlobDownloadRequest ,
90
+ rpc:: proto:: RpcService ,
90
91
store:: { BaoBlobSize , ConsistencyCheckProgress , ExportFormat , ExportMode , ValidateProgress } ,
91
92
util:: SetTagOption ,
92
93
BlobFormat , Hash , Tag ,
@@ -105,20 +106,16 @@ use crate::rpc::proto::blobs::{
105
106
106
107
/// Iroh blobs client.
107
108
#[ derive( Debug , Clone ) ]
108
- pub struct Client <
109
- C = BoxedServiceConnection < crate :: rpc:: proto:: RpcService > ,
110
- S = crate :: rpc:: proto:: RpcService ,
111
- > {
112
- pub ( super ) rpc : RpcClient < crate :: rpc:: proto:: RpcService , C , S > ,
109
+ pub struct Client < C = BoxedConnector < RpcService > > {
110
+ pub ( super ) rpc : RpcClient < RpcService , C > ,
113
111
}
114
112
115
- impl < C , S > Client < C , S >
113
+ impl < C > Client < C >
116
114
where
117
- S : quic_rpc:: Service ,
118
- C : quic_rpc:: ServiceConnection < S > ,
115
+ C : Connector < RpcService > ,
119
116
{
120
117
/// Create a new client
121
- pub fn new ( rpc : RpcClient < crate :: rpc :: proto :: RpcService , C , S > ) -> Self {
118
+ pub fn new ( rpc : RpcClient < RpcService , C > ) -> Self {
122
119
Self { rpc }
123
120
}
124
121
@@ -147,7 +144,7 @@ where
147
144
/// A batch is a context in which temp tags are created and data is added to the node. Temp tags
148
145
/// are automatically deleted when the batch is dropped, leading to the data being garbage collected
149
146
/// unless a permanent tag is created for it.
150
- pub async fn batch ( & self ) -> Result < Batch < C , S > > {
147
+ pub async fn batch ( & self ) -> Result < Batch < C > > {
151
148
let ( updates, mut stream) = self . rpc . bidi ( BatchCreateRequest ) . await ?;
152
149
let BatchCreateResponse :: Id ( batch) = stream. next ( ) . await . context ( "expected scope id" ) ??;
153
150
let rpc = self . rpc . clone ( ) ;
@@ -457,15 +454,14 @@ where
457
454
Ok ( ( ) )
458
455
}
459
456
460
- fn tags_client ( & self ) -> tags:: Client < C , S > {
457
+ fn tags_client ( & self ) -> tags:: Client < C > {
461
458
tags:: Client :: new ( self . rpc . clone ( ) )
462
459
}
463
460
}
464
461
465
- impl < C , S > SimpleStore for Client < C , S >
462
+ impl < C > SimpleStore for Client < C >
466
463
where
467
- S : quic_rpc:: Service ,
468
- C : quic_rpc:: ServiceConnection < S > ,
464
+ C : Connector < RpcService > ,
469
465
{
470
466
async fn load ( & self , hash : Hash ) -> anyhow:: Result < Bytes > {
471
467
self . read_to_bytes ( hash) . await
@@ -882,26 +878,24 @@ impl Reader {
882
878
}
883
879
884
880
/// todo make private again
885
- pub async fn from_rpc_read < C , S > (
886
- rpc : & RpcClient < crate :: rpc :: proto :: RpcService , C , S > ,
881
+ pub async fn from_rpc_read < C > (
882
+ rpc : & RpcClient < RpcService , C > ,
887
883
hash : Hash ,
888
884
) -> anyhow:: Result < Self >
889
885
where
890
- C : quic_rpc:: ServiceConnection < S > ,
891
- S : quic_rpc:: Service ,
886
+ C : Connector < RpcService > ,
892
887
{
893
888
Self :: from_rpc_read_at ( rpc, hash, 0 , ReadAtLen :: All ) . await
894
889
}
895
890
896
- async fn from_rpc_read_at < C , S > (
897
- rpc : & RpcClient < crate :: rpc :: proto :: RpcService , C , S > ,
891
+ async fn from_rpc_read_at < C > (
892
+ rpc : & RpcClient < RpcService , C > ,
898
893
hash : Hash ,
899
894
offset : u64 ,
900
895
len : ReadAtLen ,
901
896
) -> anyhow:: Result < Self >
902
897
where
903
- C : quic_rpc:: ServiceConnection < S > ,
904
- S : quic_rpc:: Service ,
898
+ C : Connector < RpcService > ,
905
899
{
906
900
let stream = rpc
907
901
. server_streaming ( ReadAtRequest { hash, offset, len } )
@@ -999,20 +993,17 @@ mod tests {
999
993
use std:: { path:: Path , sync:: Arc } ;
1000
994
1001
995
use iroh_net:: { NodeAddr , NodeId } ;
1002
- use quic_rpc:: client :: BoxedServiceConnection ;
996
+ use quic_rpc:: transport :: { Connector , Listener } ;
1003
997
use tokio_util:: task:: AbortOnDropHandle ;
1004
998
999
+ use super :: RpcService ;
1005
1000
use crate :: {
1006
1001
provider:: { CustomEventSender , EventSender } ,
1007
1002
rpc:: client:: { blobs, tags} ,
1008
1003
util:: local_pool:: LocalPool ,
1009
1004
} ;
1010
1005
1011
- type RpcClient = quic_rpc:: RpcClient <
1012
- crate :: rpc:: proto:: RpcService ,
1013
- BoxedServiceConnection < crate :: rpc:: proto:: RpcService > ,
1014
- crate :: rpc:: proto:: RpcService ,
1015
- > ;
1006
+ type RpcClient = quic_rpc:: RpcClient < RpcService > ;
1016
1007
1017
1008
/// An iroh node that just has the blobs transport
1018
1009
#[ derive( Debug ) ]
@@ -1129,10 +1120,9 @@ mod tests {
1129
1120
let router = router. spawn ( ) . await ?;
1130
1121
1131
1122
// Setup RPC
1132
- let ( internal_rpc, controller) =
1133
- quic_rpc:: transport:: flume:: service_connection :: < crate :: rpc:: proto:: RpcService > ( 32 ) ;
1134
- let controller = quic_rpc:: transport:: boxed:: Connection :: new ( controller) ;
1135
- let internal_rpc = quic_rpc:: transport:: boxed:: ServerEndpoint :: new ( internal_rpc) ;
1123
+ let ( internal_rpc, controller) = quic_rpc:: transport:: flume:: channel ( 32 ) ;
1124
+ let controller = controller. boxed ( ) ;
1125
+ let internal_rpc = internal_rpc. boxed ( ) ;
1136
1126
let internal_rpc = quic_rpc:: RpcServer :: new ( internal_rpc) ;
1137
1127
1138
1128
let rpc_server_task: tokio:: task:: JoinHandle < ( ) > = tokio:: task:: spawn ( async move {
0 commit comments