Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions irpc-iroh/examples/0rtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async fn ping_one_0rtt(
connection_stats: ConnectionStats,
) -> Result<()> {
let msg = i.to_be_bytes();
let data = api.echo_0rtt(msg.to_vec()).await?;
let data = api.echo(msg.to_vec()).await?;
let rtt = connection_stats.rtt(&endpoint_id);
if wait_for_ticket {
tokio::spawn(async move {
Expand Down Expand Up @@ -252,10 +252,6 @@ mod ping {
self.inner.rpc(Echo { data }).await
}

pub async fn echo_0rtt(&self, data: Vec<u8>) -> irpc::Result<Vec<u8>> {
self.inner.rpc_0rtt(Echo { data }).await
}

pub fn expose_0rtt(self) -> Result<Iroh0RttProtocol<EchoProtocol>> {
let local = self
.inner
Expand Down
16 changes: 8 additions & 8 deletions irpc-iroh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl irpc::rpc::RemoteConnection for IrohRemoteConnection {
})
}

fn zero_rtt_accepted(&self) -> BoxFuture<bool> {
Box::pin(async { true })
fn zero_rtt_rejected(&self) -> BoxFuture<bool> {
Box::pin(async { false })
}
}

Expand Down Expand Up @@ -99,13 +99,13 @@ impl irpc::rpc::RemoteConnection for IrohZrttRemoteConnection {
})
}

fn zero_rtt_accepted(&self) -> BoxFuture<bool> {
fn zero_rtt_rejected(&self) -> BoxFuture<bool> {
let conn = self.0.clone();
Box::pin(async move {
match conn.handshake_completed().await {
Err(_) => false,
Ok(ZeroRttStatus::Accepted(_)) => true,
Ok(ZeroRttStatus::Rejected(_)) => false,
Err(_) => true,
Ok(ZeroRttStatus::Accepted(_)) => false,
Ok(ZeroRttStatus::Rejected(_)) => true,
}
})
}
Expand Down Expand Up @@ -165,8 +165,8 @@ impl RemoteConnection for IrohLazyRemoteConnection {
})
}

fn zero_rtt_accepted(&self) -> BoxFuture<bool> {
Box::pin(async { true })
fn zero_rtt_rejected(&self) -> BoxFuture<bool> {
Box::pin(async { false })
}
}

Expand Down
180 changes: 65 additions & 115 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1376,67 +1376,6 @@ impl<S: Service> Client<S> {
}
}

/// Performs a request for which the server returns a oneshot receiver.
pub fn rpc<Req, Res>(&self, msg: Req) -> impl Future<Output = Result<Res>> + Send + 'static
where
S: From<Req>,
S::Message: From<WithChannels<Req, S>>,
Req: Channels<S, Tx = oneshot::Sender<Res>, Rx = NoReceiver>,
Res: RpcMessage,
{
let request = self.request();
async move {
let recv: oneshot::Receiver<Res> = match request.await? {
Request::Local(request) => {
let (tx, rx) = oneshot::channel();
request.send((msg, tx)).await?;
rx
}
#[cfg(not(feature = "rpc"))]
Request::Remote(_request) => unreachable!(),
#[cfg(feature = "rpc")]
Request::Remote(request) => {
let (_tx, rx) = request.write(msg).await?;
rx.into()
}
};
let res = recv.await?;
Ok(res)
}
}

/// Performs a request for which the server returns a mpsc receiver.
pub fn server_streaming<Req, Res>(
&self,
msg: Req,
local_response_cap: usize,
) -> impl Future<Output = Result<mpsc::Receiver<Res>>> + Send + 'static
where
S: From<Req>,
S::Message: From<WithChannels<Req, S>>,
Req: Channels<S, Tx = mpsc::Sender<Res>, Rx = NoReceiver>,
Res: RpcMessage,
{
let request = self.request();
async move {
let recv: mpsc::Receiver<Res> = match request.await? {
Request::Local(request) => {
let (tx, rx) = mpsc::channel(local_response_cap);
request.send((msg, tx)).await?;
rx
}
#[cfg(not(feature = "rpc"))]
Request::Remote(_request) => unreachable!(),
#[cfg(feature = "rpc")]
Request::Remote(request) => {
let (_tx, rx) = request.write(msg).await?;
rx.into()
}
};
Ok(recv)
}
}

/// Performs a request for which the client can send updates.
pub fn client_streaming<Req, Update, Res>(
&self,
Expand Down Expand Up @@ -1523,36 +1462,10 @@ impl<S: Service> Client<S> {
/// If you need to send a message with unit result but want to wait until the
/// remote has received it, consider using [`rpc`] with a unit `()` return
/// type instead.
pub fn notify<Req>(&self, msg: Req) -> impl Future<Output = Result<()>> + Send + 'static
where
S: From<Req>,
S::Message: From<WithChannels<Req, S>>,
Req: Channels<S, Tx = NoSender, Rx = NoReceiver>,
{
let request = self.request();
async move {
match request.await? {
Request::Local(request) => {
request.send((msg,)).await?;
}
#[cfg(not(feature = "rpc"))]
Request::Remote(_request) => unreachable!(),
#[cfg(feature = "rpc")]
Request::Remote(request) => {
let (_tx, _rx) = request.write(msg).await?;
}
};
Ok(())
}
}

/// Performs a request for which the server returns nothing.
///
/// Compared to [`Self::notify`], this variant will re-send the message if 0rtt
/// was not accepted, so it will work for 0rtt connections.
///
/// For when to use this, see [`Self::notify`].
pub fn notify_0rtt<Req>(&self, msg: Req) -> impl Future<Output = Result<()>> + Send + 'static
/// This method is safe to use with both regular and 0-RTT connections.
/// If 0-RTT data is rejected, the message will be automatically re-sent.
pub fn notify<Req>(&self, msg: Req) -> impl Future<Output = Result<()>> + Send + 'static
where
S: From<Req>,
S::Message: From<WithChannels<Req, S>>,
Expand All @@ -1571,7 +1484,7 @@ impl<S: Service> Client<S> {
// see https://www.iroh.computer/blog/0rtt-api#connect-side
let buf = rpc::prepare_write::<S>(msg)?;
let (_tx, _rx) = request.write_raw(&buf).await?;
if !this.0.zero_rtt_accepted().await {
if this.0.zero_rtt_rejected().await {
// 0rtt was not accepted, the data is lost, send it again!
let Request::Remote(request) = this.request().await? else {
unreachable!()
Expand All @@ -1586,10 +1499,9 @@ impl<S: Service> Client<S> {

/// Performs a request for which the server returns a oneshot receiver.
///
/// Compared to [Self::rpc], this variant takes a future that returns true
/// if 0rtt has been accepted. If not, the data is sent again via the same
/// remote channel. For local requests, the future is ignored.
pub fn rpc_0rtt<Req, Res>(&self, msg: Req) -> impl Future<Output = Result<Res>> + Send + 'static
/// This method is safe to use with both regular and 0-RTT connections.
/// If 0-RTT data is rejected, the message will be automatically re-sent.
pub fn rpc<Req, Res>(&self, msg: Req) -> impl Future<Output = Result<Res>> + Send + 'static
where
S: From<Req>,
S::Message: From<WithChannels<Req, S>>,
Expand All @@ -1611,15 +1523,15 @@ impl<S: Service> Client<S> {
// see https://www.iroh.computer/blog/0rtt-api#connect-side
let buf = rpc::prepare_write::<S>(msg)?;
let (_tx, rx) = request.write_raw(&buf).await?;
if this.0.zero_rtt_accepted().await {
rx
} else {
if this.0.zero_rtt_rejected().await {
// 0rtt was not accepted, the data is lost, send it again!
let Request::Remote(request) = this.request().await? else {
unreachable!()
};
let (_tx, rx) = request.write_raw(&buf).await?;
rx
} else {
rx
}
.into()
}
Expand All @@ -1631,10 +1543,9 @@ impl<S: Service> Client<S> {

/// Performs a request for which the server returns a mpsc receiver.
///
/// Compared to [Self::server_streaming], this variant takes a future that returns true
/// if 0rtt has been accepted. If not, the data is sent again via the same
/// remote channel. For local requests, the future is ignored.
pub fn server_streaming_0rtt<Req, Res>(
/// This method is safe to use with both regular and 0-RTT connections.
/// If 0-RTT data is rejected, the message will be automatically re-sent.
pub fn server_streaming<Req, Res>(
&self,
msg: Req,
local_response_cap: usize,
Expand All @@ -1660,22 +1571,61 @@ impl<S: Service> Client<S> {
// see https://www.iroh.computer/blog/0rtt-api#connect-side
let buf = rpc::prepare_write::<S>(msg)?;
let (_tx, rx) = request.write_raw(&buf).await?;
if this.0.zero_rtt_accepted().await {
rx
} else {
if this.0.zero_rtt_rejected().await {
// 0rtt was not accepted, the data is lost, send it again!
let Request::Remote(request) = this.request().await? else {
unreachable!()
};
let (_tx, rx) = request.write_raw(&buf).await?;
rx
} else {
rx
}
.into()
}
};
Ok(recv)
}
}

/// Deprecated: use [`Self::notify`] instead, it handles 0rtt automatically.
#[deprecated(note = "use `notify` instead, it handles 0rtt automatically")]
pub fn notify_0rtt<Req>(&self, msg: Req) -> impl Future<Output = Result<()>> + Send + 'static
where
S: From<Req>,
S::Message: From<WithChannels<Req, S>>,
Req: Channels<S, Tx = NoSender, Rx = NoReceiver>,
{
self.notify(msg)
}

/// Deprecated: use [`Self::rpc`] instead, it handles 0rtt automatically.
#[deprecated(note = "use `rpc` instead, it handles 0rtt automatically")]
pub fn rpc_0rtt<Req, Res>(&self, msg: Req) -> impl Future<Output = Result<Res>> + Send + 'static
where
S: From<Req>,
S::Message: From<WithChannels<Req, S>>,
Req: Channels<S, Tx = oneshot::Sender<Res>, Rx = NoReceiver>,
Res: RpcMessage,
{
self.rpc(msg)
}

/// Deprecated: use [`Self::server_streaming`] instead, it handles 0rtt automatically.
#[deprecated(note = "use `server_streaming` instead, it handles 0rtt automatically")]
pub fn server_streaming_0rtt<Req, Res>(
&self,
msg: Req,
local_response_cap: usize,
) -> impl Future<Output = Result<mpsc::Receiver<Res>>> + Send + 'static
where
S: From<Req>,
S::Message: From<WithChannels<Req, S>>,
Req: Channels<S, Tx = mpsc::Sender<Res>, Rx = NoReceiver>,
Res: RpcMessage,
{
self.server_streaming(msg, local_response_cap)
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -1704,11 +1654,11 @@ impl<M> Clone for ClientInner<M> {

impl<M> ClientInner<M> {
#[allow(dead_code)]
async fn zero_rtt_accepted(&self) -> bool {
async fn zero_rtt_rejected(&self) -> bool {
match self {
ClientInner::Local(_sender) => true,
ClientInner::Local(_sender) => false,
#[cfg(feature = "rpc")]
ClientInner::Remote(remote_connection) => remote_connection.zero_rtt_accepted().await,
ClientInner::Remote(remote_connection) => remote_connection.zero_rtt_rejected().await,
#[cfg(not(feature = "rpc"))]
Self::Remote(_) => unreachable!(),
}
Expand Down Expand Up @@ -1942,10 +1892,10 @@ pub mod rpc {
&self,
) -> BoxFuture<std::result::Result<(quinn::SendStream, quinn::RecvStream), RequestError>>;

/// Returns whether 0-RTT data was accepted by the server.
/// Returns whether 0-RTT data was rejected by the server.
///
/// For connections that were fully authenticated before allowing to send any data, this should return `true`.
fn zero_rtt_accepted(&self) -> BoxFuture<bool>;
/// For connections that were fully authenticated before allowing to send any data, this should return `false`.
fn zero_rtt_rejected(&self) -> BoxFuture<bool>;
}

/// A connection to a remote service.
Expand Down Expand Up @@ -1978,8 +1928,8 @@ pub mod rpc {
})
}

fn zero_rtt_accepted(&self) -> BoxFuture<bool> {
Box::pin(async { true })
fn zero_rtt_rejected(&self) -> BoxFuture<bool> {
Box::pin(async { false })
}
}

Expand Down Expand Up @@ -2023,8 +1973,8 @@ pub mod rpc {
})
}

fn zero_rtt_accepted(&self) -> BoxFuture<bool> {
Box::pin(async { true })
fn zero_rtt_rejected(&self) -> BoxFuture<bool> {
Box::pin(async { false })
}
}

Expand Down
Loading