diff --git a/open-sse/config/constants.ts b/open-sse/config/constants.ts index 462dcd481..6faf2df05 100644 --- a/open-sse/config/constants.ts +++ b/open-sse/config/constants.ts @@ -17,6 +17,11 @@ export const FETCH_TIMEOUT_MS = upstreamTimeouts.fetchTimeoutMs; // idle for this duration. Override with STREAM_IDLE_TIMEOUT_MS env var. export const STREAM_IDLE_TIMEOUT_MS = upstreamTimeouts.streamIdleTimeoutMs; +// Timeout for the first useful SSE event. Keep this much shorter than the +// post-start idle timeout so slow-thinking models can keep streaming after the +// first token, while dead 200 OK streams fail fast enough for combo fallback. +export const STREAM_READINESS_TIMEOUT_MS = upstreamTimeouts.streamReadinessTimeoutMs; + // Timeout for reading the full response body after headers arrive (ms). // Prevents indefinite hangs when the upstream sends headers but stalls on the body. // Defaults to FETCH_TIMEOUT_MS. Override with FETCH_BODY_TIMEOUT_MS env var. diff --git a/open-sse/handlers/chatCore.ts b/open-sse/handlers/chatCore.ts index 04e2159af..e6c4c03b1 100644 --- a/open-sse/handlers/chatCore.ts +++ b/open-sse/handlers/chatCore.ts @@ -10,6 +10,7 @@ import { } from "../utils/stream.ts"; import { ensureStreamReadiness } from "../utils/streamReadiness.ts"; import { createStreamController, pipeWithDisconnect } from "../utils/streamHandler.ts"; +import { createSseHeartbeatTransform } from "../utils/sseHeartbeat.ts"; import { addBufferToUsage, filterUsageForFormat, estimateUsage } from "../utils/usageTracking.ts"; import { refreshWithRetry } from "../services/tokenRefresh.ts"; import { createRequestLogger } from "../utils/requestLogger.ts"; @@ -32,6 +33,7 @@ import { HTTP_STATUS, PROVIDER_MAX_TOKENS, STREAM_IDLE_TIMEOUT_MS, + STREAM_READINESS_TIMEOUT_MS, } from "../config/constants.ts"; import { classifyProviderError, @@ -81,6 +83,11 @@ import { } from "../utils/cacheControlPolicy.ts"; import { getCacheMetrics } from "@/lib/db/settings.ts"; import { getCachedSettings } from "@/lib/db/readCache"; +import { applyCodexGlobalFastServiceTier } from "@/lib/providers/codexFastTier"; +import { + getCodexRequestDefaults, + normalizeCodexServiceTier, +} from "@/lib/providers/requestDefaults"; import { cacheReasoningFromAssistantMessage } from "../services/reasoningCache.ts"; import { sanitizeOpenAITool } from "../services/toolSchemaSanitizer.ts"; @@ -588,14 +595,39 @@ function toFiniteNumberOrNull(value: unknown): number | null { return null; } -function isSemaphoreTimeoutError(error: unknown): error is Error & { code: string } { +function isSemaphoreCapacityError(error: unknown): error is Error & { code: string } { return ( !!error && typeof error === "object" && - (error as { code?: unknown }).code === "SEMAPHORE_TIMEOUT" + ((error as { code?: unknown }).code === "SEMAPHORE_TIMEOUT" || + (error as { code?: unknown }).code === "SEMAPHORE_QUEUE_FULL") ); } +function createStreamingErrorResult(statusCode: number, message: string, code?: string) { + const errorBody = buildErrorBody(statusCode, message); + if (code) { + errorBody.error.code = code; + } + + const body = `data: ${JSON.stringify(errorBody)}\n\ndata: [DONE]\n\n`; + + return { + success: false as const, + status: statusCode, + error: message, + response: new Response(body, { + status: statusCode, + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + "X-Accel-Buffering": "no", + }, + }), + }; +} + function wrapReadableStreamWithFinalize( readable: ReadableStream, finalize: () => void @@ -966,6 +998,7 @@ export async function handleChatCore({ comboStepId = null, comboExecutionKey = null, disableEmergencyFallback = false, + cachedSettings = null, }) { let { provider, model, extendedContext } = modelInfo; const requestedModel = @@ -980,6 +1013,21 @@ export async function handleChatCore({ log?.info?.("STAGE_TRACE", `${traceId} ${label} t=${elapsed}ms${suffix}`); }; let tokensCompressed: number | null = null; + let effectiveServiceTier: "standard" | "priority" = "standard"; + const resolveEffectiveServiceTier = (requestBody?: unknown): "standard" | "priority" => { + if (provider !== "codex") return "standard"; + const requestRecord = + requestBody && typeof requestBody === "object" && !Array.isArray(requestBody) + ? (requestBody as Record) + : {}; + const rawServiceTier = requestRecord.service_tier; + if (typeof rawServiceTier === "string" && rawServiceTier.trim().length > 0) { + return normalizeCodexServiceTier(rawServiceTier) ? "priority" : "standard"; + } + return getCodexRequestDefaults(credentials?.providerSpecificData).serviceTier === "priority" + ? "priority" + : "standard"; + }; const persistFailureUsage = (statusCode: number, errorCode?: string | null) => { saveRequestUsage({ provider: provider || "unknown", @@ -994,6 +1042,7 @@ export async function handleChatCore({ connectionId: connectionId || undefined, apiKeyId: apiKeyInfo?.id || undefined, apiKeyName: apiKeyInfo?.name || undefined, + serviceTier: effectiveServiceTier, }).catch(() => {}); }; @@ -1083,7 +1132,9 @@ export async function handleChatCore({ | undefined) : undefined; const idempotentCost = idempotentUsage - ? await calculateCost(provider, model, idempotentUsage as Record) + ? await calculateCost(provider, model, idempotentUsage as Record, { + serviceTier: effectiveServiceTier, + }) : 0; return { success: true, @@ -1401,7 +1452,9 @@ export async function handleChatCore({ nativeCodexPassthrough && isCompactResponsesEndpoint(endpointPath) ? false : resolveStreamFlag(body?.stream, acceptHeader); - const settings = await getCachedSettings(); + const settings = cachedSettings ?? (await getCachedSettings()); + credentials = applyCodexGlobalFastServiceTier(provider, credentials, settings); + effectiveServiceTier = resolveEffectiveServiceTier(body); setGeminiThoughtSignatureMode(settings.antigravitySignatureCacheMode); const semanticCacheEnabled = settings.semanticCacheEnabled !== false; @@ -1439,7 +1492,9 @@ export async function handleChatCore({ extractUsageFromResponse(cached as Record, provider) || ((cached as Record)?.usage as Record | undefined); const cachedCost = cachedUsage - ? await calculateCost(provider, model, cachedUsage as Record) + ? await calculateCost(provider, model, cachedUsage as Record, { + serviceTier: effectiveServiceTier, + }) : 0; persistAttemptLogs({ status: 200, @@ -1894,7 +1949,8 @@ export async function handleChatCore({ effectiveModel ?? "", { input: tokensSaved, - } + }, + { serviceTier: effectiveServiceTier } ); insertCompressionAnalyticsRow({ timestamp: new Date().toISOString(), @@ -2943,6 +2999,7 @@ export async function handleChatCore({ providerUrl = result.url; providerHeaders = result.headers; finalBody = result.transformedBody; + effectiveServiceTier = resolveEffectiveServiceTier(finalBody); claudePromptCacheLogMeta = buildClaudePromptCacheLogMeta( targetFormat, finalBody, @@ -2966,16 +3023,13 @@ export async function handleChatCore({ ); } catch (error) { trackPendingRequest(model, provider, connectionId, false); - if (isSemaphoreTimeoutError(error)) { + if (isSemaphoreCapacityError(error)) { appendRequestLog({ model, provider, connectionId, status: `FAILED ${error.code}`, }).catch(() => {}); - if (isCombo) { - throw error; - } const failureMessage = error.message || "Semaphore timeout"; persistAttemptLogs({ status: HTTP_STATUS.RATE_LIMITED, @@ -2986,7 +3040,14 @@ export async function handleChatCore({ cacheSource: "upstream", }); persistFailureUsage(HTTP_STATUS.RATE_LIMITED, error.code); - return createErrorResult(HTTP_STATUS.RATE_LIMITED, failureMessage); + const result = stream + ? createStreamingErrorResult(HTTP_STATUS.RATE_LIMITED, failureMessage, error.code) + : createErrorResult(HTTP_STATUS.RATE_LIMITED, failureMessage); + return { + ...result, + errorType: "account_semaphore_capacity", + errorCode: error.code, + }; } const failureStatus = error.name === "AbortError" @@ -3683,6 +3744,7 @@ export async function handleChatCore({ connectionId: connectionId || undefined, apiKeyId: apiKeyInfo?.id || undefined, apiKeyName: apiKeyInfo?.name || undefined, + serviceTier: effectiveServiceTier, }).catch((err) => { console.error("Failed to save usage stats:", err.message); }); @@ -3815,7 +3877,9 @@ export async function handleChatCore({ (translatedResponse?.usage && typeof translatedResponse.usage === "object" ? translatedResponse.usage : null); - const estimatedCost = responseUsage ? await calculateCost(provider, model, responseUsage) : 0; + const estimatedCost = responseUsage + ? await calculateCost(provider, model, responseUsage, { serviceTier: effectiveServiceTier }) + : 0; if (postCallGuardrails.blocked) { const guardrailMessage = postCallGuardrails.message || "Response blocked by guardrail"; @@ -3908,7 +3972,7 @@ export async function handleChatCore({ // Streaming response const streamReadiness = await ensureStreamReadiness(providerResponse, { - timeoutMs: STREAM_IDLE_TIMEOUT_MS, + timeoutMs: STREAM_READINESS_TIMEOUT_MS, provider, model, log, @@ -3957,8 +4021,9 @@ export async function handleChatCore({ const responseHeaders = { "Content-Type": "text/event-stream", - "Cache-Control": "no-cache", + "Cache-Control": "no-cache, no-transform", Connection: "keep-alive", + "X-Accel-Buffering": "no", [OMNIROUTE_RESPONSE_HEADERS.cache]: "MISS", ...buildOmniRouteResponseMetaHeaders({ provider, @@ -4045,6 +4110,7 @@ export async function handleChatCore({ connectionId: connectionId || undefined, apiKeyId: apiKeyInfo?.id || undefined, apiKeyName: apiKeyInfo?.name || undefined, + serviceTier: effectiveServiceTier, }).catch((err) => { console.error("Failed to save usage stats:", err.message); }); @@ -4063,7 +4129,7 @@ export async function handleChatCore({ }); if (apiKeyInfo?.id && streamUsage) { - calculateCost(provider, model, streamUsage) + calculateCost(provider, model, streamUsage, { serviceTier: effectiveServiceTier }) .then((estimatedCost) => { if (estimatedCost > 0) recordCost(apiKeyInfo.id, estimatedCost); }) @@ -4202,6 +4268,9 @@ export async function handleChatCore({ } else { finalStream = pipeWithDisconnect(providerResponse, transformStream, streamController); } + finalStream = finalStream.pipeThrough( + createSseHeartbeatTransform({ signal: streamController.signal }) + ); return { success: true, diff --git a/open-sse/services/combo.ts b/open-sse/services/combo.ts index 8bd9efa5e..699cac519 100644 --- a/open-sse/services/combo.ts +++ b/open-sse/services/combo.ts @@ -1,7 +1,8 @@ /** * Shared combo (model combo) handling with fallback support * Supports: priority, weighted, round-robin, random, least-used, cost-optimized, - * strict-random, auto, fill-first, p2c, lkgp, context-optimized, and context-relay strategies + * reset-aware, strict-random, auto, fill-first, p2c, lkgp, context-optimized, + * and context-relay strategies */ import { @@ -16,6 +17,7 @@ import { recordComboIntent, recordComboRequest, getComboMetrics } from "./comboM import { resolveComboConfig, getDefaultComboConfig } from "./comboConfig.ts"; import { maybeGenerateHandoff, resolveContextRelayConfig } from "./contextHandoff.ts"; import { fetchCodexQuota } from "./codexQuotaFetcher.ts"; +import { getQuotaFetcher } from "./quotaPreflight.ts"; import * as semaphore from "./rateLimitSemaphore.ts"; import { getCircuitBreaker } from "../../src/shared/utils/circuitBreaker"; import { fisherYatesShuffle, getNextFromDeck } from "../../src/shared/utils/shuffleDeck"; @@ -70,6 +72,12 @@ const MAX_COMBO_DEPTH = 3; const MAX_FALLBACK_WAIT_MS = 5000; const MAX_GLOBAL_ATTEMPTS = 30; +function resolveDelayMs(value: unknown, fallback: number): number { + const numericValue = Number(value); + if (!Number.isFinite(numericValue) || numericValue < 0) return fallback; + return numericValue; +} + function comboModelNotFoundResponse(message: string) { return errorResponse(404, message); } @@ -86,6 +94,18 @@ const DEFAULT_MODEL_P95_MS = { "deepseek-chat": 2000, }; const MIN_HISTORY_SAMPLES = 10; +const RESET_AWARE_SESSION_WINDOW_MS = 5 * 60 * 60 * 1000; +const RESET_AWARE_WEEKLY_WINDOW_MS = 7 * 24 * 60 * 60 * 1000; +const RESET_AWARE_REMAINING_WEIGHT = 0.55; +const RESET_AWARE_RESET_WEIGHT = 0.45; +const RESET_AWARE_CONNECTION_CACHE_TTL_MS = 30_000; +const RESET_AWARE_QUOTA_FETCH_CONCURRENCY = 5; +const RESET_AWARE_DEFAULTS = { + sessionWeight: 0.35, + weeklyWeight: 0.65, + tieBandPercent: 5, + exhaustionGuardPercent: 10, +}; type ResolvedComboTarget = { kind: "model"; @@ -210,6 +230,11 @@ export async function validateResponseQuality( // Resets on server restart (by design — no stale state) const rrCounters = new Map(); +const resetAwareConnectionCache = new Map< + string, + { fetchedAt: number; connections: Array> } +>(); + /** * Normalize a model entry to { model, weight } * Supports both legacy string format and new object format @@ -697,6 +722,363 @@ function orderTargetsByPowerOfTwoChoices(targets: ResolvedComboTarget[], comboNa return [targets[selectedIndex], ...targets.filter((_, index) => index !== selectedIndex)]; } +function clamp01(value: number): number { + if (!Number.isFinite(value)) return 0; + return Math.max(0, Math.min(1, value)); +} + +function finiteNumberOrNull(value: unknown): number | null { + const numericValue = Number(value); + return Number.isFinite(numericValue) ? numericValue : null; +} + +function getPercentConfig(value: unknown, fallback: number): number { + const numericValue = finiteNumberOrNull(value); + if (numericValue === null) return fallback; + return Math.max(0, Math.min(100, numericValue)); +} + +function getWeightConfig(value: unknown, fallback: number): number { + const numericValue = finiteNumberOrNull(value); + if (numericValue === null || numericValue < 0) return fallback; + return numericValue; +} + +function resolveResetAwareConfig(config: Record | null | undefined) { + const sessionWeight = getWeightConfig( + config?.resetAwareSessionWeight, + RESET_AWARE_DEFAULTS.sessionWeight + ); + const weeklyWeight = getWeightConfig( + config?.resetAwareWeeklyWeight, + RESET_AWARE_DEFAULTS.weeklyWeight + ); + const totalWeight = sessionWeight + weeklyWeight; + const normalizedSessionWeight = + totalWeight > 0 ? sessionWeight / totalWeight : RESET_AWARE_DEFAULTS.sessionWeight; + + return { + sessionWeight: normalizedSessionWeight, + weeklyWeight: 1 - normalizedSessionWeight, + tieBand: + getPercentConfig(config?.resetAwareTieBandPercent, RESET_AWARE_DEFAULTS.tieBandPercent) / 100, + exhaustionGuard: + getPercentConfig( + config?.resetAwareExhaustionGuardPercent, + RESET_AWARE_DEFAULTS.exhaustionGuardPercent + ) / 100, + }; +} + +function getResetAwareProvider(target: ResolvedComboTarget): string | null { + const provider = (target.providerId || target.provider || "").toLowerCase(); + return provider || null; +} + +function normalizeResetAt(value: unknown): string | null { + if (typeof value === "string" && value.trim().length > 0) return value.trim(); + if (typeof value === "number" && Number.isFinite(value)) return String(value); + return null; +} + +function parseResetTimeMs(resetAt: string | null | undefined): number { + if (!resetAt) return NaN; + const resetTime = Date.parse(resetAt); + if (Number.isFinite(resetTime)) return resetTime; + + if (!/^\d+(?:\.\d+)?$/.test(resetAt)) return NaN; + const numericResetAt = Number(resetAt); + if (!Number.isFinite(numericResetAt)) return NaN; + return numericResetAt < 10_000_000_000 ? numericResetAt * 1000 : numericResetAt; +} + +function getQuotaWindow( + quota: unknown, + key: "window5h" | "window7d" | "windowWeekly" | "windowMonthly" +): { percentUsed: number | null; resetAt: string | null } | null { + if (!isRecord(quota)) return null; + const window = quota[key]; + if (!isRecord(window)) return null; + const percentUsed = finiteNumberOrNull(window.percentUsed); + const resetAt = normalizeResetAt(window.resetAt); + return { percentUsed, resetAt }; +} + +function getResetUrgency(resetAt: string | null | undefined, windowMs: number): number { + if (!resetAt) return 0.5; + const resetTime = parseResetTimeMs(resetAt); + if (!Number.isFinite(resetTime)) return 0.5; + const msUntilReset = resetTime - Date.now(); + if (msUntilReset <= 0) return 1; + return clamp01(1 - msUntilReset / windowMs); +} + +function scoreQuotaWindow( + remaining: number, + resetAt: string | null | undefined, + windowMs: number +): number { + return ( + RESET_AWARE_REMAINING_WEIGHT * clamp01(remaining) + + RESET_AWARE_RESET_WEIGHT * getResetUrgency(resetAt, windowMs) + ); +} + +function scoreResetAwareQuota(quota: unknown, config: ReturnType) { + if (!quota || !isRecord(quota)) return { score: 0.5 }; + if (quota.limitReached === true) return { score: -Infinity }; + + const overallPercentUsed = clamp01(finiteNumberOrNull(quota.percentUsed) ?? 0.5); + const sessionWindow = getQuotaWindow(quota, "window5h"); + const weeklyWindow = getQuotaWindow(quota, "window7d") || getQuotaWindow(quota, "windowWeekly"); + const sessionRemaining = clamp01(1 - (sessionWindow?.percentUsed ?? overallPercentUsed)); + const weeklyRemaining = clamp01(1 - (weeklyWindow?.percentUsed ?? overallPercentUsed)); + const sessionScore = scoreQuotaWindow( + sessionRemaining, + sessionWindow?.resetAt, + RESET_AWARE_SESSION_WINDOW_MS + ); + const weeklyScore = scoreQuotaWindow( + weeklyRemaining, + weeklyWindow?.resetAt ?? normalizeResetAt(quota.resetAt), + RESET_AWARE_WEEKLY_WINDOW_MS + ); + let score = config.sessionWeight * sessionScore + config.weeklyWeight * weeklyScore; + + if (config.exhaustionGuard > 0 && sessionRemaining < config.exhaustionGuard) { + score *= Math.max(0.05, sessionRemaining / config.exhaustionGuard); + } + + return { score }; +} + +async function getQuotaAwareConnectionsForTarget( + target: ResolvedComboTarget, + connectionCache: Map>>, + connectionLoadPromises: Map>>>, + comboName: string, + log: { warn?: (...args: unknown[]) => void } +) { + const provider = getResetAwareProvider(target); + if (!provider || !getQuotaFetcher(provider)) return []; + if (!connectionCache.has(provider)) { + const cached = resetAwareConnectionCache.get(provider); + if (cached && Date.now() - cached.fetchedAt < RESET_AWARE_CONNECTION_CACHE_TTL_MS) { + connectionCache.set(provider, cached.connections); + return cached.connections; + } + + if (!connectionLoadPromises.has(provider)) { + connectionLoadPromises.set( + provider, + (async () => { + try { + const connections = await getProviderConnections({ provider, isActive: true }); + const activeConnections = Array.isArray(connections) + ? (connections as Array>) + : []; + resetAwareConnectionCache.set(provider, { + connections: activeConnections, + fetchedAt: Date.now(), + }); + return activeConnections; + } catch (error) { + log.warn?.("COMBO", "Reset-aware failed to load quota-aware connections.", { + comboName, + err: error, + operation: "getProviderConnections", + provider, + }); + return []; + } + })() + ); + } + + const connections = await connectionLoadPromises.get(provider)!; + connectionCache.set(provider, connections); + } + return connectionCache.get(provider) || []; +} + +function normalizeConnectionIds(value: unknown): string[] | null { + if (!Array.isArray(value)) return null; + const ids = value.filter( + (connectionId): connectionId is string => + typeof connectionId === "string" && connectionId.trim().length > 0 + ); + return ids.length > 0 ? ids : null; +} + +function filterAllowedConnectionIds( + connectionIds: string[], + apiKeyAllowedConnectionIds: string[] | null | undefined +): string[] { + const allowedIds = normalizeConnectionIds(apiKeyAllowedConnectionIds); + if (!allowedIds) return connectionIds; + const allowedSet = new Set(allowedIds); + return connectionIds.filter((connectionId) => allowedSet.has(connectionId)); +} + +function getTargetConnectionIds( + target: ResolvedComboTarget, + connections: Array> +): string[] { + let connectionIds: string[]; + if (target.connectionId) { + return [target.connectionId]; + } + + if (Array.isArray(target.allowedConnectionIds) && target.allowedConnectionIds.length > 0) { + return target.allowedConnectionIds.filter( + (connectionId): connectionId is string => + typeof connectionId === "string" && connectionId.trim().length > 0 + ); + } + + connectionIds = connections + .map((connection) => (typeof connection.id === "string" ? connection.id : null)) + .filter((connectionId): connectionId is string => !!connectionId); + return connectionIds; +} + +async function mapWithConcurrency( + items: T[], + concurrency: number, + mapper: (item: T, index: number) => Promise +): Promise { + const results = new Array(items.length); + let nextIndex = 0; + const workerCount = Math.max(1, Math.min(concurrency, items.length)); + + await Promise.all( + Array.from({ length: workerCount }, async () => { + while (nextIndex < items.length) { + const currentIndex = nextIndex++; + results[currentIndex] = await mapper(items[currentIndex], currentIndex); + } + }) + ); + + return results; +} + +async function orderTargetsByResetAwareQuota( + targets: ResolvedComboTarget[], + comboName: string, + configSource: Record | null | undefined, + log: { warn?: (...args: unknown[]) => void }, + apiKeyAllowedConnectionIds?: string[] | null +) { + if (targets.length === 0) return targets; + + const config = resolveResetAwareConfig(configSource); + const connectionCache = new Map>>(); + const connectionLoadPromises = new Map>>>(); + const quotaPromises = new Map>(); + const connectionById = new Map>(); + const expandedTargets: ResolvedComboTarget[] = []; + + const targetsWithConnections = await Promise.all( + targets.map(async (target) => ({ + connections: await getQuotaAwareConnectionsForTarget( + target, + connectionCache, + connectionLoadPromises, + comboName, + log + ), + target, + })) + ); + + for (const { target, connections } of targetsWithConnections) { + for (const connection of connections) { + if (typeof connection.id === "string") connectionById.set(connection.id, connection); + } + + const unrestrictedConnectionIds = getTargetConnectionIds(target, connections); + const connectionIds = filterAllowedConnectionIds( + unrestrictedConnectionIds, + apiKeyAllowedConnectionIds + ); + if (connectionIds.length === 0) { + if ( + unrestrictedConnectionIds.length > 0 && + normalizeConnectionIds(apiKeyAllowedConnectionIds) + ) { + continue; + } + expandedTargets.push(target); + continue; + } + + for (const connectionId of connectionIds) { + expandedTargets.push({ + ...target, + connectionId, + executionKey: + target.connectionId === connectionId + ? target.executionKey + : `${target.executionKey}@${connectionId}`, + }); + } + } + + const scoredTargets = await mapWithConcurrency( + expandedTargets, + RESET_AWARE_QUOTA_FETCH_CONCURRENCY, + async (target, index) => { + let quota: unknown = null; + const provider = getResetAwareProvider(target); + const fetcher = provider ? getQuotaFetcher(provider) : null; + if (fetcher && provider && target.connectionId) { + const quotaKey = `${provider}:${target.connectionId}`; + if (!quotaPromises.has(quotaKey)) { + quotaPromises.set( + quotaKey, + fetcher(target.connectionId, connectionById.get(target.connectionId)).catch((error) => { + log.warn?.("COMBO", "Reset-aware quota fetch failed.", { + comboName, + connectionId: target.connectionId, + err: error, + operation: "quotaFetch", + provider, + }); + return null; + }) + ); + } + quota = await quotaPromises.get(quotaKey)!; + } + const { score } = scoreResetAwareQuota(quota, config); + return { target, score, index }; + } + ); + + scoredTargets.sort((a, b) => { + if (b.score !== a.score) return b.score - a.score; + return a.index - b.index; + }); + + const bestScore = scoredTargets[0]?.score ?? 0; + const tiedTargets = scoredTargets.filter((entry) => bestScore - entry.score <= config.tieBand); + let orderedTiedTargets = tiedTargets; + if (tiedTargets.length > 1) { + const key = `reset-aware:${comboName}`; + const counter = rrCounters.get(key) || 0; + rrCounters.set(key, counter + 1); + const startIndex = counter % tiedTargets.length; + orderedTiedTargets = [...tiedTargets.slice(startIndex), ...tiedTargets.slice(0, startIndex)]; + } + + const tiedExecutionKeys = new Set(orderedTiedTargets.map((entry) => entry.target.executionKey)); + return [ + ...orderedTiedTargets, + ...scoredTargets.filter((entry) => !tiedExecutionKeys.has(entry.target.executionKey)), + ].map((entry) => entry.target); +} + function toTextContent(content) { if (typeof content === "string") return content; if (!Array.isArray(content)) return ""; @@ -1074,6 +1456,7 @@ export async function handleComboChat({ allCombos, relayOptions, signal, + apiKeyAllowedConnections = null, }) { const strategy = normalizeRoutingStrategy(combo.strategy || "priority"); const relayConfig = @@ -1282,7 +1665,8 @@ export async function handleComboChat({ ? resolveComboConfig(combo, settings) : { ...getDefaultComboConfig(), ...(combo.config || {}) }; const maxRetries = config.maxRetries ?? 1; - const retryDelayMs = config.retryDelayMs ?? 2000; + const retryDelayMs = resolveDelayMs(config.retryDelayMs, 2000); + const fallbackDelayMs = resolveDelayMs(config.fallbackDelayMs, 0); let orderedTargets = strategy === "weighted" @@ -1482,6 +1866,18 @@ export async function handleComboChat({ } else if (strategy === "cost-optimized") { orderedTargets = await sortTargetsByCost(orderedTargets); log.info("COMBO", `Cost-optimized ordering: cheapest first (${orderedTargets[0]?.modelStr})`); + } else if (strategy === "reset-aware") { + orderedTargets = await orderTargetsByResetAwareQuota( + orderedTargets, + combo.name, + config, + log, + apiKeyAllowedConnections + ); + log.info( + "COMBO", + `Reset-aware ordering: ${orderedTargets[0]?.modelStr}${orderedTargets[0]?.connectionId ? ` (${orderedTargets[0].connectionId})` : ""} first` + ); } else if (strategy === "context-optimized") { orderedTargets = sortTargetsByContextSize(orderedTargets); log.info("COMBO", `Context-optimized ordering: largest first (${orderedTargets[0]?.modelStr})`); @@ -1638,17 +2034,19 @@ export async function handleComboChat({ // Record last known good provider (LKGP) for this combo/model (#919) if (provider) { - try { - const { setLKGP } = await import("../../src/lib/localDb"); - await Promise.all([ - setLKGP(combo.name, target.executionKey, provider), - setLKGP(combo.name, combo.id || combo.name, provider), - ]); - } catch (err) { - log.warn("COMBO", "Failed to record Last Known Good Provider. This is non-fatal.", { - err, - }); - } + void (async () => { + try { + const { setLKGP } = await import("../../src/lib/localDb"); + await Promise.all([ + setLKGP(combo.name, target.executionKey, provider), + setLKGP(combo.name, combo.id || combo.name, provider), + ]); + } catch (err) { + log.warn("COMBO", "Failed to record Last Known Good Provider. This is non-fatal.", { + err, + }); + } + })(); } return quality.clonedResponse ?? result; @@ -1752,8 +2150,8 @@ export async function handleComboChat({ log.warn("COMBO", `Model ${modelStr} failed, trying next`, { status: result.status }); const fallbackWaitMs = - retryDelayMs > 0 && cooldownMs > 0 && cooldownMs <= MAX_FALLBACK_WAIT_MS - ? Math.min(cooldownMs, retryDelayMs) + fallbackDelayMs > 0 && cooldownMs > 0 && cooldownMs <= MAX_FALLBACK_WAIT_MS + ? Math.min(cooldownMs, fallbackDelayMs) : 0; if ([502, 503, 504].includes(result.status) && fallbackWaitMs > 0) { log.info("COMBO", `Waiting ${fallbackWaitMs}ms before fallback to next model`); @@ -1840,7 +2238,8 @@ async function handleRoundRobinCombo({ const concurrency = config.concurrencyPerModel ?? 3; const queueTimeout = config.queueTimeoutMs ?? 30000; const maxRetries = config.maxRetries ?? 1; - const retryDelayMs = config.retryDelayMs ?? 2000; + const retryDelayMs = resolveDelayMs(config.retryDelayMs, 2000); + const fallbackDelayMs = resolveDelayMs(config.fallbackDelayMs, 0); const orderedTargets = resolveComboTargets(combo, allCombos); const filteredTargets = await applyRequestTagRouting(orderedTargets, body, log); @@ -1965,21 +2364,23 @@ async function handleRoundRobinCombo({ }); recordedAttempts++; if (provider) { - try { - const { setLKGP } = await import("../../src/lib/localDb"); - await Promise.all([ - setLKGP(combo.name, target.executionKey, provider), - setLKGP(combo.name, combo.id || combo.name, provider), - ]); - } catch (err) { - log.warn( - "COMBO-RR", - "Failed to record Last Known Good Provider. This is non-fatal.", - { - err, - } - ); - } + void (async () => { + try { + const { setLKGP } = await import("../../src/lib/localDb"); + await Promise.all([ + setLKGP(combo.name, target.executionKey, provider), + setLKGP(combo.name, combo.id || combo.name, provider), + ]); + } catch (err) { + log.warn( + "COMBO-RR", + "Failed to record Last Known Good Provider. This is non-fatal.", + { + err, + } + ); + } + })(); } return result; } @@ -2099,8 +2500,8 @@ async function handleRoundRobinCombo({ log.warn("COMBO-RR", `${modelStr} failed, trying next model`, { status: result.status }); const fallbackWaitMs = - retryDelayMs > 0 && cooldownMs > 0 && cooldownMs <= MAX_FALLBACK_WAIT_MS - ? Math.min(cooldownMs, retryDelayMs) + fallbackDelayMs > 0 && cooldownMs > 0 && cooldownMs <= MAX_FALLBACK_WAIT_MS + ? Math.min(cooldownMs, fallbackDelayMs) : 0; if ([502, 503, 504].includes(result.status) && fallbackWaitMs > 0) { log.info("COMBO-RR", `Waiting ${fallbackWaitMs}ms before fallback to next model`); diff --git a/open-sse/services/comboConfig.ts b/open-sse/services/comboConfig.ts index af464f1aa..d3e04daff 100644 --- a/open-sse/services/comboConfig.ts +++ b/open-sse/services/comboConfig.ts @@ -9,6 +9,7 @@ const DEFAULT_COMBO_CONFIG = { strategy: "priority", maxRetries: 1, retryDelayMs: 2000, + fallbackDelayMs: 0, concurrencyPerModel: 3, // max simultaneous requests per model (round-robin) queueTimeoutMs: 30000, // max wait time in semaphore queue (round-robin) handoffThreshold: 0.85, @@ -17,6 +18,10 @@ const DEFAULT_COMBO_CONFIG = { maxMessagesForSummary: 30, maxComboDepth: 3, trackMetrics: true, + resetAwareSessionWeight: 0.35, + resetAwareWeeklyWeight: 0.65, + resetAwareTieBandPercent: 5, + resetAwareExhaustionGuardPercent: 10, }; const LEGACY_COMBO_RESILIENCE_KEYS = new Set([ diff --git a/open-sse/services/quotaPreflight.ts b/open-sse/services/quotaPreflight.ts index f600fa36a..3e5740cdf 100644 --- a/open-sse/services/quotaPreflight.ts +++ b/open-sse/services/quotaPreflight.ts @@ -35,6 +35,10 @@ export function registerQuotaFetcher(provider: string, fetcher: QuotaFetcher): v quotaFetcherRegistry.set(provider, fetcher); } +export function getQuotaFetcher(provider: string): QuotaFetcher | undefined { + return quotaFetcherRegistry.get(provider) || quotaFetcherRegistry.get(provider.toLowerCase()); +} + export function isQuotaPreflightEnabled(connection: Record): boolean { const psd = connection?.providerSpecificData as Record | undefined; return psd?.quotaPreflightEnabled === true; @@ -49,7 +53,7 @@ export async function preflightQuota( return { proceed: true }; } - const fetcher = quotaFetcherRegistry.get(provider); + const fetcher = getQuotaFetcher(provider); if (!fetcher) { return { proceed: true }; } diff --git a/open-sse/services/usage.ts b/open-sse/services/usage.ts index 2bf2edec5..a10e8ee74 100644 --- a/open-sse/services/usage.ts +++ b/open-sse/services/usage.ts @@ -515,7 +515,46 @@ async function getCrofUsage(apiKey: string) { return { quotas }; } +const GLM_QUOTA_ORDER = ["5 Hours Quota", "Weekly Quota", "Monthly Tools", "Tokens", "Time Limit"]; + +function getGlmQuotaLabel(type: unknown, unit: unknown): string | null { + const normalized = typeof type === "string" ? type.trim().toUpperCase() : ""; + const unitValue = toNumber(unit, -1); + + switch (normalized) { + case "TOKENS_LIMIT": + case "TOKEN_LIMIT": + if (unitValue === 3) return "5 Hours Quota"; + if (unitValue === 6) return "Weekly Quota"; + return "Tokens"; + case "TIME_LIMIT": + case "TIME_USAGE_LIMIT": + if (unitValue === 5) return "Monthly Tools"; + return "Time Limit"; + default: + return null; + } +} + +function orderGlmQuotas(quotas: Record): Record { + const ordered: Record = {}; + + for (const key of GLM_QUOTA_ORDER) { + if (quotas[key]) ordered[key] = quotas[key]; + } + + for (const [key, quota] of Object.entries(quotas)) { + if (!ordered[key]) ordered[key] = quota; + } + + return ordered; +} + async function getGlmUsage(apiKey: string, providerSpecificData?: Record) { + if (!apiKey) { + return { message: "API key not available. Add a coding plan API key to view usage." }; + } + const quotaUrl = getGlmQuotaUrl(providerSpecificData); const res = await fetch(quotaUrl, { @@ -537,13 +576,14 @@ async function getGlmUsage(apiKey: string, providerSpecificData?: Record | undefined; diff --git a/open-sse/utils/streamHandler.ts b/open-sse/utils/streamHandler.ts index 89d695c6f..2bfbcef68 100644 --- a/open-sse/utils/streamHandler.ts +++ b/open-sse/utils/streamHandler.ts @@ -3,6 +3,7 @@ import { trackPendingRequest } from "@/lib/usageDb"; // Stream handler with disconnect detection - shared for all providers const PENDING_REQUEST_CLEARED_MARKER = "__omniroutePendingRequestCleared"; +const DISCONNECT_ABORT_DELAY_MS = 2_000; type StreamDisconnectEvent = { reason: string; @@ -97,7 +98,7 @@ export function createStreamController({ // Delay abort to allow cleanup abortTimeout = setTimeout(() => { abortController.abort(); - }, 500); + }, DISCONNECT_ABORT_DELAY_MS); onDisconnect?.({ reason, duration: Date.now() - startTime }); }, @@ -201,7 +202,9 @@ export function createDisconnectAwareStream(transformStream, streamController) { cancel(reason) { streamController.handleDisconnect(reason || "cancelled"); reader.cancel(); - writer.abort(); + setTimeout(() => { + writer.abort(); + }, DISCONNECT_ABORT_DELAY_MS).unref?.(); }, }); } diff --git a/scripts/generate-docs-index.mjs b/scripts/generate-docs-index.mjs index 482c435f1..69c6babd8 100644 --- a/scripts/generate-docs-index.mjs +++ b/scripts/generate-docs-index.mjs @@ -116,6 +116,52 @@ function extractContentPreview(content) { // ---------- Main ---------- +if (!fs.existsSync(DOCS_DIR)) { + if (fs.existsSync(OUT_FILE)) { + console.warn( + `[generate-docs-index] ${DOCS_DIR} not found; keeping existing generated docs index.` + ); + process.exit(0); + } + + fs.mkdirSync(path.dirname(OUT_FILE), { recursive: true }); + fs.writeFileSync( + OUT_FILE, + `// AUTO-GENERATED by scripts/generate-docs-index.mjs — DO NOT EDIT MANUALLY +// Regenerate with: node scripts/generate-docs-index.mjs + +export interface AutoGenDocItem { + slug: string; + title: string; + fileName: string; +} + +export interface AutoGenNavSection { + title: string; + items: AutoGenDocItem[]; +} + +export interface AutoGenSearchItem { + slug: string; + title: string; + fileName: string; + section: string; + content: string; + headings: string[]; +} + +export const autoNavSections: AutoGenNavSection[] = []; + +export const autoSearchIndex: AutoGenSearchItem[] = []; + +export const autoAllSlugs: string[] = []; +`, + "utf8" + ); + console.warn(`[generate-docs-index] ${DOCS_DIR} not found; generated empty docs index.`); + process.exit(0); +} + const files = fs.readdirSync(DOCS_DIR).filter((f) => f.endsWith(".md") || f.endsWith(".mdx")); const docs = []; diff --git a/src/app/(dashboard)/dashboard/combos/page.tsx b/src/app/(dashboard)/dashboard/combos/page.tsx index 58c8b3ebc..953a3de95 100644 --- a/src/app/(dashboard)/dashboard/combos/page.tsx +++ b/src/app/(dashboard)/dashboard/combos/page.tsx @@ -64,11 +64,14 @@ const STRATEGY_OPTIONS = ROUTING_STRATEGIES.map((strategy) => ({ const STRATEGY_LABEL_FALLBACK = { "context-relay": "Context Relay", + "reset-aware": "Reset-Aware RR", }; const STRATEGY_DESC_FALLBACK = { "context-relay": "Priority-style routing with automatic context handoffs when account rotation happens.", + "reset-aware": + "Quota remaining and reset windows decide the order; similar scores rotate round-robin.", }; const STRATEGY_GUIDANCE_FALLBACK = { @@ -108,6 +111,11 @@ const STRATEGY_GUIDANCE_FALLBACK = { avoid: "Avoid when pricing data is missing or outdated.", example: "Example: Batch or background jobs where lower cost matters most.", }, + "reset-aware": { + when: "Use when multiple accounts with quota telemetry have different reset windows.", + avoid: "Avoid when quota telemetry is unavailable for most accounts.", + example: "Example: Prefer a 60% weekly account resetting tomorrow over 80% that resets later.", + }, "fill-first": { when: "Use when you want to drain one provider's quota fully before moving to the next.", avoid: "Avoid when you need request-level load balancing across providers.", @@ -230,6 +238,15 @@ const STRATEGY_RECOMMENDATIONS_FALLBACK = { "Use for batch/background jobs where cost is the main KPI.", ], }, + "reset-aware": { + title: "Reset-aware account rotation", + description: "Balances remaining provider quota against reset timing.", + tips: [ + "Use explicit account steps or account-tag routing for providers with quota telemetry.", + "Tune session vs weekly weights when short-term exhaustion is more risky.", + "Keep the tie band small so equivalent accounts still rotate fairly.", + ], + }, "fill-first": { title: "Quota drain strategy", description: "Exhausts one provider's quota before moving to the next in chain.", @@ -439,6 +456,7 @@ function getStrategyBadgeClass(strategy) { if (strategy === "random") return "bg-purple-500/15 text-purple-600 dark:text-purple-400"; if (strategy === "least-used") return "bg-cyan-500/15 text-cyan-600 dark:text-cyan-400"; if (strategy === "cost-optimized") return "bg-teal-500/15 text-teal-600 dark:text-teal-400"; + if (strategy === "reset-aware") return "bg-lime-500/15 text-lime-700 dark:text-lime-300"; if (strategy === "fill-first") return "bg-orange-500/15 text-orange-600 dark:text-orange-400"; if (strategy === "p2c") return "bg-indigo-500/15 text-indigo-600 dark:text-indigo-400"; return "bg-blue-500/15 text-blue-600 dark:text-blue-400"; diff --git a/src/app/(dashboard)/dashboard/providers/[id]/page.tsx b/src/app/(dashboard)/dashboard/providers/[id]/page.tsx index cdd04441b..d0ca329f7 100644 --- a/src/app/(dashboard)/dashboard/providers/[id]/page.tsx +++ b/src/app/(dashboard)/dashboard/providers/[id]/page.tsx @@ -53,6 +53,10 @@ import { getClaudeCodeCompatibleRequestDefaults as _getClaudeCodeCompatibleRequestDefaults, getCodexRequestDefaults as _getCodexRequestDefaults, } from "@/lib/providers/requestDefaults"; +import { + getCodexEffectiveFastServiceTier, + isCodexGlobalFastServiceTierEnabled, +} from "@/lib/providers/codexFastTier"; import { isClaudeExtraUsageBlockEnabled } from "@/lib/providers/claudeExtraUsage"; import { parseExtraApiKeys } from "@/shared/utils/parseApiKeys"; import { resolveDashboardProviderInfo } from "../providerPageUtils"; @@ -512,6 +516,7 @@ interface ConnectionRowProps { isOAuth: boolean; isClaude?: boolean; isCodex?: boolean; + codexFastGlobalEnabled?: boolean; isFirst: boolean; isLast: boolean; onMoveUp: () => void; @@ -1016,6 +1021,8 @@ export default function ProviderDetailPage() { ); const [applyingCodexAuthId, setApplyingCodexAuthId] = useState(null); const [exportingCodexAuthId, setExportingCodexAuthId] = useState(null); + const [codexGlobalFastServiceTier, setCodexGlobalFastServiceTier] = useState(false); + const [savingCodexGlobalFastServiceTier, setSavingCodexGlobalFastServiceTier] = useState(false); const isOpenAICompatible = isOpenAICompatibleProvider(providerId); const isCcCompatible = isClaudeCodeCompatibleProvider(providerId); const isAnthropicCompatible = @@ -1240,6 +1247,16 @@ export default function ProviderDetailPage() { .catch(() => {}); }, [fetchConnections, fetchAliases]); + useEffect(() => { + if (providerId !== "codex") return; + fetch("/api/settings", { cache: "no-store" }) + .then((r) => (r.ok ? r.json() : null)) + .then((data) => { + setCodexGlobalFastServiceTier(isCodexGlobalFastServiceTierEnabled(data)); + }) + .catch(() => {}); + }, [providerId]); + const loadConnProxies = useCallback(async (conns: { id?: string }[]) => { if (!conns.length) return; try { @@ -1687,6 +1704,32 @@ export default function ProviderDetailPage() { } }; + const handleToggleCodexGlobalFastServiceTier = async (enabled: boolean) => { + if (savingCodexGlobalFastServiceTier) return; + setSavingCodexGlobalFastServiceTier(true); + try { + const res = await fetch("/api/settings", { + method: "PATCH", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ codexServiceTier: { enabled } }), + }); + + if (!res.ok) { + const data = await res.json().catch(() => ({})); + notify.error(data.error || "Failed to update Codex Fast setting"); + return; + } + + setCodexGlobalFastServiceTier(enabled); + notify.success(enabled ? "Codex Fast enabled globally" : "Codex Fast disabled globally"); + } catch (error) { + console.error("Error toggling Codex Fast setting:", error); + notify.error("Failed to update Codex Fast setting"); + } finally { + setSavingCodexGlobalFastServiceTier(false); + } + }; + const handleRetestConnection = async (connectionId) => { if (!connectionId || retestingId) return; setRetestingId(connectionId); @@ -2812,9 +2855,22 @@ export default function ProviderDetailPage() { {/* Connections */} {!isUpstreamProxyProvider && ( -
-
+
+

{t("connections")}

+ {providerId === "codex" && ( +
+ +
+ )} {/* Provider-level proxy indicator/button */}
- {connections.length > 1 && ( - - )} - {!isCompatible ? ( -
- - {providerId === "qoder" && ( - + )} + {!isCompatible ? ( + <> + - )} -
- ) : ( - connections.length === 0 && ( - - ) - )} + {providerId === "qoder" && ( + + )} + + ) : ( + connections.length === 0 && ( + + ) + )} +
{connections.length === 0 ? ( @@ -2921,6 +2979,7 @@ export default function ProviderDetailPage() { connection={conn} isOAuth={conn.authType === "oauth"} isClaude={providerId === "claude"} + codexFastGlobalEnabled={codexGlobalFastServiceTier} isFirst={index === 0} isLast={index === sorted.length - 1} onMoveUp={() => handleSwapPriority(conn, sorted[index - 1])} @@ -3039,6 +3098,7 @@ export default function ProviderDetailPage() { connection={conn} isOAuth={conn.authType === "oauth"} isClaude={providerId === "claude"} + codexFastGlobalEnabled={codexGlobalFastServiceTier} isFirst={gi === 0 && index === 0} isLast={ gi === groupKeys.length - 1 && index === groupConns.length - 1 @@ -4983,6 +5043,7 @@ function ConnectionRow({ isOAuth, isClaude, isCodex, + codexFastGlobalEnabled, isCcCompatible, cliproxyapiEnabled, isFirst, @@ -5086,6 +5147,12 @@ function ConnectionRow({ const normalizedCodexPolicy = normalizeCodexLimitPolicy(codexPolicy); const codex5hEnabled = normalizedCodexPolicy.use5h; const codexWeeklyEnabled = normalizedCodexPolicy.useWeekly; + const codexFastEnabled = isCodex + ? getCodexEffectiveFastServiceTier( + connection.providerSpecificData, + codexFastGlobalEnabled === true + ) + : false; const claudeBlockExtraUsageEnabled = isClaude ? isClaudeExtraUsageBlockEnabled("claude", connection.providerSpecificData) : false; @@ -5226,6 +5293,19 @@ function ConnectionRow({ {isCodex && ( <> | + {codexFastEnabled && ( + + bolt + Fast + + )}