11import type * as protocol from "@rivetkit/engine-runner-protocol" ;
22import type { MessageId , RequestId } from "@rivetkit/engine-runner-protocol" ;
3- import { v4 as uuidv4 , stringify as uuidstringify } from "uuid" ;
3+ import { stringify as uuidstringify , v4 as uuidv4 } from "uuid" ;
44import { logger } from "./log" ;
55import type { ActorInstance , Runner } from "./mod" ;
66import { unreachable } from "./utils" ;
@@ -77,12 +77,20 @@ export class Tunnel {
7777 // Build message
7878 const messageId = generateUuidBuffer ( ) ;
7979
80- const requestIdStr = bufferToString ( requestId ) ;
81- this . #pendingTunnelMessages. set ( bufferToString ( messageId ) , {
80+ const requestIdStr = idToStr ( requestId ) ;
81+ const messageIdStr = idToStr ( messageId ) ;
82+ this . #pendingTunnelMessages. set ( messageIdStr , {
8283 sentAt : Date . now ( ) ,
8384 requestIdStr,
8485 } ) ;
8586
87+ logger ( ) ?. debug ( {
88+ msg : "send tunnel msg" ,
89+ requestId : requestIdStr ,
90+ messageId : messageIdStr ,
91+ message : messageKind ,
92+ } ) ;
93+
8694 // Send message
8795 const message : protocol . ToServer = {
8896 tag : "ToServerTunnelMessage" ,
@@ -111,8 +119,8 @@ export class Tunnel {
111119
112120 logger ( ) ?. debug ( {
113121 msg : "ack tunnel msg" ,
114- requestId : uuidstringify ( new Uint8Array ( requestId ) ) ,
115- messageId : uuidstringify ( new Uint8Array ( messageId ) ) ,
122+ requestId : idToStr ( requestId ) ,
123+ messageId : idToStr ( messageId ) ,
116124 } ) ;
117125
118126 this . #runner. __sendToServer ( message ) ;
@@ -163,7 +171,10 @@ export class Tunnel {
163171 const webSocket = this . #actorWebSockets. get ( requestIdStr ) ;
164172 if ( webSocket ) {
165173 // Close the WebSocket connection
166- webSocket . __closeWithRetry ( 1000 , "Message acknowledgment timeout" ) ;
174+ webSocket . __closeWithRetry (
175+ 1000 ,
176+ "Message acknowledgment timeout" ,
177+ ) ;
167178
168179 // Clean up from actorWebSockets map
169180 this . #actorWebSockets. delete ( requestIdStr ) ;
@@ -207,7 +218,11 @@ export class Tunnel {
207218 actor . webSockets . clear ( ) ;
208219 }
209220
210- async #fetch( actorId : string , requestId : protocol . RequestId , request : Request ) : Promise < Response > {
221+ async #fetch(
222+ actorId : string ,
223+ requestId : protocol . RequestId ,
224+ request : Request ,
225+ ) : Promise < Response > {
211226 // Validate actor exists
212227 if ( ! this . #runner. hasActor ( actorId ) ) {
213228 logger ( ) ?. warn ( {
@@ -219,7 +234,10 @@ export class Tunnel {
219234 //
220235 // See should_retry_request_inner
221236 // https://github.com/rivet-dev/rivet/blob/222dae87e3efccaffa2b503de40ecf8afd4e31eb/engine/packages/guard-core/src/proxy_service.rs#L2458
222- return new Response ( "Actor not found" , { status : 503 , headers : { "x-rivet-error" : "runner.actor_not_found" } } ) ;
237+ return new Response ( "Actor not found" , {
238+ status : 503 ,
239+ headers : { "x-rivet-error" : "runner.actor_not_found" } ,
240+ } ) ;
223241 }
224242
225243 const fetchHandler = this . #runner. config . fetch (
@@ -237,19 +255,28 @@ export class Tunnel {
237255 }
238256
239257 async handleTunnelMessage ( message : protocol . ToClientTunnelMessage ) {
258+ const requestIdStr = idToStr ( message . requestId ) ;
259+ const messageIdStr = idToStr ( message . messageId ) ;
240260 logger ( ) ?. debug ( {
241- msg : "tunnel msg" ,
242- requestId : uuidstringify ( new Uint8Array ( message . requestId ) ) ,
243- messageId : uuidstringify ( new Uint8Array ( message . messageId ) ) ,
261+ msg : "receive tunnel msg" ,
262+ requestId : requestIdStr ,
263+ messageId : messageIdStr ,
244264 message : message . messageKind ,
245265 } ) ;
246266
247267 if ( message . messageKind . tag === "TunnelAck" ) {
248268 // Mark pending message as acknowledged and remove it
249- const msgIdStr = bufferToString ( message . messageId ) ;
250- const pending = this . #pendingTunnelMessages. get ( msgIdStr ) ;
269+ const pending = this . #pendingTunnelMessages. get ( messageIdStr ) ;
251270 if ( pending ) {
252- this . #pendingTunnelMessages. delete ( msgIdStr ) ;
271+ const didDelete =
272+ this . #pendingTunnelMessages. delete ( messageIdStr ) ;
273+ if ( ! didDelete ) {
274+ logger ( ) ?. warn ( {
275+ msg : "received tunnel ack for nonexistent message" ,
276+ requestId : requestIdStr ,
277+ messageId : messageIdStr ,
278+ } ) ;
279+ }
253280 }
254281 } else {
255282 switch ( message . messageKind . tag ) {
@@ -282,14 +309,15 @@ export class Tunnel {
282309 message . messageKind . val ,
283310 ) ;
284311 break ;
285- case "ToClientWebSocketMessage" :
312+ case "ToClientWebSocketMessage" : {
286313 this . #sendAck( message . requestId , message . messageId ) ;
287314
288- let _unhandled = await this . #handleWebSocketMessage(
315+ const _unhandled = await this . #handleWebSocketMessage(
289316 message . requestId ,
290317 message . messageKind . val ,
291318 ) ;
292319 break ;
320+ }
293321 case "ToClientWebSocketClose" :
294322 this . #sendAck( message . requestId , message . messageId ) ;
295323
@@ -309,7 +337,7 @@ export class Tunnel {
309337 req : protocol . ToClientRequestStart ,
310338 ) {
311339 // Track this request for the actor
312- const requestIdStr = bufferToString ( requestId ) ;
340+ const requestIdStr = idToStr ( requestId ) ;
313341 const actor = this . #runner. getActor ( req . actorId ) ;
314342 if ( actor ) {
315343 actor . requests . add ( requestIdStr ) ;
@@ -342,8 +370,8 @@ export class Tunnel {
342370 existing . actorId = req . actorId ;
343371 } else {
344372 this . #actorPendingRequests. set ( requestIdStr , {
345- resolve : ( ) => { } ,
346- reject : ( ) => { } ,
373+ resolve : ( ) => { } ,
374+ reject : ( ) => { } ,
347375 streamController : controller ,
348376 actorId : req . actorId ,
349377 } ) ;
@@ -366,7 +394,11 @@ export class Tunnel {
366394 await this . #sendResponse( requestId , response ) ;
367395 } else {
368396 // Non-streaming request
369- const response = await this . #fetch( req . actorId , requestId , request ) ;
397+ const response = await this . #fetch(
398+ req . actorId ,
399+ requestId ,
400+ request ,
401+ ) ;
370402 await this . #sendResponse( requestId , response ) ;
371403 }
372404 } catch ( error ) {
@@ -385,7 +417,7 @@ export class Tunnel {
385417 requestId : ArrayBuffer ,
386418 chunk : protocol . ToClientRequestChunk ,
387419 ) {
388- const requestIdStr = bufferToString ( requestId ) ;
420+ const requestIdStr = idToStr ( requestId ) ;
389421 const pending = this . #actorPendingRequests. get ( requestIdStr ) ;
390422 if ( pending ?. streamController ) {
391423 pending . streamController . enqueue ( new Uint8Array ( chunk . body ) ) ;
@@ -397,7 +429,7 @@ export class Tunnel {
397429 }
398430
399431 async #handleRequestAbort( requestId : ArrayBuffer ) {
400- const requestIdStr = bufferToString ( requestId ) ;
432+ const requestIdStr = idToStr ( requestId ) ;
401433 const pending = this . #actorPendingRequests. get ( requestIdStr ) ;
402434 if ( pending ?. streamController ) {
403435 pending . streamController . error ( new Error ( "Request aborted" ) ) ;
@@ -461,7 +493,7 @@ export class Tunnel {
461493 requestId : protocol . RequestId ,
462494 open : protocol . ToClientWebSocketOpen ,
463495 ) {
464- const webSocketId = bufferToString ( requestId ) ;
496+ const webSocketId = idToStr ( requestId ) ;
465497 // Validate actor exists
466498 const actor = this . #runner. getActor ( open . actorId ) ;
467499 if ( ! actor ) {
@@ -518,7 +550,7 @@ export class Tunnel {
518550 const dataBuffer =
519551 typeof data === "string"
520552 ? ( new TextEncoder ( ) . encode ( data )
521- . buffer as ArrayBuffer )
553+ . buffer as ArrayBuffer )
522554 : data ;
523555
524556 this . #sendMessage( requestId , {
@@ -575,7 +607,12 @@ export class Tunnel {
575607 } ) ;
576608
577609 // Send open confirmation
578- let hibernationConfig = this . #runner. config . getActorHibernationConfig ( actor . actorId , requestId , request ) ;
610+ const hibernationConfig =
611+ this . #runner. config . getActorHibernationConfig (
612+ actor . actorId ,
613+ requestId ,
614+ request ,
615+ ) ;
579616 this . #sendMessage( requestId , {
580617 tag : "ToServerWebSocketOpen" ,
581618 val : {
@@ -587,8 +624,6 @@ export class Tunnel {
587624 // Notify adapter that connection is open
588625 adapter . _handleOpen ( requestId ) ;
589626
590-
591-
592627 // Call websocket handler
593628 await websocketHandler (
594629 this . #runner,
@@ -623,14 +658,19 @@ export class Tunnel {
623658 requestId : ArrayBuffer ,
624659 msg : protocol . ToClientWebSocketMessage ,
625660 ) : Promise < boolean > {
626- const webSocketId = bufferToString ( requestId ) ;
661+ const webSocketId = idToStr ( requestId ) ;
627662 const adapter = this . #actorWebSockets. get ( webSocketId ) ;
628663 if ( adapter ) {
629664 const data = msg . binary
630665 ? new Uint8Array ( msg . data )
631666 : new TextDecoder ( ) . decode ( new Uint8Array ( msg . data ) ) ;
632667
633- return adapter . _handleMessage ( requestId , data , msg . index , msg . binary ) ;
668+ return adapter . _handleMessage (
669+ requestId ,
670+ data ,
671+ msg . index ,
672+ msg . binary ,
673+ ) ;
634674 } else {
635675 return true ;
636676 }
@@ -639,11 +679,12 @@ export class Tunnel {
639679 __ackWebsocketMessage ( requestId : ArrayBuffer , index : number ) {
640680 logger ( ) ?. debug ( {
641681 msg : "ack ws msg" ,
642- requestId : uuidstringify ( new Uint8Array ( requestId ) ) ,
682+ requestId : idToStr ( requestId ) ,
643683 index,
644684 } ) ;
645685
646- if ( index < 0 || index > 65535 ) throw new Error ( "invalid websocket ack index" ) ;
686+ if ( index < 0 || index > 65535 )
687+ throw new Error ( "invalid websocket ack index" ) ;
647688
648689 // Send the ack message
649690 this . #sendMessage( requestId , {
@@ -658,27 +699,26 @@ export class Tunnel {
658699 requestId : ArrayBuffer ,
659700 close : protocol . ToClientWebSocketClose ,
660701 ) {
661- const webSocketId = bufferToString ( requestId ) ;
662- const adapter = this . #actorWebSockets. get ( webSocketId ) ;
702+ const requestIdStr = idToStr ( requestId ) ;
703+ const adapter = this . #actorWebSockets. get ( requestIdStr ) ;
663704 if ( adapter ) {
664705 adapter . _handleClose (
665706 requestId ,
666707 close . code || undefined ,
667708 close . reason || undefined ,
668709 ) ;
669- this . #actorWebSockets. delete ( webSocketId ) ;
710+ this . #actorWebSockets. delete ( requestIdStr ) ;
670711 }
671712 }
672713}
673714
674- /** Converts a buffer to a string. Used for storing strings in a lookup map. */
675- function bufferToString ( buffer : ArrayBuffer ) : string {
676- return Buffer . from ( buffer ) . toString ( "base64" ) ;
677- }
678-
679715/** Generates a UUID as bytes. */
680716function generateUuidBuffer ( ) : ArrayBuffer {
681717 const buffer = new Uint8Array ( 16 ) ;
682718 uuidv4 ( undefined , buffer ) ;
683719 return buffer . buffer ;
684720}
721+
722+ function idToStr ( id : ArrayBuffer ) : string {
723+ return uuidstringify ( new Uint8Array ( id ) ) ;
724+ }
0 commit comments