Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/soft-memes-drive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents-plugin-silero': patch
---

Fix race condition causing "Writer is not bound to a WritableStream" error in Silero VAD
148 changes: 102 additions & 46 deletions plugins/silero/src/vad.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { VADEvent } from '@livekit/agents';
import {
ExpFilter,
VADEventType,
Expand All @@ -16,6 +17,31 @@ import { OnnxModel, newInferenceSession } from './onnx_model.js';

const SLOW_INFERENCE_THRESHOLD = 200; // late by 200ms

/**
* Helper function to check if an error is related to writer being released during stream closure.
* This can happen during shutdown when close() releases the writer while the VAD task is still running.
*
* Handles Node.js stream errors with code ERR_INVALID_STATE:
* - "Invalid state: Writer is not bound to a WritableStream" (after releaseLock())
* - "Invalid state: WritableStream is closed" (after close())
*/
function isWriterReleaseError(e: unknown): boolean {
if (e instanceof TypeError) {
// Check for ERR_INVALID_STATE error code (most reliable)
if ('code' in e && e.code === 'ERR_INVALID_STATE') {
return true;
}

// Fallback to message checking for compatibility
const message = e.message;
return (
message.includes('Writer is not bound to a WritableStream') ||
message.includes('WritableStream is closed')
);
}
return false;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this hacky? Should we have another mechanism of synchronization?

Copy link
Contributor Author

@toubatbrian toubatbrian Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have similar approach in defered_stream.ts

export function isStreamReaderReleaseError(e: unknown) {
const allowedMessages = [
'Invalid state: Releasing reader',
'Invalid state: The reader is not attached to a stream',
];
if (e instanceof TypeError) {
return allowedMessages.some((message) => e.message.includes(message));
}
return false;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error we're catching (ERR_INVALID_STATE: Writer is not bound) is essentially Node.js telling us the stream is shutting down. It's not really an exceptional condition, it's an expected part of the shutdown sequence that we need to handle gracefully. The safeWriteEvent() method makes this explicit: it catches expected shutdown errors and signals the loop to exit cleanly, while still throwing any truly unexpected errors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this happen in llm/stt/tts as well?


export interface VADOptions {
/** Minimum duration of speech to start a new speech chunk */
minSpeechDuration: number;
Expand Down Expand Up @@ -260,26 +286,30 @@ export class VADStream extends baseStream {
pubSilenceDuration += windowDuration;
}

this.outputWriter.write({
type: VADEventType.INFERENCE_DONE,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
silenceDuration: pubSilenceDuration,
speechDuration: pubSpeechDuration,
probability: p,
inferenceDuration,
frames: [
new AudioFrame(
inputFrame.data.subarray(0, toCopyInt),
this.#inputSampleRate,
1,
toCopyInt,
),
],
speaking: pubSpeaking,
rawAccumulatedSilence: silenceThresholdDuration,
rawAccumulatedSpeech: speechThresholdDuration,
});
if (
!this.safeWriteEvent({
type: VADEventType.INFERENCE_DONE,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
silenceDuration: pubSilenceDuration,
speechDuration: pubSpeechDuration,
probability: p,
inferenceDuration,
frames: [
new AudioFrame(
inputFrame.data.subarray(0, toCopyInt),
this.#inputSampleRate,
1,
toCopyInt,
),
],
speaking: pubSpeaking,
rawAccumulatedSilence: silenceThresholdDuration,
rawAccumulatedSpeech: speechThresholdDuration,
})
) {
break; // Exit the inference loop since the stream is closing
}

const resetWriteCursor = () => {
if (!this.#speechBuffer) throw new Error('speechBuffer is empty');
Expand Down Expand Up @@ -314,19 +344,23 @@ export class VADStream extends baseStream {
pubSilenceDuration = 0;
pubSpeechDuration = speechThresholdDuration;

this.outputWriter.write({
type: VADEventType.START_OF_SPEECH,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
silenceDuration: pubSilenceDuration,
speechDuration: pubSpeechDuration,
probability: p,
inferenceDuration,
frames: [copySpeechBuffer()],
speaking: pubSpeaking,
rawAccumulatedSilence: 0,
rawAccumulatedSpeech: 0,
});
if (
!this.safeWriteEvent({
type: VADEventType.START_OF_SPEECH,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
silenceDuration: pubSilenceDuration,
speechDuration: pubSpeechDuration,
probability: p,
inferenceDuration,
frames: [copySpeechBuffer()],
speaking: pubSpeaking,
rawAccumulatedSilence: 0,
rawAccumulatedSpeech: 0,
})
) {
break; // Exit the inference loop since the stream is closing
}
}
} else {
silenceThresholdDuration += windowDuration;
Expand All @@ -341,19 +375,23 @@ export class VADStream extends baseStream {
pubSpeechDuration = 0;
pubSilenceDuration = silenceThresholdDuration;

this.outputWriter.write({
type: VADEventType.END_OF_SPEECH,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
silenceDuration: pubSilenceDuration,
speechDuration: pubSpeechDuration,
probability: p,
inferenceDuration,
frames: [copySpeechBuffer()],
speaking: pubSpeaking,
rawAccumulatedSilence: 0,
rawAccumulatedSpeech: 0,
});
if (
!this.safeWriteEvent({
type: VADEventType.END_OF_SPEECH,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
silenceDuration: pubSilenceDuration,
speechDuration: pubSpeechDuration,
probability: p,
inferenceDuration,
frames: [copySpeechBuffer()],
speaking: pubSpeaking,
rawAccumulatedSilence: 0,
rawAccumulatedSpeech: 0,
})
) {
break; // Exit the inference loop since the stream is closing
}

resetWriteCursor();
}
Expand All @@ -379,6 +417,24 @@ export class VADStream extends baseStream {
});
}

/**
* Safely write a VAD event to the output stream, handling writer release errors during shutdown.
* @returns true if the write succeeded, false if the stream is closing
* @throws Error if an unexpected error occurs
*/
private safeWriteEvent(event: VADEvent): boolean {
try {
this.outputWriter.write(event);
return true;
} catch (e) {
// Ignore writer release errors during stream closure (e.g., on participant disconnect)
if (isWriterReleaseError(e)) {
return false; // Signal that the stream is closing
}
throw e;
}
}

/**
* Update the VAD options
*
Expand Down