Skip to content

Commit 7973ad8

Browse files
committed
chore(rivetkit): debounce ws message ack
1 parent 5ad7f89 commit 7973ad8

File tree

1 file changed

+53
-4
lines changed

1 file changed

+53
-4
lines changed

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ export class EngineActorDriver implements ActorDriver {
7676
#runnerStarted: PromiseWithResolvers<undefined> = promiseWithResolvers();
7777
#runnerStopped: PromiseWithResolvers<undefined> = promiseWithResolvers();
7878

79+
// WebSocket message acknowledgment debouncing
80+
#wsAckQueue: Map<string, number> = new Map();
81+
#wsAckFlushInterval?: NodeJS.Timeout;
82+
7983
constructor(
8084
registryConfig: RegistryConfig,
8185
runConfig: RunnerConfig,
@@ -284,6 +288,15 @@ export class EngineActorDriver implements ActorDriver {
284288
namespace: runConfig.namespace,
285289
runnerName: runConfig.runnerName,
286290
});
291+
292+
// Start WebSocket ack flush interval
293+
//
294+
// Decreasing this reduces the amount of buffered messages on the
295+
// gateway
296+
//
297+
// Gateway timeout configured to 30s
298+
// https://github.com/rivet-dev/rivet/blob/222dae87e3efccaffa2b503de40ecf8afd4e31eb/engine/packages/pegboard-gateway/src/shared_state.rs#L17
299+
this.#wsAckFlushInterval = setInterval(() => this.#flushWsAcks(), 1000);
287300
}
288301

289302
async #loadActorHandler(actorId: string): Promise<ActorHandler> {
@@ -302,6 +315,20 @@ export class EngineActorDriver implements ActorDriver {
302315
return handler.actor;
303316
}
304317

318+
#flushWsAcks(): void {
319+
if (this.#wsAckQueue.size === 0) return;
320+
321+
for (const [requestIdStr, messageIndex] of this.#wsAckQueue.entries()) {
322+
// Convert string back to ArrayBuffer
323+
const requestId = new Uint8Array(
324+
requestIdStr.split(",").map((x) => Number.parseInt(x)),
325+
).buffer;
326+
this.#runner.sendWebsocketMessageAck(requestId, messageIndex);
327+
}
328+
329+
this.#wsAckQueue.clear();
330+
}
331+
305332
getContext(actorId: string): DriverContext {
306333
return {};
307334
}
@@ -554,13 +581,25 @@ export class EngineActorDriver implements ActorDriver {
554581

555582
invariant(event.rivetRequestId, "missing rivetRequestId");
556583
invariant(event.rivetMessageIndex, "missing rivetMessageIndex");
557-
this.#runner.sendWebsocketMessageAck(
558-
event.rivetRequestId,
559-
event.rivetMessageIndex,
560-
);
584+
585+
// Track only the highest seen message index per request
586+
// Convert ArrayBuffer to string for Map key
587+
const currentMax = this.#wsAckQueue.get(requestId) ?? -1n;
588+
if (event.rivetMessageIndex > currentMax) {
589+
this.#wsAckQueue.set(requestId, event.rivetMessageIndex);
590+
} else {
591+
logger().warn({
592+
msg: "received lower index than ack queue for message",
593+
requestId: requestId,
594+
queuedMessageIndex: currentMax,
595+
eventMessageIndex: event.rivetMessageIndex,
596+
});
597+
}
561598
});
562599

563600
websocket.addEventListener("close", (event) => {
601+
// Flush any pending acks before closing
602+
this.#flushWsAcks();
564603
wsHandlerPromise.then((x) => x.onClose?.(event, wsContext));
565604
});
566605

@@ -575,6 +614,16 @@ export class EngineActorDriver implements ActorDriver {
575614

576615
async shutdownRunner(immediate: boolean): Promise<void> {
577616
logger().info({ msg: "stopping engine actor driver" });
617+
618+
// Clear the ack flush interval
619+
if (this.#wsAckFlushInterval) {
620+
clearInterval(this.#wsAckFlushInterval);
621+
this.#wsAckFlushInterval = undefined;
622+
}
623+
624+
// Flush any remaining acks
625+
this.#flushWsAcks();
626+
578627
await this.#runner.shutdown(immediate);
579628
}
580629

0 commit comments

Comments
 (0)