diff --git a/irpc-iroh/examples/derive.rs b/irpc-iroh/examples/remote-and-local.rs similarity index 79% rename from irpc-iroh/examples/derive.rs rename to irpc-iroh/examples/remote-and-local.rs index 9ae06fc..0c32c8c 100644 --- a/irpc-iroh/examples/derive.rs +++ b/irpc-iroh/examples/remote-and-local.rs @@ -1,3 +1,7 @@ +//! Demonstrates how to talk to an actor loop both from the same process and from remotes. +//! +//! The [`StorageApi`] struct is only defined once and can be used both locally and as a remote client. + use anyhow::Result; use iroh::{protocol::Router, Endpoint}; @@ -30,7 +34,7 @@ async fn remote() -> Result<()> { let endpoint = Endpoint::bind().await?; let api = StorageApi::spawn(); let router = Router::builder(endpoint.clone()) - .accept(StorageApi::ALPN, api.expose()?) + .accept(StorageApi::ALPN, api.protocol_handler()?) .spawn(); let addr = endpoint.addr(); (router, addr) @@ -55,7 +59,7 @@ mod storage { //! //! The only `pub` item is [`StorageApi`], everything else is private. - use std::collections::BTreeMap; + use std::{collections::BTreeMap, sync::Arc}; use anyhow::{Context, Result}; use iroh::{protocol::ProtocolHandler, Endpoint}; @@ -66,6 +70,7 @@ mod storage { }; // Import the macro use irpc_iroh::{IrohLazyRemoteConnection, IrohProtocol}; + use n0_future::task::AbortOnDropHandle; use serde::{Deserialize, Serialize}; use tracing::info; @@ -96,26 +101,14 @@ mod storage { List(List), } + #[derive(Default)] struct StorageActor { - recv: tokio::sync::mpsc::Receiver, state: BTreeMap, } impl StorageActor { - pub fn spawn() -> StorageApi { - let (tx, rx) = tokio::sync::mpsc::channel(1); - let actor = Self { - recv: rx, - state: BTreeMap::new(), - }; - n0_future::task::spawn(actor.run()); - StorageApi { - inner: Client::local(tx), - } - } - - async fn run(mut self) { - while let Some(msg) = self.recv.recv().await { + async fn run(mut self, mut rx: tokio::sync::mpsc::Receiver) { + while let Some(msg) = rx.recv().await { self.handle(msg).await; } } @@ -147,14 +140,21 @@ mod storage { } pub struct StorageApi { - inner: Client, + client: Client, + _actor_task: Option>>, } impl StorageApi { pub const ALPN: &[u8] = b"irpc-iroh/derive-demo/0"; pub fn spawn() -> Self { - StorageActor::spawn() + let (tx, rx) = tokio::sync::mpsc::channel(2); + let actor = StorageActor::default(); + let actor_task = n0_future::task::spawn(actor.run(rx)); + StorageApi { + client: Client::local(tx), + _actor_task: Some(Arc::new(AbortOnDropHandle::new(actor_task))), + } } pub fn connect( @@ -163,29 +163,30 @@ mod storage { ) -> Result { let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec()); Ok(StorageApi { - inner: Client::boxed(conn), + client: Client::boxed(conn), + _actor_task: None, }) } - pub fn expose(&self) -> Result { + pub fn protocol_handler(&self) -> Result { let local = self - .inner + .client .as_local() .context("can not listen on remote service")?; Ok(IrohProtocol::new(StorageProtocol::remote_handler(local))) } pub async fn get(&self, key: String) -> irpc::Result> { - self.inner.rpc(Get { key }).await + self.client.rpc(Get { key }).await } pub async fn list(&self) -> irpc::Result> { - self.inner.server_streaming(List, 10).await + self.client.server_streaming(List, 10).await } pub async fn set(&self, key: String, value: String) -> irpc::Result<()> { let msg = Set { key, value }; - self.inner.rpc(msg).await + self.client.rpc(msg).await } } } diff --git a/irpc-iroh/examples/simple.rs b/irpc-iroh/examples/server-actor.rs similarity index 97% rename from irpc-iroh/examples/simple.rs rename to irpc-iroh/examples/server-actor.rs index d08cab4..b08af9a 100644 --- a/irpc-iroh/examples/simple.rs +++ b/irpc-iroh/examples/server-actor.rs @@ -1,3 +1,6 @@ +//! Demonstrates the typical pattern where the server runs an actor loop that processes incoming +//! messages sequentially. + #[tokio::main] async fn main() -> anyhow::Result<()> { cli::run().await diff --git a/irpc-iroh/examples/server-shared-state.rs b/irpc-iroh/examples/server-shared-state.rs new file mode 100644 index 0000000..7217420 --- /dev/null +++ b/irpc-iroh/examples/server-shared-state.rs @@ -0,0 +1,219 @@ +//! This example demonstrates using irpc-iroh with a cloneable state struct +//! on the server side instead of with an actor loop. + +use anyhow::Result; +use iroh::{protocol::Router, Endpoint}; + +use self::storage::{StorageClient, StorageServer}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt::init(); + + // Start the server. + let (server_router, server_addr) = { + let endpoint = Endpoint::bind().await?; + let storage = StorageServer::default(); + let router = Router::builder(endpoint) + .accept(storage::ALPN, storage) + .spawn(); + let addr = router.endpoint().addr(); + (router, addr) + }; + + // Connect by passing an endpoint, which allows automatic reconnection. + let client_endpoint = Endpoint::bind().await?; + let api = StorageClient::connect(client_endpoint, server_addr.clone()); + api.set("hello", "world").await?; + api.set("goodbye", "see you soon").await?; + let value = api.get("hello").await?; + println!("hello = {value:?}"); + let mut list = api.list().await?; + while let Some(value) = list.recv().await? { + println!("list: {value:?}"); + } + + // Or create a client from a connection directly. + let client2 = Endpoint::bind().await?; + let conn = client2.connect(server_addr, storage::ALPN).await?; + let api = StorageClient::from_connection(conn); + let value = api.get("goodbye").await?; + println!("goodbye = {value:?}"); + + drop(server_router); + Ok(()) +} + +mod storage { + //! Implementation of our storage service. + + use std::{ + collections::BTreeMap, + sync::{Arc, Mutex, MutexGuard}, + }; + + use anyhow::Result; + use iroh::{ + endpoint::Connection, + protocol::{AcceptError, ProtocolHandler}, + Endpoint, + }; + use irpc::{ + channel::{mpsc, oneshot}, + rpc_requests, Client, WithChannels, + }; + // Import the macro + use irpc_iroh::{read_request, IrohLazyRemoteConnection, IrohRemoteConnection}; + use serde::{Deserialize, Serialize}; + use tracing::info; + + pub const ALPN: &[u8] = b"irpc/example-storage/0"; + + #[derive(Debug, Serialize, Deserialize)] + struct Get { + key: String, + } + + #[derive(Debug, Serialize, Deserialize)] + struct List; + + #[derive(Debug, Serialize, Deserialize)] + struct Set { + key: String, + value: String, + } + + #[derive(Debug, Serialize, Deserialize)] + struct SetMany; + + // Use the macro to generate both the StorageProtocol and StorageMessage enums + // plus implement Channels for each type + #[rpc_requests(message = StorageMessage)] + #[derive(Serialize, Deserialize, Debug)] + enum StorageProtocol { + #[rpc(tx=oneshot::Sender>)] + Get(Get), + #[rpc(tx=oneshot::Sender<()>)] + Set(Set), + #[rpc(tx=oneshot::Sender, rx=mpsc::Receiver<(String, String)>)] + SetMany(SetMany), + #[rpc(tx=mpsc::Sender)] + List(List), + } + + #[derive(Debug, Clone, Default)] + pub struct StorageServer { + state: Arc>>, + } + + impl ProtocolHandler for StorageServer { + async fn accept(&self, conn: Connection) -> Result<(), AcceptError> { + while let Some(msg) = read_request::(&conn).await? { + self.handle_message(msg).await; + } + conn.closed().await; + Ok(()) + } + } + + impl StorageServer { + async fn handle_message(&self, msg: StorageMessage) { + info!("handle message {:?}", msg); + match msg { + StorageMessage::Get(msg) => { + let WithChannels { tx, inner, .. } = msg; + let value = self.state().get(&inner.key).cloned(); + tx.send(value).await.ok(); + } + StorageMessage::Set(msg) => { + let WithChannels { tx, inner, .. } = msg; + self.state().insert(inner.key, inner.value); + tx.send(()).await.ok(); + } + StorageMessage::SetMany(msg) => { + let WithChannels { tx, mut rx, .. } = msg; + let mut i = 0; + while let Ok(Some((key, value))) = rx.recv().await { + self.state().insert(key, value); + i += 1; + } + tx.send(i).await.ok(); + } + StorageMessage::List(msg) => { + let WithChannels { tx, .. } = msg; + let values = { + let state = self.state(); + // We clone the values so that we don't keep the lock open for the lifetime of the request. + // If we wouldn't want to clone here because there can be many entries, + // we have to redesign the storage to support a notion of snapshots, or use an async lock + // but that would mean that no other requests can be processed while the stream here is sent out. + let values: Vec<_> = state + .iter() + .map(|(key, value)| format!("{key}={value}")) + .collect(); + values + }; + for value in values { + if tx.send(value).await.is_err() { + break; + } + } + } + } + } + + fn state(&self) -> MutexGuard<'_, BTreeMap> { + self.state.lock().expect("poisoned") + } + } + + pub struct StorageClient { + inner: Client, + } + + impl StorageClient { + /// Connect via an [`Endpoint`]. + /// + /// This will create a client that automatically reconnects if the connection closes. + pub fn connect(endpoint: Endpoint, addr: impl Into) -> StorageClient { + let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), ALPN.to_vec()); + StorageClient { + inner: Client::boxed(conn), + } + } + + /// Create a client from a [`Connection`]. + /// + /// This creates a client from a single [`Connection`]. If the connection closes, the client will + /// not reconnect and all calls will return errors. + pub fn from_connection(conn: Connection) -> StorageClient { + StorageClient { + inner: Client::boxed(IrohRemoteConnection::new(conn)), + } + } + + pub async fn get(&self, key: impl ToString) -> Result, irpc::Error> { + self.inner + .rpc(Get { + key: key.to_string(), + }) + .await + } + + pub async fn list(&self) -> Result, irpc::Error> { + self.inner.server_streaming(List, 10).await + } + + pub async fn set( + &self, + key: impl ToString, + value: impl ToString, + ) -> Result<(), irpc::Error> { + let msg = Set { + key: key.to_string(), + value: value.to_string(), + }; + self.inner.rpc(msg).await + } + } +}