diff --git a/src-tauri/src/proxy/providers/streaming_responses.rs b/src-tauri/src/proxy/providers/streaming_responses.rs index 5c7de974..bae4755d 100644 --- a/src-tauri/src/proxy/providers/streaming_responses.rs +++ b/src-tauri/src/proxy/providers/streaming_responses.rs @@ -58,6 +58,20 @@ fn tool_item_key_from_event(data: &Value) -> Option { None } +fn buffered_tool_arguments(chunks: &[String]) -> String { + let joined = chunks.concat(); + if joined.is_empty() || matches!(serde_json::from_str::(&joined), Ok(Value::Object(_))) { + return joined; + } + + chunks + .iter() + .rev() + .find(|chunk| matches!(serde_json::from_str::(chunk), Ok(Value::Object(_)))) + .cloned() + .unwrap_or(joined) +} + #[inline] fn resolve_content_index( data: &Value, @@ -102,7 +116,7 @@ pub fn create_anthropic_sse_stream_from_responses( let mut current_text_index: Option = None; let mut tool_index_by_item_id: HashMap = HashMap::new(); let mut tool_name_by_index: HashMap = HashMap::new(); - let mut tool_args_by_index: HashMap = HashMap::new(); + let mut tool_args_by_index: HashMap> = HashMap::new(); let mut last_tool_index: Option = None; tokio::pin!(stream); @@ -326,7 +340,7 @@ pub fn create_anthropic_sse_stream_from_responses( continue; } - tool_args_by_index.insert(index, String::new()); + tool_args_by_index.insert(index, Vec::new()); let event = json!({ "type": "content_block_start", @@ -384,7 +398,10 @@ pub fn create_anthropic_sse_stream_from_responses( open_indices.insert(index); } - tool_args_by_index.entry(index).or_default().push_str(delta); + tool_args_by_index + .entry(index) + .or_default() + .push(delta.to_string()); if tool_name_by_index.get(&index).map(String::as_str) == Some("Read") { continue; } @@ -425,7 +442,7 @@ pub fn create_anthropic_sse_stream_from_responses( .unwrap_or_else(|| { tool_args_by_index .get(&index) - .cloned() + .map(|chunks| buffered_tool_arguments(chunks)) .unwrap_or_default() }); let sanitized = sanitize_anthropic_tool_use_input_json("Read", &raw); @@ -442,11 +459,10 @@ pub fn create_anthropic_sse_stream_from_responses( yield Ok(Bytes::from(sse)); } } else { - let accumulated = tool_args_by_index + let has_accumulated = tool_args_by_index .get(&index) - .map(String::as_str) - .unwrap_or(""); - if accumulated.is_empty() { + .is_some_and(|chunks| chunks.iter().any(|chunk| !chunk.is_empty())); + if !has_accumulated { if let Some(arguments) = data .get("arguments") .and_then(|value| value.as_str()) @@ -732,6 +748,27 @@ data: {\"response\":{\"status\":\"completed\"}}\r\n\ assert!(merged.contains("\"stop_reason\":\"tool_use\"")); } + #[tokio::test] + async fn non_read_tool_empty_delta_still_uses_done_arguments() { + let input = concat!( + "event: response.created\n", + "data: {\"response\":{\"id\":\"resp_tool\",\"model\":\"gpt-5.4\"}}\n\n", + "event: response.output_item.added\n", + "data: {\"item\":{\"id\":\"fc_1\",\"type\":\"function_call\",\"call_id\":\"call_1\",\"name\":\"get_weather\"}}\n\n", + "event: response.function_call_arguments.delta\n", + "data: {\"item_id\":\"fc_1\",\"delta\":\"\"}\n\n", + "event: response.function_call_arguments.done\n", + "data: {\"item_id\":\"fc_1\",\"arguments\":\"{\\\"city\\\":\\\"Tokyo\\\"}\"}\n\n", + "event: response.completed\n", + "data: {\"response\":{\"status\":\"completed\"}}\n\n" + ); + + let (merged, _) = collect_stream(vec![Bytes::from(input)]).await; + + assert!(merged.contains("\\\"city\\\":\\\"Tokyo\\\"")); + assert!(merged.contains("\"stop_reason\":\"tool_use\"")); + } + #[tokio::test] async fn read_tool_arguments_are_buffered_and_sanitized() { let input = concat!( @@ -754,6 +791,102 @@ data: {\"response\":{\"status\":\"completed\"}}\r\n\ assert!(!merged.contains("\\\"pages\\\":\\\"\\\"")); } + #[tokio::test] + async fn read_tool_duplicate_start_preserves_buffered_arguments() { + let input = concat!( + "event: response.created\n", + "data: {\"response\":{\"id\":\"resp_read\",\"model\":\"gpt-5.4\"}}\n\n", + "event: response.output_item.added\n", + "data: {\"item\":{\"id\":\"fc_read\",\"type\":\"function_call\",\"call_id\":\"call_read\",\"name\":\"Read\"}}\n\n", + "event: response.function_call_arguments.delta\n", + "data: {\"item_id\":\"fc_read\",\"delta\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\\\"offset\\\":0,\\\"pages\\\":\\\"\\\"}\"}\n\n", + "event: response.output_item.added\n", + "data: {\"item\":{\"id\":\"fc_read\",\"type\":\"function_call\",\"call_id\":\"call_read\",\"name\":\"Read\"}}\n\n", + "event: response.function_call_arguments.done\n", + "data: {\"item_id\":\"fc_read\"}\n\n", + "event: response.completed\n", + "data: {\"response\":{\"status\":\"completed\"}}\n\n" + ); + + let (merged, _) = collect_stream(vec![Bytes::from(input)]).await; + + assert_eq!(merged.matches("event: content_block_start").count(), 1); + assert_eq!(merged.matches("event: content_block_stop").count(), 1); + assert!(merged.contains("\"partial_json\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\\\"offset\\\":0}")); + assert!(!merged.contains("\\\"pages\\\":\\\"\\\"")); + } + + #[tokio::test] + async fn read_tool_split_argument_deltas_are_still_joined() { + let input = concat!( + "event: response.created\n", + "data: {\"response\":{\"id\":\"resp_read\",\"model\":\"gpt-5.4\"}}\n\n", + "event: response.output_item.added\n", + "data: {\"item\":{\"id\":\"fc_read\",\"type\":\"function_call\",\"call_id\":\"call_read\",\"name\":\"Read\"}}\n\n", + "event: response.function_call_arguments.delta\n", + "data: {\"item_id\":\"fc_read\",\"delta\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\"}\n\n", + "event: response.function_call_arguments.delta\n", + "data: {\"item_id\":\"fc_read\",\"delta\":\"\\\"offset\\\":0,\\\"pages\\\":\\\"\\\"}\"}\n\n", + "event: response.function_call_arguments.done\n", + "data: {\"item_id\":\"fc_read\"}\n\n", + "event: response.completed\n", + "data: {\"response\":{\"status\":\"completed\"}}\n\n" + ); + + let (merged, _) = collect_stream(vec![Bytes::from(input)]).await; + + assert!(merged.contains("\"partial_json\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\\\"offset\\\":0}")); + assert!(!merged.contains("\\\"pages\\\":\\\"\\\"")); + } + + #[tokio::test] + async fn read_tool_snapshot_argument_deltas_use_latest_complete_json() { + let input = concat!( + "event: response.created\n", + "data: {\"response\":{\"id\":\"resp_read\",\"model\":\"gpt-5.4\"}}\n\n", + "event: response.output_item.added\n", + "data: {\"item\":{\"id\":\"fc_read\",\"type\":\"function_call\",\"call_id\":\"call_read\",\"name\":\"Read\"}}\n\n", + "event: response.function_call_arguments.delta\n", + "data: {\"item_id\":\"fc_read\",\"delta\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\\\"offset\\\":320,\\\"pages\\\":\\\"\\\"}\"}\n\n", + "event: response.function_call_arguments.delta\n", + "data: {\"item_id\":\"fc_read\",\"delta\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\\\"offset\\\":320,\\\"pages\\\":\\\"\\\"}\"}\n\n", + "event: response.function_call_arguments.done\n", + "data: {\"item_id\":\"fc_read\"}\n\n", + "event: response.completed\n", + "data: {\"response\":{\"status\":\"completed\"}}\n\n" + ); + + let (merged, _) = collect_stream(vec![Bytes::from(input)]).await; + + assert!(merged.contains("\"partial_json\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000,\\\"offset\\\":320}")); + assert!(!merged.contains("\\\"offset\\\":320,\\\"pages\\\":\\\"\\\"}{")); + assert!(!merged.contains("\\\"pages\\\":\\\"\\\"")); + } + + #[tokio::test] + async fn read_tool_incomplete_argument_deltas_are_not_treated_as_snapshots() { + let input = concat!( + "event: response.created\n", + "data: {\"response\":{\"id\":\"resp_read\",\"model\":\"gpt-5.4\"}}\n\n", + "event: response.output_item.added\n", + "data: {\"item\":{\"id\":\"fc_read\",\"type\":\"function_call\",\"call_id\":\"call_read\",\"name\":\"Read\"}}\n\n", + "event: response.function_call_arguments.delta\n", + "data: {\"item_id\":\"fc_read\",\"delta\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":\"}\n\n", + "event: response.function_call_arguments.delta\n", + "data: {\"item_id\":\"fc_read\",\"delta\":\"2000\"}\n\n", + "event: response.function_call_arguments.done\n", + "data: {\"item_id\":\"fc_read\"}\n\n", + "event: response.completed\n", + "data: {\"response\":{\"status\":\"completed\"}}\n\n" + ); + + let (merged, _) = collect_stream(vec![Bytes::from(input)]).await; + + assert!(merged.contains( + "\"partial_json\":\"{\\\"file_path\\\":\\\"/tmp/demo.py\\\",\\\"limit\\\":2000" + )); + } + #[tokio::test] async fn tool_start_closes_open_text_block_first() { let input = concat!(