From 2baa4b71f67ae3a855359905bf6a20f90a381a81 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Tue, 4 Nov 2025 19:55:20 +0000 Subject: [PATCH] chore(rivetkit): debounce ws message ack --- .../src/drivers/engine/actor-driver.ts | 66 +++++++++++++++++-- 1 file changed, 62 insertions(+), 4 deletions(-) diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index 6cea7cf22d..5cfa627612 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -76,6 +76,13 @@ export class EngineActorDriver implements ActorDriver { #runnerStarted: PromiseWithResolvers = promiseWithResolvers(); #runnerStopped: PromiseWithResolvers = promiseWithResolvers(); + // WebSocket message acknowledgment debouncing + #wsAckQueue: Map< + string, + { requestIdBuf: ArrayBuffer; messageIndex: number } + > = new Map(); + #wsAckFlushInterval?: NodeJS.Timeout; + constructor( registryConfig: RegistryConfig, runConfig: RunnerConfig, @@ -284,6 +291,15 @@ export class EngineActorDriver implements ActorDriver { namespace: runConfig.namespace, runnerName: runConfig.runnerName, }); + + // Start WebSocket ack flush interval + // + // Decreasing this reduces the amount of buffered messages on the + // gateway + // + // Gateway timeout configured to 30s + // https://github.com/rivet-dev/rivet/blob/222dae87e3efccaffa2b503de40ecf8afd4e31eb/engine/packages/pegboard-gateway/src/shared_state.rs#L17 + this.#wsAckFlushInterval = setInterval(() => this.#flushWsAcks(), 1000); } async #loadActorHandler(actorId: string): Promise { @@ -302,6 +318,19 @@ export class EngineActorDriver implements ActorDriver { return handler.actor; } + #flushWsAcks(): void { + if (this.#wsAckQueue.size === 0) return; + + for (const { + requestIdBuf: requestId, + messageIndex: index, + } of this.#wsAckQueue.values()) { + this.#runner.sendWebsocketMessageAck(requestId, index); + } + + this.#wsAckQueue.clear(); + } + getContext(actorId: string): DriverContext { return {}; } @@ -554,13 +583,32 @@ export class EngineActorDriver implements ActorDriver { invariant(event.rivetRequestId, "missing rivetRequestId"); invariant(event.rivetMessageIndex, "missing rivetMessageIndex"); - this.#runner.sendWebsocketMessageAck( - event.rivetRequestId, - event.rivetMessageIndex, - ); + + // Track only the highest seen message index per request + // Convert ArrayBuffer to string for Map key + const currentEntry = this.#wsAckQueue.get(requestId); + if (currentEntry) { + if (event.rivetMessageIndex > currentEntry.messageIndex) { + currentEntry.messageIndex = event.rivetMessageIndex; + } else { + logger().warn({ + msg: "received lower index than ack queue for message", + requestId, + queuedMessageIndex: currentEntry, + eventMessageIndex: event.rivetMessageIndex, + }); + } + } else { + this.#wsAckQueue.set(requestId, { + requestIdBuf, + messageIndex: event.rivetMessageIndex, + }); + } }); websocket.addEventListener("close", (event) => { + // Flush any pending acks before closing + this.#flushWsAcks(); wsHandlerPromise.then((x) => x.onClose?.(event, wsContext)); }); @@ -575,6 +623,16 @@ export class EngineActorDriver implements ActorDriver { async shutdownRunner(immediate: boolean): Promise { logger().info({ msg: "stopping engine actor driver" }); + + // Clear the ack flush interval + if (this.#wsAckFlushInterval) { + clearInterval(this.#wsAckFlushInterval); + this.#wsAckFlushInterval = undefined; + } + + // Flush any remaining acks + this.#flushWsAcks(); + await this.#runner.shutdown(immediate); }