Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions agents/src/llm/chat_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,113 @@ export class ChatContext {
return await toChatCtx(format, this, injectDummyUserMessage);
}

/**
* Compare this ChatContext with another for logical equivalence.
* Unlike strict equality, this method:
* - Ignores timestamps (createdAt fields)
* - Ignores any other field that is not explicitly compared below
* - Focuses on content: compares IDs, types, and payload
*
* Fields compared, per item type:
* - every item: `id` and `type`
* - `message`: `role`, `interrupted`, and every `content` entry in order
*   - string entries: exact string equality
*   - `image_content` entries: `id`, `image`, `inferenceDetail`,
*     `inferenceWidth`, `inferenceHeight`, `mimeType`
*   - `audio_content` entries: `transcript` only (audio frames are not compared)
* - `function_call`: `name`, `callId`, `args`
* - `function_call_output`: `name`, `callId`, `output`, `isError`
*
* Every comparison uses `!==`, so a field that holds an object only matches
* when both sides reference the same instance.
* Items whose `type` is none of the three above are treated as equivalent once
* their `id` and `type` match; the same holds for object content entries whose
* `type` is neither `image_content` nor `audio_content`.
*
* This is useful for detecting if the conversation content has changed,
* for example when validating preemptive generation results.
*
* @param other - The ChatContext to compare with
* @returns true if both contexts contain the same sequence of items with matching essential fields
*/
isEquivalent(other: ChatContext): boolean {
// Same object reference: trivially equivalent, skip the item walk
if (this === other) {
return true;
}

// Different lengths can never be equivalent; this also makes the
// index-based pairing below safe for both arrays
if (this._items.length !== other._items.length) {
return false;
}

// Compare each item pair positionally (order matters)
for (let i = 0; i < this._items.length; i++) {
const a = this._items[i]!;
const b = other._items[i]!;

// IDs and types must match
if (a.id !== b.id || a.type !== b.type) {
return false;
}
Comment on lines +515 to +534
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice, can you also add a unittest for this function? Inside chat_context.test.ts?


// Type-specific field comparison.
// `a.type === b.type` is already guaranteed above; both sides are re-checked
// here, presumably so the compiler narrows `a` and `b` to the same variant.
if (a.type === 'message' && b.type === 'message') {
// Compare role, content, and interrupted status (not timestamp)
if (a.role !== b.role || a.interrupted !== b.interrupted) {
return false;
}

// Compare content arrays: same length first, then entry by entry
if (a.content.length !== b.content.length) {
return false;
}

for (let j = 0; j < a.content.length; j++) {
const ca = a.content[j]!;
const cb = b.content[j]!;

// Both are strings
if (typeof ca === 'string' && typeof cb === 'string') {
if (ca !== cb) {
return false;
}
}
// Both are objects
else if (typeof ca === 'object' && typeof cb === 'object') {
if (ca.type !== cb.type) {
return false;
}

if (ca.type === 'image_content' && cb.type === 'image_content') {
// Compare essential image fields (not cache).
// NOTE(review): `image` is compared with `!==`; if it can hold an object
// (rather than a string), only the same instance will match — confirm.
if (
ca.id !== cb.id ||
ca.image !== cb.image ||
ca.inferenceDetail !== cb.inferenceDetail ||
ca.inferenceWidth !== cb.inferenceWidth ||
ca.inferenceHeight !== cb.inferenceHeight ||
ca.mimeType !== cb.mimeType
) {
return false;
}
} else if (ca.type === 'audio_content' && cb.type === 'audio_content') {
// Compare audio transcript (frames comparison would be too expensive)
if (ca.transcript !== cb.transcript) {
return false;
}
}
// Any other object content type falls through: matching `type` is enough
}
// Mismatched types (one side is a string, the other is not)
else {
return false;
}
}
} else if (a.type === 'function_call' && b.type === 'function_call') {
// Compare name, callId, and args (not timestamp)
// NOTE(review): assumes `args` is a primitive (e.g. a serialized string) so
// that `!==` compares by value — confirm against the FunctionCall definition.
if (a.name !== b.name || a.callId !== b.callId || a.args !== b.args) {
return false;
}
} else if (a.type === 'function_call_output' && b.type === 'function_call_output') {
// Compare name, callId, output, and isError (not timestamp)
if (
a.name !== b.name ||
a.callId !== b.callId ||
a.output !== b.output ||
a.isError !== b.isError
) {
return false;
}
}
// Items of any other type are accepted on matching `id` and `type` alone
}

// Every pair matched on all compared fields
return true;
}

/**
* Internal helper used by `truncate` & `addMessage` to find the correct
* insertion index for a timestamp so the list remains sorted.
Expand Down
7 changes: 7 additions & 0 deletions agents/src/stt/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ export enum SpeechEventType {
END_OF_SPEECH = 3,
/** Usage event, emitted periodically to indicate usage metrics. */
RECOGNITION_USAGE = 4,
/**
* Preflight transcript, emitted when the STT has a confident interim result
* before the final transcript is ready. This is useful for preemptive generation
* to reduce latency. Contains all pre-committed transcripts including final
* transcripts from previous STT runs.
*/
PREFLIGHT_TRANSCRIPT = 5,
}

/** SpeechData contains metadata about this {@link SpeechEvent}. */
Expand Down
139 changes: 138 additions & 1 deletion agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { type AgentSession, type TurnDetectionMode } from './agent_session.js';
import {
AudioRecognition,
type EndOfTurnInfo,
type PreemptiveGenerationInfo,
type RecognitionHooks,
type _TurnDetector,
} from './audio_recognition.js';
Expand Down Expand Up @@ -71,6 +72,15 @@ import { SpeechHandle } from './speech_handle.js';
// equivalent to Python's contextvars
const speechHandleStorage = new AsyncLocalStorage<SpeechHandle>();

/**
* Bookkeeping for a reply that was started from a preflight transcript, before
* the user's turn was known to be complete. The snapshot fields record the
* state the reply was generated against so it can later be validated against
* the state at end of turn and either scheduled or cancelled.
*/
interface PreemptiveGeneration {
// Handle returned by `generateReply` when called with `scheduleSpeech: false`;
// it is later passed to `scheduleSpeech` (reuse) or `_cancel()`-ed (discard).
speechHandle: SpeechHandle;
// The info object that triggered the generation; its `newTranscript` is
// compared against the final transcript, and `transcriptConfidence` is logged.
info: PreemptiveGenerationInfo;
// Copy of the agent's chat context taken when the generation started;
// checked with `isEquivalent` against the context at end of turn.
chatCtx: ChatContext;
// `agent.toolCtx` at start time; validated by reference equality (`===`).
tools: ToolContext;
// Activity-level tool choice at start time; validated with `===`.
toolChoice: ToolChoice | null;
// `Date.now()` (epoch milliseconds) at start time; used to report the lead
// time gained when the preemptive reply is reused.
createdAt: number;
}

export class AgentActivity implements RecognitionHooks {
private static readonly REPLY_TASK_CANCEL_TIMEOUT = 5000;
private started = false;
Expand All @@ -87,6 +97,7 @@ export class AgentActivity implements RecognitionHooks {
private audioStream = new DeferredReadableStream<AudioFrame>();
// default to null as None, which maps to the default provider tool choice value
private toolChoice: ToolChoice | null = null;
private preemptiveGeneration?: PreemptiveGeneration;

agent: Agent;
agentSession: AgentSession;
Expand Down Expand Up @@ -664,6 +675,64 @@ export class AgentActivity implements RecognitionHooks {
);
}

/**
 * Starts generating a reply for `info.newTranscript` ahead of time.
 *
 * Does nothing when the session option `preemptiveGeneration` is off. It also
 * bails out (with a debug log) when the activity is draining, when there is a
 * current speech that has not been interrupted, or when `this.llm` is not an
 * `LLM` instance. Otherwise any pending preemptive generation is cancelled and
 * a new reply is generated against a copy of the agent's chat context. The
 * resulting handle is NOT scheduled here; it is stored, together with a
 * snapshot of the tools and tool choice, in `this.preemptiveGeneration`.
 *
 * @param info - Preflight details; `newTranscript` becomes the user message
 *   and `transcriptConfidence` is only logged.
 */
onPreemptiveGeneration(info: PreemptiveGenerationInfo): void {
  if (!this.agentSession.options.preemptiveGeneration) {
    return;
  }

  // Resolve the first applicable reason for skipping, checked in priority order.
  let skipReason: string | undefined;
  if (this.draining) {
    skipReason = 'skipping preemptive generation, agent is draining';
  } else if (this._currentSpeech && !this._currentSpeech.interrupted) {
    skipReason = 'skipping preemptive generation, current speech is not interrupted';
  } else if (!(this.llm instanceof LLM)) {
    skipReason = 'skipping preemptive generation, LLM is not a standard LLM instance';
  }

  if (skipReason !== undefined) {
    this.logger.debug(skipReason);
    return;
  }

  // At most one speculative reply is kept; drop the previous one first.
  this.cancelPreemptiveGeneration();

  const chatCtx = this.agent.chatCtx.copy();
  const { newTranscript, transcriptConfidence } = info;
  const userMessage = ChatMessage.create({ role: 'user', content: newTranscript });

  this.logger.info(
    { transcript: newTranscript, confidence: transcriptConfidence },
    'starting preemptive generation',
  );

  // `scheduleSpeech: false` produces the reply without queueing it for playout.
  const speechHandle = this.generateReply({ userMessage, chatCtx, scheduleSpeech: false });

  // Snapshot the inputs the reply was generated against for later validation.
  this.preemptiveGeneration = {
    speechHandle,
    info,
    chatCtx,
    tools: this.agent.toolCtx,
    toolChoice: this.toolChoice,
    createdAt: Date.now(),
  };
}

/**
 * Discards the pending preemptive generation, if there is one: cancels its
 * speech handle and clears `this.preemptiveGeneration`. No-op otherwise.
 */
private cancelPreemptiveGeneration(): void {
  const pending = this.preemptiveGeneration;
  if (!pending) {
    return;
  }

  this.logger.debug('cancelling existing preemptive generation');
  pending.speechHandle._cancel();
  this.preemptiveGeneration = undefined;
}

private createSpeechTask(options: {
task: Task<void>;
ownedSpeechHandle?: SpeechHandle;
Expand Down Expand Up @@ -775,13 +844,15 @@ export class AgentActivity implements RecognitionHooks {
instructions?: string;
toolChoice?: ToolChoice | null;
allowInterruptions?: boolean;
scheduleSpeech?: boolean;
}): SpeechHandle {
const {
userMessage,
chatCtx,
instructions: defaultInstructions,
toolChoice: defaultToolChoice,
allowInterruptions: defaultAllowInterruptions,
scheduleSpeech = true,
} = options;

let instructions = defaultInstructions;
Expand Down Expand Up @@ -871,7 +942,9 @@ export class AgentActivity implements RecognitionHooks {
task.finally(() => this.onPipelineReplyDone());
}

this.scheduleSpeech(handle, SpeechHandle.SPEECH_PRIORITY_NORMAL);
if (scheduleSpeech) {
this.scheduleSpeech(handle, SpeechHandle.SPEECH_PRIORITY_NORMAL);
}
return handle;
}

Expand Down Expand Up @@ -977,6 +1050,70 @@ export class AgentActivity implements RecognitionHooks {
return;
}

// Check if we can use preemptive generation
const preemptive = this.preemptiveGeneration;
if (preemptive) {
// Add the user message to the chat context for comparison
const validationChatCtx = this.agent.chatCtx.copy();
if (userMessage) {
validationChatCtx.insert(userMessage);
}

// Validate: transcript matches, context equivalent, tools unchanged, toolChoice unchanged
const transcriptMatches = preemptive.info.newTranscript === info.newTranscript;
const contextEquivalent = preemptive.chatCtx.isEquivalent(validationChatCtx);
const toolsUnchanged = preemptive.tools === this.agent.toolCtx;
const toolChoiceUnchanged = preemptive.toolChoice === this.toolChoice;

if (transcriptMatches && contextEquivalent && toolsUnchanged && toolChoiceUnchanged) {
// Use preemptive generation!
const speechHandle = preemptive.speechHandle;
this.preemptiveGeneration = undefined;

const leadTime = Date.now() - preemptive.createdAt;
this.logger.info(
{
transcript: info.newTranscript,
leadTimeMs: leadTime,
confidence: preemptive.info.transcriptConfidence,
},
'using preemptive generation',
);

// Schedule the preemptive speech
this.scheduleSpeech(speechHandle, SpeechHandle.SPEECH_PRIORITY_NORMAL);

// Emit metrics
const eouMetrics: EOUMetrics = {
Comment on lines +1054 to +1087
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type: 'eou_metrics',
timestamp: Date.now(),
endOfUtteranceDelayMs: info.endOfUtteranceDelay,
transcriptionDelayMs: info.transcriptionDelay,
onUserTurnCompletedDelayMs: callbackDuration,
speechId: speechHandle.id,
};

this.agentSession.emit(
AgentSessionEventTypes.MetricsCollected,
createMetricsCollectedEvent({ metrics: eouMetrics }),
);

return;
} else {
// Context changed, discard and regenerate
this.logger.warn(
{
transcriptMatches,
contextEquivalent,
toolsUnchanged,
toolChoiceUnchanged,
},
'preemptive generation invalidated, regenerating',
);
this.cancelPreemptiveGeneration();
}
}

// Ensure the new message is passed to generateReply
// This preserves the original message id, making it easier for users to track responses
const speechHandle = this.generateReply({ userMessage, chatCtx });
Expand Down
9 changes: 9 additions & 0 deletions agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ export interface VoiceOptions {
minEndpointingDelay: number;
maxEndpointingDelay: number;
maxToolSteps: number;
/**
* Enable preemptive generation to reduce latency.
* When enabled, the agent starts generating a response as soon as a confident
* interim transcript (preflight) is available, before the final transcript is ready.
* The preemptive generation is validated and potentially discarded if the chat context
* or tool definitions change during the turn.
*/
preemptiveGeneration: boolean;
}

const defaultVoiceOptions: VoiceOptions = {
Expand All @@ -67,6 +75,7 @@ const defaultVoiceOptions: VoiceOptions = {
minEndpointingDelay: 500,
maxEndpointingDelay: 6000,
maxToolSteps: 3,
preemptiveGeneration: false,
} as const;

export type TurnDetectionMode = 'stt' | 'vad' | 'realtime_llm' | 'manual' | _TurnDetector;
Expand Down
Loading