Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## 0.4.7-alpha.0

- Reduces database conflict retries (OCC conflicts) from enqueuing or completing
work while tasks are being dispatched, improving throughput for workpools at
scale.
- Fixes a race condition where a task could get recovered twice if the scheduler
is many minutes behind.

## 0.4.6

- Fails gracefully if the work being started has already been deleted. It will
Expand Down
7 changes: 7 additions & 0 deletions example/convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import type * as test_scenarios_bigArgs from "../test/scenarios/bigArgs.js";
import type * as test_scenarios_bigContext from "../test/scenarios/bigContext.js";
import type * as test_scenarios_bigReturnTypes from "../test/scenarios/bigReturnTypes.js";
import type * as test_scenarios_burstyBatches from "../test/scenarios/burstyBatches.js";
import type * as test_scenarios_overhead from "../test/scenarios/overhead.js";
import type * as test_scenarios_sustained from "../test/scenarios/sustained.js";
import type * as test_scenarios_throughput from "../test/scenarios/throughput.js";
import type * as test_work from "../test/work.js";

import type {
Expand All @@ -31,6 +34,9 @@ declare const fullApi: ApiFromModules<{
"test/scenarios/bigContext": typeof test_scenarios_bigContext;
"test/scenarios/bigReturnTypes": typeof test_scenarios_bigReturnTypes;
"test/scenarios/burstyBatches": typeof test_scenarios_burstyBatches;
"test/scenarios/overhead": typeof test_scenarios_overhead;
"test/scenarios/sustained": typeof test_scenarios_sustained;
"test/scenarios/throughput": typeof test_scenarios_throughput;
"test/work": typeof test_work;
}>;

Expand Down Expand Up @@ -65,4 +71,5 @@ export declare const components: {
bigPool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"bigPool">;
serializedPool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"serializedPool">;
testWorkpool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"testWorkpool">;
oldWorkpool: import("@convex-dev/workpool-old/_generated/component.js").ComponentApi<"oldWorkpool">;
};
2 changes: 2 additions & 0 deletions example/convex/convex.config.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { defineApp } from "convex/server";
import workpool from "@convex-dev/workpool/convex.config";
import workpoolOld from "@convex-dev/workpool-old/convex.config";

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI I installed an old version of the workpool so we could do side-by-side comparisons


// Build the example app definition and register the workpool components.
const app = defineApp();
// Four registrations of the current workpool package, each passed a distinct name.
app.use(workpool, { name: "smallPool" });
app.use(workpool, { name: "bigPool" });
app.use(workpool, { name: "serializedPool" });
app.use(workpool, { name: "testWorkpool" });
// Registration of the "@convex-dev/workpool-old" package under its own name,
// so the old and new implementations can be compared side by side.
app.use(workpoolOld, { name: "oldWorkpool" });

export default app;
212 changes: 212 additions & 0 deletions example/convex/test/scenarios/overhead.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import { internalAction, internalMutation } from "../../_generated/server";
import { v } from "convex/values";
import { internal, components } from "../../_generated/api";
import { Workpool } from "@convex-dev/workpool";
import { Workpool as OldWorkpool } from "@convex-dev/workpool-old";
import { Id } from "../../_generated/dataModel";

/**
* Throughput / overhead measurement scenario.
*
* `mode` selects how each task is enqueued:
* raw — ctx.scheduler.runAfter(0, recorder). Bare-Convex floor.
* workpool-bare — new workpool, no onComplete (worker is the recorder)
* workpool-oc — new workpool with onComplete (worker is no-op)
* oldpool-bare — old workpool (workpool-old), no onComplete
* oldpool-oc — old workpool with onComplete
*
* Both pool variants test against the same Convex deployment, against the
* same tasks table, with the same recorder. The only difference between
* `workpool-*` and `oldpool-*` is which workpool component is used.
*/

export const recorder = internalMutation({
args: { runId: v.id("runs"), enqueuedAt: v.number() },
handler: async (ctx, args) => {
await ctx.db.insert("tasks", {
runId: args.runId,
workId: "overhead-test" as never,
type: "mutation",
endTime: Date.now(),
enqueuedAt: args.enqueuedAt,
});
Comment on lines +26 to +32

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify the tasks table schema and workId field type expectations
ast-grep --pattern $'table("tasks", {
  $$$
})'

Repository: get-convex/workpool

Length of output: 45


🏁 Script executed:

#!/bin/bash
# Search for tasks table schema definition
rg -A 10 'table.*tasks' --type ts | head -50

Repository: get-convex/workpool

Length of output: 45


🏁 Script executed:

#!/bin/bash
# Find schema files in the convex directory
find . -type f -name '*schema*' -o -name '*convex*' | grep -E '\.(ts|js)$' | head -20

Repository: get-convex/workpool

Length of output: 183


🏁 Script executed:

#!/bin/bash
# Search for workId type definition or field usage
rg 'workId' --type ts -B 2 -A 2 | head -60

Repository: get-convex/workpool

Length of output: 3186


🏁 Script executed:

#!/bin/bash
# Read the example/convex schema file
cat -n example/convex/schema.ts

Repository: get-convex/workpool

Length of output: 953


🏁 Script executed:

#!/bin/bash
# Also check the main schema file
cat -n src/component/schema.ts | head -100

Repository: get-convex/workpool

Length of output: 3837


🏁 Script executed:

#!/bin/bash
# Search for vWorkId definition
rg 'vWorkId' --type ts -B 3 -A 3

Repository: get-convex/workpool

Length of output: 3301


🏁 Script executed:

#!/bin/bash
# Check what vWorkId is exported from the workpool package
grep -r 'export.*vWorkId' --include='*.ts' src/

Repository: get-convex/workpool

Length of output: 137


Remove the unnecessary as never cast on workId.

The schema expects workId: vWorkId, which is a string validator that accepts any string value. The string literal "overhead-test" is valid without any cast. The as never cast is unconventional and unnecessarily suppresses type checking. Remove the cast entirely, or use as any if there's a deliberate reason to bypass type safety for test data.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@example/convex/test/scenarios/overhead.ts` around lines 26 - 32, The insert
call to ctx.db.insert("tasks", ...) uses an unnecessary "as never" cast on
workId; remove the cast so workId is simply "overhead-test" (since the schema's
vWorkId accepts strings), or if you deliberately need to bypass types for this
test replace with "as any"—update the workId property in the ctx.db.insert(...)
object accordingly.

},
});

/**
 * Mutation that accepts no arguments and performs no work.
 *
 * The overhead scenario in this file enqueues it as the worker when the
 * measurement is recorded by an onComplete handler instead.
 */
export const noop = internalMutation({
  args: {},
  handler: async () => {
    // Intentionally empty.
  },
});

/**
 * onComplete handler that records a finished task.
 *
 * Inserts one row into the "tasks" table using the `runId` and `enqueuedAt`
 * carried in `context`, the `workId` passed in, and the current time as
 * `endTime`. The `result` argument is accepted but not read.
 */
export const oncompleteRecorder = internalMutation({
  args: {
    workId: v.string(),
    result: v.any(),
    context: v.object({
      runId: v.id("runs"),
      enqueuedAt: v.number(),
    }),
  },
  handler: async (ctx, { workId, context }) => {
    const { runId, enqueuedAt } = context;
    await ctx.db.insert("tasks", {
      runId,
      // Cast kept as in the original: workId arrives here as a plain string.
      workId: workId as never,
      type: "mutation",
      endTime: Date.now(),
      enqueuedAt,
    });
  },
});

// Validator for the enqueue strategy accepted by the overhead scenario's
// `mode` argument: one literal per supported mode.
const Mode = v.union(
v.literal("raw"),
v.literal("workpool-bare"),
v.literal("workpool-oc"),
v.literal("oldpool-bare"),
v.literal("oldpool-oc"),
);

/**
 * Overhead scenario entry point.
 *
 * Starts a run via `internal.test.run.start`, enqueues `taskCount` tasks in
 * batches of `batchSize` using the strategy selected by `mode`, then polls
 * `internal.test.run.metrics` every 100 ms until it reports
 * `status === "completed"` or `pollTimeoutMs` has elapsed.
 *
 * Arguments (all optional):
 * - `taskCount` (default 1000): total tasks to enqueue; must be a
 *   non-negative integer.
 * - `batchSize` (default 50): tasks enqueued concurrently per batch; must be
 *   a positive integer.
 * - `interBatchMs` (default 0): delay between batches, skipped before the
 *   first batch.
 * - `mode` (default "raw"): which enqueue path to use.
 * - `maxParallelism` (default 50): passed to the selected pool's config and
 *   constructor; unused in "raw" mode.
 * - `pollTimeoutMs` (default 600_000): how long to wait for completion.
 *
 * Returns `{ metrics, enqueueTotal, timedOut: true }` on timeout, otherwise a
 * summary with the completed count, total duration, enqueue time, tasks/sec,
 * and wall-clock ms per task.
 *
 * Throws an `Error` if `batchSize` or `taskCount` is out of range.
 */
export default internalAction({
  args: {
    taskCount: v.optional(v.number()),
    batchSize: v.optional(v.number()),
    interBatchMs: v.optional(v.number()),
    mode: v.optional(Mode),
    maxParallelism: v.optional(v.number()),
    pollTimeoutMs: v.optional(v.number()),
  },
  handler: async (
    ctx,
    {
      taskCount = 1000,
      batchSize = 50,
      interBatchMs = 0,
      mode = "raw",
      maxParallelism = 50,
      pollTimeoutMs = 600_000,
    },
  ) => {
    // Validate before creating the run record. batchSize === 0 would make
    // numBatches Infinity (an endless loop of empty batches), and fractional
    // or negative sizes would either crash in Array(n) or enqueue nothing.
    if (!Number.isInteger(batchSize) || batchSize <= 0) {
      throw new Error(
        `batchSize must be a positive integer, got ${batchSize}`,
      );
    }
    if (!Number.isInteger(taskCount) || taskCount < 0) {
      throw new Error(
        `taskCount must be a non-negative integer, got ${taskCount}`,
      );
    }

    const runId: Id<"runs"> = await ctx.runMutation(internal.test.run.start, {
      scenario: `overhead-${mode}`,
      parameters: { taskCount, batchSize, mode, maxParallelism, interBatchMs },
    });
    const scenarioStart = Date.now();

    const isWorkpoolNew = mode === "workpool-bare" || mode === "workpool-oc";
    const isWorkpoolOld = mode === "oldpool-bare" || mode === "oldpool-oc";
    const useOnComplete = mode === "workpool-oc" || mode === "oldpool-oc";

    // Configure the right pool (separate components → no cross-contamination)
    if (isWorkpoolNew) {
      await ctx.runMutation(components.testWorkpool.config.update, {
        maxParallelism,
      });
    }
    if (isWorkpoolOld) {
      await ctx.runMutation(components.oldWorkpool.config.update, {
        maxParallelism,
      });
    }

    // At most one of these is non-null; both are null in "raw" mode.
    const newPool = isWorkpoolNew
      ? new Workpool(components.testWorkpool, { maxParallelism })
      : null;
    const oldPool = isWorkpoolOld
      ? new OldWorkpool(components.oldWorkpool, { maxParallelism })
      : null;

    console.log(
      `overhead[${mode}]: ${taskCount} tasks, batchSize=${batchSize}` +
        (newPool || oldPool ? `, max=${maxParallelism}` : ""),
    );

    const numBatches = Math.ceil(taskCount / batchSize);
    let enqueued = 0;
    for (let batch = 0; batch < numBatches; batch++) {
      if (batch > 0 && interBatchMs > 0) {
        await new Promise((r) => setTimeout(r, interBatchMs));
      }
      // The last batch may be smaller than batchSize.
      const thisBatch = Math.min(batchSize, taskCount - enqueued);
      // One timestamp is shared by every task in the batch.
      const enqueuedAt = Date.now();
      const tasks = Array(thisBatch).fill(0);
      if (mode === "raw") {
        // Schedule the recorder directly, bypassing any workpool.
        await Promise.all(
          tasks.map(() =>
            ctx.scheduler.runAfter(
              0,
              internal.test.scenarios.overhead.recorder,
              { runId, enqueuedAt },
            ),
          ),
        );
      } else if (!useOnComplete) {
        // The worker itself is the recorder.
        const pool = newPool ?? oldPool!;
        await Promise.all(
          tasks.map(() =>
            pool.enqueueMutation(
              ctx,
              internal.test.scenarios.overhead.recorder,
              { runId, enqueuedAt },
            ),
          ),
        );
      } else {
        // The worker is a no-op; the onComplete handler does the recording.
        const pool = newPool ?? oldPool!;
        await Promise.all(
          tasks.map(() =>
            pool.enqueueMutation(
              ctx,
              internal.test.scenarios.overhead.noop,
              {},
              {
                onComplete: internal.test.scenarios.overhead.oncompleteRecorder,
                context: { runId, enqueuedAt },
              },
            ),
          ),
        );
      }
      enqueued += thisBatch;
    }
    const enqueueTotal = Date.now() - scenarioStart;
    console.log(
      `Enqueued ${taskCount} in ${enqueueTotal}ms ` +
        `(${(taskCount / (enqueueTotal / 1000)).toFixed(0)}/s).`,
    );

    // Poll the metrics query until the run completes or the timeout passes.
    const pollStart = Date.now();
    let metrics: Record<string, unknown> | null = null;
    while (Date.now() - pollStart < pollTimeoutMs) {
      metrics = (await ctx.runQuery(internal.test.run.metrics)) as Record<
        string,
        unknown
      > | null;
      if (metrics && metrics.status === "completed") break;
      await new Promise((r) => setTimeout(r, 100));
    }

    if (!metrics || metrics.status !== "completed") {
      console.log(`Timed out after ${pollTimeoutMs}ms.`);
      return { metrics, enqueueTotal, timedOut: true };
    }

    const total = metrics.totalDurationMs as number;
    const completedCount = metrics.completedCount as number;
    const tps = (completedCount / total) * 1000;
    const msPerTask = total / completedCount;

    console.log(`\n=== overhead[${mode}] ===`);
    console.log(
      `${completedCount}/${taskCount} done in ${total}ms ` +
        `(${tps.toFixed(0)} tps, ${msPerTask.toFixed(1)} ms/task)`,
    );
    return {
      mode,
      taskCount: completedCount,
      totalDurationMs: total,
      enqueueTotal,
      tasksPerSec: Math.round(tps),
      // Rounded to one decimal place.
      msPerTaskWallClock: Math.round(msPerTask * 10) / 10,
    };
  },
});
Loading
Loading