7 changes: 7 additions & 0 deletions .changeset/yummy-parents-jam.md
@@ -0,0 +1,7 @@
---
'@livekit/agents-plugin-google': patch
'@livekit/agents-plugin-openai': patch
'@livekit/agents': patch
---

Usage with separate TTS
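
For context, a rough sketch of the usage this changeset enables: a realtime LLM restricted to text output, with speech produced by a separate TTS. The AgentSession wiring and the TTS plugin below are illustrative assumptions, not part of this diff; only the modalities option comes from this PR.

import { voice } from '@livekit/agents';
import * as openai from '@livekit/agents-plugin-openai';
import * as elevenlabs from '@livekit/agents-plugin-elevenlabs';

// Realtime model emits text only; audio is synthesized by the separate TTS.
const session = new voice.AgentSession({
  llm: new openai.realtime.RealtimeModel({ modalities: ['text'] }),
  tts: new elevenlabs.TTS(),
});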
2 changes: 2 additions & 0 deletions agents/src/llm/realtime.ts
@@ -19,6 +19,7 @@ export interface MessageGeneration {
messageId: string;
textStream: ReadableStream<string>;
audioStream: ReadableStream<AudioFrame>;
modalities?: ['text'] | ['text', 'audio'];
}

export interface GenerationCreatedEvent {
@@ -40,6 +41,7 @@ export interface RealtimeCapabilities {
turnDetection: boolean;
userTranscription: boolean;
autoToolReplyGeneration: boolean;
audioOutput: boolean;
}

export interface InputTranscriptionCompleted {
79 changes: 62 additions & 17 deletions agents/src/voice/agent_activity.ts
@@ -1520,10 +1520,29 @@ export class AgentActivity implements RecognitionHooks {
break;
}
const trNodeResult = await this.agent.transcriptionNode(msg.textStream, modelSettings);

// Determine if we need to tee the text stream for both text output and TTS
const needsTextOutput = !!textOutput && !!trNodeResult;
const needsTTSSynthesis =
audioOutput &&
this.llm instanceof RealtimeModel &&
!this.llm.capabilities.audioOutput &&
this.tts;
const needsBothTextAndTTS = needsTextOutput && needsTTSSynthesis;

// Tee the stream if we need it for both purposes
let textStreamForOutput = trNodeResult;
let textStreamForTTS = trNodeResult;
if (needsBothTextAndTTS && trNodeResult) {
const [stream1, stream2] = trNodeResult.tee();
Contributor comment:

We need to be super careful whenever doing a tee operation. It will lock the source stream until both teed streams have finished or been cancelled. In case of an interruption, we would have to release the reader lock for both sub-streams to release resources; we cannot simply cancel the trNodeResult.
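
A minimal sketch of that concern, assuming standard web-streams semantics; the function name and abort wiring below are illustrative and not part of this PR:

async function splitForTextAndTTS(
  source: ReadableStream<string>,
  abortSignal: AbortSignal,
): Promise<[ReadableStream<string>, ReadableStream<string>]> {
  // tee() locks `source`: calling source.cancel() afterwards rejects, and the
  // underlying source is only cancelled once BOTH branches have been
  // cancelled (or fully consumed).
  const [forTextOutput, forTTS] = source.tee();

  abortSignal.addEventListener('abort', () => {
    // On interruption, cancel both sub-streams to release upstream resources.
    // If a branch already has an active reader (e.g. inside the forwarding or
    // TTS tasks), cancel through that reader or release its lock first.
    void forTextOutput.cancel('interrupted').catch(() => {});
    void forTTS.cancel('interrupted').catch(() => {});
  });

  return [forTextOutput, forTTS];
}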

textStreamForOutput = stream1;
textStreamForTTS = stream2;
}

let textOut: _TextOut | null = null;
if (trNodeResult) {
if (textStreamForOutput) {
const [textForwardTask, _textOut] = performTextForwarding(
trNodeResult,
textStreamForOutput,
abortController,
textOutput,
);
@@ -1532,23 +1551,49 @@
}
let audioOut: _AudioOut | null = null;
if (audioOutput) {
const realtimeAudio = await this.agent.realtimeAudioOutputNode(
msg.audioStream,
modelSettings,
);
if (realtimeAudio) {
const [forwardTask, _audioOut] = performAudioForwarding(
realtimeAudio,
audioOutput,
abortController,
// Check if realtime model has audio output capability
if (this.llm instanceof RealtimeModel && this.llm.capabilities.audioOutput) {
// Use realtime audio output
const realtimeAudio = await this.agent.realtimeAudioOutputNode(
msg.audioStream,
modelSettings,
);
forwardTasks.push(forwardTask);
audioOut = _audioOut;
audioOut.firstFrameFut.await.finally(onFirstFrame);
if (realtimeAudio) {
const [forwardTask, _audioOut] = performAudioForwarding(
realtimeAudio,
audioOutput,
abortController,
);
forwardTasks.push(forwardTask);
audioOut = _audioOut;
audioOut.firstFrameFut.await.finally(onFirstFrame);
} else {
this.logger.warn(
'audio output is enabled but neither tts nor realtime audio is available',
);
}
} else {
this.logger.warn(
'audio output is enabled but neither tts nor realtime audio is available',
);
// Text-only mode - synthesize audio using TTS
if (this.tts && textStreamForTTS) {
const [ttsTask, ttsStream] = performTTSInference(
(...args) => this.agent.ttsNode(...args),
textStreamForTTS,
modelSettings,
abortController,
);
forwardTasks.push(ttsTask);

const [forwardTask, _audioOut] = performAudioForwarding(
ttsStream,
audioOutput,
abortController,
);
forwardTasks.push(forwardTask);
audioOut = _audioOut;
audioOut.firstFrameFut.await.finally(onFirstFrame);
} else if (!this.tts) {
this.logger.warn('realtime model in text-only mode but no TTS is configured');
}
}
} else if (textOut) {
textOut.firstTextFut.await.finally(onFirstFrame);
1 change: 1 addition & 0 deletions plugins/google/src/beta/realtime/realtime_api.ts
@@ -290,6 +290,7 @@ export class RealtimeModel extends llm.RealtimeModel {
turnDetection: serverTurnDetection,
userTranscription: inputAudioTranscription !== null,
autoToolReplyGeneration: true,
audioOutput: (options.modalities || [Modality.AUDIO]).includes(Modality.AUDIO),
});

// Environment variable fallbacks
8 changes: 5 additions & 3 deletions plugins/openai/src/realtime/api_proto.ts
@@ -190,7 +190,7 @@ export interface SessionResource {
id: string;
object: 'realtime.session';
model: string;
modalities: ['text', 'audio'] | ['text']; // default: ["text", "audio"]
output_modalities: ['text'] | ['audio'];
instructions: string;
voice: Voice; // default: "alloy"
input_audio_format: AudioFormat; // default: "pcm16"
@@ -267,7 +267,7 @@ export interface SessionUpdateEvent extends BaseClientEvent {
type: 'session.update';
session: Partial<{
model: Model;
modalities: ['text', 'audio'] | ['text'];
modalities: ['text'] | ['text', 'audio'];
instructions: string;
voice: Voice;
input_audio_format: AudioFormat;
@@ -350,7 +350,7 @@ export interface ConversationItemDeleteEvent extends BaseClientEvent {
export interface ResponseCreateEvent extends BaseClientEvent {
type: 'response.create';
response?: Partial<{
modalities: ['text', 'audio'] | ['text'];
output_modalities: ['text'] | ['audio'];
instructions: string;
voice: Voice;
output_audio_format: AudioFormat;
@@ -511,6 +511,7 @@ export interface ResponseContentPartDoneEvent extends BaseServerEvent {
export interface ResponseTextDeltaEvent extends BaseServerEvent {
type: 'response.text.delta';
response_id: string;
item_id: string;
output_index: number;
content_index: number;
delta: string;
@@ -519,6 +520,7 @@ export interface ResponseTextDeltaEvent extends BaseServerEvent {
export interface ResponseTextDoneEvent extends BaseServerEvent {
type: 'response.text.done';
response_id: string;
item_id: string;
output_index: number;
content_index: number;
text: string;
108 changes: 80 additions & 28 deletions plugins/openai/src/realtime/realtime_model.ts
@@ -38,6 +38,7 @@ interface RealtimeOptions {
model: api_proto.Model;
voice: api_proto.Voice;
temperature: number;
modalities: ['text'] | ['text', 'audio'];
toolChoice?: llm.ToolChoice;
inputAudioTranscription?: api_proto.InputAudioTranscription | null;
// TODO(shubhra): add inputAudioNoiseReduction
@@ -61,6 +62,7 @@ interface MessageGeneration {
textChannel: stream.StreamChannel<string>;
audioChannel: stream.StreamChannel<AudioFrame>;
audioTranscript: string;
modalities?: ['text'] | ['text', 'audio'];
}

interface ResponseGeneration {
@@ -121,6 +123,7 @@ const DEFAULT_REALTIME_MODEL_OPTIONS = {
model: 'gpt-realtime',
voice: 'marin',
temperature: DEFAULT_TEMPERATURE,
modalities: ['text', 'audio'] as ['text', 'audio'],
inputAudioTranscription: DEFAULT_INPUT_AUDIO_TRANSCRIPTION,
turnDetection: DEFAULT_TURN_DETECTION,
toolChoice: DEFAULT_TOOL_CHOICE,
@@ -142,6 +145,7 @@ export class RealtimeModel extends llm.RealtimeModel {
model?: string;
voice?: string;
temperature?: number;
modalities?: ['text'] | ['text', 'audio'];
toolChoice?: llm.ToolChoice;
baseURL?: string;
inputAudioTranscription?: api_proto.InputAudioTranscription | null;
@@ -162,6 +166,7 @@ export class RealtimeModel extends llm.RealtimeModel {
turnDetection: options.turnDetection !== null,
userTranscription: options.inputAudioTranscription !== null,
autoToolReplyGeneration: false,
audioOutput: options.modalities ? (options.modalities as string[]).includes('audio') : true,
});

const isAzure = !!(options.apiVersion || options.entraToken || options.azureDeployment);
@@ -197,6 +202,7 @@ export class RealtimeModel extends llm.RealtimeModel {
apiKey,
isAzure,
model: options.model || DEFAULT_REALTIME_MODEL_OPTIONS.model,
modalities: options.modalities || DEFAULT_REALTIME_MODEL_OPTIONS.modalities,
};
}

@@ -229,6 +235,7 @@ export class RealtimeModel extends llm.RealtimeModel {
entraToken,
baseURL,
voice = 'alloy',
modalities,
inputAudioTranscription = AZURE_DEFAULT_INPUT_AUDIO_TRANSCRIPTION,
turnDetection = AZURE_DEFAULT_TURN_DETECTION,
temperature = 0.8,
@@ -241,6 +248,7 @@ export class RealtimeModel extends llm.RealtimeModel {
entraToken?: string;
baseURL?: string;
voice?: string;
modalities?: ['text'] | ['text', 'audio'];
inputAudioTranscription?: api_proto.InputAudioTranscription;
// TODO(shubhra): add inputAudioNoiseReduction
turnDetection?: api_proto.TurnDetectionType;
@@ -273,6 +281,7 @@ export class RealtimeModel extends llm.RealtimeModel {

return new RealtimeModel({
voice,
modalities,
inputAudioTranscription,
turnDetection,
temperature,
@@ -398,7 +407,7 @@ export class RealtimeSession extends llm.RealtimeSession {
voice: this.oaiRealtimeModel._options.voice,
input_audio_format: 'pcm16',
output_audio_format: 'pcm16',
modalities: ['text', 'audio'],
modalities: this.oaiRealtimeModel._options.modalities, // Supported combinations are: ['text'] and ['audio', 'text'].
turn_detection: this.oaiRealtimeModel._options.turnDetection,
input_audio_transcription: this.oaiRealtimeModel._options.inputAudioTranscription,
// TODO(shubhra): add inputAudioNoiseReduction
@@ -909,6 +918,12 @@ export class RealtimeSession extends llm.RealtimeSession {
case 'response.content_part.done':
this.handleResponseContentPartDone(event);
break;
case 'response.text.delta':
this.handleResponseTextDelta(event);
break;
case 'response.text.done':
this.handleResponseTextDone(event);
break;
case 'response.audio_transcript.delta':
this.handleResponseAudioTranscriptDelta(event);
break;
@@ -1129,35 +1144,39 @@ export class RealtimeSession extends llm.RealtimeSession {
const itemType = event.part.type;
const responseId = event.response_id;

if (itemType === 'audio') {
this.resolveGeneration(responseId);
if (this.textModeRecoveryRetries > 0) {
this.#logger.info(
{ retries: this.textModeRecoveryRetries },
'recovered from text-only response',
);
this.textModeRecoveryRetries = 0;
}
this.resolveGeneration(responseId);
if (this.textModeRecoveryRetries > 0) {
this.#logger.info(
{ retries: this.textModeRecoveryRetries },
'recovered from text-only response',
);
this.textModeRecoveryRetries = 0;
}

const itemGeneration: MessageGeneration = {
messageId: itemId,
textChannel: stream.createStreamChannel<string>(),
audioChannel: stream.createStreamChannel<AudioFrame>(),
audioTranscript: '',
};

this.currentGeneration.messageChannel.write({
messageId: itemId,
textStream: itemGeneration.textChannel.stream(),
audioStream: itemGeneration.audioChannel.stream(),
});
const itemGeneration: MessageGeneration = {
messageId: itemId,
textChannel: stream.createStreamChannel<string>(),
audioChannel: stream.createStreamChannel<AudioFrame>(),
audioTranscript: '',
};

this.currentGeneration.messages.set(itemId, itemGeneration);
this.currentGeneration._firstTokenTimestamp = Date.now();
return;
} else {
this.interrupt();
if (this.textModeRecoveryRetries === 0) {
if (!this.oaiRealtimeModel.capabilities.audioOutput) {
itemGeneration.audioChannel.close();
itemGeneration.modalities = ['text'];
}

this.currentGeneration.messageChannel.write({
messageId: itemId,
textStream: itemGeneration.textChannel.stream(),
audioStream: itemGeneration.audioChannel.stream(),
modalities: itemGeneration.modalities || ['text', 'audio'],
});

this.currentGeneration.messages.set(itemId, itemGeneration);

if (itemType === 'text') {
// Only warn if we expected audio but received text
if (this.textModeRecoveryRetries === 0 && this.oaiRealtimeModel.capabilities.audioOutput) {
this.#logger.warn({ responseId }, 'received text-only response from OpenAI Realtime API');
}
}
@@ -1175,6 +1194,32 @@
// TODO(shubhra): handle text mode recovery
}

private handleResponseTextDelta(event: api_proto.ResponseTextDeltaEvent): void {
if (!this.currentGeneration) {
throw new Error('currentGeneration is not set');
}

const itemGeneration = this.currentGeneration.messages.get(event.item_id);
if (!itemGeneration) {
throw new Error('itemGeneration is not set');
}

// Set first token timestamp if in text-only mode
if (itemGeneration.modalities?.[0] === 'text' && !this.currentGeneration._firstTokenTimestamp) {
this.currentGeneration._firstTokenTimestamp = Date.now();
}

itemGeneration.textChannel.write(event.delta);
itemGeneration.audioTranscript += event.delta;
}

private handleResponseTextDone(_event: api_proto.ResponseTextDoneEvent): void {
if (!this.currentGeneration) {
throw new Error('currentGeneration is not set');
}
// No additional processing needed - just assert generation exists
}

private handleResponseAudioTranscriptDelta(
event: api_proto.ResponseAudioTranscriptDeltaEvent,
): void {
@@ -1206,6 +1251,13 @@
throw new Error('itemGeneration is not set');
}

if (!this.currentGeneration._firstTokenTimestamp) {
this.currentGeneration._firstTokenTimestamp = Date.now();
}
if (!itemGeneration.modalities) {
itemGeneration.modalities = ['text', 'audio'];
}

const binaryString = atob(event.delta);
const len = binaryString.length;
const bytes = new Uint8Array(len);