diff --git a/src/sacp/src/cookbook.rs b/src/sacp/src/cookbook.rs index b38f19d..9d9f882 100644 --- a/src/sacp/src/cookbook.rs +++ b/src/sacp/src/cookbook.rs @@ -327,17 +327,19 @@ pub mod per_session_mcp_server { //! - You want to customize the MCP server based on session parameters //! - Tools need to send notifications back to a specific session //! - //! # Example + //! # Simple example: proxy everything + //! + //! Use [`spawn_session_proxy`] when you just want to inject an MCP server + //! and proxy all messages without any additional processing: //! //! ``` //! use sacp::mcp_server::McpServer; //! use sacp::schema::NewSessionRequest; - //! use sacp::{Agent, Client, Component, JrResponder, ProxyToConductor}; + //! use sacp::{Agent, Client, Component, ProxyToConductor}; //! //! async fn run_proxy(transport: impl Component) -> Result<(), sacp::Error> { //! ProxyToConductor::builder() //! .on_receive_request_from(Client, async |request: NewSessionRequest, request_cx, cx| { - //! // Create an MCP server for this session //! let cwd = request.cwd.clone(); //! let mcp_server = McpServer::builder("session-tools") //! .tool_fn("get_cwd", "Returns session working directory", @@ -346,10 +348,9 @@ pub mod per_session_mcp_server { //! }, sacp::tool_fn!()) //! .build(); //! - //! // Build the session with the MCP server attached and proxy it //! cx.build_session_from(request) //! .with_mcp_server(mcp_server)? - //! .proxy_session(request_cx, JrResponder::run) + //! .spawn_session_proxy(request_cx) //! .await //! }, sacp::on_receive_request!()) //! .serve(transport) @@ -357,6 +358,44 @@ pub mod per_session_mcp_server { //! } //! ``` //! + //! # Advanced example: intercept before proxying + //! + //! Use [`spawn_session`] + [`proxy_remaining_messages`] when you need to + //! do something with the session before handing off to proxy mode: + //! + //! ``` + //! use sacp::mcp_server::McpServer; + //! use sacp::schema::NewSessionRequest; + //! use sacp::{Agent, Client, Component, ProxyToConductor}; + //! + //! async fn run_proxy(transport: impl Component) -> Result<(), sacp::Error> { + //! ProxyToConductor::builder() + //! .on_receive_request_from(Client, async |request: NewSessionRequest, request_cx, cx| { + //! let cwd = request.cwd.clone(); + //! let mcp_server = McpServer::builder("session-tools") + //! .tool_fn("get_cwd", "Returns session working directory", + //! async move |_params: (), _cx| { + //! Ok(cwd.display().to_string()) + //! }, sacp::tool_fn!()) + //! .build(); + //! + //! let active_session = cx.build_session_from(request) + //! .with_mcp_server(mcp_server)? + //! .spawn_session() + //! .await?; + //! + //! // Do something with the session before proxying... + //! tracing::info!(session_id = %active_session.session_id(), "Session created"); + //! + //! // Respond to the client and proxy remaining messages + //! request_cx.respond(active_session.response())?; + //! active_session.proxy_remaining_messages() + //! }, sacp::on_receive_request!()) + //! .serve(transport) + //! .await + //! } + //! ``` + //! //! # How it works //! //! When you call [`SessionBuilder::with_mcp_server`]: @@ -366,5 +405,8 @@ pub mod per_session_mcp_server { //! 3. The MCP server's URL is added to the `NewSessionRequest` //! 4. The handler lives as long as the session (dropped when `run_session` completes) //! + //! [`spawn_session_proxy`]: crate::SessionBuilder::spawn_session_proxy + //! [`spawn_session`]: crate::SessionBuilder::spawn_session + //! [`proxy_remaining_messages`]: crate::ActiveSession::proxy_remaining_messages //! [`SessionBuilder::with_mcp_server`]: crate::SessionBuilder::with_mcp_server } diff --git a/src/sacp/src/session.rs b/src/sacp/src/session.rs index faed433..8edad8f 100644 --- a/src/sacp/src/session.rs +++ b/src/sacp/src/session.rs @@ -1,10 +1,11 @@ -use std::path::Path; +use std::{marker::PhantomData, path::Path}; use agent_client_protocol_schema::{ ContentBlock, ContentChunk, NewSessionRequest, NewSessionResponse, PromptRequest, PromptResponse, SessionModeState, SessionNotification, SessionUpdate, StopReason, }; use futures::channel::mpsc; +use tokio::sync::oneshot; use crate::{ Agent, Client, Handled, HasEndpoint, JrConnectionCx, JrMessageHandler, JrRequestCx, JrRole, @@ -51,16 +52,16 @@ where /// and let you access them. /// /// Normally you would not use this method directly but would - /// instead use [`Self::build_session`] and then [`SessionBuilder::send_request`]. + /// instead use [`Self::build_session`] and then [`SessionBuilder::spawn_session`]. /// /// The vector `dynamic_handler_registrations` contains any dynamic /// handle registrations associated with this session (e.g., from MCP servers). /// You can simply pass `Default::default()` if not applicable. - pub fn attach_session( + pub fn attach_session<'responder>( &self, response: NewSessionResponse, mut dynamic_handler_registrations: Vec>, - ) -> Result, crate::Error> { + ) -> Result, crate::Error> { let NewSessionResponse { session_id, modes, @@ -81,6 +82,7 @@ where update_tx, connection: self.clone(), dynamic_handler_registrations, + _responder: PhantomData, }) } } @@ -136,10 +138,17 @@ where }) } - /// Send the request to create the session. + /// Run this session synchronously. The current task will be blocked + /// and `op` will be executed with the active session information. + /// This is useful when you have MCP servers that are borrowed from your local + /// stack frame. + /// + /// The `ActiveSession` passed to `op` has a non-`'static` lifetime, which + /// prevents calling [`ActiveSession::proxy_remaining_messages`] (since the + /// responders would terminate when `op` returns). pub async fn run_session( self, - op: impl AsyncFnOnce(ActiveSession) -> Result, + op: impl for<'responder> AsyncFnOnce(ActiveSession<'responder, Role>) -> Result, ) -> Result { let response = self .connection @@ -163,81 +172,74 @@ where /// drift but at the cost of requiring MCP servers that are `Send` and /// don't access data from the surrounding scope. /// - /// # Parameters - /// - /// * `run_responder`: this is typically just `Responder::run`; - /// the need for this parameter is a workaround for Rust limitations. - pub async fn send_request( - self, - run_responder: impl FnOnce(Responder, JrConnectionCx) -> F, - ) -> Result, crate::Error> + /// Returns an `ActiveSession<'static, _>` because responders are spawned + /// into background tasks that live for the connection lifetime. + pub async fn spawn_session(self) -> Result, crate::Error> where - F: Future> + Send + 'static, + Responder: 'static, { - let response = self - .connection - .send_request_to(Agent, self.request) - .block_task() - .await?; + let (active_session_tx, active_session_rx) = oneshot::channel(); - let cx = self.connection.clone(); - self.connection.spawn(run_responder(self.responder, cx))?; + let connection = self.connection.clone(); + connection.spawn(async move { + let response = self + .connection + .send_request_to(Agent, self.request) + .block_task() + .await?; - self.connection - .attach_session(response, self.dynamic_handler_registrations) + let cx = self.connection.clone(); + self.connection.spawn(self.responder.run(cx))?; + + let active_session = self + .connection + .attach_session(response, self.dynamic_handler_registrations)?; + + active_session_tx + .send(active_session) + .map_err(|_| crate::Error::internal_error())?; + + Ok(()) + })?; + + active_session_rx + .await + .map_err(|_| crate::Error::internal_error()) } - /// Forward the session request to the agent and proxy all messages. + /// Spawn a session and proxy all messages between client and agent. /// + /// This is a convenience method that combines [`spawn_session`](Self::spawn_session), + /// responding to the client, and [`ActiveSession::proxy_remaining_messages`]. /// Use this when you want to inject MCP servers into a session but don't need - /// to actively interact with it. The session messages will be proxied between - /// client and agent automatically. + /// to actively interact with it. /// - /// # Parameters - /// - /// * `request_cx`: The request context from the intercepted `session.new` request, - /// used to send the response back to the client. - /// * `run_responder`: this is typically just `Responder::run`; - /// the need for this parameter is a workaround for Rust limitations. - pub async fn proxy_session( + /// For more control (e.g., to send some messages before proxying), use + /// [`spawn_session`](Self::spawn_session) instead and call + /// [`proxy_remaining_messages`](ActiveSession::proxy_remaining_messages) manually. + pub async fn spawn_session_proxy( self, request_cx: JrRequestCx, - run_responder: impl FnOnce(Responder, JrConnectionCx) -> F, ) -> Result<(), crate::Error> where Role: HasEndpoint, - F: Future> + Send + 'static, + Responder: 'static, { - let response = self - .connection - .send_request_to(Agent, self.request) - .block_task() - .await?; - - // Add dynamic handler to proxy session messages - let session_id = response.session_id.clone(); - self.connection - .add_dynamic_handler(ProxySessionMessages::new(session_id))? - .run_indefinitely(); - - // Keep MCP server handlers alive - for registration in self.dynamic_handler_registrations { - registration.run_indefinitely(); - } - - // Spawn the responder - let cx = self.connection.clone(); - self.connection.spawn(run_responder(self.responder, cx))?; - - // Send response back to client - request_cx.respond(response)?; - - Ok(()) + let active_session = self.spawn_session().await?; + request_cx.respond(active_session.response())?; + active_session.proxy_remaining_messages() } } /// Active session struct that lets you send prompts and receive updates. -pub struct ActiveSession +/// +/// The `'responder` lifetime represents the span during which responders +/// (e.g., MCP server handlers) are active. When created via [`SessionBuilder::spawn_session`], +/// this is `'static` because responders are spawned into background tasks. +/// When created via [`SessionBuilder::run_session`], this is tied to the +/// closure scope, preventing [`Self::proxy_remaining_messages`] from being called +/// (since the responders would die when the closure returns). +pub struct ActiveSession<'responder, Role> where Role: HasEndpoint, { @@ -251,8 +253,10 @@ where /// Collect registrations from dynamic handlers for MCP servers etc. /// These will be dropped once the active-session struct is dropped /// which will cause them to be deregistered. - #[expect(dead_code)] dynamic_handler_registrations: Vec>, + + /// Phantom lifetime representing the responder lifetime. + _responder: PhantomData<&'responder ()>, } /// Incoming message from the agent @@ -267,7 +271,7 @@ pub enum SessionMessage { StopReason(StopReason), } -impl ActiveSession +impl<'responder, R> ActiveSession<'responder, R> where R: HasEndpoint, { @@ -286,6 +290,18 @@ where &self.meta } + /// Build a `NewSessionResponse` from the session information. + /// + /// Useful when you need to forward the session response to a client + /// after doing some processing. + pub fn response(&self) -> NewSessionResponse { + NewSessionResponse { + session_id: self.session_id.clone(), + modes: self.modes.clone(), + meta: self.meta.clone(), + } + } + /// Access the underlying connection context used to communicate with the agent. pub fn connection_cx(&self) -> JrConnectionCx { self.connection.clone() @@ -356,6 +372,39 @@ where } } +impl ActiveSession<'static, R> +where + R: HasEndpoint, +{ + /// Proxy all remaining messages for this session between client and agent. + /// + /// Use this when you want to inject MCP servers into a session but don't need + /// to actively interact with it after setup. The session messages will be proxied + /// between client and agent automatically. + /// + /// This consumes the `ActiveSession` since you're giving up active control. + /// + /// This method is only available on `ActiveSession<'static, _>` (from + /// [`SessionBuilder::spawn_session`]) because it requires responders to + /// outlive the method call. + pub fn proxy_remaining_messages(self) -> Result<(), crate::Error> + where + R: HasEndpoint, + { + // Add dynamic handler to proxy session messages + self.connection + .add_dynamic_handler(ProxySessionMessages::new(self.session_id))? + .run_indefinitely(); + + // Keep MCP server handlers alive + for registration in self.dynamic_handler_registrations { + registration.run_indefinitely(); + } + + Ok(()) + } +} + struct ActiveSessionHandler where Role: HasEndpoint, diff --git a/src/yopo/src/lib.rs b/src/yopo/src/lib.rs index 0412ef5..fb6d574 100644 --- a/src/yopo/src/lib.rs +++ b/src/yopo/src/lib.rs @@ -2,13 +2,13 @@ //! //! Provides a convenient API for running one-shot prompts against SACP components. +use sacp::ClientToAgent; use sacp::schema::{ AudioContent, ContentBlock, EmbeddedResourceResource, ImageContent, InitializeRequest, RequestPermissionOutcome, RequestPermissionRequest, RequestPermissionResponse, SessionNotification, TextContent, VERSION as PROTOCOL_VERSION, }; use sacp::util::MatchMessage; -use sacp::{ClientToAgent, JrResponder}; use sacp::{Component, Handled, MessageCx, UntypedMessage}; use std::path::PathBuf; @@ -113,10 +113,7 @@ pub async fn prompt_with_callback( .block_task() .await?; - let mut session = cx - .build_session(PathBuf::from(".")) - .send_request(JrResponder::run) - .await?; + let mut session = cx.build_session(PathBuf::from(".")).spawn_session().await?; session.send_prompt(prompt_text)?;