From c686a97027812553ce4a48e5a424377efe86f035 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 18 Sep 2025 15:34:30 -0700 Subject: [PATCH] fix(core): fix inline client driver for cloudflare-workers --- .../cloudflare-workers/src/actor-driver.ts | 4 +- .../src/actor-handler-do.ts | 5 +- .../cloudflare-workers/src/manager-driver.ts | 5 +- .../tests/driver-tests.test.ts | 13 ++- packages/cloudflare-workers/vitest.config.ts | 2 +- .../fixtures/driver-test-suite/sleep.ts | 3 +- .../rivetkit/src/actor/generic-conn-driver.ts | 3 +- packages/rivetkit/src/actor/instance.ts | 5 +- .../rivetkit/src/actor/router-endpoints.ts | 8 +- packages/rivetkit/src/client/actor-conn.ts | 17 ++- .../test-inline-client-driver.ts | 10 +- .../src/driver-test-suite/tests/actor-conn.ts | 15 ++- .../driver-test-suite/tests/actor-sleep.ts | 1 - .../src/drivers/engine/actor-driver.ts | 5 +- .../src/drivers/file-system/global-state.ts | 5 +- packages/rivetkit/src/inspector/actor.ts | 7 +- packages/rivetkit/src/manager/router.ts | 107 ++++++++++-------- packages/rivetkit/src/utils.ts | 23 +++- packages/rivetkit/tsconfig.json | 3 +- 19 files changed, 154 insertions(+), 87 deletions(-) diff --git a/packages/cloudflare-workers/src/actor-driver.ts b/packages/cloudflare-workers/src/actor-driver.ts index dd8da7680..52a69a1b9 100644 --- a/packages/cloudflare-workers/src/actor-driver.ts +++ b/packages/cloudflare-workers/src/actor-driver.ts @@ -15,6 +15,7 @@ import type { AnyActorInstance, ManagerDriver, } from "rivetkit/driver-helpers"; +import { promiseWithResolvers } from "rivetkit/utils"; import { KEYS } from "./actor-handler-do"; interface DurableObjectGlobalState { @@ -49,7 +50,8 @@ export interface DriverContext { // Actor handler to track running instances class ActorHandler { actor?: AnyActorInstance; - actorPromise?: PromiseWithResolvers = Promise.withResolvers(); + actorPromise?: ReturnType> = + promiseWithResolvers(); genericConnGlobalState = new GenericConnGlobalState(); } diff --git a/packages/cloudflare-workers/src/actor-handler-do.ts b/packages/cloudflare-workers/src/actor-handler-do.ts index cb60c1421..de869ef32 100644 --- a/packages/cloudflare-workers/src/actor-handler-do.ts +++ b/packages/cloudflare-workers/src/actor-handler-do.ts @@ -5,6 +5,7 @@ import type { ActorKey, ActorRouter, Registry, RunConfig } from "rivetkit"; import { createActorRouter, createClientWithDriver } from "rivetkit"; import type { ActorDriver } from "rivetkit/driver-helpers"; import { serializeEmptyPersistData } from "rivetkit/driver-helpers"; +import { promiseWithResolvers } from "rivetkit/utils"; import { CloudflareDurableObjectGlobalState, createCloudflareActorsActorDriverBuilder, @@ -62,7 +63,7 @@ export function createActorDurableObject( implements ActorHandlerInterface { #initialized?: InitializedData; - #initializedPromise?: PromiseWithResolvers; + #initializedPromise?: ReturnType>; #actor?: LoadedActor; @@ -73,7 +74,7 @@ export function createActorDurableObject( if (this.#initializedPromise) { await this.#initializedPromise.promise; } else { - this.#initializedPromise = Promise.withResolvers(); + this.#initializedPromise = promiseWithResolvers(); const res = await this.ctx.storage.get([ KEYS.NAME, KEYS.KEY, diff --git a/packages/cloudflare-workers/src/manager-driver.ts b/packages/cloudflare-workers/src/manager-driver.ts index caa7ffdee..95d9175a3 100644 --- a/packages/cloudflare-workers/src/manager-driver.ts +++ b/packages/cloudflare-workers/src/manager-driver.ts @@ -93,7 +93,8 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { headers["sec-websocket-protocol"] = "rivetkit"; // Use the path parameter to determine the URL - const url = `http://actor${path}`; + const normalizedPath = path.startsWith("/") ? path : `/${path}`; + const url = `http://actor${normalizedPath}`; logger().debug({ msg: "rewriting websocket url", from: path, to: url }); @@ -104,7 +105,7 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { if (!webSocket) { throw new InternalError( - "missing websocket connection in response from DO", + `missing websocket connection in response from DO\n\nStatus: ${response.status}\nResponse: ${await response.text()}`, ); } diff --git a/packages/cloudflare-workers/tests/driver-tests.test.ts b/packages/cloudflare-workers/tests/driver-tests.test.ts index e176e8043..46431c1e0 100644 --- a/packages/cloudflare-workers/tests/driver-tests.test.ts +++ b/packages/cloudflare-workers/tests/driver-tests.test.ts @@ -170,7 +170,11 @@ async function setupProject(projectPath: string) { const wranglerConfig = { name: "rivetkit-test", compatibility_date: "2025-01-29", - compatibility_flags: ["nodejs_compat"], + compatibility_flags: [ + "nodejs_compat", + // Required for passing log env vars + "nodejs_compat_populate_process_env", + ], migrations: [ { new_classes: ["ActorHandler"], @@ -194,6 +198,13 @@ async function setupProject(projectPath: string) { observability: { enabled: true, }, + vars: { + LOG_LEVEL: "DEBUG", + LOG_TARGET: "1", + LOG_TIMESTAMP: "1", + _RIVETKIT_ERROR_STACK: "1", + _RIVETKIT_LOG_MESSAGE: "1", + }, }; await fs.writeFile( path.join(tmpDir, "wrangler.json"), diff --git a/packages/cloudflare-workers/vitest.config.ts b/packages/cloudflare-workers/vitest.config.ts index 9a4971dc0..f30276784 100644 --- a/packages/cloudflare-workers/vitest.config.ts +++ b/packages/cloudflare-workers/vitest.config.ts @@ -6,6 +6,6 @@ export default defineConfig({ test: { ...defaultConfig.test, // Requires time for installing packages - testTimeout: 60_000, + testTimeout: 15_000, }, }); diff --git a/packages/rivetkit/fixtures/driver-test-suite/sleep.ts b/packages/rivetkit/fixtures/driver-test-suite/sleep.ts index 29f85422b..fec86f6a3 100644 --- a/packages/rivetkit/fixtures/driver-test-suite/sleep.ts +++ b/packages/rivetkit/fixtures/driver-test-suite/sleep.ts @@ -1,4 +1,5 @@ import { actor, type UniversalWebSocket } from "rivetkit"; +import { promiseWithResolvers } from "rivetkit/utils"; export const SLEEP_TIMEOUT = 500; @@ -44,7 +45,7 @@ export const sleepWithLongRpc = actor({ }, longRunningRpc: async (c) => { c.log.info("starting long running rpc"); - c.vars.longRunningResolve = Promise.withResolvers(); + c.vars.longRunningResolve = promiseWithResolvers(); c.broadcast("waiting"); await c.vars.longRunningResolve.promise; c.log.info("finished long running rpc"); diff --git a/packages/rivetkit/src/actor/generic-conn-driver.ts b/packages/rivetkit/src/actor/generic-conn-driver.ts index 9653f91df..257edcc92 100644 --- a/packages/rivetkit/src/actor/generic-conn-driver.ts +++ b/packages/rivetkit/src/actor/generic-conn-driver.ts @@ -16,6 +16,7 @@ import type { AnyActorInstance } from "@/actor/instance"; import type { CachedSerializer, Encoding } from "@/actor/protocol/serde"; import { encodeDataToString } from "@/actor/protocol/serde"; import type * as protocol from "@/schemas/client-protocol/mod"; +import { promiseWithResolvers } from "@/utils"; import { loggerWithoutContext } from "./log"; // This state is different than `PersistedConn` state since the connection-specific state is persisted & must be serializable. This is also part of the connection driver, not part of the core actor. @@ -133,7 +134,7 @@ export function createGenericWebSocketDriver( } // Create promise to wait for socket to close gracefully - const { promise, resolve } = Promise.withResolvers(); + const { promise, resolve } = promiseWithResolvers(); raw.addEventListener("close", () => resolve()); // Close socket diff --git a/packages/rivetkit/src/actor/instance.ts b/packages/rivetkit/src/actor/instance.ts index ae90e88e9..06b4d7a4e 100644 --- a/packages/rivetkit/src/actor/instance.ts +++ b/packages/rivetkit/src/actor/instance.ts @@ -15,6 +15,7 @@ import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned"; import { bufferToArrayBuffer, getEnvUniversal, + promiseWithResolvers, SinglePromiseQueue, } from "@/utils"; import type { ActionContext } from "./action"; @@ -543,7 +544,7 @@ export class ActorInstance { } /** Promise used to wait for a save to complete. This is required since you cannot await `#saveStateThrottled`. */ - #onPersistSavedPromise?: PromiseWithResolvers; + #onPersistSavedPromise?: ReturnType>; /** Throttled save state method. Used to write to KV at a reasonable cadence. */ #savePersistThrottled() { @@ -1562,7 +1563,7 @@ export class ActorInstance { } else { // Create callback if (!this.#onPersistSavedPromise) { - this.#onPersistSavedPromise = Promise.withResolvers(); + this.#onPersistSavedPromise = promiseWithResolvers(); } // Save state throttled diff --git a/packages/rivetkit/src/actor/router-endpoints.ts b/packages/rivetkit/src/actor/router-endpoints.ts index 2563202b3..050abbc24 100644 --- a/packages/rivetkit/src/actor/router-endpoints.ts +++ b/packages/rivetkit/src/actor/router-endpoints.ts @@ -36,7 +36,7 @@ import { deserializeWithEncoding, serializeWithEncoding, } from "@/serde"; -import { bufferToArrayBuffer } from "@/utils"; +import { bufferToArrayBuffer, promiseWithResolvers } from "@/utils"; import type { ActorDriver } from "./driver"; import type { GenericHttpDriverState, @@ -128,7 +128,7 @@ export async function handleWebSocketConnect( promise: handlersPromise, resolve: handlersResolve, reject: handlersReject, - } = Promise.withResolvers<{ + } = promiseWithResolvers<{ conn: AnyConn; actor: AnyActorInstance; connId: string; @@ -162,7 +162,7 @@ export async function handleWebSocketConnect( return { onOpen: (_evt: any, ws: WSContext) => { - actor.rLog.debug("websocket open"); + actor.rLog.debug("actor websocket open"); // Run async operations in background (async () => { @@ -380,7 +380,7 @@ export async function handleSseConnect( ); // Wait for close - const abortResolver = Promise.withResolvers(); + const abortResolver = promiseWithResolvers(); // HACK: This is required so the abort handler below works // diff --git a/packages/rivetkit/src/client/actor-conn.ts b/packages/rivetkit/src/client/actor-conn.ts index 16e783cc2..9a3c21327 100644 --- a/packages/rivetkit/src/client/actor-conn.ts +++ b/packages/rivetkit/src/client/actor-conn.ts @@ -32,7 +32,12 @@ import { encodingIsBinary, serializeWithEncoding, } from "@/serde"; -import { bufferToArrayBuffer, getEnvUniversal, httpUserAgent } from "@/utils"; +import { + bufferToArrayBuffer, + getEnvUniversal, + httpUserAgent, + promiseWithResolvers, +} from "@/utils"; import type { ActorDefinitionActions } from "./actor-common"; import { queryActor } from "./actor-query"; import { ACTOR_CONNS_SYMBOL, type ClientRaw, TRANSPORT_SYMBOL } from "./client"; @@ -119,7 +124,7 @@ export class ActorConnRaw { #keepNodeAliveInterval: NodeJS.Timeout; /** Promise used to indicate the socket has connected successfully. This will be rejected if the connection fails. */ - #onOpenPromise?: PromiseWithResolvers; + #onOpenPromise?: ReturnType>; #client: ClientRaw; #driver: ManagerDriver; @@ -177,7 +182,7 @@ export class ActorConnRaw { this.#actionIdCounter += 1; const { promise, resolve, reject } = - Promise.withResolvers(); + promiseWithResolvers(); this.#actionsInFlight.set(actionId, { name: opts.name, resolve, reject }); this.#sendMessage({ @@ -253,7 +258,7 @@ enc // Create promise for open if (this.#onOpenPromise) throw new Error("#onOpenPromise already defined"); - this.#onOpenPromise = Promise.withResolvers(); + this.#onOpenPromise = promiseWithResolvers(); // Connect transport if (this.#client[TRANSPORT_SYMBOL] === "websocket") { @@ -285,7 +290,7 @@ enc ); this.#transport = { websocket: ws }; ws.addEventListener("open", () => { - logger().debug({ msg: "websocket open" }); + logger().debug({ msg: "client websocket open" }); }); ws.addEventListener("message", async (ev) => { this.#handleOnMessage(ev.data); @@ -818,7 +823,7 @@ enc ) { logger().debug({ msg: "ws already closed or closing" }); } else { - const { promise, resolve } = Promise.withResolvers(); + const { promise, resolve } = promiseWithResolvers(); ws.addEventListener("close", () => { logger().debug({ msg: "ws closed" }); resolve(undefined); diff --git a/packages/rivetkit/src/driver-test-suite/test-inline-client-driver.ts b/packages/rivetkit/src/driver-test-suite/test-inline-client-driver.ts index fb2d260bf..247bf0515 100644 --- a/packages/rivetkit/src/driver-test-suite/test-inline-client-driver.ts +++ b/packages/rivetkit/src/driver-test-suite/test-inline-client-driver.ts @@ -160,10 +160,6 @@ export function createTestInlineClientDriver( // Normalize path to match other drivers const normalizedPath = path.startsWith("/") ? path.slice(1) : path; - logger().debug({ - msg: "creating websocket connection via test inline driver", - }); - // Create WebSocket connection to the test endpoint // Use a placeholder path and pass the actual path as a query param to avoid mixing user query params with internal ones const wsUrl = new URL( @@ -174,6 +170,12 @@ export function createTestInlineClientDriver( if (params !== undefined) wsUrl.searchParams.set("params", JSON.stringify(params)); wsUrl.searchParams.set("encodingKind", encoding); + wsUrl.searchParams.set("transport", transport); + + logger().debug({ + msg: "creating websocket connection via test inline driver", + url: wsUrl.toString(), + }); // Convert http/https to ws/wss const wsProtocol = wsUrl.protocol === "https:" ? "wss:" : "ws:"; diff --git a/packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts b/packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts index 6d768010d..bf6630ac1 100644 --- a/packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts +++ b/packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts @@ -126,12 +126,15 @@ export function runActorConnTests(driverTestConfig: DriverTestConfig) { // HACK: Race condition between subscribing & sending events in SSE // Verify events were received - await vi.waitFor(async () => { - await connection.setCount(5); - await connection.setCount(8); - expect(receivedEvents).toContain(5); - expect(receivedEvents).toContain(8); - }); + await vi.waitFor( + async () => { + await connection.setCount(5); + await connection.setCount(8); + expect(receivedEvents).toContain(5); + expect(receivedEvents).toContain(8); + }, + { timeout: 10_000 }, + ); // Clean up await connection.dispose(); diff --git a/packages/rivetkit/src/driver-test-suite/tests/actor-sleep.ts b/packages/rivetkit/src/driver-test-suite/tests/actor-sleep.ts index c2279cdf3..28354455c 100644 --- a/packages/rivetkit/src/driver-test-suite/tests/actor-sleep.ts +++ b/packages/rivetkit/src/driver-test-suite/tests/actor-sleep.ts @@ -318,7 +318,6 @@ export function runActorSleepTests(driverTestConfig: DriverTestConfig) { // Close WebSocket ws.close(); - await new Promise((resolve) => setTimeout(resolve, 100)); // Wait for sleep timeout after WebSocket closed await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100); diff --git a/packages/rivetkit/src/drivers/engine/actor-driver.ts b/packages/rivetkit/src/drivers/engine/actor-driver.ts index 74620dd5e..b1672523e 100644 --- a/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -37,13 +37,14 @@ import { } from "@/driver-helpers/mod"; import type { RegistryConfig } from "@/registry/config"; import type { RunConfig } from "@/registry/run-config"; +import { promiseWithResolvers } from "@/utils"; import type { Config } from "./config"; import { KEYS } from "./kv"; import { logger } from "./log"; interface ActorHandler { actor?: AnyActorInstance; - actorStartPromise?: PromiseWithResolvers; + actorStartPromise?: ReturnType>; genericConnGlobalState: GenericConnGlobalState; persistedData?: Uint8Array; } @@ -222,7 +223,7 @@ export class EngineActorDriver implements ActorDriver { if (!handler) { handler = { genericConnGlobalState: new GenericConnGlobalState(), - actorStartPromise: Promise.withResolvers(), + actorStartPromise: promiseWithResolvers(), persistedData: serializeEmptyPersistData(input), }; this.#actors.set(actorId, handler); diff --git a/packages/rivetkit/src/drivers/file-system/global-state.ts b/packages/rivetkit/src/drivers/file-system/global-state.ts index e8d0516f3..30918c342 100644 --- a/packages/rivetkit/src/drivers/file-system/global-state.ts +++ b/packages/rivetkit/src/drivers/file-system/global-state.ts @@ -27,6 +27,7 @@ import { import { bufferToArrayBuffer, type LongTimeoutHandle, + promiseWithResolvers, SinglePromiseQueue, setLongTimeout, stringifyError, @@ -49,7 +50,7 @@ interface ActorEntry { actor?: AnyActorInstance; /** Promise for starting the actor. */ - startPromise?: PromiseWithResolvers; + startPromise?: ReturnType>; genericConnGlobalState: GenericConnGlobalState; @@ -469,7 +470,7 @@ export class FileSystemGlobalState { } // Create start promise - entry.startPromise = Promise.withResolvers(); + entry.startPromise = promiseWithResolvers(); try { // Create actor diff --git a/packages/rivetkit/src/inspector/actor.ts b/packages/rivetkit/src/inspector/actor.ts index e202f96fa..402c472f8 100644 --- a/packages/rivetkit/src/inspector/actor.ts +++ b/packages/rivetkit/src/inspector/actor.ts @@ -8,6 +8,7 @@ import type { AnyDatabaseProvider, InferDatabaseClient, } from "@/actor/database"; +import { promiseWithResolvers } from "@/utils"; import { ColumnsSchema, type Connection, @@ -99,7 +100,7 @@ export function createActorInspectorRouter() { }); }); - const { promise } = Promise.withResolvers(); + const { promise } = promiseWithResolvers(); return promise; }, @@ -128,7 +129,7 @@ export function createActorInspectorRouter() { }); }); - const { promise } = Promise.withResolvers(); + const { promise } = promiseWithResolvers(); return promise; }, @@ -159,7 +160,7 @@ export function createActorInspectorRouter() { }); }); - const { promise } = Promise.withResolvers(); + const { promise } = promiseWithResolvers(); return promise; }, diff --git a/packages/rivetkit/src/manager/router.ts b/packages/rivetkit/src/manager/router.ts index e91c42133..91c494be6 100644 --- a/packages/rivetkit/src/manager/router.ts +++ b/packages/rivetkit/src/manager/router.ts @@ -13,7 +13,7 @@ import { Unsupported, WebSocketsNotEnabled, } from "@/actor/errors"; -import type { Encoding } from "@/client/mod"; +import type { Encoding, Transport } from "@/client/mod"; import { handleRouteError, handleRouteNotFound, @@ -44,7 +44,7 @@ import { RivetIdSchema } from "@/manager-api/routes/common"; import type { UniversalWebSocket, UpgradeWebSocketArgs } from "@/mod"; import type { RegistryConfig } from "@/registry/config"; import type { RunConfig } from "@/registry/run-config"; -import { stringifyError } from "@/utils"; +import { promiseWithResolvers, stringifyError } from "@/utils"; import type { ManagerDriver } from "./driver"; import { logger } from "./log"; @@ -440,11 +440,13 @@ export function createManagerRouter( actorId, params: paramsRaw, encodingKind, + transport, } = c.req.query() as { path: string; actorId: string; params?: string; encodingKind: Encoding; + transport: Transport; }; const params = paramsRaw !== undefined ? JSON.parse(paramsRaw) : undefined; @@ -454,6 +456,7 @@ export function createManagerRouter( actorId, params, encodingKind, + transport, path: path, }); @@ -465,7 +468,7 @@ export function createManagerRouter( params, ); - return await createTestWebSocketProxy(clientWsPromise, "standard"); + return await createTestWebSocketProxy(clientWsPromise); })(c, noopNext()); }); @@ -560,10 +563,14 @@ export function createManagerRouter( */ async function createTestWebSocketProxy( clientWsPromise: Promise, - connectionType: string, ): Promise { // Store a reference to the resolved WebSocket let clientWs: UniversalWebSocket | null = null; + const { + promise: serverWsPromise, + resolve: serverWsResolve, + reject: serverWsReject, + } = promiseWithResolvers(); try { // Resolve the client WebSocket promise logger().debug({ msg: "awaiting client websocket promise" }); @@ -577,7 +584,7 @@ async function createTestWebSocketProxy( // Wait for ws to open await new Promise((resolve, reject) => { const onOpen = () => { - logger().debug({ msg: "test websocket connection opened" }); + logger().debug({ msg: "test websocket connection to actor opened" }); resolve(); }; const onError = (error: any) => { @@ -585,44 +592,18 @@ async function createTestWebSocketProxy( reject( new Error(`Failed to open WebSocket: ${error.message || error}`), ); + serverWsReject(); }; + ws.addEventListener("open", onOpen); - ws.addEventListener("error", onError); - }); - } catch (error) { - logger().error({ - msg: `failed to establish client ${connectionType} websocket connection`, - error, - }); - return { - onOpen: (_evt, serverWs) => { - serverWs.close(1011, "Failed to establish connection"); - }, - onMessage: () => {}, - onError: () => {}, - onClose: () => {}, - }; - } - // Create WebSocket proxy handlers to relay messages between client and server - return { - onOpen: (_evt: any, serverWs: WSContext) => { - logger().debug({ - msg: `test ${connectionType} websocket connection opened`, - }); + ws.addEventListener("error", onError); - // Check WebSocket type - logger().debug({ - msg: "clientWs info", - constructor: clientWs.constructor.name, - hasAddEventListener: typeof clientWs.addEventListener === "function", - readyState: clientWs.readyState, - }); + ws.addEventListener("message", async (clientEvt: MessageEvent) => { + const serverWs = await serverWsPromise; - // Add message handler to forward messages from client to server - clientWs.addEventListener("message", (clientEvt: MessageEvent) => { logger().debug({ - msg: `test ${connectionType} websocket connection message from client`, + msg: `test websocket connection message from client`, dataType: typeof clientEvt.data, isBlob: clientEvt.data instanceof Blob, isArrayBuffer: clientEvt.data instanceof ArrayBuffer, @@ -666,10 +647,11 @@ async function createTestWebSocketProxy( } }); - // Add close handler to close server when client closes - clientWs.addEventListener("close", (clientEvt: any) => { + ws.addEventListener("close", async (clientEvt: any) => { + const serverWs = await serverWsPromise; + logger().debug({ - msg: `test ${connectionType} websocket connection closed`, + msg: `test websocket connection closed`, }); if (serverWs.readyState !== 3) { @@ -678,10 +660,11 @@ async function createTestWebSocketProxy( } }); - // Add error handler - clientWs.addEventListener("error", () => { + ws.addEventListener("error", async () => { + const serverWs = await serverWsPromise; + logger().debug({ - msg: `test ${connectionType} websocket connection error`, + msg: `test websocket connection error`, }); if (serverWs.readyState !== 3) { @@ -689,6 +672,38 @@ async function createTestWebSocketProxy( serverWs.close(1011, "Error in client websocket"); } }); + }); + } catch (error) { + logger().error({ + msg: `failed to establish client websocket connection`, + error, + }); + return { + onOpen: (_evt, serverWs) => { + serverWs.close(1011, "Failed to establish connection"); + }, + onMessage: () => {}, + onError: () => {}, + onClose: () => {}, + }; + } + + // Create WebSocket proxy handlers to relay messages between client and server + return { + onOpen: (_evt: any, serverWs: WSContext) => { + logger().debug({ + msg: `test websocket connection from client opened`, + }); + + // Check WebSocket type + logger().debug({ + msg: "clientWs info", + constructor: clientWs.constructor.name, + hasAddEventListener: typeof clientWs.addEventListener === "function", + readyState: clientWs.readyState, + }); + + serverWsResolve(serverWs); }, onMessage: (evt: { data: any }) => { logger().debug({ @@ -741,7 +756,7 @@ async function createTestWebSocketProxy( serverWs: WSContext, ) => { logger().debug({ - msg: `server ${connectionType} websocket closed`, + msg: `server websocket closed`, wasClean: event.wasClean, code: event.code, reason: event.reason, @@ -763,7 +778,7 @@ async function createTestWebSocketProxy( }, onError: (error: unknown) => { logger().error({ - msg: `error in server ${connectionType} websocket`, + msg: `error in server websocket`, error, }); @@ -775,6 +790,8 @@ async function createTestWebSocketProxy( ) { clientWs.close(1011, "Error in server websocket"); } + + serverWsReject(); }, }; } diff --git a/packages/rivetkit/src/utils.ts b/packages/rivetkit/src/utils.ts index 21df6f6f5..fe390597c 100644 --- a/packages/rivetkit/src/utils.ts +++ b/packages/rivetkit/src/utils.ts @@ -77,6 +77,25 @@ const TIMEOUT_MAX = 2147483647; // 2^31-1 export type LongTimeoutHandle = { abort: () => void }; +/** + * Polyfill for Promise.withResolvers(). + * + * This is specifically for Cloudflare Workers. Their implementation of Promise.withResolvers does not work correctly. + */ +export function promiseWithResolvers(): { + promise: Promise; + resolve: (value: T | PromiseLike) => void; + reject: (reason?: any) => void; +} { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: any) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + export function setLongTimeout( listener: () => void, after: number, @@ -115,7 +134,7 @@ export class SinglePromiseQueue { runningDrainLoop?: Promise; /** Pending resolver fro the currently queued entry. */ - #pending?: PromiseWithResolvers; + #pending?: ReturnType>; /** Queue the next operation and return a promise that resolves when it flushes. */ enqueue(op: () => Promise): Promise { @@ -124,7 +143,7 @@ export class SinglePromiseQueue { // Ensure a shared resolver exists for all callers in this cycle if (!this.#pending) { - this.#pending = Promise.withResolvers(); + this.#pending = promiseWithResolvers(); } const waitForThisCycle = this.#pending.promise; diff --git a/packages/rivetkit/tsconfig.json b/packages/rivetkit/tsconfig.json index fc714fd67..7042125b9 100644 --- a/packages/rivetkit/tsconfig.json +++ b/packages/rivetkit/tsconfig.json @@ -6,7 +6,8 @@ "paths": { "@/*": ["./src/*"], // Used for test fixtures - "rivetkit": ["./src/mod.ts"] + "rivetkit": ["./src/mod.ts"], + "rivetkit/utils": ["./src/utils.ts"] } }, "include": [