Skip to content

use snapshot query#194

Merged
ianmacartney merged 14 commits into
mainfrom
ian/snapshot-query-prototype
May 13, 2026
Merged

use snapshot query#194
ianmacartney merged 14 commits into
mainfrom
ian/snapshot-query-prototype

Conversation

@ianmacartney

@ianmacartney ianmacartney commented Apr 22, 2026

Copy link
Copy Markdown
Member

TL;DR

Introduces a snapshot-then-confirm read pattern to eliminate OCC conflicts

What changed?

  • loop.ts:
    • Issues a snapshot read via runSnapshotQuery(getPendingWork, ...) to fetch completions, cancelations, and pending starts without taking read dependencies, then confirms with a real ctx.runQuery only when the snapshot finds nothing to do.
    • Advances cursors only when work was actually processed (didWork), so the cursor doubles as a cooldown signal.
    • Schedules the next iteration of main directly rather than going through updateRunStatus.
  • Extracts all pending-work reads into a single query that can be called both as a snapshot read and as a dependency-taking confirmation read, with the same arguments for cache efficiency.
  • kick.ts: kickMainLoop no longer returns a segment value. Callers that previously used the return value to set pendingStart/pendingCancelation segments now call getCurrentSegment() directly after the kick.
  • lib.ts and complete.ts: Updated to match the new kickMainLoop void signature.
  • loop.test.ts: Fully rewritten around observable behavior (work item status, runStatus state, pending table contents) rather than internal implementation details like cursor positions and segment values. Tests are organized into focused describe blocks: forward progress, capacity, retry, cancellation, lifecycle, saturated state, recovery, generation safety, snapshot semantics, and backwards compatibility.
  • stateMachine.test.ts: Updated segment defaults from getNextSegment() to getCurrentSegment() so pending entries are immediately eligible in the same iteration, and added an explicit time advance between retry completion and the follow-up runMain call.

How to test?

Run the existing test suite:

npm test

The rewritten loop.test.ts covers the key behavioral contracts. Pay particular attention to the snapshot semantics suite, which directly verifies that runSnapshotQuery does not observe the calling mutation's own pending writes while ctx.runQuery does, and the backwards compatibility suite, which confirms that legacy segment-bearing scheduled calls and updateRunStatus calls still route correctly after the upgrade.

Why make this change?

The previous design split the main loop across two mutations (mainupdateRunStatus), which introduced unnecessary scheduling latency and complexity. More importantly, the loop's reads created OCC conflicts under concurrent load because every read of the pending tables took a dependency.

The new design uses a snapshot read (no dependency) to do the bulk of the work, then takes a single dependency-bearing confirmation read only when the snapshot finds nothing — the common idle case. This eliminates the OCC retry storm under high concurrency while preserving correctness: if a concurrent transaction commits new work between the snapshot and the confirmation, the confirmation's read dependency causes a retry, ensuring no work is missed.

@coderabbitai

coderabbitai Bot commented Apr 22, 2026

Copy link
Copy Markdown

Review Change Stack

Warning

Rate limit exceeded

@ianmacartney has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 49 minutes and 45 seconds before requesting another review.

You’ve run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: get-convex/coderabbit/.coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 29625a38-6771-4f9c-a756-ea85e9f0823d

📥 Commits

Reviewing files that changed from the base of the PR and between fa39df1 and 3369e6c.

📒 Files selected for processing (2)
  • example/convex/test/scenarios/throughput.ts
  • src/component/loop.test.ts
📝 Walkthrough

Walkthrough

This PR refactors the segment management and work-fetching architecture. kickMainLoop no longer returns segment values; it now performs state transitions and schedules internal.loop.main. Callers (enqueue, cancel, complete) await kickMainLoop and then read the segment via getCurrentSegment() when inserting pending rows. A new runSnapshotQuery helper runs snapshot-style internal queries. main now uses a new getPending snapshot query to fetch completions, cancelations, and candidate starts in one read, processes pre-fetched docs, and advances cursors only when work occurs. Tests and examples are updated accordingly.

Sequence Diagram(s)

sequenceDiagram
  participant Client as enqueue/cancel/complete
  participant Kick as kickMainLoop
  participant Segment as getCurrentSegment
  participant DB as pending tables
  participant Snapshot as runSnapshotQuery(getPending)
  participant Main as internal.loop.main
  Client->>Kick: await kickMainLoop(...)
  Kick-->>Client: void
  Client->>Segment: getCurrentSegment()
  Segment-->>Client: segment
  Client->>DB: insert pending* with segment
  Main->>Snapshot: runSnapshotQuery(getPending)
  Snapshot-->>Main: {completions, cancelations, allStarts}
  Main->>Main: slice starts -> handleStart / handleCancelation / handleCompletion
  Main->>DB: update internalState cursors / schedule or idle
Loading

Possibly related PRs

Suggested reviewers

  • reeceyang
  • Nicolapps
🚥 Pre-merge checks | ✅ 4
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'use snapshot query' directly refers to the main architectural change introduced in this PR, which implements a snapshot-then-confirm read pattern using snapshot queries.
Description check ✅ Passed The description provides comprehensive coverage of all major changes, including the snapshot-then-confirm pattern, segment handling updates, test rewrites, and the rationale for eliminating OCC conflicts.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch ian/snapshot-query-prototype

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

ianmacartney commented Apr 22, 2026

Copy link
Copy Markdown
Member Author

@pkg-pr-new

pkg-pr-new Bot commented Apr 22, 2026

Copy link
Copy Markdown

Open in StackBlitz

npm i https://pkg.pr.new/get-convex/workpool/@convex-dev/workpool@194

commit: 3369e6c

@ianmacartney ianmacartney force-pushed the ian/snapshot-query-prototype branch from 3d023b6 to 8fc93f7 Compare May 8, 2026 08:59
@ianmacartney ianmacartney changed the title simplified use snapshot query May 8, 2026
@ianmacartney ianmacartney marked this pull request as ready for review May 8, 2026 09:17

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/component/complete.ts (1)

199-207: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep pendingCompletion.segment strictly ahead of the scanned cursor.

Line 201 now writes completions at getCurrentSegment(). With the new cursor semantics (“where we've scanned to”), that can place a completion on an already-scanned segment boundary while main is still running, so the row can be skipped indefinitely if the completion query is cursor-exclusive. Please keep new completion rows strictly after the current scan position instead of reusing the current segment.

Suggested fix
 import {
-  getCurrentSegment,
+  getNextSegment,
   type OnCompleteArgs,
   type RunResult,
   vResult,
 } from "./shared.js";
@@
   if (pendingCompletions.length > 0) {
     await kickMainLoop(ctx, "complete");
-    const segment = getCurrentSegment();
+    const segment = getNextSegment();
     await Promise.all(
       pendingCompletions.map((completion) =>
         ctx.db.insert("pendingCompletion", {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/component/complete.ts` around lines 199 - 207, The pending completion
rows are being written with segment = getCurrentSegment(), which can equal the
scanner's current position and cause those rows to be skipped; change the
insertion to use a strictly-ahead segment (e.g., compute nextSegment =
getCurrentSegment() + 1 or call an existing getNextSegment()/advanceSegment
helper) and use that nextSegment when calling ctx.db.insert for
pendingCompletions; update uses in the pendingCompletions map and any related
tests to ensure the segment type/overflow semantics match the scanner (handle
numeric vs. bigint or boundary conditions consistently).
🧹 Nitpick comments (1)
src/component/lib.ts (1)

115-118: ⚡ Quick win

Use getNextSegment() for pending queue writes to ensure visibility on next scan cycle.

These inserts currently use getCurrentSegment(). When new pendingStart/pendingCancelation rows are written at the current segment, they may not be picked up until the next main loop scan cycle, causing processing latency. Using getNextSegment() instead guarantees these rows land ahead of the cursor position and will be included in the next scan, improving responsiveness.

Suggested fix
 import {
   boundScheduledTime,
   vConfig,
   fnType,
-  getCurrentSegment,
+  getNextSegment,
   max,
   vOnCompleteFnContext,
   retryBehavior,
   status as statusValidator,
   toSegment,
@@
   await ctx.db.insert("pendingStart", {
     workId,
-    segment: max(toSegment(runAt), getCurrentSegment()),
+    segment: max(toSegment(runAt), getNextSegment()),
   });
@@
       await ctx.db.insert("pendingCancelation", {
         workId: id,
-        segment: getCurrentSegment(),
+        segment: getNextSegment(),
       });
@@
-    const segment = getCurrentSegment();
+    const segment = getNextSegment();

Also applies to: 147–150, 177–186

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/component/lib.ts` around lines 115 - 118, The pending queue inserts
currently compute segment using getCurrentSegment(), which can place new
pendingStart/pendingCancelation rows at the cursor and delay pickup; update the
INSERTs that call ctx.db.insert("pendingStart", { workId, segment:
max(toSegment(runAt), getCurrentSegment()) }) and the similar pendingCancelation
inserts to use getNextSegment() instead of getCurrentSegment() (i.e., segment:
max(toSegment(runAt), getNextSegment())) so new rows land ahead of the scan
cursor and are visible on the next cycle; apply this change to all occurrences
(the blocks creating "pendingStart" and "pendingCancelation", including the
other mentioned locations).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/component/future.ts`:
- Around line 9-37: The function runSnapshotQuery currently depends on the
undocumented Convex.asyncSyscall with udfType: "snapshotQuery" (see
Convex.asyncSyscall, udfType: "snapshotQuery", getFunctionAddress, convexToJson,
jsonToConvex), which is unsupported; replace this internal syscall usage by
wiring into the documented runQuery API: change runSnapshotQuery to
accept/obtain a Convex execution context (the ctx used in mutations) and call
ctx.runQuery(query, ...args) (or call the public client runQuery helper) instead
of constructing syscallArgs and calling Convex.asyncSyscall, remove
getFunctionAddress/convexToJson/jsonToConvex usage, and return the ctx.runQuery
result cast to FunctionReturnType<Query> so behavior remains snapshot-consistent
using the supported API.

In `@src/component/loop.ts`:
- Around line 143-151: Rebuild the queryArgs object immediately before the
confirm read (i.e., after calling handleRecovery and right before the
confirm/getPendingWork call) so runningCount reflects the post-recovery state;
specifically, recreate queryArgs (using state.segmentCursors,
CURSOR_BUFFER_SEGMENTS, globals.maxParallelism and state.running.length) instead
of reusing the earlier captured one in the didWork === false path (also apply
the same change where queryArgs is used around the other occurrences referenced
near lines with similar blocks), ensuring getPendingWork computes correct
startLimit based on the updated runningCount.

---

Outside diff comments:
In `@src/component/complete.ts`:
- Around line 199-207: The pending completion rows are being written with
segment = getCurrentSegment(), which can equal the scanner's current position
and cause those rows to be skipped; change the insertion to use a strictly-ahead
segment (e.g., compute nextSegment = getCurrentSegment() + 1 or call an existing
getNextSegment()/advanceSegment helper) and use that nextSegment when calling
ctx.db.insert for pendingCompletions; update uses in the pendingCompletions map
and any related tests to ensure the segment type/overflow semantics match the
scanner (handle numeric vs. bigint or boundary conditions consistently).

---

Nitpick comments:
In `@src/component/lib.ts`:
- Around line 115-118: The pending queue inserts currently compute segment using
getCurrentSegment(), which can place new pendingStart/pendingCancelation rows at
the cursor and delay pickup; update the INSERTs that call
ctx.db.insert("pendingStart", { workId, segment: max(toSegment(runAt),
getCurrentSegment()) }) and the similar pendingCancelation inserts to use
getNextSegment() instead of getCurrentSegment() (i.e., segment:
max(toSegment(runAt), getNextSegment())) so new rows land ahead of the scan
cursor and are visible on the next cycle; apply this change to all occurrences
(the blocks creating "pendingStart" and "pendingCancelation", including the
other mentioned locations).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: get-convex/coderabbit/.coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: ea3f2867-4b07-4a46-b466-b5db083b631c

📥 Commits

Reviewing files that changed from the base of the PR and between 7da34f4 and 8fc93f7.

⛔ Files ignored due to path filters (2)
  • package-lock.json is excluded by !**/package-lock.json
  • src/component/_generated/api.ts is excluded by !**/_generated/**
📒 Files selected for processing (10)
  • package.json
  • src/component/complete.ts
  • src/component/future.ts
  • src/component/kick.test.ts
  • src/component/kick.ts
  • src/component/lib.ts
  • src/component/loop.test.ts
  • src/component/loop.ts
  • src/component/schema.ts
  • src/component/stateMachine.test.ts

Comment thread src/component/future.ts
Comment thread src/component/loop.ts
@ianmacartney ianmacartney requested a review from reeceyang May 8, 2026 16:59

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/component/loop.ts (1)

143-151: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Rebuild queryArgs after recovery before the confirm read.

queryArgs.runningCount is captured before handleRecovery, but recovery can shrink state.running. In the didWork === false path, the confirm query can still compute startLimit = 0 and miss pendingStart rows that became runnable only because recovery freed capacity.

Suggested fix
-    const queryArgs = {
+    const buildQueryArgs = () => ({
       completionCursor:
         state.segmentCursors.completion - CURSOR_BUFFER_SEGMENTS,
       cancelationCursor:
         state.segmentCursors.cancelation - CURSOR_BUFFER_SEGMENTS,
       incomingCursor: state.segmentCursors.incoming - CURSOR_BUFFER_SEGMENTS,
       maxParallelism: globals.maxParallelism,
       runningCount: state.running.length,
-    };
+    });
+    const queryArgs = buildQueryArgs();
...
-    const confirm = await ctx.runQuery(internal.loop.getPending, queryArgs);
+    const confirm = await ctx.runQuery(
+      internal.loop.getPending,
+      buildQueryArgs(),
+    );

Also applies to: 173-176, 236-247

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/component/loop.ts` around lines 143 - 151, Recompute the queryArgs after
running handleRecovery so the runningCount reflects any threads freed by
recovery: move or recreate the queryArgs object (the one with completionCursor,
cancelationCursor, incomingCursor, maxParallelism, runningCount) to after
handleRecovery is called (and before the confirm/read that uses
startLimit/pendingStart) so that runningCount uses the updated state.running;
ensure similar updates where queryArgs was built earlier (notably the other
occurrences around the didWork === false path and the regions referenced) so
confirm reads won't miss rows that became runnable due to recovery.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/component/loop.ts`:
- Around line 208-222: The cursor updates can be moved backwards by
buffered/late rows; fix by never decreasing any cursor when assigning from
completions.at(-1), cancelations.at(-1) or pending.at(-1). Replace direct
assignments in the loop that set state.segmentCursors.completion, .cancelation
and .incoming with logic that takes the max of the existing cursor and the
candidate segment (e.g., use Math.max(existingCursor, candidateSegment) for
completion and cancelation, and likewise for incoming when using pending.at(-1)
or segment), so cursors only advance or stay the same; keep the existing branch
behavior around actualCapacity unchanged.

---

Duplicate comments:
In `@src/component/loop.ts`:
- Around line 143-151: Recompute the queryArgs after running handleRecovery so
the runningCount reflects any threads freed by recovery: move or recreate the
queryArgs object (the one with completionCursor, cancelationCursor,
incomingCursor, maxParallelism, runningCount) to after handleRecovery is called
(and before the confirm/read that uses startLimit/pendingStart) so that
runningCount uses the updated state.running; ensure similar updates where
queryArgs was built earlier (notably the other occurrences around the didWork
=== false path and the regions referenced) so confirm reads won't miss rows that
became runnable due to recovery.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: get-convex/coderabbit/.coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 208b2d91-89b8-416e-9a31-aac7852d8ed2

📥 Commits

Reviewing files that changed from the base of the PR and between 8fc93f7 and 6427b91.

⛔ Files ignored due to path filters (1)
  • package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (4)
  • CHANGELOG.md
  • package.json
  • src/component/loop.test.ts
  • src/component/loop.ts
✅ Files skipped from review due to trivial changes (2)
  • CHANGELOG.md
  • package.json
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/component/loop.test.ts

Comment thread src/component/loop.ts
Comment on lines +208 to +222
// Advance cursors to skip tombstones on next scan. Only do this when
// we actually did work — the cursor doubles as the cooldown signal
// ("how long since we last processed something").
const didWork =
completions.length > 0 || cancelations.length > 0 || pending.length > 0;
if (didWork) {
state.segmentCursors.completion = completions.at(-1)?.segment ?? segment;
state.segmentCursors.cancelation =
cancelations.at(-1)?.segment ?? segment;
if (pending.length > 0) {
state.segmentCursors.incoming = pending.at(-1)!.segment;
} else if (actualCapacity > 0) {
// We have no more pending work, update to now
state.segmentCursors.incoming = segment;
}

@coderabbitai coderabbitai Bot May 8, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't let buffered rows move the cursors backwards.

These queries intentionally read behind the cursor. If a late commit returns an older segment here, writing completions.at(-1)?.segment / cancelations.at(-1)?.segment back into state.segmentCursors rewinds the cooldown marker. The next no-work pass can then go idle immediately after processing real work, which is exactly the window this buffer is supposed to protect.

Suggested fix
     if (didWork) {
-      state.segmentCursors.completion = completions.at(-1)?.segment ?? segment;
-      state.segmentCursors.cancelation =
-        cancelations.at(-1)?.segment ?? segment;
+      if (completions.length > 0) {
+        state.segmentCursors.completion = segment;
+      }
+      if (cancelations.length > 0) {
+        state.segmentCursors.cancelation = segment;
+      }
       if (pending.length > 0) {
-        state.segmentCursors.incoming = pending.at(-1)!.segment;
+        state.segmentCursors.incoming = segment;
       } else if (actualCapacity > 0) {
-        // We have no more pending work, update to now
+        // We have no more pending work, update to now.
         state.segmentCursors.incoming = segment;
       }
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/component/loop.ts` around lines 208 - 222, The cursor updates can be
moved backwards by buffered/late rows; fix by never decreasing any cursor when
assigning from completions.at(-1), cancelations.at(-1) or pending.at(-1).
Replace direct assignments in the loop that set state.segmentCursors.completion,
.cancelation and .incoming with logic that takes the max of the existing cursor
and the candidate segment (e.g., use Math.max(existingCursor, candidateSegment)
for completion and cancelation, and likewise for incoming when using
pending.at(-1) or segment), so cursors only advance or stay the same; keep the
existing branch behavior around actualCapacity unchanged.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai can you clarify what you mean by this? what is a "no-work pass"?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reeceyang Sure! Let me trace through the exact scenario.

What is a "no-work pass"?

The main loop runs repeatedly. Each iteration checks didWork:

const didWork =
  completions.length > 0 || cancelations.length > 0 || pending.length > 0;
  • Work pass: didWork === true → cursors are advanced, and the loop reschedules itself immediately.
  • No-work pass: didWork === false → the loop does an OCC-confirming re-read and then decides whether to go idle, stay in cooldown, or schedule for the future.

The cooldown mechanism

The cooldown lives inside the no-work pass:

const { incoming, completion, cancelation } = state.segmentCursors;
const latestCursor = fromSegment(max(incoming, max(completion, cancelation)));
if (Date.now() - latestCursor < STATUS_COOLDOWN) {
  // stay running a bit longer, check again
}

The idea is: "if we processed real work recently (within STATUS_COOLDOWN), don't go idle yet — more work might arrive." The freshness is measured by how large the cursor segment values are (a segment encodes a timestamp via toSegment/fromSegment).

How the cursor rewind breaks this

Here's the problematic scenario step by step:

  1. Pass N (work pass): The snapshot query reads pendingCompletion with a buffer (completionCursor - CURSOR_BUFFER_SEGMENTS), so it may return a late-committed row whose .segment is an old value (e.g., 30 seconds in the past). completions.at(-1)?.segment returns that old segment, and the code writes it into state.segmentCursors.completion.

  2. Pass N+1 (no-work pass): No new work is found. The cooldown check computes fromSegment(latestCursor) using the old segment that was just written. Because that segment corresponds to a timestamp ~30 seconds ago, Date.now() - latestCursor >= STATUS_COOLDOWN is already true — so the cooldown window is skipped and the loop goes idle, even though real work was just processed moments ago.

The fix (always setting the cursor to segment = getCurrentSegment() when work is processed) ensures the cooldown clock always reflects "right now," so the loop stays alive for a full STATUS_COOLDOWN window after the last batch of real work.


🧠 Learnings used
Learnt from: ianmacartney
Repo: get-convex/workpool PR: 170
File: src/component/danger.ts:36-43
Timestamp: 2026-02-19T08:37:48.834Z
Learning: In Convex (v1.31.0+), ctx.db.delete supports both two-argument form (ctx.db.delete(tableName, id)) and single-argument form (ctx.db.delete(id)). Both are valid; prefer the two-argument form for clarity when deleting by table and id, and use the single-argument form only when the intent is purely to delete by id without explicit table context. Ensure code consistently follows one form across a module or project where appropriate.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai it does seem like this could be an issue, but only in the rare case that incoming, completion, and cancelation are all rewound.

@ianmacartney I wonder if it would be simpler to keep track of the cooldown signal separately instead of using the cursors here?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reeceyang That's a good refinement — you're right that the bug requires the maximum of all three cursors to end up old, not just one.

Looking more carefully at the incoming cursor: if actualCapacity > 0 and no pending starts are found, it's set to segment (the current time), which would keep the max cursor fresh. So the narrowest scenario where all three rewind simultaneously is when:

  1. The system is fully saturated (actualCapacity <= 0), so incoming is not updated to the current segment, and
  2. Both the last completions and last cancelations entries come entirely from buffered (late-committed) rows with old segment values.

That combination — saturation plus late rows on both queues in the same pass — is indeed uncommon, so the practical risk is lower than my initial description implied.

On the separate cooldown signal idea: it would cleanly decouple two concerns that are currently entangled:

  • Cursor purpose: "where to start scanning next time" (skip tombstones, avoid re-processing)
  • Cooldown purpose: "did we process real work recently enough to stay alive?"

A dedicated lastWorkTs: number (wall-clock timestamp) stored in internalState would make the intent obvious and eliminate the dependency on segment-to-timestamp conversion for cooldown. The trade-off is a small extra write to internalState on each work pass. Given that internalState is already being patched every work pass anyway, the overhead is negligible. That said, it's a design call best made by @ianmacartney since it touches the invariants the state machine relies on.


🧠 Learnings used
Learnt from: ianmacartney
Repo: get-convex/workpool PR: 170
File: src/component/danger.ts:36-43
Timestamp: 2026-02-19T08:37:48.834Z
Learning: In Convex (v1.31.0+), ctx.db.delete supports both two-argument form (ctx.db.delete(tableName, id)) and single-argument form (ctx.db.delete(id)). Both are valid; prefer the two-argument form for clarity when deleting by table and id, and use the single-argument form only when the intent is purely to delete by id without explicit table context. Ensure code consistently follows one form across a module or project where appropriate.

Comment thread src/component/loop.ts
.withIndex("segment", (q) => q.gte("segment", cancelationCursor))
.take(CANCELLATION_BATCH_SIZE);
// Available slots after we process this batch's completions, plus 1
// for the +1 trick (detect overflow vs. a future-scheduled retry).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the +1 trick?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we fetch one extra document below to know whether there is more work to be done, and also know the next time we'd need to be scheduled for.

Comment thread src/component/loop.ts Outdated
},
});
// Nothing found in snapshot. Re-read with a real dependency (same args
// for cache-hit efficiency) so a concurrent insert forces an OCC retry.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are running the query through ctx.runQuery, it won't be cached, so cache-hit efficiency isn't applicable here. I also think coderabbit's comment above about using a stale running count when reusing queryArgs might be a real issue here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe didWork should also include whether handleRecovery did anything?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I keep thinking we'll add sub-query caching via ctx.runQuery but I think the index caching will be the solution there for the foreseeable future.
Agreed on the stale running count. I'm less sure about didWork for recovery - since didWork will cause main to re-execute immediately, whereas recovery is an async process, so the next iteration of main isn't expected to see anything different - only once it finishes

Comment thread src/component/loop.ts
Comment on lines +208 to +222
// Advance cursors to skip tombstones on next scan. Only do this when
// we actually did work — the cursor doubles as the cooldown signal
// ("how long since we last processed something").
const didWork =
completions.length > 0 || cancelations.length > 0 || pending.length > 0;
if (didWork) {
state.segmentCursors.completion = completions.at(-1)?.segment ?? segment;
state.segmentCursors.cancelation =
cancelations.at(-1)?.segment ?? segment;
if (pending.length > 0) {
state.segmentCursors.incoming = pending.at(-1)!.segment;
} else if (actualCapacity > 0) {
// We have no more pending work, update to now
state.segmentCursors.incoming = segment;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai can you clarify what you mean by this? what is a "no-work pass"?

Comment thread src/component/loop.test.ts Outdated
it("doesn't lose work when re-checking before going idle", async () => {
// Snapshot-then-confirm safety net: even if the snapshot shows no
// work, the runQuery confirmation should pick up data committed
// before this iteration started.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this test enqueues the work strictly before running the main loop, so I'm not sure if this is actually tested.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call - going to nix this test, I think it's redundant

Comment thread src/component/loop.test.ts Outdated
Comment thread src/component/loop.test.ts Outdated

describe("snapshot semantics", () => {
it("the snapshot read does not see the calling mutation's pending writes", async () => {
// Verifies the prototype's distinguishing feature:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is just testing that snapshotQuery works correctly in the convex-test environment? which might not be necessary to include

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@example/convex/test/scenarios/overhead.ts`:
- Around line 26-32: The insert call to ctx.db.insert("tasks", ...) uses an
unnecessary "as never" cast on workId; remove the cast so workId is simply
"overhead-test" (since the schema's vWorkId accepts strings), or if you
deliberately need to bypass types for this test replace with "as any"—update the
workId property in the ctx.db.insert(...) object accordingly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: get-convex/coderabbit/.coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 1bfe1ee6-986c-4606-8178-4f9defef7091

📥 Commits

Reviewing files that changed from the base of the PR and between 6427b91 and fa39df1.

⛔ Files ignored due to path filters (2)
  • example/convex/_generated/api.d.ts is excluded by !**/_generated/**
  • package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (7)
  • example/convex/convex.config.ts
  • example/convex/test/scenarios/overhead.ts
  • example/convex/test/scenarios/sustained.ts
  • example/convex/test/scenarios/throughput.ts
  • package.json
  • src/component/loop.test.ts
  • src/component/loop.ts
✅ Files skipped from review due to trivial changes (1)
  • package.json
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/component/loop.test.ts
  • src/component/loop.ts

Comment on lines +26 to +32
await ctx.db.insert("tasks", {
runId: args.runId,
workId: "overhead-test" as never,
type: "mutation",
endTime: Date.now(),
enqueuedAt: args.enqueuedAt,
});

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify the tasks table schema and workId field type expectations
ast-grep --pattern $'table("tasks", {
  $$$
})'

Repository: get-convex/workpool

Length of output: 45


🏁 Script executed:

#!/bin/bash
# Search for tasks table schema definition
rg -A 10 'table.*tasks' --type ts | head -50

Repository: get-convex/workpool

Length of output: 45


🏁 Script executed:

#!/bin/bash
# Find schema files in the convex directory
find . -type f -name '*schema*' -o -name '*convex*' | grep -E '\.(ts|js)$' | head -20

Repository: get-convex/workpool

Length of output: 183


🏁 Script executed:

#!/bin/bash
# Search for workId type definition or field usage
rg 'workId' --type ts -B 2 -A 2 | head -60

Repository: get-convex/workpool

Length of output: 3186


🏁 Script executed:

#!/bin/bash
# Read the example/convex schema file
cat -n example/convex/schema.ts

Repository: get-convex/workpool

Length of output: 953


🏁 Script executed:

#!/bin/bash
# Also check the main schema file
cat -n src/component/schema.ts | head -100

Repository: get-convex/workpool

Length of output: 3837


🏁 Script executed:

#!/bin/bash
# Search for vWorkId definition
rg 'vWorkId' --type ts -B 3 -A 3

Repository: get-convex/workpool

Length of output: 3301


🏁 Script executed:

#!/bin/bash
# Check what vWorkId is exported from the workpool package
grep -r 'export.*vWorkId' --include='*.ts' src/

Repository: get-convex/workpool

Length of output: 137


Remove the unnecessary as never cast on workId.

The schema expects workId: vWorkId, which is a string validator that accepts any string value. The string literal "overhead-test" is valid without any cast. The as never cast is unconventional and unnecessarily suppresses type checking. Remove the cast entirely, or use as any if there's a deliberate reason to bypass type safety for test data.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@example/convex/test/scenarios/overhead.ts` around lines 26 - 32, The insert
call to ctx.db.insert("tasks", ...) uses an unnecessary "as never" cast on
workId; remove the cast so workId is simply "overhead-test" (since the schema's
vWorkId accepts strings), or if you deliberately need to bypass types for this
test replace with "as any"—update the workId property in the ctx.db.insert(...)
object accordingly.

@ianmacartney ianmacartney left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also adding a stacked branch with a dashboard with a self-hosted scenario runner

@@ -1,10 +1,12 @@
import { defineApp } from "convex/server";
import workpool from "@convex-dev/workpool/convex.config";
import workpoolOld from "@convex-dev/workpool-old/convex.config";

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI I installed an old version of the workpool so we could do side-by-side comparisons

Comment thread package.json
"convex-helpers": "^0.1.94"
},
"devDependencies": {
"@convex-dev/workpool-old": "npm:@convex-dev/workpool@0.4.6",

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

installed as an npm alias here

Comment thread src/component/loop.ts
Comment on lines +208 to +222
// Advance cursors to skip tombstones on next scan. Only do this when
// we actually did work — the cursor doubles as the cooldown signal
// ("how long since we last processed something").
const didWork =
completions.length > 0 || cancelations.length > 0 || pending.length > 0;
if (didWork) {
state.segmentCursors.completion = completions.at(-1)?.segment ?? segment;
state.segmentCursors.cancelation =
cancelations.at(-1)?.segment ?? segment;
if (pending.length > 0) {
state.segmentCursors.incoming = pending.at(-1)!.segment;
} else if (actualCapacity > 0) {
// We have no more pending work, update to now
state.segmentCursors.incoming = segment;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai it does seem like this could be an issue, but only in the rare case that incoming, completion, and cancelation are all rewound.

@ianmacartney I wonder if it would be simpler to keep track of the cooldown signal separately instead of using the cursors here?

@ianmacartney ianmacartney merged commit 461be2b into main May 13, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants