diff --git a/examples/kitchen-sink/src/backend/actors/demo.ts b/examples/kitchen-sink/src/backend/actors/demo.ts index 03aa01b311..8dc4f6d24e 100644 --- a/examples/kitchen-sink/src/backend/actors/demo.ts +++ b/examples/kitchen-sink/src/backend/actors/demo.ts @@ -157,6 +157,6 @@ export const demo = actor({ ...websocketActions, }, options: { - sleepTimeout: 10_000, + sleepTimeout: 5_000, }, }); diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts b/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts index 5b732f5bae..25f9aeee8d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts @@ -35,9 +35,17 @@ export interface ActorDriver { */ getDatabase(actorId: string): Promise; - sleep?(actorId: string): Promise; + /** + * Requests the actor to go to sleep. + * + * This will call `_stop` independently. + */ + startSleep?(actorId: string): void; - shutdown?(immediate: boolean): Promise; + /** + * Shuts down the actor runner. + */ + shutdownRunner?(immediate: boolean): Promise; // Serverless /** This handles the serverless start request. This should manage the lifecycle of the runner tied to the request lifecycle. */ diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts index 573735aab3..e316b06dc8 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts @@ -17,6 +17,7 @@ import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned"; import { arrayBuffersEqual, bufferToArrayBuffer, + EXTRA_ERROR_LOG, getEnvUniversal, promiseWithResolvers, SinglePromiseQueue, @@ -194,8 +195,11 @@ export class ActorInstance { #sleepTimeout?: NodeJS.Timeout; - // Track active raw requests so sleep logic can account for them - #activeRawFetchCount = 0; + /** + * Track active HTTP requests through Hono router so sleep logic can + * account for them. Does not include WebSockets. + **/ + #activeHonoHttpRequests = 0; #activeRawWebSockets = new Set(); #schedule!: Schedule; @@ -287,7 +291,7 @@ export class ActorInstance { } get #sleepingSupported(): boolean { - return this.#actorDriver.sleep !== undefined; + return this.#actorDriver.startSleep !== undefined; } /** @@ -1517,10 +1521,6 @@ export class ActorInstance { throw new errors.FetchHandlerNotDefined(); } - // Track active raw fetch while handler runs - this.#activeRawFetchCount++; - this.#resetSleepTimer(); - try { const response = await this.#config.onFetch( this.actorContext, @@ -1538,12 +1538,6 @@ export class ActorInstance { }); throw error; } finally { - // Decrement active raw fetch counter and re-evaluate sleep - this.#activeRawFetchCount = Math.max( - 0, - this.#activeRawFetchCount - 1, - ); - this.#resetSleepTimer(); this.#savePersistThrottled(); } } @@ -1880,6 +1874,29 @@ export class ActorInstance { } } + /** + * Called by router middleware when an HTTP request begins. + */ + __beginHonoHttpRequest() { + this.#activeHonoHttpRequests++; + this.#resetSleepTimer(); + } + + /** + * Called by router middleware when an HTTP request ends. + */ + __endHonoHttpRequest() { + this.#activeHonoHttpRequests--; + if (this.#activeHonoHttpRequests < 0) { + this.#activeHonoHttpRequests = 0; + this.#rLog.warn({ + msg: "active hono requests went below 0, this is a RivetKit bug", + ...EXTRA_ERROR_LOG, + }); + } + this.#resetSleepTimer(); + } + // MARK: Sleep /** * Reset timer from the last actor interaction that allows it to be put to sleep. @@ -1900,6 +1917,7 @@ export class ActorInstance { msg: "resetting sleep timer", canSleep, existingTimeout: !!this.#sleepTimeout, + timeout: this.#config.options.sleepTimeout, }); if (this.#sleepTimeout) { @@ -1912,12 +1930,7 @@ export class ActorInstance { if (canSleep) { this.#sleepTimeout = setTimeout(() => { - this._sleep().catch((error) => { - this.#rLog.error({ - msg: "error during sleep", - error: stringifyError(error), - }); - }); + this._sleep(); }, this.#config.options.sleepTimeout); } } @@ -1935,9 +1948,10 @@ export class ActorInstance { if (conn.status === "connected") return false; } - // Do not sleep if raw fetches are in-flight - if (this.#activeRawFetchCount > 0) return false; + // Do not sleep if Hono HTTP requests are in-flight + if (this.#activeHonoHttpRequests > 0) return false; + // TODO: When WS hibernation is ready, update this to only count non-hibernatable websockets // Do not sleep if there are raw websockets open if (this.#activeRawWebSockets.size > 0) return false; @@ -1945,8 +1959,8 @@ export class ActorInstance { } /** Puts an actor to sleep. This should just start the sleep sequence, most shutdown logic should be in _stop (which is called by the ActorDriver when sleeping). */ - async _sleep() { - const sleep = this.#actorDriver.sleep?.bind( + _sleep() { + const sleep = this.#actorDriver.startSleep?.bind( this.#actorDriver, this.#actorId, ); @@ -1962,11 +1976,11 @@ export class ActorInstance { this.#rLog.info({ msg: "actor sleeping" }); // Schedule sleep to happen on the next tick. This allows for any action that calls _sleep to complete. - setImmediate(async () => { + setImmediate(() => { // The actor driver should call stop when ready to stop // // This will call _stop once Pegboard responds with the new status - await sleep(); + sleep(); }); } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/router.ts b/rivetkit-typescript/packages/rivetkit/src/actor/router.ts index 517b3e4232..61b00b987b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/router.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/router.ts @@ -78,6 +78,17 @@ export function createActorRouter( router.use("*", loggerMiddleware(loggerWithoutContext())); + // Track all HTTP requests to prevent actor from sleeping during active requests + router.use("*", async (c, next) => { + const actor = await actorDriver.loadActor(c.env.actorId); + actor.__beginHonoHttpRequest(); + try { + await next(); + } finally { + actor.__endHonoHttpRequest(); + } + }); + router.get("/", (c) => { return c.text( "This is an RivetKit actor.\n\nLearn more at https://rivetkit.org", diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index f1cc9bc1a6..f31768595b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -550,11 +550,11 @@ export class EngineActorDriver implements ActorDriver { }); } - async sleep(actorId: string) { + startSleep(actorId: string) { this.#runner.sleepActor(actorId); } - async shutdown(immediate: boolean): Promise { + async shutdownRunner(immediate: boolean): Promise { logger().info({ msg: "stopping engine actor driver" }); await this.#runner.shutdown(immediate); } @@ -565,7 +565,7 @@ export class EngineActorDriver implements ActorDriver { stream.onAbort(() => {}); c.req.raw.signal.addEventListener("abort", () => { logger().debug("SSE aborted, shutting down runner"); - this.shutdown(true); + this.shutdownRunner(true); }); await this.#runnerStarted.promise; diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts index 63ee6add49..0ff7b47705 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts @@ -79,7 +79,8 @@ export class FileSystemActorDriver implements ActorDriver { return this.#state.createDatabase(actorId); } - sleep(actorId: string): Promise { - return this.#state.sleepActor(actorId); + startSleep(actorId: string): void { + // Spawns the sleepActor promise + this.#state.sleepActor(actorId); } } diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts b/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts index db733e58d8..4357f986ff 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts @@ -78,7 +78,7 @@ runDriverTests({ }, driver: driverConfig, cleanup: async () => { - await actorDriver.shutdown?.(true); + await actorDriver.shutdownRunner?.(true); }, }; },