Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ export async function initializeLocalSpeechServices(params: {

const workerClient = localConfig
? new LocalSpeechWorkerClient({
logger,
config: {
modelsDir: localConfig.modelsDir,
voiceSttModel: localModels.voiceLocalSttModel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import { bufferToWorkerBytes, workerBytesToBuffer } from "./worker-bytes.js";
class FakeLocalSpeechWorker extends EventEmitter {
public connected = true;
public killed = false;
public pid = 12345;
public readonly stderr = new EventEmitter() as NodeJS.ReadableStream;
public readonly sent: LocalSpeechWorkerRequest[] = [];
public disconnects = 0;
public kills = 0;
Expand Down Expand Up @@ -88,9 +90,9 @@ class PausedIpcWorker {
}

on(event: "message", listener: (message: LocalSpeechWorkerToParentMessage) => void): this;
on(event: "exit", listener: (code: number | null, signal: NodeJS.Signals | null) => void): this;
on(event: "close", listener: (code: number | null, signal: NodeJS.Signals | null) => void): this;
on(
event: "message" | "exit",
event: "message" | "close",
listener:
| ((message: LocalSpeechWorkerToParentMessage) => void)
| ((code: number | null, signal: NodeJS.Signals | null) => void),
Expand All @@ -103,6 +105,7 @@ class PausedIpcWorker {
function createClient(options?: { idleTtlMs?: number }) {
const workers: FakeLocalSpeechWorker[] = [];
const client = new LocalSpeechWorkerClient({
logger: pino({ level: "silent" }),
config: {
modelsDir: "/tmp/models",
voiceSttModel: "parakeet-tdt-0.6b-v2-int8",
Expand All @@ -120,6 +123,19 @@ function createClient(options?: { idleTtlMs?: number }) {
return { client, workers };
}

function createCapturingLogger(): { logger: pino.Logger; records: Array<Record<string, unknown>> } {
const records: Array<Record<string, unknown>> = [];
const logger = pino(
{ level: "trace" },
{
write(line: string) {
records.push(JSON.parse(line) as Record<string, unknown>);
},
},
);
return { logger, records };
}

async function waitForMicrotasks(): Promise<void> {
await new Promise((resolve) => setTimeout(resolve, 0));
}
Expand Down Expand Up @@ -217,6 +233,7 @@ describe("LocalSpeechWorkerClient", () => {
it("does not surface real IPC backpressure when replaying native-sized dictation frames", async () => {
const workers: PausedIpcWorker[] = [];
const client = new LocalSpeechWorkerClient({
logger: pino({ level: "silent" }),
config: {
modelsDir: "/tmp/models",
voiceSttModel: "parakeet-tdt-0.6b-v2-int8",
Expand Down Expand Up @@ -257,6 +274,80 @@ describe("LocalSpeechWorkerClient", () => {
}
});

it("logs worker exit details and includes actionable context in the surfaced error", async () => {
const { logger, records } = createCapturingLogger();
const workers: FakeLocalSpeechWorker[] = [];
const client = new LocalSpeechWorkerClient({
logger,
config: {
modelsDir: "/tmp/models",
voiceSttModel: "parakeet-tdt-0.6b-v2-int8",
dictationSttModel: "parakeet-tdt-0.6b-v2-int8",
voiceTtsModel: "kokoro-en-v0_19",
},
forkWorker: () => {
const worker = new FakeLocalSpeechWorker();
workers.push(worker);
return worker;
},
});
const provider = new WorkerBackedSpeechToTextProvider(client, "dictationStt");
const session = provider.createSession({ logger: pino({ level: "silent" }) });

const connect = session.connect();
workers[0].emit("exit", null, "SIGABRT");
workers[0].stderr.emit("data", "dyld: Library not loaded: libsherpa-onnx-c-api.dylib");
workers[0].emit("close", null, "SIGABRT");

await expect(connect).rejects.toThrow(
"Local speech worker exited (signal SIGABRT) while handling session.create (dictationStt). Last stderr: dyld: Library not loaded: libsherpa-onnx-c-api.dylib",
);
const exitRecord = records.find((record) => record.msg === "Local speech worker exited");
expect(exitRecord).toMatchObject({
workerPid: 12345,
signal: "SIGABRT",
stderrTail: "dyld: Library not loaded: libsherpa-onnx-c-api.dylib",
pendingRequests: [expect.objectContaining({ type: "session.create", kind: "dictationStt" })],
});
});

it("does not log intentional shutdowns as worker crashes", async () => {
const { logger, records } = createCapturingLogger();
const workers: FakeLocalSpeechWorker[] = [];
const client = new LocalSpeechWorkerClient({
logger,
config: {
modelsDir: "/tmp/models",
voiceSttModel: "parakeet-tdt-0.6b-v2-int8",
dictationSttModel: "parakeet-tdt-0.6b-v2-int8",
voiceTtsModel: "kokoro-en-v0_19",
},
forkWorker: () => {
const worker = new FakeLocalSpeechWorker();
workers.push(worker);
return worker;
},
});

const synthesize = client.synthesizeSpeech("hello");
workers[0].respond(workers[0].sent[0], {
audio: bufferToWorkerBytes(Buffer.from([1])),
format: "pcm;rate=24000",
});
await synthesize;

client.shutdown();
workers[0].emit("close", null, "SIGTERM");

expect(records.find((record) => record.msg === "Local speech worker exited")).toBeUndefined();
expect(
records.find((record) => record.msg === "Local speech worker closed after shutdown"),
).toMatchObject({
workerPid: 12345,
signal: "SIGTERM",
});
});

it("forwards VAD session events through the shared worker", async () => {
const { client, workers } = createClient();
const provider = new WorkerBackedTurnDetectionProvider(client);
Expand Down
Loading
Loading