diff --git a/src/cli/ccproxy.ts b/src/cli/ccproxy.ts index 20a63a9e..9397a030 100644 --- a/src/cli/ccproxy.ts +++ b/src/cli/ccproxy.ts @@ -5,7 +5,7 @@ import { randomUUID } from 'node:crypto'; import fs from 'node:fs/promises'; import fastify, { FastifyRequest, FastifyReply } from 'fastify'; -import { Claude3_5_Haiku_Vertex, Claude4_1_Opus_Vertex, Claude4_5_Sonnet_Vertex } from '#llm/services/anthropic-vertex'; +import { Claude4_1_Opus_Vertex, Claude4_5_Haiku_Vertex, Claude4_5_Sonnet_Vertex } from '#llm/services/anthropic-vertex'; import type { AssistantContentExt, LlmMessage, TextPartExt } from '#shared/llm/llm.model'; const PROXY_PORT = Number(process.env.PROXY_PORT ?? 8080); @@ -15,7 +15,7 @@ const LOG_FILE = process.env.LLM_PROXY_LOG ?? 'llm-proxy.log'; * Anthropic ↔ internal model name mapping */ function pickLLM(modelName: string) { - if (modelName.includes('haiku')) return Claude3_5_Haiku_Vertex(); + if (modelName.includes('haiku')) return Claude4_5_Haiku_Vertex(); if (modelName.includes('sonnet')) return Claude4_5_Sonnet_Vertex(); if (modelName.includes('opus')) return Claude4_1_Opus_Vertex(); return undefined; diff --git a/src/llm/multi-agent/fastEasy.ts b/src/llm/multi-agent/fastEasy.ts index a8a7a80a..d41040ee 100644 --- a/src/llm/multi-agent/fastEasy.ts +++ b/src/llm/multi-agent/fastEasy.ts @@ -21,7 +21,7 @@ export class FastEasyLLM extends BaseLLM { constructor() { super({ - displayName: 'Fast Easy (Cerebras GPT OSS 120B - Groq Llama Scout - Gemini 2.5 Flash)', + displayName: 'Fast Easy (Cerebras GPT OSS 120B - Groq Llama Scout - Gemini 2.5 Flash Lite - GPT-5 Nano)', service: 'multi', modelId: 'fast-easy', maxInputTokens: 0, diff --git a/src/llm/multi-agent/fastMedium.ts b/src/llm/multi-agent/fastMedium.ts index c9276d4a..471259cd 100644 --- a/src/llm/multi-agent/fastMedium.ts +++ b/src/llm/multi-agent/fastMedium.ts @@ -1,4 +1,7 @@ -import { cerebrasQwen3_235b_Thinking } from '#llm/services/cerebras'; +import { anthropicClaude4_5_Haiku } from '#llm/services/anthropic'; +import { Claude4_5_Haiku_Vertex } from '#llm/services/anthropic-vertex'; +import { cerebrasQwen3_235b_Thinking, cerebrasQwen3_Coder } from '#llm/services/cerebras'; +import { groqKimiK2, groqQwen3_32b } from '#llm/services/groq'; import { openaiGPT5mini } from '#llm/services/openai'; import { vertexGemini_2_5_Flash } from '#llm/services/vertexai'; import { countTokens } from '#llm/tokens'; @@ -12,13 +15,16 @@ import { BaseLLM } from '../base-llm'; */ export class FastMediumLLM extends BaseLLM { private readonly providers: LLM[]; - private readonly cerebras: LLM; - private readonly openai: LLM; - private readonly gemini: LLM; + private readonly cerebras = cerebrasQwen3_Coder(); + private readonly groq = groqQwen3_32b(); + private readonly openai = openaiGPT5mini(); + private readonly gemini = vertexGemini_2_5_Flash({ thinking: 'high' }); + private readonly haiku = anthropicClaude4_5_Haiku(); + private readonly vertexHaiku = Claude4_5_Haiku_Vertex(); constructor() { super({ - displayName: 'Fast Medium (Qwen3 235b (Cerebras) - GPT-5 Mini - Gemini 2.5 Flash)', + displayName: 'Fast Medium (Claude 4.5 Haiku, Cerebras/Groq Qwen3, Gemini 2.5 Flash, GPT-5 Mini)', service: 'multi', modelId: 'fast-medium', maxInputTokens: 0, @@ -28,11 +34,7 @@ export class FastMediumLLM extends BaseLLM { totalCost: 0, }), }); - this.providers = [cerebrasQwen3_235b_Thinking(), openaiGPT5mini(), vertexGemini_2_5_Flash({ thinking: 'high' })]; - this.cerebras = this.providers[0]!; - this.openai = this.providers[1]!; - this.gemini = this.providers[2]!; - + 
this.providers = [this.vertexHaiku, this.haiku, this.cerebras, this.groq, this.gemini, this.openai]; this.maxInputTokens = Math.max(...this.providers.map((p) => p.getMaxInputTokens())); } diff --git a/src/llm/multi-agent/openaiFlex.test.ts b/src/llm/multi-agent/openaiFlex.test.ts new file mode 100644 index 00000000..96599ef0 --- /dev/null +++ b/src/llm/multi-agent/openaiFlex.test.ts @@ -0,0 +1,244 @@ +import type { TextStreamPart } from 'ai'; +import { expect } from 'chai'; +import type { GenerateTextOptions, GenerationStats, LLM, LlmMessage } from '#shared/llm/llm.model'; +import { type FlexMetricsSnapshot, OPENAI_FLEX_SERVICE, OpenAIFlex } from './openaiFlex'; + +type GenerateHandler = (messages: LlmMessage[], opts?: GenerateTextOptions) => Promise; +type StreamHandler = (messages: LlmMessage[], onChunk: (chunk: TextStreamPart) => void, opts?: GenerateTextOptions) => Promise; + +const DEFAULT_STATS: GenerationStats = { + llmId: 'test:model', + cost: 0, + inputTokens: 0, + outputTokens: 0, + totalTime: 0, + timeToFirstToken: 0, + requestTime: 0, + finishReason: 'stop', +}; + +class TestLLM implements LLM { + constructor( + private readonly name: string, + private readonly model: string, + private readonly generateHandler: GenerateHandler, + private readonly streamHandler: StreamHandler, + private readonly configured = true, + ) {} + + async generateText( + userOrSystemOrMessages: string | LlmMessage[] | ReadonlyArray, + userOrOpts?: string | GenerateTextOptions, + opts?: GenerateTextOptions, + ): Promise { + const messages = this.toMessages(userOrSystemOrMessages, userOrOpts, opts); + return this.generateHandler(messages, this.toOptions(userOrSystemOrMessages, userOrOpts, opts)); + } + + async generateTextWithJson(): Promise { + throw new Error('Not implemented in TestLLM'); + } + + async generateJson(): Promise { + throw new Error('Not implemented in TestLLM'); + } + + async generateTextWithResult(): Promise { + throw new Error('Not implemented in TestLLM'); + } + + async generateMessage(): Promise { + throw new Error('Not implemented in TestLLM'); + } + + streamText( + messages: LlmMessage[] | ReadonlyArray, + onChunk: (chunk: TextStreamPart) => void, + opts?: GenerateTextOptions, + ): Promise { + return this.streamHandler(messages as LlmMessage[], onChunk, opts); + } + + getService(): string { + return OPENAI_FLEX_SERVICE; + } + + getModel(): string { + return this.model; + } + + getDisplayName(): string { + return this.name; + } + + getId(): string { + return `${this.getService()}:${this.model}`; + } + + getMaxInputTokens(): number { + return 100_000; + } + + getMaxOutputTokens(): number { + return 100_000; + } + + countTokens(): Promise { + return Promise.resolve(0); + } + + isConfigured(): boolean { + return this.configured; + } + + getOldModels(): string[] { + return []; + } + + private toMessages( + userOrSystemOrMessages: string | LlmMessage[] | ReadonlyArray, + userOrOpts?: string | GenerateTextOptions, + opts?: GenerateTextOptions, + ): LlmMessage[] { + if (Array.isArray(userOrSystemOrMessages)) return [...userOrSystemOrMessages]; + if (typeof userOrOpts === 'string') { + return [ + { role: 'system', content: userOrSystemOrMessages as string }, + { role: 'user', content: userOrOpts }, + ]; + } + return [{ role: 'user', content: userOrSystemOrMessages as string }]; + } + + private toOptions( + userOrSystemOrMessages: string | LlmMessage[] | ReadonlyArray, + userOrOpts?: string | GenerateTextOptions, + opts?: GenerateTextOptions, + ): GenerateTextOptions | undefined { + if 
(Array.isArray(userOrSystemOrMessages)) return userOrOpts as GenerateTextOptions | undefined; + if (typeof userOrOpts === 'string') return opts; + return userOrOpts as GenerateTextOptions | undefined; + } +} + +describe('OpenAIFlex', () => { + const messages: LlmMessage[] = [{ role: 'user', content: 'hello' }]; + + it('uses flex response when first chunk arrives before timeout', async () => { + let streamed = ''; + const flexLLM = new TestLLM( + 'flex', + 'flex-model', + async () => 'unused', + async (_msgs, onChunk) => { + onChunk({ type: 'text-delta', id: '1', text: 'flex-response' }); + streamed += 'flex-response'; + return DEFAULT_STATS; + }, + ); + const standardLLM = new TestLLM( + 'standard', + 'std-model', + async () => 'standard-response', + async (_msgs, _onChunk) => DEFAULT_STATS, + ); + + const flex = new OpenAIFlex('Flex Under Test', 'flex-test', standardLLM, flexLLM, 200); + const response = await flex.generateTextFromMessages(messages); + const metrics = flex.getMetrics(); + + expect(response).to.equal('flex-response'); + expect(streamed).to.equal('flex-response'); + expect(metrics.flexAttempts).to.equal(1); + expect(metrics.flexFallbacks).to.equal(0); + expect(metrics.flexResponses).to.equal(1); + expect(metrics.lastFlexResponseMs).to.be.a('number'); + }); + + it('falls back to standard when flex times out before first chunk', async () => { + const flexLLM = new TestLLM( + 'flex', + 'flex-model', + async () => 'unused', + async (_msgs, _onChunk, opts) => + await new Promise((_resolve, reject) => { + opts?.abortSignal?.addEventListener('abort', () => reject(new Error('aborted'))); + }), + ); + const standardLLM = new TestLLM( + 'standard', + 'std-model', + async () => 'standard-response', + async (_msgs, _onChunk) => DEFAULT_STATS, + ); + + const flex = new OpenAIFlex('Flex Under Test', 'flex-test', standardLLM, flexLLM, 50); + const response = await flex.generateTextFromMessages(messages); + const metrics = flex.getMetrics(); + + expect(response).to.equal('standard-response'); + expect(metrics.flexAttempts).to.equal(1); + expect(metrics.flexFallbacks).to.equal(1); + expect(metrics.flexResponses).to.equal(0); + }); + + it('falls back if flex fails after first chunk', async () => { + const flexLLM = new TestLLM( + 'flex', + 'flex-model', + async () => 'unused', + async (_msgs, onChunk) => + await new Promise((_resolve, reject) => { + onChunk({ type: 'text-delta', id: '1', text: 'partial' }); + setTimeout(() => reject(new Error('boom')), 0); + }), + ); + const standardLLM = new TestLLM( + 'standard', + 'std-model', + async () => 'standard-response', + async (_msgs, _onChunk) => DEFAULT_STATS, + ); + + const flex = new OpenAIFlex('Flex Under Test', 'flex-test', standardLLM, flexLLM, 200); + const response = await flex.generateTextFromMessages(messages); + const metrics: FlexMetricsSnapshot = flex.getMetrics(); + + expect(response).to.equal('standard-response'); + expect(metrics.flexFallbacks).to.equal(1); + expect(metrics.flexResponses).to.equal(1); + }); + + it('streams from standard when flex times out', async () => { + const flexLLM = new TestLLM( + 'flex', + 'flex-model', + async () => 'unused', + async (_msgs, _onChunk, opts) => + await new Promise((_resolve, reject) => { + opts?.abortSignal?.addEventListener('abort', () => reject(new Error('aborted'))); + }), + ); + + const standardLLM = new TestLLM( + 'standard', + 'std-model', + async () => 'standard-response', + async (_msgs, onChunk) => { + onChunk({ type: 'text-delta', id: '1', text: 'S' }); + return DEFAULT_STATS; + 
}, + ); + + let streamed = ''; + const flex = new OpenAIFlex('Flex Under Test', 'flex-test', standardLLM, flexLLM, 30); + const stats = await flex.streamText(messages, (chunk) => { + if (chunk.type === 'text-delta') streamed += chunk.text; + }); + + expect(streamed).to.equal('S'); + expect(stats.llmId).to.equal('test:model'); + const metrics = flex.getMetrics(); + expect(metrics.flexFallbacks).to.equal(1); + }); +}); diff --git a/src/llm/multi-agent/openaiFlex.ts b/src/llm/multi-agent/openaiFlex.ts index e08c28bc..e703075c 100644 --- a/src/llm/multi-agent/openaiFlex.ts +++ b/src/llm/multi-agent/openaiFlex.ts @@ -1,9 +1,8 @@ import type { TextStreamPart } from 'ai'; import { BaseLLM } from '#llm/base-llm'; -import { openaiGPT5, openaiGPT5mini, openaiGPT5nano } from '#llm/services/openai'; +import { openaiGPT5, openaiGPT5mini } from '#llm/services/openai'; import { logger } from '#o11y/logger'; -import type { GenerateTextOptions, GenerationStats, LLM, LlmCostFunction, LlmMessage } from '#shared/llm/llm.model'; -import { sleep } from '#utils/async-utils'; +import type { GenerateTextOptions, GenerationStats, LLM, LlmMessage } from '#shared/llm/llm.model'; export const OPENAI_FLEX_SERVICE = 'openai_flex'; @@ -25,12 +24,8 @@ export function openAIFlexGPT5Mini(): LLM { // return new OpenAIFlex('GPT5 Nano Flex', 'gpt-5-nano', openaiGPT5nano(), openaiGPT5nano('flex')); // } -/** - * Flex processing - -Beta - -======================= +/* +Flex processing (OpenAI documentation) Optimize costs with flex processing. @@ -81,28 +76,97 @@ Consider implementing these strategies for handling resource unavailable errors: * **Retry requests with exponential backoff**: Implementing exponential backoff is suitable for workloads that can tolerate delays and aims to minimize costs, as your request can eventually complete when more capacity is available. For implementation details, see [this cookbook](https://cookbook.openai.com/examples/how_to_handle_rate_limits?utm_source=chatgpt.com#retrying-with-exponential-backoff). * **Retry requests with standard processing**: When receiving a resource unavailable error, implement a retry strategy with standard processing if occasional higher costs are worth ensuring successful completion for your use case. To do so, set `service_tier` to `auto` in the retried request, or remove the `service_tier` parameter to use the default mode for the project. - */ + +# Stopping streams + +import { openai } from '@ai-sdk/openai'; +import { streamText } from 'ai'; + +export async function POST(req: Request) { + const { prompt } = await req.json(); + + const result = streamText({ + model: openai('gpt-4.1'), + prompt, + // forward the abort signal: + abortSignal: req.signal, + onAbort: ({ steps }) => { + // Handle cleanup when stream is aborted + console.log('Stream aborted after', steps.length, 'steps'); + // Persist partial results to database + }, + }); + + return result.toTextStreamResponse(); +} + +(End OpenAI documentation) +*/ + +/* +# OpenAIFlex class requirements + +This class uses a combination of GPT-5 calls on the standard and flex service tiers to reduce costs. +Calls are first made with the flex tier, up to a timeout configured in the class. +If there has been no response from the flex tier within that timeout, a call will be made with the standard tier to ensure progress continues. + +The generateTextFromMessages implementation will call the flex tier first in streaming mode. 
If the flex tier call has started to receive a response, +then we will cancel any timeouts and continue with the flex tier call. + +If the flex tier call does not start to receive a response within the timeout, then we will cancel the flex tier call and make a call with the standard tier. + +We want to be able to collect statistics on how long it takes for the flex tier to start receiving a response, and how often we need to fall back to the standard tier. +*/ + +interface StreamAttemptConfig { + collectText?: boolean; + onChunk?: (chunk: TextStreamPart) => void; +} + +interface StreamAttemptResult { + stats: GenerationStats; + text?: string; +} + +interface StreamAttempt { + completion: Promise<StreamAttemptResult>; + firstChunk: Promise<number>; + hasFirstChunk(): boolean; + abort(): void; + suppressCompletionErrors(): void; +} + +export interface FlexMetricsSnapshot { + flexAttempts: number; + flexFallbacks: number; + flexResponses: number; + lastFlexResponseMs?: number; + averageFlexResponseMs?: number; +} export class OpenAIFlex extends BaseLLM { private readonly flexTimeoutMs: number; + private readonly metrics = { + flexAttempts: 0, + flexFallbacks: 0, + flexResponses: 0, + flexResponseTotalMs: 0, + lastFlexResponseMs: undefined as number | undefined, + }; constructor( displayName: string, model: string, - private standardLLM: LLM, - private flexLLM: LLM, - timeoutMs?: number, // optional override for total flex retry window + private readonly standardLLM: LLM, + private readonly flexLLM: LLM, + timeoutMs?: number, ) { super({ displayName, service: OPENAI_FLEX_SERVICE, modelId: model, maxInputTokens: standardLLM.getMaxInputTokens(), - calculateCosts: () => ({ - inputCost: 0, - outputCost: 0, - totalCost: 0, - }), + calculateCosts: () => ({ inputCost: 0, outputCost: 0, totalCost: 0 }), }); this.flexTimeoutMs = timeoutMs ?? DEFAULT_FLEX_TIMEOUT_MS; } @@ -112,191 +176,223 @@ export class OpenAIFlex extends BaseLLM { } override isConfigured(): boolean { - return openaiGPT5().isConfigured(); + return this.standardLLM.isConfigured() && this.flexLLM.isConfigured(); + } + + getMetrics(): FlexMetricsSnapshot { + const { flexAttempts, flexFallbacks, flexResponses, flexResponseTotalMs, lastFlexResponseMs } = this.metrics; + return { + flexAttempts, + flexFallbacks, + flexResponses, + lastFlexResponseMs, + averageFlexResponseMs: flexResponses > 0 ? Math.round(flexResponseTotalMs / flexResponses) : undefined, + }; } override async generateTextFromMessages(messages: LlmMessage[], opts?: GenerateTextOptions): Promise<string> { - const start = Date.now(); - let attempt = 0; - let backoff = 1000; // start 1s - const maxBackoff = 16_000; // cap 16s - - while (Date.now() - start < this.flexTimeoutMs) { - attempt++; - try { - // Ensure provider-specific timeout for this attempt does not exceed remaining time - const elapsed = Date.now() - start; - const remaining = Math.max(0, this.flexTimeoutMs - elapsed); - - // Clone options and ensure providerOptions.openai.timeout is set to remaining (if consumer hasn't already set a smaller timeout) - const attemptOpts: GenerateTextOptions = { ...(opts ?? {}) }; - attemptOpts.providerOptions = { ...(opts?.providerOptions ?? {}) }; - attemptOpts.providerOptions.openai = { ...(attemptOpts.providerOptions.openai ?? {}) }; - - // If no explicit timeout was specified, set it to remaining ms for this attempt. - // If consumer set a timeout smaller than remaining, respect it. 
- if (typeof attemptOpts.providerOptions.openai.timeout !== 'number') { - attemptOpts.providerOptions.openai.timeout = remaining; - } else { - attemptOpts.providerOptions.openai.timeout = Math.min(attemptOpts.providerOptions.openai.timeout, remaining); - } - - return await this.flexLLM.generateText(messages, attemptOpts); - } catch (err: any) { - // If 429/resource-unavailable => retry until flexTimeoutMs - if (is429Error(err)) { - const elapsed = Date.now() - start; - if (elapsed >= this.flexTimeoutMs) { - // timed out trying flex - logger.info(`Flex provider ${this.flexLLM.getDisplayName()} timed out after ${elapsed}ms; falling back to standard provider.`); - break; - } - // wait with backoff but not longer than remaining time - const wait = Math.min(backoff, Math.max(0, this.flexTimeoutMs - elapsed)); - logger.warn(`Flex provider ${this.flexLLM.getDisplayName()} returned 429 (attempt ${attempt}). Retrying in ${wait}ms...`); - await sleep(wait); - backoff = Math.min(backoff * 2, maxBackoff); - continue; - } - - // Non-429 error: immediately fallback to standard provider, but compute remaining time and pass it as timeout - logger.error(`Flex provider ${this.flexLLM.getDisplayName()} failed with error: ${err?.message ?? err}. Falling back to standard provider.`); - const elapsed = Date.now() - start; - const remaining = Math.max(0, this.flexTimeoutMs - elapsed); - - const standardOpts: GenerateTextOptions = { ...(opts ?? {}) }; - standardOpts.providerOptions = { ...(opts?.providerOptions ?? {}) }; - standardOpts.providerOptions.openai = { ...(standardOpts.providerOptions.openai ?? {}) }; - // give standard provider the remaining allowable time (if any). If remaining==0, don't override a consumer timeout. - if (remaining > 0 && typeof standardOpts.providerOptions.openai.timeout !== 'number') { - standardOpts.providerOptions.openai.timeout = remaining; - } - return await this.standardLLM.generateText(messages, standardOpts); - } + const startTime = Date.now(); + this.recordFlexAttempt(); + + const flexAttempt = this.createStreamAttempt(this.flexLLM, messages, opts, { collectText: true }); + let fallbackReason: 'timeout' | 'error-before-response' | null = null; + + const timeoutHandle = this.createTimeout(() => { + if (flexAttempt.hasFirstChunk()) return; + fallbackReason = 'timeout'; + flexAttempt.abort(); + }, this.flexTimeoutMs); + + try { + const firstChunkTimestamp = await flexAttempt.firstChunk; + clearTimeout(timeoutHandle); + this.recordFlexResponse(firstChunkTimestamp - startTime); + } catch (error) { + clearTimeout(timeoutHandle); + if (!fallbackReason) fallbackReason = 'error-before-response'; } - // If loop exits (due to timeout), fall back to standard provider and supply remaining time (likely 0) - const elapsed = Date.now() - start; - const remaining = Math.max(0, this.flexTimeoutMs - elapsed); - logger.info(`Flex provider attempts exhausted after ${elapsed}ms; falling back to standard provider.`); + if (fallbackReason) { + this.recordFallback(fallbackReason); + flexAttempt.abort(); + flexAttempt.suppressCompletionErrors(); + return await this.standardLLM.generateText(messages, opts); + } - const finalOpts: GenerateTextOptions = { ...(opts ?? {}) }; - finalOpts.providerOptions = { ...(opts?.providerOptions ?? {}) }; - finalOpts.providerOptions.openai = { ...(finalOpts.providerOptions.openai ?? 
{}) }; - if (remaining > 0 && typeof finalOpts.providerOptions.openai.timeout !== 'number') { - finalOpts.providerOptions.openai.timeout = remaining; + try { + const result = await flexAttempt.completion; + return result.text ?? ''; + } catch (error) { + this.recordFallback('error-after-response'); + return await this.standardLLM.generateText(messages, opts); } - return await this.standardLLM.generateText(messages, finalOpts); } - /** - * Stream from flex provider, but if no chunk arrives within flexTimeoutMs, - * start standard provider and stream from whichever delivers first. - * The "losing" stream is ignored (not canceled). - */ override async streamText( messages: LlmMessage[] | ReadonlyArray, onChunk: (chunk: TextStreamPart) => void, opts?: GenerateTextOptions, ): Promise { - const start = Date.now(); - let selected: 'flex' | 'standard' | null = null; + const startTime = Date.now(); + this.recordFlexAttempt(); - // Kick off flex stream immediately - const flexPromise = this.flexLLM.streamText( - messages as LlmMessage[], - (chunk) => { - if (selected === null) selected = 'flex'; + let selected: 'flex' | 'standard' | null = null; + const flexAttempt = this.createStreamAttempt(this.flexLLM, messages as LlmMessage[], opts, { + onChunk: (chunk) => { + if (selected === null && this.isMeaningfulChunk(chunk)) selected = 'flex'; if (selected === 'flex') onChunk(chunk); }, - opts, - ); + }); - let standardStarted = false; - let standardPromise: Promise | undefined; + let fallbackReason: 'timeout' | 'error-before-response' | null = null; + const timeoutHandle = this.createTimeout(() => { + if (flexAttempt.hasFirstChunk()) return; + fallbackReason = 'timeout'; + flexAttempt.abort(); + }, this.flexTimeoutMs); - const startStandard = () => { - if (standardStarted) return; - standardStarted = true; - standardPromise = this.standardLLM.streamText( - messages as LlmMessage[], - (chunk) => { - if (selected === null) selected = 'standard'; - if (selected === 'standard') onChunk(chunk); - }, - opts, - ); - }; + let firstChunkArrived = false; + try { + const firstChunkTimestamp = await flexAttempt.firstChunk; + clearTimeout(timeoutHandle); + firstChunkArrived = true; + this.recordFlexResponse(firstChunkTimestamp - startTime); + } catch (error) { + clearTimeout(timeoutHandle); + if (!fallbackReason) fallbackReason = 'error-before-response'; + } - // Hedge: give flex up to flexTimeoutMs to deliver first chunk - const elapsed = Date.now() - start; - const remaining = Math.max(0, this.flexTimeoutMs - elapsed); - const hedgeTimer = setTimeout(() => { - if (selected === null) startStandard(); - }, remaining); + if (!firstChunkArrived) { + this.recordFallback(fallbackReason ?? 
'error-before-response'); + flexAttempt.abort(); + flexAttempt.suppressCompletionErrors(); + selected = 'standard'; + return await this.standardLLM.streamText(messages as LlmMessage[], onChunk, opts); + } try { - // Wait for whichever selected stream completes - const stats = await new Promise((resolve, reject) => { - flexPromise - .then((s) => { - if (selected === 'flex' || (selected === null && !standardStarted)) { - selected = 'flex'; - resolve(s); - } - }) - .catch((err) => { - // If flex fails before selection, start standard immediately - if (selected === null && !standardStarted) { - startStandard(); - } - // Only reject if standard is not running/selected - if (selected !== 'standard') { - // Do not reject yet; wait for standard if available - // no-op here - } - }); - - const waitStandard = async () => { - if (!standardStarted) return; - try { - const s = await standardPromise!; - if (selected === 'standard') { - resolve(s); - } else if (selected === null) { - selected = 'standard'; - resolve(s); - } - } catch (e) { - if (selected !== 'flex') { - reject(e); - } - } - }; + const { stats } = await flexAttempt.completion; + return stats; + } catch (error) { + this.recordFallback('error-after-response'); + selected = 'standard'; + return await this.standardLLM.streamText(messages as LlmMessage[], onChunk, opts); + } + } - // Poll for when standard starts - const checkInterval = setInterval(() => { - if (standardStarted) { - clearInterval(checkInterval); - waitStandard(); + private createStreamAttempt( + llm: LLM, + messages: LlmMessage[] | ReadonlyArray, + opts: GenerateTextOptions | undefined, + config: StreamAttemptConfig, + ): StreamAttempt { + const abortController = new AbortController(); + const attemptOpts = this.cloneOptionsWithAbort(opts, abortController.signal); + let firstChunkSeen = false; + let firstChunkSettled = false; + let aborted = false; + let textBuffer = ''; + + let resolveFirstChunk: (value: number) => void; + let rejectFirstChunk: (reason?: unknown) => void; + const firstChunk = new Promise((resolve, reject) => { + resolveFirstChunk = resolve; + rejectFirstChunk = reject; + }); + + const settleFirstChunk = (value: number) => { + if (firstChunkSettled) return; + firstChunkSettled = true; + resolveFirstChunk(value); + }; + + const failFirstChunk = (reason: unknown) => { + if (firstChunkSettled) return; + firstChunkSettled = true; + rejectFirstChunk(reason); + }; + + const completion = llm + .streamText( + messages as LlmMessage[], + (chunk) => { + if (!firstChunkSeen && this.isMeaningfulChunk(chunk)) { + firstChunkSeen = true; + settleFirstChunk(Date.now()); } - }, 10); + if (config.collectText && chunk.type === 'text-delta') textBuffer += chunk.text; + config.onChunk?.(chunk); + }, + attemptOpts, + ) + .then((stats) => { + if (!firstChunkSeen) failFirstChunk(new Error('flex-no-response')); + return { + stats, + text: config.collectText ? textBuffer : undefined, + }; + }) + .catch((error) => { + failFirstChunk(error); + throw error; }); - return stats; - } finally { - clearTimeout(hedgeTimer); + return { + completion, + firstChunk, + hasFirstChunk: () => firstChunkSeen, + abort: () => { + if (aborted) return; + aborted = true; + abortController.abort(); + failFirstChunk(new Error('flex-aborted')); + }, + suppressCompletionErrors: () => { + void completion.catch(() => undefined); + }, + }; + } + + private cloneOptionsWithAbort(opts: GenerateTextOptions | undefined, abortSignal: AbortSignal): GenerateTextOptions { + const cloned: GenerateTextOptions = { ...(opts ?? 
{}) }; + if (opts?.providerOptions) cloned.providerOptions = { ...opts.providerOptions }; + cloned.abortSignal = abortSignal; + return cloned; + } + + private createTimeout(onTimeout: () => void, timeoutMs: number): NodeJS.Timeout { + return setTimeout(onTimeout, timeoutMs); + } + + private isMeaningfulChunk(chunk: TextStreamPart): boolean { + switch (chunk.type) { + case 'text-delta': + case 'reasoning-delta': + case 'tool-call': + case 'tool-result': + case 'tool-error': + case 'source': + case 'file': + case 'tool-input-delta': + return true; + default: + return false; } } -} -function is429Error(err: any): boolean { - if (!err) return false; - if (err.status === 429 || err.statusCode === 429) return true; - if (err.response && (err.response.status === 429 || err.response.statusCode === 429)) return true; - // Some providers use specific codes/messages for resource unavailable - if (err.code === 'RESOURCE_UNAVAILABLE' || err.code === '429') return true; - const msg = (err.message || '').toString().toLowerCase(); - if (/resource.*unavailable|rate.*limit|too many requests|429/.test(msg)) return true; - return false; + private recordFlexAttempt(): void { + this.metrics.flexAttempts += 1; + } + + private recordFlexResponse(responseMs: number): void { + if (Number.isNaN(responseMs)) return; + this.metrics.flexResponses += 1; + this.metrics.flexResponseTotalMs += responseMs; + this.metrics.lastFlexResponseMs = responseMs; + } + + private recordFallback(reason: string): void { + this.metrics.flexFallbacks += 1; + logger.info({ reason, model: this.getId() }, 'OpenAIFlex fallback to standard tier triggered.'); + } } diff --git a/src/llm/services/anthropic-vertex.ts b/src/llm/services/anthropic-vertex.ts index b2883e42..6e4c529d 100644 --- a/src/llm/services/anthropic-vertex.ts +++ b/src/llm/services/anthropic-vertex.ts @@ -12,7 +12,7 @@ export const ANTHROPIC_VERTEX_SERVICE = 'anthropic-vertex'; export function anthropicVertexLLMRegistry(): Record<string, () => LLM> { return { - [`${ANTHROPIC_VERTEX_SERVICE}:claude-3-5-haiku`]: Claude3_5_Haiku_Vertex, + [`${ANTHROPIC_VERTEX_SERVICE}:claude-haiku-4-5@20251001`]: Claude4_5_Haiku_Vertex, [`${ANTHROPIC_VERTEX_SERVICE}:claude-sonnet-4-5@20250929`]: Claude4_5_Sonnet_Vertex, [`${ANTHROPIC_VERTEX_SERVICE}:claude-opus-4-1@20250805`]: Claude4_1_Opus_Vertex, }; @@ -33,8 +33,10 @@ export function Claude4_5_Sonnet_Vertex(): LLM { } // https://cloud.google.com/vertex-ai/generative-ai/docs/partner-models/claude/haiku-3-5 -export function Claude3_5_Haiku_Vertex(): LLM { - return new AnthropicVertexLLM('Claude 3.5 Haiku (Vertex)', 'claude-3-5-haiku@20241022', 200_000, 8_192, anthropicCostFunction(1, 5)); +export function Claude4_5_Haiku_Vertex(): LLM { + return new AnthropicVertexLLM('Claude 4.5 Haiku (Vertex)', 'claude-haiku-4-5@20251001', 200_000, 64_000, anthropicCostFunction(1, 5), [ + 'claude-3-5-haiku@20241022', + ]); } export function anthropicCostFunction(inputMil: number, outputMil: number): LlmCostFunction { @@ -60,7 +62,7 @@ export function anthropicCostFunction(inputMil: number, outputMil: number): LlmC export function ClaudeVertexLLMs(): AgentLLMs { return { - easy: Claude3_5_Haiku_Vertex(), + easy: Claude4_5_Haiku_Vertex(), medium: Claude4_5_Sonnet_Vertex(), hard: Claude4_5_Sonnet_Vertex(), xhard: Claude4_1_Opus_Vertex(), diff --git a/src/llm/services/anthropic.ts b/src/llm/services/anthropic.ts index a851b1aa..84305717 100644 --- a/src/llm/services/anthropic.ts +++ b/src/llm/services/anthropic.ts @@ -10,7 +10,7 @@ export const ANTHROPIC_SERVICE = 'anthropic'; 
export function anthropicLLMRegistry(): Record<string, () => LLM> { return { - [`${ANTHROPIC_SERVICE}:claude-3-5-haiku`]: Claude3_5_Haiku, + [`${ANTHROPIC_SERVICE}:claude-haiku-4-5-20251001`]: anthropicClaude4_5_Haiku, [`${ANTHROPIC_SERVICE}:claude-sonnet-4-5-20250929`]: anthropicClaude4_5_Sonnet, [`${ANTHROPIC_SERVICE}:claude-opus-4-1-20250805`]: anthropicClaude4_1_Opus, }; @@ -24,8 +24,8 @@ export function anthropicClaude4_5_Sonnet(): LLM { return new Anthropic('Claude 4.5 Sonnet (Anthropic)', 'claude-sonnet-4-5-20250929', 200_000, 64_000, anthropicCostFunction(3, 15), ['claude-sonnet-4']); } -export function Claude3_5_Haiku(): LLM { - return new Anthropic('Claude 3.5 Haiku', 'claude-3-5-haiku-20241022', 200_000, 8_192, anthropicCostFunction(1, 5)); +export function anthropicClaude4_5_Haiku(): LLM { + return new Anthropic('Claude 4.5 Haiku', 'claude-haiku-4-5-20251001', 200_000, 64_000, anthropicCostFunction(1, 5), ['claude-3-5-haiku-20241022']); } function anthropicCostFunction(inputMil: number, outputMil: number): LlmCostFunction { @@ -48,7 +48,7 @@ export function ClaudeLLMs(): AgentLLMs { const sonnet4 = anthropicClaude4_5_Sonnet(); const opus = anthropicClaude4_1_Opus(); return { - easy: Claude3_5_Haiku(), + easy: anthropicClaude4_5_Haiku(), medium: sonnet4, hard: sonnet4, xhard: new MultiLLM([sonnet4], 3), diff --git a/src/llm/services/defaultLlms.ts b/src/llm/services/defaultLlms.ts index ef9cf8a1..49cd3f6f 100644 --- a/src/llm/services/defaultLlms.ts +++ b/src/llm/services/defaultLlms.ts @@ -2,7 +2,7 @@ import { FastEasyLLM } from '#llm/multi-agent/fastEasy'; import { FastMediumLLM } from '#llm/multi-agent/fastMedium'; import { MAD_Anthropic, MAD_Balanced, MAD_Fast, MAD_Grok, MAD_OpenAI, MAD_Vertex } from '#llm/multi-agent/reasoning-debate'; // import { MAD_Balanced, MAD_Vertex, MAD_Anthropic, MAD_OpenAI, MAD_Grok, MAD_Fast } from '#llm/multi-agent/reasoning-debate'; -import { Claude3_5_Haiku, anthropicClaude4_5_Sonnet } from '#llm/services/anthropic'; +import { anthropicClaude4_5_Haiku, anthropicClaude4_5_Sonnet } from '#llm/services/anthropic'; import { vertexGemini_2_5_Flash, vertexGemini_2_5_Pro } from '#llm/services/vertexai'; import { logger } from '#o11y/logger'; import type { AgentLLMs } from '#shared/agent/agent.model'; @@ -32,11 +32,18 @@ export function defaultLLMs(): AgentLLMs { // return _defaultLLMs; // } - const easyLLMs = [new FastEasyLLM(), vertexGemini_2_5_Flash(), Gemini_2_5_Flash(), groqLlama4_Scout(), openaiGPT5nano(), Claude3_5_Haiku()]; + const easyLLMs = [new FastEasyLLM(), vertexGemini_2_5_Flash(), Gemini_2_5_Flash(), groqLlama4_Scout(), openaiGPT5nano(), anthropicClaude4_5_Haiku()]; const easy: LLM | undefined = easyLLMs.find((llm) => llm.isConfigured()); if (!easy) throw new Error('No default easy LLM configured'); - const mediumLLMs = [new FastMediumLLM(), vertexGemini_2_5_Flash(), Gemini_2_5_Flash(), cerebrasQwen3_235b_Thinking(), openaiGPT5mini(), Claude3_5_Haiku()]; + const mediumLLMs = [ + new FastMediumLLM(), + vertexGemini_2_5_Flash(), + Gemini_2_5_Flash(), + cerebrasQwen3_235b_Thinking(), + openaiGPT5mini(), + anthropicClaude4_5_Haiku(), + ]; const medium: LLM | undefined = mediumLLMs.find((llm) => llm.isConfigured()); if (!medium) throw new Error('No default medium LLM configured');
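
A minimal usage sketch of the new OpenAIFlex hedging and metrics, based only on the constructor, generateTextFromMessages and getMetrics() signatures introduced in this diff; it is illustrative and not part of the change set. The '#llm/multi-agent/openaiFlex' import path, the 'gpt-5-mini' model id, and openaiGPT5mini('flex') returning the flex-tier variant (mirroring the commented-out nano factory) are assumptions.

import { OpenAIFlex } from '#llm/multi-agent/openaiFlex'; // assumed path alias for src/llm/multi-agent/openaiFlex.ts
import { openaiGPT5mini } from '#llm/services/openai';
import type { LlmMessage } from '#shared/llm/llm.model';

async function demoFlexHedging(): Promise<void> {
  // Standard tier is the fallback; the flex-tier variant is raced first.
  // No timeout override is passed, so DEFAULT_FLEX_TIMEOUT_MS applies.
  const flex = new OpenAIFlex('GPT5 Mini Flex', 'gpt-5-mini', openaiGPT5mini(), openaiGPT5mini('flex'));

  const messages: LlmMessage[] = [{ role: 'user', content: 'Summarise the trade-offs of flex processing.' }];

  // If the flex stream yields a meaningful chunk inside the window, its text is returned;
  // otherwise the flex attempt is aborted via its AbortSignal and the standard tier answers.
  const text = await flex.generateTextFromMessages(messages);

  // Hedging telemetry: attempts, fallbacks, and first-chunk latency for the flex tier.
  const { flexAttempts, flexFallbacks, averageFlexResponseMs } = flex.getMetrics();
  console.log(text.length, flexAttempts, flexFallbacks, averageFlexResponseMs);
}

Aborting the losing flex attempt rather than leaving it running is what the forwarded abortSignal in cloneOptionsWithAbort provides, which keeps the hedge from paying for two full completions.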