From a68dc68ffc576b45d9262e6392e5623a5b3e0308 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sat, 6 Dec 2025 01:05:31 -0800 Subject: [PATCH] chore: effect example --- examples/effect/.gitignore | 2 + examples/effect/README.md | 37 ++ examples/effect/package.json | 25 + examples/effect/scripts/connect.ts | 19 + examples/effect/src/actors/counter.ts | 32 ++ examples/effect/src/actors/index.ts | 9 + examples/effect/src/actors/user.ts | 83 +++ examples/effect/src/effect/action.ts | 111 ++++ examples/effect/src/effect/actor.ts | 336 +++++++++++ examples/effect/src/effect/hook.ts | 1 + examples/effect/src/effect/index.ts | 3 + examples/effect/src/effect/log.ts | 29 + examples/effect/src/server.ts | 3 + examples/effect/tsconfig.json | 52 ++ examples/effect/turbo.json | 4 + pnpm-lock.yaml | 209 +++++++ .../packages/rivetkit/src/actor/config.ts | 536 +++++++++++++----- .../src/actor/instance/concurrency-manager.ts | 341 +++++++++++ .../src/actor/instance/connection-manager.ts | 70 +-- .../rivetkit/src/actor/instance/mod.ts | 242 ++++---- .../src/actor/instance/schedule-manager.ts | 29 +- .../src/actor/instance/state-manager.ts | 28 +- .../packages/rivetkit/src/actor/mod.ts | 3 + 23 files changed, 1850 insertions(+), 354 deletions(-) create mode 100644 examples/effect/.gitignore create mode 100644 examples/effect/README.md create mode 100644 examples/effect/package.json create mode 100644 examples/effect/scripts/connect.ts create mode 100644 examples/effect/src/actors/counter.ts create mode 100644 examples/effect/src/actors/index.ts create mode 100644 examples/effect/src/actors/user.ts create mode 100644 examples/effect/src/effect/action.ts create mode 100644 examples/effect/src/effect/actor.ts create mode 100644 examples/effect/src/effect/hook.ts create mode 100644 examples/effect/src/effect/index.ts create mode 100644 examples/effect/src/effect/log.ts create mode 100644 examples/effect/src/server.ts create mode 100644 examples/effect/tsconfig.json create mode 100644 examples/effect/turbo.json create mode 100644 rivetkit-typescript/packages/rivetkit/src/actor/instance/concurrency-manager.ts diff --git a/examples/effect/.gitignore b/examples/effect/.gitignore new file mode 100644 index 0000000000..79b7a1192f --- /dev/null +++ b/examples/effect/.gitignore @@ -0,0 +1,2 @@ +.actorcore +node_modules \ No newline at end of file diff --git a/examples/effect/README.md b/examples/effect/README.md new file mode 100644 index 0000000000..213d0f5d7e --- /dev/null +++ b/examples/effect/README.md @@ -0,0 +1,37 @@ +# Counter for RivetKit + +Example project demonstrating basic actor state management and RPC calls with [RivetKit](https://rivetkit.org). + +[Learn More →](https://github.com/rivet-dev/rivetkit) + +[Discord](https://rivet.dev/discord) — [Documentation](https://rivetkit.org) — [Issues](https://github.com/rivet-dev/rivetkit/issues) + +## Getting Started + +### Prerequisites + +- Node.js + +### Installation + +```sh +git clone https://github.com/rivet-dev/rivetkit +cd rivetkit/examples/counter +npm install +``` + +### Development + +```sh +npm run dev +``` + +Run the connect script to interact with the counter: + +```sh +tsx scripts/connect.ts +``` + +## License + +Apache 2.0 \ No newline at end of file diff --git a/examples/effect/package.json b/examples/effect/package.json new file mode 100644 index 0000000000..fbd82c360a --- /dev/null +++ b/examples/effect/package.json @@ -0,0 +1,25 @@ +{ + "name": "example-effect", + "version": "2.0.21", + "private": true, + "type": "module", + "scripts": { + "dev": "tsx src/server.ts", + "check-types": "tsc --noEmit", + "connect": "tsx scripts/connect.ts", + "test": "vitest run" + }, + "devDependencies": { + "@effect/language-service": "^0.60.0", + "@types/node": "^22.13.9", + "rivetkit": "workspace:*", + "tsx": "^3.12.7", + "typescript": "^5.7.3", + "vitest": "^3.1.1" + }, + "stableVersion": "0.8.0", + "dependencies": { + "@effect/workflow": "^0.15.1", + "effect": "^3.19.9" + } +} diff --git a/examples/effect/scripts/connect.ts b/examples/effect/scripts/connect.ts new file mode 100644 index 0000000000..84e806829f --- /dev/null +++ b/examples/effect/scripts/connect.ts @@ -0,0 +1,19 @@ +import { createClient } from "rivetkit/client"; +import type { Registry } from "../src/actors"; + +async function main() { + const client = createClient("http://localhost:6420"); + + const counter = client.counter.getOrCreate().connect(); + + counter.on("newCount", (count: number) => console.log("Event:", count)); + + while (true) { + const out = await counter.increment(1); + console.log("RPC:", out); + + await new Promise((resolve) => setTimeout(resolve, 1000)); + } +} + +main(); diff --git a/examples/effect/src/actors/counter.ts b/examples/effect/src/actors/counter.ts new file mode 100644 index 0000000000..1e7fc4f441 --- /dev/null +++ b/examples/effect/src/actors/counter.ts @@ -0,0 +1,32 @@ +import { actor } from "rivetkit"; +import { Action } from "../effect"; + +export const counter = actor({ + state: { + count: 0, + }, + actions: { + increment: Action.effect(function* (c, x: number) { + yield* Action.updateState(c, (s) => { + s.count += x; + }); + const s = yield* Action.state(c); + yield* Action.broadcast(c, "newCount", s.count); + return s.count; + }), + getCount: Action.effect(function* (c) { + const s = yield* Action.state(c); + return s.count; + }), + // increment: Action.createWithSchema( + // IncrementInput, + // function* (c, { amount }) { + // yield* Actor.updateState(c, (s) => { + // s.count += amount; + // }); + // const s = yield* Actor.state(c); + // return s.count; + // }, + // ), + }, +}); diff --git a/examples/effect/src/actors/index.ts b/examples/effect/src/actors/index.ts new file mode 100644 index 0000000000..e91ed8c15b --- /dev/null +++ b/examples/effect/src/actors/index.ts @@ -0,0 +1,9 @@ +import { setup } from "rivetkit"; +import { counter } from "./counter"; +import { user } from "./user"; + +export const registry = setup({ + use: { counter, user }, +}); + +export type Registry = typeof registry; diff --git a/examples/effect/src/actors/user.ts b/examples/effect/src/actors/user.ts new file mode 100644 index 0000000000..00bf244771 --- /dev/null +++ b/examples/effect/src/actors/user.ts @@ -0,0 +1,83 @@ +import { Data, Effect } from "effect"; +import { actor } from "rivetkit"; +import { Action } from "@rivetkit/effect"; +import { Activity } from "@effect/workflow"; + +export const user = actor({ + createState: (c, input: UserInput): UserState => ({ + email: input.email, + customerId: crypto.randomUUID(), + }), + actions: { + getEmail: Action.effect(function* (c) { + const s = yield* Action.state(c); + return s.email; + }), + updateEmail: Action.workflow(function* (c, newEmail: string) { + if (!validateEmail(newEmail)) { + return yield* Effect.fail( + new InvalidEmailError({ email: newEmail }), + ); + } + + const s = yield* Action.state(c); + yield* Activity.make({ + name: "UpdateStripeEmail", + execute: updateStripeCustomerEmail(s.customerId, newEmail), + }); + yield* Action.updateState(c, (state) => { + state.email = newEmail; + }); + yield* Activity.make({ + name: "SendConfirmationEmail", + execute: sendResendEmailConfirmation(newEmail), + }); + }), + }, +}); + +// === +interface UserInput { + email: string; +} + +interface UserState { + email: string; + customerId: string; +} + +class InvalidEmailError extends Data.TaggedError("InvalidEmailError")<{ + email: string; +}> {} + +function validateEmail(email: string): boolean { + return true; +} + +// External service functions (empty implementations for demonstration) +function createStripeCustomer(email: string): Effect.Effect { + return Effect.succeed(`cus_${Date.now()}`); + // Real implementation would use Effect.tryPromise() or similar + // return Effect.tryPromise(() => stripe.customers.create({ email })) +} + +function updateStripeCustomerEmail( + customerId: string, + email: string, +): Effect.Effect { + return Effect.void; + // Real implementation: + // return Effect.tryPromise(() => stripe.customers.update(customerId, { email })) +} + +function sendResendWelcomeEmail(email: string): Effect.Effect { + return Effect.void; + // Real implementation: + // return Effect.tryPromise(() => resend.emails.send({ to: email, ... })) +} + +function sendResendEmailConfirmation(email: string): Effect.Effect { + return Effect.void; + // Real implementation: + // return Effect.tryPromise(() => resend.emails.send({ to: email, ... })) +} diff --git a/examples/effect/src/effect/action.ts b/examples/effect/src/effect/action.ts new file mode 100644 index 0000000000..ff6fbb5cdd --- /dev/null +++ b/examples/effect/src/effect/action.ts @@ -0,0 +1,111 @@ +import { Context, Effect } from "effect"; +import type { + ActorContext, + AnyDatabaseProvider, + Registry, + ActionContext, + InitContext, +} from "rivetkit"; +import type { YieldWrap } from "effect/Utils"; +import { ActorContextTag } from "./hook"; + +export * from "./hook"; + +export const getConn = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActionContext, +) => Effect.succeed(c.conn); + +export function effect< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + AEff, + Args extends unknown[], +>( + genFn: ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + ...args: Args + ) => Generator>, AEff, never>, +): ( + c: ActionContext, + ...args: Args +) => Promise { + return (c, ...args) => { + const gen = genFn(c, ...args); + const eff = Effect.gen>, AEff>( + () => gen, + ); + + // Provide ActorContext via Effect Context + const withContext = Effect.provideService( + eff, + ActorContextTag, + c, + ) as Effect.Effect; + + return Effect.runPromise(withContext); + }; +} + +export function workflow< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + AEff, + Args extends unknown[], +>( + genFn: ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + ...args: Args + ) => Generator>, AEff, never>, +): ( + c: ActionContext, + ...args: Args +) => Promise { + return (c, ...args) => { + const gen = genFn(c, ...args); + const eff = Effect.gen>, AEff>( + () => gen, + ); + + // Provide ActorContext via Effect Context + const withContext = Effect.provideService( + eff, + ActorContextTag, + c, + ) as Effect.Effect; + + // Make workflow execution durable by using waitUntil + const workflowPromise = Effect.runPromise(withContext); + c.waitUntil(workflowPromise.then(() => {})); + + return workflowPromise; + }; +} diff --git a/examples/effect/src/effect/actor.ts b/examples/effect/src/effect/actor.ts new file mode 100644 index 0000000000..0f92d4c14c --- /dev/null +++ b/examples/effect/src/effect/actor.ts @@ -0,0 +1,336 @@ +import { Context, Effect } from "effect"; +import type { + ActorContext, + AnyDatabaseProvider, + Registry, +} from "rivetkit"; +import type { YieldWrap } from "effect/Utils"; + +// Context tag for accessing ActorContext within Effects +export const ActorContextTag = + Context.GenericTag>( + "ActorContext", + ); + +export const context = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>(): Effect.Effect< + ActorContext, + never, + typeof ActorContextTag +> => ActorContextTag as any; + +export const state = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +): Effect.Effect => Effect.succeed(c.state); + +export const updateState = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, + f: (state: TState) => void, +): Effect.Effect => Effect.sync(() => f(c.state)); + +export const vars = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +): Effect.Effect => Effect.succeed(c.vars); + +export const updateVars = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, + f: (vars: TVars) => void, +): Effect.Effect => Effect.sync(() => f(c.vars)); + +export const broadcast = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + Args extends Array, +>( + c: ActorContext, + name: string, + ...args: Args +): Effect.Effect => + Effect.sync(() => c.broadcast(name, ...args)); + +export const getLog = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +) => Effect.succeed(c.log); + +export const getActorId = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +): Effect.Effect => Effect.succeed(c.actorId); + +export const getName = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +): Effect.Effect => Effect.succeed(c.name); + +export const getKey = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +) => Effect.succeed(c.key); + +export const getRegion = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +): Effect.Effect => Effect.succeed(c.region); + +export const getSchedule = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +) => Effect.succeed(c.schedule); + +export const getConns = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +) => Effect.succeed(c.conns); + +export const getClient = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + R extends Registry, +>( + c: ActorContext, +) => Effect.succeed(c.client()); + +export const getDb = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +) => Effect.succeed(c.db); + +export const saveState = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, + opts: Parameters[0], +): Effect.Effect => Effect.promise(() => c.saveState(opts)); + +export const waitUntil = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + A, + E, +>( + c: ActorContext, + effect: Effect.Effect, +): Effect.Effect => + Effect.sync(() => { + const promise = Effect.runPromise(effect).then(() => {}); + c.waitUntil(promise); + }); + +export const getAbortSignal = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +): Effect.Effect => Effect.succeed(c.abortSignal); + +export const sleep = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +): Effect.Effect => Effect.sync(() => c.sleep()); + +export const destroy = < + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +>( + c: ActorContext, +): Effect.Effect => Effect.sync(() => c.destroy()); + +export function effect< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + AEff, +>( + genFn: ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + ) => Generator>, AEff, never>, +): ( + c: ActorContext, +) => Promise { + return (c) => { + const gen = genFn(c); + const eff = Effect.gen>, AEff>( + () => gen, + ); + + // Provide ActorContext via Effect Context + const withContext = Effect.provideService( + eff, + ActorContextTag, + c, + ) as Effect.Effect; + + // Make workflow execution durable by using waitUntil + const workflowPromise = Effect.runPromise(withContext); + c.waitUntil(workflowPromise.then(() => {})); + + return workflowPromise; + }; +} + +export function workflow< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + AEff, +>( + genFn: ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + ) => Generator>, AEff, never>, +): ( + c: ActorContext, +) => Promise { + return (c) => { + const gen = genFn(c); + const eff = Effect.gen>, AEff>( + () => gen, + ); + + // Provide ActorContext via Effect Context + const withContext = Effect.provideService( + eff, + ActorContextTag, + c, + ) as Effect.Effect; + + // Make workflow execution durable by using waitUntil + const workflowPromise = Effect.runPromise(withContext); + c.waitUntil(workflowPromise.then(() => {})); + + return workflowPromise; + }; +} diff --git a/examples/effect/src/effect/hook.ts b/examples/effect/src/effect/hook.ts new file mode 100644 index 0000000000..e389e4a8ba --- /dev/null +++ b/examples/effect/src/effect/hook.ts @@ -0,0 +1 @@ +export * from "./actor"; diff --git a/examples/effect/src/effect/index.ts b/examples/effect/src/effect/index.ts new file mode 100644 index 0000000000..8a351a5b3a --- /dev/null +++ b/examples/effect/src/effect/index.ts @@ -0,0 +1,3 @@ +export * as Hook from "./hook"; +export * as Action from "./action"; +export * from "./log"; diff --git a/examples/effect/src/effect/log.ts b/examples/effect/src/effect/log.ts new file mode 100644 index 0000000000..46e44e436e --- /dev/null +++ b/examples/effect/src/effect/log.ts @@ -0,0 +1,29 @@ +import { Effect } from "effect"; +import { ActorContextTag, context } from "./hook"; + +// Log namespace for structured logging +export namespace Log { + export const info = (message: string): Effect.Effect => + Effect.gen(function* () { + const c = yield* context(); + c.log.info(message); + }); + + export const warn = (message: string): Effect.Effect => + Effect.gen(function* () { + const c = yield* context(); + c.log.warn(message); + }); + + export const error = (message: string): Effect.Effect => + Effect.gen(function* () { + const c = yield* context(); + c.log.error(message); + }); + + export const debug = (message: string): Effect.Effect => + Effect.gen(function* () { + const c = yield* context(); + c.log.debug(message); + }); +} diff --git a/examples/effect/src/server.ts b/examples/effect/src/server.ts new file mode 100644 index 0000000000..b2f9974ccd --- /dev/null +++ b/examples/effect/src/server.ts @@ -0,0 +1,3 @@ +import { registry } from "./actors"; + +registry.start(); diff --git a/examples/effect/tsconfig.json b/examples/effect/tsconfig.json new file mode 100644 index 0000000000..291580bad3 --- /dev/null +++ b/examples/effect/tsconfig.json @@ -0,0 +1,52 @@ +{ + "compilerOptions": { + /* Visit https://aka.ms/tsconfig.json to read more about this file */ + + /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */ + "target": "esnext", + /* Specify a set of bundled library declaration files that describe the target runtime environment. */ + "lib": ["esnext"], + /* Specify what JSX code is generated. */ + "jsx": "react-jsx", + + /* Specify what module code is generated. */ + "module": "esnext", + /* Specify how TypeScript looks up a file from a given module specifier. */ + "moduleResolution": "bundler", + /* Specify the base directory to resolve non-relative module names. */ + "baseUrl": ".", + /* Specify path mapping to be computed relative to baseUrl option. */ + "paths": { + "@rivetkit/effect": ["./src/effect/index.ts"] + }, + /* Specify type package names to be included without being referenced in a source file. */ + "types": ["node"], + /* Enable importing .json files */ + "resolveJsonModule": true, + + /* Allow JavaScript files to be a part of your program. Use the `checkJS` option to get errors from these files. */ + "allowJs": true, + /* Enable error reporting in type-checked JavaScript files. */ + "checkJs": false, + + /* Disable emitting files from a compilation. */ + "noEmit": true, + + /* Ensure that each file can be safely transpiled without relying on other imports. */ + "isolatedModules": true, + /* Allow 'import x from y' when a module doesn't have a default export. */ + "allowSyntheticDefaultImports": true, + /* Ensure that casing is correct in imports. */ + "forceConsistentCasingInFileNames": true, + + /* Enable all strict type-checking options. */ + "strict": true, + + /* Skip type checking all .d.ts files. */ + "skipLibCheck": true, + "plugins": [ + { "name": "@effect/language-service" } + ] + }, + "include": ["src/**/*.ts", "scripts/**/*.ts", "tests/**/*.ts"] +} diff --git a/examples/effect/turbo.json b/examples/effect/turbo.json new file mode 100644 index 0000000000..29d4cb2625 --- /dev/null +++ b/examples/effect/turbo.json @@ -0,0 +1,4 @@ +{ + "$schema": "https://turbo.build/schema.json", + "extends": ["//"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 48c6faafea..bb9ec38914 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -754,6 +754,34 @@ importers: specifier: ^5.5.2 version: 5.9.2 + examples/effect: + dependencies: + '@effect/workflow': + specifier: ^0.15.1 + version: 0.15.2(@effect/experimental@0.57.11(@effect/platform@0.93.6(effect@3.19.11))(effect@3.19.11))(@effect/platform@0.93.6(effect@3.19.11))(@effect/rpc@0.72.2(@effect/platform@0.93.6(effect@3.19.11))(effect@3.19.11))(effect@3.19.11) + effect: + specifier: ^3.19.9 + version: 3.19.11 + devDependencies: + '@effect/language-service': + specifier: ^0.60.0 + version: 0.60.0 + '@types/node': + specifier: ^22.13.9 + version: 22.19.1 + rivetkit: + specifier: workspace:* + version: link:../../rivetkit-typescript/packages/rivetkit + tsx: + specifier: ^3.12.7 + version: 3.14.0 + typescript: + specifier: ^5.7.3 + version: 5.9.3 + vitest: + specifier: ^3.1.1 + version: 3.2.4(@types/debug@4.1.12)(@types/node@22.19.1)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.0) + examples/elysia: dependencies: '@rivetkit/react': @@ -3568,6 +3596,42 @@ packages: '@drizzle-team/brocli@0.10.2': resolution: {integrity: sha512-z33Il7l5dKjUgGULTqBsQBQwckHh5AbIuxhdsIxDDiZAzBOrZO6q9ogcWC65kU382AfynTfgNumVcNIjuIua6w==} + '@effect/experimental@0.57.11': + resolution: {integrity: sha512-M5uug3Drs/gyTHLfA+XzcIZQGUEV/Jn5yi1POki4oZswhpzNmsVTHl4THpxAordRKwa5lFvTSlsRP684YH7pSw==} + peerDependencies: + '@effect/platform': ^0.93.6 + effect: ^3.19.9 + ioredis: ^5 + lmdb: ^3 + peerDependenciesMeta: + ioredis: + optional: true + lmdb: + optional: true + + '@effect/language-service@0.60.0': + resolution: {integrity: sha512-elJDWHG5Naq3OkilPt9ZRn56JfSA3MhXUIlDx9RWJeScHm96kZ+HkZ3eFBxqROzXwD6Q2DTtFctFwOM0+QLZEA==} + hasBin: true + + '@effect/platform@0.93.6': + resolution: {integrity: sha512-I5lBGQWzWXP4zlIdPs7z7WHmEFVBQhn+74emr/h16GZX96EEJ6I1rjGaKyZF7mtukbMuo9wEckDPssM8vskZ/w==} + peerDependencies: + effect: ^3.19.8 + + '@effect/rpc@0.72.2': + resolution: {integrity: sha512-BmTXybXCOq96D2r9mvSW/YdiTQs5CStnd4II+lfVKrMr3pMNERKLZ2LG37Tfm4Sy3Q8ire6IVVKO/CN+VR0uQQ==} + peerDependencies: + '@effect/platform': ^0.93.3 + effect: ^3.19.5 + + '@effect/workflow@0.15.2': + resolution: {integrity: sha512-UAo5QWEvyyKsnf4EQ7WL3zwiuZS4Wd5fmAxdpcpZSIxNOvsABp3DOuyRCiidD8l3sQhdPwES/UsVK4QOCQ7wew==} + peerDependencies: + '@effect/experimental': ^0.57.11 + '@effect/platform': ^0.93.6 + '@effect/rpc': ^0.72.2 + effect: ^3.19.10 + '@emnapi/core@1.7.1': resolution: {integrity: sha512-o1uhUASyo921r2XtHYOHy7gdkGLge8ghBEQHMWmyJFoXlpU58kIrhhN3w26lpQb6dspetweapMn2CSNwQ8I4wg==} @@ -5378,6 +5442,36 @@ packages: react: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 react-dom: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 + '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3': + resolution: {integrity: sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==} + cpu: [arm64] + os: [darwin] + + '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3': + resolution: {integrity: sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==} + cpu: [x64] + os: [darwin] + + '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3': + resolution: {integrity: sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==} + cpu: [arm64] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3': + resolution: {integrity: sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==} + cpu: [arm] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3': + resolution: {integrity: sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==} + cpu: [x64] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3': + resolution: {integrity: sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==} + cpu: [x64] + os: [win32] + '@napi-rs/wasm-runtime@0.2.12': resolution: {integrity: sha512-ZVWUcfwY4E/yPitQJl481FjFo3K22D6qF0DuFH6Y/nbnE11GY5uguDxZMGXPQ8WQ0128MXQD7TnfHyK4oWoIJQ==} @@ -9072,6 +9166,9 @@ packages: ee-first@1.1.1: resolution: {integrity: sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow==} + effect@3.19.11: + resolution: {integrity: sha512-UTEj3c1s41Ha3uzSPKKvFBZaDjZ8ez00Q2NYWVm2mKh2LXeX8j6LTg1HcQHnmdUhOjr79KHmhVWYB/zbegLO1A==} + electron-to-chromium@1.5.215: resolution: {integrity: sha512-TIvGp57UpeNetj/wV/xpFNpWGb0b/ROw372lHPx5Aafx02gjTBtWnEEcaSX3W2dLM3OSdGGyHX/cHl01JQsLaQ==} @@ -9594,6 +9691,10 @@ packages: engines: {node: '>= 10.17.0'} hasBin: true + fast-check@3.23.2: + resolution: {integrity: sha512-h5+1OzzfCC3Ef7VbtKdcv7zsstUQwUDlYpUTvjeUsJAssPgLn7QzbboPtL5ro04Mq0rPOsMzl7q5hIbRs2wD1A==} + engines: {node: '>=8.0.0'} + fast-copy@3.0.2: resolution: {integrity: sha512-dl0O9Vhju8IrcLndv2eU4ldt1ftXMqqfgN4H1cpmGV7P6jeB9FwpN9a2c8DPGE1Ys88rNUJVYDHq73CGAGOPfQ==} @@ -9739,6 +9840,9 @@ packages: resolution: {integrity: sha512-/t88Ty3d5JWQbWYgaOGCCYfXRwV1+be02WqYYlL6h0lEiUAMPM8o8qKGO01YIkOHzka2up08wvgYD0mDiI+q3Q==} engines: {node: '>= 0.8'} + find-my-way-ts@0.1.6: + resolution: {integrity: sha512-a85L9ZoXtNAey3Y6Z+eBWW658kO/MwR7zIafkIUPUMf3isZG0NCs2pjW2wtjxAKuJPxMAsHUIP4ZPGv0o5gyTA==} + find-root@1.1.0: resolution: {integrity: sha512-NKfW6bec6GfKc0SGx1e07QZY9PE99u0Bft/0rzSD5k3sO/vwkVUpDUKVm5Gpp5Ue3YfShPFTX2070tDs5kB9Ng==} @@ -11429,12 +11533,22 @@ packages: ms@2.1.3: resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} + msgpackr-extract@3.0.3: + resolution: {integrity: sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==} + hasBin: true + + msgpackr@1.11.5: + resolution: {integrity: sha512-UjkUHN0yqp9RWKy0Lplhh+wlpdt9oQBYgULZOiFhV3VclSF1JnSQWZ5r9gORQlNYaUKQoR8itv7g7z1xDDuACA==} + muggle-string@0.3.1: resolution: {integrity: sha512-ckmWDJjphvd/FvZawgygcUeQCxzvohjFO5RxTjj4eq8kw359gFF3E1brjfI+viLMxss5JrHTDRHZvu2/tuy0Qg==} muggle-string@0.4.1: resolution: {integrity: sha512-VNTrAak/KhO2i8dqqnqnAHOa3cYBwXEZe9h+D5h/1ZqFSTEFHdM65lR7RoIqq3tBBYavsOXV84NoHXZ0AkPyqQ==} + multipasta@0.2.7: + resolution: {integrity: sha512-KPA58d68KgGil15oDqXjkUBEBYc00XvbPj5/X+dyzeo/lWm9Nc25pQRlf1D+gv4OpK7NM0J1odrbu9JNNGvynA==} + mute-stream@3.0.0: resolution: {integrity: sha512-dkEJPVvun4FryqBmZ5KhDo0K9iDXAwn08tMLDinNdRBNPcYEDiWYysLcc6k3mjTMlbP9KyylvRpd4wFtwrT9rw==} engines: {node: ^20.17.0 || >=22.9.0} @@ -11581,6 +11695,10 @@ packages: resolution: {integrity: sha512-+P72GAjVAbTxjjwUmwjVrqrdZROD4nf8KgpBoDxqXXTiYZZt/ud60dE5yvCSr9lRO8e8yv6kgJIC0K0PfZFVQw==} hasBin: true + node-gyp-build-optional-packages@5.2.2: + resolution: {integrity: sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==} + hasBin: true + node-int64@0.4.0: resolution: {integrity: sha512-O5lz91xSOeoXP6DulyHfllpq+Eg00MWitZIbtPfoSEvqIHdl5gfcY6hYzDWnj0qD5tz52PI08u9qUvSVeUBeHw==} @@ -12260,6 +12378,9 @@ packages: deprecated: < 24.15.0 is no longer supported hasBin: true + pure-rand@6.1.0: + resolution: {integrity: sha512-bVWawvoZoBYpp6yIoQtQXHZjmz35RSVHnUOTefl8Vcjr8snTPY1wnpSPMWekcFwbxI6gtmT7rSYPFvz71ldiOA==} + qrcode-terminal@0.11.0: resolution: {integrity: sha512-Uu7ii+FQy4Qf82G4xu7ShHhjhGahEpCWc3x8UavY3CTcWV+ufmmCtwkr7ZKsX42jdL0kr1B5FKUeqJvAn51jzQ==} hasBin: true @@ -13872,6 +13993,10 @@ packages: resolution: {integrity: sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA==} engines: {node: '>= 0.4.0'} + uuid@11.1.0: + resolution: {integrity: sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==} + hasBin: true + uuid@12.0.0: resolution: {integrity: sha512-USe1zesMYh4fjCA8ZH5+X5WIVD0J4V1Jksm1bFTVBX2F/cwSXt0RO5w/3UXbdLKmZX65MiWV+hwhSS8p6oBTGA==} hasBin: true @@ -15707,6 +15832,34 @@ snapshots: '@drizzle-team/brocli@0.10.2': {} + '@effect/experimental@0.57.11(@effect/platform@0.93.6(effect@3.19.11))(effect@3.19.11)': + dependencies: + '@effect/platform': 0.93.6(effect@3.19.11) + effect: 3.19.11 + uuid: 11.1.0 + + '@effect/language-service@0.60.0': {} + + '@effect/platform@0.93.6(effect@3.19.11)': + dependencies: + effect: 3.19.11 + find-my-way-ts: 0.1.6 + msgpackr: 1.11.5 + multipasta: 0.2.7 + + '@effect/rpc@0.72.2(@effect/platform@0.93.6(effect@3.19.11))(effect@3.19.11)': + dependencies: + '@effect/platform': 0.93.6(effect@3.19.11) + effect: 3.19.11 + msgpackr: 1.11.5 + + '@effect/workflow@0.15.2(@effect/experimental@0.57.11(@effect/platform@0.93.6(effect@3.19.11))(effect@3.19.11))(@effect/platform@0.93.6(effect@3.19.11))(@effect/rpc@0.72.2(@effect/platform@0.93.6(effect@3.19.11))(effect@3.19.11))(effect@3.19.11)': + dependencies: + '@effect/experimental': 0.57.11(@effect/platform@0.93.6(effect@3.19.11))(effect@3.19.11) + '@effect/platform': 0.93.6(effect@3.19.11) + '@effect/rpc': 0.72.2(@effect/platform@0.93.6(effect@3.19.11))(effect@3.19.11) + effect: 3.19.11 + '@emnapi/core@1.7.1': dependencies: '@emnapi/wasi-threads': 1.1.0 @@ -17480,6 +17633,24 @@ snapshots: react: 18.3.1 react-dom: 18.3.1(react@18.3.1) + '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3': + optional: true + '@napi-rs/wasm-runtime@0.2.12': dependencies: '@emnapi/core': 1.7.1 @@ -21676,6 +21847,11 @@ snapshots: ee-first@1.1.1: {} + effect@3.19.11: + dependencies: + '@standard-schema/spec': 1.0.0 + fast-check: 3.23.2 + electron-to-chromium@1.5.215: {} electron-to-chromium@1.5.257: {} @@ -22598,6 +22774,10 @@ snapshots: transitivePeerDependencies: - supports-color + fast-check@3.23.2: + dependencies: + pure-rand: 6.1.0 + fast-copy@3.0.2: {} fast-decode-uri-component@1.0.1: {} @@ -22739,6 +22919,8 @@ snapshots: transitivePeerDependencies: - supports-color + find-my-way-ts@0.1.6: {} + find-root@1.1.0: {} find-up@4.1.0: @@ -25005,10 +25187,28 @@ snapshots: ms@2.1.3: {} + msgpackr-extract@3.0.3: + dependencies: + node-gyp-build-optional-packages: 5.2.2 + optionalDependencies: + '@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.3 + '@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.3 + optional: true + + msgpackr@1.11.5: + optionalDependencies: + msgpackr-extract: 3.0.3 + muggle-string@0.3.1: {} muggle-string@0.4.1: {} + multipasta@0.2.7: {} + mute-stream@3.0.0: {} mz@2.7.0: @@ -25164,6 +25364,11 @@ snapshots: detect-libc: 2.1.2 optional: true + node-gyp-build-optional-packages@5.2.2: + dependencies: + detect-libc: 2.1.2 + optional: true + node-int64@0.4.0: {} node-releases@2.0.20: {} @@ -25864,6 +26069,8 @@ snapshots: - typescript - utf-8-validate + pure-rand@6.1.0: {} + qrcode-terminal@0.11.0: {} qrcode.react@4.2.0(react@19.1.1): @@ -28123,6 +28330,8 @@ snapshots: utils-merge@1.0.1: {} + uuid@11.1.0: {} + uuid@12.0.0: {} uuid@7.0.3: {} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index 360cf05832..17580d2508 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -19,6 +19,103 @@ export type InitContext = ActorContext< undefined >; +// MARK: - Concurrency Types + +/** + * Concurrency mode for actions and hooks. + * + * - `serial`: Waits for all serial & parallel operations to complete before executing. + * Blocks other serial & parallel operations from running while executing. + * - `parallel`: Waits for all serial operations to complete before executing. + * Can run concurrently with other parallel operations. + * - `readonly`: Does not wait for or block any other operations. Can always run immediately. + * + * @default "serial" + */ +export type Concurrency = "serial" | "parallel" | "readonly"; + +/** + * Base options for concurrency control. + */ +export interface ConcurrencyOptions { + /** + * Concurrency mode for this operation. + * + * - `serial`: Waits for all serial & parallel operations to complete before executing. + * Blocks other serial & parallel operations from running while executing. + * - `parallel`: Waits for all serial operations to complete before executing. + * Can run concurrently with other parallel operations. + * - `readonly`: Does not wait for or block any other operations. Can always run immediately. + * + * @default "serial" + */ + concurrency?: Concurrency; +} + +/** + * Configuration options for an individual action. + */ +export interface ActionOptions extends ConcurrencyOptions { + /** + * Custom timeout for this action in milliseconds. + * Overrides the default actionTimeout from actor options. + */ + timeout?: number; +} + +/** + * Configuration options for a hook. + */ +export type HookOptions = ConcurrencyOptions; + +// MARK: - Wrapped Definition Types + +/** + * Symbol used to identify wrapped definitions (actions and hooks). + */ +export const WRAPPED_DEFINITION_SYMBOL = Symbol.for("rivet.wrappedDefinition"); + +/** + * A wrapped definition containing the handler and options. + * Used for both actions and hooks. + */ +export interface WrappedDefinition< + THandler extends (...args: any[]) => any, + TOptions, +> { + [WRAPPED_DEFINITION_SYMBOL]: true; + handler: THandler; + options: TOptions; +} + +/** + * Type guard to check if a value is a WrappedDefinition. + */ +export function isWrappedDefinition< + THandler extends (...args: any[]) => any, + TOptions, +>( + value: THandler | WrappedDefinition | undefined, +): value is WrappedDefinition { + return ( + typeof value === "object" && + value !== null && + WRAPPED_DEFINITION_SYMBOL in value + ); +} + +/** + * A wrapped action definition containing the handler and options. + */ +export type ActionDefinition any> = + WrappedDefinition; + +/** + * A wrapped hook definition containing the handler and options. + */ +export type HookDefinition any> = + WrappedDefinition; + export interface ActorTypes< TState, TConnParams, @@ -36,8 +133,40 @@ export interface ActorTypes< } // Helper for validating function types - accepts generic for specific function signatures -const zFunction = any = (...args: unknown[]) => unknown>() => - z.custom((val) => typeof val === "function"); +const zFunction = < + T extends (...args: any[]) => any = (...args: unknown[]) => unknown, +>() => z.custom((val) => typeof val === "function"); + +// Schema for hook options (concurrency only) +const HookOptionsSchema = z.object({ + concurrency: z.enum(["serial", "parallel", "readonly"]).optional(), +}); + +// Schema for a hook that can be either a function or a wrapped definition +const hookSchema = z + .union([ + zFunction(), + z.object({ + handler: zFunction(), + options: HookOptionsSchema, + }), + ]) + .optional(); + +// Schema for action options (concurrency + timeout) +const ActionOptionsSchema = z.object({ + timeout: z.number().positive().optional(), + concurrency: z.enum(["serial", "parallel", "readonly"]).optional(), +}); + +// Schema for an action that can be either a function or a wrapped definition +const ActionSchema = z.union([ + zFunction(), + z.object({ + handler: zFunction(), + options: ActionOptionsSchema, + }), +]); // This schema is used to validate the input at runtime. The generic types are defined below in `ActorConfig`. // @@ -46,18 +175,18 @@ const zFunction = any = (...args: unknown[]) => u // (b) it makes the type definitions incredibly difficult to read as opposed to vanilla TypeScript. export const ActorConfigSchema = z .object({ - onCreate: zFunction().optional(), - onDestroy: zFunction().optional(), - onWake: zFunction().optional(), - onSleep: zFunction().optional(), - onStateChange: zFunction().optional(), - onBeforeConnect: zFunction().optional(), - onConnect: zFunction().optional(), - onDisconnect: zFunction().optional(), - onBeforeActionResponse: zFunction().optional(), - onRequest: zFunction().optional(), - onWebSocket: zFunction().optional(), - actions: z.record(z.string(), zFunction()).default(() => ({})), + onCreate: hookSchema, + onDestroy: hookSchema, + onWake: hookSchema, + onSleep: hookSchema, + onStateChange: hookSchema, + onBeforeConnect: hookSchema, + onConnect: hookSchema, + onDisconnect: hookSchema, + onBeforeActionResponse: hookSchema, + onRequest: hookSchema, + onWebSocket: hookSchema, + actions: z.record(z.string(), ActionSchema).default(() => ({})), state: z.any().optional(), createState: zFunction().optional(), connState: z.any().optional(), @@ -186,6 +315,44 @@ type CreateVars = } | Record; +/** + * Type for an action handler function. + */ +export type ActionHandler< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +> = ( + c: ActionContext, + ...args: any[] +) => any; + +/** + * Type for an action entry - either a raw handler function or a wrapped ActionDefinition. + */ +export type ActionEntry< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +> = + | ActionHandler + | ActionDefinition< + ActionHandler< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + > + >; + export interface Actions< TState, TConnParams, @@ -194,19 +361,23 @@ export interface Actions< TInput, TDatabase extends AnyDatabaseProvider, > { - [Action: string]: ( - c: ActionContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - ...args: any[] - ) => any; + [Action: string]: ActionEntry< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >; } +/** + * Type for a hook entry - either a raw handler function or a wrapped HookDefinition. + */ +export type HookEntry any> = + | THandler + | HookDefinition; + //export type ActorConfig = BaseActorConfig & // ActorConfigLifecycle & // CreateState & @@ -239,31 +410,35 @@ interface BaseActorConfig< * Use this hook to initialize your actor's state. * This is called before any other lifecycle hooks. */ - onCreate?: ( - c: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - input: TInput, - ) => void | Promise; + onCreate?: HookEntry< + ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + input: TInput, + ) => void | Promise + >; /** * Called when the actor is destroyed. */ - onDestroy?: ( - c: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - ) => void | Promise; + onDestroy?: HookEntry< + ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + ) => void | Promise + >; /** * Called when the actor is started and ready to receive connections and action. @@ -273,16 +448,18 @@ interface BaseActorConfig< * * @returns Void or a Promise that resolves when startup is complete */ - onWake?: ( - c: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - ) => void | Promise; + onWake?: HookEntry< + ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + ) => void | Promise + >; /** * Called when the actor is stopping or sleeping. @@ -294,16 +471,18 @@ interface BaseActorConfig< * * @returns Void or a Promise that resolves when shutdown is complete */ - onSleep?: ( - c: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - ) => void | Promise; + onSleep?: HookEntry< + ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + ) => void | Promise + >; /** * Called when the actor's state changes. @@ -316,17 +495,19 @@ interface BaseActorConfig< * * @param newState The updated state */ - onStateChange?: ( - c: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - newState: TState, - ) => void; + onStateChange?: HookEntry< + ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + newState: TState, + ) => void + >; /** * Called before a client connects to the actor. @@ -338,10 +519,12 @@ interface BaseActorConfig< * @returns The initial connection state or a Promise that resolves to it * @throws Throw an error to reject the connection */ - onBeforeConnect?: ( - c: OnBeforeConnectContext, - params: TConnParams, - ) => void | Promise; + onBeforeConnect?: HookEntry< + ( + c: OnBeforeConnectContext, + params: TConnParams, + ) => void | Promise + >; /** * Called when a client successfully connects to the actor. @@ -352,17 +535,26 @@ interface BaseActorConfig< * @param conn The connection object * @returns Void or a Promise that resolves when connection handling is complete */ - onConnect?: ( - c: OnConnectContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - conn: Conn, - ) => void | Promise; + onConnect?: HookEntry< + ( + c: OnConnectContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + conn: Conn< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + ) => void | Promise + >; /** * Called when a client disconnects from the actor. @@ -373,17 +565,26 @@ interface BaseActorConfig< * @param conn The connection that is being closed * @returns Void or a Promise that resolves when disconnect handling is complete */ - onDisconnect?: ( - c: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - conn: Conn, - ) => void | Promise; + onDisconnect?: HookEntry< + ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + conn: Conn< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + ) => void | Promise + >; /** * Called before sending an action response to the client. @@ -397,19 +598,21 @@ interface BaseActorConfig< * @param output The output that will be sent to the client * @returns The modified output to send to the client */ - onBeforeActionResponse?: ( - c: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - name: string, - args: unknown[], - output: Out, - ) => Out | Promise; + onBeforeActionResponse?: HookEntry< + ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + name: string, + args: unknown[], + output: Out, + ) => Out | Promise + >; /** * Called when a raw HTTP request is made to the actor. @@ -422,17 +625,19 @@ interface BaseActorConfig< * @param opts Additional options * @returns A Response object to send back, or void to continue with default routing */ - onRequest?: ( - c: RequestContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - request: Request, - ) => Response | Promise; + onRequest?: HookEntry< + ( + c: RequestContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + request: Request, + ) => Response | Promise + >; /** * Called when a raw WebSocket connection is established to the actor. @@ -444,17 +649,19 @@ interface BaseActorConfig< * @param websocket The raw WebSocket connection * @param opts Additional options including the original HTTP upgrade request */ - onWebSocket?: ( - c: WebSocketContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - websocket: UniversalWebSocket, - ) => void | Promise; + onWebSocket?: HookEntry< + ( + c: WebSocketContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + websocket: UniversalWebSocket, + ) => void | Promise + >; actions: TActions; } @@ -574,6 +781,59 @@ export type ActorConfigInput< CreateVars & ActorDatabaseConfig; +/** + * Wraps an action handler with configuration options. + * + * @example + * ```ts + * actor({ + * actions: { + * foo: action((c, arg1: number) => { ... }, { timeout: 30000 }) + * } + * }) + * ``` + * + * @param handler The action handler function + * @param options Configuration options for this action + * @returns An ActionDefinition that can be used in the actions object + */ +export function action any>( + handler: THandler, + options: ActionOptions = {}, +): ActionDefinition { + return { + [WRAPPED_DEFINITION_SYMBOL]: true, + handler, + options, + }; +} + +/** + * Wraps a hook handler with configuration options. + * + * @example + * ```ts + * actor({ + * onCreate: handler((c, input) => { ... }, { concurrency: "serial" }), + * onConnect: handler((c, conn) => { ... }, { concurrency: "parallel" }), + * }) + * ``` + * + * @param fn The hook handler function + * @param options Configuration options for this hook + * @returns A HookDefinition that can be used for lifecycle/connection hooks + */ +export function handler any>( + fn: THandler, + options: HookOptions = {}, +): HookDefinition { + return { + [WRAPPED_DEFINITION_SYMBOL]: true, + handler: fn, + options, + }; +} + // For testing type definitions: export function test< TState, diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/concurrency-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/concurrency-manager.ts new file mode 100644 index 0000000000..ede37736c4 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/concurrency-manager.ts @@ -0,0 +1,341 @@ +import { stringifyError } from "@/common/utils"; +import { + isWrappedDefinition, + type Concurrency, + type ActorConfig, + type HookOptions, + type WrappedDefinition, +} from "../config"; +import type { Conn } from "../conn/mod"; +import { ActionContext } from "../contexts/action"; +import type { AnyDatabaseProvider } from "../database"; +import * as errors from "../errors"; +import { DeadlineError, deadline } from "../utils"; +import type { ActorInstance } from "./mod"; + +/** + * Manages action and hook execution with concurrency control. + * Handles serial, parallel, and readonly concurrency modes. + */ +export class ConcurrencyManager { + #actor: ActorInstance; + #config: ActorConfig; + + /** Currently executing serial operation promise, if any */ + #runningSerial: Promise | null = null; + /** Currently executing parallel operation promises */ + #runningParallel: Set> = new Set(); + + constructor( + actor: ActorInstance, + config: ActorConfig, + ) { + this.#actor = actor; + this.#config = config; + } + + // MARK: - Core Execution + + /** + * Core execution method that handles concurrency for any async operation. + * + * @param concurrency - The concurrency mode + * @param executor - Function that performs the actual work + * @param timeout - Optional timeout in milliseconds + * @returns The executor's return value + */ + async #executeWithConcurrency( + concurrency: Concurrency, + executor: () => T | Promise, + timeout?: number, + ): Promise { + // Wait for appropriate operations based on concurrency mode + await this.#waitForConcurrency(concurrency); + + // Create the execution promise + const executionPromise = (async () => { + const result = executor(); + if (result instanceof Promise) { + if (timeout !== undefined) { + return await deadline(result, timeout); + } + return await result; + } + return result; + })(); + + // Track the promise based on concurrency mode + if (concurrency === "serial") { + this.#runningSerial = executionPromise; + try { + return await executionPromise; + } finally { + this.#runningSerial = null; + } + } else if (concurrency === "parallel") { + this.#runningParallel.add(executionPromise); + try { + return await executionPromise; + } finally { + this.#runningParallel.delete(executionPromise); + } + } else { + // readonly: not tracked at all + return await executionPromise; + } + } + + /** + * Waits for the appropriate operations to complete based on concurrency mode. + */ + async #waitForConcurrency(concurrency: Concurrency): Promise { + if (concurrency === "readonly") { + // readonly: does not wait for anything + return; + } + + if (concurrency === "parallel") { + // parallel: waits for all serial operations to complete + if (this.#runningSerial) { + await this.#runningSerial; + } + return; + } + + // serial: waits for all serial & parallel operations to complete + const waitPromises: Promise[] = []; + if (this.#runningSerial) { + waitPromises.push(this.#runningSerial); + } + for (const p of this.#runningParallel) { + waitPromises.push(p); + } + if (waitPromises.length > 0) { + await Promise.all(waitPromises); + } + } + + // MARK: - Action Execution + + /** + * Executes an action with proper concurrency handling. + * + * @param ctx - The action context + * @param actionName - Name of the action to execute + * @param args - Arguments to pass to the action + * @returns The action's return value + */ + async executeAction( + ctx: ActionContext, + actionName: string, + args: unknown[], + ): Promise { + this.#actor.assertReady(); + + if (!(actionName in this.#config.actions)) { + this.#actor.rLog.warn({ msg: "action does not exist", actionName }); + throw new errors.ActionNotFound(actionName); + } + + const actionEntry = this.#config.actions[actionName]; + + // Unwrap WrappedDefinition if needed + const actionFunction = isWrappedDefinition(actionEntry) + ? actionEntry.handler + : actionEntry; + const actionOptions = isWrappedDefinition(actionEntry) + ? actionEntry.options + : undefined; + + if (typeof actionFunction !== "function") { + this.#actor.rLog.warn({ + msg: "action is not a function", + actionName, + type: typeof actionFunction, + }); + throw new errors.ActionNotFound(actionName); + } + + // Use action-specific timeout if provided, otherwise use default + const actionTimeout = + actionOptions?.timeout ?? this.#config.options.actionTimeout; + + // Get concurrency mode, defaulting to "serial" + const concurrency: Concurrency = + actionOptions?.concurrency ?? "serial"; + + try { + return await this.#executeWithConcurrency( + concurrency, + () => this.#invokeAction(ctx, actionName, actionFunction, args), + actionTimeout, + ); + } catch (error) { + if (error instanceof DeadlineError) { + throw new errors.ActionTimedOut(); + } + this.#actor.rLog.error({ + msg: "action error", + actionName, + error: stringifyError(error), + }); + throw error; + } finally { + this.#actor.stateManager.savePersistThrottled(); + } + } + + /** + * Invokes the action function and processes the response. + */ + async #invokeAction( + ctx: ActionContext, + actionName: string, + actionFunction: (...args: any[]) => any, + args: unknown[], + ): Promise { + this.#actor.rLog.debug({ + msg: "executing action", + actionName, + args, + }); + + let output = actionFunction.call(undefined, ctx, ...args); + if (output instanceof Promise) { + output = await output; + } + + // Process through onBeforeActionResponse if configured + if (this.#config.onBeforeActionResponse) { + output = await this.#invokeOnBeforeActionResponse( + actionName, + args, + output, + ); + } + + return output; + } + + /** + * Invokes the onBeforeActionResponse hook. + */ + async #invokeOnBeforeActionResponse( + actionName: string, + args: unknown[], + output: unknown, + ): Promise { + const hookEntry = this.#config.onBeforeActionResponse; + if (!hookEntry) return output; + + try { + const handler = isWrappedDefinition(hookEntry) + ? hookEntry.handler + : hookEntry; + + const processedOutput = handler( + this.#actor.actorContext, + actionName, + args, + output, + ); + + if (processedOutput instanceof Promise) { + return await processedOutput; + } + return processedOutput; + } catch (error) { + this.#actor.rLog.error({ + msg: "error in `onBeforeActionResponse`", + error: stringifyError(error), + }); + return output; + } + } + + /** + * Executes a scheduled action with proper concurrency handling. + * Creates a temporary internal connection for the scheduled action. + * + * @param createConn - Function to create a temporary connection + * @param actionName - Name of the action to execute + * @param args - Arguments to pass to the action + * @returns The action's return value + */ + async executeScheduledAction( + createConn: () => Promise>, + actionName: string, + args: unknown[], + ): Promise { + const conn = await createConn(); + + try { + const ctx = new ActionContext(this.#actor, conn); + return await this.executeAction(ctx, actionName, args); + } finally { + conn.disconnect(); + } + } + + // MARK: - Hook Execution + + /** + * Executes a hook with proper concurrency handling. + * + * @param hookEntry - The hook entry (function or HookDefinition) + * @param invoker - Function that invokes the hook handler + * @param defaultConcurrency - Default concurrency mode if not specified + */ + async executeHook any>( + hookEntry: THandler | WrappedDefinition | undefined, + invoker: (handler: THandler) => unknown, + defaultConcurrency: Concurrency = "serial", + ): Promise { + if (!hookEntry) return; + + const handler = isWrappedDefinition(hookEntry) + ? hookEntry.handler + : hookEntry; + const options = isWrappedDefinition(hookEntry) + ? hookEntry.options + : undefined; + const concurrency = (options as HookOptions | undefined)?.concurrency ?? defaultConcurrency; + + await this.#executeWithConcurrency(concurrency, async () => { + const result = invoker(handler as THandler); + if (result instanceof Promise) { + await result; + } + }); + } + + /** + * Executes a hook that returns a value with proper concurrency handling. + * + * @param hookEntry - The hook entry (function or HookDefinition) + * @param invoker - Function that invokes the hook handler + * @param defaultConcurrency - Default concurrency mode if not specified + * @returns The hook's return value + */ + async executeHookWithReturn any, TReturn>( + hookEntry: THandler | WrappedDefinition, + invoker: (handler: THandler) => TReturn | Promise, + defaultConcurrency: Concurrency = "serial", + ): Promise { + const handler = isWrappedDefinition(hookEntry) + ? hookEntry.handler + : hookEntry; + const options = isWrappedDefinition(hookEntry) + ? hookEntry.options + : undefined; + const concurrency = (options as HookOptions | undefined)?.concurrency ?? defaultConcurrency; + + return await this.#executeWithConcurrency(concurrency, async () => { + const result = invoker(handler as THandler); + if (result instanceof Promise) { + return await result; + } + return result; + }); + } +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts index 6cd1f69af3..6ef82629df 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts @@ -111,10 +111,13 @@ export class ConnectionManager< } // Create new connection - if (this.#actor.config.onBeforeConnect) { - const ctx = new OnBeforeConnectContext(this.#actor, request); - await this.#actor.config.onBeforeConnect(ctx, params); - } + await this.#actor.concurrencyManager.executeHook( + this.#actor.config.onBeforeConnect, + (handler) => { + const ctx = new OnBeforeConnectContext(this.#actor, request); + return handler(ctx, params); + }, + ); // Create connection state if enabled let connState: CS | undefined; @@ -294,28 +297,18 @@ export class ConnectionManager< this.#actor.inspector.emitter.emit("connectionUpdated"); - // Trigger disconnect - if (this.#actor.config.onDisconnect) { - try { - const result = this.#actor.config.onDisconnect( - this.#actor.actorContext, - conn, - ); - if (result instanceof Promise) { - result.catch((error) => { - this.#actor.rLog.error({ - msg: "error in `onDisconnect`", - error: stringifyError(error), - }); - }); - } - } catch (error) { + // Trigger disconnect with concurrency handling (runs in background) + this.#actor.concurrencyManager + .executeHook( + this.#actor.config.onDisconnect, + (handler) => handler(this.#actor.actorContext, conn), + ) + .catch((error) => { this.#actor.rLog.error({ msg: "error in `onDisconnect`", error: stringifyError(error), }); - } - } + }); // Remove from connsWithPersistChanged after onDisconnect to handle any // state changes made during the disconnect callback. Disconnected connections @@ -437,29 +430,24 @@ export class ConnectionManager< } #callOnConnect(conn: Conn) { - if (this.#actor.config.onConnect) { - try { - const ctx = new OnConnectContext(this.#actor, conn); - const result = this.#actor.config.onConnect(ctx, conn); - if (result instanceof Promise) { - deadline( - result, + // Run in background with concurrency handling + this.#actor.concurrencyManager + .executeHook( + this.#actor.config.onConnect, + (handler) => { + const ctx = new OnConnectContext(this.#actor, conn); + return deadline( + Promise.resolve(handler(ctx, conn)), this.#actor.config.options.onConnectTimeout, - ).catch((error) => { - this.#actor.rLog.error({ - msg: "error in `onConnect`, closing socket", - error, - }); - conn?.disconnect("`onConnect` failed"); - }); - } - } catch (error) { + ); + }, + ) + .catch((error) => { this.#actor.rLog.error({ - msg: "error in `onConnect`", + msg: "error in `onConnect`, closing socket", error: stringifyError(error), }); conn?.disconnect("`onConnect` failed"); - } - } + }); } } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index c40b7ffaa4..5bf0598874 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -15,7 +15,7 @@ import type * as protocol from "@/schemas/client-protocol/mod"; import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned"; import { ToClientSchema } from "@/schemas/client-protocol-zod/mod"; import { EXTRA_ERROR_LOG } from "@/utils"; -import type { ActorConfig, InitContext } from "../config"; +import { type ActorConfig, type InitContext } from "../config"; import type { ConnDriver } from "../conn/driver"; import { createHttpDriver } from "../conn/drivers/http"; import { @@ -45,6 +45,7 @@ import { deadline, generateSecureToken, } from "../utils"; +import { ConcurrencyManager } from "./concurrency-manager"; import { ConnectionManager } from "./connection-manager"; import { EventManager } from "./event-manager"; import { KEYS } from "./kv"; @@ -137,6 +138,9 @@ export class ActorInstance { // MARK: - HTTP/WebSocket Tracking #activeHonoHttpRequests = 0; + // MARK: - Concurrency Manager + #concurrencyManager!: ConcurrencyManager; + // MARK: - Deprecated (kept for compatibility) #schedule!: Schedule; @@ -284,6 +288,10 @@ export class ActorInstance { return this.#config; } + get concurrencyManager(): ConcurrencyManager { + return this.#concurrencyManager; + } + // MARK: - State Access get persist(): PersistedActor { return this.stateManager.persist; @@ -343,6 +351,7 @@ export class ActorInstance { this.connectionManager = new ConnectionManager(this); this.stateManager = new StateManager(this, actorDriver, this.#config); this.eventManager = new EventManager(this); + this.#concurrencyManager = new ConcurrencyManager(this, this.#config); this.#scheduleManager = new ScheduleManager( this, actorDriver, @@ -583,87 +592,13 @@ export class ActorInstance { } // MARK: - Action Execution + async executeAction( ctx: ActionContext, actionName: string, args: unknown[], ): Promise { - this.assertReady(); - - if (!(actionName in this.#config.actions)) { - this.#rLog.warn({ msg: "action does not exist", actionName }); - throw new errors.ActionNotFound(actionName); - } - - const actionFunction = this.#config.actions[actionName]; - if (typeof actionFunction !== "function") { - this.#rLog.warn({ - msg: "action is not a function", - actionName, - type: typeof actionFunction, - }); - throw new errors.ActionNotFound(actionName); - } - - try { - this.#rLog.debug({ - msg: "executing action", - actionName, - args, - }); - - const outputOrPromise = actionFunction.call( - undefined, - ctx, - ...args, - ); - - let output: unknown; - if (outputOrPromise instanceof Promise) { - output = await deadline( - outputOrPromise, - this.#config.options.actionTimeout, - ); - } else { - output = outputOrPromise; - } - - // Process through onBeforeActionResponse if configured - if (this.#config.onBeforeActionResponse) { - try { - const processedOutput = this.#config.onBeforeActionResponse( - this.actorContext, - actionName, - args, - output, - ); - if (processedOutput instanceof Promise) { - output = await processedOutput; - } else { - output = processedOutput; - } - } catch (error) { - this.#rLog.error({ - msg: "error in `onBeforeActionResponse`", - error: stringifyError(error), - }); - } - } - - return output; - } catch (error) { - if (error instanceof DeadlineError) { - throw new errors.ActionTimedOut(); - } - this.#rLog.error({ - msg: "action error", - actionName, - error: stringifyError(error), - }); - throw error; - } finally { - this.stateManager.savePersistThrottled(); - } + return this.#concurrencyManager.executeAction(ctx, actionName, args); } // MARK: - HTTP/WebSocket Handlers @@ -678,8 +613,13 @@ export class ActorInstance { } try { - const ctx = new RequestContext(this, conn, request); - const response = await this.#config.onRequest(ctx, request); + const response = await this.#concurrencyManager.executeHookWithReturn( + this.#config.onRequest, + (handler) => { + const ctx = new RequestContext(this, conn, request); + return handler(ctx, request); + }, + ); if (!response) { throw new errors.InvalidRequestHandlerResponse(); } @@ -712,20 +652,24 @@ export class ActorInstance { // Reset sleep timer when handling WebSocket this.resetSleepTimer(); - // Handle WebSocket - const ctx = new WebSocketContext(this, conn, request); - - // NOTE: This is async and will run in the background - const voidOrPromise = this.#config.onWebSocket(ctx, websocket); - - // Save changes from the WebSocket open - if (voidOrPromise instanceof Promise) { - voidOrPromise.then(() => { + // NOTE: This is async and will run in the background with concurrency handling + this.#concurrencyManager + .executeHook( + this.#config.onWebSocket, + (handler) => { + const ctx = new WebSocketContext(this, conn, request); + return handler(ctx, websocket); + }, + ) + .then(() => { this.stateManager.savePersistThrottled(); + }) + .catch((error) => { + this.#rLog.error({ + msg: "onWebSocket async error", + error: stringifyError(error), + }); }); - } else { - this.stateManager.savePersistThrottled(); - } } catch (error) { this.#rLog.error({ msg: "onWebSocket error", @@ -744,6 +688,32 @@ export class ActorInstance { await this.#scheduleManager.scheduleEvent(timestamp, action, args); } + /** + * Executes a scheduled action with proper concurrency handling. + * This is called by the ScheduleManager for scheduled events. + * @internal + */ + async executeScheduledAction( + actionName: string, + args: unknown[], + ): Promise { + return this.#concurrencyManager.executeScheduledAction( + async () => { + // Create a temporary internal connection for the scheduled action + return this.connectionManager.prepareAndConnectConn( + createHttpDriver(), + // Scheduled actions don't have connection params + undefined as unknown as CP, + undefined, + undefined, + undefined, + ); + }, + actionName, + args, + ); + } + async onAlarm() { this.resetSleepTimer(); await this.#scheduleManager.onAlarm(); @@ -825,9 +795,10 @@ export class ActorInstance { await this.stateManager.initializeState(persistData); // Call onCreate lifecycle - if (this.#config.onCreate) { - await this.#config.onCreate(this.actorContext, persistData.input!); - } + await this.#concurrencyManager.executeHook( + this.#config.onCreate, + (handler) => handler(this.actorContext, persistData.input!), + ); } async #restoreExistingActor(persistData: PersistedActor) { @@ -915,57 +886,56 @@ export class ActorInstance { async #callOnStart() { this.#rLog.info({ msg: "actor starting" }); - if (this.#config.onWake) { - const result = this.#config.onWake(this.actorContext); - if (result instanceof Promise) { - await result; - } - } + await this.#concurrencyManager.executeHook( + this.#config.onWake, + (handler) => handler(this.actorContext), + ); } async #callOnSleep() { - if (this.#config.onSleep) { - try { - this.#rLog.debug({ msg: "calling onSleep" }); - const result = this.#config.onSleep(this.actorContext); - if (result instanceof Promise) { - await deadline(result, this.#config.options.onSleepTimeout); - } - this.#rLog.debug({ msg: "onSleep completed" }); - } catch (error) { - if (error instanceof DeadlineError) { - this.#rLog.error({ msg: "onSleep timed out" }); - } else { - this.#rLog.error({ - msg: "error in onSleep", - error: stringifyError(error), - }); - } + try { + this.#rLog.debug({ msg: "calling onSleep" }); + await this.#concurrencyManager.executeHook( + this.#config.onSleep, + (handler) => + deadline( + Promise.resolve(handler(this.actorContext)), + this.#config.options.onSleepTimeout, + ), + ); + this.#rLog.debug({ msg: "onSleep completed" }); + } catch (error) { + if (error instanceof DeadlineError) { + this.#rLog.error({ msg: "onSleep timed out" }); + } else { + this.#rLog.error({ + msg: "error in onSleep", + error: stringifyError(error), + }); } } } async #callOnDestroy() { - if (this.#config.onDestroy) { - try { - this.#rLog.debug({ msg: "calling onDestroy" }); - const result = this.#config.onDestroy(this.actorContext); - if (result instanceof Promise) { - await deadline( - result, + try { + this.#rLog.debug({ msg: "calling onDestroy" }); + await this.#concurrencyManager.executeHook( + this.#config.onDestroy, + (handler) => + deadline( + Promise.resolve(handler(this.actorContext)), this.#config.options.onDestroyTimeout, - ); - } - this.#rLog.debug({ msg: "onDestroy completed" }); - } catch (error) { - if (error instanceof DeadlineError) { - this.#rLog.error({ msg: "onDestroy timed out" }); - } else { - this.#rLog.error({ - msg: "error in onDestroy", - error: stringifyError(error), - }); - } + ), + ); + this.#rLog.debug({ msg: "onDestroy completed" }); + } catch (error) { + if (error instanceof DeadlineError) { + this.#rLog.error({ msg: "onDestroy timed out" }); + } else { + this.#rLog.error({ + msg: "error in onDestroy", + error: stringifyError(error), + }); } } } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/schedule-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/schedule-manager.ts index 2291b23f07..36d9802c97 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/schedule-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/schedule-manager.ts @@ -260,36 +260,13 @@ export class ScheduleManager { action: event.action, }); - // Look up the action function - const fn = this.#config.actions[event.action]; - - if (!fn) { - throw new Error( - `Missing action for scheduled event: ${event.action}`, - ); - } - - if (typeof fn !== "function") { - throw new Error( - `Scheduled event action ${event.action} is not a function (got ${typeof fn})`, - ); - } - - // Decode arguments and execute + // Decode arguments const args = event.args ? cbor.decode(new Uint8Array(event.args)) : []; - const result = fn.call( - undefined, - this.#actor.actorContext, - ...args, - ); - - // Handle async actions - if (result instanceof Promise) { - await result; - } + // Execute through actor with proper concurrency handling + await this.#actor.executeScheduledAction(event.action, args); this.#actor.log.debug({ msg: "scheduled event completed", diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts index 35250ff7d4..af37fd1f64 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts @@ -345,20 +345,22 @@ export class StateManager { this.#actor.isReady() && !this.#isInOnStateChange ) { - try { - this.#isInOnStateChange = true; - this.#config.onStateChange( - this.#actor.actorContext, - this.#persistRaw.state, - ); - } catch (error) { - this.#actor.rLog.error({ - msg: "error in `_onStateChange`", - error: stringifyError(error), + this.#isInOnStateChange = true; + // Run with concurrency handling (async, runs in background) + this.#actor.concurrencyManager + .executeHook( + this.#config.onStateChange, + (handler) => handler(this.#actor.actorContext, this.#persistRaw.state), + ) + .catch((error) => { + this.#actor.rLog.error({ + msg: "error in `onStateChange`", + error: stringifyError(error), + }); + }) + .finally(() => { + this.#isInOnStateChange = false; }); - } finally { - this.#isInOnStateChange = false; - } } } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts index bd7d2c7d1e..a5e24db187 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts @@ -4,6 +4,7 @@ import { type ActorConfigInput, ActorConfigSchema, ActorTypes, + action, } from "./config"; import type { AnyDatabaseProvider } from "./database"; import { ActorDefinition } from "./definition"; @@ -71,6 +72,7 @@ export type { UniversalWebSocket, } from "@/common/websocket-interface"; export type { ActorKey } from "@/manager/protocol/query"; +export { action, handler } from "./config"; export type * from "./config"; export type { AnyConn, Conn } from "./conn/mod"; export type { ActionContext } from "./contexts/action"; @@ -95,3 +97,4 @@ export { createActorRouter, } from "./router"; export { routeWebSocket } from "./router-websocket-endpoints"; +export type { AnyDatabaseProvider } from "./database";