@@ -76,6 +76,13 @@ export class EngineActorDriver implements ActorDriver {
7676 #runnerStarted: PromiseWithResolvers < undefined > = promiseWithResolvers ( ) ;
7777 #runnerStopped: PromiseWithResolvers < undefined > = promiseWithResolvers ( ) ;
7878
79+ // WebSocket message acknowledgment debouncing
80+ #wsAckQueue: Map <
81+ string ,
82+ { requestIdBuf : ArrayBuffer ; messageIndex : number }
83+ > = new Map ( ) ;
84+ #wsAckFlushInterval?: NodeJS . Timeout ;
85+
7986 constructor (
8087 registryConfig : RegistryConfig ,
8188 runConfig : RunnerConfig ,
@@ -284,6 +291,15 @@ export class EngineActorDriver implements ActorDriver {
284291 namespace : runConfig . namespace ,
285292 runnerName : runConfig . runnerName ,
286293 } ) ;
294+
295+ // Start WebSocket ack flush interval
296+ //
297+ // Decreasing this reduces the amount of buffered messages on the
298+ // gateway
299+ //
300+ // Gateway timeout configured to 30s
301+ // https://github.com/rivet-dev/rivet/blob/222dae87e3efccaffa2b503de40ecf8afd4e31eb/engine/packages/pegboard-gateway/src/shared_state.rs#L17
302+ this . #wsAckFlushInterval = setInterval ( ( ) => this . #flushWsAcks( ) , 1000 ) ;
287303 }
288304
289305 async #loadActorHandler( actorId : string ) : Promise < ActorHandler > {
@@ -302,6 +318,19 @@ export class EngineActorDriver implements ActorDriver {
302318 return handler . actor ;
303319 }
304320
321+ #flushWsAcks( ) : void {
322+ if ( this . #wsAckQueue. size === 0 ) return ;
323+
324+ for ( const {
325+ requestIdBuf : requestId ,
326+ messageIndex : index ,
327+ } of this . #wsAckQueue. values ( ) ) {
328+ this . #runner. sendWebsocketMessageAck ( requestId , index ) ;
329+ }
330+
331+ this . #wsAckQueue. clear ( ) ;
332+ }
333+
305334 getContext ( actorId : string ) : DriverContext {
306335 return { } ;
307336 }
@@ -554,13 +583,32 @@ export class EngineActorDriver implements ActorDriver {
554583
555584 invariant ( event . rivetRequestId , "missing rivetRequestId" ) ;
556585 invariant ( event . rivetMessageIndex , "missing rivetMessageIndex" ) ;
557- this . #runner. sendWebsocketMessageAck (
558- event . rivetRequestId ,
559- event . rivetMessageIndex ,
560- ) ;
586+
587+ // Track only the highest seen message index per request
588+ // Convert ArrayBuffer to string for Map key
589+ const currentEntry = this . #wsAckQueue. get ( requestId ) ;
590+ if ( currentEntry ) {
591+ if ( event . rivetMessageIndex > currentEntry . messageIndex ) {
592+ currentEntry . messageIndex = event . rivetMessageIndex ;
593+ } else {
594+ logger ( ) . warn ( {
595+ msg : "received lower index than ack queue for message" ,
596+ requestId,
597+ queuedMessageIndex : currentEntry ,
598+ eventMessageIndex : event . rivetMessageIndex ,
599+ } ) ;
600+ }
601+ } else {
602+ this . #wsAckQueue. set ( requestId , {
603+ requestIdBuf,
604+ messageIndex : event . rivetMessageIndex ,
605+ } ) ;
606+ }
561607 } ) ;
562608
563609 websocket . addEventListener ( "close" , ( event ) => {
610+ // Flush any pending acks before closing
611+ this . #flushWsAcks( ) ;
564612 wsHandlerPromise . then ( ( x ) => x . onClose ?.( event , wsContext ) ) ;
565613 } ) ;
566614
@@ -575,6 +623,16 @@ export class EngineActorDriver implements ActorDriver {
575623
576624 async shutdownRunner ( immediate : boolean ) : Promise < void > {
577625 logger ( ) . info ( { msg : "stopping engine actor driver" } ) ;
626+
627+ // Clear the ack flush interval
628+ if ( this . #wsAckFlushInterval) {
629+ clearInterval ( this . #wsAckFlushInterval) ;
630+ this . #wsAckFlushInterval = undefined ;
631+ }
632+
633+ // Flush any remaining acks
634+ this . #flushWsAcks( ) ;
635+
578636 await this . #runner. shutdown ( immediate ) ;
579637 }
580638
0 commit comments