diff --git a/Cargo.lock b/Cargo.lock index 3b1d5eb..4abba1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -451,7 +451,7 @@ checksum = "c3e64b0cc0439b12df2fa678eae89a1c56a529fd067a9115f7827f1fffd22b32" [[package]] name = "cli-sub-agent" -version = "0.1.103" +version = "0.1.104" dependencies = [ "anyhow", "chrono", @@ -610,7 +610,7 @@ dependencies = [ [[package]] name = "csa-acp" -version = "0.1.103" +version = "0.1.104" dependencies = [ "agent-client-protocol", "anyhow", @@ -629,7 +629,7 @@ dependencies = [ [[package]] name = "csa-config" -version = "0.1.103" +version = "0.1.104" dependencies = [ "anyhow", "chrono", @@ -645,7 +645,7 @@ dependencies = [ [[package]] name = "csa-core" -version = "0.1.103" +version = "0.1.104" dependencies = [ "agent-teams", "chrono", @@ -659,7 +659,7 @@ dependencies = [ [[package]] name = "csa-executor" -version = "0.1.103" +version = "0.1.104" dependencies = [ "agent-teams", "anyhow", @@ -683,7 +683,7 @@ dependencies = [ [[package]] name = "csa-hooks" -version = "0.1.103" +version = "0.1.104" dependencies = [ "anyhow", "chrono", @@ -698,7 +698,7 @@ dependencies = [ [[package]] name = "csa-lock" -version = "0.1.103" +version = "0.1.104" dependencies = [ "anyhow", "chrono", @@ -710,7 +710,7 @@ dependencies = [ [[package]] name = "csa-mcp-hub" -version = "0.1.103" +version = "0.1.104" dependencies = [ "anyhow", "axum", @@ -732,7 +732,7 @@ dependencies = [ [[package]] name = "csa-memory" -version = "0.1.103" +version = "0.1.104" dependencies = [ "anyhow", "async-trait", @@ -750,7 +750,7 @@ dependencies = [ [[package]] name = "csa-process" -version = "0.1.103" +version = "0.1.104" dependencies = [ "anyhow", "csa-core", @@ -767,7 +767,7 @@ dependencies = [ [[package]] name = "csa-resource" -version = "0.1.103" +version = "0.1.104" dependencies = [ "anyhow", "csa-core", @@ -782,7 +782,7 @@ dependencies = [ [[package]] name = "csa-scheduler" -version = "0.1.103" +version = "0.1.104" dependencies = [ "anyhow", "chrono", @@ -800,7 +800,7 @@ dependencies = [ [[package]] name = "csa-session" -version = "0.1.103" +version = "0.1.104" dependencies = [ "anyhow", "chrono", @@ -821,7 +821,7 @@ dependencies = [ [[package]] name = "csa-todo" -version = "0.1.103" +version = "0.1.104" dependencies = [ "anyhow", "chrono", @@ -4089,7 +4089,7 @@ dependencies = [ [[package]] name = "weave" -version = "0.1.103" +version = "0.1.104" dependencies = [ "anyhow", "clap", diff --git a/Cargo.toml b/Cargo.toml index 40849d1..fb0ebe1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crates/*"] resolver = "2" [workspace.package] -version = "0.1.103" +version = "0.1.104" edition = "2024" rust-version = "1.85" license = "Apache-2.0" diff --git a/crates/csa-acp/src/client.rs b/crates/csa-acp/src/client.rs index c4d65de..a2105e1 100644 --- a/crates/csa-acp/src/client.rs +++ b/crates/csa-acp/src/client.rs @@ -41,11 +41,6 @@ impl AcpClient { } } - pub(crate) fn push_event(&self, event: SessionEvent) { - *self.last_activity.borrow_mut() = Instant::now(); - self.events.borrow_mut().push(event); - } - fn chunk_to_text(chunk: &ContentChunk) -> String { match &chunk.content { ContentBlock::Text(text) => text.text.clone(), @@ -53,41 +48,61 @@ impl AcpClient { } } - fn update_to_event(update: SessionUpdate) -> SessionEvent { + /// Convert an ACP `SessionUpdate` into an optional `SessionEvent`. + /// + /// Returns `None` for protocol-level overhead events that carry no + /// meaningful content (e.g. `AvailableCommandsUpdate`, mode/config + /// updates). These are logged at trace level but excluded from + /// collected output to prevent multi-KB JSON blobs from polluting + /// stdout and summary extraction. + fn update_to_event(update: SessionUpdate) -> Option { match update { SessionUpdate::AgentMessageChunk(chunk) => { - SessionEvent::AgentMessage(Self::chunk_to_text(&chunk)) + Some(SessionEvent::AgentMessage(Self::chunk_to_text(&chunk))) } SessionUpdate::AgentThoughtChunk(chunk) => { - SessionEvent::AgentThought(Self::chunk_to_text(&chunk)) + Some(SessionEvent::AgentThought(Self::chunk_to_text(&chunk))) } - SessionUpdate::ToolCall(tool_call) => SessionEvent::ToolCallStarted { + SessionUpdate::ToolCall(tool_call) => Some(SessionEvent::ToolCallStarted { id: tool_call.tool_call_id.0.to_string(), title: tool_call.title, kind: format!("{:?}", tool_call.kind), - }, + }), SessionUpdate::ToolCallUpdate(tool_call_update) => { let id = tool_call_update.tool_call_id.0.to_string(); if let Some(status) = tool_call_update.fields.status { - SessionEvent::ToolCallCompleted { + Some(SessionEvent::ToolCallCompleted { id, status: format!("{:?}", status), - } + }) } else { - SessionEvent::Other( + Some(SessionEvent::Other( serde_json::to_string(&tool_call_update) .unwrap_or_else(|_| "tool_call_update".into()), - ) + )) } } SessionUpdate::Plan(plan) => { let serialized = serde_json::to_string(&plan) .unwrap_or_else(|_| "".into()); - SessionEvent::PlanUpdate(serialized) + Some(SessionEvent::PlanUpdate(serialized)) + } + // Protocol overhead: these carry large JSON payloads (slash + // command lists, config toggles, mode switches) that are not + // meaningful agent output. Suppress from events to keep + // stdout clean and summary extraction accurate. + SessionUpdate::AvailableCommandsUpdate(_) + | SessionUpdate::ConfigOptionUpdate(_) + | SessionUpdate::CurrentModeUpdate(_) + | SessionUpdate::UserMessageChunk(_) => { + tracing::trace!("suppressed protocol-level SessionUpdate (not content)"); + None } - other => SessionEvent::Other( + // Catch-all for future ACP protocol variants (enum is + // non-exhaustive). Emit as Other for visibility. + other => Some(SessionEvent::Other( serde_json::to_string(&other).unwrap_or_else(|_| "".into()), - ), + )), } } } @@ -115,7 +130,13 @@ impl Client for AcpClient { &self, args: SessionNotification, ) -> agent_client_protocol::Result<()> { - self.push_event(Self::update_to_event(args.update)); + // Always refresh activity timestamp so idle-timeout considers + // protocol-level traffic as proof of liveness, even when the + // event itself is suppressed from collected output. + *self.last_activity.borrow_mut() = Instant::now(); + if let Some(event) = Self::update_to_event(args.update) { + self.events.borrow_mut().push(event); + } Ok(()) } } @@ -138,7 +159,8 @@ mod tests { #[test] fn test_update_to_event_agent_message_chunk() { let chunk = ContentChunk::new(ContentBlock::Text(TextContent::new("hello"))); - let event = AcpClient::update_to_event(SessionUpdate::AgentMessageChunk(chunk)); + let event = AcpClient::update_to_event(SessionUpdate::AgentMessageChunk(chunk)) + .expect("AgentMessageChunk should produce an event"); match event { SessionEvent::AgentMessage(text) => assert_eq!(text, "hello"), @@ -149,7 +171,8 @@ mod tests { #[test] fn test_update_to_event_agent_thought_chunk() { let chunk = ContentChunk::new(ContentBlock::Text(TextContent::new("thinking"))); - let event = AcpClient::update_to_event(SessionUpdate::AgentThoughtChunk(chunk)); + let event = AcpClient::update_to_event(SessionUpdate::AgentThoughtChunk(chunk)) + .expect("AgentThoughtChunk should produce an event"); match event { SessionEvent::AgentThought(text) => assert_eq!(text, "thinking"), @@ -160,7 +183,8 @@ mod tests { #[test] fn test_update_to_event_tool_call_started() { let tool_call = ToolCall::new("call-1", "Run tests").kind(ToolKind::Execute); - let event = AcpClient::update_to_event(SessionUpdate::ToolCall(tool_call)); + let event = AcpClient::update_to_event(SessionUpdate::ToolCall(tool_call)) + .expect("ToolCall should produce an event"); match event { SessionEvent::ToolCallStarted { id, title, kind } => { @@ -176,7 +200,8 @@ mod tests { fn test_update_to_event_tool_call_completed() { let fields = ToolCallUpdateFields::new().status(ToolCallStatus::Completed); let update = ToolCallUpdate::new("call-2", fields); - let event = AcpClient::update_to_event(SessionUpdate::ToolCallUpdate(update)); + let event = AcpClient::update_to_event(SessionUpdate::ToolCallUpdate(update)) + .expect("ToolCallUpdate with status should produce an event"); match event { SessionEvent::ToolCallCompleted { id, status } => { @@ -188,23 +213,71 @@ mod tests { } #[test] - fn test_push_event_appends_to_shared_buffer() { + fn test_update_to_event_suppresses_protocol_overhead() { + use agent_client_protocol::{AvailableCommand, AvailableCommandsUpdate}; + + let commands_update = + AvailableCommandsUpdate::new(vec![AvailableCommand::new("/help", "Get help")]); + let result = + AcpClient::update_to_event(SessionUpdate::AvailableCommandsUpdate(commands_update)); + assert!( + result.is_none(), + "AvailableCommandsUpdate should be suppressed" + ); + } + + #[tokio::test] + async fn test_session_notification_appends_content_event() { + use agent_client_protocol::{Client, SessionNotification}; + let events = Rc::new(RefCell::new(Vec::new())); let last_activity = Rc::new(RefCell::new(Instant::now() - Duration::from_secs(2))); let before = *last_activity.borrow(); let client = AcpClient::new(Rc::clone(&events), Rc::clone(&last_activity)); - client.push_event(SessionEvent::Other("payload".to_string())); + let chunk = ContentChunk::new(ContentBlock::Text(TextContent::new("hello"))); + let notification = + SessionNotification::new("test-session", SessionUpdate::AgentMessageChunk(chunk)); + client.session_notification(notification).await.unwrap(); let stored = events.borrow(); assert_eq!(stored.len(), 1); match &stored[0] { - SessionEvent::Other(payload) => assert_eq!(payload, "payload"), + SessionEvent::AgentMessage(text) => assert_eq!(text, "hello"), other => panic!("unexpected stored event: {other:?}"), } assert!( *last_activity.borrow() > before, - "push_event must refresh last_activity for all session events" + "session_notification must refresh last_activity" + ); + } + + #[tokio::test] + async fn test_session_notification_suppresses_protocol_event_but_refreshes_activity() { + use agent_client_protocol::{ + AvailableCommand, AvailableCommandsUpdate, Client, SessionNotification, + }; + + let events = Rc::new(RefCell::new(Vec::new())); + let last_activity = Rc::new(RefCell::new(Instant::now() - Duration::from_secs(2))); + let before = *last_activity.borrow(); + let client = AcpClient::new(Rc::clone(&events), Rc::clone(&last_activity)); + + let commands_update = + AvailableCommandsUpdate::new(vec![AvailableCommand::new("/help", "Get help")]); + let notification = SessionNotification::new( + "test-session", + SessionUpdate::AvailableCommandsUpdate(commands_update), + ); + client.session_notification(notification).await.unwrap(); + + assert!( + events.borrow().is_empty(), + "protocol overhead should not produce events" + ); + assert!( + *last_activity.borrow() > before, + "session_notification must refresh last_activity even for suppressed events" ); } } diff --git a/crates/csa-acp/src/connection.rs b/crates/csa-acp/src/connection.rs index 9f9d563..e3e2331 100644 --- a/crates/csa-acp/src/connection.rs +++ b/crates/csa-acp/src/connection.rs @@ -548,6 +548,14 @@ fn spool_chunk(spool: &mut Option, bytes: &[u8]) { } } +/// Collect agent output for the caller (stdout / summary extraction). +/// +/// Only `AgentMessage` and `AgentThought` events contribute to the return +/// value. Diagnostic events (plan updates, tool-call lifecycle, other) are +/// intentionally excluded — they are already written to the `output.log` +/// spool by [`stream_new_agent_messages`] for audit purposes, but including +/// them in the caller-facing output pollutes stdout and corrupts summary +/// extraction (which uses the last non-empty line). fn collect_agent_output(events: &[SessionEvent]) -> String { let mut output = String::new(); for event in events { @@ -555,18 +563,11 @@ fn collect_agent_output(events: &[SessionEvent]) -> String { SessionEvent::AgentMessage(chunk) | SessionEvent::AgentThought(chunk) => { output.push_str(chunk); } - SessionEvent::PlanUpdate(plan) => { - output.push_str(&format!("[plan] {plan}\n")); - } - SessionEvent::ToolCallStarted { title, kind, .. } => { - output.push_str(&format!("[tool:started] {title} ({kind})\n")); - } - SessionEvent::ToolCallCompleted { status, .. } => { - output.push_str(&format!("[tool:completed] {status}\n")); - } - SessionEvent::Other(payload) => { - output.push_str(&format!("[other] {payload}\n")); - } + // Diagnostic events: spool-only, not forwarded to caller. + SessionEvent::PlanUpdate(_) + | SessionEvent::ToolCallStarted { .. } + | SessionEvent::ToolCallCompleted { .. } + | SessionEvent::Other(_) => {} } } output diff --git a/crates/csa-acp/src/connection_tests.rs b/crates/csa-acp/src/connection_tests.rs index ed4634e..aa6bb6c 100644 --- a/crates/csa-acp/src/connection_tests.rs +++ b/crates/csa-acp/src/connection_tests.rs @@ -180,6 +180,83 @@ fn stream_new_agent_messages_skips_non_message_events() { assert_eq!(index, 2); } +#[test] +fn collect_agent_output_excludes_diagnostic_events() { + let events = vec![ + SessionEvent::AgentMessage("Hello".to_string()), + SessionEvent::PlanUpdate("step 1".to_string()), + SessionEvent::ToolCallStarted { + id: "t1".to_string(), + title: "Read".to_string(), + kind: "tool_use".to_string(), + }, + SessionEvent::AgentThought("hmm".to_string()), + SessionEvent::ToolCallCompleted { + id: "t1".to_string(), + status: "completed".to_string(), + }, + SessionEvent::Other("misc".to_string()), + SessionEvent::AgentMessage(" world".to_string()), + ]; + let output = collect_agent_output(&events); + assert_eq!( + output, "Hellohmm world", + "collect_agent_output must only include AgentMessage and AgentThought" + ); +} + +#[test] +fn stream_new_agent_messages_writes_all_event_types_to_spool() { + let events = Rc::new(RefCell::new(vec![ + SessionEvent::AgentMessage("msg".to_string()), + SessionEvent::PlanUpdate("plan step".to_string()), + SessionEvent::ToolCallStarted { + id: "t1".to_string(), + title: "Edit".to_string(), + kind: "tool_use".to_string(), + }, + SessionEvent::ToolCallCompleted { + id: "t1".to_string(), + status: "done".to_string(), + }, + SessionEvent::Other("extra".to_string()), + SessionEvent::AgentThought("thought".to_string()), + ])); + + let temp = tempfile::tempdir().expect("tempdir"); + let spool_path = temp.path().join("output.log"); + let mut spool = open_output_spool_file(Some(&spool_path)); + let mut index = 0; + + stream_new_agent_messages(&events, &mut index, false, &mut spool); + let spool_content = std::fs::read_to_string(&spool_path).expect("read spool"); + assert!( + spool_content.contains("msg"), + "spool must include AgentMessage" + ); + assert!( + spool_content.contains("[plan] plan step"), + "spool must include PlanUpdate" + ); + assert!( + spool_content.contains("[tool:started] Edit"), + "spool must include ToolCallStarted" + ); + assert!( + spool_content.contains("[tool:completed] done"), + "spool must include ToolCallCompleted" + ); + assert!( + spool_content.contains("[other] extra"), + "spool must include Other" + ); + assert!( + spool_content.contains("thought"), + "spool must include AgentThought" + ); + assert_eq!(index, 6); +} + #[test] fn heartbeat_interval_defaults_to_enabled() { let _env_lock = HEARTBEAT_ENV_LOCK diff --git a/weave.lock b/weave.lock index 8bdbe1b..97e7320 100644 --- a/weave.lock +++ b/weave.lock @@ -1,7 +1,9 @@ +package = [] + [versions] -csa = "0.1.102" -weave = "0.1.102" +csa = "0.1.104" last_migrated_at = "2026-03-08T12:08:01.820964091Z" +weave = "0.1.104" [migrations] applied = [