Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"

[workspace.package]
version = "0.1.103"
version = "0.1.104"
edition = "2024"
rust-version = "1.85"
license = "Apache-2.0"
Expand Down
125 changes: 99 additions & 26 deletions crates/csa-acp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,53 +41,68 @@ impl AcpClient {
}
}

pub(crate) fn push_event(&self, event: SessionEvent) {
*self.last_activity.borrow_mut() = Instant::now();
self.events.borrow_mut().push(event);
}

fn chunk_to_text(chunk: &ContentChunk) -> String {
match &chunk.content {
ContentBlock::Text(text) => text.text.clone(),
other => serde_json::to_string(other).unwrap_or_else(|_| "<non-text-content>".into()),
}
}

fn update_to_event(update: SessionUpdate) -> SessionEvent {
/// Convert an ACP `SessionUpdate` into an optional `SessionEvent`.
///
/// Returns `None` for protocol-level overhead events that carry no
/// meaningful content (e.g. `AvailableCommandsUpdate`, mode/config
/// updates). These are logged at trace level but excluded from
/// collected output to prevent multi-KB JSON blobs from polluting
/// stdout and summary extraction.
fn update_to_event(update: SessionUpdate) -> Option<SessionEvent> {
match update {
SessionUpdate::AgentMessageChunk(chunk) => {
SessionEvent::AgentMessage(Self::chunk_to_text(&chunk))
Some(SessionEvent::AgentMessage(Self::chunk_to_text(&chunk)))
}
SessionUpdate::AgentThoughtChunk(chunk) => {
SessionEvent::AgentThought(Self::chunk_to_text(&chunk))
Some(SessionEvent::AgentThought(Self::chunk_to_text(&chunk)))
}
SessionUpdate::ToolCall(tool_call) => SessionEvent::ToolCallStarted {
SessionUpdate::ToolCall(tool_call) => Some(SessionEvent::ToolCallStarted {
id: tool_call.tool_call_id.0.to_string(),
title: tool_call.title,
kind: format!("{:?}", tool_call.kind),
},
}),
SessionUpdate::ToolCallUpdate(tool_call_update) => {
let id = tool_call_update.tool_call_id.0.to_string();
if let Some(status) = tool_call_update.fields.status {
SessionEvent::ToolCallCompleted {
Some(SessionEvent::ToolCallCompleted {
id,
status: format!("{:?}", status),
}
})
} else {
SessionEvent::Other(
Some(SessionEvent::Other(
serde_json::to_string(&tool_call_update)
.unwrap_or_else(|_| "tool_call_update".into()),
)
))
}
}
SessionUpdate::Plan(plan) => {
let serialized = serde_json::to_string(&plan)
.unwrap_or_else(|_| "<plan-serialize-failed>".into());
SessionEvent::PlanUpdate(serialized)
Some(SessionEvent::PlanUpdate(serialized))
}
// Protocol overhead: these carry large JSON payloads (slash
// command lists, config toggles, mode switches) that are not
// meaningful agent output. Suppress from events to keep
// stdout clean and summary extraction accurate.
SessionUpdate::AvailableCommandsUpdate(_)
| SessionUpdate::ConfigOptionUpdate(_)
| SessionUpdate::CurrentModeUpdate(_)
| SessionUpdate::UserMessageChunk(_) => {
tracing::trace!("suppressed protocol-level SessionUpdate (not content)");
None
}
other => SessionEvent::Other(
// Catch-all for future ACP protocol variants (enum is
// non-exhaustive). Emit as Other for visibility.
other => Some(SessionEvent::Other(
serde_json::to_string(&other).unwrap_or_else(|_| "<unknown-update>".into()),
),
)),
}
}
}
Expand Down Expand Up @@ -115,7 +130,13 @@ impl Client for AcpClient {
&self,
args: SessionNotification,
) -> agent_client_protocol::Result<()> {
self.push_event(Self::update_to_event(args.update));
// Always refresh activity timestamp so idle-timeout considers
// protocol-level traffic as proof of liveness, even when the
// event itself is suppressed from collected output.
*self.last_activity.borrow_mut() = Instant::now();
if let Some(event) = Self::update_to_event(args.update) {
self.events.borrow_mut().push(event);
}
Ok(())
}
}
Expand All @@ -138,7 +159,8 @@ mod tests {
#[test]
fn test_update_to_event_agent_message_chunk() {
let chunk = ContentChunk::new(ContentBlock::Text(TextContent::new("hello")));
let event = AcpClient::update_to_event(SessionUpdate::AgentMessageChunk(chunk));
let event = AcpClient::update_to_event(SessionUpdate::AgentMessageChunk(chunk))
.expect("AgentMessageChunk should produce an event");

match event {
SessionEvent::AgentMessage(text) => assert_eq!(text, "hello"),
Expand All @@ -149,7 +171,8 @@ mod tests {
#[test]
fn test_update_to_event_agent_thought_chunk() {
let chunk = ContentChunk::new(ContentBlock::Text(TextContent::new("thinking")));
let event = AcpClient::update_to_event(SessionUpdate::AgentThoughtChunk(chunk));
let event = AcpClient::update_to_event(SessionUpdate::AgentThoughtChunk(chunk))
.expect("AgentThoughtChunk should produce an event");

match event {
SessionEvent::AgentThought(text) => assert_eq!(text, "thinking"),
Expand All @@ -160,7 +183,8 @@ mod tests {
#[test]
fn test_update_to_event_tool_call_started() {
let tool_call = ToolCall::new("call-1", "Run tests").kind(ToolKind::Execute);
let event = AcpClient::update_to_event(SessionUpdate::ToolCall(tool_call));
let event = AcpClient::update_to_event(SessionUpdate::ToolCall(tool_call))
.expect("ToolCall should produce an event");

match event {
SessionEvent::ToolCallStarted { id, title, kind } => {
Expand All @@ -176,7 +200,8 @@ mod tests {
fn test_update_to_event_tool_call_completed() {
let fields = ToolCallUpdateFields::new().status(ToolCallStatus::Completed);
let update = ToolCallUpdate::new("call-2", fields);
let event = AcpClient::update_to_event(SessionUpdate::ToolCallUpdate(update));
let event = AcpClient::update_to_event(SessionUpdate::ToolCallUpdate(update))
.expect("ToolCallUpdate with status should produce an event");

match event {
SessionEvent::ToolCallCompleted { id, status } => {
Expand All @@ -188,23 +213,71 @@ mod tests {
}

#[test]
fn test_push_event_appends_to_shared_buffer() {
fn test_update_to_event_suppresses_protocol_overhead() {
use agent_client_protocol::{AvailableCommand, AvailableCommandsUpdate};

let commands_update =
AvailableCommandsUpdate::new(vec![AvailableCommand::new("/help", "Get help")]);
let result =
AcpClient::update_to_event(SessionUpdate::AvailableCommandsUpdate(commands_update));
assert!(
result.is_none(),
"AvailableCommandsUpdate should be suppressed"
);
}

#[tokio::test]
async fn test_session_notification_appends_content_event() {
use agent_client_protocol::{Client, SessionNotification};

let events = Rc::new(RefCell::new(Vec::new()));
let last_activity = Rc::new(RefCell::new(Instant::now() - Duration::from_secs(2)));
let before = *last_activity.borrow();
let client = AcpClient::new(Rc::clone(&events), Rc::clone(&last_activity));

client.push_event(SessionEvent::Other("payload".to_string()));
let chunk = ContentChunk::new(ContentBlock::Text(TextContent::new("hello")));
let notification =
SessionNotification::new("test-session", SessionUpdate::AgentMessageChunk(chunk));
client.session_notification(notification).await.unwrap();

let stored = events.borrow();
assert_eq!(stored.len(), 1);
match &stored[0] {
SessionEvent::Other(payload) => assert_eq!(payload, "payload"),
SessionEvent::AgentMessage(text) => assert_eq!(text, "hello"),
other => panic!("unexpected stored event: {other:?}"),
}
assert!(
*last_activity.borrow() > before,
"push_event must refresh last_activity for all session events"
"session_notification must refresh last_activity"
);
}

#[tokio::test]
async fn test_session_notification_suppresses_protocol_event_but_refreshes_activity() {
use agent_client_protocol::{
AvailableCommand, AvailableCommandsUpdate, Client, SessionNotification,
};

let events = Rc::new(RefCell::new(Vec::new()));
let last_activity = Rc::new(RefCell::new(Instant::now() - Duration::from_secs(2)));
let before = *last_activity.borrow();
let client = AcpClient::new(Rc::clone(&events), Rc::clone(&last_activity));

let commands_update =
AvailableCommandsUpdate::new(vec![AvailableCommand::new("/help", "Get help")]);
let notification = SessionNotification::new(
"test-session",
SessionUpdate::AvailableCommandsUpdate(commands_update),
);
client.session_notification(notification).await.unwrap();

assert!(
events.borrow().is_empty(),
"protocol overhead should not produce events"
);
assert!(
*last_activity.borrow() > before,
"session_notification must refresh last_activity even for suppressed events"
);
}
}
25 changes: 13 additions & 12 deletions crates/csa-acp/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,25 +548,26 @@ fn spool_chunk(spool: &mut Option<std::fs::File>, bytes: &[u8]) {
}
}

/// Collect agent output for the caller (stdout / summary extraction).
///
/// Only `AgentMessage` and `AgentThought` events contribute to the return
/// value. Diagnostic events (plan updates, tool-call lifecycle, other) are
/// intentionally excluded — they are already written to the `output.log`
/// spool by [`stream_new_agent_messages`] for audit purposes, but including
/// them in the caller-facing output pollutes stdout and corrupts summary
/// extraction (which uses the last non-empty line).
fn collect_agent_output(events: &[SessionEvent]) -> String {
let mut output = String::new();
for event in events {
match event {
SessionEvent::AgentMessage(chunk) | SessionEvent::AgentThought(chunk) => {
output.push_str(chunk);
}
SessionEvent::PlanUpdate(plan) => {
output.push_str(&format!("[plan] {plan}\n"));
}
SessionEvent::ToolCallStarted { title, kind, .. } => {
output.push_str(&format!("[tool:started] {title} ({kind})\n"));
}
SessionEvent::ToolCallCompleted { status, .. } => {
output.push_str(&format!("[tool:completed] {status}\n"));
}
SessionEvent::Other(payload) => {
output.push_str(&format!("[other] {payload}\n"));
}
// Diagnostic events: spool-only, not forwarded to caller.
SessionEvent::PlanUpdate(_)
| SessionEvent::ToolCallStarted { .. }
| SessionEvent::ToolCallCompleted { .. }
| SessionEvent::Other(_) => {}
}
}
output
Expand Down
Loading
Loading