Skip to content

Commit 2baa4b7

Browse files
committed
chore(rivetkit): debounce ws message ack
1 parent 8e33967 commit 2baa4b7

File tree

1 file changed

+62
-4
lines changed

1 file changed

+62
-4
lines changed

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ export class EngineActorDriver implements ActorDriver {
7676
#runnerStarted: PromiseWithResolvers<undefined> = promiseWithResolvers();
7777
#runnerStopped: PromiseWithResolvers<undefined> = promiseWithResolvers();
7878

79+
// WebSocket message acknowledgment debouncing
80+
#wsAckQueue: Map<
81+
string,
82+
{ requestIdBuf: ArrayBuffer; messageIndex: number }
83+
> = new Map();
84+
#wsAckFlushInterval?: NodeJS.Timeout;
85+
7986
constructor(
8087
registryConfig: RegistryConfig,
8188
runConfig: RunnerConfig,
@@ -284,6 +291,15 @@ export class EngineActorDriver implements ActorDriver {
284291
namespace: runConfig.namespace,
285292
runnerName: runConfig.runnerName,
286293
});
294+
295+
// Start WebSocket ack flush interval
296+
//
297+
// Decreasing this reduces the amount of buffered messages on the
298+
// gateway
299+
//
300+
// Gateway timeout configured to 30s
301+
// https://github.com/rivet-dev/rivet/blob/222dae87e3efccaffa2b503de40ecf8afd4e31eb/engine/packages/pegboard-gateway/src/shared_state.rs#L17
302+
this.#wsAckFlushInterval = setInterval(() => this.#flushWsAcks(), 1000);
287303
}
288304

289305
async #loadActorHandler(actorId: string): Promise<ActorHandler> {
@@ -302,6 +318,19 @@ export class EngineActorDriver implements ActorDriver {
302318
return handler.actor;
303319
}
304320

321+
#flushWsAcks(): void {
322+
if (this.#wsAckQueue.size === 0) return;
323+
324+
for (const {
325+
requestIdBuf: requestId,
326+
messageIndex: index,
327+
} of this.#wsAckQueue.values()) {
328+
this.#runner.sendWebsocketMessageAck(requestId, index);
329+
}
330+
331+
this.#wsAckQueue.clear();
332+
}
333+
305334
getContext(actorId: string): DriverContext {
306335
return {};
307336
}
@@ -554,13 +583,32 @@ export class EngineActorDriver implements ActorDriver {
554583

555584
invariant(event.rivetRequestId, "missing rivetRequestId");
556585
invariant(event.rivetMessageIndex, "missing rivetMessageIndex");
557-
this.#runner.sendWebsocketMessageAck(
558-
event.rivetRequestId,
559-
event.rivetMessageIndex,
560-
);
586+
587+
// Track only the highest seen message index per request
588+
// Convert ArrayBuffer to string for Map key
589+
const currentEntry = this.#wsAckQueue.get(requestId);
590+
if (currentEntry) {
591+
if (event.rivetMessageIndex > currentEntry.messageIndex) {
592+
currentEntry.messageIndex = event.rivetMessageIndex;
593+
} else {
594+
logger().warn({
595+
msg: "received lower index than ack queue for message",
596+
requestId,
597+
queuedMessageIndex: currentEntry,
598+
eventMessageIndex: event.rivetMessageIndex,
599+
});
600+
}
601+
} else {
602+
this.#wsAckQueue.set(requestId, {
603+
requestIdBuf,
604+
messageIndex: event.rivetMessageIndex,
605+
});
606+
}
561607
});
562608

563609
websocket.addEventListener("close", (event) => {
610+
// Flush any pending acks before closing
611+
this.#flushWsAcks();
564612
wsHandlerPromise.then((x) => x.onClose?.(event, wsContext));
565613
});
566614

@@ -575,6 +623,16 @@ export class EngineActorDriver implements ActorDriver {
575623

576624
async shutdownRunner(immediate: boolean): Promise<void> {
577625
logger().info({ msg: "stopping engine actor driver" });
626+
627+
// Clear the ack flush interval
628+
if (this.#wsAckFlushInterval) {
629+
clearInterval(this.#wsAckFlushInterval);
630+
this.#wsAckFlushInterval = undefined;
631+
}
632+
633+
// Flush any remaining acks
634+
this.#flushWsAcks();
635+
578636
await this.#runner.shutdown(immediate);
579637
}
580638

0 commit comments

Comments
 (0)