diff --git a/open-sse/config/providerModels.js b/open-sse/config/providerModels.js index 68285d36ff..e0dbd7589d 100644 --- a/open-sse/config/providerModels.js +++ b/open-sse/config/providerModels.js @@ -213,6 +213,7 @@ export const PROVIDER_MODELS = { ], mmf: [ // MiMo Free — free channel only serves mimo-auto { id: "mimo-auto", name: "MiMo Auto" }, + { id: "mimo-auto-claude", name: "MiMo Auto (Claude Compatible)", upstreamModelId: "mimo-auto" }, ], cl: [ // Cline diff --git a/open-sse/handlers/chatCore.js b/open-sse/handlers/chatCore.js index 65281cfae2..1cbbef2475 100644 --- a/open-sse/handlers/chatCore.js +++ b/open-sse/handlers/chatCore.js @@ -15,7 +15,7 @@ import { getExecutor } from "../executors/index.js"; import { buildRequestDetail, extractRequestConfig } from "./chatCore/requestDetail.js"; import { handleForcedSSEToJson } from "./chatCore/sseToJsonHandler.js"; import { handleNonStreamingResponse } from "./chatCore/nonStreamingHandler.js"; -import { handleStreamingResponse, buildOnStreamComplete } from "./chatCore/streamingHandler.js"; +import { handleStreamingResponse, buildOnStreamComplete, handleClaudeJsonAsStreamingResponse, handleOpenAIJsonAsClaudeStreamingResponse } from "./chatCore/streamingHandler.js"; import { detectClientTool, isNativePassthrough } from "../utils/clientDetector.js"; import { dedupeTools } from "../utils/toolDeduper.js"; import { injectCaveman } from "../rtk/caveman.js"; @@ -60,7 +60,16 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred const clientRequestedStreaming = body.stream === true || sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI; const providerRequiresStreaming = provider === "openai" || provider === "codex" || provider === "commandcode"; - let stream = providerRequiresStreaming ? true : (body.stream !== false); + const mmfClaudeSyntheticNeeded = provider === "mmf" && sourceFormat === FORMATS.CLAUDE && body.stream === true && ( + (Array.isArray(body.tools) && body.tools.length >= 20) || + (Array.isArray(body.messages) && body.messages.length >= 12) || + JSON.stringify(body).length >= 200000 + ); + const synthesizeClaudeStream = sourceFormat === FORMATS.CLAUDE && body.stream === true && ( + (provider === "xiaomi-tokenplan" && modelTargetFormat === FORMATS.CLAUDE) || + mmfClaudeSyntheticNeeded + ); + let stream = synthesizeClaudeStream ? false : (providerRequiresStreaming ? true : (body.stream !== false)); // DeepSeek-TUI: interactive TUI panel sends stream:true and needs SSE. // Non-interactive mode (-p flag) sends without stream and can't parse SSE. @@ -268,6 +277,16 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred if (result) { streamController.handleComplete(); return result; } } + // Streaming response synthesized from a non-streaming upstream response. + // This keeps Claude Code on SSE while avoiding flaky provider stream sockets. + const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx }); + if (synthesizeClaudeStream) { + if (targetFormat === FORMATS.OPENAI) { + return handleOpenAIJsonAsClaudeStreamingResponse({ ...sharedCtx, providerResponse, reqLogger, streamController, onStreamComplete, trackDone, appendLog }); + } + return handleClaudeJsonAsStreamingResponse({ ...sharedCtx, providerResponse, reqLogger, streamController, onStreamComplete, trackDone, appendLog }); + } + // True non-streaming response if (!stream) { const result = await handleNonStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, reqLogger, toolNameMap, trackDone, appendLog }); @@ -276,7 +295,6 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred } // Streaming response - const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx }); return handleStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, streamController, onStreamComplete }); } diff --git a/open-sse/handlers/chatCore/streamingHandler.js b/open-sse/handlers/chatCore/streamingHandler.js index f96bb9bd06..e154a06fe1 100644 --- a/open-sse/handlers/chatCore/streamingHandler.js +++ b/open-sse/handlers/chatCore/streamingHandler.js @@ -8,6 +8,171 @@ import { buildAbortedResponsesTerminalBytes } from "../../utils/responsesStreamH import { buildRequestDetail, extractRequestConfig, saveUsageStats } from "./requestDetail.js"; import { saveRequestDetail } from "@/lib/usageDb.js"; +const encoder = new TextEncoder(); + +function sseEvent(event, data) { + return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; +} + +function normalizeClaudeUsage(usage = {}) { + const inputTokens = usage.input_tokens || usage.prompt_tokens || 0; + const outputTokens = usage.output_tokens || usage.completion_tokens || 0; + const cacheRead = usage.cache_read_input_tokens || 0; + const cacheCreate = usage.cache_creation_input_tokens || 0; + const normalized = { + prompt_tokens: inputTokens, + completion_tokens: outputTokens, + total_tokens: inputTokens + outputTokens, + input_tokens: inputTokens, + output_tokens: outputTokens + }; + if (cacheRead > 0) normalized.cache_read_input_tokens = cacheRead; + if (cacheCreate > 0) normalized.cache_creation_input_tokens = cacheCreate; + return normalized; +} + +function openAIJsonToClaudeJson(responseBody, model) { + const choice = responseBody?.choices?.[0] || {}; + const message = choice.message || {}; + const content = []; + + if (message.content) { + content.push({ type: "text", text: String(message.content) }); + } + + if (Array.isArray(message.tool_calls)) { + for (const toolCall of message.tool_calls) { + const fn = toolCall?.function || {}; + let input = {}; + if (typeof fn.arguments === "string" && fn.arguments.trim()) { + try { input = JSON.parse(fn.arguments); } catch { input = { arguments: fn.arguments }; } + } else if (fn.arguments && typeof fn.arguments === "object") { + input = fn.arguments; + } + content.push({ + type: "tool_use", + id: toolCall.id || `toolu_${Date.now()}_${content.length}`, + name: fn.name || "tool", + input + }); + } + } + + const usage = responseBody?.usage || {}; + const promptTokens = usage.prompt_tokens || 0; + const completionTokens = usage.completion_tokens || 0; + const cacheRead = usage.cache_read_input_tokens || usage.prompt_tokens_details?.cached_tokens || 0; + const cacheCreate = usage.cache_creation_input_tokens || usage.prompt_tokens_details?.cache_creation_tokens || 0; + const claudeUsage = { + input_tokens: Math.max(0, promptTokens - cacheRead - cacheCreate), + output_tokens: completionTokens + }; + if (cacheRead > 0) claudeUsage.cache_read_input_tokens = cacheRead; + if (cacheCreate > 0) claudeUsage.cache_creation_input_tokens = cacheCreate; + + return { + id: responseBody?.id || `msg_${Date.now()}`, + type: "message", + role: "assistant", + model: responseBody?.model || model, + content, + stop_reason: choice.finish_reason === "tool_calls" ? "tool_use" : choice.finish_reason === "length" ? "max_tokens" : "end_turn", + stop_sequence: null, + usage: claudeUsage + }; +} + +function claudeJsonToSSEStream(responseBody, model) { + return new ReadableStream({ + start(controller) { + const id = responseBody?.id || `msg_${Date.now()}`; + const responseModel = responseBody?.model || model; + const usage = responseBody?.usage || {}; + const content = Array.isArray(responseBody?.content) ? responseBody.content : []; + + controller.enqueue(encoder.encode(sseEvent("message_start", { + type: "message_start", + message: { + id, + type: "message", + role: "assistant", + model: responseModel, + content: [], + stop_reason: null, + stop_sequence: null, + usage: { + input_tokens: usage.input_tokens || 0, + output_tokens: 0, + ...(usage.cache_read_input_tokens ? { cache_read_input_tokens: usage.cache_read_input_tokens } : {}), + ...(usage.cache_creation_input_tokens ? { cache_creation_input_tokens: usage.cache_creation_input_tokens } : {}) + } + } + }))); + + content.forEach((block, index) => { + if (block?.type === "text") { + controller.enqueue(encoder.encode(sseEvent("content_block_start", { + type: "content_block_start", + index, + content_block: { type: "text", text: "" } + }))); + if (block.text) { + controller.enqueue(encoder.encode(sseEvent("content_block_delta", { + type: "content_block_delta", + index, + delta: { type: "text_delta", text: block.text } + }))); + } + controller.enqueue(encoder.encode(sseEvent("content_block_stop", { type: "content_block_stop", index }))); + } else if (block?.type === "thinking") { + controller.enqueue(encoder.encode(sseEvent("content_block_start", { + type: "content_block_start", + index, + content_block: { type: "thinking", thinking: "", signature: block.signature || "" } + }))); + if (block.thinking) { + controller.enqueue(encoder.encode(sseEvent("content_block_delta", { + type: "content_block_delta", + index, + delta: { type: "thinking_delta", thinking: block.thinking } + }))); + } + controller.enqueue(encoder.encode(sseEvent("content_block_stop", { type: "content_block_stop", index }))); + } else if (block?.type === "tool_use") { + controller.enqueue(encoder.encode(sseEvent("content_block_start", { + type: "content_block_start", + index, + content_block: { type: "tool_use", id: block.id, name: block.name, input: {} } + }))); + const inputJson = JSON.stringify(block.input || {}); + if (inputJson && inputJson !== "{}") { + controller.enqueue(encoder.encode(sseEvent("content_block_delta", { + type: "content_block_delta", + index, + delta: { type: "input_json_delta", partial_json: inputJson } + }))); + } + controller.enqueue(encoder.encode(sseEvent("content_block_stop", { type: "content_block_stop", index }))); + } + }); + + const stopReason = responseBody?.stop_reason || "end_turn"; + controller.enqueue(encoder.encode(sseEvent("message_delta", { + type: "message_delta", + delta: { stop_reason: stopReason, stop_sequence: responseBody?.stop_sequence || null }, + usage: { + output_tokens: usage.output_tokens || 0, + ...(usage.input_tokens ? { input_tokens: usage.input_tokens } : {}), + ...(usage.cache_read_input_tokens ? { cache_read_input_tokens: usage.cache_read_input_tokens } : {}), + ...(usage.cache_creation_input_tokens ? { cache_creation_input_tokens: usage.cache_creation_input_tokens } : {}) + } + }))); + controller.enqueue(encoder.encode(sseEvent("message_stop", { type: "message_stop" }))); + controller.close(); + } + }); +} + const SSE_HEADERS = { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", @@ -105,3 +270,112 @@ export function buildOnStreamComplete({ provider, model, connectionId, apiKey, r return { onStreamComplete, streamDetailId }; } +/** + * Handle a non-streaming Claude JSON provider response while preserving a Claude SSE client contract. + * Used for providers whose upstream streaming path is unstable but whose non-streaming endpoint is reliable. + */ +export async function handleClaudeJsonAsStreamingResponse({ providerResponse, provider, model, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, streamController, onStreamComplete, trackDone, appendLog }) { + let responseBody; + try { + responseBody = await providerResponse.json(); + } catch (err) { + streamController.handleError(err); + appendLog?.({ status: "FAILED 502" }); + return { + success: false, + response: new Response(JSON.stringify({ error: { message: `Invalid JSON response from ${provider}` } }), { + status: 502, + headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } + }) + }; + } + + reqLogger?.logProviderResponse?.(providerResponse.status, providerResponse.statusText, providerResponse.headers, responseBody); + if (onRequestSuccess) await onRequestSuccess(); + + const usage = normalizeClaudeUsage(responseBody?.usage || {}); + appendLog?.({ tokens: usage, status: "200 OK" }); + + const textContent = Array.isArray(responseBody?.content) + ? responseBody.content.filter(b => b?.type === "text").map(b => b.text || "").join("") + : ""; + const thinking = Array.isArray(responseBody?.content) + ? responseBody.content.filter(b => b?.type === "thinking").map(b => b.thinking || "").join("") + : null; + onStreamComplete?.({ content: textContent, thinking }, usage, Date.now()); + trackDone?.(); + streamController.handleComplete(); + + saveRequestDetail(buildRequestDetail({ + provider, model, connectionId, + latency: { ttft: Date.now() - requestStartTime, total: Date.now() - requestStartTime }, + tokens: usage, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null, + providerResponse: responseBody || null, + response: { content: textContent, thinking, type: "streaming" }, + status: "success" + }, { endpoint: clientRawRequest?.endpoint || null })).catch(err => { + console.error("[RequestDetail] Failed to save synthetic stream:", err.message); + }); + + return { + success: true, + response: new Response(claudeJsonToSSEStream(responseBody, model), { headers: SSE_HEADERS }) + }; +} + + +/** + * Handle a non-streaming OpenAI JSON provider response while preserving a Claude SSE client contract. + * Used for MMF free Claude mode because its upstream SSE socket can stall after first bytes. + */ +export async function handleOpenAIJsonAsClaudeStreamingResponse({ providerResponse, provider, model, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, streamController, onStreamComplete, trackDone, appendLog }) { + let responseBody; + try { + responseBody = await providerResponse.json(); + } catch (err) { + streamController.handleError(err); + appendLog?.({ status: "FAILED 502" }); + return { + success: false, + response: new Response(JSON.stringify({ error: { message: `Invalid JSON response from ${provider}` } }), { + status: 502, + headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } + }) + }; + } + + reqLogger?.logProviderResponse?.(providerResponse.status, providerResponse.statusText, providerResponse.headers, responseBody); + if (onRequestSuccess) await onRequestSuccess(); + + const claudeBody = openAIJsonToClaudeJson(responseBody, model); + const usage = normalizeClaudeUsage(claudeBody.usage || {}); + appendLog?.({ tokens: usage, status: "200 OK" }); + + const textContent = Array.isArray(claudeBody.content) + ? claudeBody.content.filter(b => b?.type === "text").map(b => b.text || "").join("") + : ""; + const thinking = null; + onStreamComplete?.({ content: textContent, thinking }, usage, Date.now()); + trackDone?.(); + streamController.handleComplete(); + + saveRequestDetail(buildRequestDetail({ + provider, model, connectionId, + latency: { ttft: Date.now() - requestStartTime, total: Date.now() - requestStartTime }, + tokens: usage, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null, + providerResponse: responseBody || null, + response: { content: textContent, thinking, type: "streaming" }, + status: "success" + }, { endpoint: clientRawRequest?.endpoint || null })).catch(err => { + console.error("[RequestDetail] Failed to save synthetic OpenAI stream:", err.message); + }); + + return { + success: true, + response: new Response(claudeJsonToSSEStream(claudeBody, model), { headers: SSE_HEADERS }) + }; +} diff --git a/open-sse/translator/response/openai-to-claude.js b/open-sse/translator/response/openai-to-claude.js index 6a46551d65..34779ba7d6 100644 --- a/open-sse/translator/response/openai-to-claude.js +++ b/open-sse/translator/response/openai-to-claude.js @@ -40,6 +40,11 @@ function isValidPdfPagesArg(filePath, pages) { /^\d+(?:-\d+)?$/.test(pages); } +function shouldSuppressReasoningContent(state) { + const provider = String(state?.provider || "").toLowerCase(); + return provider.startsWith("opencode") || provider === "mimo-free" || provider === "mmf"; +} + // Helper: stop thinking block if started function stopThinkingBlock(state, results) { if (!state.thinkingBlockStarted) return; @@ -131,7 +136,7 @@ export function openaiToClaudeResponse(chunk, state) { // Handle reasoning_content (thinking) - GLM, DeepSeek, etc. const reasoningContent = delta?.reasoning_content || delta?.reasoning; - if (reasoningContent) { + if (reasoningContent && !shouldSuppressReasoningContent(state)) { stopTextBlock(state, results); if (!state.thinkingBlockStarted) {