From 413aead536c0818a1929bcce7272baf19d983d51 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 8 May 2026 00:43:36 -0700 Subject: [PATCH 01/14] convex-test update --- package-lock.json | 8 ++++---- package.json | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2edd3c2..960291c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,7 +21,7 @@ "chokidar-cli": "3.0.0", "convex": "1.35.1", "convex-helpers": "0.1.111", - "convex-test": "0.0.46", + "convex-test": "0.0.51", "eslint": "10.0.2", "eslint-plugin-react-hooks": "7.1.0-canary-3f0b9e61-20260317", "eslint-plugin-react-refresh": "0.5.0", @@ -2985,9 +2985,9 @@ } }, "node_modules/convex-test": { - "version": "0.0.46", - "resolved": "https://registry.npmjs.org/convex-test/-/convex-test-0.0.46.tgz", - "integrity": "sha512-Me4BqUUyJEKI2COvwCBKqnByNT0AZq6eKs9aTjpPX7mz3PSdmvRRP+Kt8VrWT4kCJ168X+hAEGKHPeH98ncSmg==", + "version": "0.0.51", + "resolved": "https://registry.npmjs.org/convex-test/-/convex-test-0.0.51.tgz", + "integrity": "sha512-J+4YRpKGXJDfnQqiWUUT+ylNmNO36MpkuwqG3JG4ld+7QtroZGF8HqO4qzMmfv5ltm71rPbkBvi//MoMHjnVvQ==", "dev": true, "license": "Apache-2.0", "peerDependencies": { diff --git a/package.json b/package.json index b1580ed..0a772a6 100644 --- a/package.json +++ b/package.json @@ -79,7 +79,7 @@ "chokidar-cli": "3.0.0", "convex": "1.35.1", "convex-helpers": "0.1.111", - "convex-test": "0.0.46", + "convex-test": "0.0.51", "eslint": "10.0.2", "eslint-plugin-react-hooks": "7.1.0-canary-3f0b9e61-20260317", "eslint-plugin-react-refresh": "0.5.0", From a3e8a731d398ed0664ab093d0315a66c70557478 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 8 May 2026 01:01:00 -0700 Subject: [PATCH 02/14] runSnapshotQuery --- src/component/future.ts | 38 ++++ src/component/kick.ts | 1 - src/component/loop.ts | 439 +++++++++++++++++----------------------- src/component/schema.ts | 3 +- 4 files changed, 227 insertions(+), 254 deletions(-) create mode 100644 src/component/future.ts diff --git a/src/component/future.ts b/src/component/future.ts new file mode 100644 index 0000000..4312c88 --- /dev/null +++ b/src/component/future.ts @@ -0,0 +1,38 @@ +import { + type FunctionReference, + type FunctionReturnType, + type OptionalRestArgs, + getFunctionAddress, +} from "convex/server"; +import { convexToJson, jsonToConvex } from "convex/values"; + +declare const Convex: { + asyncSyscall: (op: string, jsonArgs: string) => Promise; +}; + +/** + * Run a query without creating a read dependency. Concurrent writes to the + * data the query reads will NOT cause the calling mutation to retry via OCC. + * + * Tradeoff: a concurrent transaction that hasn't yet committed at snapshot + * time may insert data this query won't see. If missing such inserts could + * break correctness, use ctx.runQuery (which takes a dependency) instead. + */ +export async function runSnapshotQuery< + Query extends FunctionReference<"query", "public" | "internal">, +>( + query: Query, + ...args: OptionalRestArgs +): Promise> { + const queryArgs = (args[0] ?? {}) as Record; + const syscallArgs = { + udfType: "snapshotQuery", + args: convexToJson(queryArgs as never), + ...getFunctionAddress(query), + }; + const resultStr = await Convex.asyncSyscall( + "1.0/runUdf", + JSON.stringify(syscallArgs), + ); + return jsonToConvex(JSON.parse(resultStr)) as FunctionReturnType; +} diff --git a/src/component/kick.ts b/src/component/kick.ts index cd57bc4..1520b55 100644 --- a/src/component/kick.ts +++ b/src/component/kick.ts @@ -78,7 +78,6 @@ export async function kickMainLoop( const scheduledTime = boundScheduledTime(fromSegment(current), console); await ctx.scheduler.runAt(scheduledTime, internal.loop.main, { generation: runStatus.state.generation, - segment: current, }); return current; } diff --git a/src/component/loop.ts b/src/component/loop.ts index 0124b16..07c3a28 100644 --- a/src/component/loop.ts +++ b/src/component/loop.ts @@ -1,8 +1,13 @@ import type { WithoutSystemFields } from "convex/server"; import { v } from "convex/values"; +import { runSnapshotQuery } from "./future.js"; import { internal } from "./_generated/api.js"; import type { Doc, Id } from "./_generated/dataModel.js"; -import { internalMutation, type MutationCtx } from "./_generated/server.js"; +import { + internalMutation, + internalQuery, + type MutationCtx, +} from "./_generated/server.js"; import type { CompleteJob } from "./complete.js"; import { createLogger, @@ -16,8 +21,8 @@ import { DEFAULT_MAX_PARALLELISM, fromSegment, getCurrentSegment, - getNextSegment, max, + min, type RunResult, toSegment, } from "./shared.js"; @@ -32,7 +37,11 @@ const RECOVERY_THRESHOLD_MS = 5 * MINUTE; // attempt to recover jobs this old. export const RECOVERY_PERIOD_SEGMENTS = toSegment(1 * MINUTE); // how often to check. export const STATUS_COOLDOWN = 2 * SECOND; export const COOLDOWN_CHECK_INTERVAL = 200 * MS; -const CURSOR_BUFFER_SEGMENTS = toSegment(30 * SECOND); // buffer for cursor updates. +// Buffer applied when querying with cursors. Transactions that started +// before ours may still be running and commit inserts at segments behind +// a previously advanced cursor — the buffer lets us pick those up. +const CURSOR_BUFFER_SEGMENTS = toSegment(30 * SECOND); + export const INITIAL_STATE: WithoutSystemFields> = { generation: 0n, segmentCursors: { incoming: 0n, completion: 0n, cancelation: 0n }, @@ -48,10 +57,63 @@ export const INITIAL_STATE: WithoutSystemFields> = { running: [], }; +/** + * Single query that returns everything the main loop needs to process. + */ +export const getPendingWork = internalQuery({ + args: { + completionCursor: v.int64(), + cancelationCursor: v.int64(), + incomingCursor: v.int64(), + startLimit: v.number(), + }, + handler: async ( + ctx, + { + completionCursor, + cancelationCursor, + incomingCursor, + startLimit, + }, + ) => { + const completions = await ctx.db + .query("pendingCompletion") + .withIndex("segment", (q) => q.gte("segment", completionCursor)) + .take(startLimit); + const cancelations = await ctx.db + .query("pendingCancelation") + .withIndex("segment", (q) => q.gte("segment", cancelationCursor)) + .take(CANCELLATION_BATCH_SIZE); + // Read +1 so the caller can detect overflow vs. a future-scheduled item. + // Server-side filter excludes any pendingStart whose work is being + // canceled this iteration: handleCancelation will delete those docs, + // so handing them to handleStart would race ("Delete on non-existent + // doc"). The filter is the right tool here — it keeps .take honest + // when we know a small number of specific items to exclude. + const allStarts = + startLimit === 0 + ? [] + : await ctx.db + .query("pendingStart") + .withIndex("segment", (q) => q.gte("segment", incomingCursor)) + .filter((q) => + q.and( + ...cancelations.map((c) => + q.neq(q.field("workId"), c.workId), + ), + ), + ) + .take(startLimit + 1); + return { completions, cancelations, allStarts }; + }, +}); + // There should only ever be at most one of these scheduled or running. export const main = internalMutation({ - args: { generation: v.int64(), segment: v.int64() }, - handler: async (ctx, { generation, segment }) => { + // `segment` is kept for backwards compatibility with in-flight scheduled + // calls from before the upgrade — it's no longer used internally. + args: { generation: v.int64(), segment: v.optional(v.int64()) }, + handler: async (ctx, { generation }) => { // State will be modified and patched at the end of the function. const state = await getOrCreateState(ctx); if (generation !== state.generation) { @@ -69,17 +131,37 @@ export const main = internalMutation({ const globals = await getGlobals(ctx); const console = createLogger(globals.logLevel); - const delayMs = Date.now() - fromSegment(segment); - console.debug(`[main] generation ${generation} behind: ${delayMs}ms`); + const segment = getCurrentSegment(); + + // Pass startLimit = maxParallelism so we fetch enough starts to fill + // slots freed by completions processed in this same iteration. + // Apply CURSOR_BUFFER_SEGMENTS so we still pick up out-of-order inserts + // that landed behind the cursor since our last scan. + const queryArgs = { + completionCursor: + state.segmentCursors.completion - CURSOR_BUFFER_SEGMENTS, + cancelationCursor: + state.segmentCursors.cancelation - CURSOR_BUFFER_SEGMENTS, + incomingCursor: + state.segmentCursors.incoming - CURSOR_BUFFER_SEGMENTS, + startLimit: globals.maxParallelism, + }; + + // Snapshot read — no read dependency, no OCC conflicts. + console.time("[main] getPendingWork"); + const { allStarts, cancelations, completions } = await runSnapshotQuery( + internal.loop.getPendingWork, + queryArgs, + ); + const toStart = allStarts.filter((s) => s.segment <= segment); + console.timeEnd("[main] getPendingWork"); - // Read pendingCompletions, including retry handling. console.time("[main] pendingCompletion"); - const toCancel = await handleCompletions(ctx, state, segment, console); + const toCancel = await handleCompletions(ctx, state, completions, console); console.timeEnd("[main] pendingCompletion"); - // Read pendingCancelation, deleting from pendingStart. If it's still running, queue to cancel. console.time("[main] pendingCancelation"); - await handleCancelation(ctx, state, segment, console, toCancel); + await handleCancelation(ctx, state, cancelations, console, toCancel); console.timeEnd("[main] pendingCancelation"); if (state.running.length === 0) { @@ -91,9 +173,14 @@ export const main = internalMutation({ state.lastRecovery = segment; } - // Read pendingStart up to max capacity. Update the config, and incomingSegmentCursor. + // ── Start new work ── + // Slice to actual available capacity (completions may have freed slots). + // Guard against negative numbers in case running.length > maxParallelism. + const actualCapacity = globals.maxParallelism - state.running.length; + const pending: Doc<"pendingStart">[] = + actualCapacity > 0 ? toStart.slice(0, actualCapacity) : []; console.time("[main] pendingStart"); - await handleStart(ctx, state, segment, console, globals); + await handleStart(ctx, state, pending, console, globals); console.timeEnd("[main] pendingStart"); if (Date.now() - state.report.lastReportTs >= MINUTE) { @@ -115,88 +202,53 @@ export const main = internalMutation({ }; } - await ctx.db.replace("internalState", state._id, state); - await ctx.scheduler.runAfter(0, internal.loop.updateRunStatus, { - generation: state.generation, - segment, - }); - // TODO: if there were more cancellations, schedule main directly. - }, -}); - -export const updateRunStatus = internalMutation({ - args: { generation: v.int64(), segment: v.int64() }, - handler: async (ctx, { generation, segment }) => { - const globals = await getGlobals(ctx); - const console = createLogger(globals.logLevel); - const maxParallelism = globals.maxParallelism; - const state = await getOrCreateState(ctx); - if (generation !== state.generation) { - throw new Error( - `generation mismatch: ${generation} !== ${state.generation}`, - ); + // Advance cursors to skip tombstones on next scan. Only do this when + // we actually did work — the cursor doubles as the cooldown signal + // ("how long since we last processed something"). + const didWork = + completions.length > 0 || cancelations.length > 0 || pending.length > 0; + if (didWork) { + state.segmentCursors.completion = completions.at(-1)?.segment ?? segment; + state.segmentCursors.cancelation = + cancelations.at(-1)?.segment ?? segment; + if (pending.length > 0) { + state.segmentCursors.incoming = pending.at(-1)!.segment; + } else if (actualCapacity > 0) { + // We have no more pending work, update to now + state.segmentCursors.incoming = segment; + } } - console.time("[updateRunStatus] outstandingCancelations"); - const outstandingCancelations = await getNextUp(ctx, "pendingCancelation", { - start: state.segmentCursors.cancelation, - end: segment, - }); - console.timeEnd("[updateRunStatus] outstandingCancelations"); - if (outstandingCancelations) { + await ctx.db.replace("internalState", state._id, state); + + // ── Schedule next iteration ── + if (didWork) { + // More work might have arrived while we were processing. Check again. await ctx.scheduler.runAfter(0, internal.loop.main, { - generation, - segment, + generation: state.generation, }); return; } - // TODO: check for current segment (or from args) first, to avoid OCCs. - console.time("[updateRunStatus] nextSegmentIsActionable"); - const nextSegment = max(segment + 1n, getCurrentSegment()); - const nextIsActionable = await nextSegmentIsActionable( - ctx, - state, - maxParallelism, - nextSegment, - ); - console.timeEnd("[updateRunStatus] nextSegmentIsActionable"); - - if (nextIsActionable) { - await ctx.scheduler.runAt( - boundScheduledTime(fromSegment(nextSegment), console), - internal.loop.main, - { - generation, - segment: nextSegment, - }, - ); - return; - } - - console.time("[updateRunStatus] oldSegmentIsActionable"); - const [oldIsActionable, cursors] = await oldSegmentIsActionable( - ctx, - state, - maxParallelism, - ); - console.timeEnd("[updateRunStatus] oldSegmentIsActionable"); - - if (oldIsActionable) { - await ctx.db.patch("internalState", state._id, { - segmentCursors: { - ...state.segmentCursors, - ...cursors, - }, - }); + // Nothing found in snapshot. Re-read with a real dependency (same args + // for cache-hit efficiency) so a concurrent insert forces an OCC retry. + console.debug("[main] no work — confirming with read dependency"); + const confirm = await ctx.runQuery(internal.loop.getPendingWork, queryArgs); + const confirmStarts = confirm.allStarts; + const confirmStartsNow = confirmStarts.filter((s) => s.segment <= segment); + const confirmFuture = confirmStarts.find((s) => s.segment > segment); + if ( + confirm.completions.length > 0 || + confirm.cancelations.length > 0 || + confirmStartsNow.length > 0 + ) { await ctx.scheduler.runAfter(0, internal.loop.main, { - generation, - segment: getCurrentSegment(), + generation: state.generation, }); return; } - // Cooldown: if any cursor was active within 5 seconds, stay running. + // Cooldown: if any cursor was active within STATUS_COOLDOWN, stay running. const { incoming, completion, cancelation } = state.segmentCursors; const latestCursor = fromSegment( max(incoming, max(completion, cancelation)), @@ -204,49 +256,30 @@ export const updateRunStatus = internalMutation({ if (Date.now() - latestCursor < STATUS_COOLDOWN) { const remaining = STATUS_COOLDOWN - (Date.now() - latestCursor); console.debug( - `[updateRunStatus] cooldown: ${remaining}ms remaining, checking again in ${COOLDOWN_CHECK_INTERVAL}ms`, + `[main] cooldown: ${remaining}ms remaining, checking again in ${COOLDOWN_CHECK_INTERVAL}ms`, ); - const checkAt = Date.now() + COOLDOWN_CHECK_INTERVAL; - const checkSegment = toSegment(checkAt); await ctx.scheduler.runAt( - boundScheduledTime(checkAt, console), - internal.loop.updateRunStatus, - { generation, segment: checkSegment }, + Date.now() + COOLDOWN_CHECK_INTERVAL, + internal.loop.main, + { generation: state.generation }, ); return; } - // Find next actionable segment (min next segment). - console.time("[updateRunStatus] findNextSegment"); - const actionableTables: ( - | "pendingCompletion" - | "pendingCancelation" - | "pendingStart" - )[] = ["pendingCompletion", "pendingCancelation"]; - if (state.running.length < maxParallelism) { - actionableTables.push("pendingStart"); - } - const docs = await Promise.all( - actionableTables.map(async (tableName) => - getNextUp(ctx, tableName, { start: nextSegment }), - ), - ); - console.timeEnd("[updateRunStatus] findNextSegment"); - let targetSegment = docs.map((d) => d?.segment).sort()[0]; - const runStatus = await getOrCreateRunningStatus(ctx); - const saturated = state.running.length >= maxParallelism; - if (targetSegment !== undefined || state.running.length > 0) { - // If there's something to do, schedule for next actionable segment. - // Or the next recovery, whichever comes first. + if (state.running.length > 0 || confirmFuture) { + // Jobs are running and/or there's future-scheduled work. + // Schedule for the future start or next recovery, whichever is sooner. const nextRecoverySegment = state.lastRecovery + RECOVERY_PERIOD_SEGMENTS; - if (!targetSegment || targetSegment > nextRecoverySegment) { - targetSegment = nextRecoverySegment; - } + const target = confirmFuture + ? min(confirmFuture.segment, nextRecoverySegment) + : nextRecoverySegment; + const scheduledId = await ctx.scheduler.runAt( - boundScheduledTime(fromSegment(targetSegment), console), + boundScheduledTime(fromSegment(target), console), internal.loop.main, - { generation, segment: targetSegment }, + { generation: state.generation }, ); +<<<<<<< HEAD if (targetSegment > getNextSegment()) { await ctx.db.patch("runStatus", runStatus._id, { state: { @@ -267,119 +300,49 @@ export const updateRunStatus = internalMutation({ // There seems to be nothing in the future to do, so go idle. await ctx.db.patch("runStatus", runStatus._id, { state: { kind: "idle", generation }, - }); - }, -}); - -async function nextSegmentIsActionable( - ctx: MutationCtx, - state: Doc<"internalState">, - maxParallelism: number, - end: bigint, -): Promise { - // First, try with our cursor range, up to end. - if ( - await getNextUp(ctx, "pendingCancelation", { - start: state.segmentCursors.cancelation, - end, - }) - ) { - return true; - } - if ( - await getNextUp(ctx, "pendingCompletion", { - start: state.segmentCursors.completion, - end, - }) - ) { - return true; - } - if (state.running.length < maxParallelism) { - if ( - await getNextUp(ctx, "pendingStart", { - start: state.segmentCursors.incoming, - end, - }) - ) { - return true; +======= + await ctx.db.patch(runStatus._id, { + state: { + kind: "scheduled", + scheduledId, + saturated: state.running.length >= globals.maxParallelism, + generation: state.generation, + segment: target, + }, + }); + return; } - } - return false; -} -async function oldSegmentIsActionable( - ctx: MutationCtx, - state: Doc<"internalState">, - maxParallelism: number, -): Promise< - [boolean, { completion?: bigint; cancelation?: bigint; incoming?: bigint }] -> { - // Next, we look for out-of-order additions we may have missed. - const oldCompletion = await getNextUp(ctx, "pendingCompletion", { - end: state.segmentCursors.completion, - }); - if (oldCompletion) { - return [true, { completion: oldCompletion.segment }]; - } - const oldCancelation = await getNextUp(ctx, "pendingCancelation", { - end: state.segmentCursors.cancelation, - }); - if (oldCancelation) { - return [true, { cancelation: oldCancelation.segment }]; - } - if (state.running.length < maxParallelism) { - const oldStart = await getNextUp(ctx, "pendingStart", { - end: state.segmentCursors.incoming, + // Nothing to do — go idle. + await ctx.db.patch(runStatus._id, { + state: { kind: "idle", generation: state.generation }, +>>>>>>> 6934e57 (runSnapshotQuery) }); - if (oldStart) { - return [true, { incoming: oldStart.segment }]; - } - } - return [false, {}]; -} + }, +}); -// Fetch the next item. If only one of start & end are provided, it's exclusive. -async function getNextUp( - ctx: MutationCtx, - table: "pendingCompletion" | "pendingCancelation" | "pendingStart", - range: { start?: bigint; end?: bigint }, -) { - return ctx.db - .query(table) - .withIndex("segment", (q) => - range.start !== undefined - ? range.end !== undefined - ? q - .gte("segment", range.start - CURSOR_BUFFER_SEGMENTS) - .lte("segment", range.end) - : q.gt("segment", range.start - CURSOR_BUFFER_SEGMENTS) - : range.end !== undefined - ? q.lt("segment", range.end) - : q, - ) - .first(); -} +/** + * @deprecated Forwarder for in-flight scheduled calls from before the + * upgrade. The scheduling logic has been merged into `main`. + */ +export const updateRunStatus = internalMutation({ + args: { generation: v.int64(), segment: v.int64() }, + handler: async (ctx, { generation }) => { + await ctx.scheduler.runAfter(0, internal.loop.main, { generation }); + }, +}); /** * Handles the completion of pending completions. * This only processes work that succeeded or failed, not canceled. + * Accepts pre-fetched completion docs (from snapshot query). */ async function handleCompletions( ctx: MutationCtx, state: Doc<"internalState">, - segment: bigint, + completed: Doc<"pendingCompletion">[], console: Logger, ) { - const startSegment = state.segmentCursors.completion - CURSOR_BUFFER_SEGMENTS; - // This won't be too many because the jobs all correspond to being scheduled - // by a single main (the previous one), so they're limited by MAX_PARALLELISM. - const completed = await ctx.db - .query("pendingCompletion") - .withIndex("segment", (q) => - q.gte("segment", startSegment).lte("segment", segment), - ) - .collect(); - state.segmentCursors.completion = segment; // Completions that were going to be retried but have since been canceled. const toCancel: CompleteJob[] = []; await Promise.all( @@ -433,21 +396,16 @@ async function handleCompletions( return toCancel; } +/** + * Handles cancelation. Accepts pre-fetched cancelation docs. + */ async function handleCancelation( ctx: MutationCtx, state: Doc<"internalState">, - segment: bigint, + canceled: Doc<"pendingCancelation">[], console: Logger, toCancel: CompleteJob[], ) { - const start = state.segmentCursors.cancelation - CURSOR_BUFFER_SEGMENTS; - const canceled = await ctx.db - .query("pendingCancelation") - .withIndex("segment", (q) => - q.gte("segment", start).lte("segment", segment), - ) - .take(CANCELLATION_BATCH_SIZE); - state.segmentCursors.cancelation = canceled.at(-1)?.segment ?? segment; if (canceled.length) { console.debug(`[main] attempting to cancel ${canceled.length}`); } @@ -533,39 +491,16 @@ async function handleRecovery( } } +/** + * Starts pending work. Accepts pre-fetched pendingStart docs. + */ async function handleStart( ctx: MutationCtx, state: Doc<"internalState">, - segment: bigint, + pending: Doc<"pendingStart">[], console: Logger, - { maxParallelism, logLevel }: Config, + { logLevel }: Config, ) { - // Schedule as many as needed to reach maxParallelism. - const toSchedule = maxParallelism - state.running.length; - - const pending = - toSchedule > 0 - ? await ctx.db - .query("pendingStart") - .withIndex("segment", (q) => - q - .gte( - "segment", - state.segmentCursors.incoming - CURSOR_BUFFER_SEGMENTS, - ) - .lte("segment", segment), - ) - .take(toSchedule) - : []; - - if (pending) { - if (pending.length > 0) { - state.segmentCursors.incoming = pending.at(-1)!.segment; - } else if (toSchedule > 0) { - // We have no more pending work, update to now - state.segmentCursors.incoming = segment; - } - } console.debug(`[main] scheduling ${pending.length} pending work`); // Start new work. state.running.push( diff --git a/src/component/schema.ts b/src/component/schema.ts index 63e2e93..c88d3d7 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -18,6 +18,7 @@ export default defineSchema({ internalState: defineTable({ // Ensure that only one main is running at a time. generation: v.int64(), + // Track where we've scanned to, so we skip tombstones on re-scan. segmentCursors: v.object({ incoming: segment, completion: segment, @@ -42,7 +43,7 @@ export default defineSchema({ ), }), - // Singleton, written by `updateRunStatus` when running, by client or worker otherwise. + // Singleton, written by `main` when scheduling, by client or worker otherwise. // Safe to read from kickLoop, since it should update infrequently. runStatus: defineTable({ state: v.union( From 42399cf72563333ef29463dc88029d42ce7a189f Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 8 May 2026 01:06:27 -0700 Subject: [PATCH 03/14] push limit down --- src/component/loop.ts | 44 ++++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/src/component/loop.ts b/src/component/loop.ts index 07c3a28..1379776 100644 --- a/src/component/loop.ts +++ b/src/component/loop.ts @@ -65,7 +65,8 @@ export const getPendingWork = internalQuery({ completionCursor: v.int64(), cancelationCursor: v.int64(), incomingCursor: v.int64(), - startLimit: v.number(), + maxParallelism: v.number(), + runningCount: v.number(), }, handler: async ( ctx, @@ -73,23 +74,28 @@ export const getPendingWork = internalQuery({ completionCursor, cancelationCursor, incomingCursor, - startLimit, + maxParallelism, + runningCount, }, ) => { const completions = await ctx.db .query("pendingCompletion") .withIndex("segment", (q) => q.gte("segment", completionCursor)) - .take(startLimit); + .take(maxParallelism); const cancelations = await ctx.db .query("pendingCancelation") .withIndex("segment", (q) => q.gte("segment", cancelationCursor)) .take(CANCELLATION_BATCH_SIZE); - // Read +1 so the caller can detect overflow vs. a future-scheduled item. - // Server-side filter excludes any pendingStart whose work is being - // canceled this iteration: handleCancelation will delete those docs, - // so handing them to handleStart would race ("Delete on non-existent - // doc"). The filter is the right tool here — it keeps .take honest - // when we know a small number of specific items to exclude. + // Available slots after we process this batch's completions, plus 1 + // for the +1 trick (detect overflow vs. a future-scheduled retry). + const startLimit = Math.max( + 0, + maxParallelism - runningCount + completions.length, + ); + const excludedIds = [ + ...completions.map((c) => c.workId), + ...cancelations.map((c) => c.workId), + ]; const allStarts = startLimit === 0 ? [] @@ -97,11 +103,7 @@ export const getPendingWork = internalQuery({ .query("pendingStart") .withIndex("segment", (q) => q.gte("segment", incomingCursor)) .filter((q) => - q.and( - ...cancelations.map((c) => - q.neq(q.field("workId"), c.workId), - ), - ), + q.and(...excludedIds.map((id) => q.neq(q.field("workId"), id))), ) .take(startLimit + 1); return { completions, cancelations, allStarts }; @@ -133,18 +135,18 @@ export const main = internalMutation({ const console = createLogger(globals.logLevel); const segment = getCurrentSegment(); - // Pass startLimit = maxParallelism so we fetch enough starts to fill - // slots freed by completions processed in this same iteration. - // Apply CURSOR_BUFFER_SEGMENTS so we still pick up out-of-order inserts - // that landed behind the cursor since our last scan. + // Pass maxParallelism + runningCount so the query bounds each batch to + // what we can actually consume this iteration. Apply CURSOR_BUFFER_SEGMENTS + // so we still pick up out-of-order inserts that landed behind the cursor + // since our last scan. const queryArgs = { completionCursor: state.segmentCursors.completion - CURSOR_BUFFER_SEGMENTS, cancelationCursor: state.segmentCursors.cancelation - CURSOR_BUFFER_SEGMENTS, - incomingCursor: - state.segmentCursors.incoming - CURSOR_BUFFER_SEGMENTS, - startLimit: globals.maxParallelism, + incomingCursor: state.segmentCursors.incoming - CURSOR_BUFFER_SEGMENTS, + maxParallelism: globals.maxParallelism, + runningCount: state.running.length, }; // Snapshot read — no read dependency, no OCC conflicts. From 964d0dc8501e339f5e1a9e74dacbb3d829a6760f Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 8 May 2026 01:37:23 -0700 Subject: [PATCH 04/14] codegen --- src/component/_generated/api.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/component/_generated/api.ts b/src/component/_generated/api.ts index 9587859..6ec0772 100644 --- a/src/component/_generated/api.ts +++ b/src/component/_generated/api.ts @@ -12,6 +12,7 @@ import type * as complete from "../complete.js"; import type * as config from "../config.js"; import type * as crons from "../crons.js"; import type * as danger from "../danger.js"; +import type * as future from "../future.js"; import type * as kick from "../kick.js"; import type * as lib from "../lib.js"; import type * as logging from "../logging.js"; @@ -33,6 +34,7 @@ const fullApi: ApiFromModules<{ config: typeof config; crons: typeof crons; danger: typeof danger; + future: typeof future; kick: typeof kick; lib: typeof lib; logging: typeof logging; From e1ad11adfecdd6fb45f2414c7e652ed7850420d6 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 8 May 2026 01:55:44 -0700 Subject: [PATCH 05/14] f --- src/component/loop.ts | 28 +++------------------------- src/component/stateMachine.test.ts | 29 +++++++++++++++++++---------- 2 files changed, 22 insertions(+), 35 deletions(-) diff --git a/src/component/loop.ts b/src/component/loop.ts index 1379776..de4841c 100644 --- a/src/component/loop.ts +++ b/src/component/loop.ts @@ -102,6 +102,7 @@ export const getPendingWork = internalQuery({ : await ctx.db .query("pendingStart") .withIndex("segment", (q) => q.gte("segment", incomingCursor)) + // eslint-disable-next-line @convex-dev/no-filter-in-query .filter((q) => q.and(...excludedIds.map((id) => q.neq(q.field("workId"), id))), ) @@ -281,29 +282,7 @@ export const main = internalMutation({ internal.loop.main, { generation: state.generation }, ); -<<<<<<< HEAD - if (targetSegment > getNextSegment()) { - await ctx.db.patch("runStatus", runStatus._id, { - state: { - kind: "scheduled", - scheduledId, - saturated, - generation, - segment: targetSegment, - }, - }); - } else { - console.debug( - `[updateRunStatus] staying running because it's the next segment`, - ); - } - return; - } - // There seems to be nothing in the future to do, so go idle. - await ctx.db.patch("runStatus", runStatus._id, { - state: { kind: "idle", generation }, -======= - await ctx.db.patch(runStatus._id, { + await ctx.db.patch("runStatus", runStatus._id, { state: { kind: "scheduled", scheduledId, @@ -316,9 +295,8 @@ export const main = internalMutation({ } // Nothing to do — go idle. - await ctx.db.patch(runStatus._id, { + await ctx.db.patch("runStatus", runStatus._id, { state: { kind: "idle", generation: state.generation }, ->>>>>>> 6934e57 (runSnapshotQuery) }); }, }); diff --git a/src/component/stateMachine.test.ts b/src/component/stateMachine.test.ts index dc19dfd..d01dba6 100644 --- a/src/component/stateMachine.test.ts +++ b/src/component/stateMachine.test.ts @@ -17,6 +17,7 @@ import { DEFAULT_MAX_PARALLELISM, getCurrentSegment, getNextSegment, + SECOND, } from "./shared.js"; import { RECOVERY_PERIOD_SEGMENTS } from "./loop.js"; @@ -219,7 +220,10 @@ describe("state machine", () => { segment?: bigint; }, ): Promise<{ workId: Id<"work">; segment: bigint }> { - const seg = opts?.segment ?? getNextSegment(); + // Default to the current segment so pendingStart entries are eligible + // to start in the same iteration; main reads pendingStart with + // segment <= getCurrentSegment(). + const seg = opts?.segment ?? getCurrentSegment(); const workId = await t.run>(async (ctx) => { let wId: Id<"work">; if (state.work) { @@ -801,7 +805,7 @@ describe("state machine", () => { describe("multi-step / interleaved transitions", () => { it("completion + new enqueue -> main processes both in one pass", async () => { - const seg = getNextSegment(); + const seg = getCurrentSegment(); const { workId: w1 } = await setupState(S1_ENQUEUED, { segment: seg }); await runMain(seg); // w1 is now running @@ -831,7 +835,7 @@ describe("state machine", () => { }); it("two jobs complete(retry) before main -> main retries both", async () => { - const seg = getNextSegment(); + const seg = getCurrentSegment(); const ids = await t.run(async (ctx) => { const w1 = await ctx.db.insert("work", { fnType: "action", @@ -904,16 +908,21 @@ describe("state machine", () => { expect(s1Before.pendingCompletion.retry).toBe(true); expect(s2Before.pendingCompletion.retry).toBe(true); - // Now main processes both completions + // First main pass: process pendingCompletions, queue retry + // pendingStart entries (segment = now + jittered backoff, so they + // can land in the next segment and be ineligible this iteration). await runMain(seg); + // Advance past the retry backoff window so the retry pendingStart + // segments are <= getCurrentSegment(). + vi.setSystemTime(Date.now() + SECOND); + await runMain(getCurrentSegment()); + const s1After = await observeState(ids.w1); const s2After = await observeState(ids.w2); - // Both pendingCompletions should be consumed + // Both pendingCompletions consumed and retries are running. expect(s1After.pendingCompletion).toBe(false); expect(s2After.pendingCompletion).toBe(false); - // With small backoff (100ms), retry pendingStarts are immediately picked up - // by handleStart in the same main pass, so both jobs are running again expect(s1After.running).toBe(true); expect(s2After.running).toBe(true); expect(s1After.pendingStart).toBe(false); @@ -1057,7 +1066,7 @@ describe("state machine", () => { }); it("main crash recovery: state recoverable after restart", async () => { - const seg = getNextSegment(); + const seg = getCurrentSegment(); const { workId } = await setupState(S1_ENQUEUED, { segment: seg }); await runMain(seg); @@ -1066,7 +1075,7 @@ describe("state machine", () => { await runComplete(workId, { kind: "success" }, 0); // New main picks up completion - const seg2 = getNextSegment(); + const seg2 = getCurrentSegment(); await runMain(seg2); const s = await observeState(workId); expect(s.work).toBe(false); @@ -1074,7 +1083,7 @@ describe("state machine", () => { }); it("all three pending queues populated -> main processes all in order", async () => { - const seg = getNextSegment(); + const seg = getCurrentSegment(); const ids = await t.run(async (ctx) => { // Job 1: in running, has pendingCompletion const w1 = await ctx.db.insert("work", { From 93d9068725d865568b71148c6aff47166a40e122 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 8 May 2026 01:17:41 -0700 Subject: [PATCH 06/14] update tests --- src/component/loop.test.ts | 1822 ++++++++++++++---------------------- 1 file changed, 695 insertions(+), 1127 deletions(-) diff --git a/src/component/loop.test.ts b/src/component/loop.test.ts index 3adc764..a68bfa7 100644 --- a/src/component/loop.test.ts +++ b/src/component/loop.test.ts @@ -11,1276 +11,844 @@ import { } from "vitest"; import { api, internal } from "./_generated/api.js"; import type { Doc, Id } from "./_generated/dataModel.js"; -import type { MutationCtx } from "./_generated/server.js"; -import { DEFAULT_LOG_LEVEL } from "./logging.js"; import schema from "./schema.js"; -import { - DEFAULT_MAX_PARALLELISM, - getCurrentSegment, - getNextSegment, - toSegment, -} from "./shared.js"; +import { DEFAULT_MAX_PARALLELISM, getCurrentSegment } from "./shared.js"; import { STATUS_COOLDOWN } from "./loop.js"; const modules = import.meta.glob("./**/*.ts"); - +const SECOND = 1000; +const MINUTE = 60 * SECOND; + +/** + * Behavior tests for the main loop, designed from first principles around + * what an external observer can see: + * + * - api.lib.status — public-facing state of a single work item + * - runStatus.state — loop lifecycle (running / scheduled / idle) + * - pending* tables — work in flight that the loop will process + * - state.running — slots currently occupied by workers + * + * These tests do NOT assert on implementation specifics like cursor + * positions, segment values, or which scheduler call was made — those + * change when the loop's internals change, and they're not the contract. + * + * Setup conventions: + * - vi.useFakeTimers() so time advances deterministically + * - The loop is driven manually via runMain(); convex-test doesn't + * auto-flush scheduled functions + * - simulateCompletion() pretends a worker finished its job by + * calling internal.complete.complete; this is how production gets + * work into pendingCompletion, so it's the correct seam for testing + */ describe("loop", () => { async function setupTest() { const t = convexTest(schema, modules); + await t.run(async (ctx) => { + await ctx.db.insert("globals", { + logLevel: "WARN", + maxParallelism: DEFAULT_MAX_PARALLELISM, + }); + }); return t; } - let t: Awaited>; - async function setMaxParallelism(maxParallelism: number) { - await t.run(async (ctx) => { - const globals = await ctx.db.query("globals").unique(); - if (!globals) { - await ctx.db.insert("globals", { - logLevel: DEFAULT_LOG_LEVEL, - maxParallelism, - }); - } else { - await ctx.db.patch("globals", globals._id, { - maxParallelism, + beforeEach(async () => { + vi.useFakeTimers(); + t = await setupTest(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + // ── helpers ────────────────────────────────────────────────────────── + + /** Seed an empty running loop: internalState + runStatus=running. */ + async function initialize(opts: { maxParallelism?: number } = {}) { + if (opts.maxParallelism !== undefined) { + await t.run(async (ctx) => { + const g = await ctx.db.query("globals").unique(); + assert(g); + await ctx.db.patch("globals", g._id, { + maxParallelism: opts.maxParallelism!, }); - } + }); + } + await t.run(async (ctx) => { + await ctx.db.insert("internalState", { + generation: 1n, + segmentCursors: { incoming: 0n, completion: 0n, cancelation: 0n }, + lastRecovery: 0n, + report: { + completed: 0, + succeeded: 0, + failed: 0, + retries: 0, + canceled: 0, + lastReportTs: Date.now(), + }, + running: [], + }); + await ctx.db.insert("runStatus", { state: { kind: "running" } }); }); } - async function makeDummyWork( - ctx: MutationCtx, + /** + * Insert a work doc + pendingStart at the given segment (default: now). + * Bypasses the public enqueue API to keep tests focused on the loop. + */ + async function enqueueWork( overrides: Partial>> = {}, - ) { - return ctx.db.insert("work", { - fnType: "action", - fnHandle: "test_handle", - fnName: "test_handle", - fnArgs: {}, - attempts: 0, - ...overrides, + segment = getCurrentSegment(), + ): Promise> { + return t.run(async (ctx) => { + const workId = await ctx.db.insert("work", { + fnType: "action", + fnHandle: "test_handle", + fnName: "test_handle", + fnArgs: {}, + attempts: 0, + ...overrides, + }); + await ctx.db.insert("pendingStart", { workId, segment }); + return workId; }); } - async function makeDummyScheduledFunction( - ctx: MutationCtx, - workId: Id<"work">, - ) { - return ctx.scheduler.runAfter(0, internal.worker.runActionWrapper, { - workId, - fnHandle: "test_handle", - fnArgs: {}, - logLevel: "WARN", - attempt: 0, + /** Drive the main loop one iteration with the current generation. */ + async function runMain() { + const generation = await t.run(async (ctx) => { + const s = await ctx.db.query("internalState").unique(); + return s?.generation ?? 0n; }); + await t.mutation(internal.loop.main, { generation }); } - async function insertInternalState( - ctx: MutationCtx, - overrides: Partial>> = {}, + /** Pretend a worker finished a job by inserting pendingCompletion. */ + async function simulateCompletion( + workId: Id<"work">, + result: + | { kind: "success"; returnValue: unknown } + | { kind: "failed"; error: string } + | { kind: "canceled" }, + attempt = 0, ) { - await ctx.db.insert("internalState", { - generation: 1n, - segmentCursors: { incoming: 0n, completion: 0n, cancelation: 0n }, - lastRecovery: getCurrentSegment(), - report: { - completed: 0, - succeeded: 0, - failed: 0, - retries: 0, - canceled: 0, - lastReportTs: Date.now(), - }, - running: [], - ...overrides, + await t.mutation(internal.complete.complete, { + jobs: [{ workId, runResult: result, attempt }], }); } - beforeEach(async () => { - vi.useFakeTimers(); - t = await setupTest(); - await t.run(async (ctx) => { - await ctx.db.insert("globals", { - logLevel: "WARN", - maxParallelism: DEFAULT_MAX_PARALLELISM, - }); + /** Snapshot of everything an outside observer might check. */ + async function observe() { + return t.run(async (ctx) => { + const state = await ctx.db.query("internalState").unique(); + const runStatus = await ctx.db.query("runStatus").unique(); + const pendingStart = await ctx.db.query("pendingStart").collect(); + const pendingCompletion = await ctx.db + .query("pendingCompletion") + .collect(); + const pendingCancelation = await ctx.db + .query("pendingCancelation") + .collect(); + return { + running: state?.running ?? [], + generation: state?.generation ?? 0n, + runStatus: runStatus?.state, + pendingStart, + pendingCompletion, + pendingCancelation, + }; }); - }); + } - afterEach(() => { - vi.useRealTimers(); - }); + async function statusOf(workId: Id<"work">) { + return t.query(api.lib.status, { id: workId }); + } - describe("data state machine", () => { - it("should follow the pendingStart -> workerRunning -> complete flow", async () => { - // Setup initial state - const workId = await t.run>(async (ctx) => { - // Create internal state - await insertInternalState(ctx); + // ──────────────────────────────────────────────────────────────────── + // Forward progress: work moves through the pipeline + // ──────────────────────────────────────────────────────────────────── - // Create running runStatus - await ctx.db.insert("runStatus", { - state: { kind: "running" }, - }); + describe("forward progress", () => { + it("starts a pending work item when main runs", async () => { + await initialize(); + const workId = await enqueueWork(); - // Create work - const workId = await makeDummyWork(ctx, { attempts: 0 }); + await runMain(); - // Create pendingStart - await ctx.db.insert("pendingStart", { - workId, - segment: 1n, - }); + const o = await observe(); + expect(o.pendingStart).toHaveLength(0); + expect(o.running.map((r) => r.workId)).toEqual([workId]); + expect(await statusOf(workId)).toMatchObject({ state: "running" }); + }); - return workId; - }); + it("removes work from running once a successful completion is processed", async () => { + await initialize(); + const workId = await enqueueWork(); + await runMain(); + + await simulateCompletion( + workId, + { kind: "success", returnValue: null }, + 0, + ); + await runMain(); + + const o = await observe(); + expect(o.running).toHaveLength(0); + expect(o.pendingCompletion).toHaveLength(0); + // Work doc deleted → status reports "finished". + expect(await statusOf(workId)).toMatchObject({ state: "finished" }); + }); - // Run main loop to process pendingStart -> workerRunning - await t.mutation(internal.loop.main, { generation: 1n, segment: 1n }); + it("treats a final failure (no retry policy) as terminal", async () => { + await initialize(); + const workId = await enqueueWork(); + await runMain(); - // Verify work is now in running state - await t.run(async (ctx) => { - // Check that pendingStart was deleted - const pendingStarts = await ctx.db.query("pendingStart").collect(); - expect(pendingStarts).toHaveLength(0); - - // Check that work is in running list - const state = await ctx.db.query("internalState").unique(); - expect(state).toBeDefined(); - assert(state); - expect(state.running).toHaveLength(1); - expect(state.running[0].workId).toBe(workId); - }); + await simulateCompletion(workId, { kind: "failed", error: "boom" }, 0); + await runMain(); - // Complete the work (workerRunning -> complete) - await t.mutation(internal.complete.complete, { - jobs: [ - { - workId, - runResult: { kind: "success", returnValue: null }, - attempt: 0, - }, - ], - }); - - // Verify pendingCompletion was created - await t.run(async (ctx) => { - const pendingCompletions = await ctx.db - .query("pendingCompletion") - .collect(); - expect(pendingCompletions).toHaveLength(1); - expect(pendingCompletions[0].workId).toBe(workId); - expect(pendingCompletions[0].runResult.kind).toBe("success"); - expect(pendingCompletions[0].retry).toBe(false); - }); + const o = await observe(); + expect(o.running).toHaveLength(0); + expect(await statusOf(workId)).toMatchObject({ state: "finished" }); }); - it("should follow the pendingStart + pendingCancelation -> complete flow", async () => { - // Setup initial state - const workId = await t.run>(async (ctx) => { - // Create internal state - await insertInternalState(ctx); - - // Create running runStatus - await ctx.db.insert("runStatus", { - state: { kind: "running" }, - }); + it("processes multiple work items concurrently within capacity", async () => { + await initialize({ maxParallelism: 5 }); + const ids = []; + for (let i = 0; i < 3; i++) ids.push(await enqueueWork()); - // Create work - const workId = await makeDummyWork(ctx, { attempts: 0 }); + await runMain(); - // Create pendingStart - await ctx.db.insert("pendingStart", { - workId, - segment: 1n, - }); + const o = await observe(); + expect(o.running).toHaveLength(3); + expect(new Set(o.running.map((r) => r.workId))).toEqual(new Set(ids)); + }); + }); - // Create pendingCancelation - await ctx.db.insert("pendingCancelation", { - workId, - segment: 1n, - }); + // ──────────────────────────────────────────────────────────────────── + // Capacity: maxParallelism is respected + // ──────────────────────────────────────────────────────────────────── - return workId; - }); + describe("capacity", () => { + it("never starts more than maxParallelism in one iteration", async () => { + await initialize({ maxParallelism: 3 }); + for (let i = 0; i < 7; i++) await enqueueWork(); - // Run main loop to process pendingStart and pendingCancelation - await t.mutation(internal.loop.main, { generation: 1n, segment: 1n }); + await runMain(); - // Verify work was canceled - await t.run(async (ctx) => { - // Check that pendingStart was deleted - const pendingStarts = await ctx.db.query("pendingStart").collect(); - expect(pendingStarts).toHaveLength(0); - - // Check that pendingCancelation was deleted - const pendingCancelations = await ctx.db - .query("pendingCancelation") - .collect(); - expect(pendingCancelations).toHaveLength(0); - - // Check that work is not in running list - const state = await ctx.db.query("internalState").unique(); - expect(state).toBeDefined(); - assert(state); - expect(state.running).toHaveLength(0); - expect(state.report.canceled).toBe(1); - - const work = await ctx.db.get("work", workId); - expect(work).not.toBeNull(); - expect(work!.canceled).toBe(true); - }); + const o = await observe(); + expect(o.running).toHaveLength(3); + expect(o.pendingStart).toHaveLength(7 - 3); }); - it("should follow the complete -> pendingCompletion -> pendingStart flow for retries", async () => { - // Setup initial state with a running job that will need retry - const workId = await t.run>(async (ctx) => { - // Create internal state - await insertInternalState(ctx); - - // Create running runStatus - await ctx.db.insert("runStatus", { - state: { kind: "running" }, - }); - - // Create work with retry behavior - const workId = await makeDummyWork(ctx, { - attempts: 0, - retryBehavior: { - maxAttempts: 3, - initialBackoffMs: 1000, - base: 2, - }, - }); - - // Schedule a function and get its ID - const scheduledId = await makeDummyScheduledFunction(ctx, workId); + it("picks up overflow on subsequent iterations as slots free", async () => { + await initialize({ maxParallelism: 2 }); + const ids = []; + for (let i = 0; i < 4; i++) ids.push(await enqueueWork()); + + await runMain(); + let o = await observe(); + expect(o.running).toHaveLength(2); + expect(o.pendingStart).toHaveLength(2); + + // Complete one running job; another should take its place. + const finished = o.running[0].workId; + await simulateCompletion( + finished, + { kind: "success", returnValue: null }, + 0, + ); + await runMain(); + + o = await observe(); + expect(o.running).toHaveLength(2); + expect(o.pendingStart).toHaveLength(1); + // The completed one is gone. + expect(o.running.map((r) => r.workId)).not.toContain(finished); + }); - // Add to running list - const state = await ctx.db.query("internalState").unique(); - assert(state); - await ctx.db.patch("internalState", state._id, { - running: [{ workId, scheduledId, started: Date.now() }], + it("does not start new work when running.length already exceeds maxParallelism", async () => { + // Edge case: maxParallelism was lowered while jobs were running. + await initialize({ maxParallelism: 2 }); + // Pre-populate state.running with 4 entries. + const runningIds: { + workId: Id<"work">; + scheduledId: Id<"_scheduled_functions">; + }[] = []; + for (let i = 0; i < 4; i++) { + const workId = await t.run(async (ctx) => { + return ctx.db.insert("work", { + fnType: "action", + fnHandle: "h", + fnName: "h", + fnArgs: {}, + attempts: 0, + }); }); - - return workId; - }); - - // Complete the work with failure (workerRunning -> complete) - await t.mutation(internal.complete.complete, { - jobs: [ - { + const scheduledId = await t.run(async (ctx) => { + return ctx.scheduler.runAfter(0, internal.worker.runActionWrapper, { workId, - runResult: { kind: "failed", error: "Test error" }, + fnHandle: "h", + fnArgs: {}, + logLevel: "WARN", attempt: 0, - }, - ], - }); - - // Verify pendingCompletion was created with retry=true + }); + }); + runningIds.push({ workId, scheduledId }); + } await t.run(async (ctx) => { - const pendingCompletions = await ctx.db - .query("pendingCompletion") - .collect(); - expect(pendingCompletions).toHaveLength(1); - expect(pendingCompletions[0].workId).toBe(workId); - expect(pendingCompletions[0].runResult.kind).toBe("failed"); - expect(pendingCompletions[0].retry).toBe(true); + const s = await ctx.db.query("internalState").unique(); + assert(s); + await ctx.db.patch("internalState", s._id, { + running: runningIds.map((r) => ({ + ...r, + started: Date.now(), + })), + }); }); + // New pending work arrives while we're already over capacity. + await enqueueWork(); - // Run main loop to process pendingCompletion -> pendingStart - await t.mutation(internal.loop.main, { - generation: 1n, - segment: getNextSegment(), - }); + await runMain(); - // Verify work is now in pendingStart for retry - await t.run(async (ctx) => { - // Check that pendingCompletion was deleted - const pendingCompletions = await ctx.db - .query("pendingCompletion") - .collect(); - expect(pendingCompletions).toHaveLength(0); - - // Check that pendingStart was created for retry - const pendingStarts = await ctx.db.query("pendingStart").collect(); - expect(pendingStarts).toHaveLength(1); - expect(pendingStarts[0].workId).toBe(workId); - - // Check that work still exists - const work = await ctx.db.get("work", workId); - expect(work).not.toBeNull(); - expect(work!.attempts).toBe(1); - }); + const o = await observe(); + // No new starts — already over capacity. + expect(o.running).toHaveLength(4); + expect(o.pendingStart).toHaveLength(1); }); }); - describe("status transitions", () => { - it("should transition from idle to running when work is enqueued", async () => { - // Setup initial idle state - await t.run(async (ctx) => { - // Create internal state - await insertInternalState(ctx); - - // Create idle runStatus - await ctx.db.insert("runStatus", { - state: { kind: "idle", generation: 1n }, - }); - }); - - // Enqueue work - await t.mutation(api.lib.enqueue, { - fnHandle: "testHandle", - fnName: "testFunction", - fnArgs: { test: true }, - fnType: "mutation", - runAt: Date.now(), - config: { - maxParallelism: 10, - logLevel: "INFO", + // ──────────────────────────────────────────────────────────────────── + // Retry: failed work is retried per the retry policy + // ──────────────────────────────────────────────────────────────────── + + describe("retry", () => { + it("re-enqueues a failed job that has a retry policy with attempts left", async () => { + await initialize(); + const workId = await enqueueWork({ + retryBehavior: { + maxAttempts: 3, + initialBackoffMs: 100, + base: 2, }, }); + await runMain(); - // Verify state transition to running - await t.run(async (ctx) => { - const runStatus = await ctx.db.query("runStatus").unique(); - expect(runStatus).toBeDefined(); - assert(runStatus); - expect(runStatus.state.kind).toBe("running"); - }); - }); - - it("should transition from running to scheduled when all work is started and there's leftover capacity", async () => { - // Setup initial running state with work - await t.run(async (ctx) => { - // Create internal state - await insertInternalState(ctx); - - // Create running runStatus - await ctx.db.insert("runStatus", { - state: { kind: "running" }, - }); + // Worker reports failure on first attempt. + await simulateCompletion(workId, { kind: "failed", error: "boom" }, 0); + await runMain(); - // Create work - const workId = await makeDummyWork(ctx); - - // Create pendingStart - await ctx.db.insert("pendingStart", { - workId, - segment: 1n, - }); + // Work doc still exists; pendingStart was re-inserted with backoff segment. + const o = await observe(); + expect(o.pendingStart).toHaveLength(1); + expect(o.pendingStart[0].workId).toBe(workId); + expect(await statusOf(workId)).toMatchObject({ + state: "pending", + previousAttempts: 1, }); + }); - // Run main loop to process the work - await t.mutation(internal.loop.main, { - generation: 1n, - segment: getNextSegment(), + it("does NOT re-enqueue a failed job that was canceled before retry processed", async () => { + await initialize(); + const workId = await enqueueWork({ + retryBehavior: { + maxAttempts: 3, + initialBackoffMs: 100, + base: 2, + }, }); + await runMain(); - // Advance clock past the 5s cooldown so cursors are stale - vi.setSystemTime(Date.now() + STATUS_COOLDOWN + 1000); + // Worker reports failure (would normally retry). + await simulateCompletion(workId, { kind: "failed", error: "boom" }, 0); + // Cancel arrives before main can process the retry. + await t.mutation(api.lib.cancel, { id: workId }); - // Run updateRunStatus to transition to scheduled - await t.mutation(internal.loop.updateRunStatus, { - generation: 2n, - segment: getNextSegment(), - }); + await runMain(); - // Verify state transition to scheduled - await t.run(async (ctx) => { - const runStatus = await ctx.db.query("runStatus").unique(); - expect(runStatus).toBeDefined(); - assert(runStatus); - expect(runStatus.state.kind).toBe("scheduled"); - assert(runStatus.state.kind === "scheduled"); - expect(runStatus.state.saturated).toBe(false); - }); + const o = await observe(); + // Loop's direct effect: no retry was queued, work is marked canceled. + // (A follow-up `complete` mutation is scheduled to finalize the work + // doc deletion — that's complete.ts's responsibility, not the loop's.) + expect(o.pendingStart).toHaveLength(0); + const work = await t.run(async (ctx) => ctx.db.get("work", workId)); + expect(work?.canceled).toBe(true); }); + }); - it("should transition from running to saturated when maxed out", async () => { - // Setup initial running state with max capacity - await setMaxParallelism(1); - const segment = getCurrentSegment(); - await t.run(async (ctx) => { - // Create work item - const workId = await makeDummyWork(ctx); - - // Schedule a function and get its ID - const scheduledId = await makeDummyScheduledFunction(ctx, workId); - - // Create internal state with running job - await insertInternalState(ctx, { - running: [{ workId, scheduledId, started: Date.now() }], - }); - - // Create running runStatus - await ctx.db.insert("runStatus", { - state: { kind: "running" }, - }); + // ──────────────────────────────────────────────────────────────────── + // Cancellation + // ──────────────────────────────────────────────────────────────────── + + describe("cancellation", () => { + it("removes a pendingStart cancellation before the work runs", async () => { + await initialize(); + const workId = await enqueueWork(); + await t.mutation(api.lib.cancel, { id: workId }); + + await runMain(); + + const o = await observe(); + expect(o.pendingStart).toHaveLength(0); + expect(o.running).toHaveLength(0); + // Work is marked canceled by the loop. Final deletion happens when + // the scheduled `complete` mutation runs (separate concern). + const work = await t.run(async (ctx) => ctx.db.get("work", workId)); + expect(work?.canceled).toBe(true); + }); - // Create another pendingStart to exceed capacity - const anotherWorkId = await makeDummyWork(ctx); + it("marks an already-running work as canceled", async () => { + await initialize(); + const workId = await enqueueWork(); + await runMain(); // start it + expect((await observe()).running).toHaveLength(1); - await ctx.db.insert("pendingStart", { - workId: anotherWorkId, - segment, - }); - }); + await t.mutation(api.lib.cancel, { id: workId }); + await runMain(); // process the cancellation - // Run updateRunStatus to transition to scheduled with saturated=true - await t.mutation(internal.loop.updateRunStatus, { - generation: 1n, - segment, - }); - - // Verify state transition to scheduled with saturated=true - await t.run(async (ctx) => { - const runStatus = await ctx.db.query("runStatus").unique(); - expect(runStatus).toBeDefined(); - assert(runStatus); - expect(runStatus.state.kind).toBe("scheduled"); - assert(runStatus.state.kind === "scheduled"); - expect(runStatus.state.saturated).toBe(true); - }); + const work = await t.run(async (ctx) => ctx.db.get("work", workId)); + expect(work?.canceled).toBe(true); }); - it("should transition from scheduled to running when new work is enqueued", async () => { - // Setup initial scheduled state - await t.run>(async (ctx) => { - // Create internal state - await insertInternalState(ctx); + it("is a graceful no-op for already-finished work", async () => { + await initialize(); + const workId = await enqueueWork(); + await runMain(); + await simulateCompletion( + workId, + { kind: "success", returnValue: null }, + 0, + ); + await runMain(); + + // Work doc already gone — cancel should not throw. + await t.mutation(api.lib.cancel, { id: workId }); + const o = await observe(); + expect(o.pendingCancelation).toHaveLength(0); + }); + }); - // Schedule main loop - const scheduledId = await ctx.scheduler.runAfter( - 1000, - internal.loop.main, - { generation: 1n, segment: getNextSegment() + 10n }, - ); + // ──────────────────────────────────────────────────────────────────── + // Lifecycle: runStatus transitions + // ──────────────────────────────────────────────────────────────────── - // Create scheduled runStatus - await ctx.db.insert("runStatus", { - state: { - kind: "scheduled", - segment: getNextSegment() + 10n, - scheduledId, - saturated: false, - generation: 1n, - }, - }); + describe("lifecycle", () => { + it("transitions running -> idle when there's nothing to do (past cooldown)", async () => { + await initialize(); + // No pending work, cursors at 0 → far in the past, past cooldown. + vi.setSystemTime(Date.now() + STATUS_COOLDOWN + SECOND); - return scheduledId; - }); + await runMain(); - // Enqueue work to trigger transition to running - await t.mutation(api.lib.enqueue, { - fnHandle: "testHandle", - fnName: "testFunction", - fnArgs: { test: true }, - fnType: "mutation", - runAt: Date.now(), - config: { - maxParallelism: 10, - logLevel: "INFO", - }, - }); - - // Verify state transition to running - await t.run(async (ctx) => { - const runStatus = await ctx.db.query("runStatus").unique(); - expect(runStatus).toBeDefined(); - assert(runStatus); - expect(runStatus.state.kind).toBe("running"); - }); + expect((await observe()).runStatus).toMatchObject({ kind: "idle" }); }); - it("should transition from running to idle when all work is done", async () => { - const segment = getNextSegment(); - // Setup initial running state with work - const workId = await t.run>(async (ctx) => { - // Create internal state - await insertInternalState(ctx); - - // Create running runStatus - await ctx.db.insert("runStatus", { - state: { kind: "running" }, - }); + it("stays running during the cooldown window", async () => { + await initialize(); + const workId = await enqueueWork(); + await runMain(); // process the work; cursors advance to ~now + + // Complete it so there's no work in flight. + await simulateCompletion( + workId, + { kind: "success", returnValue: null }, + 0, + ); + await runMain(); // process completion; cursors at ~now + + // Within cooldown — should stay running. + const o = await observe(); + expect(o.runStatus).toMatchObject({ kind: "running" }); + }); - // Create work - const workId = await makeDummyWork(ctx, { attempts: 0 }); + it("transitions to scheduled (saturated=false) when only future-scheduled work remains", async () => { + await initialize(); + // A retry-style pendingStart in the future. + const future = getCurrentSegment() + 1000n; + await enqueueWork({}, future); + // Cursors at 0 → past cooldown, so we're not held in cooldown. + + await runMain(); + + const o = await observe(); + expect(o.runStatus?.kind).toBe("scheduled"); + if (o.runStatus?.kind === "scheduled") { + expect(o.runStatus.segment).toBeLessThanOrEqual(future); + // No running jobs; capacity isn't full → saturated must be false. + expect(o.runStatus.saturated).toBe(false); + } + }); - // Create pendingStart - await ctx.db.insert("pendingStart", { - workId, - segment, - }); + it("doesn't lose work when re-checking before going idle", async () => { + // Snapshot-then-confirm safety net: even if the snapshot shows no + // work, the runQuery confirmation should pick up data committed + // before this iteration started. + await initialize(); + const workId = await enqueueWork(); - return workId; - }); + await runMain(); - // Run main loop to process the work - await t.mutation(internal.loop.main, { generation: 1n, segment }); + const o = await observe(); + // The work was started, NOT lost to a "go idle" decision. + expect(o.running.map((r) => r.workId)).toEqual([workId]); + }); + }); - // Complete the work - await t.mutation(internal.complete.complete, { - jobs: [ - { + // ──────────────────────────────────────────────────────────────────── + // Saturated state: scheduled with running.length == maxParallelism + // The flag changes how kickMainLoop behaves (no enqueue-kicks; yes + // completion-kicks). + // ──────────────────────────────────────────────────────────────────── + + describe("saturated", () => { + /** + * Pre-populate state.running with N entries, each backed by a real work + * doc + scheduled worker (so recovery checks don't fire). Useful for + * exercising main when the loop is already at-capacity. + */ + async function fillRunningTo(count: number): Promise[]> { + const ids: Id<"work">[] = []; + const entries: { + workId: Id<"work">; + scheduledId: Id<"_scheduled_functions">; + started: number; + }[] = []; + for (let i = 0; i < count; i++) { + const workId = await t.run(async (ctx) => + ctx.db.insert("work", { + fnType: "action", + fnHandle: "test_handle", + fnName: "test_handle", + fnArgs: {}, + attempts: 0, + }), + ); + const scheduledId = await t.run(async (ctx) => + ctx.scheduler.runAfter(0, internal.worker.runActionWrapper, { workId, - runResult: { kind: "success", returnValue: null }, + fnHandle: "test_handle", + fnArgs: {}, + logLevel: "WARN", attempt: 0, - }, - ], + }), + ); + ids.push(workId); + entries.push({ workId, scheduledId, started: Date.now() }); + } + await t.run(async (ctx) => { + const s = await ctx.db.query("internalState").unique(); + assert(s); + await ctx.db.patch("internalState", s._id, { running: entries }); }); + return ids; + } - // Run main loop again to process the completion - await t.mutation(internal.loop.main, { generation: 2n, segment }); - - // Advance clock past the 5s cooldown so cursors are stale - vi.setSystemTime(Date.now() + STATUS_COOLDOWN + 1000); + it("records saturated=true when transitioning to scheduled at full capacity", async () => { + await initialize({ maxParallelism: 3 }); + // Fill to capacity. No completions, no future starts → main has + // nothing to do this iteration but jobs are running, so it should + // schedule itself (e.g. for recovery) with saturated=true. + await fillRunningTo(3); - // Run updateRunStatus to transition to idle - await t.mutation(internal.loop.updateRunStatus, { - generation: 3n, - segment, - }); + await runMain(); - // Verify state transition to idle - await t.run(async (ctx) => { - const runStatus = await ctx.db.query("runStatus").unique(); - expect(runStatus).toBeDefined(); - assert(runStatus); - expect(runStatus.state.kind).toBe("idle"); - assert(runStatus.state.kind === "idle"); - }); + const o = await observe(); + assert(o.runStatus); + expect(o.runStatus.kind).toBe("scheduled"); + if (o.runStatus.kind === "scheduled") { + expect(o.runStatus.saturated).toBe(true); + } }); - it("should transition from scheduled to running when main loop runs", async () => { - const segment = getNextSegment(); - await t.run(async (ctx) => { - await insertInternalState(ctx); - const scheduledId = await ctx.scheduler.runAfter( - 1000, - internal.loop.main, - { generation: 1n, segment }, - ); + it("records saturated=false when scheduling with under-capacity running jobs", async () => { + await initialize({ maxParallelism: 5 }); + // Fewer running jobs than max → not saturated. + await fillRunningTo(2); - await ctx.db.insert("runStatus", { - state: { - kind: "scheduled", - scheduledId, - generation: 1n, - segment, - saturated: false, - }, - }); - }); - // Run main loop - await t.mutation(internal.loop.main, { generation: 1n, segment }); + await runMain(); - // Verify state transition to running - await t.run(async (ctx) => { - const runStatus = await ctx.db.query("runStatus").unique(); - expect(runStatus).toBeDefined(); - assert(runStatus); - expect(runStatus.state.kind).toBe("running"); - }); + const o = await observe(); + assert(o.runStatus); + expect(o.runStatus.kind).toBe("scheduled"); + if (o.runStatus.kind === "scheduled") { + expect(o.runStatus.saturated).toBe(false); + } }); - }); - - describe("main function", () => { - it("should handle generation mismatch", async () => { - // Setup state with different generation - await t.run(async (ctx) => { - await insertInternalState(ctx, { generation: 2n }); - }); - // Call main with mismatched generation - await expect( - t.mutation(internal.loop.main, { generation: 1n, segment: 1n }), - ).rejects.toThrow("generation mismatch"); + it("clears saturated when a completion frees a slot", async () => { + // Saturated → completion arrives → kick wakes main → main runs and + // sees a freed slot → next scheduled state has saturated=false. + await initialize({ maxParallelism: 2 }); + const ids = await fillRunningTo(2); + await runMain(); // first transition: scheduled, saturated=true + expect((await observe()).runStatus).toMatchObject({ + kind: "scheduled", + saturated: true, + }); + + // A worker completes — frees a slot. + await simulateCompletion( + ids[0], + { kind: "success", returnValue: null }, + 0, + ); + await runMain(); + + const o = await observe(); + assert(o.runStatus); + // After processing the completion, running.length is 1 < 2, so any + // subsequent scheduled state should NOT be saturated. + if (o.runStatus.kind === "scheduled") { + expect(o.runStatus.saturated).toBe(false); + } else { + // Or we might be in 'running' (within cooldown) — also fine; just + // ensure we did not stay saturated=true. + expect(o.runStatus.kind).not.toBe("idle"); + } }); - it("should process pending completions", async () => { - // Setup state with a running job - await t.run(async (ctx) => { - // Create a work item for the running list - const workId = await makeDummyWork(ctx); - - // Schedule a function and get its ID - const scheduledId = await makeDummyScheduledFunction(ctx, workId); - - // Create internal state - await insertInternalState(ctx, { - running: [{ workId, scheduledId, started: 900000 }], - }); + it("does not start new work while saturated, even when pendingStart accumulates", async () => { + // Demonstrates that the capacity-aware query honors the running cap: + // when running == max, getPendingWork returns zero starts, so new + // enqueues sit in pendingStart until a slot opens. + await initialize({ maxParallelism: 2 }); + await fillRunningTo(2); + + // New work arrives while saturated. + const newWorkId = await enqueueWork(); + + await runMain(); + + const o = await observe(); + // No new starts — we're at max capacity. + expect(o.running).toHaveLength(2); + expect(o.pendingStart.map((p) => p.workId)).toContain(newWorkId); + assert(o.runStatus); + if (o.runStatus.kind === "scheduled") { + expect(o.runStatus.saturated).toBe(true); + } + }); - // Create pending completion - await ctx.db.insert("pendingCompletion", { - workId, - runResult: { kind: "success", returnValue: null }, - segment: 1n, - retry: false, - }); - }); - - // Call main - await t.mutation(internal.loop.main, { generation: 1n, segment: 1n }); - - // Verify completion was processed - await t.run(async (ctx) => { - // Check that pendingCompletion was deleted - const completions = await ctx.db.query("pendingCompletion").collect(); - expect(completions).toHaveLength(0); - - // Check that work was removed from running list - const state = await ctx.db.query("internalState").unique(); - expect(state).toBeDefined(); - assert(state); - expect(state.running).toHaveLength(0); - expect(state.report.completed).toBe(1); - expect(state.report.succeeded).toBe(1); + it("stays saturated when a completion frees a slot but more work is waiting", async () => { + // Externally observable: a completion arriving while saturated, with + // more pendingStart queued, should leave runStatus = scheduled + + // saturated=true. The freed slot gets refilled from pendingStart in + // the same iteration, so running.length stays at max and the visible + // saturated state doesn't drop. + await initialize({ maxParallelism: 2 }); + const ids = await fillRunningTo(2); + // Two more items waiting behind the at-capacity loop. + await enqueueWork(); + await enqueueWork(); + + // First main iteration arrives at the saturated end state. + await runMain(); + expect((await observe()).runStatus).toMatchObject({ + kind: "scheduled", + saturated: true, + }); + + // A worker completes — frees a slot, but pendingStart still has work. + await simulateCompletion( + ids[0], + { kind: "success", returnValue: null }, + 0, + ); + + // First iteration after the completion does work (processes + // completion + starts a new pending), so didWork=true and main + // self-reschedules with runStatus = "running". + await runMain(); + // Advance past the cooldown so the next iteration actually records + // the end-of-run state instead of holding "running" via cooldown. + vi.setSystemTime(Date.now() + STATUS_COOLDOWN + SECOND); + await runMain(); + + const o = await observe(); + assert(o.runStatus); + // Slot was refilled from pendingStart → running back at max → + // saturated=true is the externally observed state again. + expect(o.running).toHaveLength(2); + expect(o.runStatus).toMatchObject({ + kind: "scheduled", + saturated: true, }); }); + }); - it("should handle job retries", async () => { - // Setup state with a job that needs retry - const workId = await t.run>(async (ctx) => { - // Create a work item for the running list - const workId = await makeDummyWork(ctx, { - attempts: 1, - retryBehavior: { - maxAttempts: 3, - initialBackoffMs: 1000, - base: 2, - }, - }); - - // Schedule a function and get its ID - const scheduledId = await makeDummyScheduledFunction(ctx, workId); + // ──────────────────────────────────────────────────────────────────── + // Recovery: stuck running jobs get cleaned up + // ──────────────────────────────────────────────────────────────────── - // Create internal state - await insertInternalState(ctx, { + describe("recovery", () => { + it("flags running entries whose worker has been silent past the threshold", async () => { + await initialize(); + // Pre-populate state.running with an old entry. + const workId = await t.run(async (ctx) => { + const wid = await ctx.db.insert("work", { + fnType: "action", + fnHandle: "h", + fnName: "h", + fnArgs: {}, + attempts: 0, + }); + const scheduledId = await ctx.scheduler.runAfter( + 0, + internal.worker.runActionWrapper, + { + workId: wid, + fnHandle: "h", + fnArgs: {}, + logLevel: "WARN", + attempt: 0, + }, + ); + const s = await ctx.db.query("internalState").unique(); + assert(s); + await ctx.db.patch("internalState", s._id, { running: [ { - workId, + workId: wid, scheduledId, - started: 900000, + // Started 10 minutes ago — past 5-minute recovery threshold. + started: Date.now() - 10 * MINUTE, }, ], - }); - - // Create pending completion with failed result - await ctx.db.insert("pendingCompletion", { - workId, - runResult: { kind: "failed", error: "test error" }, - segment: 1n, - retry: true, - }); - - return workId; - }); - - // Call main - await t.mutation(internal.loop.main, { generation: 1n, segment: 1n }); - - // Verify job was retried - await t.run(async (ctx) => { - // Check that pendingCompletion was deleted - const completions = await ctx.db.query("pendingCompletion").collect(); - expect(completions).toHaveLength(0); - - // Check that work was updated - const work = await ctx.db.get("work", workId); - expect(work).toBeDefined(); - expect(work!.attempts).toBe(1); - - // Check that a new pendingStart was created - const pendingStarts = await ctx.db.query("pendingStart").collect(); - expect(pendingStarts).toHaveLength(1); - expect(pendingStarts[0].workId).toBe(workId); - - // Check that report was updated - const state = await ctx.db.query("internalState").unique(); - expect(state).toBeDefined(); - expect(state!.report.retries).toBe(1); - }); - }); - - it("should process pending cancelations", async () => { - // Setup state with a pending cancelation - const workId = await t.run>(async (ctx) => { - // Create a work item for the running list - const runningWorkId = await makeDummyWork(ctx); - - // Schedule a function and get its ID - const scheduledId = await makeDummyScheduledFunction( - ctx, - runningWorkId, - ); - - // Create internal state - await insertInternalState(ctx, { - running: [{ workId: runningWorkId, scheduledId, started: 900000 }], - }); - - // Create work - const workId = await makeDummyWork(ctx, { - retryBehavior: { - maxAttempts: 3, - initialBackoffMs: 1000, - base: 2, - }, - }); - - // Create pending start - await ctx.db.insert("pendingStart", { - workId, - segment: 1n, - }); - - // Create pending cancelation - await ctx.db.insert("pendingCancelation", { - workId, - segment: 1n, - }); - - return workId; - }); - - // Call main - await t.mutation(internal.loop.main, { generation: 1n, segment: 1n }); - - // Verify cancelation was processed - await t.run(async (ctx) => { - // Check that pendingCancelation was deleted - const cancelations = await ctx.db.query("pendingCancelation").collect(); - expect(cancelations).toHaveLength(0); - - // Check that pendingStart was deleted - const pendingStarts = await ctx.db.query("pendingStart").collect(); - expect(pendingStarts).toHaveLength(0); - - const work = await ctx.db.get("work", workId); - expect(work).toBeDefined(); - expect(work!.canceled).toBe(true); - - // Check that report was updated - const state = await ctx.db.query("internalState").unique(); - expect(state).toBeDefined(); - expect(state!.report.canceled).toBe(1); - }); - }); - - it("should schedule new work", async () => { - // Setup state with pending start items - const workId = await t.run>(async (ctx) => { - // Create internal state - await insertInternalState(ctx); - - // Create work - const workId = await makeDummyWork(ctx); - - // Create pending start - await ctx.db.insert("pendingStart", { - workId, - segment: 1n, - }); - - return workId; - }); - - // Call main - await t.mutation(internal.loop.main, { generation: 1n, segment: 1n }); - - // Verify work was started - await t.run(async (ctx) => { - // Check that pendingStart was deleted - const pendingStarts = await ctx.db.query("pendingStart").collect(); - expect(pendingStarts).toHaveLength(0); - - // Check that work was added to running list - const state = await ctx.db.query("internalState").unique(); - expect(state).toBeDefined(); - expect(state!.running).toHaveLength(1); - expect(state!.running[0].workId).toBe(workId); - }); - }); - - it("should schedule recovery for old jobs", async () => { - // Setup state with old running jobs - const oldTime = Date.now() - 5 * 60 * 1000 - 1000; // Older than recovery threshold - - await t.run(async (ctx) => { - // Create work for the running list - const workId = await makeDummyWork(ctx); - - // Schedule a function and get its ID - const scheduledId = await makeDummyScheduledFunction(ctx, workId); - - // Create internal state with old job - await insertInternalState(ctx, { + // Force recovery to be eligible to run this iteration. lastRecovery: 0n, - running: [{ workId, scheduledId, started: oldTime }], }); + return wid; }); - // Call main - const segment = toSegment(60 * 60 * 1000); - await t.mutation(internal.loop.main, { - generation: 1n, - segment, - }); + await runMain(); - // Verify recovery was scheduled - await t.run(async (ctx) => { - // Check that lastRecovery was updated - const state = await ctx.db.query("internalState").unique(); - expect(state).toBeDefined(); - expect(state!.lastRecovery).toBe(segment); - - // We can't directly check if recovery.recover was scheduled, - // but we can verify the state was updated correctly - }); + // We can't directly verify "recovery was scheduled" without inspecting + // the scheduler queue, but we can verify lastRecovery was advanced. + const after = await observe(); + const state = await t.run(async (ctx) => + ctx.db.query("internalState").unique(), + ); + assert(state); + expect(state.lastRecovery).toBeGreaterThan(0n); + // Work is still in running (recovery removes it via complete, which + // happens in a separately-scheduled mutation). + expect(after.running.map((r) => r.workId)).toContain(workId); }); }); - describe("updateRunStatus function", () => { - it("should handle generation mismatch", async () => { - // Setup state with different generation - await t.run(async (ctx) => { - await insertInternalState(ctx, { generation: 2n }); - }); + // ──────────────────────────────────────────────────────────────────── + // Generation safety: stale main calls cannot clobber state + // ──────────────────────────────────────────────────────────────────── - // Call updateRunStatus with mismatched generation + describe("generation safety", () => { + it("rejects main calls with the wrong generation", async () => { + await initialize(); + // Current generation is 1n. Calling with 99n should error. await expect( - t.mutation(internal.loop.updateRunStatus, { - generation: 1n, - segment: 1n, - }), - ).rejects.toThrow("generation mismatch"); - }); - - it("should schedule main immediately if there are outstanding cancelations", async () => { - // Setup state with outstanding cancelations - await t.run(async (ctx) => { - // Create work for cancelation - const workId = await makeDummyWork(ctx); - - // Create internal state - await insertInternalState(ctx, {}); - - // Create run status - await ctx.db.insert("runStatus", { - state: { kind: "running" }, - }); - - // Create pending cancelation - await ctx.db.insert("pendingCancelation", { - workId, - segment: 1n, - }); - }); - - // Call updateRunStatus - await t.mutation(internal.loop.updateRunStatus, { - generation: 1n, - segment: 1n, - }); - - // Verify main was scheduled (indirectly by checking runStatus) - await t.run(async (ctx) => { - // We can't directly check if main was scheduled, - // but we can verify the state was updated correctly - const runStatus = await ctx.db.query("runStatus").unique(); - expect(runStatus).toBeDefined(); - // The state should no longer be idle - expect(runStatus!.state.kind).not.toBe("idle"); - }); - }); - - it("should transition to idle state when there is no work", async () => { - // Setup state with no work - await t.run(async (ctx) => { - // Create internal state with no running jobs - await insertInternalState(ctx, {}); - - // Create run status in running state - await ctx.db.insert("runStatus", { - state: { kind: "running" }, - }); - }); - - // Call updateRunStatus - await t.mutation(internal.loop.updateRunStatus, { - generation: 1n, - segment: 1n, - }); - - // Verify idle state was set - await t.run(async (ctx) => { - const runStatus = await ctx.db.query("runStatus").unique(); - expect(runStatus).toBeDefined(); - expect(runStatus!.state.kind).toBe("idle"); - assert(runStatus!.state.kind === "idle"); - expect(runStatus!.state.generation).toBe(1n); - }); + t.mutation(internal.loop.main, { generation: 99n }), + ).rejects.toThrow(/generation mismatch/); }); - it("should set saturated flag when at max capacity", async () => { - // Setup state with running jobs at max capacity - const now = getCurrentSegment(); - const later = now + 10n; - await setMaxParallelism(10); - await t.run(async (ctx) => { - // Create 10 work items and scheduled functions - const runningJobs = await Promise.all( - Array(10) - .fill(0) - .map(async () => { - const workId = await makeDummyWork(ctx); - - // Schedule a function and get its ID - const scheduledId = await makeDummyScheduledFunction(ctx, workId); - - return { workId, scheduledId, started: Date.now() }; - }), - ); - - // Create internal state with max running jobs - await insertInternalState(ctx, { - running: runningJobs, - }); - - // Create run status - await ctx.db.insert("runStatus", { - state: { kind: "running" }, - }); - - // Create future completion to trigger scheduling - await ctx.db.insert("pendingCompletion", { - workId: runningJobs[0].workId, - runResult: { kind: "success", returnValue: null }, - segment: later, - retry: false, - }); - }); - - // Call updateRunStatus - await t.mutation(internal.loop.updateRunStatus, { - generation: 1n, - segment: 1n, - }); - - // Verify scheduled state was set with saturated flag - await t.run(async (ctx) => { - const runStatus = await ctx.db.query("runStatus").unique(); - expect(runStatus).toBeDefined(); - expect(runStatus!.state.kind).toBe("scheduled"); - assert(runStatus!.state.kind === "scheduled"); - expect(runStatus!.state.saturated).toBe(true); - }); + it("increments the generation each time main runs", async () => { + await initialize(); + const before = (await observe()).generation; + await runMain(); + const after = (await observe()).generation; + expect(after).toBeGreaterThan(before); }); + }); - it("should reset cursors correctly when there's old work detected", async () => { - // Setup state with old work - const now = getCurrentSegment(); - await t.run(async (ctx) => { - // Create internal state with old work - await insertInternalState(ctx, { - segmentCursors: { - incoming: now - 1n, - completion: now - 1n, - cancelation: now - 1n, - }, + // ──────────────────────────────────────────────────────────────────── + // Snapshot semantics: the snapshot-then-confirm safety net + // ──────────────────────────────────────────────────────────────────── + + describe("snapshot semantics", () => { + it("the snapshot read does not see the calling mutation's pending writes", async () => { + // Verifies the prototype's distinguishing feature: + // runSnapshotQuery from inside a mutation does NOT see writes the + // mutation has performed. ctx.runQuery does. This is what makes + // the snapshot-then-confirm pattern correct. + const { runSnapshotQuery } = await import("./future.js"); + const result = await t.run(async (ctx) => { + const workId = await ctx.db.insert("work", { + fnType: "action", + fnHandle: "h", + fnName: "h", + fnArgs: {}, + attempts: 0, }); - }); - - // Insert very old work - await t.run(async (ctx) => { - const workId = await makeDummyWork(ctx); await ctx.db.insert("pendingStart", { workId, - segment: 0n, + segment: getCurrentSegment(), }); - }); - - // Call updateRunStatus - await t.mutation(internal.loop.updateRunStatus, { - generation: 1n, - segment: now, - }); - - // Verify cursors were reset - await t.run(async (ctx) => { - const state = await ctx.db.query("internalState").unique(); - expect(state).toBeDefined(); - expect(state!.segmentCursors.incoming).toBe(0n); - }); - - // Set maxParallelism to 0 so it doesn't schedule anything / make progress - await setMaxParallelism(0); - - // Run main - await t.mutation(internal.loop.main, { - generation: 1n, - segment: now, - }); - - // Verify start cursor weren't updated - await t.run(async (ctx) => { - const state = await ctx.db.query("internalState").unique(); - expect(state).toBeDefined(); - expect(state!.segmentCursors.incoming).toBe(0n); - }); - }); - }); - - describe("complete function", () => { - it("should run onComplete handlers and delete work", async () => { - // Setup mock work with onComplete handler - const workId = await t.run>(async (ctx) => { - const workId = await makeDummyWork(ctx, { - attempts: 0, - onComplete: { - // TODO: make this a real handle - fnHandle: "onComplete_handle", - context: { data: "test" }, - }, + const snap = await runSnapshotQuery(internal.loop.getPendingWork, { + completionCursor: 0n, + cancelationCursor: 0n, + incomingCursor: 0n, + maxParallelism: 10, + runningCount: 0, }); - return workId; - }); - - // Call complete - await t.mutation(internal.complete.complete, { - jobs: [ - { - workId, - runResult: { kind: "success", returnValue: null }, - attempt: 0, - }, - ], - }); - - // Verify work was deleted - await t.run(async (ctx) => { - const work = await ctx.db.get("work", workId); - expect(work).toBeNull(); - }); - }); - - it("should handle missing work gracefully", async () => { - // Call complete with non-existent work ID - const workId = await t.run(async (ctx) => { - const id = await makeDummyWork(ctx, { attempts: 0 }); - await ctx.db.delete("work", id); - return id; - }); - await t.mutation(internal.complete.complete, { - jobs: [ - { - workId, - runResult: { kind: "success", returnValue: null }, - attempt: 0, - }, - ], - }); - - // No error should be thrown - }); - }); - - describe("status cooldown", () => { - it("should stay running within the cooldown window", async () => { - const segment = getNextSegment(); - await t.run(async (ctx) => { - await insertInternalState(ctx); - await ctx.db.insert("runStatus", { state: { kind: "running" } }); - - const workId = await makeDummyWork(ctx); - await ctx.db.insert("pendingStart", { workId, segment }); - }); - - // Process the work - await t.mutation(internal.loop.main, { generation: 1n, segment }); - - // Advance less than the cooldown - vi.setSystemTime(Date.now() + STATUS_COOLDOWN - 1000); - - // updateRunStatus should schedule main again (staying running) - await t.mutation(internal.loop.updateRunStatus, { - generation: 2n, - segment, - }); - - // runStatus should still be "running" — no transition - await t.run(async (ctx) => { - const runStatus = await ctx.db.query("runStatus").unique(); - assert(runStatus); - expect(runStatus.state.kind).toBe("running"); + const real = await ctx.runQuery(internal.loop.getPendingWork, { + completionCursor: 0n, + cancelationCursor: 0n, + incomingCursor: 0n, + maxParallelism: 10, + runningCount: 0, + }); + return { snap: snap.allStarts.length, real: real.allStarts.length }; }); + expect(result.snap).toBe(0); + expect(result.real).toBe(1); }); - it("should transition after the cooldown expires", async () => { - const segment = getNextSegment(); - await t.run(async (ctx) => { - await insertInternalState(ctx); - await ctx.db.insert("runStatus", { state: { kind: "running" } }); - - const workId = await makeDummyWork(ctx); - await ctx.db.insert("pendingStart", { workId, segment }); - }); - - // Process the work - await t.mutation(internal.loop.main, { generation: 1n, segment }); + it("processes work that was committed before main started", async () => { + // The snapshot read is at a later snapshot than the inserts, + // so it sees them. This is the common case. + await initialize(); + const workId = await enqueueWork(); - // Advance past the cooldown - vi.setSystemTime(Date.now() + STATUS_COOLDOWN + 1000); + await runMain(); - // Now it should transition - await t.mutation(internal.loop.updateRunStatus, { - generation: 2n, - segment, - }); - - await t.run(async (ctx) => { - const runStatus = await ctx.db.query("runStatus").unique(); - assert(runStatus); - // Should have transitioned out of running (to scheduled or idle) - expect(runStatus.state.kind).not.toBe("running"); - }); + expect((await observe()).running.map((r) => r.workId)).toEqual([workId]); }); + }); - it("should pick up new work arriving during cooldown without a kick", async () => { - const segment = getNextSegment(); - await t.run(async (ctx) => { - await insertInternalState(ctx); - await ctx.db.insert("runStatus", { state: { kind: "running" } }); - - const workId = await makeDummyWork(ctx); - await ctx.db.insert("pendingStart", { workId, segment }); - }); - - // Process wave 1 - await t.mutation(internal.loop.main, { generation: 1n, segment }); - - // Advance 1 second (within cooldown) - vi.setSystemTime(Date.now() + 1000); - const segment2 = getNextSegment(); + // ──────────────────────────────────────────────────────────────────── + // Backwards compatibility with the pre-merge API + // ──────────────────────────────────────────────────────────────────── - // updateRunStatus during cooldown — schedules main for next segment - await t.mutation(internal.loop.updateRunStatus, { - generation: 2n, - segment, - }); - - // Enqueue wave 2 while the loop is still warm - await t.run(async (ctx) => { - const workId2 = await makeDummyWork(ctx); - await ctx.db.insert("pendingStart", { - workId: workId2, - segment: segment2, - }); - }); + describe("backwards compatibility", () => { + it("main accepts (and ignores) a legacy `segment` arg", async () => { + await initialize(); + const workId = await enqueueWork(); - // The scheduled main from cooldown should pick up wave 2 + // The legacy callsites pass `segment`; the new main treats it as + // optional. Calls should still process work as expected. await t.mutation(internal.loop.main, { - generation: 2n, - segment: segment2, + generation: 1n, + segment: 12345n, }); - // Verify both items processed - await t.run(async (ctx) => { - const state = await ctx.db.query("internalState").unique(); - assert(state); - expect(state.running).toHaveLength(2); - // pendingStart should be empty - const pending = await ctx.db.query("pendingStart").collect(); - expect(pending).toHaveLength(0); - }); + expect((await observe()).running.map((r) => r.workId)).toEqual([workId]); }); - it("bursty throughput: multiple waves processed without going idle", async () => { - const WAVE_COUNT = 3; - const TASKS_PER_WAVE = 3; - const WAVE_GAP_MS = 1000; // 1s between waves, well within 5s cooldown - - await t.run(async (ctx) => { - await insertInternalState(ctx); - await ctx.db.insert("runStatus", { state: { kind: "running" } }); - }); - - let generation = 1n; - const statusChecks: string[] = []; - - for (let wave = 0; wave < WAVE_COUNT; wave++) { - if (wave > 0) { - // Advance time between waves (within cooldown) - vi.setSystemTime(Date.now() + WAVE_GAP_MS); - } - - const waveSeg = getNextSegment(); - - // Enqueue tasks for this wave - await t.run(async (ctx) => { - for (let i = 0; i < TASKS_PER_WAVE; i++) { - const workId = await makeDummyWork(ctx); - await ctx.db.insert("pendingStart", { workId, segment: waveSeg }); - } - }); - - // Run main to process the wave - await t.mutation(internal.loop.main, { - generation, - segment: waveSeg, - }); - generation++; - - // Check status after updateRunStatus - await t.mutation(internal.loop.updateRunStatus, { - generation, - segment: waveSeg, - }); - - const status = await t.run(async (ctx) => { - const runStatus = await ctx.db.query("runStatus").unique(); - assert(runStatus); - return runStatus.state.kind; - }); - statusChecks.push(status); - - // If main was scheduled by cooldown, run it to advance generation - if (status === "running") { - // The cooldown scheduled main for next segment — run it so - // generation stays consistent for the next wave. - const nextSeg = getNextSegment(); - await t.mutation(internal.loop.main, { - generation, - segment: nextSeg, - }); - generation++; - } - } - - // During the cooldown window, every wave should see "running" - for (let i = 0; i < WAVE_COUNT; i++) { - expect(statusChecks[i]).toBe("running"); - } - - // After the cooldown expires, updateRunStatus should transition. - // Don't run main again — that would refresh the cursors. - vi.setSystemTime(Date.now() + STATUS_COOLDOWN + 1000); - + it("updateRunStatus schedules a main call (forwards in-flight upgrade traffic)", async () => { + await initialize(); + // A pre-upgrade scheduled call lands here after deploy. await t.mutation(internal.loop.updateRunStatus, { - generation, - segment: getNextSegment(), - }); - - const finalStatus = await t.run(async (ctx) => { - const runStatus = await ctx.db.query("runStatus").unique(); - assert(runStatus); - return runStatus.state.kind; - }); - // Should have transitioned out of running - expect(finalStatus).not.toBe("running"); + generation: 1n, + segment: 12345n, + }); + // The forwarder should have scheduled main; we don't drain the + // full pipeline (that's covered by the other tests). Just verify + // a main call was queued. + const scheduled = await t.run(async (ctx) => + ctx.db.system.query("_scheduled_functions").collect(), + ); + const mainCalls = scheduled.filter((s) => s.name.endsWith("loop:main")); + expect(mainCalls.length).toBeGreaterThan(0); }); }); }); From 8fc93f75ae9348476a8b17bc471cf87c7969ebd9 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 8 May 2026 01:47:16 -0700 Subject: [PATCH 07/14] no longer need to enqueue in the future --- src/component/complete.ts | 10 ++++++++-- src/component/kick.test.ts | 22 ++++++++-------------- src/component/kick.ts | 23 +++++++++++------------ src/component/lib.ts | 23 ++++++++++------------- 4 files changed, 37 insertions(+), 41 deletions(-) diff --git a/src/component/complete.ts b/src/component/complete.ts index 90daf44..5a4a9a4 100644 --- a/src/component/complete.ts +++ b/src/component/complete.ts @@ -5,7 +5,12 @@ import { internal } from "./_generated/api.js"; import { internalMutation, type MutationCtx } from "./_generated/server.js"; import { kickMainLoop } from "./kick.js"; import { createLogger } from "./logging.js"; -import { type OnCompleteArgs, type RunResult, vResult } from "./shared.js"; +import { + getCurrentSegment, + type OnCompleteArgs, + type RunResult, + vResult, +} from "./shared.js"; import { recordCompleted } from "./stats.js"; import { assert } from "convex-helpers"; @@ -192,7 +197,8 @@ export async function completeHandler( }), ); if (pendingCompletions.length > 0) { - const segment = await kickMainLoop(ctx, "complete"); + await kickMainLoop(ctx, "complete"); + const segment = getCurrentSegment(); await Promise.all( pendingCompletions.map((completion) => ctx.db.insert("pendingCompletion", { diff --git a/src/component/kick.test.ts b/src/component/kick.test.ts index ef3c33f..3edc4c9 100644 --- a/src/component/kick.test.ts +++ b/src/component/kick.test.ts @@ -16,7 +16,6 @@ import { modules } from "./setup.test.js"; import { DEFAULT_MAX_PARALLELISM, fromSegment, - getCurrentSegment, getNextSegment, toSegment, } from "./shared.js"; @@ -58,12 +57,11 @@ describe("kickMainLoop", () => { expect(runStatus.state.kind).toBe("running"); // Second kick should not change state - const segment = await kickMainLoop(ctx, "enqueue"); + await kickMainLoop(ctx, "enqueue"); const afterStatus = await ctx.db.query("runStatus").unique(); assert(afterStatus); expect(afterStatus.state.kind).toBe("running"); expect(afterStatus._id).toBe(runStatus._id); - expect(segment).toBe(getNextSegment()); }); }); @@ -100,8 +98,7 @@ describe("kickMainLoop", () => { }); // Kick should reschedule to run sooner - const segment = await kickMainLoop(ctx, "enqueue"); - expect(segment).toBe(getCurrentSegment()); + await kickMainLoop(ctx, "enqueue"); const afterStatus = await ctx.db.query("runStatus").unique(); assert(afterStatus); @@ -142,8 +139,7 @@ describe("kickMainLoop", () => { }); // Kick should not change state when saturated - const segment = await kickMainLoop(ctx, "enqueue"); - expect(segment).toBe(getNextSegment()); + await kickMainLoop(ctx, "enqueue"); const afterStatus = await ctx.db.query("runStatus").unique(); assert(afterStatus); expect(afterStatus.state.kind).toBe("scheduled"); @@ -195,16 +191,14 @@ describe("kickMainLoop", () => { test("handles race conditions between multiple kicks", async () => { const t = convexTest(schema, modules); - // Run kicks in separate transactions to simulate concurrent access - const segments = await Promise.all( + // Run kicks in separate transactions to simulate concurrent access. + // None should throw; the loser transactions just observe the winner's + // running state and return early. + await Promise.all( Array.from({ length: 10 }, () => - t.run(async (ctx) => { - const segment = await kickMainLoop(ctx, "enqueue"); - return segment; - }), + t.mutation((ctx) => kickMainLoop(ctx, "enqueue")), ), ); - expect(segments.filter((s) => s === getCurrentSegment())).toHaveLength(1); // Check final state in a new transaction await t.run(async (ctx) => { diff --git a/src/component/kick.ts b/src/component/kick.ts index 1520b55..d4c92f8 100644 --- a/src/component/kick.ts +++ b/src/component/kick.ts @@ -8,31 +8,29 @@ import { type Config, fromSegment, getCurrentSegment, - getNextSegment, SECOND, toSegment, } from "./shared.js"; /** - * Called from outside the loop. - * Returns the soonest segment to enqueue work for the main loop. + * Wakes the main loop if it isn't already running. No-ops when a wake-up + * wouldn't be productive (e.g. enqueue while saturated). */ export async function kickMainLoop( ctx: MutationCtx, source: "enqueue" | "cancel" | "complete" | "kick", config?: Config, -): Promise { +): Promise { const globals = config ?? (await getOrUpdateGlobals(ctx, config)); const console = createLogger(globals.logLevel); const runStatus = await getOrCreateRunStatus(ctx); - const next = getNextSegment(); // Only kick to run now if we're scheduled or idle. if (runStatus.state.kind === "running") { console.debug( `[${source}] main is actively running, so we don't need to kick it`, ); - return next; + return; } // main is scheduled to run later, so we should cancel it and reschedule. if (runStatus.state.kind === "scheduled") { @@ -40,19 +38,19 @@ export async function kickMainLoop( console.debug( `[${source}] main is saturated, so we don't need to kick it`, ); - return next; + return; } if (source === "complete" && !runStatus.state.saturated) { console.debug( `[${source}] main is not saturated, so kicking for completion isn't necessary`, ); - return next; + return; } if (runStatus.state.segment <= toSegment(Date.now() + SECOND)) { console.debug( `[${source}] main is scheduled to run soon enough, so we don't need to kick it`, ); - return next; + return; } console.debug( `[${source}] main is scheduled to run later, so reschedule it to run now`, @@ -74,12 +72,13 @@ export async function kickMainLoop( await ctx.db.patch("runStatus", runStatus._id, { state: { kind: "running" }, }); - const current = getCurrentSegment(); - const scheduledTime = boundScheduledTime(fromSegment(current), console); + const scheduledTime = boundScheduledTime( + fromSegment(getCurrentSegment()), + console, + ); await ctx.scheduler.runAt(scheduledTime, internal.loop.main, { generation: runStatus.state.generation, }); - return current; } export const forceKick = internalMutation({ diff --git a/src/component/lib.ts b/src/component/lib.ts index 2f67996..fd5dac8 100644 --- a/src/component/lib.ts +++ b/src/component/lib.ts @@ -19,7 +19,7 @@ import { boundScheduledTime, vConfig, fnType, - getNextSegment, + getCurrentSegment, max, vOnCompleteFnContext, retryBehavior, @@ -53,14 +53,13 @@ export const enqueue = mutation({ handler: async (ctx, { config, ...itemArgs }) => { const globals = await getOrUpdateGlobals(ctx, config); const console = createLogger(globals.logLevel); - const kickSegment = await kickMainLoop(ctx, "enqueue", globals); - return await enqueueHandler(ctx, console, kickSegment, itemArgs); + await kickMainLoop(ctx, "enqueue", globals); + return await enqueueHandler(ctx, console, itemArgs); }, }); async function enqueueHandler( ctx: MutationCtx, console: Logger, - kickSegment: bigint, { runAt, ...workArgs }: ObjectType, ) { runAt = boundScheduledTime(runAt, console); @@ -115,7 +114,7 @@ async function enqueueHandler( await ctx.db.insert("pendingStart", { workId, - segment: max(toSegment(runAt), kickSegment), + segment: max(toSegment(runAt), getCurrentSegment()), }); recordEnqueued(console, { workId, fnName: workArgs.fnName, runAt }); return workId; @@ -130,10 +129,8 @@ export const enqueueBatch = mutation({ handler: async (ctx, { config, items }) => { const globals = await getOrUpdateGlobals(ctx, config); const console = createLogger(globals.logLevel); - const kickSegment = await kickMainLoop(ctx, "enqueue", globals); - return Promise.all( - items.map((item) => enqueueHandler(ctx, console, kickSegment, item)), - ); + await kickMainLoop(ctx, "enqueue", globals); + return Promise.all(items.map((item) => enqueueHandler(ctx, console, item))); }, }); @@ -146,10 +143,10 @@ export const cancel = mutation({ const globals = await getOrUpdateGlobals(ctx, { logLevel }); const shouldCancel = await shouldCancelWorkItem(ctx, id, globals.logLevel); if (shouldCancel) { - const segment = await kickMainLoop(ctx, "cancel", globals); + await kickMainLoop(ctx, "cancel", globals); await ctx.db.insert("pendingCancelation", { workId: id, - segment, + segment: getCurrentSegment(), }); } }, @@ -176,10 +173,10 @@ export const cancelAll = mutation({ shouldCancelWorkItem(ctx, _id, globals.logLevel), ), ); - let segment = getNextSegment(); if (shouldCancel.some((c) => c)) { - segment = await kickMainLoop(ctx, "cancel", globals); + await kickMainLoop(ctx, "cancel", globals); } + const segment = getCurrentSegment(); await Promise.all( pageOfWork.map(({ _id }, index) => { if (shouldCancel[index]) { From 65616ebd9113b6ae4e228ceda2f97984fa425b4f Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 8 May 2026 02:19:45 -0700 Subject: [PATCH 08/14] getPendingWork -> getPending --- src/component/loop.test.ts | 6 +++--- src/component/loop.ts | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/component/loop.test.ts b/src/component/loop.test.ts index a68bfa7..d5913c2 100644 --- a/src/component/loop.test.ts +++ b/src/component/loop.test.ts @@ -611,7 +611,7 @@ describe("loop", () => { it("does not start new work while saturated, even when pendingStart accumulates", async () => { // Demonstrates that the capacity-aware query honors the running cap: - // when running == max, getPendingWork returns zero starts, so new + // when running == max, getPending returns zero starts, so new // enqueues sit in pendingStart until a slot opens. await initialize({ maxParallelism: 2 }); await fillRunningTo(2); @@ -783,14 +783,14 @@ describe("loop", () => { workId, segment: getCurrentSegment(), }); - const snap = await runSnapshotQuery(internal.loop.getPendingWork, { + const snap = await runSnapshotQuery(internal.loop.getPending, { completionCursor: 0n, cancelationCursor: 0n, incomingCursor: 0n, maxParallelism: 10, runningCount: 0, }); - const real = await ctx.runQuery(internal.loop.getPendingWork, { + const real = await ctx.runQuery(internal.loop.getPending, { completionCursor: 0n, cancelationCursor: 0n, incomingCursor: 0n, diff --git a/src/component/loop.ts b/src/component/loop.ts index de4841c..510ad84 100644 --- a/src/component/loop.ts +++ b/src/component/loop.ts @@ -60,7 +60,7 @@ export const INITIAL_STATE: WithoutSystemFields> = { /** * Single query that returns everything the main loop needs to process. */ -export const getPendingWork = internalQuery({ +export const getPending = internalQuery({ args: { completionCursor: v.int64(), cancelationCursor: v.int64(), @@ -151,13 +151,13 @@ export const main = internalMutation({ }; // Snapshot read — no read dependency, no OCC conflicts. - console.time("[main] getPendingWork"); + console.time("[main] getPending"); const { allStarts, cancelations, completions } = await runSnapshotQuery( - internal.loop.getPendingWork, + internal.loop.getPending, queryArgs, ); const toStart = allStarts.filter((s) => s.segment <= segment); - console.timeEnd("[main] getPendingWork"); + console.timeEnd("[main] getPending"); console.time("[main] pendingCompletion"); const toCancel = await handleCompletions(ctx, state, completions, console); @@ -236,7 +236,7 @@ export const main = internalMutation({ // Nothing found in snapshot. Re-read with a real dependency (same args // for cache-hit efficiency) so a concurrent insert forces an OCC retry. console.debug("[main] no work — confirming with read dependency"); - const confirm = await ctx.runQuery(internal.loop.getPendingWork, queryArgs); + const confirm = await ctx.runQuery(internal.loop.getPending, queryArgs); const confirmStarts = confirm.allStarts; const confirmStartsNow = confirmStarts.filter((s) => s.segment <= segment); const confirmFuture = confirmStarts.find((s) => s.segment > segment); From 6427b915df21ae73b10e4d44425d48bc4ba426d9 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Fri, 8 May 2026 13:26:13 -0700 Subject: [PATCH 09/14] 0.4.7-alpha.0 --- CHANGELOG.md | 8 ++++++++ package-lock.json | 4 ++-- package.json | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 333012a..9df98ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## 0.4.7-alpha.0 + +- Reduces database conflict retries (OCC conflicts) from enqueuing or completing + work while tasks are being dispatched, improving throughput for workpools at + scale. +- Fixes a race condition where a task could get recovered twice if the scheduler + is many minutes behind. + ## 0.4.6 - Fails gracefully if the work being started has already been deleted. It will diff --git a/package-lock.json b/package-lock.json index 960291c..38d9b85 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@convex-dev/workpool", - "version": "0.4.6", + "version": "0.4.7-alpha.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@convex-dev/workpool", - "version": "0.4.6", + "version": "0.4.7-alpha.0", "license": "Apache-2.0", "devDependencies": { "@convex-dev/eslint-plugin": "^2.0.0", diff --git a/package.json b/package.json index 0a772a6..6532d90 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "email": "support@convex.dev", "url": "https://github.com/get-convex/workpool/issues" }, - "version": "0.4.6", + "version": "0.4.7-alpha.0", "license": "Apache-2.0", "keywords": [ "convex", From 8d30300c193d98874b4b7a7546137eaa410dc4e9 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Mon, 11 May 2026 18:26:15 -0700 Subject: [PATCH 10/14] add more test scenarios --- example/convex/_generated/api.d.ts | 7 + example/convex/convex.config.ts | 2 + example/convex/test/scenarios/overhead.ts | 212 ++++++++++++++++ example/convex/test/scenarios/sustained.ts | 268 ++++++++++++++++++++ example/convex/test/scenarios/throughput.ts | 150 +++++++++++ package-lock.json | 13 + package.json | 1 + 7 files changed, 653 insertions(+) create mode 100644 example/convex/test/scenarios/overhead.ts create mode 100644 example/convex/test/scenarios/sustained.ts create mode 100644 example/convex/test/scenarios/throughput.ts diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index ae4c8f4..dec8b99 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -15,6 +15,9 @@ import type * as test_scenarios_bigArgs from "../test/scenarios/bigArgs.js"; import type * as test_scenarios_bigContext from "../test/scenarios/bigContext.js"; import type * as test_scenarios_bigReturnTypes from "../test/scenarios/bigReturnTypes.js"; import type * as test_scenarios_burstyBatches from "../test/scenarios/burstyBatches.js"; +import type * as test_scenarios_overhead from "../test/scenarios/overhead.js"; +import type * as test_scenarios_sustained from "../test/scenarios/sustained.js"; +import type * as test_scenarios_throughput from "../test/scenarios/throughput.js"; import type * as test_work from "../test/work.js"; import type { @@ -31,6 +34,9 @@ declare const fullApi: ApiFromModules<{ "test/scenarios/bigContext": typeof test_scenarios_bigContext; "test/scenarios/bigReturnTypes": typeof test_scenarios_bigReturnTypes; "test/scenarios/burstyBatches": typeof test_scenarios_burstyBatches; + "test/scenarios/overhead": typeof test_scenarios_overhead; + "test/scenarios/sustained": typeof test_scenarios_sustained; + "test/scenarios/throughput": typeof test_scenarios_throughput; "test/work": typeof test_work; }>; @@ -65,4 +71,5 @@ export declare const components: { bigPool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"bigPool">; serializedPool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"serializedPool">; testWorkpool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"testWorkpool">; + oldWorkpool: import("@convex-dev/workpool-old/_generated/component.js").ComponentApi<"oldWorkpool">; }; diff --git a/example/convex/convex.config.ts b/example/convex/convex.config.ts index 1c51cb0..b3e9ad0 100644 --- a/example/convex/convex.config.ts +++ b/example/convex/convex.config.ts @@ -1,10 +1,12 @@ import { defineApp } from "convex/server"; import workpool from "@convex-dev/workpool/convex.config"; +import workpoolOld from "@convex-dev/workpool-old/convex.config"; const app = defineApp(); app.use(workpool, { name: "smallPool" }); app.use(workpool, { name: "bigPool" }); app.use(workpool, { name: "serializedPool" }); app.use(workpool, { name: "testWorkpool" }); +app.use(workpoolOld, { name: "oldWorkpool" }); export default app; diff --git a/example/convex/test/scenarios/overhead.ts b/example/convex/test/scenarios/overhead.ts new file mode 100644 index 0000000..8c17306 --- /dev/null +++ b/example/convex/test/scenarios/overhead.ts @@ -0,0 +1,212 @@ +import { internalAction, internalMutation } from "../../_generated/server"; +import { v } from "convex/values"; +import { internal, components } from "../../_generated/api"; +import { Workpool } from "@convex-dev/workpool"; +import { Workpool as OldWorkpool } from "@convex-dev/workpool-old"; +import { Id } from "../../_generated/dataModel"; + +/** + * Throughput / overhead measurement scenario. + * + * mode determines what does the enqueue: + * raw — ctx.scheduler.runAfter(0, recorder). Bare-Convex floor. + * workpool-bare — new workpool, no onComplete (worker is the recorder) + * workpool-oc — new workpool with onComplete (worker is no-op) + * oldpool-bare — old workpool (workpool-old), no onComplete + * oldpool-oc — old workpool with onComplete + * + * Both pool variants test against the same Convex deployment, against the + * same tasks table, with the same recorder. The only difference between + * `workpool-*` and `oldpool-*` is which workpool component is used. + */ + +export const recorder = internalMutation({ + args: { runId: v.id("runs"), enqueuedAt: v.number() }, + handler: async (ctx, args) => { + await ctx.db.insert("tasks", { + runId: args.runId, + workId: "overhead-test" as never, + type: "mutation", + endTime: Date.now(), + enqueuedAt: args.enqueuedAt, + }); + }, +}); + +export const noop = internalMutation({ + args: {}, + handler: async () => {}, +}); + +export const oncompleteRecorder = internalMutation({ + args: { + workId: v.string(), + result: v.any(), + context: v.object({ + runId: v.id("runs"), + enqueuedAt: v.number(), + }), + }, + handler: async (ctx, args) => { + await ctx.db.insert("tasks", { + runId: args.context.runId, + workId: args.workId as never, + type: "mutation", + endTime: Date.now(), + enqueuedAt: args.context.enqueuedAt, + }); + }, +}); + +const Mode = v.union( + v.literal("raw"), + v.literal("workpool-bare"), + v.literal("workpool-oc"), + v.literal("oldpool-bare"), + v.literal("oldpool-oc"), +); + +export default internalAction({ + args: { + taskCount: v.optional(v.number()), + batchSize: v.optional(v.number()), + interBatchMs: v.optional(v.number()), + mode: v.optional(Mode), + maxParallelism: v.optional(v.number()), + pollTimeoutMs: v.optional(v.number()), + }, + handler: async ( + ctx, + { + taskCount = 1000, + batchSize = 50, + interBatchMs = 0, + mode = "raw", + maxParallelism = 50, + pollTimeoutMs = 600_000, + }, + ) => { + const runId: Id<"runs"> = await ctx.runMutation(internal.test.run.start, { + scenario: `overhead-${mode}`, + parameters: { taskCount, batchSize, mode, maxParallelism, interBatchMs }, + }); + const scenarioStart = Date.now(); + + const isWorkpoolNew = mode === "workpool-bare" || mode === "workpool-oc"; + const isWorkpoolOld = mode === "oldpool-bare" || mode === "oldpool-oc"; + const useOnComplete = mode === "workpool-oc" || mode === "oldpool-oc"; + + // Configure the right pool (separate components → no cross-contamination) + if (isWorkpoolNew) { + await ctx.runMutation(components.testWorkpool.config.update, { + maxParallelism, + }); + } + if (isWorkpoolOld) { + await ctx.runMutation(components.oldWorkpool.config.update, { + maxParallelism, + }); + } + + const newPool = isWorkpoolNew + ? new Workpool(components.testWorkpool, { maxParallelism }) + : null; + const oldPool = isWorkpoolOld + ? new OldWorkpool(components.oldWorkpool, { maxParallelism }) + : null; + + console.log( + `overhead[${mode}]: ${taskCount} tasks, batchSize=${batchSize}` + + (newPool || oldPool ? `, max=${maxParallelism}` : ""), + ); + + const numBatches = Math.ceil(taskCount / batchSize); + let enqueued = 0; + for (let batch = 0; batch < numBatches; batch++) { + if (batch > 0 && interBatchMs > 0) { + await new Promise((r) => setTimeout(r, interBatchMs)); + } + const thisBatch = Math.min(batchSize, taskCount - enqueued); + const enqueuedAt = Date.now(); + const tasks = Array(thisBatch).fill(0); + if (mode === "raw") { + await Promise.all( + tasks.map(() => + ctx.scheduler.runAfter( + 0, + internal.test.scenarios.overhead.recorder, + { runId, enqueuedAt }, + ), + ), + ); + } else if (!useOnComplete) { + const pool = newPool ?? oldPool!; + await Promise.all( + tasks.map(() => + pool.enqueueMutation( + ctx, + internal.test.scenarios.overhead.recorder, + { runId, enqueuedAt }, + ), + ), + ); + } else { + const pool = newPool ?? oldPool!; + await Promise.all( + tasks.map(() => + pool.enqueueMutation( + ctx, + internal.test.scenarios.overhead.noop, + {}, + { + onComplete: internal.test.scenarios.overhead.oncompleteRecorder, + context: { runId, enqueuedAt }, + }, + ), + ), + ); + } + enqueued += thisBatch; + } + const enqueueTotal = Date.now() - scenarioStart; + console.log( + `Enqueued ${taskCount} in ${enqueueTotal}ms ` + + `(${(taskCount / (enqueueTotal / 1000)).toFixed(0)}/s).`, + ); + + const pollStart = Date.now(); + let metrics: Record | null = null; + while (Date.now() - pollStart < pollTimeoutMs) { + metrics = (await ctx.runQuery(internal.test.run.metrics)) as Record< + string, + unknown + > | null; + if (metrics && metrics.status === "completed") break; + await new Promise((r) => setTimeout(r, 100)); + } + + if (!metrics || metrics.status !== "completed") { + console.log(`Timed out after ${pollTimeoutMs}ms.`); + return { metrics, enqueueTotal, timedOut: true }; + } + + const total = metrics.totalDurationMs as number; + const completedCount = metrics.completedCount as number; + const tps = (completedCount / total) * 1000; + const msPerTask = total / completedCount; + + console.log(`\n=== overhead[${mode}] ===`); + console.log( + `${completedCount}/${taskCount} done in ${total}ms ` + + `(${tps.toFixed(0)} tps, ${msPerTask.toFixed(1)} ms/task)`, + ); + return { + mode, + taskCount: completedCount, + totalDurationMs: total, + enqueueTotal, + tasksPerSec: Math.round(tps), + msPerTaskWallClock: Math.round(msPerTask * 10) / 10, + }; + }, +}); diff --git a/example/convex/test/scenarios/sustained.ts b/example/convex/test/scenarios/sustained.ts new file mode 100644 index 0000000..8b039da --- /dev/null +++ b/example/convex/test/scenarios/sustained.ts @@ -0,0 +1,268 @@ +import { internalAction, internalMutation } from "../../_generated/server"; +import { v } from "convex/values"; +import { internal, components } from "../../_generated/api"; +import { Workpool } from "@convex-dev/workpool"; +import { Workpool as OldWorkpool } from "@convex-dev/workpool-old"; +import { Id } from "../../_generated/dataModel"; + +/** + * Sustained, interleaved load scenario. Designed to exercise OCC paths + * that the burst-then-drain `overhead` scenario cannot: + * + * - Tasks arrive at a target rate, not in bursts. + * - Each worker takes a randomized duration so completions interleave + * with new arrivals (rather than landing in lockstep waves). + * - The run lasts long enough for the system to reach steady state. + * + * This means at any moment there's a mix of: tasks being enqueued, workers + * running, completions arriving — exactly the scenario where main / + * updateRunStatus / kickMainLoop reads can race with concurrent writes. + * + * Modes mirror overhead.ts: workpool-bare/oc and oldpool-bare/oc on the + * same deployment. Workers are actions (so they can actually sleep). + */ + +// Worker: an action that sleeps for [minMs, maxMs] then records completion. +export const sleepingRecorder = internalAction({ + args: { + runId: v.id("runs"), + enqueuedAt: v.number(), + minMs: v.number(), + maxMs: v.number(), + }, + handler: async (ctx, args) => { + const ms = args.minMs + Math.random() * (args.maxMs - args.minMs); + await new Promise((r) => setTimeout(r, ms)); + await ctx.runMutation(internal.test.scenarios.sustained.recordTask, { + runId: args.runId, + enqueuedAt: args.enqueuedAt, + }); + }, +}); + +export const recordTask = internalMutation({ + args: { runId: v.id("runs"), enqueuedAt: v.number() }, + handler: async (ctx, args) => { + await ctx.db.insert("tasks", { + runId: args.runId, + workId: "sustained" as never, + type: "action", + endTime: Date.now(), + enqueuedAt: args.enqueuedAt, + }); + }, +}); + +// onComplete callback variant +export const oncompleteRecord = internalMutation({ + args: { + workId: v.string(), + result: v.any(), + context: v.object({ + runId: v.id("runs"), + enqueuedAt: v.number(), + }), + }, + handler: async (ctx, args) => { + await ctx.db.insert("tasks", { + runId: args.context.runId, + workId: args.workId as never, + type: "action", + endTime: Date.now(), + enqueuedAt: args.context.enqueuedAt, + }); + }, +}); + +// A no-op action for the onComplete-mode runs (worker just sleeps). +export const sleepingNoop = internalAction({ + args: { minMs: v.number(), maxMs: v.number() }, + handler: async (_ctx, args) => { + const ms = args.minMs + Math.random() * (args.maxMs - args.minMs); + await new Promise((r) => setTimeout(r, ms)); + }, +}); + +const Mode = v.union( + v.literal("workpool-bare"), + v.literal("workpool-oc"), + v.literal("oldpool-bare"), + v.literal("oldpool-oc"), +); + +export default internalAction({ + args: { + targetTps: v.optional(v.number()), // tasks per second + durationSec: v.optional(v.number()), // how long to keep enqueuing + workerMinMs: v.optional(v.number()), + workerMaxMs: v.optional(v.number()), + mode: v.optional(Mode), + maxParallelism: v.optional(v.number()), + pollTimeoutMs: v.optional(v.number()), + }, + handler: async ( + ctx, + { + targetTps = 50, + durationSec = 20, + workerMinMs = 50, + workerMaxMs = 500, + mode = "workpool-bare", + maxParallelism = 100, + pollTimeoutMs = 600_000, + }, + ) => { + const totalTasks = targetTps * durationSec; + const interMs = 1000 / targetTps; + const runId: Id<"runs"> = await ctx.runMutation(internal.test.run.start, { + scenario: `sustained-${mode}`, + parameters: { + taskCount: totalTasks, + targetTps, + durationSec, + workerMinMs, + workerMaxMs, + mode, + maxParallelism, + }, + }); + + const isNew = mode === "workpool-bare" || mode === "workpool-oc"; + const isOld = mode === "oldpool-bare" || mode === "oldpool-oc"; + const useOnComplete = mode === "workpool-oc" || mode === "oldpool-oc"; + + if (isNew) { + await ctx.runMutation(components.testWorkpool.config.update, { + maxParallelism, + }); + } + if (isOld) { + await ctx.runMutation(components.oldWorkpool.config.update, { + maxParallelism, + }); + } + const newPool = isNew + ? new Workpool(components.testWorkpool, { maxParallelism }) + : null; + const oldPool = isOld + ? new OldWorkpool(components.oldWorkpool, { maxParallelism }) + : null; + const pool = newPool ?? oldPool!; + + console.log( + `sustained[${mode}]: ${totalTasks} tasks @ ${targetTps}/s for ${durationSec}s, ` + + `worker=${workerMinMs}-${workerMaxMs}ms, max=${maxParallelism}`, + ); + + const startTime = Date.now(); + const deadline = startTime + durationSec * 1000; + let enqueued = 0; + + // Trickle tasks at the target rate. Each iteration kicks off a single + // enqueue but doesn't await it — that's the realistic pattern (every + // wave of clients independently calls enqueue). + const pending: Promise[] = []; + while (Date.now() < deadline) { + const enqueuedAt = Date.now(); + const args = { + runId, + enqueuedAt, + minMs: workerMinMs, + maxMs: workerMaxMs, + }; + let p: Promise; + if (useOnComplete) { + p = pool.enqueueAction( + ctx, + internal.test.scenarios.sustained.sleepingNoop, + { minMs: workerMinMs, maxMs: workerMaxMs }, + { + onComplete: internal.test.scenarios.sustained.oncompleteRecord, + context: { runId, enqueuedAt }, + }, + ); + } else { + p = pool.enqueueAction( + ctx, + internal.test.scenarios.sustained.sleepingRecorder, + args, + ); + } + pending.push(p); + enqueued++; + // Pace + const elapsed = Date.now() - startTime; + const targetElapsed = (enqueued / targetTps) * 1000; + const sleep = targetElapsed - elapsed; + if (sleep > 0) await new Promise((r) => setTimeout(r, sleep)); + } + await Promise.allSettled(pending); + const enqueueTotal = Date.now() - startTime; + console.log( + `Enqueued ${enqueued} in ${enqueueTotal}ms (target ${totalTasks}). Waiting...`, + ); + + // Poll until all `enqueued` tasks have completed (we may have overshot + // or undershot the target count slightly). + // Update the run's taskCount to the actual number enqueued so the + // metrics query knows when we're "done". + await ctx.runMutation(internal.test.scenarios.sustained.setTaskCount, { + runId, + taskCount: enqueued, + }); + + const pollStart = Date.now(); + let metrics: Record | null = null; + while (Date.now() - pollStart < pollTimeoutMs) { + metrics = (await ctx.runQuery(internal.test.run.metrics)) as Record< + string, + unknown + > | null; + if (metrics && metrics.status === "completed") break; + await new Promise((r) => setTimeout(r, 200)); + } + + if (!metrics || metrics.status !== "completed") { + console.log(`Timed out after ${pollTimeoutMs}ms.`); + return { metrics, enqueueTotal, timedOut: true }; + } + + const total = metrics.totalDurationMs as number; + const completedCount = metrics.completedCount as number; + const tps = (completedCount / total) * 1000; + const msPerTask = total / completedCount; + const latency = metrics.latency as + | { p50: number; p95: number; p99: number; max: number } + | undefined; + + console.log(`\n=== sustained[${mode}] ===`); + console.log( + `${completedCount}/${enqueued} done in ${total}ms ` + + `(${tps.toFixed(0)} tps, ${msPerTask.toFixed(1)} ms/task wall)`, + ); + if (latency) + console.log( + `Latency p50=${latency.p50}ms p95=${latency.p95}ms ` + + `p99=${latency.p99}ms max=${latency.max}ms`, + ); + return { + mode, + taskCount: completedCount, + enqueued, + totalDurationMs: total, + enqueueTotal, + tasksPerSec: Math.round(tps), + msPerTaskWallClock: Math.round(msPerTask * 10) / 10, + latency, + }; + }, +}); + +// Helper to patch the actual taskCount after pacing is done (since pacing +// can over/undershoot the nominal target). +export const setTaskCount = internalMutation({ + args: { runId: v.id("runs"), taskCount: v.number() }, + handler: async (ctx, args) => { + await ctx.db.patch("runs", args.runId, { taskCount: args.taskCount }); + }, +}); diff --git a/example/convex/test/scenarios/throughput.ts b/example/convex/test/scenarios/throughput.ts new file mode 100644 index 0000000..e33f3d7 --- /dev/null +++ b/example/convex/test/scenarios/throughput.ts @@ -0,0 +1,150 @@ +import { internalAction } from "../../_generated/server"; +import { v } from "convex/values"; +import { internal } from "../../_generated/api"; +import { enqueueTasks, TaskType } from "../work"; +import { Id } from "../../_generated/dataModel"; + +/** + * Throughput / saturation scenario. + * + * Enqueues `taskCount` tasks in `batchSize`-sized chunks via batch enqueue, + * with `interBatchMs` gap between chunks. Designed for sustained-load + * throughput measurement at high parallelism (e.g. 5000 tasks at max=200). + * + * Run: + * npx convex run test/scenarios/throughput:default \ + * '{"taskCount":5000,"batchSize":100,"interBatchMs":50,"maxParallelism":200,"taskDurationMs":20}' + */ +const parameters = { + taskCount: v.optional(v.number()), + batchSize: v.optional(v.number()), + interBatchMs: v.optional(v.number()), + maxParallelism: v.optional(v.number()), + taskDurationMs: v.optional(v.number()), + taskType: v.optional(v.union(v.literal("mutation"), v.literal("action"))), + pollTimeoutMs: v.optional(v.number()), +}; + +export default internalAction({ + args: parameters, + handler: async ( + ctx, + { + taskCount = 5000, + batchSize = 100, + interBatchMs = 50, + maxParallelism = 200, + taskDurationMs = 20, + taskType = "mutation", + pollTimeoutMs = 600_000, // 10 minutes for big runs + }, + ) => { + const runId: Id<"runs"> = await ctx.runMutation(internal.test.run.start, { + scenario: "throughput", + parameters: { + taskCount, + batchSize, + interBatchMs, + maxParallelism, + taskType, + taskDurationMs, + }, + }); + + const fn = + taskType === "action" + ? internal.test.work.configurableAction + : internal.test.work.configurableMutation; + + const scenarioStart = Date.now(); + console.log( + `throughput: ${taskCount} tasks in batches of ${batchSize}, ` + + `${interBatchMs}ms gap, max=${maxParallelism}, work=${taskDurationMs}ms`, + ); + + const baseArgs = { + payload: "throughput", + returnBytes: 10, + runId, + ...(taskType === "action" ? { durationMs: taskDurationMs } : {}), + ...(taskType === "mutation" ? { readWriteData: 0 } : {}), + }; + + const numBatches = Math.ceil(taskCount / batchSize); + let enqueued = 0; + for (let batch = 0; batch < numBatches; batch++) { + if (batch > 0 && interBatchMs > 0) { + await new Promise((r) => setTimeout(r, interBatchMs)); + } + const thisBatch = Math.min(batchSize, taskCount - enqueued); + const enqueuedAt = Date.now(); + await enqueueTasks({ + ctx, + taskArgs: Array(thisBatch).fill(baseArgs), + taskType, + fn, + onCompleteOpts: { + onComplete: internal.test.work.markTaskCompleted, + context: { runId, type: taskType as TaskType, enqueuedAt }, + }, + batchEnqueue: true, + }); + enqueued += thisBatch; + } + const enqueueTotal = Date.now() - scenarioStart; + console.log( + `Enqueued ${taskCount} in ${enqueueTotal}ms ` + + `(${(taskCount / (enqueueTotal / 1000)).toFixed(0)}/s). Waiting...`, + ); + + // Poll for completion + const pollStart = Date.now(); + let metrics: Record | null = null; + while (Date.now() - pollStart < pollTimeoutMs) { + metrics = (await ctx.runQuery(internal.test.run.metrics)) as Record< + string, + unknown + > | null; + if (metrics && metrics.status === "completed") break; + await new Promise((r) => setTimeout(r, 250)); + } + + const timedOut = !metrics || metrics.status !== "completed"; + if (timedOut) { + console.log(`Timed out after ${pollTimeoutMs}ms.`); + console.log( + "Metrics:", + JSON.stringify( + { ...metrics, completedCount: metrics?.completedCount }, + null, + 2, + ), + ); + return { metrics, enqueueTotal, timedOut }; + } + + const totalDurationMs = metrics!.totalDurationMs as number; + const completedCount = metrics!.completedCount as number; + const latency = metrics!.latency as + | { p50: number; p95: number; p99: number; max: number } + | undefined; + + const tasksPerSec = ((completedCount / totalDurationMs) * 1000).toFixed(0); + console.log(`\n=== throughput results ===`); + console.log(`Completed: ${completedCount}/${taskCount}`); + console.log(`Total duration: ${totalDurationMs}ms (${tasksPerSec} tasks/s)`); + console.log(`Enqueue total: ${enqueueTotal}ms`); + if (latency) { + console.log( + `Latency p50=${latency.p50}ms p95=${latency.p95}ms ` + + `p99=${latency.p99}ms max=${latency.max}ms`, + ); + } + return { + metrics, + enqueueTotal, + timedOut: false, + tasksPerSec: Number(tasksPerSec), + }; + }, +}); diff --git a/package-lock.json b/package-lock.json index 38d9b85..61b55d0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,7 @@ "license": "Apache-2.0", "devDependencies": { "@convex-dev/eslint-plugin": "^2.0.0", + "@convex-dev/workpool-old": "npm:@convex-dev/workpool@0.4.6", "@edge-runtime/vm": "5.0.0", "@eslint/eslintrc": "3.3.3", "@eslint/js": "10.0.1", @@ -578,6 +579,18 @@ "node": ">=10" } }, + "node_modules/@convex-dev/workpool-old": { + "name": "@convex-dev/workpool", + "version": "0.4.6", + "resolved": "https://registry.npmjs.org/@convex-dev/workpool/-/workpool-0.4.6.tgz", + "integrity": "sha512-e+cIgQePx5rrGizvzaYocWQCsDFbFlYqR1ZbtFPLa909izwc5GvoasPJPANeFhHRWaA5NBipNVqNtfHySi7Ldw==", + "dev": true, + "license": "Apache-2.0", + "peerDependencies": { + "convex": "^1.31.7", + "convex-helpers": "^0.1.94" + } + }, "node_modules/@edge-runtime/primitives": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/@edge-runtime/primitives/-/primitives-6.0.0.tgz", diff --git a/package.json b/package.json index 6532d90..a87acc4 100644 --- a/package.json +++ b/package.json @@ -67,6 +67,7 @@ "convex-helpers": "^0.1.94" }, "devDependencies": { + "@convex-dev/workpool-old": "npm:@convex-dev/workpool@0.4.6", "@convex-dev/eslint-plugin": "^2.0.0", "@edge-runtime/vm": "5.0.0", "@eslint/eslintrc": "3.3.3", From 7217a484cd7b4f3a86811ba11e6a5dd88a4152b9 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Mon, 11 May 2026 20:04:04 -0700 Subject: [PATCH 11/14] account for getPending with the updated count --- src/component/loop.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/component/loop.ts b/src/component/loop.ts index 510ad84..323961e 100644 --- a/src/component/loop.ts +++ b/src/component/loop.ts @@ -233,10 +233,15 @@ export const main = internalMutation({ return; } - // Nothing found in snapshot. Re-read with a real dependency (same args - // for cache-hit efficiency) so a concurrent insert forces an OCC retry. + // Nothing found in snapshot. Re-read with a real dependency so a + // concurrent insert forces an OCC retry. Override runningCount so it + // reflects post-recovery state — otherwise a stale count can leave + // startLimit pinned to 0 and miss now-runnable pendingStart. console.debug("[main] no work — confirming with read dependency"); - const confirm = await ctx.runQuery(internal.loop.getPending, queryArgs); + const confirm = await ctx.runQuery(internal.loop.getPending, { + ...queryArgs, + runningCount: state.running.length, + }); const confirmStarts = confirm.allStarts; const confirmStartsNow = confirmStarts.filter((s) => s.segment <= segment); const confirmFuture = confirmStarts.find((s) => s.segment > segment); From fa39df1fdb7a7945c396a2fee28981b257f9c144 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Mon, 11 May 2026 20:05:10 -0700 Subject: [PATCH 12/14] drop old tests --- src/component/loop.test.ts | 55 -------------------------------------- 1 file changed, 55 deletions(-) diff --git a/src/component/loop.test.ts b/src/component/loop.test.ts index d5913c2..d172b88 100644 --- a/src/component/loop.test.ts +++ b/src/component/loop.test.ts @@ -760,61 +760,6 @@ describe("loop", () => { }); }); - // ──────────────────────────────────────────────────────────────────── - // Snapshot semantics: the snapshot-then-confirm safety net - // ──────────────────────────────────────────────────────────────────── - - describe("snapshot semantics", () => { - it("the snapshot read does not see the calling mutation's pending writes", async () => { - // Verifies the prototype's distinguishing feature: - // runSnapshotQuery from inside a mutation does NOT see writes the - // mutation has performed. ctx.runQuery does. This is what makes - // the snapshot-then-confirm pattern correct. - const { runSnapshotQuery } = await import("./future.js"); - const result = await t.run(async (ctx) => { - const workId = await ctx.db.insert("work", { - fnType: "action", - fnHandle: "h", - fnName: "h", - fnArgs: {}, - attempts: 0, - }); - await ctx.db.insert("pendingStart", { - workId, - segment: getCurrentSegment(), - }); - const snap = await runSnapshotQuery(internal.loop.getPending, { - completionCursor: 0n, - cancelationCursor: 0n, - incomingCursor: 0n, - maxParallelism: 10, - runningCount: 0, - }); - const real = await ctx.runQuery(internal.loop.getPending, { - completionCursor: 0n, - cancelationCursor: 0n, - incomingCursor: 0n, - maxParallelism: 10, - runningCount: 0, - }); - return { snap: snap.allStarts.length, real: real.allStarts.length }; - }); - expect(result.snap).toBe(0); - expect(result.real).toBe(1); - }); - - it("processes work that was committed before main started", async () => { - // The snapshot read is at a later snapshot than the inserts, - // so it sees them. This is the common case. - await initialize(); - const workId = await enqueueWork(); - - await runMain(); - - expect((await observe()).running.map((r) => r.workId)).toEqual([workId]); - }); - }); - // ──────────────────────────────────────────────────────────────────── // Backwards compatibility with the pre-merge API // ──────────────────────────────────────────────────────────────────── From b33835348609392b5363615f66428c1623d6d464 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Mon, 11 May 2026 20:24:55 -0700 Subject: [PATCH 13/14] delete bogus test --- src/component/loop.test.ts | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/component/loop.test.ts b/src/component/loop.test.ts index d172b88..659f3d3 100644 --- a/src/component/loop.test.ts +++ b/src/component/loop.test.ts @@ -481,19 +481,6 @@ describe("loop", () => { } }); - it("doesn't lose work when re-checking before going idle", async () => { - // Snapshot-then-confirm safety net: even if the snapshot shows no - // work, the runQuery confirmation should pick up data committed - // before this iteration started. - await initialize(); - const workId = await enqueueWork(); - - await runMain(); - - const o = await observe(); - // The work was started, NOT lost to a "go idle" decision. - expect(o.running.map((r) => r.workId)).toEqual([workId]); - }); }); // ──────────────────────────────────────────────────────────────────── From 3369e6cc2dc7f123eb153e3152baa503a7690d8a Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Tue, 12 May 2026 19:51:20 -0700 Subject: [PATCH 14/14] format --- example/convex/test/scenarios/throughput.ts | 4 +++- src/component/loop.test.ts | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/example/convex/test/scenarios/throughput.ts b/example/convex/test/scenarios/throughput.ts index e33f3d7..b27eeb4 100644 --- a/example/convex/test/scenarios/throughput.ts +++ b/example/convex/test/scenarios/throughput.ts @@ -132,7 +132,9 @@ export default internalAction({ const tasksPerSec = ((completedCount / totalDurationMs) * 1000).toFixed(0); console.log(`\n=== throughput results ===`); console.log(`Completed: ${completedCount}/${taskCount}`); - console.log(`Total duration: ${totalDurationMs}ms (${tasksPerSec} tasks/s)`); + console.log( + `Total duration: ${totalDurationMs}ms (${tasksPerSec} tasks/s)`, + ); console.log(`Enqueue total: ${enqueueTotal}ms`); if (latency) { console.log( diff --git a/src/component/loop.test.ts b/src/component/loop.test.ts index 659f3d3..9c57476 100644 --- a/src/component/loop.test.ts +++ b/src/component/loop.test.ts @@ -480,7 +480,6 @@ describe("loop", () => { expect(o.runStatus.saturated).toBe(false); } }); - }); // ────────────────────────────────────────────────────────────────────