Merged · Changes from all commits (16 commits)
5 changes: 5 additions & 0 deletions open-sse/config/constants.ts
@@ -17,6 +17,11 @@ export const FETCH_TIMEOUT_MS = upstreamTimeouts.fetchTimeoutMs;
// idle for this duration. Override with STREAM_IDLE_TIMEOUT_MS env var.
export const STREAM_IDLE_TIMEOUT_MS = upstreamTimeouts.streamIdleTimeoutMs;

// Timeout for the first useful SSE event. Keep this much shorter than the
// post-start idle timeout so slow-thinking models can keep streaming after the
// first token, while dead 200 OK streams fail fast enough for combo fallback.
export const STREAM_READINESS_TIMEOUT_MS = upstreamTimeouts.streamReadinessTimeoutMs;

// Timeout for reading the full response body after headers arrive (ms).
// Prevents indefinite hangs when the upstream sends headers but stalls on the body.
// Defaults to FETCH_TIMEOUT_MS. Override with FETCH_BODY_TIMEOUT_MS env var.
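
The new readiness constant only behaves as described if it stays well below the post-start idle timeout. Below is a minimal sketch of that relationship, assuming the same env-override pattern the surrounding constants use; the STREAM_READINESS_TIMEOUT_MS env var name and the fallback values are illustrative assumptions, since upstreamTimeouts itself is not part of this diff.

// Sketch only, not part of this PR; env var names and defaults are assumptions.
function resolveStreamTimeouts(env: Record<string, string | undefined>) {
  const idleMs = Number(env.STREAM_IDLE_TIMEOUT_MS ?? "120000");
  const readinessMs = Number(env.STREAM_READINESS_TIMEOUT_MS ?? "30000");
  // The readiness window only bounds the wait for the first useful SSE event;
  // once the first token arrives, the longer idle timeout governs gaps between events.
  if (readinessMs >= idleMs) {
    console.warn(
      "readiness timeout should be much shorter than the idle timeout, " +
        "otherwise dead 200 OK streams will not fail fast enough for combo fallback"
    );
  }
  return { idleMs, readinessMs };
}
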
99 changes: 84 additions & 15 deletions open-sse/handlers/chatCore.ts
@@ -10,6 +10,7 @@ import {
} from "../utils/stream.ts";
import { ensureStreamReadiness } from "../utils/streamReadiness.ts";
import { createStreamController, pipeWithDisconnect } from "../utils/streamHandler.ts";
import { createSseHeartbeatTransform } from "../utils/sseHeartbeat.ts";
import { addBufferToUsage, filterUsageForFormat, estimateUsage } from "../utils/usageTracking.ts";
import { refreshWithRetry } from "../services/tokenRefresh.ts";
import { createRequestLogger } from "../utils/requestLogger.ts";
@@ -32,6 +33,7 @@ import {
HTTP_STATUS,
PROVIDER_MAX_TOKENS,
STREAM_IDLE_TIMEOUT_MS,
STREAM_READINESS_TIMEOUT_MS,
} from "../config/constants.ts";
import {
classifyProviderError,
@@ -81,6 +83,11 @@ import {
} from "../utils/cacheControlPolicy.ts";
import { getCacheMetrics } from "@/lib/db/settings.ts";
import { getCachedSettings } from "@/lib/db/readCache";
import { applyCodexGlobalFastServiceTier } from "@/lib/providers/codexFastTier";
import {
getCodexRequestDefaults,
normalizeCodexServiceTier,
} from "@/lib/providers/requestDefaults";
import { cacheReasoningFromAssistantMessage } from "../services/reasoningCache.ts";
import { sanitizeOpenAITool } from "../services/toolSchemaSanitizer.ts";

@@ -588,14 +595,39 @@ function toFiniteNumberOrNull(value: unknown): number | null {
return null;
}

function isSemaphoreTimeoutError(error: unknown): error is Error & { code: string } {
function isSemaphoreCapacityError(error: unknown): error is Error & { code: string } {
return (
!!error &&
typeof error === "object" &&
(error as { code?: unknown }).code === "SEMAPHORE_TIMEOUT"
((error as { code?: unknown }).code === "SEMAPHORE_TIMEOUT" ||
(error as { code?: unknown }).code === "SEMAPHORE_QUEUE_FULL")
);
}

function createStreamingErrorResult(statusCode: number, message: string, code?: string) {
const errorBody = buildErrorBody(statusCode, message);
if (code) {
errorBody.error.code = code;
}

const body = `data: ${JSON.stringify(errorBody)}\n\ndata: [DONE]\n\n`;

return {
success: false as const,
status: statusCode,
error: message,
response: new Response(body, {
status: statusCode,
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
},
}),
};
}

function wrapReadableStreamWithFinalize<T>(
readable: ReadableStream<T>,
finalize: () => void
@@ -966,6 +998,7 @@ export async function handleChatCore({
comboStepId = null,
comboExecutionKey = null,
disableEmergencyFallback = false,
cachedSettings = null,
}) {
let { provider, model, extendedContext } = modelInfo;
const requestedModel =
@@ -980,6 +1013,21 @@
log?.info?.("STAGE_TRACE", `${traceId} ${label} t=${elapsed}ms${suffix}`);
};
let tokensCompressed: number | null = null;
let effectiveServiceTier: "standard" | "priority" = "standard";
const resolveEffectiveServiceTier = (requestBody?: unknown): "standard" | "priority" => {
if (provider !== "codex") return "standard";
const requestRecord =
requestBody && typeof requestBody === "object" && !Array.isArray(requestBody)
? (requestBody as Record<string, unknown>)
: {};
const rawServiceTier = requestRecord.service_tier;
if (typeof rawServiceTier === "string" && rawServiceTier.trim().length > 0) {
return normalizeCodexServiceTier(rawServiceTier) ? "priority" : "standard";
}
return getCodexRequestDefaults(credentials?.providerSpecificData).serviceTier === "priority"
? "priority"
: "standard";
};
const persistFailureUsage = (statusCode: number, errorCode?: string | null) => {
saveRequestUsage({
provider: provider || "unknown",
@@ -994,6 +1042,7 @@
connectionId: connectionId || undefined,
apiKeyId: apiKeyInfo?.id || undefined,
apiKeyName: apiKeyInfo?.name || undefined,
serviceTier: effectiveServiceTier,
}).catch(() => {});
};

@@ -1083,7 +1132,9 @@
| undefined)
: undefined;
const idempotentCost = idempotentUsage
? await calculateCost(provider, model, idempotentUsage as Record<string, number>)
? await calculateCost(provider, model, idempotentUsage as Record<string, number>, {
serviceTier: effectiveServiceTier,
})
: 0;
return {
success: true,
@@ -1401,7 +1452,9 @@
nativeCodexPassthrough && isCompactResponsesEndpoint(endpointPath)
? false
: resolveStreamFlag(body?.stream, acceptHeader);
const settings = await getCachedSettings();
const settings = cachedSettings ?? (await getCachedSettings());
credentials = applyCodexGlobalFastServiceTier(provider, credentials, settings);
effectiveServiceTier = resolveEffectiveServiceTier(body);
setGeminiThoughtSignatureMode(settings.antigravitySignatureCacheMode);
const semanticCacheEnabled = settings.semanticCacheEnabled !== false;

@@ -1439,7 +1492,9 @@
extractUsageFromResponse(cached as Record<string, unknown>, provider) ||
((cached as Record<string, unknown>)?.usage as Record<string, unknown> | undefined);
const cachedCost = cachedUsage
? await calculateCost(provider, model, cachedUsage as Record<string, number>)
? await calculateCost(provider, model, cachedUsage as Record<string, number>, {
serviceTier: effectiveServiceTier,
})
: 0;
persistAttemptLogs({
status: 200,
@@ -1894,7 +1949,8 @@
effectiveModel ?? "",
{
input: tokensSaved,
}
},
{ serviceTier: effectiveServiceTier }
);
insertCompressionAnalyticsRow({
timestamp: new Date().toISOString(),
@@ -2943,6 +2999,7 @@
providerUrl = result.url;
providerHeaders = result.headers;
finalBody = result.transformedBody;
effectiveServiceTier = resolveEffectiveServiceTier(finalBody);
claudePromptCacheLogMeta = buildClaudePromptCacheLogMeta(
targetFormat,
finalBody,
@@ -2966,16 +3023,13 @@
);
} catch (error) {
trackPendingRequest(model, provider, connectionId, false);
if (isSemaphoreTimeoutError(error)) {
if (isSemaphoreCapacityError(error)) {
appendRequestLog({
model,
provider,
connectionId,
status: `FAILED ${error.code}`,
}).catch(() => {});
if (isCombo) {
throw error;
}
const failureMessage = error.message || "Semaphore timeout";
persistAttemptLogs({
status: HTTP_STATUS.RATE_LIMITED,
@@ -2986,7 +3040,14 @@
cacheSource: "upstream",
});
persistFailureUsage(HTTP_STATUS.RATE_LIMITED, error.code);
return createErrorResult(HTTP_STATUS.RATE_LIMITED, failureMessage);
const result = stream
? createStreamingErrorResult(HTTP_STATUS.RATE_LIMITED, failureMessage, error.code)
: createErrorResult(HTTP_STATUS.RATE_LIMITED, failureMessage);
return {
...result,
errorType: "account_semaphore_capacity",
errorCode: error.code,
};
}
const failureStatus =
error.name === "AbortError"
@@ -3683,6 +3744,7 @@
connectionId: connectionId || undefined,
apiKeyId: apiKeyInfo?.id || undefined,
apiKeyName: apiKeyInfo?.name || undefined,
serviceTier: effectiveServiceTier,
}).catch((err) => {
console.error("Failed to save usage stats:", err.message);
});
@@ -3815,7 +3877,9 @@
(translatedResponse?.usage && typeof translatedResponse.usage === "object"
? translatedResponse.usage
: null);
const estimatedCost = responseUsage ? await calculateCost(provider, model, responseUsage) : 0;
const estimatedCost = responseUsage
? await calculateCost(provider, model, responseUsage, { serviceTier: effectiveServiceTier })
: 0;

if (postCallGuardrails.blocked) {
const guardrailMessage = postCallGuardrails.message || "Response blocked by guardrail";
@@ -3908,7 +3972,7 @@

// Streaming response
const streamReadiness = await ensureStreamReadiness(providerResponse, {
timeoutMs: STREAM_IDLE_TIMEOUT_MS,
timeoutMs: STREAM_READINESS_TIMEOUT_MS,
provider,
model,
log,
@@ -3957,8 +4021,9 @@

const responseHeaders = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
[OMNIROUTE_RESPONSE_HEADERS.cache]: "MISS",
...buildOmniRouteResponseMetaHeaders({
provider,
@@ -4045,6 +4110,7 @@
connectionId: connectionId || undefined,
apiKeyId: apiKeyInfo?.id || undefined,
apiKeyName: apiKeyInfo?.name || undefined,
serviceTier: effectiveServiceTier,
}).catch((err) => {
console.error("Failed to save usage stats:", err.message);
});
@@ -4063,7 +4129,7 @@
});

if (apiKeyInfo?.id && streamUsage) {
calculateCost(provider, model, streamUsage)
calculateCost(provider, model, streamUsage, { serviceTier: effectiveServiceTier })
.then((estimatedCost) => {
if (estimatedCost > 0) recordCost(apiKeyInfo.id, estimatedCost);
})
@@ -4202,6 +4268,9 @@
} else {
finalStream = pipeWithDisconnect(providerResponse, transformStream, streamController);
}
finalStream = finalStream.pipeThrough(
createSseHeartbeatTransform({ signal: streamController.signal })
);

return {
success: true,
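
For context on the pipeThrough added just above: the diff shows only the call site of createSseHeartbeatTransform, not its implementation. A minimal sketch of what an SSE heartbeat transform of this shape could look like, assuming it emits periodic SSE comment lines; the interval, payload, and option handling here are illustrative, not the actual code in utils/sseHeartbeat.ts.

// Sketch only, not the implementation shipped in utils/sseHeartbeat.ts.
function sseHeartbeatTransformSketch(
  options: { signal?: AbortSignal; intervalMs?: number } = {}
): TransformStream<Uint8Array, Uint8Array> {
  const { signal, intervalMs = 15_000 } = options;
  const encoder = new TextEncoder();
  let timer: ReturnType<typeof setInterval> | undefined;
  const stop = () => {
    if (timer !== undefined) clearInterval(timer);
    timer = undefined;
  };
  return new TransformStream<Uint8Array, Uint8Array>({
    start(controller) {
      // SSE comment lines (leading ":") keep intermediaries from dropping an idle
      // connection but are never surfaced to the client as events.
      timer = setInterval(() => {
        if (signal?.aborted) {
          stop();
          return;
        }
        try {
          controller.enqueue(encoder.encode(": heartbeat\n\n"));
        } catch {
          stop(); // downstream already closed or errored
        }
      }, intervalMs);
    },
    transform(chunk, controller) {
      controller.enqueue(chunk); // pass upstream bytes through unchanged
    },
    flush() {
      stop();
    },
  });
}

Whatever the real transform does, piping the final stream through it pairs naturally with the "Cache-Control: no-cache, no-transform" and "X-Accel-Buffering: no" headers added earlier in this file, which tell proxies not to buffer or rewrite the event stream.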