Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions src/sacp/src/cookbook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,17 +327,19 @@ pub mod per_session_mcp_server {
//! - You want to customize the MCP server based on session parameters
//! - Tools need to send notifications back to a specific session
//!
//! # Example
//! # Simple example: proxy everything
//!
//! Use [`spawn_session_proxy`] when you just want to inject an MCP server
//! and proxy all messages without any additional processing:
//!
//! ```
//! use sacp::mcp_server::McpServer;
//! use sacp::schema::NewSessionRequest;
//! use sacp::{Agent, Client, Component, JrResponder, ProxyToConductor};
//! use sacp::{Agent, Client, Component, ProxyToConductor};
//!
//! async fn run_proxy(transport: impl Component) -> Result<(), sacp::Error> {
//! ProxyToConductor::builder()
//! .on_receive_request_from(Client, async |request: NewSessionRequest, request_cx, cx| {
//! // Create an MCP server for this session
//! let cwd = request.cwd.clone();
//! let mcp_server = McpServer::builder("session-tools")
//! .tool_fn("get_cwd", "Returns session working directory",
Expand All @@ -346,17 +348,54 @@ pub mod per_session_mcp_server {
//! }, sacp::tool_fn!())
//! .build();
//!
//! // Build the session with the MCP server attached and proxy it
//! cx.build_session_from(request)
//! .with_mcp_server(mcp_server)?
//! .proxy_session(request_cx, JrResponder::run)
//! .spawn_session_proxy(request_cx)
//! .await
//! }, sacp::on_receive_request!())
//! .serve(transport)
//! .await
//! }
//! ```
//!
//! # Advanced example: intercept before proxying
//!
//! Use [`spawn_session`] + [`proxy_remaining_messages`] when you need to
//! do something with the session before handing off to proxy mode:
//!
//! ```
//! use sacp::mcp_server::McpServer;
//! use sacp::schema::NewSessionRequest;
//! use sacp::{Agent, Client, Component, ProxyToConductor};
//!
//! async fn run_proxy(transport: impl Component) -> Result<(), sacp::Error> {
//! ProxyToConductor::builder()
//! .on_receive_request_from(Client, async |request: NewSessionRequest, request_cx, cx| {
//! let cwd = request.cwd.clone();
//! let mcp_server = McpServer::builder("session-tools")
//! .tool_fn("get_cwd", "Returns session working directory",
//! async move |_params: (), _cx| {
//! Ok(cwd.display().to_string())
//! }, sacp::tool_fn!())
//! .build();
//!
//! let active_session = cx.build_session_from(request)
//! .with_mcp_server(mcp_server)?
//! .spawn_session()
//! .await?;
//!
//! // Do something with the session before proxying...
//! tracing::info!(session_id = %active_session.session_id(), "Session created");
//!
//! // Respond to the client and proxy remaining messages
//! request_cx.respond(active_session.response())?;
//! active_session.proxy_remaining_messages()
//! }, sacp::on_receive_request!())
//! .serve(transport)
//! .await
//! }
//! ```
//!
//! # How it works
//!
//! When you call [`SessionBuilder::with_mcp_server`]:
Expand All @@ -366,5 +405,8 @@ pub mod per_session_mcp_server {
//! 3. The MCP server's URL is added to the `NewSessionRequest`
//! 4. The handler lives as long as the session (dropped when `run_session` completes)
//!
//! [`spawn_session_proxy`]: crate::SessionBuilder::spawn_session_proxy
//! [`spawn_session`]: crate::SessionBuilder::spawn_session
//! [`proxy_remaining_messages`]: crate::ActiveSession::proxy_remaining_messages
//! [`SessionBuilder::with_mcp_server`]: crate::SessionBuilder::with_mcp_server
}
177 changes: 113 additions & 64 deletions src/sacp/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::path::Path;
use std::{marker::PhantomData, path::Path};

use agent_client_protocol_schema::{
ContentBlock, ContentChunk, NewSessionRequest, NewSessionResponse, PromptRequest,
PromptResponse, SessionModeState, SessionNotification, SessionUpdate, StopReason,
};
use futures::channel::mpsc;
use tokio::sync::oneshot;

use crate::{
Agent, Client, Handled, HasEndpoint, JrConnectionCx, JrMessageHandler, JrRequestCx, JrRole,
Expand Down Expand Up @@ -51,16 +52,16 @@ where
/// and let you access them.
///
/// Normally you would not use this method directly but would
/// instead use [`Self::build_session`] and then [`SessionBuilder::send_request`].
/// instead use [`Self::build_session`] and then [`SessionBuilder::spawn_session`].
///
/// The vector `dynamic_handler_registrations` contains any dynamic
/// handle registrations associated with this session (e.g., from MCP servers).
/// You can simply pass `Default::default()` if not applicable.
pub fn attach_session(
pub fn attach_session<'responder>(
&self,
response: NewSessionResponse,
mut dynamic_handler_registrations: Vec<DynamicHandlerRegistration<Role>>,
) -> Result<ActiveSession<Role>, crate::Error> {
) -> Result<ActiveSession<'responder, Role>, crate::Error> {
let NewSessionResponse {
session_id,
modes,
Expand All @@ -81,6 +82,7 @@ where
update_tx,
connection: self.clone(),
dynamic_handler_registrations,
_responder: PhantomData,
})
}
}
Expand Down Expand Up @@ -136,10 +138,17 @@ where
})
}

/// Send the request to create the session.
/// Run this session synchronously. The current task will be blocked
/// and `op` will be executed with the active session information.
/// This is useful when you have MCP servers that are borrowed from your local
/// stack frame.
///
/// The `ActiveSession` passed to `op` has a non-`'static` lifetime, which
/// prevents calling [`ActiveSession::proxy_remaining_messages`] (since the
/// responders would terminate when `op` returns).
pub async fn run_session<R>(
self,
op: impl AsyncFnOnce(ActiveSession<Role>) -> Result<R, crate::Error>,
op: impl for<'responder> AsyncFnOnce(ActiveSession<'responder, Role>) -> Result<R, crate::Error>,
) -> Result<R, crate::Error> {
let response = self
.connection
Expand All @@ -163,81 +172,74 @@ where
/// drift but at the cost of requiring MCP servers that are `Send` and
/// don't access data from the surrounding scope.
///
/// # Parameters
///
/// * `run_responder`: this is typically just `Responder::run`;
/// the need for this parameter is a workaround for Rust limitations.
pub async fn send_request<F>(
self,
run_responder: impl FnOnce(Responder, JrConnectionCx<Role>) -> F,
) -> Result<ActiveSession<Role>, crate::Error>
/// Returns an `ActiveSession<'static, _>` because responders are spawned
/// into background tasks that live for the connection lifetime.
pub async fn spawn_session(self) -> Result<ActiveSession<'static, Role>, crate::Error>
where
F: Future<Output = Result<(), crate::Error>> + Send + 'static,
Responder: 'static,
{
let response = self
.connection
.send_request_to(Agent, self.request)
.block_task()
.await?;
let (active_session_tx, active_session_rx) = oneshot::channel();

let cx = self.connection.clone();
self.connection.spawn(run_responder(self.responder, cx))?;
let connection = self.connection.clone();
connection.spawn(async move {
let response = self
.connection
.send_request_to(Agent, self.request)
.block_task()
.await?;

self.connection
.attach_session(response, self.dynamic_handler_registrations)
let cx = self.connection.clone();
self.connection.spawn(self.responder.run(cx))?;

let active_session = self
.connection
.attach_session(response, self.dynamic_handler_registrations)?;

active_session_tx
.send(active_session)
.map_err(|_| crate::Error::internal_error())?;

Ok(())
})?;

active_session_rx
.await
.map_err(|_| crate::Error::internal_error())
}

/// Forward the session request to the agent and proxy all messages.
/// Spawn a session and proxy all messages between client and agent.
///
/// This is a convenience method that combines [`spawn_session`](Self::spawn_session),
/// responding to the client, and [`ActiveSession::proxy_remaining_messages`].
/// Use this when you want to inject MCP servers into a session but don't need
/// to actively interact with it. The session messages will be proxied between
/// client and agent automatically.
/// to actively interact with it.
///
/// # Parameters
///
/// * `request_cx`: The request context from the intercepted `session.new` request,
/// used to send the response back to the client.
/// * `run_responder`: this is typically just `Responder::run`;
/// the need for this parameter is a workaround for Rust limitations.
pub async fn proxy_session<F>(
/// For more control (e.g., to send some messages before proxying), use
/// [`spawn_session`](Self::spawn_session) instead and call
/// [`proxy_remaining_messages`](ActiveSession::proxy_remaining_messages) manually.
pub async fn spawn_session_proxy(
self,
request_cx: JrRequestCx<NewSessionResponse>,
run_responder: impl FnOnce(Responder, JrConnectionCx<Role>) -> F,
) -> Result<(), crate::Error>
where
Role: HasEndpoint<Client>,
F: Future<Output = Result<(), crate::Error>> + Send + 'static,
Responder: 'static,
{
let response = self
.connection
.send_request_to(Agent, self.request)
.block_task()
.await?;

// Add dynamic handler to proxy session messages
let session_id = response.session_id.clone();
self.connection
.add_dynamic_handler(ProxySessionMessages::new(session_id))?
.run_indefinitely();

// Keep MCP server handlers alive
for registration in self.dynamic_handler_registrations {
registration.run_indefinitely();
}

// Spawn the responder
let cx = self.connection.clone();
self.connection.spawn(run_responder(self.responder, cx))?;

// Send response back to client
request_cx.respond(response)?;

Ok(())
let active_session = self.spawn_session().await?;
request_cx.respond(active_session.response())?;
active_session.proxy_remaining_messages()
}
}

/// Active session struct that lets you send prompts and receive updates.
pub struct ActiveSession<Role>
///
/// The `'responder` lifetime represents the span during which responders
/// (e.g., MCP server handlers) are active. When created via [`SessionBuilder::spawn_session`],
/// this is `'static` because responders are spawned into background tasks.
/// When created via [`SessionBuilder::run_session`], this is tied to the
/// closure scope, preventing [`Self::proxy_remaining_messages`] from being called
/// (since the responders would die when the closure returns).
pub struct ActiveSession<'responder, Role>
where
Role: HasEndpoint<Agent>,
{
Expand All @@ -251,8 +253,10 @@ where
/// Collect registrations from dynamic handlers for MCP servers etc.
/// These will be dropped once the active-session struct is dropped
/// which will cause them to be deregistered.
#[expect(dead_code)]
dynamic_handler_registrations: Vec<DynamicHandlerRegistration<Role>>,

/// Phantom lifetime representing the responder lifetime.
_responder: PhantomData<&'responder ()>,
}

/// Incoming message from the agent
Expand All @@ -267,7 +271,7 @@ pub enum SessionMessage {
StopReason(StopReason),
}

impl<R> ActiveSession<R>
impl<'responder, R> ActiveSession<'responder, R>
where
R: HasEndpoint<Agent>,
{
Expand All @@ -286,6 +290,18 @@ where
&self.meta
}

/// Build a `NewSessionResponse` from the session information.
///
/// Useful when you need to forward the session response to a client
/// after doing some processing.
pub fn response(&self) -> NewSessionResponse {
NewSessionResponse {
session_id: self.session_id.clone(),
modes: self.modes.clone(),
meta: self.meta.clone(),
}
}

/// Access the underlying connection context used to communicate with the agent.
pub fn connection_cx(&self) -> JrConnectionCx<R> {
self.connection.clone()
Expand Down Expand Up @@ -356,6 +372,39 @@ where
}
}

impl<R> ActiveSession<'static, R>
where
R: HasEndpoint<Agent>,
{
/// Proxy all remaining messages for this session between client and agent.
///
/// Use this when you want to inject MCP servers into a session but don't need
/// to actively interact with it after setup. The session messages will be proxied
/// between client and agent automatically.
///
/// This consumes the `ActiveSession` since you're giving up active control.
///
/// This method is only available on `ActiveSession<'static, _>` (from
/// [`SessionBuilder::spawn_session`]) because it requires responders to
/// outlive the method call.
pub fn proxy_remaining_messages(self) -> Result<(), crate::Error>
where
R: HasEndpoint<Client>,
{
// Add dynamic handler to proxy session messages
self.connection
.add_dynamic_handler(ProxySessionMessages::new(self.session_id))?
.run_indefinitely();

// Keep MCP server handlers alive
for registration in self.dynamic_handler_registrations {
registration.run_indefinitely();
}

Ok(())
}
}

struct ActiveSessionHandler<Role>
where
Role: HasEndpoint<Agent>,
Expand Down
7 changes: 2 additions & 5 deletions src/yopo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
//!
//! Provides a convenient API for running one-shot prompts against SACP components.

use sacp::ClientToAgent;
use sacp::schema::{
AudioContent, ContentBlock, EmbeddedResourceResource, ImageContent, InitializeRequest,
RequestPermissionOutcome, RequestPermissionRequest, RequestPermissionResponse,
SessionNotification, TextContent, VERSION as PROTOCOL_VERSION,
};
use sacp::util::MatchMessage;
use sacp::{ClientToAgent, JrResponder};
use sacp::{Component, Handled, MessageCx, UntypedMessage};
use std::path::PathBuf;

Expand Down Expand Up @@ -113,10 +113,7 @@ pub async fn prompt_with_callback(
.block_task()
.await?;

let mut session = cx
.build_session(PathBuf::from("."))
.send_request(JrResponder::run)
.await?;
let mut session = cx.build_session(PathBuf::from(".")).spawn_session().await?;

session.send_prompt(prompt_text)?;

Expand Down