Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 141 additions & 8 deletions src-tauri/src/proxy/providers/streaming_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ fn tool_item_key_from_event(data: &Value) -> Option<String> {
None
}

fn buffered_tool_arguments(chunks: &[String]) -> String {
let joined = chunks.concat();
if joined.is_empty() || matches!(serde_json::from_str::<Value>(&joined), Ok(Value::Object(_))) {
return joined;
}

chunks
.iter()
.rev()
.find(|chunk| matches!(serde_json::from_str::<Value>(chunk), Ok(Value::Object(_))))
.cloned()
.unwrap_or(joined)
}

#[inline]
fn resolve_content_index(
data: &Value,
Expand Down Expand Up @@ -102,7 +116,7 @@ pub fn create_anthropic_sse_stream_from_responses(
let mut current_text_index: Option<u32> = None;
let mut tool_index_by_item_id: HashMap<String, u32> = HashMap::new();
let mut tool_name_by_index: HashMap<u32, String> = HashMap::new();
let mut tool_args_by_index: HashMap<u32, String> = HashMap::new();
let mut tool_args_by_index: HashMap<u32, Vec<String>> = HashMap::new();
let mut last_tool_index: Option<u32> = None;

tokio::pin!(stream);
Expand Down Expand Up @@ -326,7 +340,7 @@ pub fn create_anthropic_sse_stream_from_responses(
continue;
}

tool_args_by_index.insert(index, String::new());
tool_args_by_index.insert(index, Vec::new());

let event = json!({
"type": "content_block_start",
Expand Down Expand Up @@ -384,7 +398,10 @@ pub fn create_anthropic_sse_stream_from_responses(
open_indices.insert(index);
}

tool_args_by_index.entry(index).or_default().push_str(delta);
tool_args_by_index
.entry(index)
.or_default()
.push(delta.to_string());
if tool_name_by_index.get(&index).map(String::as_str) == Some("Read") {
continue;
}
Expand Down Expand Up @@ -425,7 +442,7 @@ pub fn create_anthropic_sse_stream_from_responses(
.unwrap_or_else(|| {
tool_args_by_index
.get(&index)
.cloned()
.map(|chunks| buffered_tool_arguments(chunks))
.unwrap_or_default()
});
let sanitized = sanitize_anthropic_tool_use_input_json("Read", &raw);
Expand All @@ -442,11 +459,10 @@ pub fn create_anthropic_sse_stream_from_responses(
yield Ok(Bytes::from(sse));
}
} else {
let accumulated = tool_args_by_index
let has_accumulated = tool_args_by_index
.get(&index)
.map(String::as_str)
.unwrap_or("");
if accumulated.is_empty() {
.is_some_and(|chunks| chunks.iter().any(|chunk| !chunk.is_empty()));
if !has_accumulated {
if let Some(arguments) = data
.get("arguments")
.and_then(|value| value.as_str())
Expand Down Expand Up @@ -732,6 +748,27 @@ data: {\"response\":{\"status\":\"completed\"}}\r\n\
assert!(merged.contains("\"stop_reason\":\"tool_use\""));
}

#[tokio::test]
async fn non_read_tool_empty_delta_still_uses_done_arguments() {
let input = concat!(
"event: response.created\n",
"data: {\"response\":{\"id\":\"resp_tool\",\"model\":\"gpt-5.4\"}}\n\n",
"event: response.output_item.added\n",
"data: {\"item\":{\"id\":\"fc_1\",\"type\":\"function_call\",\"call_id\":\"call_1\",\"name\":\"get_weather\"}}\n\n",
"event: response.function_call_arguments.delta\n",
"data: {\"item_id\":\"fc_1\",\"delta\":\"\"}\n\n",
"event: response.function_call_arguments.done\n",
"data: {\"item_id\":\"fc_1\",\"arguments\":\"{\\\"city\\\":\\\"Tokyo\\\"}\"}\n\n",
"event: response.completed\n",
"data: {\"response\":{\"status\":\"completed\"}}\n\n"
);

let (merged, _) = collect_stream(vec![Bytes::from(input)]).await;

assert!(merged.contains("\\\"city\\\":\\\"Tokyo\\\""));
assert!(merged.contains("\"stop_reason\":\"tool_use\""));
}

#[tokio::test]
async fn read_tool_arguments_are_buffered_and_sanitized() {
let input = concat!(
Expand All @@ -754,6 +791,102 @@ data: {\"response\":{\"status\":\"completed\"}}\r\n\
assert!(!merged.contains("\\\"pages\\\":\\\"\\\""));
}

#[tokio::test]
async fn read_tool_duplicate_start_preserves_buffered_arguments() {
let input = concat!(
"event: response.created\n",
"data: {\"response\":{\"id\":\"resp_read\",\"model\":\"gpt-5.4\"}}\n\n",
"event: response.output_item.added\n",
"data: {\"item\":{\"id\":\"fc_read\",\"type\":\"function_call\",\"call_id\":\"call_read\",\"name\":\"Read\"}}\n\n",
"event: response.function_call_arguments.delta\n",
"data: {\"item_id\":\"fc_read\",\"delta\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\\\"offset\\\":0,\\\"pages\\\":\\\"\\\"}\"}\n\n",
"event: response.output_item.added\n",
"data: {\"item\":{\"id\":\"fc_read\",\"type\":\"function_call\",\"call_id\":\"call_read\",\"name\":\"Read\"}}\n\n",
"event: response.function_call_arguments.done\n",
"data: {\"item_id\":\"fc_read\"}\n\n",
"event: response.completed\n",
"data: {\"response\":{\"status\":\"completed\"}}\n\n"
);

let (merged, _) = collect_stream(vec![Bytes::from(input)]).await;

assert_eq!(merged.matches("event: content_block_start").count(), 1);
assert_eq!(merged.matches("event: content_block_stop").count(), 1);
assert!(merged.contains("\"partial_json\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\\\"offset\\\":0}"));
assert!(!merged.contains("\\\"pages\\\":\\\"\\\""));
}

#[tokio::test]
async fn read_tool_split_argument_deltas_are_still_joined() {
let input = concat!(
"event: response.created\n",
"data: {\"response\":{\"id\":\"resp_read\",\"model\":\"gpt-5.4\"}}\n\n",
"event: response.output_item.added\n",
"data: {\"item\":{\"id\":\"fc_read\",\"type\":\"function_call\",\"call_id\":\"call_read\",\"name\":\"Read\"}}\n\n",
"event: response.function_call_arguments.delta\n",
"data: {\"item_id\":\"fc_read\",\"delta\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\"}\n\n",
"event: response.function_call_arguments.delta\n",
"data: {\"item_id\":\"fc_read\",\"delta\":\"\\\"offset\\\":0,\\\"pages\\\":\\\"\\\"}\"}\n\n",
"event: response.function_call_arguments.done\n",
"data: {\"item_id\":\"fc_read\"}\n\n",
"event: response.completed\n",
"data: {\"response\":{\"status\":\"completed\"}}\n\n"
);

let (merged, _) = collect_stream(vec![Bytes::from(input)]).await;

assert!(merged.contains("\"partial_json\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\\\"offset\\\":0}"));
assert!(!merged.contains("\\\"pages\\\":\\\"\\\""));
}

#[tokio::test]
async fn read_tool_snapshot_argument_deltas_use_latest_complete_json() {
let input = concat!(
"event: response.created\n",
"data: {\"response\":{\"id\":\"resp_read\",\"model\":\"gpt-5.4\"}}\n\n",
"event: response.output_item.added\n",
"data: {\"item\":{\"id\":\"fc_read\",\"type\":\"function_call\",\"call_id\":\"call_read\",\"name\":\"Read\"}}\n\n",
"event: response.function_call_arguments.delta\n",
"data: {\"item_id\":\"fc_read\",\"delta\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\\\"offset\\\":320,\\\"pages\\\":\\\"\\\"}\"}\n\n",
"event: response.function_call_arguments.delta\n",
"data: {\"item_id\":\"fc_read\",\"delta\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\\\"offset\\\":320,\\\"pages\\\":\\\"\\\"}\"}\n\n",
"event: response.function_call_arguments.done\n",
"data: {\"item_id\":\"fc_read\"}\n\n",
"event: response.completed\n",
"data: {\"response\":{\"status\":\"completed\"}}\n\n"
);

let (merged, _) = collect_stream(vec![Bytes::from(input)]).await;

assert!(merged.contains("\"partial_json\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\\\"offset\\\":320}"));
assert!(!merged.contains("\\\"offset\\\":320,\\\"pages\\\":\\\"\\\"}{"));
assert!(!merged.contains("\\\"pages\\\":\\\"\\\""));
}

#[tokio::test]
async fn read_tool_incomplete_argument_deltas_are_not_treated_as_snapshots() {
let input = concat!(
"event: response.created\n",
"data: {\"response\":{\"id\":\"resp_read\",\"model\":\"gpt-5.4\"}}\n\n",
"event: response.output_item.added\n",
"data: {\"item\":{\"id\":\"fc_read\",\"type\":\"function_call\",\"call_id\":\"call_read\",\"name\":\"Read\"}}\n\n",
"event: response.function_call_arguments.delta\n",
"data: {\"item_id\":\"fc_read\",\"delta\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":\"}\n\n",
"event: response.function_call_arguments.delta\n",
"data: {\"item_id\":\"fc_read\",\"delta\":\"2000\"}\n\n",
"event: response.function_call_arguments.done\n",
"data: {\"item_id\":\"fc_read\"}\n\n",
"event: response.completed\n",
"data: {\"response\":{\"status\":\"completed\"}}\n\n"
);

let (merged, _) = collect_stream(vec![Bytes::from(input)]).await;

assert!(merged.contains(
"\"partial_json\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000"
));
}

#[tokio::test]
async fn tool_start_closes_open_text_block_first() {
let input = concat!(
Expand Down
Loading