Skip to content

Commit 943ae66

Browse files
committed
fix(next-js): auto shut down runners when source code updates
1 parent 3763786 commit 943ae66

File tree

2 files changed

+173
-5
lines changed

2 files changed

+173
-5
lines changed

rivetkit-typescript/packages/next-js/src/mod.ts

Lines changed: 168 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
import { existsSync, statSync } from "node:fs";
2+
import { join } from "node:path";
13
import type { Registry, RunConfigInput } from "rivetkit";
4+
import { stringifyError } from "rivetkit/utils";
25
import { logger } from "./log";
36

47
export const toNextHandler = (
@@ -11,8 +14,8 @@ export const toNextHandler = (
1114
// Configure serverless
1215
inputConfig.runnerKind = "serverless";
1316

14-
// Auto-configure serverless runner if not in prod
1517
if (process.env.NODE_ENV !== "production") {
18+
// Auto-configure serverless runner if not in prod
1619
logger().debug(
1720
"detected development environment, auto-starting engine and auto-configuring serverless",
1821
);
@@ -42,17 +45,24 @@ export const toNextHandler = (
4245

4346
const { fetch } = registry.start(inputConfig);
4447

48+
// Function that Next will call when handling requests
4549
const fetchWrapper = async (
4650
request: Request,
4751
{ params }: { params: Promise<{ all: string[] }> },
48-
) => {
52+
): Promise<Response> => {
4953
const { all } = await params;
5054

5155
const newUrl = new URL(request.url);
5256
newUrl.pathname = all.join("/");
53-
const newReq = new Request(newUrl, request);
5457

55-
return await fetch(newReq);
58+
if (process.env.NODE_ENV !== "development") {
59+
// Handle request
60+
const newReq = new Request(newUrl, request);
61+
return await fetch(newReq);
62+
} else {
63+
// Special request handling for file watching
64+
return await handleRequestWithFileWatcher(request, newUrl, fetch);
65+
}
5666
};
5767

5868
return {
@@ -64,3 +74,157 @@ export const toNextHandler = (
6474
OPTIONS: fetchWrapper,
6575
};
6676
};
77+
78+
/**
79+
* Special request handler that will watch the source file to terminate this
80+
* request once complete.
81+
*
82+
* See docs on watchRouteFile for more information.
83+
*/
84+
async function handleRequestWithFileWatcher(
85+
request: Request,
86+
newUrl: URL,
87+
fetch: (request: Request, ...args: any) => Response | Promise<Response>,
88+
): Promise<Response> {
89+
// Create a new abort controller that we can abort, since the signal on
90+
// the request we cannot control
91+
const mergedController = new AbortController();
92+
const abortMerged = () => mergedController.abort();
93+
request.signal?.addEventListener("abort", abortMerged);
94+
95+
// Watch for file changes in dev
96+
//
97+
// We spawn one watcher per-request since there is not a clean way of
98+
// cleaning up global watchers when hot reloading in Next
99+
const watchIntervalId = watchRouteFile(mergedController);
100+
101+
// Clear interval if request is aborted
102+
request.signal.addEventListener("abort", () => {
103+
logger().debug("clearing file watcher interval: request aborted");
104+
clearInterval(watchIntervalId);
105+
});
106+
107+
// Replace URL and abort signal
108+
const newReq = new Request(newUrl, {
109+
// Copy old request properties
110+
method: request.method,
111+
headers: request.headers,
112+
body: request.body,
113+
credentials: request.credentials,
114+
cache: request.cache,
115+
redirect: request.redirect,
116+
referrer: request.referrer,
117+
integrity: request.integrity,
118+
// Override with new signal
119+
signal: mergedController.signal,
120+
});
121+
122+
// Handle request
123+
const response = await fetch(newReq);
124+
125+
// HACK: Next.js does not provide a way to detect when a request
126+
// finishes, so we need to tap the response stream
127+
//
128+
// We can't just wait for `await fetch` to finish since SSE streams run
129+
// for longer
130+
if (response.body) {
131+
const wrappedStream = waitForStreamFinish(response.body, () => {
132+
logger().debug("clearing file watcher interval: stream finished");
133+
clearInterval(watchIntervalId);
134+
});
135+
return new Response(wrappedStream, {
136+
status: response.status,
137+
statusText: response.statusText,
138+
headers: response.headers,
139+
});
140+
} else {
141+
// No response body, clear interval immediately
142+
logger().debug("clearing file watcher interval: no response body");
143+
clearInterval(watchIntervalId);
144+
return response;
145+
}
146+
}
147+
148+
/**
149+
* HACK: Watch for file changes on this route in order to shut down the runner.
150+
* We do this because Next.js does not terminate long-running requests on file
151+
* change, so we need to manually shut down the runner in order to trigger a
152+
* new `/start` request with the new code.
153+
*
154+
* We don't use file watchers since those are frequently buggy x-platform and
155+
* subject to misconfigured inotify limits.
156+
*/
157+
function watchRouteFile(abortController: AbortController): NodeJS.Timeout {
158+
logger().debug("starting file watcher");
159+
160+
const routePath = join(
161+
process.cwd(),
162+
".next/server/app/api/rivet/[...all]/route.js",
163+
);
164+
165+
let lastMtime: number | null = null;
166+
const checkFile = () => {
167+
logger().debug({ msg: "checking for file changes", routePath });
168+
try {
169+
if (!existsSync(routePath)) {
170+
return;
171+
}
172+
173+
const stats = statSync(routePath);
174+
const mtime = stats.mtimeMs;
175+
176+
if (lastMtime !== null && mtime !== lastMtime) {
177+
logger().info({ msg: "route file changed", routePath });
178+
abortController.abort();
179+
}
180+
181+
lastMtime = mtime;
182+
} catch (err) {
183+
logger().info({
184+
msg: "failed to check for route file change",
185+
err: stringifyError(err),
186+
});
187+
}
188+
};
189+
190+
checkFile();
191+
192+
return setInterval(checkFile, 1000);
193+
}
194+
195+
/**
196+
* Waits for a stream to finish and calls onFinish on complete.
197+
*
198+
* Used for cancelling the file watcher.
199+
*/
200+
function waitForStreamFinish(
201+
body: ReadableStream<Uint8Array>,
202+
onFinish: () => void,
203+
): ReadableStream {
204+
const reader = body.getReader();
205+
return new ReadableStream({
206+
async start(controller) {
207+
try {
208+
while (true) {
209+
const { done, value } = await reader.read();
210+
if (done) {
211+
logger().debug("stream completed");
212+
onFinish();
213+
controller.close();
214+
break;
215+
}
216+
controller.enqueue(value);
217+
}
218+
} catch (err) {
219+
logger().debug("stream errored");
220+
onFinish();
221+
controller.error(err);
222+
}
223+
},
224+
cancel() {
225+
logger().debug("stream cancelled");
226+
onFinish();
227+
reader.cancel();
228+
},
229+
});
230+
}

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,11 @@ export class EngineActorDriver implements ActorDriver {
646646
stream.onAbort(() => {});
647647
c.req.raw.signal.addEventListener("abort", () => {
648648
logger().debug("SSE aborted, shutting down runner");
649-
this.shutdownRunner(true);
649+
650+
// We cannot assume that the request will always be closed gracefully by Rivet. We always proceed with a graceful shutdown in case the request was terminated for any other reason.
651+
//
652+
// If we did not use a graceful shutdown, the runner would
653+
this.shutdownRunner(false);
650654
});
651655

652656
await this.#runnerStarted.promise;

0 commit comments

Comments
 (0)