10 changes: 10 additions & 0 deletions engine/packages/config/src/config/pegboard.rs
@@ -29,6 +29,12 @@ pub struct Pegboard {
///
/// **Experimental**
pub retry_reset_duration: Option<i64>,
/// Maximum exponent for the reschedule backoff calculation.
///
/// This controls the maximum backoff duration when rescheduling actors.
///
/// **Experimental**
pub reschedule_backoff_max_exponent: Option<usize>,
}

impl Pegboard {
@@ -47,4 +53,8 @@ impl Pegboard {
pub fn retry_reset_duration(&self) -> i64 {
self.retry_reset_duration.unwrap_or(10 * 60 * 1000)
}

pub fn reschedule_backoff_max_exponent(&self) -> usize {
self.reschedule_backoff_max_exponent.unwrap_or(8)
}
}
10 changes: 8 additions & 2 deletions engine/packages/pegboard/src/workflows/actor/runtime.rs
@@ -619,6 +619,7 @@ pub async fn reschedule_actor(
let mut backoff = reschedule_backoff(
state.reschedule_state.retry_count,
ctx.config().pegboard().base_retry_timeout(),
ctx.config().pegboard().reschedule_backoff_max_exponent(),
);

let (now, reset) = ctx
@@ -736,6 +737,7 @@ async fn compare_retry(ctx: &ActivityCtx, input: &CompareRetryInput) -> Result<(
let backoff = reschedule_backoff(
input.retry_count,
ctx.config().pegboard().base_retry_timeout(),
ctx.config().pegboard().reschedule_backoff_max_exponent(),
);
state.reschedule_ts = Some(now + i64::try_from(backoff.current_duration())?);
}
@@ -815,6 +817,10 @@ pub async fn set_complete(ctx: &ActivityCtx, input: &SetCompleteInput) -> Result
Ok(())
}

fn reschedule_backoff(retry_count: usize, base_retry_timeout: usize) -> util::backoff::Backoff {
util::backoff::Backoff::new_at(8, None, base_retry_timeout, 500, retry_count)
fn reschedule_backoff(
retry_count: usize,
base_retry_timeout: usize,
max_exponent: usize,
) -> util::backoff::Backoff {
util::backoff::Backoff::new_at(max_exponent, None, base_retry_timeout, 500, retry_count)
}
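The new max_exponent parameter bounds how many times the reschedule delay can double before it plateaus; previously that bound was hard-coded to 8. The exact semantics of util::backoff::Backoff::new_at are not shown in this diff (the 500 argument is presumably jitter), so the following TypeScript sketch only illustrates the assumed shape of the calculation, not the real implementation:

// Illustrative sketch only: assumes the delay grows as base * 2^retry and
// stops growing once the retry count reaches maxExponent. Jitter is omitted.
function rescheduleDelayMs(
	retryCount: number,
	baseRetryTimeoutMs: number,
	maxExponent: number,
): number {
	// Cap the exponent so the delay plateaus after `maxExponent` doublings.
	const exponent = Math.min(retryCount, maxExponent);
	return baseRetryTimeoutMs * 2 ** exponent;
}

// With the default max exponent of 8 the delay plateaus at base * 256;
// lowering the exponent keeps rescheduling fast, e.g. in tests.
console.log(rescheduleDelayMs(3, 100, 8)); // 800
console.log(rescheduleDelayMs(20, 100, 8)); // 25600 (capped by the exponent)
console.log(rescheduleDelayMs(20, 100, 1)); // 200 (plateaus almost immediately)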
172 changes: 168 additions & 4 deletions rivetkit-typescript/packages/next-js/src/mod.ts
@@ -1,4 +1,7 @@
import { existsSync, statSync } from "node:fs";
import { join } from "node:path";
import type { Registry, RunConfigInput } from "rivetkit";
import { stringifyError } from "rivetkit/utils";
import { logger } from "./log";

export const toNextHandler = (
@@ -11,8 +14,8 @@ export const toNextHandler = (
// Configure serverless
inputConfig.runnerKind = "serverless";

// Auto-configure serverless runner if not in prod
if (process.env.NODE_ENV !== "production") {
// Auto-configure serverless runner if not in prod
logger().debug(
"detected development environment, auto-starting engine and auto-configuring serverless",
);
@@ -42,17 +45,24 @@ export const toNextHandler = (

const { fetch } = registry.start(inputConfig);

// Function that Next will call when handling requests
const fetchWrapper = async (
request: Request,
{ params }: { params: Promise<{ all: string[] }> },
) => {
): Promise<Response> => {
const { all } = await params;

const newUrl = new URL(request.url);
newUrl.pathname = all.join("/");
const newReq = new Request(newUrl, request);

return await fetch(newReq);
if (process.env.NODE_ENV !== "development") {
// Handle request
const newReq = new Request(newUrl, request);
return await fetch(newReq);
} else {
// Special request handling for file watching
return await handleRequestWithFileWatcher(request, newUrl, fetch);
}
};

return {
@@ -64,3 +74,157 @@ export const toNextHandler = (
OPTIONS: fetchWrapper,
};
};

/**
* Special request handler that will watch the source file to terminate this
* request once complete.
*
* See docs on watchRouteFile for more information.
*/
async function handleRequestWithFileWatcher(
request: Request,
newUrl: URL,
fetch: (request: Request, ...args: any) => Response | Promise<Response>,
): Promise<Response> {
// Create a new abort controller that we can abort ourselves, since we
// cannot control the signal on the incoming request
const mergedController = new AbortController();
const abortMerged = () => mergedController.abort();
request.signal?.addEventListener("abort", abortMerged);

// Watch for file changes in dev
//
// We spawn one watcher per-request since there is not a clean way of
// cleaning up global watchers when hot reloading in Next
const watchIntervalId = watchRouteFile(mergedController);

// Clear interval if request is aborted
request.signal.addEventListener("abort", () => {
logger().debug("clearing file watcher interval: request aborted");
clearInterval(watchIntervalId);
});

// Replace URL and abort signal
const newReq = new Request(newUrl, {
// Copy old request properties
method: request.method,
headers: request.headers,
body: request.body,
credentials: request.credentials,
cache: request.cache,
redirect: request.redirect,
referrer: request.referrer,
integrity: request.integrity,
// Override with new signal
signal: mergedController.signal,
});

// Handle request
const response = await fetch(newReq);

// HACK: Next.js does not provide a way to detect when a request
// finishes, so we need to tap the response stream
//
// We can't just wait for `await fetch` to finish since SSE streams run
// for longer
if (response.body) {
const wrappedStream = waitForStreamFinish(response.body, () => {
logger().debug("clearing file watcher interval: stream finished");
clearInterval(watchIntervalId);
});
return new Response(wrappedStream, {
status: response.status,
statusText: response.statusText,
headers: response.headers,
});
} else {
// No response body, clear interval immediately
logger().debug("clearing file watcher interval: no response body");
clearInterval(watchIntervalId);
return response;
}
}

/**
* HACK: Watch for file changes on this route in order to shut down the runner.
* We do this because Next.js does not terminate long-running requests on file
* change, so we need to manually shut down the runner in order to trigger a
* new `/start` request with the new code.
*
* We don't use file watchers since those are frequently buggy cross-platform
* and subject to misconfigured inotify limits.
*/
function watchRouteFile(abortController: AbortController): NodeJS.Timeout {
logger().debug("starting file watcher");

const routePath = join(
process.cwd(),
".next/server/app/api/rivet/[...all]/route.js",
);

let lastMtime: number | null = null;
const checkFile = () => {
logger().debug({ msg: "checking for file changes", routePath });
try {
if (!existsSync(routePath)) {
return;
}

const stats = statSync(routePath);
const mtime = stats.mtimeMs;

if (lastMtime !== null && mtime !== lastMtime) {
logger().info({ msg: "route file changed", routePath });
abortController.abort();
}

lastMtime = mtime;
} catch (err) {
logger().info({
msg: "failed to check for route file change",
err: stringifyError(err),
});
}
};

checkFile();

return setInterval(checkFile, 1000);
}

/**
* Waits for a stream to finish and calls onFinish on complete.
*
* Used for cancelling the file watcher.
*/
function waitForStreamFinish(
body: ReadableStream<Uint8Array>,
onFinish: () => void,
): ReadableStream {
const reader = body.getReader();
return new ReadableStream({
async start(controller) {
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
logger().debug("stream completed");
onFinish();
controller.close();
break;
}
controller.enqueue(value);
}
} catch (err) {
logger().debug("stream errored");
onFinish();
controller.error(err);
}
},
cancel() {
logger().debug("stream cancelled");
onFinish();
reader.cancel();
},
});
}
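For context, this is roughly the consuming side that watchRouteFile polls for: the catch-all route at app/api/rivet/[...all]/route.ts, which Next.js compiles into the .next/server/app/api/rivet/[...all]/route.js file checked above. A minimal sketch, not part of this diff: the import path and the registry module are assumptions, and only the OPTIONS export is visible in the hunk above, so the other method exports are assumed as well.

// app/api/rivet/[...all]/route.ts (hypothetical consumer, not part of this diff)
import { toNextHandler } from "rivetkit/next-js"; // assumed import path
import { registry } from "@/registry"; // assumed local module exporting the rivetkit Registry

// toNextHandler returns one handler per HTTP method, so the catch-all route
// forwards every request (including long-lived SSE streams) to rivetkit.
export const { GET, POST, OPTIONS } = toNextHandler(registry);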
@@ -646,7 +646,11 @@ export class EngineActorDriver implements ActorDriver {
stream.onAbort(() => {});
c.req.raw.signal.addEventListener("abort", () => {
logger().debug("SSE aborted, shutting down runner");
this.shutdownRunner(true);

// We cannot assume that the request will always be closed gracefully by
// Rivet, so we always proceed with a graceful shutdown in case the request
// was terminated for any other reason.
//
// If we did not use a graceful shutdown, the runner would be stopped
// immediately instead of getting a chance to clean up.
this.shutdownRunner(false);
});

await this.#runnerStarted.promise;
@@ -97,6 +97,8 @@ export async function ensureEngineProcess(
// order to account for this.
RIVET__PEGBOARD__RETRY_RESET_DURATION: "100",
RIVET__PEGBOARD__BASE_RETRY_TIMEOUT: "100",
// Set the max exponent to 1 so the backoff is capped at base_retry_timeout
RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT: "1",
},
});
