Skip to content

Commit d69006b

Browse files
committed
feat(rivetkit): integrate websocket hibernation (#3301)
1 parent cdb01b7 commit d69006b

File tree

18 files changed

+520
-55
lines changed

18 files changed

+520
-55
lines changed

engine/sdks/typescript/runner/src/mod.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ export interface RunnerConfig {
6262
config: ActorConfig,
6363
) => Promise<void>;
6464
onActorStop: (actorId: string, generation: number) => Promise<void>;
65-
getActorHibernationConfig: (actorId: string, requestId: ArrayBuffer) => HibernationConfig;
65+
getActorHibernationConfig: (actorId: string, requestId: ArrayBuffer, request: Request) => HibernationConfig;
6666
noAutoShutdown?: boolean;
6767
}
6868

engine/sdks/typescript/runner/src/tunnel.ts

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -541,21 +541,10 @@ export class Tunnel {
541541
// Store adapter
542542
this.#actorWebSockets.set(webSocketId, adapter);
543543

544-
// Send open confirmation
545-
let hibernationConfig = this.#runner.config.getActorHibernationConfig(actor.actorId, requestId);
546-
this.#sendMessage(requestId, {
547-
tag: "ToServerWebSocketOpen",
548-
val: {
549-
canHibernate: hibernationConfig.enabled,
550-
lastMsgIndex: BigInt(hibernationConfig.lastMsgIndex ?? -1),
551-
},
552-
});
553-
554-
// Notify adapter that connection is open
555-
adapter._handleOpen(requestId);
556-
557-
// Create a minimal request object for the websocket handler
558-
// Include original headers from the open message
544+
// Convert headers to map
545+
//
546+
// We need to manually ensure the original Upgrade/Connection WS
547+
// headers are present
559548
const headerInit: Record<string, string> = {};
560549
if (open.headers) {
561550
for (const [k, v] of open.headers as ReadonlyMap<
@@ -565,7 +554,6 @@ export class Tunnel {
565554
headerInit[k] = v;
566555
}
567556
}
568-
// Ensure websocket upgrade headers are present
569557
headerInit["Upgrade"] = "websocket";
570558
headerInit["Connection"] = "Upgrade";
571559

@@ -574,6 +562,21 @@ export class Tunnel {
574562
headers: headerInit,
575563
});
576564

565+
// Send open confirmation
566+
let hibernationConfig = this.#runner.config.getActorHibernationConfig(actor.actorId, requestId, request);
567+
this.#sendMessage(requestId, {
568+
tag: "ToServerWebSocketOpen",
569+
val: {
570+
canHibernate: hibernationConfig.enabled,
571+
lastMsgIndex: BigInt(hibernationConfig.lastMsgIndex ?? -1),
572+
},
573+
});
574+
575+
// Notify adapter that connection is open
576+
adapter._handleOpen(requestId);
577+
578+
579+
577580
// Call websocket handler
578581
await websocketHandler(
579582
this.#runner,

rivetkit-openapi/openapi.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"openapi": "3.0.0",
33
"info": {
4-
"version": "2.0.21",
4+
"version": "2.0.22-rc.1",
55
"title": "RivetKit API"
66
},
77
"components": {

rivetkit-typescript/packages/rivetkit/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@
153153
],
154154
"scripts": {
155155
"build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts",
156-
"build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts",
156+
"build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts",
157157
"check-types": "tsc --noEmit",
158158
"test": "vitest run",
159159
"test:watch": "vitest",
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# MARK: Connection
2+
# Represents an event subscription.
3+
type PersistedSubscription struct {
4+
# Event name
5+
eventName: str
6+
}
7+
8+
type PersistedConnection struct {
9+
id: str
10+
token: str
11+
parameters: data
12+
state: data
13+
subscriptions: list<PersistedSubscription>
14+
lastSeen: u64
15+
}
16+
17+
# MARK: Schedule Event
18+
type GenericPersistedScheduleEvent struct {
19+
# Action name
20+
action: str
21+
# Arguments for the action
22+
#
23+
# CBOR array
24+
args: optional<data>
25+
}
26+
27+
type PersistedScheduleEventKind union {
28+
GenericPersistedScheduleEvent
29+
}
30+
31+
type PersistedScheduleEvent struct {
32+
eventId: str
33+
timestamp: u64
34+
kind: PersistedScheduleEventKind
35+
}
36+
37+
# MARK: WebSocket
38+
type PersistedHibernatableWebSocket struct {
39+
requestId: data
40+
lastSeenTimestamp: u64
41+
msgIndex: u64
42+
}
43+
44+
# MARK: Actor
45+
# Represents the persisted state of an actor.
46+
type PersistedActor struct {
47+
# Input data passed to the actor on initialization
48+
input: optional<data>
49+
hasInitialized: bool
50+
state: data
51+
connections: list<PersistedConnection>
52+
scheduledEvents: list<PersistedScheduleEvent>
53+
hibernatableWebSocket: list<PersistedHibernatableWebSocket>
54+
}

rivetkit-typescript/packages/rivetkit/src/actor/config.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,16 @@ export const ActorConfigSchema = z
7070
connectionLivenessInterval: z.number().positive().default(5000),
7171
noSleep: z.boolean().default(false),
7272
sleepTimeout: z.number().positive().default(30_000),
73+
/** @experimental */
74+
canHibernatWebSocket: z
75+
.union([
76+
z.boolean(),
77+
z
78+
.function()
79+
.args(z.custom<Request>())
80+
.returns(z.boolean()),
81+
])
82+
.default(false),
7383
})
7484
.strict()
7585
.default({}),

rivetkit-typescript/packages/rivetkit/src/actor/conn-drivers.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type { AnyConn } from "@/actor/conn";
55
import type { AnyActorInstance } from "@/actor/instance";
66
import type { CachedSerializer, Encoding } from "@/actor/protocol/serde";
77
import { encodeDataToString } from "@/actor/protocol/serde";
8+
import type { HonoWebSocketAdapter } from "@/manager/hono-websocket-adapter";
89
import type * as protocol from "@/schemas/client-protocol/mod";
910
import { assertUnreachable, type promiseWithResolvers } from "@/utils";
1011

@@ -67,6 +68,15 @@ export interface ConnDriver<State> {
6768
conn: AnyConn,
6869
state: State,
6970
): ConnReadyState | undefined;
71+
72+
/**
73+
* If the underlying connection can hibernate.
74+
*/
75+
isHibernatable(
76+
actor: AnyActorInstance,
77+
conn: AnyConn,
78+
state: State,
79+
): boolean;
7080
}
7181

7282
// MARK: WebSocket
@@ -140,6 +150,22 @@ const WEBSOCKET_DRIVER: ConnDriver<ConnDriverWebSocketState> = {
140150
): ConnReadyState | undefined => {
141151
return state.websocket.readyState;
142152
},
153+
154+
isHibernatable(
155+
_actor: AnyActorInstance,
156+
_conn: AnyConn,
157+
state: ConnDriverWebSocketState,
158+
): boolean {
159+
// Extract isHibernatable from the HonoWebSocketAdapter
160+
if (state.websocket.raw) {
161+
const raw = state.websocket.raw as HonoWebSocketAdapter;
162+
if (typeof raw.isHibernatable === "boolean") {
163+
return raw.isHibernatable;
164+
}
165+
}
166+
167+
return false;
168+
},
143169
};
144170

145171
// MARK: SSE
@@ -175,6 +201,10 @@ const SSE_DRIVER: ConnDriver<ConnDriverSseState> = {
175201

176202
return ConnReadyState.OPEN;
177203
},
204+
205+
isHibernatable(): boolean {
206+
return false;
207+
},
178208
};
179209

180210
// MARK: HTTP
@@ -187,6 +217,9 @@ const HTTP_DRIVER: ConnDriver<ConnDriverHttpState> = {
187217
// Noop
188218
// TODO: Abort the request
189219
},
220+
isHibernatable(): boolean {
221+
return false;
222+
},
190223
};
191224

192225
/** List of all connection drivers. */

rivetkit-typescript/packages/rivetkit/src/actor/conn.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import * as cbor from "cbor-x";
22
import invariant from "invariant";
3+
import { PersistedHibernatableWebSocket } from "@/schemas/actor-persist/mod";
34
import type * as protocol from "@/schemas/client-protocol/mod";
45
import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned";
56
import { bufferToArrayBuffer } from "@/utils";
@@ -125,6 +126,25 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
125126
return this.__status;
126127
}
127128

129+
/**
130+
* @experimental
131+
*
132+
* If the underlying connection can hibernate.
133+
*/
134+
public get isHibernatable(): boolean {
135+
if (this.__driverState) {
136+
const driverKind = getConnDriverKindFromState(this.__driverState);
137+
const driver = CONN_DRIVERS[driverKind];
138+
return driver.isHibernatable(
139+
this.#actor,
140+
this,
141+
(this.__driverState as any)[driverKind],
142+
);
143+
} else {
144+
return false;
145+
}
146+
}
147+
128148
/**
129149
* Timestamp of the last time the connection was seen, i.e. the last time the connection was active and checked for liveness.
130150
*/

0 commit comments

Comments
 (0)