From a1dd3678342011e62b454e968094d75188112e85 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Mon, 11 May 2026 21:47:19 -0700 Subject: [PATCH 01/11] add dashboard to run scenarios --- example/convex/_generated/api.d.ts | 4 + example/convex/schema.ts | 1 + example/convex/test/dashboard.ts | 215 +++++ example/convex/test/pool.ts | 62 ++ example/convex/test/run.ts | 14 +- example/convex/test/scenarios/bigArgs.ts | 5 + example/convex/test/scenarios/bigContext.ts | 5 + .../convex/test/scenarios/bigReturnTypes.ts | 5 + .../convex/test/scenarios/burstyBatches.ts | 5 + example/convex/test/scenarios/overhead.ts | 47 +- example/convex/test/scenarios/sustained.ts | 35 +- example/convex/test/scenarios/throughput.ts | 5 + example/convex/test/work.ts | 33 +- example/src/App.css | 231 +++++- example/src/App.tsx | 754 +++++++++++++++++- package-lock.json | 440 ++++++++++ package.json | 1 + 17 files changed, 1751 insertions(+), 111 deletions(-) create mode 100644 example/convex/test/dashboard.ts create mode 100644 example/convex/test/pool.ts diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index a64483be..9ea3af62 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -10,7 +10,9 @@ import type * as crons from "../crons.js"; import type * as example from "../example.js"; +import type * as test_dashboard from "../test/dashboard.js"; import type * as test_nonRetryable from "../test/nonRetryable.js"; +import type * as test_pool from "../test/pool.js"; import type * as test_run from "../test/run.js"; import type * as test_scenarios_bigArgs from "../test/scenarios/bigArgs.js"; import type * as test_scenarios_bigContext from "../test/scenarios/bigContext.js"; @@ -30,7 +32,9 @@ import type { declare const fullApi: ApiFromModules<{ crons: typeof crons; example: typeof example; + "test/dashboard": typeof test_dashboard; "test/nonRetryable": typeof test_nonRetryable; + "test/pool": typeof test_pool; "test/run": typeof test_run; "test/scenarios/bigArgs": typeof test_scenarios_bigArgs; "test/scenarios/bigContext": typeof test_scenarios_bigContext; diff --git a/example/convex/schema.ts b/example/convex/schema.ts index 0f692e29..689f9db1 100644 --- a/example/convex/schema.ts +++ b/example/convex/schema.ts @@ -13,6 +13,7 @@ export default defineSchema({ parameters: v.any(), taskCount: v.optional(v.number()), endTime: v.optional(v.number()), + pool: v.optional(v.union(v.literal("new"), v.literal("old"))), }), tasks: defineTable({ runId: v.id("runs"), diff --git a/example/convex/test/dashboard.ts b/example/convex/test/dashboard.ts new file mode 100644 index 00000000..b81dfd4a --- /dev/null +++ b/example/convex/test/dashboard.ts @@ -0,0 +1,215 @@ +import { v } from "convex/values"; +import { query, action } from "../_generated/server"; +import { internal } from "../_generated/api"; +import { runStatus } from "./run"; + +function percentile(sorted: number[], p: number): number { + const idx = Math.ceil((p / 100) * sorted.length) - 1; + return sorted[Math.max(0, idx)]; +} + +// Just the list of run docs — no per-row tasks aggregation. Each row +// subscribes to `getRun` separately so we don't .collect() the entire +// tasks table on every history poll. +export const listRuns = query({ + args: { limit: v.optional(v.number()) }, + handler: async (ctx, { limit = 50 }) => { + const runs = await ctx.db.query("runs").order("desc").take(limit); + return runs.map((run) => ({ + _id: run._id, + scenario: run.scenario, + pool: run.pool, + startTime: run.startTime, + taskCount: run.taskCount, + })); + }, +}); + +export const getRun = query({ + args: { runId: v.id("runs") }, + handler: async (ctx, { runId }) => { + const run = await ctx.db.get(runId); + if (!run) return null; + const tasks = await ctx.db + .query("tasks") + .withIndex("runId", (q) => q.eq("runId", run._id)) + .collect(); + const status = await runStatus(ctx, run); + const latencies = tasks + .filter((t) => t.enqueuedAt !== undefined) + .map((t) => t.endTime - t.enqueuedAt!) + .sort((a, b) => a - b); + const endTimes = tasks.map((t) => t.endTime); + const lastEnd = endTimes.length ? Math.max(...endTimes) : undefined; + return { + _id: run._id, + scenario: run.scenario, + parameters: run.parameters, + pool: run.pool, + startTime: run.startTime, + taskCount: run.taskCount, + completedCount: tasks.length, + status, + totalDurationMs: + lastEnd !== undefined ? lastEnd - run.startTime : undefined, + latency: + latencies.length > 0 + ? { + p50: percentile(latencies, 50), + p95: percentile(latencies, 95), + p99: percentile(latencies, 99), + max: latencies[latencies.length - 1], + } + : undefined, + }; + }, +}); + +// Time-bucketed throughput. Returns one point per `bucketMs` window from +// the run's start: completed (count finishing in that window) and inFlight +// (enqueued - completed at that t). +export const throughputOverTime = query({ + args: { runId: v.id("runs"), bucketMs: v.optional(v.number()) }, + handler: async (ctx, { runId, bucketMs = 500 }) => { + const run = await ctx.db.get(runId); + if (!run) return null; + const tasks = await ctx.db + .query("tasks") + .withIndex("runId", (q) => q.eq("runId", runId)) + .collect(); + if (tasks.length === 0) return { bucketMs, points: [] }; + + const start = run.startTime; + const lastEnd = Math.max(...tasks.map((t) => t.endTime)); + const totalDurationMs = lastEnd - start; + const numBuckets = Math.max(1, Math.ceil(totalDurationMs / bucketMs) + 1); + + const completedPerBucket = new Array(numBuckets).fill(0); + const enqueuedPerBucket = new Array(numBuckets).fill(0); + for (const t of tasks) { + const cIdx = Math.min( + numBuckets - 1, + Math.floor((t.endTime - start) / bucketMs), + ); + completedPerBucket[cIdx]++; + if (t.enqueuedAt !== undefined) { + const eIdx = Math.max( + 0, + Math.min( + numBuckets - 1, + Math.floor((t.enqueuedAt - start) / bucketMs), + ), + ); + enqueuedPerBucket[eIdx]++; + } + } + const points: Array<{ + tMs: number; + completed: number; + enqueued: number; + inFlight: number; + }> = []; + let cumEnqueued = 0; + let cumCompleted = 0; + for (let i = 0; i < numBuckets; i++) { + cumEnqueued += enqueuedPerBucket[i]; + cumCompleted += completedPerBucket[i]; + points.push({ + tMs: i * bucketMs, + completed: completedPerBucket[i], + enqueued: enqueuedPerBucket[i], + inFlight: Math.max(0, cumEnqueued - cumCompleted), + }); + } + return { bucketMs, points }; + }, +}); + +// Sorted latency array, thinned to ~200 points for CDF plotting. +export const latencyCdf = query({ + args: { runId: v.id("runs"), points: v.optional(v.number()) }, + handler: async (ctx, { runId, points = 200 }) => { + const tasks = await ctx.db + .query("tasks") + .withIndex("runId", (q) => q.eq("runId", runId)) + .collect(); + const latencies = tasks + .filter((t) => t.enqueuedAt !== undefined) + .map((t) => t.endTime - t.enqueuedAt!) + .sort((a, b) => a - b); + if (latencies.length === 0) return []; + const stride = Math.max(1, Math.floor(latencies.length / points)); + const out: Array<{ pct: number; ms: number }> = []; + for (let i = 0; i < latencies.length; i += stride) { + out.push({ + pct: ((i + 1) / latencies.length) * 100, + ms: latencies[i], + }); + } + // ensure the max latency point is present + out.push({ pct: 100, ms: latencies[latencies.length - 1] }); + return out; + }, +}); + +// Live status of the currently-running scenario, if any. +export const latestRunStatus = query({ + args: {}, + handler: async (ctx) => { + const run = await ctx.db.query("runs").order("desc").first(); + if (!run) return null; + const status = await runStatus(ctx, run); + return { runId: run._id, scenario: run.scenario, status }; + }, +}); + +const scenarioName = v.union( + v.literal("burstyBatches"), + v.literal("throughput"), + v.literal("overhead"), + v.literal("sustained"), + v.literal("bigArgs"), + v.literal("bigContext"), + v.literal("bigReturnTypes"), +); + +/** + * Public action so the dashboard can trigger one or more scenario runs by + * name. Multi-launch is sequenced server-side: each non-final entry is + * awaited via `ctx.runAction` (so we know its tasks are done and the + * scenario's poll loop has returned), then we sleep past the 5s + * "previous run started too recently" guard in `run.start` before the + * next launch. The final entry is also awaited so that any `run.start` + * failure surfaces back to the dashboard instead of disappearing into a + * scheduled action's logs. + */ +const GUARD_BUFFER_MS = 5_500; +/** + * Concurrent benchmark: fires the same scenario at both pools simultaneously + * and waits for both runs to fully complete. Useful for testing whether + * scheduler thrash from a competing pool makes individual pool throughput + * worse than running it alone. + */ +export const runConcurrent = action({ + args: { scenario: scenarioName, args: v.any() }, + handler: async (ctx, { scenario, args }) => { + const fn = internal.test.scenarios[scenario].default; + const [, ] = await Promise.all([ + ctx.runAction(fn, { ...args, pool: "old" }), + ctx.runAction(fn, { ...args, pool: "new" }), + ]); + }, +}); + +export const runScenarios = action({ + args: { scenario: scenarioName, argsList: v.array(v.any()) }, + handler: async (ctx, { scenario, argsList }) => { + const fn = internal.test.scenarios[scenario].default; + for (let i = 0; i < argsList.length; i++) { + if (i > 0) { + await new Promise((r) => setTimeout(r, GUARD_BUFFER_MS)); + } + await ctx.runAction(fn, argsList[i]); + } + }, +}); diff --git a/example/convex/test/pool.ts b/example/convex/test/pool.ts new file mode 100644 index 00000000..b527e7dc --- /dev/null +++ b/example/convex/test/pool.ts @@ -0,0 +1,62 @@ +import { v } from "convex/values"; +import { components } from "../_generated/api"; +import { Workpool, enqueue, enqueueBatch } from "@convex-dev/workpool"; +import type { ComponentApi } from "@convex-dev/workpool/_generated/component.js"; +import { + Workpool as OldWorkpool, + enqueue as enqueueOld, + enqueueBatch as enqueueBatchOld, +} from "@convex-dev/workpool-old"; +import { ActionCtx, MutationCtx } from "../_generated/server"; + +export const POOL_KINDS = ["new", "old"] as const; +export type PoolKind = (typeof POOL_KINDS)[number]; +export const vPoolKind = v.union(v.literal("new"), v.literal("old")); + +type Ctx = ActionCtx | MutationCtx; + +export async function configurePool( + ctx: Ctx, + kind: PoolKind, + maxParallelism: number, +): Promise { + const component = + kind === "new" ? components.testWorkpool : components.oldWorkpool; + await ctx.runMutation(component.config.update, { maxParallelism }); +} + +/** + * Returns a Workpool instance for the chosen pool. Both classes share the + * same `enqueueMutation` / `enqueueAction` / `enqueueBatch` shape, so the + * union return type works for callers that just need to enqueue. + */ +export function makePool( + kind: PoolKind, + opts: { maxParallelism: number }, +): Workpool | OldWorkpool { + if (kind === "new") return new Workpool(components.testWorkpool, opts); + return new OldWorkpool(components.oldWorkpool, opts); +} + +export function getComponent(kind: PoolKind) { + return kind === "new" ? components.testWorkpool : components.oldWorkpool; +} + +export function enqueueFor(kind: PoolKind): { + component: ComponentApi; + enqueueOne: typeof enqueue; + enqueueMany: typeof enqueueBatch; +} { + if (kind === "new") { + return { + component: components.testWorkpool, + enqueueOne: enqueue, + enqueueMany: enqueueBatch, + }; + } + return { + component: components.oldWorkpool, + enqueueOne: enqueueOld, + enqueueMany: enqueueBatchOld, + }; +} diff --git a/example/convex/test/run.ts b/example/convex/test/run.ts index 8d785781..ccb3564a 100644 --- a/example/convex/test/run.ts +++ b/example/convex/test/run.ts @@ -7,6 +7,7 @@ import { v } from "convex/values"; import { Id } from "../_generated/dataModel"; import { assert } from "convex-helpers"; import { components } from "../_generated/api"; +import { getComponent } from "./pool"; export async function runStatus( ctx: QueryCtx, @@ -31,28 +32,32 @@ export const start = internalMutation({ args: { scenario: v.string(), parameters: v.any(), + pool: v.optional(v.union(v.literal("new"), v.literal("old"))), }, handler: async (ctx, args) => { // Check for in-flight tasks from the latest run const latestRun = await ctx.db.query("runs").order("desc").first(); if (latestRun) { - // Check if there are any in-flight tasks + // Check if there are any in-flight tasks. Allow concurrent runs when + // pools differ — runs are scoped to a single pool, so a new "new" + // pool run won't trample an in-flight "old" pool run (or vice versa). const status = await runStatus(ctx, latestRun); + const samePool = (latestRun.pool ?? "new") === (args.pool ?? "new"); - if (["running", "pending"].includes(status)) { + if (samePool && ["running", "pending"].includes(status)) { throw new Error( `Cannot start new run: previous run ${latestRun.scenario} is ${status} (started at ${new Date(latestRun.startTime).toISOString()})`, ); } - if (latestRun.startTime + 5_000 > Date.now()) { + if (samePool && latestRun.startTime + 5_000 > Date.now()) { throw new Error( `Cannot start new run: previous run ${latestRun.scenario} was started less than 5 seconds ago (started at ${new Date(latestRun.startTime).toISOString()})`, ); } } if (args.parameters.maxParallelism !== undefined) { - await ctx.runMutation(components.testWorkpool.config.update, { + await ctx.runMutation(getComponent(args.pool ?? "new").config.update, { maxParallelism: args.parameters.maxParallelism, }); } @@ -63,6 +68,7 @@ export const start = internalMutation({ scenario: args.scenario, parameters: args.parameters, taskCount: args.parameters.taskCount, + pool: args.pool ?? "new", }); return runId; diff --git a/example/convex/test/scenarios/bigArgs.ts b/example/convex/test/scenarios/bigArgs.ts index b038e7ce..81c98403 100644 --- a/example/convex/test/scenarios/bigArgs.ts +++ b/example/convex/test/scenarios/bigArgs.ts @@ -4,6 +4,7 @@ import { internal } from "../../_generated/api"; import { generateData, enqueueTasks, TaskType } from "../work"; import { Id } from "../../_generated/dataModel"; import { WorkId } from "@convex-dev/workpool"; +import { vPoolKind } from "../pool"; const parameters = { taskCount: v.optional(v.number()), @@ -11,6 +12,7 @@ const parameters = { taskType: v.optional(v.union(v.literal("mutation"), v.literal("action"))), batchEnqueue: v.optional(v.boolean()), maxParallelism: v.optional(v.number()), + pool: v.optional(vPoolKind), }; export default internalAction({ @@ -23,6 +25,7 @@ export default internalAction({ taskType = "mutation", batchEnqueue = false, maxParallelism = 50, + pool = "new", }, ): Promise<{ workIds: WorkId[]; @@ -38,6 +41,7 @@ export default internalAction({ batchEnqueue, maxParallelism, }, + pool, }); console.log( @@ -68,6 +72,7 @@ export default internalAction({ // Use shared enqueueTasks helper const workIds = await enqueueTasks({ ctx, + pool, taskArgs, taskType, fn, diff --git a/example/convex/test/scenarios/bigContext.ts b/example/convex/test/scenarios/bigContext.ts index 8a332875..e209e636 100644 --- a/example/convex/test/scenarios/bigContext.ts +++ b/example/convex/test/scenarios/bigContext.ts @@ -4,6 +4,7 @@ import { internal } from "../../_generated/api"; import { generateData, enqueueTasks, TaskType } from "../work"; import { Id } from "../../_generated/dataModel"; import { WorkId } from "@convex-dev/workpool"; +import { vPoolKind } from "../pool"; /** * Big Context Scenario @@ -20,6 +21,7 @@ const parameters = { taskType: v.optional(v.union(v.literal("mutation"), v.literal("action"))), batchEnqueue: v.optional(v.boolean()), maxParallelism: v.optional(v.number()), + pool: v.optional(vPoolKind), }; export default internalAction({ @@ -32,6 +34,7 @@ export default internalAction({ taskType = "mutation", batchEnqueue = false, maxParallelism = 50, + pool = "new", }, ): Promise<{ workIds: WorkId[]; @@ -47,6 +50,7 @@ export default internalAction({ batchEnqueue, maxParallelism, }, + pool, }); console.log( @@ -84,6 +88,7 @@ export default internalAction({ // Use shared enqueueTasks helper const workIds = await enqueueTasks({ ctx, + pool, taskArgs, taskType, fn, diff --git a/example/convex/test/scenarios/bigReturnTypes.ts b/example/convex/test/scenarios/bigReturnTypes.ts index e1c9bc5d..623f8232 100644 --- a/example/convex/test/scenarios/bigReturnTypes.ts +++ b/example/convex/test/scenarios/bigReturnTypes.ts @@ -4,6 +4,7 @@ import { internal } from "../../_generated/api"; import { enqueueTasks, TaskType } from "../work"; import { Id } from "../../_generated/dataModel"; import { WorkId } from "@convex-dev/workpool"; +import { vPoolKind } from "../pool"; /** * Big Return Types Scenario @@ -19,6 +20,7 @@ const parameters = { taskType: v.optional(v.union(v.literal("mutation"), v.literal("action"))), batchEnqueue: v.optional(v.boolean()), maxParallelism: v.optional(v.number()), + pool: v.optional(vPoolKind), }; export default internalAction({ @@ -31,6 +33,7 @@ export default internalAction({ taskType = "mutation", batchEnqueue = false, maxParallelism = 50, + pool = "new", }, ): Promise<{ workIds: WorkId[]; @@ -46,6 +49,7 @@ export default internalAction({ batchEnqueue, maxParallelism, }, + pool, }); console.log( @@ -75,6 +79,7 @@ export default internalAction({ // Use shared enqueueTasks helper const workIds = await enqueueTasks({ ctx, + pool, taskArgs, taskType, fn, diff --git a/example/convex/test/scenarios/burstyBatches.ts b/example/convex/test/scenarios/burstyBatches.ts index 22e26ba3..f5793b13 100644 --- a/example/convex/test/scenarios/burstyBatches.ts +++ b/example/convex/test/scenarios/burstyBatches.ts @@ -3,6 +3,7 @@ import { v } from "convex/values"; import { internal } from "../../_generated/api"; import { enqueueTasks, TaskType } from "../work"; import { Id } from "../../_generated/dataModel"; +import { vPoolKind } from "../pool"; const parameters = { waveCount: v.optional(v.number()), @@ -11,6 +12,7 @@ const parameters = { taskType: v.optional(v.union(v.literal("mutation"), v.literal("action"))), maxParallelism: v.optional(v.number()), taskDurationMs: v.optional(v.number()), + pool: v.optional(vPoolKind), }; /** @@ -49,6 +51,7 @@ export default internalAction({ taskType = "mutation", maxParallelism = 50, taskDurationMs = 0, + pool = "new", }, ) => { const taskCount = waveCount * tasksPerWave; @@ -63,6 +66,7 @@ export default internalAction({ maxParallelism, taskDurationMs, }, + pool, }); const scenarioStart = Date.now(); @@ -111,6 +115,7 @@ export default internalAction({ const waveStart = Date.now(); await enqueueTasks({ ctx, + pool, taskArgs, taskType, fn, diff --git a/example/convex/test/scenarios/overhead.ts b/example/convex/test/scenarios/overhead.ts index 8c17306d..c5f54574 100644 --- a/example/convex/test/scenarios/overhead.ts +++ b/example/convex/test/scenarios/overhead.ts @@ -1,9 +1,8 @@ import { internalAction, internalMutation } from "../../_generated/server"; import { v } from "convex/values"; -import { internal, components } from "../../_generated/api"; -import { Workpool } from "@convex-dev/workpool"; -import { Workpool as OldWorkpool } from "@convex-dev/workpool-old"; +import { internal } from "../../_generated/api"; import { Id } from "../../_generated/dataModel"; +import { PoolKind, makePool } from "../pool"; /** * Throughput / overhead measurement scenario. @@ -66,6 +65,12 @@ const Mode = v.union( v.literal("oldpool-oc"), ); +function poolFromMode(mode: string): PoolKind | null { + if (mode === "workpool-bare" || mode === "workpool-oc") return "new"; + if (mode === "oldpool-bare" || mode === "oldpool-oc") return "old"; + return null; +} + export default internalAction({ args: { taskCount: v.optional(v.number()), @@ -86,38 +91,20 @@ export default internalAction({ pollTimeoutMs = 600_000, }, ) => { + const poolKind = poolFromMode(mode); + const useOnComplete = mode === "workpool-oc" || mode === "oldpool-oc"; const runId: Id<"runs"> = await ctx.runMutation(internal.test.run.start, { scenario: `overhead-${mode}`, parameters: { taskCount, batchSize, mode, maxParallelism, interBatchMs }, + pool: poolKind ?? undefined, }); const scenarioStart = Date.now(); - - const isWorkpoolNew = mode === "workpool-bare" || mode === "workpool-oc"; - const isWorkpoolOld = mode === "oldpool-bare" || mode === "oldpool-oc"; - const useOnComplete = mode === "workpool-oc" || mode === "oldpool-oc"; - - // Configure the right pool (separate components → no cross-contamination) - if (isWorkpoolNew) { - await ctx.runMutation(components.testWorkpool.config.update, { - maxParallelism, - }); - } - if (isWorkpoolOld) { - await ctx.runMutation(components.oldWorkpool.config.update, { - maxParallelism, - }); - } - - const newPool = isWorkpoolNew - ? new Workpool(components.testWorkpool, { maxParallelism }) - : null; - const oldPool = isWorkpoolOld - ? new OldWorkpool(components.oldWorkpool, { maxParallelism }) - : null; + // run.start already configured the right component's maxParallelism. + const pool = poolKind ? makePool(poolKind, { maxParallelism }) : null; console.log( `overhead[${mode}]: ${taskCount} tasks, batchSize=${batchSize}` + - (newPool || oldPool ? `, max=${maxParallelism}` : ""), + (pool ? `, max=${maxParallelism}` : ""), ); const numBatches = Math.ceil(taskCount / batchSize); @@ -140,10 +127,9 @@ export default internalAction({ ), ); } else if (!useOnComplete) { - const pool = newPool ?? oldPool!; await Promise.all( tasks.map(() => - pool.enqueueMutation( + pool!.enqueueMutation( ctx, internal.test.scenarios.overhead.recorder, { runId, enqueuedAt }, @@ -151,10 +137,9 @@ export default internalAction({ ), ); } else { - const pool = newPool ?? oldPool!; await Promise.all( tasks.map(() => - pool.enqueueMutation( + pool!.enqueueMutation( ctx, internal.test.scenarios.overhead.noop, {}, diff --git a/example/convex/test/scenarios/sustained.ts b/example/convex/test/scenarios/sustained.ts index 5e40881f..aa414df7 100644 --- a/example/convex/test/scenarios/sustained.ts +++ b/example/convex/test/scenarios/sustained.ts @@ -1,9 +1,8 @@ import { internalAction, internalMutation } from "../../_generated/server"; import { v } from "convex/values"; -import { internal, components } from "../../_generated/api"; -import { Workpool } from "@convex-dev/workpool"; -import { Workpool as OldWorkpool } from "@convex-dev/workpool-old"; +import { internal } from "../../_generated/api"; import { Id } from "../../_generated/dataModel"; +import { PoolKind, makePool } from "../pool"; /** * Sustained, interleaved load scenario. Designed to exercise OCC paths @@ -90,6 +89,10 @@ const Mode = v.union( v.literal("oldpool-oc"), ); +function poolFromMode(mode: string): PoolKind { + return mode === "oldpool-bare" || mode === "oldpool-oc" ? "old" : "new"; +} + export default internalAction({ args: { targetTps: v.optional(v.number()), // tasks per second @@ -113,6 +116,8 @@ export default internalAction({ }, ) => { const totalTasks = targetTps * durationSec; + const poolKind = poolFromMode(mode); + const useOnComplete = mode === "workpool-oc" || mode === "oldpool-oc"; const runId: Id<"runs"> = await ctx.runMutation(internal.test.run.start, { scenario: `sustained-${mode}`, parameters: { @@ -124,29 +129,11 @@ export default internalAction({ mode, maxParallelism, }, + pool: poolKind, }); - const isNew = mode === "workpool-bare" || mode === "workpool-oc"; - const isOld = mode === "oldpool-bare" || mode === "oldpool-oc"; - const useOnComplete = mode === "workpool-oc" || mode === "oldpool-oc"; - - if (isNew) { - await ctx.runMutation(components.testWorkpool.config.update, { - maxParallelism, - }); - } - if (isOld) { - await ctx.runMutation(components.oldWorkpool.config.update, { - maxParallelism, - }); - } - const newPool = isNew - ? new Workpool(components.testWorkpool, { maxParallelism }) - : null; - const oldPool = isOld - ? new OldWorkpool(components.oldWorkpool, { maxParallelism }) - : null; - const pool = newPool ?? oldPool!; + // run.start already configured the right component's maxParallelism. + const pool = makePool(poolKind, { maxParallelism }); console.log( `sustained[${mode}]: ${totalTasks} tasks @ ${targetTps}/s for ${durationSec}s, ` + diff --git a/example/convex/test/scenarios/throughput.ts b/example/convex/test/scenarios/throughput.ts index b27eeb49..f3dfb969 100644 --- a/example/convex/test/scenarios/throughput.ts +++ b/example/convex/test/scenarios/throughput.ts @@ -3,6 +3,7 @@ import { v } from "convex/values"; import { internal } from "../../_generated/api"; import { enqueueTasks, TaskType } from "../work"; import { Id } from "../../_generated/dataModel"; +import { vPoolKind } from "../pool"; /** * Throughput / saturation scenario. @@ -23,6 +24,7 @@ const parameters = { taskDurationMs: v.optional(v.number()), taskType: v.optional(v.union(v.literal("mutation"), v.literal("action"))), pollTimeoutMs: v.optional(v.number()), + pool: v.optional(vPoolKind), }; export default internalAction({ @@ -37,6 +39,7 @@ export default internalAction({ taskDurationMs = 20, taskType = "mutation", pollTimeoutMs = 600_000, // 10 minutes for big runs + pool = "new", }, ) => { const runId: Id<"runs"> = await ctx.runMutation(internal.test.run.start, { @@ -49,6 +52,7 @@ export default internalAction({ taskType, taskDurationMs, }, + pool, }); const fn = @@ -80,6 +84,7 @@ export default internalAction({ const enqueuedAt = Date.now(); await enqueueTasks({ ctx, + pool, taskArgs: Array(thisBatch).fill(baseArgs), taskType, fn, diff --git a/example/convex/test/work.ts b/example/convex/test/work.ts index 0e090875..cc831b74 100644 --- a/example/convex/test/work.ts +++ b/example/convex/test/work.ts @@ -1,14 +1,9 @@ import { internalMutation, internalAction } from "../_generated/server"; import { v } from "convex/values"; import { DefaultFunctionArgs } from "convex/server"; -import { components } from "../_generated/api"; -import { - vOnCompleteArgs, - WorkId, - enqueueBatch, - enqueue, -} from "@convex-dev/workpool"; +import { vOnCompleteArgs, WorkId, enqueue } from "@convex-dev/workpool"; import { ActionCtx } from "../_generated/server"; +import { PoolKind, enqueueFor } from "./pool"; /** * Generates a string of the specified length. @@ -142,6 +137,7 @@ export type { WorkId }; */ export async function enqueueTasks(options: { ctx: ActionCtx; + pool?: PoolKind; taskArgs: T[]; taskType: TaskType; fn: Parameters[3]; @@ -150,33 +146,28 @@ export async function enqueueTasks(options: { }): Promise { const { ctx, + pool = "new", taskArgs, taskType, fn, onCompleteOpts, batchEnqueue = false, } = options; - - let workIds: WorkId[]; + const { component, enqueueOne, enqueueMany } = enqueueFor(pool); if (batchEnqueue) { - console.log("Using batch enqueue"); - workIds = await enqueueBatch( - components.testWorkpool, + return await enqueueMany( + component, ctx, taskType, fn, taskArgs, onCompleteOpts, ); - } else { - console.log("Using individual enqueue"); - workIds = await Promise.all( - taskArgs.map((a) => - enqueue(components.testWorkpool, ctx, taskType, fn, a, onCompleteOpts), - ), - ); } - - return workIds; + return await Promise.all( + taskArgs.map((a) => + enqueueOne(component, ctx, taskType, fn, a, onCompleteOpts), + ), + ); } diff --git a/example/src/App.css b/example/src/App.css index b9d355df..63e0fd80 100644 --- a/example/src/App.css +++ b/example/src/App.css @@ -1,42 +1,221 @@ +:root { + color-scheme: light dark; +} + #root { - max-width: 1280px; + max-width: 1400px; margin: 0 auto; - padding: 2rem; - text-align: center; + padding: 1.5rem; + font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif; + text-align: left; } -.logo { - height: 6em; - padding: 1.5em; - will-change: filter; - transition: filter 300ms; +h1 { + font-size: 1.5rem; + margin: 0 0 1rem; } -.logo:hover { - filter: drop-shadow(0 0 2em #646cffaa); + +h2 { + font-size: 1.1rem; + margin: 1.5rem 0 0.5rem; } -.logo.react:hover { - filter: drop-shadow(0 0 2em #61dafbaa); + +nav.tabs { + display: flex; + gap: 0.25rem; + border-bottom: 1px solid #ddd3; + margin-bottom: 1rem; } -@keyframes logo-spin { - from { - transform: rotate(0deg); - } - to { - transform: rotate(360deg); - } +nav.tabs button { + background: none; + border: none; + padding: 0.5rem 0.9rem; + cursor: pointer; + color: inherit; + font-size: 0.95rem; + border-bottom: 2px solid transparent; } -@media (prefers-reduced-motion: no-preference) { - a:nth-of-type(2) .logo { - animation: logo-spin infinite 20s linear; - } +nav.tabs button.active { + border-bottom-color: #4f8cff; + font-weight: 600; } .card { - padding: 2em; + border: 1px solid #ddd3; + border-radius: 6px; + padding: 1rem; + margin-bottom: 1rem; +} + +table { + width: 100%; + border-collapse: collapse; + font-size: 0.85rem; +} + +th, td { + text-align: left; + padding: 0.4rem 0.6rem; + border-bottom: 1px solid #ddd3; +} + +th { + font-weight: 600; + background: #8881; +} + +tr.clickable { + cursor: pointer; +} + +tr.clickable:hover { + background: #4f8cff15; +} + +.row-actions { + display: flex; + gap: 0.5rem; +} + +button.primary { + background: #4f8cff; + color: white; + border: none; + padding: 0.5rem 1rem; + border-radius: 4px; + cursor: pointer; + font-size: 0.9rem; +} + +button.primary:disabled { + opacity: 0.5; + cursor: not-allowed; +} + +button.secondary { + background: transparent; + border: 1px solid #8884; + padding: 0.4rem 0.8rem; + border-radius: 4px; + cursor: pointer; + color: inherit; + font-size: 0.85rem; +} + +label { + display: flex; + flex-direction: column; + gap: 0.3rem; + font-size: 0.85rem; + margin-bottom: 0.6rem; +} + +input, select, textarea { + font: inherit; + padding: 0.4rem 0.5rem; + border: 1px solid #8884; + border-radius: 4px; + background: transparent; + color: inherit; +} + +textarea { + font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace; + font-size: 0.82rem; + min-height: 7rem; +} + +.form-row { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); + gap: 0.6rem 1rem; +} + +.metric-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(120px, 1fr)); + gap: 0.5rem; + margin: 0.5rem 0; +} + +.metric { + background: #8881; + padding: 0.5rem 0.7rem; + border-radius: 4px; +} + +.metric .label { + font-size: 0.72rem; + text-transform: uppercase; + letter-spacing: 0.05em; + opacity: 0.7; +} + +.metric .value { + font-size: 1.05rem; + font-weight: 600; + font-variant-numeric: tabular-nums; +} + +.pool-badge { + display: inline-block; + padding: 0.1rem 0.4rem; + font-size: 0.72rem; + font-weight: 600; + border-radius: 3px; + text-transform: uppercase; +} + +.pool-badge.new { + background: #4f8cff33; + color: #4f8cff; +} + +.pool-badge.old { + background: #ff8c4f33; + color: #ff8c4f; +} + +.pool-badge.none { + background: #8884; +} + +.muted { + opacity: 0.6; +} + +.status-running { + color: #4f8cff; +} + +.status-completed { + color: #5cc97a; +} + +.status-pending { + color: #b8a352; +} + +.delta-positive { + color: #5cc97a; +} + +.delta-negative { + color: #d96363; +} + +.charts { + display: grid; + gap: 1rem; } -.read-the-docs { - color: #888; +pre.params { + font-size: 0.78rem; + background: #8881; + padding: 0.5rem; + border-radius: 4px; + overflow-x: auto; + margin: 0; } diff --git a/example/src/App.tsx b/example/src/App.tsx index de92be12..b53b7d6f 100644 --- a/example/src/App.tsx +++ b/example/src/App.tsx @@ -1,17 +1,761 @@ import "./App.css"; +import { useState, useMemo } from "react"; +import { useQuery, useAction } from "convex/react"; +import { api } from "../convex/_generated/api"; +import type { Id } from "../convex/_generated/dataModel"; +import { + LineChart, + Line, + AreaChart, + Area, + CartesianGrid, + XAxis, + YAxis, + Tooltip, + Legend, + ResponsiveContainer, +} from "recharts"; + +type RunId = Id<"runs">; +type Tab = "history" | "detail" | "compare" | "run"; + +type PoolKind = "new" | "old"; + +function PoolBadge({ pool }: { pool?: PoolKind }) { + const cls = pool ?? "none"; + return {pool ?? "—"}; +} + +function fmt(ms: number | undefined): string { + if (ms === undefined) return "—"; + if (ms < 1000) return `${Math.round(ms)}ms`; + return `${(ms / 1000).toFixed(2)}s`; +} + +function fmtTime(t: number): string { + return new Date(t).toLocaleString(); +} function App() { + const [tab, setTab] = useState("history"); + const [selectedRunId, setSelectedRunId] = useState(null); + const [compareIds, setCompareIds] = useState<[RunId | null, RunId | null]>([ + null, + null, + ]); + + return ( + <> +

Workpool Dashboard

+ + + {tab === "history" && ( + { + setSelectedRunId(id); + setTab("detail"); + }} + onCompare={(a, b) => { + setCompareIds([a, b]); + setTab("compare"); + }} + /> + )} + {tab === "detail" && selectedRunId && ( + + )} + {tab === "compare" && ( + + )} + {tab === "run" && setTab("history")} />} + + ); +} + +function History({ + onPick, + onCompare, +}: { + onPick: (id: RunId) => void; + onCompare: (a: RunId, b: RunId) => void; +}) { + const runs = useQuery(api.test.dashboard.listRuns, { limit: 100 }); + const [compareA, setCompareA] = useState(null); + const [compareB, setCompareB] = useState(null); + + if (runs === undefined) return

Loading…

; + if (runs.length === 0) + return

No runs yet. Use “Run scenario”.

; + + return ( +
+
+ + + {compareA && compareB ? "two runs selected" : "select A and B"} + +
+ + + + + + + + + + + + + + + + + + {runs.map((r) => ( + + ))} + +
ABScenarioPoolStatusTasksDurationp50p95p99Started
+
+ ); +} + +type HistoryRowData = { + _id: RunId; + scenario: string; + pool?: string; + startTime: number; + taskCount?: number; +}; + +function HistoryRow({ + row, + compareA, + compareB, + setCompareA, + setCompareB, + onPick, +}: { + row: HistoryRowData; + compareA: RunId | null; + compareB: RunId | null; + setCompareA: (id: RunId) => void; + setCompareB: (id: RunId) => void; + onPick: (id: RunId) => void; +}) { + const run = useQuery(api.test.dashboard.getRun, { runId: row._id }); + return ( + { + if ((e.target as HTMLElement).tagName === "INPUT") return; + onPick(row._id); + }} + > + + setCompareA(row._id)} + /> + + + setCompareB(row._id)} + /> + + {row.scenario} + + + + + {run ? run.status : "…"} + + + {run ? run.completedCount : "…"}/{row.taskCount ?? "?"} + + {fmt(run?.totalDurationMs)} + {fmt(run?.latency?.p50)} + {fmt(run?.latency?.p95)} + {fmt(run?.latency?.p99)} + {fmtTime(row.startTime)} + + ); +} + +function RunDetail({ runId }: { runId: RunId }) { + const run = useQuery(api.test.dashboard.getRun, { runId }); + const throughput = useQuery(api.test.dashboard.throughputOverTime, { + runId, + bucketMs: 500, + }); + const cdf = useQuery(api.test.dashboard.latencyCdf, { runId }); + + if (run === undefined) return

Loading…

; + if (run === null) return

Run not found.

; + + return ( + <> +
+
+

{run.scenario}

+ + {run.status} + {fmtTime(run.startTime)} +
+
+ + + + + + + +
+
+ parameters +
{JSON.stringify(run.parameters, null, 2)}
+
+
+ +
+ + + + + `${(t / 1000).toFixed(1)}s`} + /> + + `t=${((t as number) / 1000).toFixed(2)}s`} + /> + + + + + + + + + + + + + + t < 1000 ? `${t}ms` : `${(t / 1000).toFixed(1)}s` + } + /> + + + `${(t as number) < 1000 ? `${t}ms` : `${((t as number) / 1000).toFixed(2)}s`}` + } + /> + + + + +
+ + ); +} + +function Compare({ + ids, + setIds, +}: { + ids: [RunId | null, RunId | null]; + setIds: (ids: [RunId | null, RunId | null]) => void; +}) { + const runs = useQuery(api.test.dashboard.listRuns, { limit: 100 }); + const [a, b] = ids; + const runA = useQuery( + api.test.dashboard.getRun, + a ? { runId: a } : "skip", + ); + const runB = useQuery( + api.test.dashboard.getRun, + b ? { runId: b } : "skip", + ); + const tA = useQuery( + api.test.dashboard.throughputOverTime, + a ? { runId: a, bucketMs: 500 } : "skip", + ); + const tB = useQuery( + api.test.dashboard.throughputOverTime, + b ? { runId: b, bucketMs: 500 } : "skip", + ); + const cA = useQuery( + api.test.dashboard.latencyCdf, + a ? { runId: a } : "skip", + ); + const cB = useQuery( + api.test.dashboard.latencyCdf, + b ? { runId: b } : "skip", + ); + + const throughputData = useMemo(() => { + const aPts = tA?.points ?? []; + const bPts = tB?.points ?? []; + const len = Math.max(aPts.length, bPts.length); + const out: Array<{ tMs: number; aCompleted?: number; bCompleted?: number }> = + []; + for (let i = 0; i < len; i++) { + const tMs = (aPts[i]?.tMs ?? bPts[i]?.tMs ?? i * 500) as number; + out.push({ + tMs, + aCompleted: aPts[i]?.completed, + bCompleted: bPts[i]?.completed, + }); + } + return out; + }, [tA, tB]); + + // Merge CDFs by ms axis: zip both sorted arrays into points with optional aPct/bPct. + const cdfData = useMemo(() => { + const aArr = cA ?? []; + const bArr = cB ?? []; + const points: Array<{ ms: number; aPct?: number; bPct?: number }> = []; + aArr.forEach((p) => points.push({ ms: p.ms, aPct: p.pct })); + bArr.forEach((p) => points.push({ ms: p.ms, bPct: p.pct })); + points.sort((x, y) => x.ms - y.ms); + return points; + }, [cA, cB]); + return ( <> -

Convex Workpool Component Example

-

- See example/convex/example.ts for all the ways to use - this component -

+
+ + +
+
+ + {runA && runB && ( +
+

Summary delta

+ +
+ )} + +
+ + + + + `${(t / 1000).toFixed(1)}s`} + /> + + + + + + + + + + + + + + + t < 1000 ? `${t}ms` : `${(t / 1000).toFixed(1)}s` + } + /> + + + + + + + +
); } +function DeltaTable({ + a, + b, +}: { + a: NonNullable>>; + b: NonNullable>>; +}) { + const rows: Array<{ label: string; av?: number; bv?: number; lower: boolean }> = [ + { label: "Total duration (ms)", av: a.totalDurationMs, bv: b.totalDurationMs, lower: true }, + { label: "p50 (ms)", av: a.latency?.p50, bv: b.latency?.p50, lower: true }, + { label: "p95 (ms)", av: a.latency?.p95, bv: b.latency?.p95, lower: true }, + { label: "p99 (ms)", av: a.latency?.p99, bv: b.latency?.p99, lower: true }, + { label: "max (ms)", av: a.latency?.max, bv: b.latency?.max, lower: true }, + ]; + return ( + + + + + + + + + + + {rows.map((row) => { + const delta = + row.av !== undefined && row.bv !== undefined && row.av !== 0 + ? ((row.bv - row.av) / row.av) * 100 + : undefined; + // For "lower is better": negative delta is good. + const better = delta !== undefined && (row.lower ? delta < 0 : delta > 0); + return ( + + + + + + + ); + })} + +
MetricA ({a.scenario} · {a.pool ?? "?"})B ({b.scenario} · {b.pool ?? "?"})Δ (B vs A)
{row.label}{row.av ?? "—"}{row.bv ?? "—"} + {delta === undefined ? "—" : `${delta > 0 ? "+" : ""}${delta.toFixed(1)}%`} +
+ ); +} + +const SCENARIO_PRESETS = { + burstyBatches: { + waveCount: 10, + tasksPerWave: 20, + delayBetweenWavesMs: 500, + maxParallelism: 50, + taskDurationMs: 0, + }, + throughput: { + taskCount: 1000, + batchSize: 100, + interBatchMs: 50, + maxParallelism: 100, + taskDurationMs: 20, + }, + overhead: { + taskCount: 500, + batchSize: 50, + interBatchMs: 0, + mode: "workpool-bare", + maxParallelism: 50, + }, + sustained: { + targetTps: 50, + durationSec: 20, + workerMinMs: 50, + workerMaxMs: 500, + mode: "workpool-bare", + maxParallelism: 100, + }, + bigArgs: { + taskCount: 30, + argSizeBytes: 800000, + maxParallelism: 30, + }, + bigContext: { + taskCount: 30, + contextSizeBytes: 800000, + maxParallelism: 30, + }, + bigReturnTypes: { + taskCount: 20, + returnSizeBytes: 1000000, + maxParallelism: 20, + }, +} as const satisfies Record>; + +type ScenarioName = keyof typeof SCENARIO_PRESETS; + +function RunScenarioForm({ onStarted }: { onStarted: () => void }) { + const runScenarios = useAction(api.test.dashboard.runScenarios); + const [scenario, setScenario] = useState("burstyBatches"); + const [pool, setPool] = useState<"new" | "old" | "both">("new"); + const [paramsText, setParamsText] = useState( + JSON.stringify(SCENARIO_PRESETS[scenario], null, 2), + ); + const [busy, setBusy] = useState(false); + const [error, setError] = useState(null); + + const updateScenario = (next: ScenarioName) => { + setScenario(next); + setParamsText(JSON.stringify(SCENARIO_PRESETS[next], null, 2)); + setError(null); + }; + + const launch = async () => { + setError(null); + let parsed: Record; + try { + parsed = JSON.parse(paramsText); + } catch (e) { + setError(`Invalid JSON: ${(e as Error).message}`); + return; + } + setBusy(true); + try { + const supportsPool = + scenario !== "overhead" && scenario !== "sustained"; + const launches: Array<"new" | "old"> = + pool === "both" ? ["old", "new"] : [pool]; + const argsList = launches.map((p) => ({ + ...parsed, + ...(supportsPool ? { pool: p } : {}), + })); + await runScenarios({ scenario, argsList }); + onStarted(); + } catch (e) { + setError((e as Error).message); + } finally { + setBusy(false); + } + }; + + const supportsPool = scenario !== "overhead" && scenario !== "sustained"; + + return ( +
+
+ + +
+