diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 2b1a8bc63..17dcd6f0b 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -23,6 +23,4 @@ - Client & inline client - ManagerDriver - ActorDriver -- GenericConnGlobalState & other generic drivers: tracks actual connections separately from the actual conn state - - TODO: Can we remove the "generic" prefix? diff --git a/packages/cloudflare-workers/src/actor-driver.ts b/packages/cloudflare-workers/src/actor-driver.ts index 52a69a1b9..e69b9f798 100644 --- a/packages/cloudflare-workers/src/actor-driver.ts +++ b/packages/cloudflare-workers/src/actor-driver.ts @@ -4,11 +4,7 @@ import type { RegistryConfig, RunConfig, } from "rivetkit"; -import { - createGenericConnDrivers, - GenericConnGlobalState, - lookupInRegistry, -} from "rivetkit"; +import { lookupInRegistry } from "rivetkit"; import type { Client } from "rivetkit/client"; import type { ActorDriver, @@ -52,7 +48,6 @@ class ActorHandler { actor?: AnyActorInstance; actorPromise?: ReturnType> = promiseWithResolvers(); - genericConnGlobalState = new GenericConnGlobalState(); } export class CloudflareActorsActorDriver implements ActorDriver { @@ -116,11 +111,7 @@ export class CloudflareActorsActorDriver implements ActorDriver { handler.actor = definition.instantiate(); // Start actor - const connDrivers = createGenericConnDrivers( - handler.genericConnGlobalState, - ); await handler.actor.start( - connDrivers, this, this.#inlineClient, actorId, @@ -136,14 +127,6 @@ export class CloudflareActorsActorDriver implements ActorDriver { return handler.actor; } - getGenericConnGlobalState(actorId: string): GenericConnGlobalState { - const handler = this.#actors.get(actorId); - if (!handler) { - throw new Error(`Actor ${actorId} not loaded`); - } - return handler.genericConnGlobalState; - } - getContext(actorId: string): DriverContext { const state = this.#globalState.getDOState(actorId); return { state: state.ctx }; diff --git a/packages/rivetkit/fixtures/driver-test-suite/conn-liveness.ts b/packages/rivetkit/fixtures/driver-test-suite/conn-liveness.ts deleted file mode 100644 index 6dc77ba02..000000000 --- a/packages/rivetkit/fixtures/driver-test-suite/conn-liveness.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { actor, CONNECTION_DRIVER_WEBSOCKET } from "rivetkit"; - -export const connLivenessActor = actor({ - state: { - counter: 0, - acceptingConnections: true, - }, - options: { - connectionLivenessInterval: 5_000, - connectionLivenessTimeout: 2_500, - }, - onConnect: (c, conn) => { - if (!c.state.acceptingConnections) { - conn.disconnect(); - throw new Error("Actor is not accepting connections"); - } - }, - actions: { - getWsConnectionsLiveness: (c) => { - return Array.from(c.conns.values()) - .filter((conn) => conn.driver === CONNECTION_DRIVER_WEBSOCKET) - .map((conn) => ({ - id: conn.id, - status: conn.status, - lastSeen: conn.lastSeen, - })); - }, - getConnectionId: (c) => { - return c.conn.id; - }, - kill: (c, connId: string) => { - c.state.acceptingConnections = false; - // Disconnect the connection with the given ID - // This simulates a network failure or a manual disconnection - // The connection will be cleaned up by the actor manager after the timeout - const conn = c.conns.get(connId); - if (conn) { - conn.disconnect(); - } - }, - getCounter: (c) => { - return c.state.counter; - }, - increment: (c, amount: number) => { - c.state.counter += amount; - return c.state.counter; - }, - }, -}); diff --git a/packages/rivetkit/fixtures/driver-test-suite/registry.ts b/packages/rivetkit/fixtures/driver-test-suite/registry.ts index 9237c5ce6..844dc2e73 100644 --- a/packages/rivetkit/fixtures/driver-test-suite/registry.ts +++ b/packages/rivetkit/fixtures/driver-test-suite/registry.ts @@ -13,7 +13,6 @@ import { syncActionActor, } from "./action-types"; import { onStateChangeActor } from "./actor-onstatechange"; -import { connLivenessActor } from "./conn-liveness"; import { counterWithParams } from "./conn-params"; import { connStateActor } from "./conn-state"; // Import actors from individual files @@ -82,8 +81,6 @@ export const registry = setup({ counterWithParams, // From conn-state.ts connStateActor, - // From actor-conn.ts - connLivenessActor, // From metadata.ts metadataActor, // From vars.ts diff --git a/packages/rivetkit/schemas/actor-persist/v1.bare b/packages/rivetkit/schemas/actor-persist/v1.bare index 5725bc135..320c5c1a3 100644 --- a/packages/rivetkit/schemas/actor-persist/v1.bare +++ b/packages/rivetkit/schemas/actor-persist/v1.bare @@ -11,10 +11,6 @@ type PersistedConnection struct { id: str # Connection token token: str - # Connection driver type - driver: str - # Connection driver state - driverState: data # Connection parameters parameters: data # Connection state diff --git a/packages/rivetkit/src/actor/action.ts b/packages/rivetkit/src/actor/action.ts index a3b50ecb7..8c2c21c30 100644 --- a/packages/rivetkit/src/actor/action.ts +++ b/packages/rivetkit/src/actor/action.ts @@ -2,7 +2,7 @@ import type { ActorKey } from "@/actor/mod"; import type { Client } from "@/client/client"; import type { Logger } from "@/common/log"; import type { Registry } from "@/registry/mod"; -import type { Conn, ConnId } from "./connection"; +import type { Conn, ConnId } from "./conn"; import type { ActorContext } from "./context"; import type { AnyDatabaseProvider, InferDatabaseClient } from "./database"; import type { SaveStateOptions } from "./instance"; diff --git a/packages/rivetkit/src/actor/config.ts b/packages/rivetkit/src/actor/config.ts index ccdbdca4b..0fd5d8cd9 100644 --- a/packages/rivetkit/src/actor/config.ts +++ b/packages/rivetkit/src/actor/config.ts @@ -1,7 +1,7 @@ import { z } from "zod"; import type { UniversalWebSocket } from "@/common/websocket-interface"; import type { ActionContext } from "./action"; -import type { Conn } from "./connection"; +import type { Conn } from "./conn"; import type { ActorContext } from "./context"; import type { AnyDatabaseProvider } from "./database"; diff --git a/packages/rivetkit/src/actor/conn-drivers.ts b/packages/rivetkit/src/actor/conn-drivers.ts new file mode 100644 index 000000000..c3a31a1e5 --- /dev/null +++ b/packages/rivetkit/src/actor/conn-drivers.ts @@ -0,0 +1,205 @@ +import type { SSEStreamingApi } from "hono/streaming"; +import type { WSContext } from "hono/ws"; +import type { WebSocket } from "ws"; +import type { AnyConn } from "@/actor/conn"; +import type { AnyActorInstance } from "@/actor/instance"; +import type { CachedSerializer, Encoding } from "@/actor/protocol/serde"; +import { encodeDataToString } from "@/actor/protocol/serde"; +import type * as protocol from "@/schemas/client-protocol/mod"; +import { assertUnreachable, type promiseWithResolvers } from "@/utils"; + +export enum ConnDriverKind { + WEBSOCKET = 0, + SSE = 1, + HTTP = 2, +} + +export enum ConnReadyState { + UNKNOWN = -1, + CONNECTING = 0, + OPEN = 1, + CLOSING = 2, + CLOSED = 3, +} + +export interface ConnDriverWebSocketState { + encoding: Encoding; + websocket: WSContext; + closePromise: ReturnType>; +} + +export interface ConnDriverSseState { + encoding: Encoding; + stream: SSEStreamingApi; +} + +export type ConnDriverHttpState = Record; + +export type ConnDriverState = + | { [ConnDriverKind.WEBSOCKET]: ConnDriverWebSocketState } + | { [ConnDriverKind.SSE]: ConnDriverSseState } + | { [ConnDriverKind.HTTP]: ConnDriverHttpState }; + +export interface ConnDriver { + sendMessage?( + actor: AnyActorInstance, + conn: AnyConn, + state: State, + message: CachedSerializer, + ): void; + + /** + * This returns a promise since we commonly disconnect at the end of a program, and not waiting will cause the socket to not close cleanly. + */ + disconnect( + actor: AnyActorInstance, + conn: AnyConn, + state: State, + reason?: string, + ): Promise; + + /** + * Returns the ready state of the connection. + * This is used to determine if the connection is ready to send messages, or if the connection is stale. + */ + getConnectionReadyState( + actor: AnyActorInstance, + conn: AnyConn, + state: State, + ): ConnReadyState | undefined; +} + +// MARK: WebSocket +const WEBSOCKET_DRIVER: ConnDriver = { + sendMessage: ( + actor: AnyActorInstance, + _conn: AnyConn, + state: ConnDriverWebSocketState, + message: CachedSerializer, + ) => { + const serialized = message.serialize(state.encoding); + + actor.rLog.debug({ + msg: "sending websocket message", + encoding: state.encoding, + dataType: typeof serialized, + isUint8Array: serialized instanceof Uint8Array, + isArrayBuffer: serialized instanceof ArrayBuffer, + dataLength: (serialized as any).byteLength || (serialized as any).length, + }); + + // Convert Uint8Array to ArrayBuffer for proper transmission + if (serialized instanceof Uint8Array) { + const buffer = serialized.buffer.slice( + serialized.byteOffset, + serialized.byteOffset + serialized.byteLength, + ); + // Handle SharedArrayBuffer case + if (buffer instanceof SharedArrayBuffer) { + const arrayBuffer = new ArrayBuffer(buffer.byteLength); + new Uint8Array(arrayBuffer).set(new Uint8Array(buffer)); + actor.rLog.debug({ + msg: "converted SharedArrayBuffer to ArrayBuffer", + byteLength: arrayBuffer.byteLength, + }); + state.websocket.send(arrayBuffer); + } else { + actor.rLog.debug({ + msg: "sending ArrayBuffer", + byteLength: buffer.byteLength, + }); + state.websocket.send(buffer); + } + } else { + actor.rLog.debug({ + msg: "sending string data", + length: (serialized as string).length, + }); + state.websocket.send(serialized); + } + }, + + disconnect: async ( + _actor: AnyActorInstance, + _conn: AnyConn, + state: ConnDriverWebSocketState, + reason?: string, + ) => { + // Close socket + state.websocket.close(1000, reason); + + // Create promise to wait for socket to close gracefully + await state.closePromise.promise; + }, + + getConnectionReadyState: ( + _actor: AnyActorInstance, + _conn: AnyConn, + state: ConnDriverWebSocketState, + ): ConnReadyState | undefined => { + return state.websocket.readyState; + }, +}; + +// MARK: SSE +const SSE_DRIVER: ConnDriver = { + sendMessage: ( + _actor: AnyActorInstance, + _conn: AnyConn, + state: ConnDriverSseState, + message: CachedSerializer, + ) => { + state.stream.writeSSE({ + data: encodeDataToString(message.serialize(state.encoding)), + }); + }, + + disconnect: async ( + _actor: AnyActorInstance, + _conn: AnyConn, + state: ConnDriverSseState, + _reason?: string, + ) => { + state.stream.close(); + }, + + getConnectionReadyState: ( + _actor: AnyActorInstance, + _conn: AnyConn, + state: ConnDriverSseState, + ): ConnReadyState | undefined => { + if (state.stream.aborted || state.stream.closed) { + return ConnReadyState.CLOSED; + } + + return ConnReadyState.OPEN; + }, +}; + +// MARK: HTTP +const HTTP_DRIVER: ConnDriver = { + getConnectionReadyState(_actor, _conn) { + // TODO: This might not be the correct logic + return ConnReadyState.OPEN; + }, + disconnect: async () => { + // Noop + // TODO: Abort the request + }, +}; + +/** List of all connection drivers. */ +export const CONN_DRIVERS: Record> = { + [ConnDriverKind.WEBSOCKET]: WEBSOCKET_DRIVER, + [ConnDriverKind.SSE]: SSE_DRIVER, + [ConnDriverKind.HTTP]: HTTP_DRIVER, +}; + +export function getConnDriverFromState( + state: ConnDriverState, +): ConnDriver { + if (ConnDriverKind.WEBSOCKET in state) return WEBSOCKET_DRIVER; + else if (ConnDriverKind.SSE in state) return SSE_DRIVER; + else if (ConnDriverKind.HTTP in state) return SSE_DRIVER; + else assertUnreachable(state); +} diff --git a/packages/rivetkit/src/actor/connection.ts b/packages/rivetkit/src/actor/conn.ts similarity index 75% rename from packages/rivetkit/src/actor/connection.ts rename to packages/rivetkit/src/actor/conn.ts index fea9bdc94..8c1c1a95b 100644 --- a/packages/rivetkit/src/actor/connection.ts +++ b/packages/rivetkit/src/actor/conn.ts @@ -2,11 +2,16 @@ import * as cbor from "cbor-x"; import type * as protocol from "@/schemas/client-protocol/mod"; import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned"; import { bufferToArrayBuffer } from "@/utils"; +import { + CONN_DRIVERS, + ConnDriverKind, + type ConnDriverState, + ConnReadyState, + getConnDriverFromState, +} from "./conn-drivers"; import type { AnyDatabaseProvider } from "./database"; -import { type ConnDriver, ConnectionReadyState } from "./driver"; import * as errors from "./errors"; import type { ActorInstance } from "./instance"; -import { loggerWithoutContext } from "./log"; import type { PersistedConn } from "./persisted"; import { CachedSerializer } from "./protocol/serde"; import { generateSecureToken } from "./utils"; @@ -23,15 +28,6 @@ export type ConnId = string; export type AnyConn = Conn; -export const CONNECTION_DRIVER_WEBSOCKET = "webSocket"; -export const CONNECTION_DRIVER_SSE = "sse"; -export const CONNECTION_DRIVER_HTTP = "http"; - -export type ConnectionDriver = - | typeof CONNECTION_DRIVER_WEBSOCKET - | typeof CONNECTION_DRIVER_SSE - | typeof CONNECTION_DRIVER_HTTP; - export type ConnectionStatus = "connected" | "reconnecting"; export const CONNECTION_CHECK_LIVENESS_SYMBOL = Symbol("checkLiveness"); @@ -46,8 +42,6 @@ export const CONNECTION_CHECK_LIVENESS_SYMBOL = Symbol("checkLiveness"); export class Conn { subscriptions: Set = new Set(); - #stateEnabled: boolean; - // TODO: Remove this cyclical reference #actor: ActorInstance; @@ -61,22 +55,16 @@ export class Conn { __persist: PersistedConn; /** - * Driver used to manage realtime connection communication. - * - * @protected + * Driver used to manage connection. If undefined, there is no connection connected. */ - #driver: ConnDriver; + __driverState?: ConnDriverState; public get params(): CP { return this.__persist.params; } - public get driver(): ConnectionDriver { - return this.__persist.connDriver as ConnectionDriver; - } - - public get _stateEnabled() { - return this.#stateEnabled; + public get __stateEnabled() { + return this.#actor.connStateEnabled; } /** @@ -138,17 +126,13 @@ export class Conn { public constructor( actor: ActorInstance, persist: PersistedConn, - driver: ConnDriver, - stateEnabled: boolean, ) { this.#actor = actor; this.__persist = persist; - this.#driver = driver; - this.#stateEnabled = stateEnabled; } #validateStateEnabled() { - if (!this.#stateEnabled) { + if (!this.__stateEnabled) { throw new errors.ConnStateNotEnabled(); } } @@ -161,12 +145,22 @@ export class Conn { * @protected */ public _sendMessage(message: CachedSerializer) { - this.#driver.sendMessage?.( - this.#actor, - this, - this.__persist.connDriverState, - message, - ); + if (this.__driverState) { + const driver = getConnDriverFromState(this.__driverState); + if (driver.sendMessage) { + driver.sendMessage(this.#actor, this, this.__driverState, message); + } else { + this.#actor.rLog.debug({ + msg: "conn driver does not support sending messages", + conn: this.id, + }); + } + } else { + this.#actor.rLog.warn({ + msg: "missing connection driver state for send message", + conn: this.id, + }); + } } /** @@ -206,12 +200,23 @@ export class Conn { */ public async disconnect(reason?: string) { this.#status = "reconnecting"; - await this.#driver.disconnect( - this.#actor, - this, - this.__persist.connDriverState, - reason, - ); + + if (this.__driverState) { + const driver = getConnDriverFromState(this.__driverState); + if (driver.disconnect) { + driver.disconnect(this.#actor, this, this.__driverState, reason); + } else { + this.#actor.rLog.debug({ + msg: "no disconnect handler for conn driver", + conn: this.id, + }); + } + } else { + this.#actor.rLog.warn({ + msg: "missing connection driver state for disconnect", + conn: this.id, + }); + } } /** @@ -221,11 +226,20 @@ export class Conn { * @internal */ [CONNECTION_CHECK_LIVENESS_SYMBOL]() { - const readyState = this.#driver.getConnectionReadyState(this.#actor, this); + let readyState: ConnReadyState | undefined; + + if (this.__driverState) { + const driver = getConnDriverFromState(this.__driverState); + readyState = driver.getConnectionReadyState( + this.#actor, + this, + this.__driverState, + ); + } const isConnectionClosed = - readyState === ConnectionReadyState.CLOSED || - readyState === ConnectionReadyState.CLOSING || + readyState === ConnReadyState.CLOSED || + readyState === ConnReadyState.CLOSING || readyState === undefined; const newLastSeen = Date.now(); diff --git a/packages/rivetkit/src/actor/context.ts b/packages/rivetkit/src/actor/context.ts index d4d415b84..b568aa7a3 100644 --- a/packages/rivetkit/src/actor/context.ts +++ b/packages/rivetkit/src/actor/context.ts @@ -2,7 +2,7 @@ import type { ActorKey } from "@/actor/mod"; import type { Client } from "@/client/client"; import type { Logger } from "@/common/log"; import type { Registry } from "@/registry/mod"; -import type { Conn, ConnId } from "./connection"; +import type { Conn, ConnId } from "./conn"; import type { AnyDatabaseProvider, InferDatabaseClient } from "./database"; import type { ActorInstance, SaveStateOptions } from "./instance"; import type { Schedule } from "./schedule"; diff --git a/packages/rivetkit/src/actor/driver.ts b/packages/rivetkit/src/actor/driver.ts index c813747be..a175b3ece 100644 --- a/packages/rivetkit/src/actor/driver.ts +++ b/packages/rivetkit/src/actor/driver.ts @@ -1,16 +1,10 @@ import type { Context as HonoContext } from "hono"; -import type { CachedSerializer } from "@/actor/protocol/serde"; import type { AnyClient } from "@/client/client"; import type { ManagerDriver } from "@/manager/driver"; import type { RegistryConfig } from "@/registry/config"; import type { RunConfig } from "@/registry/run-config"; -import type * as protocol from "@/schemas/client-protocol/mod"; -import type { AnyConn, ConnectionDriver } from "./connection"; -import type { GenericConnGlobalState } from "./generic-conn-driver"; import type { AnyActorInstance } from "./instance"; -export type ConnectionDriversMap = Record; - export type ActorDriverBuilder = ( registryConfig: RegistryConfig, runConfig: RunConfig, @@ -23,8 +17,6 @@ export interface ActorDriver { loadActor(actorId: string): Promise; - getGenericConnGlobalState(actorId: string): GenericConnGlobalState; - getContext(actorId: string): unknown; readPersistedData(actorId: string): Promise; @@ -51,39 +43,3 @@ export interface ActorDriver { /** This handles the serverless start request. This should manage the lifecycle of the runner tied to the request lifecycle. */ serverlessHandleStart?(c: HonoContext): Promise; } - -export enum ConnectionReadyState { - UNKNOWN = -1, - CONNECTING = 0, - OPEN = 1, - CLOSING = 2, - CLOSED = 3, -} - -export interface ConnDriver { - sendMessage?( - actor: AnyActorInstance, - conn: AnyConn, - state: ConnDriverState, - message: CachedSerializer, - ): void; - - /** - * This returns a promise since we commonly disconnect at the end of a program, and not waiting will cause the socket to not close cleanly. - */ - disconnect( - actor: AnyActorInstance, - conn: AnyConn, - state: ConnDriverState, - reason?: string, - ): Promise; - - /** - * Returns the ready state of the connection. - * This is used to determine if the connection is ready to send messages, or if the connection is stale. - */ - getConnectionReadyState( - actor: AnyActorInstance, - conn: AnyConn, - ): ConnectionReadyState | undefined; -} diff --git a/packages/rivetkit/src/actor/generic-conn-driver.ts b/packages/rivetkit/src/actor/generic-conn-driver.ts deleted file mode 100644 index b09ec8b2f..000000000 --- a/packages/rivetkit/src/actor/generic-conn-driver.ts +++ /dev/null @@ -1,248 +0,0 @@ -import type { SSEStreamingApi } from "hono/streaming"; -import type { WSContext } from "hono/ws"; -import type { WebSocket } from "ws"; -import { - type AnyConn, - CONNECTION_DRIVER_HTTP, - CONNECTION_DRIVER_SSE, - CONNECTION_DRIVER_WEBSOCKET, -} from "@/actor/connection"; -import { - type ConnDriver, - type ConnectionDriversMap, - ConnectionReadyState, -} from "@/actor/driver"; -import type { AnyActorInstance } from "@/actor/instance"; -import type { CachedSerializer, Encoding } from "@/actor/protocol/serde"; -import { encodeDataToString } from "@/actor/protocol/serde"; -import type * as protocol from "@/schemas/client-protocol/mod"; -import { promiseWithResolvers } from "@/utils"; -import { loggerWithoutContext } from "./log"; - -// This state is different than `PersistedConn` state since the connection-specific state is persisted & must be serializable. This is also part of the connection driver, not part of the core actor. -// -// This holds the actual connections, which are not serializable. -// -// This is scoped to each actor. Do not share between multiple actors. -export class GenericConnGlobalState { - websockets = new Map(); - sseStreams = new Map(); -} - -/** - * Exposes connection drivers for platforms that support vanilla WebSocket, SSE, and HTTP. - */ -export function createGenericConnDrivers( - globalState: GenericConnGlobalState, -): ConnectionDriversMap { - return { - [CONNECTION_DRIVER_WEBSOCKET]: createGenericWebSocketDriver(globalState), - [CONNECTION_DRIVER_SSE]: createGenericSseDriver(globalState), - [CONNECTION_DRIVER_HTTP]: createGenericHttpDriver(), - }; -} - -// MARK: WebSocket -export interface GenericWebSocketDriverState { - encoding: Encoding; -} - -export function createGenericWebSocketDriver( - globalState: GenericConnGlobalState, -): ConnDriver { - return { - sendMessage: ( - actor: AnyActorInstance, - conn: AnyConn, - state: GenericWebSocketDriverState, - message: CachedSerializer, - ) => { - const ws = globalState.websockets.get(conn.id); - if (!ws) { - actor.rLog.warn({ - msg: "missing ws for sendMessage", - actorId: actor.id, - connId: conn.id, - totalCount: globalState.websockets.size, - }); - return; - } - - const serialized = message.serialize(state.encoding); - - actor.rLog.debug({ - msg: "sending websocket message", - encoding: state.encoding, - dataType: typeof serialized, - isUint8Array: serialized instanceof Uint8Array, - isArrayBuffer: serialized instanceof ArrayBuffer, - dataLength: - (serialized as any).byteLength || (serialized as any).length, - }); - - // Convert Uint8Array to ArrayBuffer for proper transmission - if (serialized instanceof Uint8Array) { - const buffer = serialized.buffer.slice( - serialized.byteOffset, - serialized.byteOffset + serialized.byteLength, - ); - // Handle SharedArrayBuffer case - if (buffer instanceof SharedArrayBuffer) { - const arrayBuffer = new ArrayBuffer(buffer.byteLength); - new Uint8Array(arrayBuffer).set(new Uint8Array(buffer)); - actor.rLog.debug({ - msg: "converted SharedArrayBuffer to ArrayBuffer", - byteLength: arrayBuffer.byteLength, - }); - ws.send(arrayBuffer); - } else { - actor.rLog.debug({ - msg: "sending ArrayBuffer", - byteLength: buffer.byteLength, - }); - ws.send(buffer); - } - } else { - actor.rLog.debug({ - msg: "sending string data", - length: (serialized as string).length, - }); - ws.send(serialized); - } - }, - - disconnect: async ( - actor: AnyActorInstance, - conn: AnyConn, - _state: GenericWebSocketDriverState, - reason?: string, - ) => { - const ws = globalState.websockets.get(conn.id); - if (!ws) { - actor.rLog.warn({ - msg: "missing ws for disconnect", - actorId: actor.id, - connId: conn.id, - totalCount: globalState.websockets.size, - }); - return; - } - const raw = ws.raw as WebSocket | undefined; - if (!raw) { - actor.rLog.warn({ msg: "ws.raw does not exist" }); - return; - } - - // Create promise to wait for socket to close gracefully - const { promise, resolve } = promiseWithResolvers(); - raw.addEventListener("close", () => resolve()); - - // Close socket - ws.close(1000, reason); - - await promise; - }, - - getConnectionReadyState: ( - actor: AnyActorInstance, - conn: AnyConn, - ): ConnectionReadyState | undefined => { - const ws = globalState.websockets.get(conn.id); - if (!ws) { - actor.rLog.warn({ - msg: "missing ws for getConnectionReadyState", - connId: conn.id, - }); - return undefined; - } - - const raw = ws.raw as WebSocket | undefined; - if (!raw) return undefined; - - return raw.readyState as ConnectionReadyState; - }, - }; -} - -// MARK: SSE -export interface GenericSseDriverState { - encoding: Encoding; -} - -export function createGenericSseDriver( - globalState: GenericConnGlobalState, -): ConnDriver { - return { - sendMessage: ( - actor: AnyActorInstance, - conn: AnyConn, - state: GenericSseDriverState, - message: CachedSerializer, - ) => { - const stream = globalState.sseStreams.get(conn.id); - if (!stream) { - actor.rLog.warn({ - msg: "missing sse stream for sendMessage", - connId: conn.id, - }); - return; - } - stream.writeSSE({ - data: encodeDataToString(message.serialize(state.encoding)), - }); - }, - - disconnect: async ( - actor: AnyActorInstance, - conn: AnyConn, - _state: GenericSseDriverState, - _reason?: string, - ) => { - const stream = globalState.sseStreams.get(conn.id); - if (!stream) { - actor.rLog.warn({ - msg: "missing sse stream for disconnect", - connId: conn.id, - }); - return; - } - - stream.close(); - }, - - getConnectionReadyState: ( - actor: AnyActorInstance, - conn: AnyConn, - ): ConnectionReadyState | undefined => { - const stream = globalState.sseStreams.get(conn.id); - if (!stream) { - actor.rLog.warn({ - msg: "missing sse stream for getConnectionReadyState", - connId: conn.id, - }); - return undefined; - } - - if (stream.aborted || stream.closed) { - return ConnectionReadyState.CLOSED; - } - - return ConnectionReadyState.OPEN; - }, - }; -} - -// MARK: HTTP -export type GenericHttpDriverState = Record; - -export function createGenericHttpDriver(): ConnDriver { - return { - getConnectionReadyState(_actor, _conn) { - // TODO: This might not be the correct logic - return ConnectionReadyState.OPEN; - }, - disconnect: async () => { - // Noop - }, - }; -} diff --git a/packages/rivetkit/src/actor/instance.ts b/packages/rivetkit/src/actor/instance.ts index 8908131c5..e4c10611b 100644 --- a/packages/rivetkit/src/actor/instance.ts +++ b/packages/rivetkit/src/actor/instance.ts @@ -1,7 +1,9 @@ import * as cbor from "cbor-x"; +import type { SSEStreamingApi } from "hono/streaming"; +import type { WSContext } from "hono/ws"; import invariant from "invariant"; import onChange from "on-change"; -import type { ActorKey } from "@/actor/mod"; +import type { ActorKey, Encoding } from "@/actor/mod"; import type { Client } from "@/client/client"; import { getBaseLogger, getIncludeTarget, type Logger } from "@/common/log"; import { isCborSerializable, stringifyError } from "@/common/utils"; @@ -20,18 +22,13 @@ import { } from "@/utils"; import type { ActionContext } from "./action"; import type { ActorConfig, OnConnectOptions } from "./config"; -import { - CONNECTION_CHECK_LIVENESS_SYMBOL, - Conn, - type ConnectionDriver, - type ConnId, -} from "./connection"; +import { CONNECTION_CHECK_LIVENESS_SYMBOL, Conn, type ConnId } from "./conn"; +import type { ConnDriver, ConnDriverState } from "./conn-drivers"; import { ActorContext } from "./context"; import type { AnyDatabaseProvider, InferDatabaseClient } from "./database"; -import type { ActorDriver, ConnDriver, ConnectionDriversMap } from "./driver"; +import type { ActorDriver } from "./driver"; import * as errors from "./errors"; import { serializeActorKey } from "./keys"; -import { loggerWithoutContext } from "./log"; import type { PersistedActor, PersistedConn, @@ -162,7 +159,6 @@ export class ActorInstance { #backgroundPromises: Promise[] = []; #abortController = new AbortController(); #config: ActorConfig; - #connectionDrivers!: ConnectionDriversMap; #actorDriver!: ActorDriver; #inlineClient!: Client>; #actorId!: string; @@ -207,9 +203,9 @@ export class ActorInstance { getConnections: async () => { return Array.from(this.#connections.entries()).map(([id, conn]) => ({ id, - stateEnabled: conn._stateEnabled, - params: conn.params as {}, - state: conn._stateEnabled ? conn.state : undefined, + stateEnabled: conn.__stateEnabled, + params: conn.params as any, + state: conn.__stateEnabled ? conn.state : undefined, })); }, setState: async (state: unknown) => { @@ -255,7 +251,6 @@ export class ActorInstance { } async start( - connectionDrivers: ConnectionDriversMap, actorDriver: ActorDriver, inlineClient: Client>, actorId: string, @@ -279,7 +274,6 @@ export class ActorInstance { ), ); - this.#connectionDrivers = connectionDrivers; this.#actorDriver = actorDriver; this.#inlineClient = inlineClient; this.#actorId = actorId; @@ -529,7 +523,7 @@ export class ActorInstance { } } - get #connStateEnabled() { + get connStateEnabled() { return "createConnState" in this.#config || "connState" in this.#config; } @@ -720,13 +714,7 @@ export class ActorInstance { // Load connections for (const connPersist of this.#persist.connections) { // Create connections - const driver = this.__getConnDriver(connPersist.connDriver); - const conn = new Conn( - this, - connPersist, - driver, - this.#connStateEnabled, - ); + const conn = new Conn(this, connPersist); this.#connections.set(conn.id, conn); // Register event subscriptions @@ -792,14 +780,32 @@ export class ActorInstance { } /** - * Removes a connection and cleans up its resources. + * Connection disconnected. + * + * If a clean diconnect, will be removed immediately. + * + * If not a clean disconnect, will keep the connection alive for a given interval to wait for reconnect. */ - __removeConn(conn: Conn | undefined) { - if (!conn) { - this.#rLog.warn({ msg: "`conn` does not exist" }); - return; + __connDisconnected(conn: Conn, wasClean: boolean) { + if (wasClean) { + // Disconnected cleanly, remove the conn + + this.#removeConn(conn); + } else { + // Disconnected uncleanly, allow reconnection + + if (conn.__driverState) { + this.rLog.warn("called conn disconnected without driver state"); + } + + conn.__driverState = undefined; } + } + /** + * Removes a connection and cleans up its resources. + */ + #removeConn(conn: Conn) { // Remove from persist & save immediately const connIdx = this.#persist.connections.findIndex( (c) => c.connId === conn.id, @@ -867,7 +873,7 @@ export class ActorInstance { ); } - if (this.#connStateEnabled) { + if (this.connStateEnabled) { if ("createConnState" in this.#config) { const dataOrPromise = this.#config.createConnState( this.actorContext as unknown as ActorContext< @@ -901,13 +907,6 @@ export class ActorInstance { return connState as CS; } - __getConnDriver(driverId: ConnectionDriver): ConnDriver { - // Get driver - const driver = this.#connectionDrivers[driverId]; - if (!driver) throw new Error(`No connection driver: ${driverId}`); - return driver; - } - /** * Called after establishing a connection handshake. */ @@ -916,8 +915,7 @@ export class ActorInstance { connectionToken: string, params: CP, state: CS, - driverId: ConnectionDriver, - driverState: unknown, + driverState: ConnDriverState, ): Promise> { this.#assertReady(); @@ -926,23 +924,16 @@ export class ActorInstance { } // Create connection - const driver = this.__getConnDriver(driverId); const persist: PersistedConn = { connId: connectionId, token: connectionToken, - connDriver: driverId, - connDriverState: driverState, params: params, state: state, lastSeen: Date.now(), subscriptions: [], }; - const conn = new Conn( - this, - persist, - driver, - this.#connStateEnabled, - ); + const conn = new Conn(this, persist); + conn.__driverState = driverState; this.#connections.set(conn.id, conn); // Update sleep @@ -1151,9 +1142,8 @@ export class ActorInstance { lastSeen, }); - // TODO: Do we need to force disconnect the connection here? - - this.__removeConn(conn); + // Assume that the connection is dead here, no need to disconnect anything + this.#removeConn(conn); } } } @@ -1308,7 +1298,10 @@ export class ActorInstance { /** * Handles raw HTTP requests to the actor. */ - async handleFetch(request: Request, opts: {}): Promise { + async handleFetch( + request: Request, + opts: Record, + ): Promise { this.#assertReady(); if (!this.#config.onFetch) { @@ -1782,10 +1775,6 @@ export class ActorInstance { connections: persist.connections.map((conn) => ({ id: conn.connId, token: conn.token, - driver: conn.connDriver as string, - driverState: bufferToArrayBuffer( - cbor.encode(conn.connDriverState || {}), - ), parameters: bufferToArrayBuffer(cbor.encode(conn.params || {})), state: bufferToArrayBuffer(cbor.encode(conn.state || {})), subscriptions: conn.subscriptions.map((sub) => ({ @@ -1819,8 +1808,6 @@ export class ActorInstance { connections: bareData.connections.map((conn) => ({ connId: conn.id, token: conn.token, - connDriver: conn.driver as ConnectionDriver, - connDriverState: cbor.decode(new Uint8Array(conn.driverState)), params: cbor.decode(new Uint8Array(conn.parameters)), state: cbor.decode(new Uint8Array(conn.state)), subscriptions: conn.subscriptions.map((sub) => ({ diff --git a/packages/rivetkit/src/actor/mod.ts b/packages/rivetkit/src/actor/mod.ts index db652eae2..242291c01 100644 --- a/packages/rivetkit/src/actor/mod.ts +++ b/packages/rivetkit/src/actor/mod.ts @@ -76,16 +76,10 @@ export type { ActionContext } from "./action"; export type * from "./config"; export type { Conn, - ConnectionDriver, ConnectionStatus, generateConnId, generateConnToken, -} from "./connection"; -export { - CONNECTION_DRIVER_HTTP, - CONNECTION_DRIVER_SSE, - CONNECTION_DRIVER_WEBSOCKET, -} from "./connection"; +} from "./conn"; export type { ActorContext } from "./context"; export type { ActionContextOf, @@ -95,10 +89,6 @@ export type { } from "./definition"; export { lookupInRegistry } from "./definition"; export { UserError, type UserErrorOptions } from "./errors"; -export { - createGenericConnDrivers, - GenericConnGlobalState, -} from "./generic-conn-driver"; export type { AnyActorInstance } from "./instance"; export { type ActorRouter, diff --git a/packages/rivetkit/src/actor/persisted.ts b/packages/rivetkit/src/actor/persisted.ts index f7cb61d0d..598a0ebed 100644 --- a/packages/rivetkit/src/actor/persisted.ts +++ b/packages/rivetkit/src/actor/persisted.ts @@ -1,5 +1,3 @@ -import type { ConnectionDriver } from "./connection"; - /** State object that gets automatically persisted to storage. */ export interface PersistedActor { input?: I; @@ -13,8 +11,6 @@ export interface PersistedActor { export interface PersistedConn { connId: string; token: string; - connDriver: ConnectionDriver; - connDriverState: unknown; params: CP; state: CS; subscriptions: PersistedSubscription[]; diff --git a/packages/rivetkit/src/actor/protocol/old.ts b/packages/rivetkit/src/actor/protocol/old.ts index 5659ce25a..886fad98b 100644 --- a/packages/rivetkit/src/actor/protocol/old.ts +++ b/packages/rivetkit/src/actor/protocol/old.ts @@ -16,7 +16,7 @@ import { import { deserializeWithEncoding } from "@/serde"; import { assertUnreachable, bufferToArrayBuffer } from "../../utils"; import { ActionContext } from "../action"; -import type { Conn } from "../connection"; +import type { Conn } from "../conn"; import type { ActorInstance } from "../instance"; export const TransportSchema = z.enum(["websocket", "sse"]); diff --git a/packages/rivetkit/src/actor/router-endpoints.ts b/packages/rivetkit/src/actor/router-endpoints.ts index 901803ca5..98b782aa6 100644 --- a/packages/rivetkit/src/actor/router-endpoints.ts +++ b/packages/rivetkit/src/actor/router-endpoints.ts @@ -2,15 +2,10 @@ import * as cbor from "cbor-x"; import type { Context as HonoContext, HonoRequest } from "hono"; import { type SSEStreamingApi, streamSSE } from "hono/streaming"; import type { WSContext } from "hono/ws"; +import invariant from "invariant"; import { ActionContext } from "@/actor/action"; -import type { AnyConn } from "@/actor/connection"; -import { - CONNECTION_DRIVER_HTTP, - CONNECTION_DRIVER_SSE, - CONNECTION_DRIVER_WEBSOCKET, - generateConnId, - generateConnToken, -} from "@/actor/connection"; +import type { AnyConn } from "@/actor/conn"; +import { generateConnId, generateConnToken } from "@/actor/conn"; import * as errors from "@/actor/errors"; import type { AnyActorInstance } from "@/actor/instance"; import type { InputData } from "@/actor/protocol/serde"; @@ -37,12 +32,8 @@ import { serializeWithEncoding, } from "@/serde"; import { bufferToArrayBuffer, promiseWithResolvers } from "@/utils"; +import { ConnDriverKind } from "./conn-drivers"; import type { ActorDriver } from "./driver"; -import type { - GenericHttpDriverState, - GenericSseDriverState, - GenericWebSocketDriverState, -} from "./generic-conn-driver"; import { loggerWithoutContext } from "./log"; import { parseMessage } from "./protocol/old"; @@ -154,6 +145,9 @@ export async function handleWebSocketConnect( }; } + // Promise used to wait for the websocket close in `disconnect` + const closePromise = promiseWithResolvers(); + return { onOpen: (_evt: any, ws: WSContext) => { actor.rLog.debug("actor websocket open"); @@ -166,13 +160,9 @@ export async function handleWebSocketConnect( const connState = await actor.prepareConn(parameters, req); // Save socket - const connGlobalState = - actorDriver.getGenericConnGlobalState(actorId); - connGlobalState.websockets.set(connId, ws); actor.rLog.debug({ msg: "registered websocket for conn", actorId, - totalCount: connGlobalState.websockets.size, }); // Create connection @@ -181,8 +171,13 @@ export async function handleWebSocketConnect( connToken, parameters, connState, - CONNECTION_DRIVER_WEBSOCKET, - { encoding } satisfies GenericWebSocketDriverState, + { + [ConnDriverKind.WEBSOCKET]: { + encoding, + websocket: ws, + closePromise, + }, + }, ); // Unblock other handlers @@ -258,6 +253,10 @@ export async function handleWebSocketConnect( }, ws: WSContext, ) => { + handlersReject(`WebSocket closed (${event.code}): ${event.reason}`); + + closePromise.resolve(); + if (event.wasClean) { actor.rLog.info({ msg: "websocket closed", @@ -280,24 +279,8 @@ export async function handleWebSocketConnect( // Handle cleanup asynchronously handlersPromise - .then(({ conn, actor, connId }) => { - const connGlobalState = - actorDriver.getGenericConnGlobalState(actorId); - const didDelete = connGlobalState.websockets.delete(connId); - if (didDelete) { - actor.rLog.info({ - msg: "removing websocket for conn", - totalCount: connGlobalState.websockets.size, - }); - } else { - actor.rLog.warn({ - msg: "websocket does not exist for conn", - actorId, - totalCount: connGlobalState.websockets.size, - }); - } - - actor.__removeConn(conn); + .then(({ conn, actor }) => { + actor.__connDisconnected(conn, event.wasClean); }) .catch((error) => { deconstructError( @@ -355,20 +338,13 @@ export async function handleSseConnect( actor.rLog.debug("sse open"); - // Save stream - actorDriver - .getGenericConnGlobalState(actorId) - .sseStreams.set(connId, stream); - // Create connection - conn = await actor.createConn( - connId, - connToken, - parameters, - connState, - CONNECTION_DRIVER_SSE, - { encoding } satisfies GenericSseDriverState, - ); + conn = await actor.createConn(connId, connToken, parameters, connState, { + [ConnDriverKind.SSE]: { + encoding, + stream: stream, + }, + }); // Wait for close const abortResolver = promiseWithResolvers(); @@ -380,18 +356,14 @@ export async function handleSseConnect( // Handle stream abort (when client closes the connection) c.req.raw.signal.addEventListener("abort", async () => { - const rLog = actor?.rLog ?? loggerWithoutContext(); + invariant(actor, "actor should exist"); + const rLog = actor.rLog ?? loggerWithoutContext(); try { rLog.debug("sse stream aborted"); // Cleanup - if (connId) { - actorDriver - .getGenericConnGlobalState(actorId) - .sseStreams.delete(connId); - } - if (conn && actor) { - actor.__removeConn(conn); + if (conn) { + actor.__connDisconnected(conn, false); } abortResolver.resolve(undefined); @@ -426,13 +398,8 @@ export async function handleSseConnect( loggerWithoutContext().error({ msg: "error in sse connection", error }); // Cleanup on error - if (connId !== undefined) { - actorDriver - .getGenericConnGlobalState(actorId) - .sseStreams.delete(connId); - } if (conn && actor !== undefined) { - actor.__removeConn(conn); + actor.__connDisconnected(conn, false); } // Close the stream on error @@ -479,8 +446,7 @@ export async function handleAction( generateConnToken(), parameters, connState, - CONNECTION_DRIVER_HTTP, - {} satisfies GenericHttpDriverState, + { [ConnDriverKind.HTTP]: {} }, ); // Call action @@ -488,7 +454,7 @@ export async function handleAction( output = await actor.executeAction(ctx, actionName, actionArgs); } finally { if (conn) { - actor?.__removeConn(conn); + actor?.__connDisconnected(conn, true); } } diff --git a/packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts b/packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts index c38d4997b..7944edd9d 100644 --- a/packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts +++ b/packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts @@ -284,87 +284,5 @@ export function runActorConnTests(driverTestConfig: DriverTestConfig) { ); }); }); - - describe("Connection Liveness", () => { - // TODO: KIT-242 - test.skip("should return correct liveness status for connections", async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - - // Create actor and connection - const handle = client.connLivenessActor.getOrCreate([ - "test-liveness-status", - ]); - const connA = handle.connect(); - const connB = handle.connect(); - - const connAId = await connA.getConnectionId(); - const connBId = await connB.getConnectionId(); - - // Verify connection works initially - await connA.increment(5); - await connB.increment(5); - - const counter = await handle.getCounter(); - expect(counter).toBe(10); - - const connectionsStatusBeforeKill = - await handle.getWsConnectionsLiveness(); - expect(connectionsStatusBeforeKill).toHaveLength(2); - expect(connectionsStatusBeforeKill).toContainEqual( - expect.objectContaining({ - id: connAId, - status: "connected", - lastSeen: FAKE_TIME.getTime(), - }), - ); - expect(connectionsStatusBeforeKill).toContainEqual( - expect.objectContaining({ - id: connBId, - status: "connected", - lastSeen: FAKE_TIME.getTime(), - }), - ); - - // Kill one connection - await handle.kill(connAId); // instead of dispose, we use kill to simulate a disconnection (e.g. network failure) - // connA.dispose(); - // we killed the connection, but the actor instance does not know about it yet - // it should still be in the list of connections, but with a status of "reconnecting" - const connectionsStatusAfterKill = - await handle.getWsConnectionsLiveness(); - expect(connectionsStatusAfterKill).toEqual( - expect.arrayContaining([ - expect.objectContaining({ - id: connAId, - status: "reconnecting", - lastSeen: FAKE_TIME.getTime(), - }), - expect.objectContaining({ - id: connBId, - status: "connected", - lastSeen: FAKE_TIME.getTime(), - }), - ]), - ); - - // default time to wait for cleanup is 5 seconds - // check actor options - await waitFor(driverTestConfig, 5_000); - - // After timeout, the killed connection should be unavailable, since the manager has cleaned it up - const connectionsStatusAfterCleanup = - await handle.getWsConnectionsLiveness(); - expect(connectionsStatusAfterCleanup).not.toContainEqual( - expect.objectContaining({ - id: connAId, - }), - ); - expect(connectionsStatusAfterCleanup).toContainEqual( - expect.objectContaining({ - id: connBId, - }), - ); - }); - }); }); } diff --git a/packages/rivetkit/src/drivers/engine/actor-driver.ts b/packages/rivetkit/src/drivers/engine/actor-driver.ts index 68ba46328..116d4af41 100644 --- a/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -9,10 +9,6 @@ import { streamSSE } from "hono/streaming"; import { WSContext } from "hono/ws"; import invariant from "invariant"; import { lookupInRegistry } from "@/actor/definition"; -import { - createGenericConnDrivers, - GenericConnGlobalState, -} from "@/actor/generic-conn-driver"; import { deserializeActorKey } from "@/actor/keys"; import { EncodingSchema } from "@/actor/protocol/serde"; import { type ActorRouter, createActorRouter } from "@/actor/router"; @@ -47,7 +43,6 @@ import { logger } from "./log"; interface ActorHandler { actor?: AnyActorInstance; actorStartPromise?: ReturnType>; - genericConnGlobalState: GenericConnGlobalState; persistedData?: Uint8Array; } @@ -162,14 +157,6 @@ export class EngineActorDriver implements ActorDriver { return handler.actor; } - getGenericConnGlobalState(actorId: string): GenericConnGlobalState { - const handler = this.#actors.get(actorId); - if (!handler) { - throw new Error(`Actor ${actorId} not loaded`); - } - return handler.genericConnGlobalState; - } - getContext(actorId: string): DriverContext { return {}; } @@ -232,7 +219,6 @@ export class EngineActorDriver implements ActorDriver { let handler = this.#actors.get(actorId); if (!handler) { handler = { - genericConnGlobalState: new GenericConnGlobalState(), actorStartPromise: promiseWithResolvers(), persistedData: serializeEmptyPersistData(input), }; @@ -251,11 +237,7 @@ export class EngineActorDriver implements ActorDriver { handler.actor = definition.instantiate(); // Start actor - const connDrivers = createGenericConnDrivers( - handler.genericConnGlobalState, - ); await handler.actor.start( - connDrivers, this, this.#inlineClient, actorId, diff --git a/packages/rivetkit/src/drivers/file-system/actor.ts b/packages/rivetkit/src/drivers/file-system/actor.ts index bf3dbf26a..63ee6add4 100644 --- a/packages/rivetkit/src/drivers/file-system/actor.ts +++ b/packages/rivetkit/src/drivers/file-system/actor.ts @@ -1,7 +1,3 @@ -import { Hono, MiddlewareHandler } from "hono"; -import { streamSSE } from "hono/streaming"; -import type { GenericConnGlobalState } from "@/actor/generic-conn-driver"; -import { loggerWithoutContext } from "@/actor/log"; import type { AnyClient } from "@/client/client"; import type { ActorDriver, @@ -48,10 +44,6 @@ export class FileSystemActorDriver implements ActorDriver { ); } - getGenericConnGlobalState(actorId: string): GenericConnGlobalState { - return this.#state.getActorOrError(actorId).genericConnGlobalState; - } - /** * Get the current storage directory path */ diff --git a/packages/rivetkit/src/drivers/file-system/global-state.ts b/packages/rivetkit/src/drivers/file-system/global-state.ts index 30918c342..4f8e42ee1 100644 --- a/packages/rivetkit/src/drivers/file-system/global-state.ts +++ b/packages/rivetkit/src/drivers/file-system/global-state.ts @@ -5,10 +5,6 @@ import * as path from "node:path"; import invariant from "invariant"; import { lookupInRegistry } from "@/actor/definition"; import { ActorAlreadyExists } from "@/actor/errors"; -import { - createGenericConnDrivers, - GenericConnGlobalState, -} from "@/actor/generic-conn-driver"; import type { AnyActorInstance } from "@/actor/instance"; import type { ActorKey } from "@/actor/mod"; import { generateRandomString } from "@/actor/utils"; @@ -28,7 +24,6 @@ import { bufferToArrayBuffer, type LongTimeoutHandle, promiseWithResolvers, - SinglePromiseQueue, setLongTimeout, stringifyError, } from "@/utils"; @@ -52,8 +47,6 @@ interface ActorEntry { /** Promise for starting the actor. */ startPromise?: ReturnType>; - genericConnGlobalState: GenericConnGlobalState; - alarmTimeout?: LongTimeoutHandle; /** The timestamp currently scheduled for this actor's alarm (ms since epoch). */ alarmTimestamp?: number; @@ -189,7 +182,6 @@ export class FileSystemGlobalState { entry = { id: actorId, - genericConnGlobalState: new GenericConnGlobalState(), removed: false, }; this.#actors.set(actorId, entry); @@ -478,11 +470,7 @@ export class FileSystemGlobalState { entry.actor = definition.instantiate(); // Start actor - const connDrivers = createGenericConnDrivers( - entry.genericConnGlobalState, - ); await entry.actor.start( - connDrivers, actorDriver, inlineClient, actorId, diff --git a/packages/rivetkit/src/mod.ts b/packages/rivetkit/src/mod.ts index 4b7cc14b8..924a7aa29 100644 --- a/packages/rivetkit/src/mod.ts +++ b/packages/rivetkit/src/mod.ts @@ -1,4 +1,4 @@ -export { generateConnId, generateConnToken } from "@/actor/connection"; +export { generateConnId, generateConnToken } from "@/actor/conn"; export * from "@/actor/mod"; export { type AnyClient,