diff --git a/irpc-iroh/examples/0rtt.rs b/irpc-iroh/examples/0rtt.rs index 53936d3..f0f7609 100644 --- a/irpc-iroh/examples/0rtt.rs +++ b/irpc-iroh/examples/0rtt.rs @@ -117,7 +117,7 @@ async fn ping_one_0rtt( connection_stats: ConnectionStats, ) -> Result<()> { let msg = i.to_be_bytes(); - let data = api.echo_0rtt(msg.to_vec()).await?; + let data = api.echo(msg.to_vec()).await?; let rtt = connection_stats.rtt(&endpoint_id); if wait_for_ticket { tokio::spawn(async move { @@ -252,10 +252,6 @@ mod ping { self.inner.rpc(Echo { data }).await } - pub async fn echo_0rtt(&self, data: Vec) -> irpc::Result> { - self.inner.rpc_0rtt(Echo { data }).await - } - pub fn expose_0rtt(self) -> Result> { let local = self .inner diff --git a/irpc-iroh/src/lib.rs b/irpc-iroh/src/lib.rs index 2556f85..58742d6 100644 --- a/irpc-iroh/src/lib.rs +++ b/irpc-iroh/src/lib.rs @@ -69,8 +69,8 @@ impl irpc::rpc::RemoteConnection for IrohRemoteConnection { }) } - fn zero_rtt_accepted(&self) -> BoxFuture { - Box::pin(async { true }) + fn zero_rtt_rejected(&self) -> BoxFuture { + Box::pin(async { false }) } } @@ -99,13 +99,13 @@ impl irpc::rpc::RemoteConnection for IrohZrttRemoteConnection { }) } - fn zero_rtt_accepted(&self) -> BoxFuture { + fn zero_rtt_rejected(&self) -> BoxFuture { let conn = self.0.clone(); Box::pin(async move { match conn.handshake_completed().await { - Err(_) => false, - Ok(ZeroRttStatus::Accepted(_)) => true, - Ok(ZeroRttStatus::Rejected(_)) => false, + Err(_) => true, + Ok(ZeroRttStatus::Accepted(_)) => false, + Ok(ZeroRttStatus::Rejected(_)) => true, } }) } @@ -165,8 +165,8 @@ impl RemoteConnection for IrohLazyRemoteConnection { }) } - fn zero_rtt_accepted(&self) -> BoxFuture { - Box::pin(async { true }) + fn zero_rtt_rejected(&self) -> BoxFuture { + Box::pin(async { false }) } } diff --git a/src/lib.rs b/src/lib.rs index 1d849a2..0c5fd80 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1376,67 +1376,6 @@ impl Client { } } - /// Performs a request for which the server returns a oneshot receiver. - pub fn rpc(&self, msg: Req) -> impl Future> + Send + 'static - where - S: From, - S::Message: From>, - Req: Channels, Rx = NoReceiver>, - Res: RpcMessage, - { - let request = self.request(); - async move { - let recv: oneshot::Receiver = match request.await? { - Request::Local(request) => { - let (tx, rx) = oneshot::channel(); - request.send((msg, tx)).await?; - rx - } - #[cfg(not(feature = "rpc"))] - Request::Remote(_request) => unreachable!(), - #[cfg(feature = "rpc")] - Request::Remote(request) => { - let (_tx, rx) = request.write(msg).await?; - rx.into() - } - }; - let res = recv.await?; - Ok(res) - } - } - - /// Performs a request for which the server returns a mpsc receiver. - pub fn server_streaming( - &self, - msg: Req, - local_response_cap: usize, - ) -> impl Future>> + Send + 'static - where - S: From, - S::Message: From>, - Req: Channels, Rx = NoReceiver>, - Res: RpcMessage, - { - let request = self.request(); - async move { - let recv: mpsc::Receiver = match request.await? { - Request::Local(request) => { - let (tx, rx) = mpsc::channel(local_response_cap); - request.send((msg, tx)).await?; - rx - } - #[cfg(not(feature = "rpc"))] - Request::Remote(_request) => unreachable!(), - #[cfg(feature = "rpc")] - Request::Remote(request) => { - let (_tx, rx) = request.write(msg).await?; - rx.into() - } - }; - Ok(recv) - } - } - /// Performs a request for which the client can send updates. pub fn client_streaming( &self, @@ -1523,36 +1462,10 @@ impl Client { /// If you need to send a message with unit result but want to wait until the /// remote has received it, consider using [`rpc`] with a unit `()` return /// type instead. - pub fn notify(&self, msg: Req) -> impl Future> + Send + 'static - where - S: From, - S::Message: From>, - Req: Channels, - { - let request = self.request(); - async move { - match request.await? { - Request::Local(request) => { - request.send((msg,)).await?; - } - #[cfg(not(feature = "rpc"))] - Request::Remote(_request) => unreachable!(), - #[cfg(feature = "rpc")] - Request::Remote(request) => { - let (_tx, _rx) = request.write(msg).await?; - } - }; - Ok(()) - } - } - - /// Performs a request for which the server returns nothing. /// - /// Compared to [`Self::notify`], this variant will re-send the message if 0rtt - /// was not accepted, so it will work for 0rtt connections. - /// - /// For when to use this, see [`Self::notify`]. - pub fn notify_0rtt(&self, msg: Req) -> impl Future> + Send + 'static + /// This method is safe to use with both regular and 0-RTT connections. + /// If 0-RTT data is rejected, the message will be automatically re-sent. + pub fn notify(&self, msg: Req) -> impl Future> + Send + 'static where S: From, S::Message: From>, @@ -1571,7 +1484,7 @@ impl Client { // see https://www.iroh.computer/blog/0rtt-api#connect-side let buf = rpc::prepare_write::(msg)?; let (_tx, _rx) = request.write_raw(&buf).await?; - if !this.0.zero_rtt_accepted().await { + if this.0.zero_rtt_rejected().await { // 0rtt was not accepted, the data is lost, send it again! let Request::Remote(request) = this.request().await? else { unreachable!() @@ -1586,10 +1499,9 @@ impl Client { /// Performs a request for which the server returns a oneshot receiver. /// - /// Compared to [Self::rpc], this variant takes a future that returns true - /// if 0rtt has been accepted. If not, the data is sent again via the same - /// remote channel. For local requests, the future is ignored. - pub fn rpc_0rtt(&self, msg: Req) -> impl Future> + Send + 'static + /// This method is safe to use with both regular and 0-RTT connections. + /// If 0-RTT data is rejected, the message will be automatically re-sent. + pub fn rpc(&self, msg: Req) -> impl Future> + Send + 'static where S: From, S::Message: From>, @@ -1611,15 +1523,15 @@ impl Client { // see https://www.iroh.computer/blog/0rtt-api#connect-side let buf = rpc::prepare_write::(msg)?; let (_tx, rx) = request.write_raw(&buf).await?; - if this.0.zero_rtt_accepted().await { - rx - } else { + if this.0.zero_rtt_rejected().await { // 0rtt was not accepted, the data is lost, send it again! let Request::Remote(request) = this.request().await? else { unreachable!() }; let (_tx, rx) = request.write_raw(&buf).await?; rx + } else { + rx } .into() } @@ -1631,10 +1543,9 @@ impl Client { /// Performs a request for which the server returns a mpsc receiver. /// - /// Compared to [Self::server_streaming], this variant takes a future that returns true - /// if 0rtt has been accepted. If not, the data is sent again via the same - /// remote channel. For local requests, the future is ignored. - pub fn server_streaming_0rtt( + /// This method is safe to use with both regular and 0-RTT connections. + /// If 0-RTT data is rejected, the message will be automatically re-sent. + pub fn server_streaming( &self, msg: Req, local_response_cap: usize, @@ -1660,15 +1571,15 @@ impl Client { // see https://www.iroh.computer/blog/0rtt-api#connect-side let buf = rpc::prepare_write::(msg)?; let (_tx, rx) = request.write_raw(&buf).await?; - if this.0.zero_rtt_accepted().await { - rx - } else { + if this.0.zero_rtt_rejected().await { // 0rtt was not accepted, the data is lost, send it again! let Request::Remote(request) = this.request().await? else { unreachable!() }; let (_tx, rx) = request.write_raw(&buf).await?; rx + } else { + rx } .into() } @@ -1676,6 +1587,45 @@ impl Client { Ok(recv) } } + + /// Deprecated: use [`Self::notify`] instead, it handles 0rtt automatically. + #[deprecated(note = "use `notify` instead, it handles 0rtt automatically")] + pub fn notify_0rtt(&self, msg: Req) -> impl Future> + Send + 'static + where + S: From, + S::Message: From>, + Req: Channels, + { + self.notify(msg) + } + + /// Deprecated: use [`Self::rpc`] instead, it handles 0rtt automatically. + #[deprecated(note = "use `rpc` instead, it handles 0rtt automatically")] + pub fn rpc_0rtt(&self, msg: Req) -> impl Future> + Send + 'static + where + S: From, + S::Message: From>, + Req: Channels, Rx = NoReceiver>, + Res: RpcMessage, + { + self.rpc(msg) + } + + /// Deprecated: use [`Self::server_streaming`] instead, it handles 0rtt automatically. + #[deprecated(note = "use `server_streaming` instead, it handles 0rtt automatically")] + pub fn server_streaming_0rtt( + &self, + msg: Req, + local_response_cap: usize, + ) -> impl Future>> + Send + 'static + where + S: From, + S::Message: From>, + Req: Channels, Rx = NoReceiver>, + Res: RpcMessage, + { + self.server_streaming(msg, local_response_cap) + } } #[derive(Debug)] @@ -1704,11 +1654,11 @@ impl Clone for ClientInner { impl ClientInner { #[allow(dead_code)] - async fn zero_rtt_accepted(&self) -> bool { + async fn zero_rtt_rejected(&self) -> bool { match self { - ClientInner::Local(_sender) => true, + ClientInner::Local(_sender) => false, #[cfg(feature = "rpc")] - ClientInner::Remote(remote_connection) => remote_connection.zero_rtt_accepted().await, + ClientInner::Remote(remote_connection) => remote_connection.zero_rtt_rejected().await, #[cfg(not(feature = "rpc"))] Self::Remote(_) => unreachable!(), } @@ -1942,10 +1892,10 @@ pub mod rpc { &self, ) -> BoxFuture>; - /// Returns whether 0-RTT data was accepted by the server. + /// Returns whether 0-RTT data was rejected by the server. /// - /// For connections that were fully authenticated before allowing to send any data, this should return `true`. - fn zero_rtt_accepted(&self) -> BoxFuture; + /// For connections that were fully authenticated before allowing to send any data, this should return `false`. + fn zero_rtt_rejected(&self) -> BoxFuture; } /// A connection to a remote service. @@ -1978,8 +1928,8 @@ pub mod rpc { }) } - fn zero_rtt_accepted(&self) -> BoxFuture { - Box::pin(async { true }) + fn zero_rtt_rejected(&self) -> BoxFuture { + Box::pin(async { false }) } } @@ -2023,8 +1973,8 @@ pub mod rpc { }) } - fn zero_rtt_accepted(&self) -> BoxFuture { - Box::pin(async { true }) + fn zero_rtt_rejected(&self) -> BoxFuture { + Box::pin(async { false }) } }