Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//! Demonstrates how to talk to an actor loop both from the same process and from remotes.
//!
//! The [`StorageApi`] struct is only defined once and can be used both locally and as a remote client.

use anyhow::Result;
use iroh::{protocol::Router, Endpoint};

Expand Down Expand Up @@ -30,7 +34,7 @@ async fn remote() -> Result<()> {
let endpoint = Endpoint::bind().await?;
let api = StorageApi::spawn();
let router = Router::builder(endpoint.clone())
.accept(StorageApi::ALPN, api.expose()?)
.accept(StorageApi::ALPN, api.protocol_handler()?)
.spawn();
let addr = endpoint.addr();
(router, addr)
Expand All @@ -55,7 +59,7 @@ mod storage {
//!
//! The only `pub` item is [`StorageApi`], everything else is private.

use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};

use anyhow::{Context, Result};
use iroh::{protocol::ProtocolHandler, Endpoint};
Expand All @@ -66,6 +70,7 @@ mod storage {
};
// Import the macro
use irpc_iroh::{IrohLazyRemoteConnection, IrohProtocol};
use n0_future::task::AbortOnDropHandle;
use serde::{Deserialize, Serialize};
use tracing::info;

Expand Down Expand Up @@ -96,26 +101,14 @@ mod storage {
List(List),
}

#[derive(Default)]
struct StorageActor {
recv: tokio::sync::mpsc::Receiver<StorageMessage>,
state: BTreeMap<String, String>,
}

impl StorageActor {
pub fn spawn() -> StorageApi {
let (tx, rx) = tokio::sync::mpsc::channel(1);
let actor = Self {
recv: rx,
state: BTreeMap::new(),
};
n0_future::task::spawn(actor.run());
StorageApi {
inner: Client::local(tx),
}
}

async fn run(mut self) {
while let Some(msg) = self.recv.recv().await {
async fn run(mut self, mut rx: tokio::sync::mpsc::Receiver<StorageMessage>) {
while let Some(msg) = rx.recv().await {
self.handle(msg).await;
}
}
Expand Down Expand Up @@ -147,14 +140,21 @@ mod storage {
}

pub struct StorageApi {
inner: Client<StorageProtocol>,
client: Client<StorageProtocol>,
_actor_task: Option<Arc<AbortOnDropHandle<()>>>,
}

impl StorageApi {
pub const ALPN: &[u8] = b"irpc-iroh/derive-demo/0";

pub fn spawn() -> Self {
StorageActor::spawn()
let (tx, rx) = tokio::sync::mpsc::channel(2);
let actor = StorageActor::default();
let actor_task = n0_future::task::spawn(actor.run(rx));
StorageApi {
client: Client::local(tx),
_actor_task: Some(Arc::new(AbortOnDropHandle::new(actor_task))),
}
}

pub fn connect(
Expand All @@ -163,29 +163,30 @@ mod storage {
) -> Result<StorageApi> {
let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec());
Ok(StorageApi {
inner: Client::boxed(conn),
client: Client::boxed(conn),
_actor_task: None,
})
}

pub fn expose(&self) -> Result<impl ProtocolHandler> {
pub fn protocol_handler(&self) -> Result<impl ProtocolHandler> {
let local = self
.inner
.client
.as_local()
.context("can not listen on remote service")?;
Ok(IrohProtocol::new(StorageProtocol::remote_handler(local)))
}

pub async fn get(&self, key: String) -> irpc::Result<Option<String>> {
self.inner.rpc(Get { key }).await
self.client.rpc(Get { key }).await
}

pub async fn list(&self) -> irpc::Result<mpsc::Receiver<String>> {
self.inner.server_streaming(List, 10).await
self.client.server_streaming(List, 10).await
}

pub async fn set(&self, key: String, value: String) -> irpc::Result<()> {
let msg = Set { key, value };
self.inner.rpc(msg).await
self.client.rpc(msg).await
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//! Demonstrates the typical pattern where the server runs an actor loop that processes incoming
//! messages sequentially.

#[tokio::main]
async fn main() -> anyhow::Result<()> {
cli::run().await
Expand Down
219 changes: 219 additions & 0 deletions irpc-iroh/examples/server-shared-state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
//! This example demonstrates using irpc-iroh with a cloneable state struct
//! on the server side instead of with an actor loop.

use anyhow::Result;
use iroh::{protocol::Router, Endpoint};

use self::storage::{StorageClient, StorageServer};

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();

    // Spin up the server side: bind an endpoint and register the storage
    // handler directly on its ALPN (no actor loop needed).
    let server_endpoint = Endpoint::bind().await?;
    let storage = StorageServer::default();
    let server_router = Router::builder(server_endpoint)
        .accept(storage::ALPN, storage)
        .spawn();
    let server_addr = server_router.endpoint().addr();

    // First client: connect lazily via an endpoint, which reconnects
    // automatically if the connection closes.
    let client_endpoint = Endpoint::bind().await?;
    let api = StorageClient::connect(client_endpoint, server_addr.clone());
    api.set("hello", "world").await?;
    api.set("goodbye", "see you soon").await?;
    let value = api.get("hello").await?;
    println!("hello = {value:?}");
    let mut list = api.list().await?;
    while let Some(value) = list.recv().await? {
        println!("list: {value:?}");
    }

    // Second client: wrap an already-established connection directly.
    // This client does not reconnect if the connection closes.
    let client2 = Endpoint::bind().await?;
    let conn = client2.connect(server_addr, storage::ALPN).await?;
    let api = StorageClient::from_connection(conn);
    let value = api.get("goodbye").await?;
    println!("goodbye = {value:?}");

    // Dropping the router shuts the server down.
    drop(server_router);
    Ok(())
}

mod storage {
//! Implementation of our storage service.

use std::{
collections::BTreeMap,
sync::{Arc, Mutex, MutexGuard},
};

use anyhow::Result;
use iroh::{
endpoint::Connection,
protocol::{AcceptError, ProtocolHandler},
Endpoint,
};
use irpc::{
channel::{mpsc, oneshot},
rpc_requests, Client, WithChannels,
};
// Import the macro
use irpc_iroh::{read_request, IrohLazyRemoteConnection, IrohRemoteConnection};
use serde::{Deserialize, Serialize};
use tracing::info;

pub const ALPN: &[u8] = b"irpc/example-storage/0";

/// Request payload: fetch the value stored under `key`.
#[derive(Debug, Serialize, Deserialize)]
struct Get {
    key: String,
}

/// Request payload: stream back all stored entries.
#[derive(Debug, Serialize, Deserialize)]
struct List;

/// Request payload: store a single key/value pair.
#[derive(Debug, Serialize, Deserialize)]
struct Set {
    key: String,
    value: String,
}

/// Request payload: open a client-side stream of `(key, value)` pairs to insert.
#[derive(Debug, Serialize, Deserialize)]
struct SetMany;

// Use the macro to generate both the StorageProtocol and StorageMessage enums
// plus implement Channels for each type
#[rpc_requests(message = StorageMessage)]
#[derive(Serialize, Deserialize, Debug)]
enum StorageProtocol {
    /// Single request/response: replies once with the looked-up value, if any.
    #[rpc(tx=oneshot::Sender<Option<String>>)]
    Get(Get),
    /// Single request/response: replies with `()` as an acknowledgement.
    #[rpc(tx=oneshot::Sender<()>)]
    Set(Set),
    /// Client streaming: receives `(key, value)` pairs on `rx`, replies once
    /// with the number of inserted entries.
    #[rpc(tx=oneshot::Sender<u64>, rx=mpsc::Receiver<(String, String)>)]
    SetMany(SetMany),
    /// Server streaming: sends each stored entry as a `"key=value"` string.
    #[rpc(tx=mpsc::Sender<String>)]
    List(List),
}

/// Server side of the storage service: a cheaply cloneable handle around a
/// mutex-protected in-memory map, used directly as the protocol handler.
#[derive(Debug, Clone, Default)]
pub struct StorageServer {
    // Every request handler locks this map only for the duration of a single
    // synchronous operation (never across an await).
    state: Arc<Mutex<BTreeMap<String, String>>>,
}

impl ProtocolHandler for StorageServer {
    /// Serves requests on `conn` sequentially until the remote stops sending.
    async fn accept(&self, conn: Connection) -> Result<(), AcceptError> {
        // `read_request` yields `None` once the peer is done sending requests.
        loop {
            let Some(request) = read_request::<StorageProtocol>(&conn).await? else {
                break;
            };
            self.handle_message(request).await;
        }
        // Wait for the connection to close fully before returning.
        conn.closed().await;
        Ok(())
    }
}

impl StorageServer {
    /// Dispatches one incoming request to the matching handler.
    ///
    /// Each arm locks the shared map only for the synchronous part of the work
    /// and never holds the guard across an `.await`.
    async fn handle_message(&self, msg: StorageMessage) {
        info!("handle message {:?}", msg);
        match msg {
            StorageMessage::Get(msg) => {
                let WithChannels { tx, inner, .. } = msg;
                // Clone the value out so the lock is released before awaiting.
                let found = self.state().get(&inner.key).cloned();
                tx.send(found).await.ok();
            }
            StorageMessage::Set(msg) => {
                let WithChannels { tx, inner, .. } = msg;
                self.state().insert(inner.key, inner.value);
                // Unit reply is just an ack; ignore errors if the caller left.
                tx.send(()).await.ok();
            }
            StorageMessage::SetMany(msg) => {
                let WithChannels { tx, mut rx, .. } = msg;
                // Drain the client-side stream, inserting and counting entries.
                let mut inserted = 0;
                while let Ok(Some((key, value))) = rx.recv().await {
                    self.state().insert(key, value);
                    inserted += 1;
                }
                tx.send(inserted).await.ok();
            }
            StorageMessage::List(msg) => {
                let WithChannels { tx, .. } = msg;
                // Snapshot the entries as formatted strings up front so the
                // mutex is not held while the stream is sent out. If cloning
                // were too expensive for large maps, the storage would need a
                // snapshot mechanism (or an async lock, which would block other
                // requests while streaming).
                let snapshot: Vec<String> = self
                    .state()
                    .iter()
                    .map(|(key, value)| format!("{key}={value}"))
                    .collect();
                for line in snapshot {
                    if tx.send(line).await.is_err() {
                        // Receiver dropped; stop streaming.
                        break;
                    }
                }
            }
        }
    }

    /// Locks the shared map, panicking if the mutex was poisoned.
    fn state(&self) -> MutexGuard<'_, BTreeMap<String, String>> {
        self.state.lock().expect("poisoned")
    }
}

/// Client handle for the storage service.
pub struct StorageClient {
    // Wraps either a lazy reconnecting connection (`connect`) or a single
    // fixed connection (`from_connection`).
    inner: Client<StorageProtocol>,
}

impl StorageClient {
/// Connect via an [`Endpoint`].
///
/// This will create a client that automatically reconnects if the connection closes.
pub fn connect(endpoint: Endpoint, addr: impl Into<iroh::EndpointAddr>) -> StorageClient {
let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), ALPN.to_vec());
StorageClient {
inner: Client::boxed(conn),
}
}

/// Create a client from a [`Connection`].
///
/// This creates a client from a single [`Connection`]. If the connection closes, the client will
/// not reconnect and all calls will return errors.
pub fn from_connection(conn: Connection) -> StorageClient {
StorageClient {
inner: Client::boxed(IrohRemoteConnection::new(conn)),
}
}

pub async fn get(&self, key: impl ToString) -> Result<Option<String>, irpc::Error> {
self.inner
.rpc(Get {
key: key.to_string(),
})
.await
}

pub async fn list(&self) -> Result<mpsc::Receiver<String>, irpc::Error> {
self.inner.server_streaming(List, 10).await
}

pub async fn set(
&self,
key: impl ToString,
value: impl ToString,
) -> Result<(), irpc::Error> {
let msg = Set {
key: key.to_string(),
value: value.to_string(),
};
self.inner.rpc(msg).await
}
}
}
Loading