Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions open-sse/config/providerModels.js
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ export const PROVIDER_MODELS = {
],
mmf: [ // MiMo Free — free channel only serves mimo-auto
{ id: "mimo-auto", name: "MiMo Auto" },
{ id: "mimo-auto-claude", name: "MiMo Auto (Claude Compatible)", upstreamModelId: "mimo-auto" },
],

cl: [ // Cline
Expand Down
24 changes: 21 additions & 3 deletions open-sse/handlers/chatCore.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { getExecutor } from "../executors/index.js";
import { buildRequestDetail, extractRequestConfig } from "./chatCore/requestDetail.js";
import { handleForcedSSEToJson } from "./chatCore/sseToJsonHandler.js";
import { handleNonStreamingResponse } from "./chatCore/nonStreamingHandler.js";
import { handleStreamingResponse, buildOnStreamComplete } from "./chatCore/streamingHandler.js";
import { handleStreamingResponse, buildOnStreamComplete, handleClaudeJsonAsStreamingResponse, handleOpenAIJsonAsClaudeStreamingResponse } from "./chatCore/streamingHandler.js";
import { detectClientTool, isNativePassthrough } from "../utils/clientDetector.js";
import { dedupeTools } from "../utils/toolDeduper.js";
import { injectCaveman } from "../rtk/caveman.js";
Expand Down Expand Up @@ -60,7 +60,16 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred

const clientRequestedStreaming = body.stream === true || sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI;
const providerRequiresStreaming = provider === "openai" || provider === "codex" || provider === "commandcode";
let stream = providerRequiresStreaming ? true : (body.stream !== false);
const mmfClaudeSyntheticNeeded = provider === "mmf" && sourceFormat === FORMATS.CLAUDE && body.stream === true && (
(Array.isArray(body.tools) && body.tools.length >= 20) ||
(Array.isArray(body.messages) && body.messages.length >= 12) ||
JSON.stringify(body).length >= 200000
);
const synthesizeClaudeStream = sourceFormat === FORMATS.CLAUDE && body.stream === true && (
(provider === "xiaomi-tokenplan" && modelTargetFormat === FORMATS.CLAUDE) ||
mmfClaudeSyntheticNeeded
);
let stream = synthesizeClaudeStream ? false : (providerRequiresStreaming ? true : (body.stream !== false));

// DeepSeek-TUI: interactive TUI panel sends stream:true and needs SSE.
// Non-interactive mode (-p flag) sends without stream and can't parse SSE.
Expand Down Expand Up @@ -268,6 +277,16 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred
if (result) { streamController.handleComplete(); return result; }
}

// Streaming response synthesized from a non-streaming upstream response.
// This keeps Claude Code on SSE while avoiding flaky provider stream sockets.
const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx });
if (synthesizeClaudeStream) {
if (targetFormat === FORMATS.OPENAI) {
return handleOpenAIJsonAsClaudeStreamingResponse({ ...sharedCtx, providerResponse, reqLogger, streamController, onStreamComplete, trackDone, appendLog });
}
return handleClaudeJsonAsStreamingResponse({ ...sharedCtx, providerResponse, reqLogger, streamController, onStreamComplete, trackDone, appendLog });
}

// True non-streaming response
if (!stream) {
const result = await handleNonStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, reqLogger, toolNameMap, trackDone, appendLog });
Expand All @@ -276,7 +295,6 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred
}

// Streaming response
const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx });
return handleStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, streamController, onStreamComplete });
}

Expand Down
274 changes: 274 additions & 0 deletions open-sse/handlers/chatCore/streamingHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,171 @@ import { buildAbortedResponsesTerminalBytes } from "../../utils/responsesStreamH
import { buildRequestDetail, extractRequestConfig, saveUsageStats } from "./requestDetail.js";
import { saveRequestDetail } from "@/lib/usageDb.js";

const encoder = new TextEncoder();

function sseEvent(event, data) {
return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

function normalizeClaudeUsage(usage = {}) {
const inputTokens = usage.input_tokens || usage.prompt_tokens || 0;
const outputTokens = usage.output_tokens || usage.completion_tokens || 0;
const cacheRead = usage.cache_read_input_tokens || 0;
const cacheCreate = usage.cache_creation_input_tokens || 0;
const normalized = {
prompt_tokens: inputTokens,
completion_tokens: outputTokens,
total_tokens: inputTokens + outputTokens,
input_tokens: inputTokens,
output_tokens: outputTokens
};
if (cacheRead > 0) normalized.cache_read_input_tokens = cacheRead;
if (cacheCreate > 0) normalized.cache_creation_input_tokens = cacheCreate;
return normalized;
}

function openAIJsonToClaudeJson(responseBody, model) {
const choice = responseBody?.choices?.[0] || {};
const message = choice.message || {};
const content = [];

if (message.content) {
content.push({ type: "text", text: String(message.content) });
}

if (Array.isArray(message.tool_calls)) {
for (const toolCall of message.tool_calls) {
const fn = toolCall?.function || {};
let input = {};
if (typeof fn.arguments === "string" && fn.arguments.trim()) {
try { input = JSON.parse(fn.arguments); } catch { input = { arguments: fn.arguments }; }
} else if (fn.arguments && typeof fn.arguments === "object") {
input = fn.arguments;
}
content.push({
type: "tool_use",
id: toolCall.id || `toolu_${Date.now()}_${content.length}`,
name: fn.name || "tool",
input
});
}
}

const usage = responseBody?.usage || {};
const promptTokens = usage.prompt_tokens || 0;
const completionTokens = usage.completion_tokens || 0;
const cacheRead = usage.cache_read_input_tokens || usage.prompt_tokens_details?.cached_tokens || 0;
const cacheCreate = usage.cache_creation_input_tokens || usage.prompt_tokens_details?.cache_creation_tokens || 0;
const claudeUsage = {
input_tokens: Math.max(0, promptTokens - cacheRead - cacheCreate),
output_tokens: completionTokens
};
if (cacheRead > 0) claudeUsage.cache_read_input_tokens = cacheRead;
if (cacheCreate > 0) claudeUsage.cache_creation_input_tokens = cacheCreate;

return {
id: responseBody?.id || `msg_${Date.now()}`,
type: "message",
role: "assistant",
model: responseBody?.model || model,
content,
stop_reason: choice.finish_reason === "tool_calls" ? "tool_use" : choice.finish_reason === "length" ? "max_tokens" : "end_turn",
stop_sequence: null,
usage: claudeUsage
};
}

function claudeJsonToSSEStream(responseBody, model) {
return new ReadableStream({
start(controller) {
const id = responseBody?.id || `msg_${Date.now()}`;
const responseModel = responseBody?.model || model;
const usage = responseBody?.usage || {};
const content = Array.isArray(responseBody?.content) ? responseBody.content : [];

controller.enqueue(encoder.encode(sseEvent("message_start", {
type: "message_start",
message: {
id,
type: "message",
role: "assistant",
model: responseModel,
content: [],
stop_reason: null,
stop_sequence: null,
usage: {
input_tokens: usage.input_tokens || 0,
output_tokens: 0,
...(usage.cache_read_input_tokens ? { cache_read_input_tokens: usage.cache_read_input_tokens } : {}),
...(usage.cache_creation_input_tokens ? { cache_creation_input_tokens: usage.cache_creation_input_tokens } : {})
}
}
})));

content.forEach((block, index) => {
if (block?.type === "text") {
controller.enqueue(encoder.encode(sseEvent("content_block_start", {
type: "content_block_start",
index,
content_block: { type: "text", text: "" }
})));
if (block.text) {
controller.enqueue(encoder.encode(sseEvent("content_block_delta", {
type: "content_block_delta",
index,
delta: { type: "text_delta", text: block.text }
})));
}
controller.enqueue(encoder.encode(sseEvent("content_block_stop", { type: "content_block_stop", index })));
} else if (block?.type === "thinking") {
controller.enqueue(encoder.encode(sseEvent("content_block_start", {
type: "content_block_start",
index,
content_block: { type: "thinking", thinking: "", signature: block.signature || "" }
})));
if (block.thinking) {
controller.enqueue(encoder.encode(sseEvent("content_block_delta", {
type: "content_block_delta",
index,
delta: { type: "thinking_delta", thinking: block.thinking }
})));
}
controller.enqueue(encoder.encode(sseEvent("content_block_stop", { type: "content_block_stop", index })));
} else if (block?.type === "tool_use") {
controller.enqueue(encoder.encode(sseEvent("content_block_start", {
type: "content_block_start",
index,
content_block: { type: "tool_use", id: block.id, name: block.name, input: {} }
})));
const inputJson = JSON.stringify(block.input || {});
if (inputJson && inputJson !== "{}") {
controller.enqueue(encoder.encode(sseEvent("content_block_delta", {
type: "content_block_delta",
index,
delta: { type: "input_json_delta", partial_json: inputJson }
})));
}
controller.enqueue(encoder.encode(sseEvent("content_block_stop", { type: "content_block_stop", index })));
}
});

const stopReason = responseBody?.stop_reason || "end_turn";
controller.enqueue(encoder.encode(sseEvent("message_delta", {
type: "message_delta",
delta: { stop_reason: stopReason, stop_sequence: responseBody?.stop_sequence || null },
usage: {
output_tokens: usage.output_tokens || 0,
...(usage.input_tokens ? { input_tokens: usage.input_tokens } : {}),
...(usage.cache_read_input_tokens ? { cache_read_input_tokens: usage.cache_read_input_tokens } : {}),
...(usage.cache_creation_input_tokens ? { cache_creation_input_tokens: usage.cache_creation_input_tokens } : {})
}
})));
controller.enqueue(encoder.encode(sseEvent("message_stop", { type: "message_stop" })));
controller.close();
}
});
}

const SSE_HEADERS = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Expand Down Expand Up @@ -105,3 +270,112 @@ export function buildOnStreamComplete({ provider, model, connectionId, apiKey, r

return { onStreamComplete, streamDetailId };
}
/**
* Handle a non-streaming Claude JSON provider response while preserving a Claude SSE client contract.
* Used for providers whose upstream streaming path is unstable but whose non-streaming endpoint is reliable.
*/
export async function handleClaudeJsonAsStreamingResponse({ providerResponse, provider, model, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, streamController, onStreamComplete, trackDone, appendLog }) {
let responseBody;
try {
responseBody = await providerResponse.json();
} catch (err) {
streamController.handleError(err);
appendLog?.({ status: "FAILED 502" });
return {
success: false,
response: new Response(JSON.stringify({ error: { message: `Invalid JSON response from ${provider}` } }), {
status: 502,
headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" }
})
};
}

reqLogger?.logProviderResponse?.(providerResponse.status, providerResponse.statusText, providerResponse.headers, responseBody);
if (onRequestSuccess) await onRequestSuccess();

const usage = normalizeClaudeUsage(responseBody?.usage || {});
appendLog?.({ tokens: usage, status: "200 OK" });

const textContent = Array.isArray(responseBody?.content)
? responseBody.content.filter(b => b?.type === "text").map(b => b.text || "").join("")
: "";
const thinking = Array.isArray(responseBody?.content)
? responseBody.content.filter(b => b?.type === "thinking").map(b => b.thinking || "").join("")
: null;
onStreamComplete?.({ content: textContent, thinking }, usage, Date.now());
trackDone?.();
streamController.handleComplete();

saveRequestDetail(buildRequestDetail({
provider, model, connectionId,
latency: { ttft: Date.now() - requestStartTime, total: Date.now() - requestStartTime },
tokens: usage,
request: extractRequestConfig(body, stream),
providerRequest: finalBody || translatedBody || null,
providerResponse: responseBody || null,
response: { content: textContent, thinking, type: "streaming" },
status: "success"
}, { endpoint: clientRawRequest?.endpoint || null })).catch(err => {
console.error("[RequestDetail] Failed to save synthetic stream:", err.message);
});

return {
success: true,
response: new Response(claudeJsonToSSEStream(responseBody, model), { headers: SSE_HEADERS })
};
}


/**
* Handle a non-streaming OpenAI JSON provider response while preserving a Claude SSE client contract.
* Used for MMF free Claude mode because its upstream SSE socket can stall after first bytes.
*/
export async function handleOpenAIJsonAsClaudeStreamingResponse({ providerResponse, provider, model, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, streamController, onStreamComplete, trackDone, appendLog }) {
let responseBody;
try {
responseBody = await providerResponse.json();
} catch (err) {
streamController.handleError(err);
appendLog?.({ status: "FAILED 502" });
return {
success: false,
response: new Response(JSON.stringify({ error: { message: `Invalid JSON response from ${provider}` } }), {
status: 502,
headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" }
})
};
}

reqLogger?.logProviderResponse?.(providerResponse.status, providerResponse.statusText, providerResponse.headers, responseBody);
if (onRequestSuccess) await onRequestSuccess();

const claudeBody = openAIJsonToClaudeJson(responseBody, model);
const usage = normalizeClaudeUsage(claudeBody.usage || {});
appendLog?.({ tokens: usage, status: "200 OK" });

const textContent = Array.isArray(claudeBody.content)
? claudeBody.content.filter(b => b?.type === "text").map(b => b.text || "").join("")
: "";
const thinking = null;
onStreamComplete?.({ content: textContent, thinking }, usage, Date.now());
trackDone?.();
streamController.handleComplete();

saveRequestDetail(buildRequestDetail({
provider, model, connectionId,
latency: { ttft: Date.now() - requestStartTime, total: Date.now() - requestStartTime },
tokens: usage,
request: extractRequestConfig(body, stream),
providerRequest: finalBody || translatedBody || null,
providerResponse: responseBody || null,
response: { content: textContent, thinking, type: "streaming" },
status: "success"
}, { endpoint: clientRawRequest?.endpoint || null })).catch(err => {
console.error("[RequestDetail] Failed to save synthetic OpenAI stream:", err.message);
});

return {
success: true,
response: new Response(claudeJsonToSSEStream(claudeBody, model), { headers: SSE_HEADERS })
};
}
7 changes: 6 additions & 1 deletion open-sse/translator/response/openai-to-claude.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ function isValidPdfPagesArg(filePath, pages) {
/^\d+(?:-\d+)?$/.test(pages);
}

function shouldSuppressReasoningContent(state) {
const provider = String(state?.provider || "").toLowerCase();
return provider.startsWith("opencode") || provider === "mimo-free" || provider === "mmf";
}

// Helper: stop thinking block if started
function stopThinkingBlock(state, results) {
if (!state.thinkingBlockStarted) return;
Expand Down Expand Up @@ -131,7 +136,7 @@ export function openaiToClaudeResponse(chunk, state) {

// Handle reasoning_content (thinking) - GLM, DeepSeek, etc.
const reasoningContent = delta?.reasoning_content || delta?.reasoning;
if (reasoningContent) {
if (reasoningContent && !shouldSuppressReasoningContent(state)) {
stopTextBlock(state, results);

if (!state.thinkingBlockStarted) {
Expand Down