Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/elizacp/tests/mcp_tool_invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn recv<T: sacp::JrResponsePayload + Send>(
response: sacp::JrResponse<T>,
) -> Result<T, sacp::Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
response.await_when_result_received(async move |result| {
response.on_receiving_result(async move |result| {
tx.send(result).map_err(|_| sacp::Error::internal_error())
})?;
rx.await.map_err(|_| sacp::Error::internal_error())?
Expand Down
10 changes: 5 additions & 5 deletions src/sacp-conductor/src/conductor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ impl ConductorResponder {
meta: None,
},
)
.await_when_result_received({
.on_receiving_result({
let mut conductor_tx = conductor_tx.clone();
async move |result| {
match result {
Expand Down Expand Up @@ -979,7 +979,7 @@ impl ConductorResponder {
.as_ref()
.expect("we have an agent component")
.send_request(initialize_req)
.await_when_result_received(async move |response| {
.on_receiving_result(async move |response| {
tracing::debug!(?response, "got initialize response from agent");
request_cx
.respond_with_result_via(conductor_tx, response)
Expand All @@ -999,7 +999,7 @@ impl ConductorResponder {
// Forward initialize request to our successor
connection_cx
.send_request_to(Agent, initialize_req)
.await_when_result_received(async move |result| {
.on_receiving_result(async move |result| {
tracing::trace!(
?result,
"received response to initialize_proxy from empty conductor"
Expand All @@ -1016,7 +1016,7 @@ impl ConductorResponder {
let proxy_req = InitializeProxyRequest::from(initialize_req);
self.proxies[target_component_index]
.send_request(proxy_req)
.await_when_result_received(async move |result| {
.on_receiving_result(async move |result| {
tracing::debug!(?result, "got initialize_proxy response from proxy");
// Convert InitializeProxyResponse back to InitializeResponse
request_cx
Expand Down Expand Up @@ -1550,7 +1550,7 @@ impl<T: JrResponsePayload> JrResponseExt<T> for JrResponse<T> {
request_cx: JrRequestCx<T>,
) -> Result<(), sacp::Error> {
let conductor_tx = conductor_tx.clone();
self.await_when_result_received(async move |result| {
self.on_receiving_result(async move |result| {
request_cx
.respond_with_result_via(conductor_tx, result)
.await
Expand Down
4 changes: 2 additions & 2 deletions src/sacp-conductor/tests/initialization_sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async fn recv<T: sacp::JrResponsePayload + Send>(
response: sacp::JrResponse<T>,
) -> Result<T, sacp::Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
response.await_when_result_received(async move |result| {
response.on_receiving_result(async move |result| {
tx.send(result).map_err(|_| sacp::Error::internal_error())
})?;
rx.await.map_err(|_| sacp::Error::internal_error())?
Expand Down Expand Up @@ -83,7 +83,7 @@ impl Component for InitComponent {

// Forward to successor and respond
cx.send_request_to(sacp::Agent, request)
.await_when_result_received(async move |response| {
.on_receiving_result(async move |response| {
let response: InitializeResponse = response?;
request_cx.respond(response)
})
Expand Down
2 changes: 1 addition & 1 deletion src/sacp-conductor/tests/mcp-integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn recv<T: sacp::JrResponsePayload + Send>(
response: sacp::JrResponse<T>,
) -> Result<T, sacp::Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
response.await_when_result_received(async move |result| {
response.on_receiving_result(async move |result| {
tx.send(result).map_err(|_| sacp::Error::internal_error())
})?;
rx.await.map_err(|_| sacp::Error::internal_error())?
Expand Down
4 changes: 2 additions & 2 deletions src/sacp-conductor/tests/mcp_server_handler_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn recv<T: sacp::JrResponsePayload + Send>(
response: sacp::JrResponse<T>,
) -> Result<T, sacp::Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
response.await_when_result_received(async move |result| {
response.on_receiving_result(async move |result| {
tx.send(result).map_err(|_| sacp::Error::internal_error())
})?;
rx.await.map_err(|_| sacp::Error::internal_error())?
Expand Down Expand Up @@ -101,7 +101,7 @@ impl Component for ProxyWithMcpAndHandler {

// Forward to agent and relay response
cx.send_request_to(Agent, request)
.await_when_result_received(async move |result| {
.on_receiving_result(async move |result| {
let response: NewSessionResponse = result?;
request_cx.respond(response)
})
Expand Down
101 changes: 101 additions & 0 deletions src/sacp-conductor/tests/test_tool_fn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//! Integration test for `tool_fn` - stateless concurrent tools
//!
//! This test verifies that `tool_fn` works correctly for stateless tools
//! that don't need mutable state.

use sacp::Component;
use sacp::ProxyToConductor;
use sacp::mcp_server::McpServer;
use sacp_conductor::Conductor;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

/// Input for the greet tool
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct GreetInput {
name: String,
}

/// Create a proxy that provides an MCP server with a stateless greet tool
fn create_greet_proxy() -> Result<sacp::DynComponent, sacp::Error> {
// Create MCP server with a stateless greet tool using tool_fn
let mcp_server = McpServer::builder("greet_server".to_string())
.instructions("Test MCP server with stateless greet tool")
.tool_fn(
"greet",
"Greet someone by name",
async |input: GreetInput, _context| Ok(format!("Hello, {}!", input.name)),
sacp::tool_fn!(),
)
.build();

// Create proxy component
Ok(sacp::DynComponent::new(ProxyWithGreetServer { mcp_server }))
}

struct ProxyWithGreetServer<R: sacp::JrResponder<ProxyToConductor>> {
mcp_server: McpServer<ProxyToConductor, R>,
}

impl<R: sacp::JrResponder<ProxyToConductor> + 'static + Send> Component
for ProxyWithGreetServer<R>
{
async fn serve(self, client: impl Component) -> Result<(), sacp::Error> {
ProxyToConductor::builder()
.name("greet-proxy")
.with_mcp_server(self.mcp_server)
.serve(client)
.await
}
}

/// Elizacp agent component wrapper for testing
struct ElizacpAgentComponent;

impl Component for ElizacpAgentComponent {
async fn serve(self, client: impl Component) -> Result<(), sacp::Error> {
// Create duplex channels for bidirectional communication
let (elizacp_write, client_read) = duplex(8192);
let (client_write, elizacp_read) = duplex(8192);

let elizacp_transport =
sacp::ByteStreams::new(elizacp_write.compat_write(), elizacp_read.compat());

let client_transport =
sacp::ByteStreams::new(client_write.compat_write(), client_read.compat());

// Spawn elizacp in a background task
tokio::spawn(async move {
if let Err(e) = elizacp::ElizaAgent::new().serve(elizacp_transport).await {
tracing::error!("Elizacp error: {}", e);
}
});

// Serve the client with the transport connected to elizacp
client_transport.serve(client).await
}
}

#[tokio::test]
async fn test_tool_fn_greet() -> Result<(), sacp::Error> {
let result = yopo::prompt(
Conductor::new(
"test-conductor".to_string(),
vec![
create_greet_proxy()?,
sacp::DynComponent::new(ElizacpAgentComponent),
],
Default::default(),
),
r#"Use tool greet_server::greet with {"name": "World"}"#,
)
.await?;

expect_test::expect![[r#"
"OK: CallToolResult { content: [Annotated { raw: Text(RawTextContent { text: \"\\\"Hello, World!\\\"\", meta: None }), annotations: None }], structured_content: Some(String(\"Hello, World!\")), is_error: Some(false), meta: None }"
"#]].assert_debug_eq(&result);

Ok(())
}
2 changes: 1 addition & 1 deletion src/sacp-tokio/tests/debug_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async fn recv<T: sacp::JrResponsePayload + Send>(
response: sacp::JrResponse<T>,
) -> Result<T, sacp::Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
response.await_when_result_received(async move |result| {
response.on_receiving_result(async move |result| {
tx.send(result).map_err(|_| sacp::Error::internal_error())
})?;
rx.await.map_err(|_| sacp::Error::internal_error())?
Expand Down
Loading