Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/cloudflare-workers/src/actor-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type {
AnyActorInstance,
ManagerDriver,
} from "rivetkit/driver-helpers";
import { promiseWithResolvers } from "rivetkit/utils";
import { KEYS } from "./actor-handler-do";

interface DurableObjectGlobalState {
Expand Down Expand Up @@ -49,7 +50,8 @@ export interface DriverContext {
// Actor handler to track running instances
class ActorHandler {
actor?: AnyActorInstance;
actorPromise?: PromiseWithResolvers<void> = Promise.withResolvers();
actorPromise?: ReturnType<typeof promiseWithResolvers<void>> =
promiseWithResolvers();
genericConnGlobalState = new GenericConnGlobalState();
}

Expand Down
5 changes: 3 additions & 2 deletions packages/cloudflare-workers/src/actor-handler-do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { ActorKey, ActorRouter, Registry, RunConfig } from "rivetkit";
import { createActorRouter, createClientWithDriver } from "rivetkit";
import type { ActorDriver } from "rivetkit/driver-helpers";
import { serializeEmptyPersistData } from "rivetkit/driver-helpers";
import { promiseWithResolvers } from "rivetkit/utils";
import {
CloudflareDurableObjectGlobalState,
createCloudflareActorsActorDriverBuilder,
Expand Down Expand Up @@ -62,7 +63,7 @@ export function createActorDurableObject(
implements ActorHandlerInterface
{
#initialized?: InitializedData;
#initializedPromise?: PromiseWithResolvers<void>;
#initializedPromise?: ReturnType<typeof promiseWithResolvers<void>>;

#actor?: LoadedActor;

Expand All @@ -73,7 +74,7 @@ export function createActorDurableObject(
if (this.#initializedPromise) {
await this.#initializedPromise.promise;
} else {
this.#initializedPromise = Promise.withResolvers();
this.#initializedPromise = promiseWithResolvers();
const res = await this.ctx.storage.get([
KEYS.NAME,
KEYS.KEY,
Expand Down
5 changes: 3 additions & 2 deletions packages/cloudflare-workers/src/manager-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ export class CloudflareActorsManagerDriver implements ManagerDriver {
headers["sec-websocket-protocol"] = "rivetkit";

// Use the path parameter to determine the URL
const url = `http://actor${path}`;
const normalizedPath = path.startsWith("/") ? path : `/${path}`;
const url = `http://actor${normalizedPath}`;

logger().debug({ msg: "rewriting websocket url", from: path, to: url });

Expand All @@ -104,7 +105,7 @@ export class CloudflareActorsManagerDriver implements ManagerDriver {

if (!webSocket) {
throw new InternalError(
"missing websocket connection in response from DO",
`missing websocket connection in response from DO\n\nStatus: ${response.status}\nResponse: ${await response.text()}`,
);
}

Expand Down
13 changes: 12 additions & 1 deletion packages/cloudflare-workers/tests/driver-tests.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,11 @@ async function setupProject(projectPath: string) {
const wranglerConfig = {
name: "rivetkit-test",
compatibility_date: "2025-01-29",
compatibility_flags: ["nodejs_compat"],
compatibility_flags: [
"nodejs_compat",
// Required for passing log env vars
"nodejs_compat_populate_process_env",
],
migrations: [
{
new_classes: ["ActorHandler"],
Expand All @@ -194,6 +198,13 @@ async function setupProject(projectPath: string) {
observability: {
enabled: true,
},
vars: {
LOG_LEVEL: "DEBUG",
LOG_TARGET: "1",
LOG_TIMESTAMP: "1",
_RIVETKIT_ERROR_STACK: "1",
_RIVETKIT_LOG_MESSAGE: "1",
},
};
await fs.writeFile(
path.join(tmpDir, "wrangler.json"),
Expand Down
2 changes: 1 addition & 1 deletion packages/cloudflare-workers/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ export default defineConfig({
test: {
...defaultConfig.test,
// Requires time for installing packages
testTimeout: 60_000,
testTimeout: 15_000,
},
});
3 changes: 2 additions & 1 deletion packages/rivetkit/fixtures/driver-test-suite/sleep.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { actor, type UniversalWebSocket } from "rivetkit";
import { promiseWithResolvers } from "rivetkit/utils";

export const SLEEP_TIMEOUT = 500;

Expand Down Expand Up @@ -44,7 +45,7 @@ export const sleepWithLongRpc = actor({
},
longRunningRpc: async (c) => {
c.log.info("starting long running rpc");
c.vars.longRunningResolve = Promise.withResolvers();
c.vars.longRunningResolve = promiseWithResolvers();
c.broadcast("waiting");
await c.vars.longRunningResolve.promise;
c.log.info("finished long running rpc");
Expand Down
3 changes: 2 additions & 1 deletion packages/rivetkit/src/actor/generic-conn-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type { AnyActorInstance } from "@/actor/instance";
import type { CachedSerializer, Encoding } from "@/actor/protocol/serde";
import { encodeDataToString } from "@/actor/protocol/serde";
import type * as protocol from "@/schemas/client-protocol/mod";
import { promiseWithResolvers } from "@/utils";
import { loggerWithoutContext } from "./log";

// This state is different than `PersistedConn` state since the connection-specific state is persisted & must be serializable. This is also part of the connection driver, not part of the core actor.
Expand Down Expand Up @@ -133,7 +134,7 @@ export function createGenericWebSocketDriver(
}

// Create promise to wait for socket to close gracefully
const { promise, resolve } = Promise.withResolvers<void>();
const { promise, resolve } = promiseWithResolvers<void>();
raw.addEventListener("close", () => resolve());

// Close socket
Expand Down
5 changes: 3 additions & 2 deletions packages/rivetkit/src/actor/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned";
import {
bufferToArrayBuffer,
getEnvUniversal,
promiseWithResolvers,
SinglePromiseQueue,
} from "@/utils";
import type { ActionContext } from "./action";
Expand Down Expand Up @@ -543,7 +544,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
}

/** Promise used to wait for a save to complete. This is required since you cannot await `#saveStateThrottled`. */
#onPersistSavedPromise?: PromiseWithResolvers<void>;
#onPersistSavedPromise?: ReturnType<typeof promiseWithResolvers<void>>;

/** Throttled save state method. Used to write to KV at a reasonable cadence. */
#savePersistThrottled() {
Expand Down Expand Up @@ -1562,7 +1563,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
} else {
// Create callback
if (!this.#onPersistSavedPromise) {
this.#onPersistSavedPromise = Promise.withResolvers();
this.#onPersistSavedPromise = promiseWithResolvers();
}

// Save state throttled
Expand Down
8 changes: 4 additions & 4 deletions packages/rivetkit/src/actor/router-endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import {
deserializeWithEncoding,
serializeWithEncoding,
} from "@/serde";
import { bufferToArrayBuffer } from "@/utils";
import { bufferToArrayBuffer, promiseWithResolvers } from "@/utils";
import type { ActorDriver } from "./driver";
import type {
GenericHttpDriverState,
Expand Down Expand Up @@ -128,7 +128,7 @@ export async function handleWebSocketConnect(
promise: handlersPromise,
resolve: handlersResolve,
reject: handlersReject,
} = Promise.withResolvers<{
} = promiseWithResolvers<{
conn: AnyConn;
actor: AnyActorInstance;
connId: string;
Expand Down Expand Up @@ -162,7 +162,7 @@ export async function handleWebSocketConnect(

return {
onOpen: (_evt: any, ws: WSContext) => {
actor.rLog.debug("websocket open");
actor.rLog.debug("actor websocket open");

// Run async operations in background
(async () => {
Expand Down Expand Up @@ -380,7 +380,7 @@ export async function handleSseConnect(
);

// Wait for close
const abortResolver = Promise.withResolvers();
const abortResolver = promiseWithResolvers();

// HACK: This is required so the abort handler below works
//
Expand Down
17 changes: 11 additions & 6 deletions packages/rivetkit/src/client/actor-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ import {
encodingIsBinary,
serializeWithEncoding,
} from "@/serde";
import { bufferToArrayBuffer, getEnvUniversal, httpUserAgent } from "@/utils";
import {
bufferToArrayBuffer,
getEnvUniversal,
httpUserAgent,
promiseWithResolvers,
} from "@/utils";
import type { ActorDefinitionActions } from "./actor-common";
import { queryActor } from "./actor-query";
import { ACTOR_CONNS_SYMBOL, type ClientRaw, TRANSPORT_SYMBOL } from "./client";
Expand Down Expand Up @@ -119,7 +124,7 @@ export class ActorConnRaw {
#keepNodeAliveInterval: NodeJS.Timeout;

/** Promise used to indicate the socket has connected successfully. This will be rejected if the connection fails. */
#onOpenPromise?: PromiseWithResolvers<undefined>;
#onOpenPromise?: ReturnType<typeof promiseWithResolvers<undefined>>;

#client: ClientRaw;
#driver: ManagerDriver;
Expand Down Expand Up @@ -177,7 +182,7 @@ export class ActorConnRaw {
this.#actionIdCounter += 1;

const { promise, resolve, reject } =
Promise.withResolvers<protocol.ActionResponse>();
promiseWithResolvers<protocol.ActionResponse>();
this.#actionsInFlight.set(actionId, { name: opts.name, resolve, reject });

this.#sendMessage({
Expand Down Expand Up @@ -253,7 +258,7 @@ enc
// Create promise for open
if (this.#onOpenPromise)
throw new Error("#onOpenPromise already defined");
this.#onOpenPromise = Promise.withResolvers();
this.#onOpenPromise = promiseWithResolvers();

// Connect transport
if (this.#client[TRANSPORT_SYMBOL] === "websocket") {
Expand Down Expand Up @@ -285,7 +290,7 @@ enc
);
this.#transport = { websocket: ws };
ws.addEventListener("open", () => {
logger().debug({ msg: "websocket open" });
logger().debug({ msg: "client websocket open" });
});
ws.addEventListener("message", async (ev) => {
this.#handleOnMessage(ev.data);
Expand Down Expand Up @@ -818,7 +823,7 @@ enc
) {
logger().debug({ msg: "ws already closed or closing" });
} else {
const { promise, resolve } = Promise.withResolvers();
const { promise, resolve } = promiseWithResolvers();
ws.addEventListener("close", () => {
logger().debug({ msg: "ws closed" });
resolve(undefined);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,6 @@ export function createTestInlineClientDriver(
// Normalize path to match other drivers
const normalizedPath = path.startsWith("/") ? path.slice(1) : path;

logger().debug({
msg: "creating websocket connection via test inline driver",
});

// Create WebSocket connection to the test endpoint
// Use a placeholder path and pass the actual path as a query param to avoid mixing user query params with internal ones
const wsUrl = new URL(
Expand All @@ -174,6 +170,12 @@ export function createTestInlineClientDriver(
if (params !== undefined)
wsUrl.searchParams.set("params", JSON.stringify(params));
wsUrl.searchParams.set("encodingKind", encoding);
wsUrl.searchParams.set("transport", transport);

logger().debug({
msg: "creating websocket connection via test inline driver",
url: wsUrl.toString(),
});

// Convert http/https to ws/wss
const wsProtocol = wsUrl.protocol === "https:" ? "wss:" : "ws:";
Expand Down
15 changes: 9 additions & 6 deletions packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,15 @@ export function runActorConnTests(driverTestConfig: DriverTestConfig) {

// HACK: Race condition between subscribing & sending events in SSE
// Verify events were received
await vi.waitFor(async () => {
await connection.setCount(5);
await connection.setCount(8);
expect(receivedEvents).toContain(5);
expect(receivedEvents).toContain(8);
});
await vi.waitFor(
async () => {
await connection.setCount(5);
await connection.setCount(8);
expect(receivedEvents).toContain(5);
expect(receivedEvents).toContain(8);
},
{ timeout: 10_000 },
);

// Clean up
await connection.dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@ export function runActorSleepTests(driverTestConfig: DriverTestConfig) {

// Close WebSocket
ws.close();
await new Promise((resolve) => setTimeout(resolve, 100));

// Wait for sleep timeout after WebSocket closed
await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100);
Expand Down
5 changes: 3 additions & 2 deletions packages/rivetkit/src/drivers/engine/actor-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ import {
} from "@/driver-helpers/mod";
import type { RegistryConfig } from "@/registry/config";
import type { RunConfig } from "@/registry/run-config";
import { promiseWithResolvers } from "@/utils";
import type { Config } from "./config";
import { KEYS } from "./kv";
import { logger } from "./log";

interface ActorHandler {
actor?: AnyActorInstance;
actorStartPromise?: PromiseWithResolvers<void>;
actorStartPromise?: ReturnType<typeof promiseWithResolvers<void>>;
genericConnGlobalState: GenericConnGlobalState;
persistedData?: Uint8Array;
}
Expand Down Expand Up @@ -222,7 +223,7 @@ export class EngineActorDriver implements ActorDriver {
if (!handler) {
handler = {
genericConnGlobalState: new GenericConnGlobalState(),
actorStartPromise: Promise.withResolvers(),
actorStartPromise: promiseWithResolvers(),
persistedData: serializeEmptyPersistData(input),
};
this.#actors.set(actorId, handler);
Expand Down
5 changes: 3 additions & 2 deletions packages/rivetkit/src/drivers/file-system/global-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
import {
bufferToArrayBuffer,
type LongTimeoutHandle,
promiseWithResolvers,
SinglePromiseQueue,
setLongTimeout,
stringifyError,
Expand All @@ -49,7 +50,7 @@ interface ActorEntry {

actor?: AnyActorInstance;
/** Promise for starting the actor. */
startPromise?: PromiseWithResolvers<void>;
startPromise?: ReturnType<typeof promiseWithResolvers<void>>;

genericConnGlobalState: GenericConnGlobalState;

Expand Down Expand Up @@ -469,7 +470,7 @@ export class FileSystemGlobalState {
}

// Create start promise
entry.startPromise = Promise.withResolvers();
entry.startPromise = promiseWithResolvers();

try {
// Create actor
Expand Down
7 changes: 4 additions & 3 deletions packages/rivetkit/src/inspector/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
AnyDatabaseProvider,
InferDatabaseClient,
} from "@/actor/database";
import { promiseWithResolvers } from "@/utils";
import {
ColumnsSchema,
type Connection,
Expand Down Expand Up @@ -99,7 +100,7 @@ export function createActorInspectorRouter() {
});
});

const { promise } = Promise.withResolvers<void>();
const { promise } = promiseWithResolvers<void>();

return promise;
},
Expand Down Expand Up @@ -128,7 +129,7 @@ export function createActorInspectorRouter() {
});
});

const { promise } = Promise.withResolvers<void>();
const { promise } = promiseWithResolvers<void>();

return promise;
},
Expand Down Expand Up @@ -159,7 +160,7 @@ export function createActorInspectorRouter() {
});
});

const { promise } = Promise.withResolvers<void>();
const { promise } = promiseWithResolvers<void>();

return promise;
},
Expand Down
Loading
Loading