Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/next-js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"private": true,
"scripts": {
"dev": "next dev",
"build": "next build",
"start": "next start",
"lint": "next lint"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { toNextHandler } from "@rivetkit/next-js";
import { registry } from "@/rivet/registry";

export const { GET, POST, HEAD, PATCH, OPTIONS } = toNextHandler(registry);
export const { GET, POST, PUT, PATCH, HEAD, OPTIONS } = toNextHandler(registry);
27 changes: 21 additions & 6 deletions packages/next-js/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,29 @@ export const toNextHandler = (
) => {
// Don't run server locally since we're using the fetch handler directly
inputConfig.disableServer = true;
inputConfig.disableActorDriver = true;

const { fetch } = registry.start(inputConfig);
const { fetch } = registry.startServerless(inputConfig);

const fetchWrapper = async (
request: Request,
{ params }: { params: Promise<{ all: string[] }> },
) => {
const { all } = await params;

const newUrl = new URL(request.url);
newUrl.pathname = all.join("/");
const newReq = new Request(newUrl, request);

return await fetch(newReq);
};

return {
GET: fetch,
POST: fetch,
PATCH: fetch,
HEAD: fetch,
OPTIONS: fetch,
GET: fetchWrapper,
POST: fetchWrapper,
PUT: fetchWrapper,
PATCH: fetchWrapper,
HEAD: fetchWrapper,
OPTIONS: fetchWrapper,
};
};
3 changes: 2 additions & 1 deletion packages/rivetkit/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@
"tsx": "^4.19.4",
"typescript": "^5.7.3",
"vitest": "^3.1.1",
"ws": "^8.18.1"
"ws": "^8.18.1",
"bufferutil": "^4.0.9"
},
"peerDependencies": {
"@hono/node-server": "^1.14.0",
Expand Down
1 change: 1 addition & 0 deletions packages/rivetkit/scripts/dump-openapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ function main() {
registryConfig,
driverConfig,
managerDriver,
undefined,
);

const openApiDoc = openapi.getOpenAPIDocument({
Expand Down
5 changes: 5 additions & 0 deletions packages/rivetkit/src/actor/driver.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { Context as HonoContext } from "hono";
import type { CachedSerializer } from "@/actor/protocol/serde";
import type { AnyClient } from "@/client/client";
import type { ManagerDriver } from "@/manager/driver";
Expand Down Expand Up @@ -45,6 +46,10 @@ export interface ActorDriver {
sleep?(actorId: string): Promise<void>;

shutdown?(immediate: boolean): Promise<void>;

// Serverless
/** This handles the serverless start request. This should manage the lifecycle of the runner tied to the request lifecycle. */
serverlessHandleStart?(c: HonoContext): Promise<Response>;
}

export enum ConnectionReadyState {
Expand Down
10 changes: 5 additions & 5 deletions packages/rivetkit/src/common/versioned-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ export class VersionedDataHandler<T> {
}

private embedVersion(data: VersionedData<Uint8Array>): Uint8Array {
const versionBytes = new Uint8Array(4);
new DataView(versionBytes.buffer).setUint32(0, data.version, true);
const versionBytes = new Uint8Array(2);
new DataView(versionBytes.buffer).setUint16(0, data.version, true);

const result = new Uint8Array(versionBytes.length + data.data.length);
result.set(versionBytes);
Expand All @@ -74,15 +74,15 @@ export class VersionedDataHandler<T> {
}

private extractVersion(bytes: Uint8Array): VersionedData<Uint8Array> {
if (bytes.length < 4) {
if (bytes.length < 2) {
throw new Error("Invalid versioned data: too short");
}

const version = new DataView(bytes.buffer, bytes.byteOffset).getUint32(
const version = new DataView(bytes.buffer, bytes.byteOffset).getUint16(
0,
true,
);
const data = bytes.slice(4);
const data = bytes.slice(2);

return { version, data };
}
Expand Down
1 change: 1 addition & 0 deletions packages/rivetkit/src/driver-test-suite/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ export async function createTestRuntime(
registry.config,
config,
managerDriver,
undefined,
);

// Inject WebSocket
Expand Down
24 changes: 23 additions & 1 deletion packages/rivetkit/src/drivers/engine/actor-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import type {
} from "@rivetkit/engine-runner";
import { Runner } from "@rivetkit/engine-runner";
import * as cbor from "cbor-x";
import type { Context as HonoContext } from "hono";
import { streamSSE } from "hono/streaming";
import { WSContext } from "hono/ws";
import invariant from "invariant";
import { lookupInRegistry } from "@/actor/definition";
Expand Down Expand Up @@ -63,6 +65,9 @@ export class EngineActorDriver implements ActorDriver {
#actorRouter: ActorRouter;
#version: number = 1; // Version for the runner protocol

#runnerStarted: PromiseWithResolvers<undefined> = Promise.withResolvers();
#runnerStopped: PromiseWithResolvers<undefined> = Promise.withResolvers();

constructor(
registryConfig: RegistryConfig,
runConfig: RunConfig,
Expand Down Expand Up @@ -111,6 +116,8 @@ export class EngineActorDriver implements ActorDriver {
runnerName: this.#config.runnerName,
});
}

this.#runnerStarted.resolve(undefined);
},
onDisconnected: () => {
logger().warn({
Expand All @@ -120,7 +127,9 @@ export class EngineActorDriver implements ActorDriver {
});
hasDisconnected = true;
},
onShutdown: () => {},
onShutdown: () => {
this.#runnerStopped.resolve(undefined);
},
fetch: this.#runnerFetch.bind(this),
websocket: this.#runnerWebSocket.bind(this),
onActorStart: this.#runnerOnActorStart.bind(this),
Expand Down Expand Up @@ -381,4 +390,17 @@ export class EngineActorDriver implements ActorDriver {
logger().info({ msg: "stopping engine actor driver" });
await this.#runner.shutdown(immediate);
}

async serverlessHandleStart(c: HonoContext): Promise<Response> {
await this.#runnerStarted.promise;

return streamSSE(c, async (stream) => {
// Runner id should be set if the runner started
const runnerId = this.#runner.runnerId;
invariant(runnerId, "runnerId not set");
stream.writeSSE({ data: runnerId });

return this.#runnerStopped.promise;
});
}
}
2 changes: 2 additions & 0 deletions packages/rivetkit/src/drivers/file-system/actor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Hono, MiddlewareHandler } from "hono";
import { streamSSE } from "hono/streaming";
import type { GenericConnGlobalState } from "@/actor/generic-conn-driver";
import { loggerWithoutContext } from "@/actor/log";
import type { AnyClient } from "@/client/client";
Expand Down
71 changes: 62 additions & 9 deletions packages/rivetkit/src/manager/router.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import { createRoute, OpenAPIHono } from "@hono/zod-openapi";
import * as cbor from "cbor-x";
import { Hono, Context as HonoContext, Next } from "hono";
import {
Hono,
Context as HonoContext,
type MiddlewareHandler,
Next,
} from "hono";
import { cors as corsMiddleware } from "hono/cors";
import { createMiddleware } from "hono/factory";
import { streamSSE } from "hono/streaming";
import invariant from "invariant";
import { z } from "zod";
import { ActorNotFound, Unsupported } from "@/actor/errors";
Expand All @@ -21,7 +27,7 @@ import {
loggerMiddleware,
} from "@/common/router";
import { deconstructError, noopNext } from "@/common/utils";
import { HEADER_ACTOR_ID } from "@/driver-helpers/mod";
import { type ActorDriver, HEADER_ACTOR_ID } from "@/driver-helpers/mod";
import type {
TestInlineDriverCallRequest,
TestInlineDriverCallResponse,
Expand Down Expand Up @@ -72,7 +78,8 @@ export function createManagerRouter(
registryConfig: RegistryConfig,
runConfig: RunConfig,
managerDriver: ManagerDriver,
): { router: Hono; openapi: OpenAPIHono } {
serverlessActorDriverBuilder: (() => ActorDriver) | undefined,
): { router: Hono; openapi: OpenAPIHono; cors: MiddlewareHandler } {
const router = new OpenAPIHono({ strict: false }).basePath(
runConfig.basePath,
);
Expand All @@ -83,6 +90,53 @@ export function createManagerRouter(
? corsMiddleware(runConfig.cors)
: createMiddleware((_c, next) => next());

if (serverlessActorDriverBuilder) {
addServerlessRoutes(serverlessActorDriverBuilder, router, cors);
} else {
addManagerRoutes(registryConfig, runConfig, managerDriver, router, cors);
}

// Error handling
router.notFound(handleRouteNotFound);
router.onError(handleRouteError);

return { router: router as Hono, openapi: router, cors };
}

function addServerlessRoutes(
serverlessActorDriverBuilder: () => ActorDriver,
router: OpenAPIHono,
cors: MiddlewareHandler,
) {
// GET /
router.get("/", cors, (c) => {
return c.text(
"This is a RivetKit server.\n\nLearn more at https://rivetkit.org",
);
});

// Serverless start endpoint
router.get("/start", cors, async (c) => {
const actorDriver = serverlessActorDriverBuilder();
invariant(
actorDriver.serverlessHandleStart,
"missing serverlessHandleStart on ActorDriver",
);
return await actorDriver.serverlessHandleStart(c);
});

router.get("/health", cors, (c) => {
return c.text("ok");
});
}

function addManagerRoutes(
registryConfig: RegistryConfig,
runConfig: RunConfig,
managerDriver: ManagerDriver,
router: OpenAPIHono,
cors: MiddlewareHandler,
) {
// Actor gateway
router.use("*", cors, actorGateway.bind(undefined, runConfig, managerDriver));

Expand Down Expand Up @@ -405,6 +459,7 @@ export function createManagerRouter(
method: c.req.method,
headers: c.req.raw.headers,
body: c.req.raw.body,
duplex: "half",
}),
);

Expand Down Expand Up @@ -432,6 +487,10 @@ export function createManagerRouter(
});
}

router.get("/health", cors, (c) => {
return c.text("ok");
});

managerDriver.modifyManagerRouter?.(
registryConfig,
router as unknown as Hono,
Expand All @@ -453,12 +512,6 @@ export function createManagerRouter(
.route("/", createManagerInspectorRouter()),
);
}

// Error handling
router.notFound(handleRouteNotFound);
router.onError(handleRouteError);

return { router: router as Hono, openapi: router };
}

function createApiActor(actor: ActorOutput): ApiActor {
Expand Down
Loading
Loading