From c313f20b26a15d6fd7a751154d83779f0bab3fd3 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 18 Sep 2025 12:15:43 -0700 Subject: [PATCH] feat: serverless --- examples/next-js/package.json | 1 + .../api/{registry => rivet}/[...all]/route.ts | 2 +- packages/next-js/src/mod.ts | 27 +++- packages/rivetkit/package.json | 3 +- packages/rivetkit/scripts/dump-openapi.ts | 1 + packages/rivetkit/src/actor/driver.ts | 5 + .../rivetkit/src/common/versioned-data.ts | 10 +- .../rivetkit/src/driver-test-suite/mod.ts | 1 + .../src/drivers/engine/actor-driver.ts | 24 +++- .../rivetkit/src/drivers/file-system/actor.ts | 2 + packages/rivetkit/src/manager/router.ts | 71 +++++++++-- packages/rivetkit/src/registry/mod.ts | 115 +++++++++++++++++- .../actor-http-client.ts | 25 ++-- packages/rivetkit/src/test/mod.ts | 1 + scripts/nuke-cache.sh | 74 +++++++++++ 15 files changed, 319 insertions(+), 43 deletions(-) rename examples/next-js/src/app/api/{registry => rivet}/[...all]/route.ts (54%) create mode 100755 scripts/nuke-cache.sh diff --git a/examples/next-js/package.json b/examples/next-js/package.json index 49667391f..85368dfa2 100644 --- a/examples/next-js/package.json +++ b/examples/next-js/package.json @@ -4,6 +4,7 @@ "private": true, "scripts": { "dev": "next dev", + "build": "next build", "start": "next start", "lint": "next lint" }, diff --git a/examples/next-js/src/app/api/registry/[...all]/route.ts b/examples/next-js/src/app/api/rivet/[...all]/route.ts similarity index 54% rename from examples/next-js/src/app/api/registry/[...all]/route.ts rename to examples/next-js/src/app/api/rivet/[...all]/route.ts index f26406286..6c9669741 100644 --- a/examples/next-js/src/app/api/registry/[...all]/route.ts +++ b/examples/next-js/src/app/api/rivet/[...all]/route.ts @@ -1,4 +1,4 @@ import { toNextHandler } from "@rivetkit/next-js"; import { registry } from "@/rivet/registry"; -export const { GET, POST, HEAD, PATCH, OPTIONS } = toNextHandler(registry); +export const { GET, POST, PUT, PATCH, HEAD, OPTIONS } = toNextHandler(registry); diff --git a/packages/next-js/src/mod.ts b/packages/next-js/src/mod.ts index a1eeaa11e..1cc75ee43 100644 --- a/packages/next-js/src/mod.ts +++ b/packages/next-js/src/mod.ts @@ -6,14 +6,29 @@ export const toNextHandler = ( ) => { // Don't run server locally since we're using the fetch handler directly inputConfig.disableServer = true; + inputConfig.disableActorDriver = true; - const { fetch } = registry.start(inputConfig); + const { fetch } = registry.startServerless(inputConfig); + + const fetchWrapper = async ( + request: Request, + { params }: { params: Promise<{ all: string[] }> }, + ) => { + const { all } = await params; + + const newUrl = new URL(request.url); + newUrl.pathname = all.join("/"); + const newReq = new Request(newUrl, request); + + return await fetch(newReq); + }; return { - GET: fetch, - POST: fetch, - PATCH: fetch, - HEAD: fetch, - OPTIONS: fetch, + GET: fetchWrapper, + POST: fetchWrapper, + PUT: fetchWrapper, + PATCH: fetchWrapper, + HEAD: fetchWrapper, + OPTIONS: fetchWrapper, }; }; diff --git a/packages/rivetkit/package.json b/packages/rivetkit/package.json index f1141bbe1..661a4db36 100644 --- a/packages/rivetkit/package.json +++ b/packages/rivetkit/package.json @@ -188,7 +188,8 @@ "tsx": "^4.19.4", "typescript": "^5.7.3", "vitest": "^3.1.1", - "ws": "^8.18.1" + "ws": "^8.18.1", + "bufferutil": "^4.0.9" }, "peerDependencies": { "@hono/node-server": "^1.14.0", diff --git a/packages/rivetkit/scripts/dump-openapi.ts b/packages/rivetkit/scripts/dump-openapi.ts index 0d709d616..834b1072d 100644 --- a/packages/rivetkit/scripts/dump-openapi.ts +++ b/packages/rivetkit/scripts/dump-openapi.ts @@ -38,6 +38,7 @@ function main() { registryConfig, driverConfig, managerDriver, + undefined, ); const openApiDoc = openapi.getOpenAPIDocument({ diff --git a/packages/rivetkit/src/actor/driver.ts b/packages/rivetkit/src/actor/driver.ts index fbee281b4..c813747be 100644 --- a/packages/rivetkit/src/actor/driver.ts +++ b/packages/rivetkit/src/actor/driver.ts @@ -1,3 +1,4 @@ +import type { Context as HonoContext } from "hono"; import type { CachedSerializer } from "@/actor/protocol/serde"; import type { AnyClient } from "@/client/client"; import type { ManagerDriver } from "@/manager/driver"; @@ -45,6 +46,10 @@ export interface ActorDriver { sleep?(actorId: string): Promise; shutdown?(immediate: boolean): Promise; + + // Serverless + /** This handles the serverless start request. This should manage the lifecycle of the runner tied to the request lifecycle. */ + serverlessHandleStart?(c: HonoContext): Promise; } export enum ConnectionReadyState { diff --git a/packages/rivetkit/src/common/versioned-data.ts b/packages/rivetkit/src/common/versioned-data.ts index 800c962af..78d93fa40 100644 --- a/packages/rivetkit/src/common/versioned-data.ts +++ b/packages/rivetkit/src/common/versioned-data.ts @@ -63,8 +63,8 @@ export class VersionedDataHandler { } private embedVersion(data: VersionedData): Uint8Array { - const versionBytes = new Uint8Array(4); - new DataView(versionBytes.buffer).setUint32(0, data.version, true); + const versionBytes = new Uint8Array(2); + new DataView(versionBytes.buffer).setUint16(0, data.version, true); const result = new Uint8Array(versionBytes.length + data.data.length); result.set(versionBytes); @@ -74,15 +74,15 @@ export class VersionedDataHandler { } private extractVersion(bytes: Uint8Array): VersionedData { - if (bytes.length < 4) { + if (bytes.length < 2) { throw new Error("Invalid versioned data: too short"); } - const version = new DataView(bytes.buffer, bytes.byteOffset).getUint32( + const version = new DataView(bytes.buffer, bytes.byteOffset).getUint16( 0, true, ); - const data = bytes.slice(4); + const data = bytes.slice(2); return { version, data }; } diff --git a/packages/rivetkit/src/driver-test-suite/mod.ts b/packages/rivetkit/src/driver-test-suite/mod.ts index bfe87bf52..e310fee60 100644 --- a/packages/rivetkit/src/driver-test-suite/mod.ts +++ b/packages/rivetkit/src/driver-test-suite/mod.ts @@ -199,6 +199,7 @@ export async function createTestRuntime( registry.config, config, managerDriver, + undefined, ); // Inject WebSocket diff --git a/packages/rivetkit/src/drivers/engine/actor-driver.ts b/packages/rivetkit/src/drivers/engine/actor-driver.ts index 1f5d2b0da..a20ff4e25 100644 --- a/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -4,6 +4,8 @@ import type { } from "@rivetkit/engine-runner"; import { Runner } from "@rivetkit/engine-runner"; import * as cbor from "cbor-x"; +import type { Context as HonoContext } from "hono"; +import { streamSSE } from "hono/streaming"; import { WSContext } from "hono/ws"; import invariant from "invariant"; import { lookupInRegistry } from "@/actor/definition"; @@ -63,6 +65,9 @@ export class EngineActorDriver implements ActorDriver { #actorRouter: ActorRouter; #version: number = 1; // Version for the runner protocol + #runnerStarted: PromiseWithResolvers = Promise.withResolvers(); + #runnerStopped: PromiseWithResolvers = Promise.withResolvers(); + constructor( registryConfig: RegistryConfig, runConfig: RunConfig, @@ -111,6 +116,8 @@ export class EngineActorDriver implements ActorDriver { runnerName: this.#config.runnerName, }); } + + this.#runnerStarted.resolve(undefined); }, onDisconnected: () => { logger().warn({ @@ -120,7 +127,9 @@ export class EngineActorDriver implements ActorDriver { }); hasDisconnected = true; }, - onShutdown: () => {}, + onShutdown: () => { + this.#runnerStopped.resolve(undefined); + }, fetch: this.#runnerFetch.bind(this), websocket: this.#runnerWebSocket.bind(this), onActorStart: this.#runnerOnActorStart.bind(this), @@ -381,4 +390,17 @@ export class EngineActorDriver implements ActorDriver { logger().info({ msg: "stopping engine actor driver" }); await this.#runner.shutdown(immediate); } + + async serverlessHandleStart(c: HonoContext): Promise { + await this.#runnerStarted.promise; + + return streamSSE(c, async (stream) => { + // Runner id should be set if the runner started + const runnerId = this.#runner.runnerId; + invariant(runnerId, "runnerId not set"); + stream.writeSSE({ data: runnerId }); + + return this.#runnerStopped.promise; + }); + } } diff --git a/packages/rivetkit/src/drivers/file-system/actor.ts b/packages/rivetkit/src/drivers/file-system/actor.ts index 57cf53e2e..bf3dbf26a 100644 --- a/packages/rivetkit/src/drivers/file-system/actor.ts +++ b/packages/rivetkit/src/drivers/file-system/actor.ts @@ -1,3 +1,5 @@ +import { Hono, MiddlewareHandler } from "hono"; +import { streamSSE } from "hono/streaming"; import type { GenericConnGlobalState } from "@/actor/generic-conn-driver"; import { loggerWithoutContext } from "@/actor/log"; import type { AnyClient } from "@/client/client"; diff --git a/packages/rivetkit/src/manager/router.ts b/packages/rivetkit/src/manager/router.ts index d1252bc31..902dd8556 100644 --- a/packages/rivetkit/src/manager/router.ts +++ b/packages/rivetkit/src/manager/router.ts @@ -1,8 +1,14 @@ import { createRoute, OpenAPIHono } from "@hono/zod-openapi"; import * as cbor from "cbor-x"; -import { Hono, Context as HonoContext, Next } from "hono"; +import { + Hono, + Context as HonoContext, + type MiddlewareHandler, + Next, +} from "hono"; import { cors as corsMiddleware } from "hono/cors"; import { createMiddleware } from "hono/factory"; +import { streamSSE } from "hono/streaming"; import invariant from "invariant"; import { z } from "zod"; import { ActorNotFound, Unsupported } from "@/actor/errors"; @@ -21,7 +27,7 @@ import { loggerMiddleware, } from "@/common/router"; import { deconstructError, noopNext } from "@/common/utils"; -import { HEADER_ACTOR_ID } from "@/driver-helpers/mod"; +import { type ActorDriver, HEADER_ACTOR_ID } from "@/driver-helpers/mod"; import type { TestInlineDriverCallRequest, TestInlineDriverCallResponse, @@ -72,7 +78,8 @@ export function createManagerRouter( registryConfig: RegistryConfig, runConfig: RunConfig, managerDriver: ManagerDriver, -): { router: Hono; openapi: OpenAPIHono } { + serverlessActorDriverBuilder: (() => ActorDriver) | undefined, +): { router: Hono; openapi: OpenAPIHono; cors: MiddlewareHandler } { const router = new OpenAPIHono({ strict: false }).basePath( runConfig.basePath, ); @@ -83,6 +90,53 @@ export function createManagerRouter( ? corsMiddleware(runConfig.cors) : createMiddleware((_c, next) => next()); + if (serverlessActorDriverBuilder) { + addServerlessRoutes(serverlessActorDriverBuilder, router, cors); + } else { + addManagerRoutes(registryConfig, runConfig, managerDriver, router, cors); + } + + // Error handling + router.notFound(handleRouteNotFound); + router.onError(handleRouteError); + + return { router: router as Hono, openapi: router, cors }; +} + +function addServerlessRoutes( + serverlessActorDriverBuilder: () => ActorDriver, + router: OpenAPIHono, + cors: MiddlewareHandler, +) { + // GET / + router.get("/", cors, (c) => { + return c.text( + "This is a RivetKit server.\n\nLearn more at https://rivetkit.org", + ); + }); + + // Serverless start endpoint + router.get("/start", cors, async (c) => { + const actorDriver = serverlessActorDriverBuilder(); + invariant( + actorDriver.serverlessHandleStart, + "missing serverlessHandleStart on ActorDriver", + ); + return await actorDriver.serverlessHandleStart(c); + }); + + router.get("/health", cors, (c) => { + return c.text("ok"); + }); +} + +function addManagerRoutes( + registryConfig: RegistryConfig, + runConfig: RunConfig, + managerDriver: ManagerDriver, + router: OpenAPIHono, + cors: MiddlewareHandler, +) { // Actor gateway router.use("*", cors, actorGateway.bind(undefined, runConfig, managerDriver)); @@ -405,6 +459,7 @@ export function createManagerRouter( method: c.req.method, headers: c.req.raw.headers, body: c.req.raw.body, + duplex: "half", }), ); @@ -432,6 +487,10 @@ export function createManagerRouter( }); } + router.get("/health", cors, (c) => { + return c.text("ok"); + }); + managerDriver.modifyManagerRouter?.( registryConfig, router as unknown as Hono, @@ -453,12 +512,6 @@ export function createManagerRouter( .route("/", createManagerInspectorRouter()), ); } - - // Error handling - router.notFound(handleRouteNotFound); - router.onError(handleRouteError); - - return { router: router as Hono, openapi: router }; } function createApiActor(actor: ActorOutput): ApiActor { diff --git a/packages/rivetkit/src/registry/mod.ts b/packages/rivetkit/src/registry/mod.ts index 8aafa0993..7c4ed7d03 100644 --- a/packages/rivetkit/src/registry/mod.ts +++ b/packages/rivetkit/src/registry/mod.ts @@ -1,5 +1,6 @@ import { type Client, createClientWithDriver } from "@/client/client"; import { configureBaseLogger, configureDefaultLogger } from "@/common/log"; +import type { ActorDriver } from "@/driver-helpers/mod"; import { chooseDefaultDriver } from "@/drivers/default"; import { configureInspectorAccessToken, @@ -81,11 +82,6 @@ export class Registry { // Create router const managerDriver = driver.manager(this.#config, config); configureInspectorAccessToken(config, managerDriver); - const { router: hono } = createManagerRouter( - this.#config, - config, - managerDriver, - ); // Create client const client = createClientWithDriver(managerDriver, config); @@ -130,6 +126,115 @@ export class Registry { ); } + const { router: hono } = createManagerRouter( + this.#config, + config, + managerDriver, + undefined, + ); + + // Start server + if (!config.disableServer) { + (async () => { + const out = await crossPlatformServe(hono, undefined); + upgradeWebSocket = out.upgradeWebSocket; + })(); + } + + return { + client, + fetch: hono.fetch.bind(hono), + }; + } + + public startServerless(inputConfig?: RunConfigInput): ServerOutput { + const config = RunConfigSchema.parse(inputConfig); + + // Configure logger + if (config.logging?.baseLogger) { + // Use provided base logger + configureBaseLogger(config.logging.baseLogger); + } else { + // Configure default logger with log level from config + // getPinoLevel will handle env variable priority + configureDefaultLogger(config.logging?.level); + } + + // Choose the driver based on configuration + const driver = chooseDefaultDriver(config); + + // TODO: Find cleaner way of disabling by default + if (driver.name === "engine") { + config.inspector.enabled = false; + config.disableServer = true; + config.disableActorDriver = true; + } + if (driver.name === "cloudflare-workers") { + config.inspector.enabled = false; + config.disableServer = true; + config.disableActorDriver = true; + config.noWelcome = true; + } + + // Configure getUpgradeWebSocket lazily so we can assign it in crossPlatformServe + let upgradeWebSocket: any; + if (!config.getUpgradeWebSocket) { + config.getUpgradeWebSocket = () => upgradeWebSocket!; + } + + // Create router + const managerDriver = driver.manager(this.#config, config); + + // Create client + const client = createClientWithDriver(managerDriver, config); + + const driverLog = managerDriver.extraStartupLog?.() ?? {}; + logger().info({ + msg: "rivetkit ready", + driver: driver.name, + definitions: Object.keys(this.#config.use).length, + ...driverLog, + }); + if (config.inspector?.enabled && managerDriver.inspector) { + logger().info({ msg: "inspector ready", url: getInspectorUrl(config) }); + } + + // Print welcome information + if (!config.noWelcome) { + const displayInfo = managerDriver.displayInformation(); + console.log(); + console.log(` RivetKit ${pkg.version} (${displayInfo.name})`); + console.log(` - Endpoint: http://127.0.0.1:6420`); + for (const [k, v] of Object.entries(displayInfo.properties)) { + const padding = " ".repeat(Math.max(0, 13 - k.length)); + console.log(` - ${k}:${padding}${v}`); + } + if (config.inspector?.enabled && managerDriver.inspector) { + console.log(` - Inspector: ${getInspectorUrl(config)}`); + } + console.log(); + } + + let serverlessActorDriverBuilder: (() => ActorDriver) | undefined = () => { + return driver.actor(this.#config, config, managerDriver, client); + }; + + // HACK: We need to find a better way to let the driver itself decide when to start the actor driver + // Create runner + // + // Even though we do not use the return value, this is required to start the code that will handle incoming actors + if (!config.disableActorDriver) { + const _actorDriver = serverlessActorDriverBuilder(); + serverlessActorDriverBuilder = undefined; + } + + const { router: hono } = createManagerRouter( + this.#config, + config, + managerDriver, + serverlessActorDriverBuilder, + ); + // Start server if (!config.disableServer) { (async () => { diff --git a/packages/rivetkit/src/remote-manager-driver/actor-http-client.ts b/packages/rivetkit/src/remote-manager-driver/actor-http-client.ts index afa33fee7..1e5e7eee1 100644 --- a/packages/rivetkit/src/remote-manager-driver/actor-http-client.ts +++ b/packages/rivetkit/src/remote-manager-driver/actor-http-client.ts @@ -16,28 +16,23 @@ export async function sendHttpRequestToActor( let bodyToSend: ArrayBuffer | null = null; const guardHeaders = buildGuardHeadersForHttp(actorRequest, actorId); - if ( - actorRequest.body && - actorRequest.method !== "GET" && - actorRequest.method !== "HEAD" - ) { + if (actorRequest.method !== "GET" && actorRequest.method !== "HEAD") { if (actorRequest.bodyUsed) { throw new Error("Request body has already been consumed"); } // TODO: This buffers the entire request in memory every time. We // need to properly implement streaming bodies. - // Clone and read the body to ensure it can be sent - const clonedRequest = actorRequest.clone(); - bodyToSend = await clonedRequest.arrayBuffer(); + const reqBody = await actorRequest.arrayBuffer(); - // If this is a streaming request, we need to convert the headers - // for the basic array buffer - guardHeaders.delete("transfer-encoding"); - guardHeaders.set( - "content-length", - String((bodyToSend as ArrayBuffer).byteLength), - ); + if (reqBody.byteLength !== 0) { + bodyToSend = reqBody; + + // If this is a streaming request, we need to convert the headers + // for the basic array buffer + guardHeaders.delete("transfer-encoding"); + guardHeaders.set("content-length", String(bodyToSend.byteLength)); + } } const guardRequest = new Request(guardUrl, { diff --git a/packages/rivetkit/src/test/mod.ts b/packages/rivetkit/src/test/mod.ts index 153b5beec..791f899e3 100644 --- a/packages/rivetkit/src/test/mod.ts +++ b/packages/rivetkit/src/test/mod.ts @@ -35,6 +35,7 @@ function serve(registry: Registry, inputConfig?: InputConfig): ServerType { registry.config, runConfig, managerDriver, + undefined, ); // Inject WebSocket diff --git a/scripts/nuke-cache.sh b/scripts/nuke-cache.sh new file mode 100755 index 000000000..f5985e4ea --- /dev/null +++ b/scripts/nuke-cache.sh @@ -0,0 +1,74 @@ +#!/bin/bash + +# Clear JavaScript-related cache folders recursively +# Usage: ./clear_js_cache.sh [directory] + +# Set the target directory (default to current directory) +TARGET_DIR="${1:-.}" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +echo -e "${YELLOW}Clearing JavaScript cache folders in: $TARGET_DIR${NC}" + +# Counter for removed folders +removed_count=0 + +# Function to safely remove directory +remove_dir() { + local dir="$1" + if [ -d "$dir" ]; then + echo -e "Removing: ${RED}$dir${NC}" + rm -rf "$dir" + ((removed_count++)) + fi +} + +# Find and remove cache folders +while IFS= read -r -d '' dir; do + remove_dir "$dir" +done < <(find "$TARGET_DIR" -type d \( \ + -name "node_modules" -o \ + -name ".next" -o \ + -name ".nuxt" -o \ + -name "dist" -o \ + -name "build" -o \ + -name ".cache" -o \ + -name ".parcel-cache" -o \ + -name ".webpack" -o \ + -name ".rollup.cache" -o \ + -name ".vite" -o \ + -name ".turbo" -o \ + -name ".nx" -o \ + -name "coverage" -o \ + -name ".nyc_output" -o \ + -name "out" -o \ + -name ".output" -o \ + -name ".vercel" -o \ + -name ".netlify" -o \ + -name "storybook-static" \ +\) -print0) + +# Also remove common cache files +echo -e "${YELLOW}Removing cache files...${NC}" +find "$TARGET_DIR" -type f \( \ + -name "*.log" -o \ + -name ".DS_Store" -o \ + -name "Thumbs.db" -o \ + -name "*.tmp" -o \ + -name "*.temp" \ +\) -delete 2>/dev/null + +# Remove package-lock.json and yarn.lock (optional - uncomment if needed) +# find "$TARGET_DIR" -name "package-lock.json" -delete 2>/dev/null +# find "$TARGET_DIR" -name "yarn.lock" -delete 2>/dev/null + +echo -e "${GREEN}✓ Cleanup complete! Removed $removed_count cache folders.${NC}" + +# Optional: Show disk space freed (requires du command) +if command -v du >/dev/null 2>&1; then + echo -e "${YELLOW}Disk space check completed.${NC}" +fi