diff --git a/cloud-v2/docs/runbooks/doppler/porter-integration.md b/cloud-v2/docs/runbooks/doppler/porter-integration.md index 7d466872d0..9fe3ca050f 100644 --- a/cloud-v2/docs/runbooks/doppler/porter-integration.md +++ b/cloud-v2/docs/runbooks/doppler/porter-integration.md @@ -157,7 +157,7 @@ MONGO_URL, REDIS_URL NODE_ENV, LOG_LEVEL, LOG_STDOUT_JSON, REGION -SONIOX_API_KEY +SONIOX_API_KEY, SONIOX_FALLBACK_API_KEYS, SONIOX_MODEL BETTERSTACK_PASSWORD, BETTERSTACK_USERNAME, BETTERSTACK_SOURCE_TOKEN diff --git a/cloud-v2/packages/runtime/src/services/audio/providers/soniox-key-pool.test.ts b/cloud-v2/packages/runtime/src/services/audio/providers/soniox-key-pool.test.ts new file mode 100644 index 0000000000..6977172c63 --- /dev/null +++ b/cloud-v2/packages/runtime/src/services/audio/providers/soniox-key-pool.test.ts @@ -0,0 +1,121 @@ +import { describe, expect, test } from "bun:test"; + +import { + SonioxKeyPool, + classifySonioxCredentialFailure, + parseSonioxFallbackApiKeys, +} from "./soniox-key-pool"; + +describe("SonioxKeyPool", () => { + test("parses comma-separated fallback keys", () => { + expect(parseSonioxFallbackApiKeys(" a, b ,, c ")).toEqual(["a", "b", "c"]); + expect(parseSonioxFallbackApiKeys(undefined)).toEqual([]); + }); + + test("prefers the primary key while available", () => { + const pool = new SonioxKeyPool("primary", ["fallback-a", "fallback-b"]); + + const credential = pool.selectCredential(new Set(), 1_000); + + expect(credential?.role).toBe("primary"); + }); + + test("round-robins fallback keys when primary is cooling down", () => { + const pool = new SonioxKeyPool("primary", ["fallback-a", "fallback-b"]); + const primary = pool.selectCredential(new Set(), 1_000)!; + pool.recordFailure(primary.id, new Error("Soniox error 429: rate limit"), 1_000); + + const first = pool.selectCredential(new Set(), 1_000); + const second = pool.selectCredential(new Set(), 1_000); + const third = pool.selectCredential(new Set(), 1_000); + + 
expect(first?.role).toBe("fallback"); + expect(second?.role).toBe("fallback"); + expect(third?.role).toBe("fallback"); + expect(first?.id).not.toBe(second?.id); + expect(third?.id).toBe(first?.id); + }); + + test("deduplicates fallback keys that match the primary", () => { + const pool = new SonioxKeyPool("primary", ["primary", "fallback"]); + + expect(pool.size).toBe(2); + }); + + test("makes concurrency failures available again after a short cooldown", () => { + const pool = new SonioxKeyPool("primary", ["fallback"]); + const primary = pool.selectCredential(new Set(), 1_000)!; + pool.recordFailure( + primary.id, + new Error("Soniox error 429: maximum concurrent streams reached"), + 1_000, + ); + + expect(pool.selectCredential(new Set(), 1_000)?.role).toBe("fallback"); + expect(pool.selectCredential(new Set(), 6_001)?.role).toBe("primary"); + }); + + test("disables invalid keys for the process", () => { + const pool = new SonioxKeyPool("primary", ["fallback"]); + const primary = pool.selectCredential(new Set(), 1_000)!; + pool.recordFailure(primary.id, new Error("Soniox error 401: invalid api key"), 1_000); + + const availability = pool + .describeAvailability(10_000) + .find((item) => item.id === primary.id); + + expect(availability?.disabled).toBe(true); + expect(availability?.available).toBe(false); + expect(pool.selectCredential(new Set(), 10_000)?.role).toBe("fallback"); + }); + + test("does not let overlapping success clear active cooldown", () => { + const pool = new SonioxKeyPool("primary", ["fallback"]); + const primary = pool.selectCredential(new Set(), 1_000)!; + pool.recordFailure( + primary.id, + new Error("Soniox error 402: organization_monthly_budget_exhausted"), + 1_000, + ); + + pool.recordSuccess(primary.id, 2_000); + + const availability = pool + .describeAvailability(2_000) + .find((item) => item.id === primary.id); + expect(availability?.failureKind).toBe("quota"); + expect(availability?.available).toBe(false); + 
/** Whether a key is the configured primary or one of the fallbacks. */
export type SonioxCredentialRole = "primary" | "fallback";

/** A Soniox API key as handed out to callers of the pool. */
export interface SonioxCredential {
  /** Short SHA-256 fingerprint of the key (see `fingerprintSonioxKey`). */
  id: string;
  apiKey: string;
  role: SonioxCredentialRole;
}

type SonioxCredentialFailureKind =
  | "auth"
  | "concurrency"
  | "quota"
  | "rate_limit"
  | "transient";

/** Pool-internal bookkeeping attached to each credential. */
interface SonioxCredentialState extends SonioxCredential {
  /** Epoch ms until which the key is skipped; `Infinity` once disabled. */
  cooldownUntil: number;
  disabled: boolean;
  /** Kind of the failure that produced the cooldown currently recorded. */
  failureKind?: SonioxCredentialFailureKind;
  lastFailureMessage?: string;
}

/** Result of classifying an error raised while using a credential. */
export interface SonioxCredentialFailureClassification {
  kind: SonioxCredentialFailureKind;
  cooldownMs: number;
  disabled?: boolean;
}

const CONCURRENCY_COOLDOWN_MS = 5_000;
const RATE_LIMIT_COOLDOWN_MS = 60_000;
const QUOTA_COOLDOWN_MS = 30 * 60_000;
const TRANSIENT_COOLDOWN_MS = 10_000;

// Lower-case substrings / patterns looked for in the error message. They are
// evaluated in this order: auth, concurrency, rate limit, quota.
const AUTH_MARKERS = [
  "invalid api key",
  "invalid_api_key",
  "bad api key",
  "unauthorized",
] as const;
const CONCURRENCY_MARKERS = [
  "concurrent",
  "concurrency",
  "connection limit",
  "stream limit",
  "too many streams",
  "maximum streams",
  "max streams",
] as const;
const RATE_LIMIT_MARKERS = ["rate limit", "rate_limit", "too many requests"] as const;
const QUOTA_PATTERNS = [
  /\bquota\b/,
  /\bbudget\b/,
  /\bcredit(?:s)?\b/,
  /\bbilling\b/,
  /\bspend(?:ing)?\b/,
  /\bbalance\b/,
] as const;
const QUOTA_MARKERS = ["usage limit", "monthly limit"] as const;

/**
 * Splits a comma-separated list of API keys, trimming each entry and dropping
 * empty ones.
 *
 * @param value Raw list, or `undefined` when nothing is configured.
 * @returns The non-empty keys in their original order (`[]` for no input).
 */
export function parseSonioxFallbackApiKeys(value: string | undefined): string[] {
  if (!value) return [];
  return value
    .split(",")
    .map((key) => key.trim())
    .filter(Boolean);
}

/**
 * Returns the first 12 hex characters of the SHA-256 digest of `apiKey`, so a
 * key can be identified without exposing the key itself.
 */
export function fingerprintSonioxKey(apiKey: string): string {
  return crypto.createHash("sha256").update(apiKey).digest("hex").slice(0, 12);
}

/**
 * Extracts a message from whatever was thrown. Callers obtain the value from
 * `catch` blocks, so it is not guaranteed to be an `Error` instance: a thrown
 * string is used as-is, an object contributes its string `message`, and
 * anything else yields `""`.
 */
function failureMessageOf(error: unknown): string {
  if (typeof error === "string") return error;
  if (typeof error === "object" && error !== null) {
    const message = (error as { message?: unknown }).message;
    if (typeof message === "string") return message;
  }
  return "";
}

/**
 * Maps an error to a failure kind and the cooldown to apply to the credential
 * that produced it.
 *
 * Checks run in a fixed order, first match wins: auth (code 401 or auth
 * wording) permanently disables the key; concurrency wording is tested before
 * the 429 / rate-limit check, so a 429 that mentions concurrent streams gets
 * the short concurrency cooldown; then quota (code 402 or billing wording);
 * everything else is `transient`.
 *
 * @param error The thrown value. Non-`Error` values are tolerated.
 * @returns The classification; `disabled` is set only for auth failures.
 */
export function classifySonioxCredentialFailure(
  error: Error,
): SonioxCredentialFailureClassification {
  const message = failureMessageOf(error);
  const lower = message.toLowerCase();
  const code = extractSonioxErrorCode(message);
  const mentions = (markers: readonly string[]): boolean =>
    markers.some((marker) => lower.includes(marker));

  if (code === 401 || mentions(AUTH_MARKERS)) {
    return { kind: "auth", cooldownMs: Number.POSITIVE_INFINITY, disabled: true };
  }

  if (mentions(CONCURRENCY_MARKERS)) {
    return { kind: "concurrency", cooldownMs: CONCURRENCY_COOLDOWN_MS };
  }

  if (code === 429 || mentions(RATE_LIMIT_MARKERS)) {
    return { kind: "rate_limit", cooldownMs: RATE_LIMIT_COOLDOWN_MS };
  }

  if (
    code === 402 ||
    QUOTA_PATTERNS.some((pattern) => pattern.test(lower)) ||
    mentions(QUOTA_MARKERS)
  ) {
    return { kind: "quota", cooldownMs: QUOTA_COOLDOWN_MS };
  }

  return { kind: "transient", cooldownMs: TRANSIENT_COOLDOWN_MS };
}

/**
 * Pool of Soniox API keys with per-key cooldown tracking.
 *
 * The primary key is always preferred while it is usable; otherwise fallback
 * keys are handed out round-robin. Failures put a key into a cooldown whose
 * length depends on the failure kind, and auth failures disable the key for
 * the lifetime of the pool.
 */
export class SonioxKeyPool {
  private readonly credentials: SonioxCredentialState[];
  private nextFallbackIndex = 0;

  /**
   * @param primaryApiKey Key tried first. Blank input adds no primary.
   * @param fallbackApiKeys Extra keys; blanks and duplicates (including a
   *   duplicate of the primary) are ignored.
   */
  constructor(primaryApiKey: string, fallbackApiKeys: readonly string[] = []) {
    const seen = new Set<string>();
    const credentials: SonioxCredentialState[] = [];

    const addCredential = (apiKey: string, role: SonioxCredentialRole): void => {
      const trimmed = apiKey.trim();
      if (!trimmed || seen.has(trimmed)) return;
      seen.add(trimmed);
      credentials.push({
        id: fingerprintSonioxKey(trimmed),
        apiKey: trimmed,
        role,
        cooldownUntil: 0,
        disabled: false,
      });
    };

    addCredential(primaryApiKey, "primary");
    for (const key of fallbackApiKeys) {
      addCredential(key, "fallback");
    }

    this.credentials = credentials;
  }

  /** Number of distinct keys in the pool. */
  get size(): number {
    return this.credentials.length;
  }

  /** True when at least one fallback key is configured. */
  get hasFallbacks(): boolean {
    return this.credentials.some((credential) => credential.role === "fallback");
  }

  /**
   * Picks the next key to try.
   *
   * @param attempted Ids already tried by the caller; never returned again.
   * @param now Current time in epoch ms (injectable for tests).
   * @returns The primary if usable, else the next usable fallback in
   *   round-robin order, else `null`.
   */
  selectCredential(
    attempted: ReadonlySet<string> = new Set<string>(),
    now: number = Date.now(),
  ): SonioxCredential | null {
    const usable = (credential: SonioxCredentialState): boolean =>
      !attempted.has(credential.id) && this.isAvailable(credential, now);

    const primary = this.credentials.find(
      (credential) => credential.role === "primary",
    );
    if (primary && usable(primary)) {
      return this.toPublicCredential(primary);
    }

    const fallbacks = this.credentials.filter(
      (credential) => credential.role === "fallback",
    );
    for (let offset = 0; offset < fallbacks.length; offset += 1) {
      const index = (this.nextFallbackIndex + offset) % fallbacks.length;
      const candidate = fallbacks[index];
      if (!candidate || !usable(candidate)) continue;

      // Start the next search just after the key handed out now.
      this.nextFallbackIndex = (index + 1) % fallbacks.length;
      return this.toPublicCredential(candidate);
    }

    return null;
  }

  /**
   * Clears failure state after a successful use. A success reported while a
   * cooldown is still running (e.g. from an older, overlapping session) is
   * ignored, as is any success on a disabled key.
   */
  recordSuccess(credentialId: string, now: number = Date.now()): void {
    const credential = this.findCredential(credentialId);
    if (!credential || credential.disabled) return;
    if (credential.cooldownUntil > now) return;
    credential.cooldownUntil = 0;
    credential.failureKind = undefined;
    credential.lastFailureMessage = undefined;
  }

  /**
   * Classifies `error` and applies the resulting cooldown to the credential.
   *
   * Cooldowns never shrink. The stored failure kind/message always describe
   * the failure that set the cooldown currently recorded: a failure whose
   * cooldown would end earlier than the recorded one, or any failure reported
   * after the key was disabled, leaves the stored diagnostics untouched.
   *
   * @returns The classification of `error`, or `null` for an unknown id.
   */
  recordFailure(
    credentialId: string,
    error: Error,
    now: number = Date.now(),
  ): SonioxCredentialFailureClassification | null {
    const credential = this.findCredential(credentialId);
    if (!credential) return null;

    const classification = classifySonioxCredentialFailure(error);

    // Already permanently disabled: keep the diagnostics of the auth failure.
    if (credential.disabled) return classification;

    if (classification.disabled) {
      credential.disabled = true;
      credential.cooldownUntil = Number.POSITIVE_INFINITY;
    } else {
      const cooldownUntil = now + classification.cooldownMs;
      // The recorded cooldown outlasts this one; do not relabel it.
      if (cooldownUntil < credential.cooldownUntil) return classification;
      credential.cooldownUntil = cooldownUntil;
    }

    credential.failureKind = classification.kind;
    credential.lastFailureMessage = failureMessageOf(error);
    return classification;
  }

  /**
   * Snapshot of every key's state at `now`, without the key material.
   * `cooldownRemainingMs` is `Infinity` for disabled keys and never negative.
   */
  describeAvailability(now: number = Date.now()): Array<{
    id: string;
    role: SonioxCredentialRole;
    available: boolean;
    disabled: boolean;
    cooldownRemainingMs: number;
    failureKind?: SonioxCredentialFailureKind;
  }> {
    return this.credentials.map((credential) => ({
      id: credential.id,
      role: credential.role,
      available: this.isAvailable(credential, now),
      disabled: credential.disabled,
      cooldownRemainingMs:
        credential.cooldownUntil === Number.POSITIVE_INFINITY
          ? Number.POSITIVE_INFINITY
          : Math.max(0, credential.cooldownUntil - now),
      failureKind: credential.failureKind,
    }));
  }

  private findCredential(
    credentialId: string,
  ): SonioxCredentialState | undefined {
    return this.credentials.find((credential) => credential.id === credentialId);
  }

  private isAvailable(credential: SonioxCredentialState, now: number): boolean {
    return !credential.disabled && credential.cooldownUntil <= now;
  }

  /** Strips the internal bookkeeping fields before a credential leaves the pool. */
  private toPublicCredential(credential: SonioxCredentialState): SonioxCredential {
    return {
      id: credential.id,
      apiKey: credential.apiKey,
      role: credential.role,
    };
  }
}

/**
 * Pulls the numeric code out of a message containing `Soniox error <code>:`.
 *
 * @returns The code, or `null` when the message does not contain that prefix.
 */
function extractSonioxErrorCode(message: string): number | null {
  const digits = /Soniox error (\d+):/i.exec(message)?.[1];
  if (digits === undefined) return null;
  const parsed = Number.parseInt(digits, 10);
  return Number.isFinite(parsed) ? parsed : null;
}
@@ -180,6 +180,10 @@ async function makeProvider(): Promise<{ } describe("SonioxProvider utterance lifecycle", () => { + test("defaults to the current Soniox real-time model", () => { + expect(DEFAULT_SONIOX_MODEL).toBe("stt-rt-v5"); + }); + test("does not churn finals/utteranceIds when the rolling window's speaker flips mid-utterance", async () => { const { session, events, provider } = await makeProvider(); diff --git a/cloud-v2/packages/runtime/src/services/audio/providers/soniox.ts b/cloud-v2/packages/runtime/src/services/audio/providers/soniox.ts index 7e7f08ff13..5a0211c3da 100644 --- a/cloud-v2/packages/runtime/src/services/audio/providers/soniox.ts +++ b/cloud-v2/packages/runtime/src/services/audio/providers/soniox.ts @@ -8,7 +8,9 @@ * * Configuration: * - Requires `SONIOX_API_KEY` env var. - * - Model defaults to `"stt-rt-v4"`; override via `SONIOX_MODEL`. + * - Optional `SONIOX_FALLBACK_API_KEYS` is comma-separated and used when the + * primary key hits auth/quota/rate/concurrency errors. + * - Model defaults to `"stt-rt-v5"`; override via `SONIOX_MODEL`. * - Audio format hardcoded to s16le, 16kHz, mono (matches our LC3 output). * * What this does NOT include (deferred from v1's SonioxSdkStream port): @@ -36,8 +38,14 @@ import type { TranscriptionProvider, TranscriptEvent, } from "./provider"; +import { + type SonioxCredential, + SonioxKeyPool, + parseSonioxFallbackApiKeys, +} from "./soniox-key-pool"; -const SONIOX_MODEL = process.env.SONIOX_MODEL ?? "stt-rt-v4"; +export const DEFAULT_SONIOX_MODEL = "stt-rt-v5"; +const SONIOX_MODEL = process.env.SONIOX_MODEL ?? DEFAULT_SONIOX_MODEL; function audioGapConfig(): { checkIntervalMs: number; thresholdMs: number } { return { checkIntervalMs: Number(process.env.SONIOX_GAP_CHECK_INTERVAL_MS ?? 1_000), @@ -98,26 +106,45 @@ export interface CreateSonioxProviderOptions extends ProviderOptions { targetLanguage?: string; } -/** Shared client per worker. Soniox SDK is happy with one client for many streams. 
/**
 * Worker-wide Soniox state: one lazily built key pool, plus one SDK client per
 * credential id so a client is constructed at most once per key.
 */
let sharedKeyPool: SonioxKeyPool | null = null;
const sharedClients = new Map<string, SonioxNodeClient>();

/**
 * Returns the worker's key pool, building it from `SONIOX_API_KEY` and
 * `SONIOX_FALLBACK_API_KEYS` on first use.
 *
 * @throws Error when `SONIOX_API_KEY` is unset or empty.
 */
function getKeyPool(): SonioxKeyPool {
  if (sharedKeyPool !== null) {
    return sharedKeyPool;
  }

  const primaryKey = process.env.SONIOX_API_KEY;
  if (!primaryKey) {
    throw new Error(
      "SONIOX_API_KEY is not set — required to use the soniox provider. Set AUDIO_PROVIDER=mock to use the mock instead.",
    );
  }

  const fallbackKeys = parseSonioxFallbackApiKeys(process.env.SONIOX_FALLBACK_API_KEYS);
  const pool = new SonioxKeyPool(primaryKey, fallbackKeys);
  sharedKeyPool = pool;

  // Only announce the pool when there is something beyond the primary key.
  if (pool.hasFallbacks) {
    console.log(`[soniox] fallback key pool enabled size=${pool.size}`);
  }
  return pool;
}

/**
 * Returns the cached SDK client for `credential`, creating and caching one
 * (keyed by the credential id) when none exists yet.
 */
function getClientForCredential(credential: SonioxCredential): SonioxNodeClient {
  const cached = sharedClients.get(credential.id);
  if (cached) {
    return cached;
  }

  const created = new SonioxNodeClient({ api_key: credential.apiKey });
  sharedClients.set(credential.id, created);
  return created;
}
(Letting @@ -158,7 +185,7 @@ export async function createSonioxProvider( // reconnect (the old one is dead; a fresh `client.realtime.stt(...)` takes its // place) while keeping THIS provider object — and therefore its identity in // the worker's per-user provider map — stable. - let session: RealtimeSttSession = client.realtime.stt(sessionConfig); + let session!: RealtimeSttSession; // True once `close()` has run. Gates the self-heal path so an expected // teardown (our own `session.finish()` → `disconnected`) does not trigger a @@ -620,6 +647,7 @@ export async function createSonioxProvider( const handleError = (err: Error) => { stopGapDetection(); + recordCredentialFailure(activeCredential, err, "stream-error"); opts.onError?.(err); // A provider-side error usually precedes (or accompanies) the socket dying. // Treat it as a trigger for the self-heal path: if the session is no longer @@ -643,7 +671,7 @@ export async function createSonioxProvider( const handleConnected = () => { console.log( - `[soniox] connected scope=${opts.scope} lang=${opts.language}${opts.targetLanguage ? ` → ${opts.targetLanguage}` : ""}`, + `[soniox] connected scope=${opts.scope} lang=${opts.language}${opts.targetLanguage ? 
` → ${opts.targetLanguage}` : ""} ${describeCredential(activeCredential)}`, ); }; @@ -684,6 +712,101 @@ export async function createSonioxProvider( } }; + const describeCredential = (credential: SonioxCredential | null): string => { + if (!credential) return "credential=injected-client"; + return `credential=${credential.role}:${credential.id}`; + }; + + const createSessionCandidate = ( + attempted: Set, + ): { session: RealtimeSttSession; credential: SonioxCredential | null } | null => { + if (injectedClient) { + return { + session: injectedClient.realtime.stt(sessionConfig), + credential: null, + }; + } + + const credential = keyPool!.selectCredential(attempted); + if (!credential) return null; + attempted.add(credential.id); + + return { + session: getClientForCredential(credential).realtime.stt(sessionConfig), + credential, + }; + }; + + const recordCredentialSuccess = ( + credential: SonioxCredential | null, + context: string, + ): void => { + if (!credential) return; + keyPool!.recordSuccess(credential.id); + console.log( + `[soniox] credential success scope=${opts.scope} context=${context} ${describeCredential(credential)}`, + ); + }; + + const recordCredentialFailure = ( + credential: SonioxCredential | null, + err: Error, + context: string, + ): void => { + if (!credential) return; + const classification = keyPool!.recordFailure(credential.id, err); + console.warn( + `[soniox] credential failure scope=${opts.scope} context=${context} ${describeCredential(credential)} kind=${classification?.kind ?? "unknown"} message=${err.message}`, + ); + }; + + const connectFreshSession = async ( + attempted: Set, + context: string, + ): Promise => { + let lastError: Error | null = null; + + while (true) { + const candidate = createSessionCandidate(attempted); + if (!candidate) { + const availability = keyPool + ? keyPool + .describeAvailability() + .map( + (item) => + `${item.role}:${item.id}:${item.available ? "available" : item.disabled ? 
"disabled" : `cooldown-${item.failureKind ?? "unknown"}`}`, + ) + .join(",") + : "none"; + throw ( + lastError ?? + new Error( + `No Soniox credential is currently available (scope=${opts.scope}, availability=${availability})`, + ) + ); + } + + try { + await candidate.session.connect(); + session = candidate.session; + activeCredential = candidate.credential; + wireSession(candidate.session); + recordCredentialSuccess(candidate.credential, context); + return; + } catch (err) { + const error = err as Error; + lastError = error; + recordCredentialFailure(candidate.credential, error, context); + try { + await candidate.session.close(); + } catch { + /* best-effort */ + } + if (injectedClient) throw error; + } + } + }; + const stopGapDetection = (): void => { if (gapCheckInterval) { clearInterval(gapCheckInterval); @@ -753,15 +876,10 @@ export async function createSonioxProvider( await new Promise((resolve) => setTimeout(resolve, delay)); if (closed) break; - // Drop the dead session's handlers so its late events can't drive us. - unwireSession(session); - - const next = client.realtime.stt(sessionConfig); - wireSession(next); try { - await next.connect(); - // Connected: swap in the fresh session and clear the heal state. - session = next; + // Drop the dead session's handlers so its late events can't drive us. + unwireSession(session); + await connectFreshSession(new Set(), "self-heal"); reconnecting = false; reconnectAttempts = 0; startGapDetection(); @@ -772,12 +890,6 @@ export async function createSonioxProvider( `[soniox] self-heal connect failed scope=${opts.scope} attempt=${reconnectAttempts}:`, err, ); - unwireSession(next); - try { - await next.close(); - } catch { - /* best-effort */ - } // Loop and back off again. } } @@ -787,10 +899,10 @@ export async function createSonioxProvider( void attempt(); }; - // Wire the initial session and connect it. 
- wireSession(session); + // Connect the initial session, trying a fallback credential immediately if + // the primary is already over quota/rate/concurrency capacity. try { - await session.connect(); + await connectFreshSession(new Set(), "initial-connect"); startGapDetection(); } catch (err) { console.error(`[soniox] connect failed scope=${opts.scope}:`, err);