diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b59c18ff..fe87ecbc 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -7,6 +7,21 @@ npm i npm run dev ``` +## Running the example dashboard + +The `example/` app includes a dashboard for launching benchmark scenarios, +inspecting individual runs (throughput, latency CDF), and comparing two runs +side-by-side. To use it, run the backend and the frontend together: + +```sh +npm run dev # in one terminal: convex dev + library codegen watch +npm run dev:dashboard # in another: vite dev server for example/ +``` + +Then open the URL vite prints (typically ). The "Run +scenario" tab launches presets; "History" lists past runs; "Compare" diffs two +of them. See `example/README.md` for more. + ## Testing ```sh diff --git a/eslint.config.js b/eslint.config.js index 76ef82e6..479a3ea3 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -9,6 +9,7 @@ export default [ { ignores: [ "dist/**", + "example/dist/**", "eslint.config.js", "vitest.config.ts", "**/_generated/", diff --git a/example/README.md b/example/README.md index 5d731357..fc3ec220 100644 --- a/example/README.md +++ b/example/README.md @@ -1,4 +1,48 @@ # Example app -Components need an app that uses them in order to run codegen. An example app is -also useful for testing and documentation. +Components need an app that uses them in order to run codegen. This example app +also doubles as a benchmark dashboard for the workpool component itself — it +exercises the API and surfaces throughput and latency metrics for the scenarios +in `convex/test/scenarios/`. + +## Running the dashboard + +From the repo root, in two terminals: + +```sh +npm run dev # backend: convex dev + workpool codegen watch +npm run dev:dashboard # frontend: vite dev server (defaults to http://localhost:5173) +``` + +The first run of `npm run dev` writes `.env.local` with `VITE_CONVEX_URL`, which +the vite config reads from the repo root (`envDir: "../"`). + +## What's in the dashboard + +- **Run scenario** — pick a preset (`burstyBatches`, `throughput`, `overhead`, + `sustained`, `bigArgs`, `bigContext`, `bigReturnTypes`), tweak the JSON + parameters, and launch it against the "new" pool (this branch), the "old" pool + (`workpool@0.4.6`, installed as `@convex-dev/workpool-old`), or both + back-to-back. +- **History** — every run is persisted to the `runs` table. Pick A and B to diff + them. +- **Detail** — per-run throughput-over-time and latency CDF charts. +- **Compare** — side-by-side throughput and CDF for two runs, plus a summary + delta table (p50/p95/p99/max/duration). + +URL state is encoded in the hash (`#detail/`, `#compare/,`), so +links are shareable. + +## Deploying it as a static site (optional) + +The example is wired to `@convex-dev/static-hosting`, so you can publish the +dashboard to your dev deployment with: + +```sh +npm run deploy:dashboard # uploads to dev +npm run deploy:dashboard:prod # uploads to prod +``` + +It will be served at `https://.convex.site/`. See the component +setup in `convex/convex.config.ts`, `convex/http.ts`, and +`convex/staticHosting.ts`. diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index a64483be..09937400 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -10,7 +10,11 @@ import type * as crons from "../crons.js"; import type * as example from "../example.js"; +import type * as http from "../http.js"; +import type * as staticHosting from "../staticHosting.js"; +import type * as test_dashboard from "../test/dashboard.js"; import type * as test_nonRetryable from "../test/nonRetryable.js"; +import type * as test_pool from "../test/pool.js"; import type * as test_run from "../test/run.js"; import type * as test_scenarios_bigArgs from "../test/scenarios/bigArgs.js"; import type * as test_scenarios_bigContext from "../test/scenarios/bigContext.js"; @@ -30,7 +34,11 @@ import type { declare const fullApi: ApiFromModules<{ crons: typeof crons; example: typeof example; + http: typeof http; + staticHosting: typeof staticHosting; + "test/dashboard": typeof test_dashboard; "test/nonRetryable": typeof test_nonRetryable; + "test/pool": typeof test_pool; "test/run": typeof test_run; "test/scenarios/bigArgs": typeof test_scenarios_bigArgs; "test/scenarios/bigContext": typeof test_scenarios_bigContext; @@ -74,4 +82,5 @@ export declare const components: { serializedPool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"serializedPool">; testWorkpool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"testWorkpool">; oldWorkpool: import("@convex-dev/workpool-old/_generated/component.js").ComponentApi<"oldWorkpool">; + selfHosting: import("@convex-dev/static-hosting/_generated/component.js").ComponentApi<"selfHosting">; }; diff --git a/example/convex/convex.config.ts b/example/convex/convex.config.ts index b3e9ad02..d780a28c 100644 --- a/example/convex/convex.config.ts +++ b/example/convex/convex.config.ts @@ -1,6 +1,7 @@ import { defineApp } from "convex/server"; import workpool from "@convex-dev/workpool/convex.config"; import workpoolOld from "@convex-dev/workpool-old/convex.config"; +import staticHosting from "@convex-dev/static-hosting/convex.config"; const app = defineApp(); app.use(workpool, { name: "smallPool" }); @@ -8,5 +9,6 @@ app.use(workpool, { name: "bigPool" }); app.use(workpool, { name: "serializedPool" }); app.use(workpool, { name: "testWorkpool" }); app.use(workpoolOld, { name: "oldWorkpool" }); +app.use(staticHosting); export default app; diff --git a/example/convex/http.ts b/example/convex/http.ts new file mode 100644 index 00000000..374fc578 --- /dev/null +++ b/example/convex/http.ts @@ -0,0 +1,9 @@ +import { httpRouter } from "convex/server"; +import { registerStaticRoutes } from "@convex-dev/static-hosting"; +import { components } from "./_generated/api"; + +const http = httpRouter(); + +registerStaticRoutes(http, components.selfHosting); + +export default http; diff --git a/example/convex/schema.ts b/example/convex/schema.ts index 0f692e29..689f9db1 100644 --- a/example/convex/schema.ts +++ b/example/convex/schema.ts @@ -13,6 +13,7 @@ export default defineSchema({ parameters: v.any(), taskCount: v.optional(v.number()), endTime: v.optional(v.number()), + pool: v.optional(v.union(v.literal("new"), v.literal("old"))), }), tasks: defineTable({ runId: v.id("runs"), diff --git a/example/convex/staticHosting.ts b/example/convex/staticHosting.ts new file mode 100644 index 00000000..79c32cd4 --- /dev/null +++ b/example/convex/staticHosting.ts @@ -0,0 +1,18 @@ +import { + exposeDeploymentQuery, + exposeUploadApi, +} from "@convex-dev/static-hosting"; +import { components } from "./_generated/api"; + +export const { + generateUploadUrl, + generateUploadUrls, + recordAsset, + recordAssets, + gcOldAssets, + listAssets, +} = exposeUploadApi(components.selfHosting); + +export const { getCurrentDeployment } = exposeDeploymentQuery( + components.selfHosting, +); diff --git a/example/convex/test/dashboard.ts b/example/convex/test/dashboard.ts new file mode 100644 index 00000000..431814fc --- /dev/null +++ b/example/convex/test/dashboard.ts @@ -0,0 +1,215 @@ +import { v } from "convex/values"; +import { query, action } from "../_generated/server"; +import { internal } from "../_generated/api"; +import { runStatus } from "./run"; + +function percentile(sorted: number[], p: number): number { + const idx = Math.ceil((p / 100) * sorted.length) - 1; + return sorted[Math.max(0, idx)]; +} + +// Just the list of run docs — no per-row tasks aggregation. Each row +// subscribes to `getRun` separately so we don't .collect() the entire +// tasks table on every history poll. +export const listRuns = query({ + args: { limit: v.optional(v.number()) }, + handler: async (ctx, { limit = 50 }) => { + const runs = await ctx.db.query("runs").order("desc").take(limit); + return runs.map((run) => ({ + _id: run._id, + scenario: run.scenario, + pool: run.pool, + startTime: run.startTime, + taskCount: run.taskCount, + })); + }, +}); + +export const getRun = query({ + args: { runId: v.id("runs") }, + handler: async (ctx, { runId }) => { + const run = await ctx.db.get("runs", runId); + if (!run) return null; + const tasks = await ctx.db + .query("tasks") + .withIndex("runId", (q) => q.eq("runId", run._id)) + .collect(); + const status = await runStatus(ctx, run); + const latencies = tasks + .filter((t) => t.enqueuedAt !== undefined) + .map((t) => t.endTime - t.enqueuedAt!) + .sort((a, b) => a - b); + const endTimes = tasks.map((t) => t.endTime); + const lastEnd = endTimes.length ? Math.max(...endTimes) : undefined; + return { + _id: run._id, + scenario: run.scenario, + parameters: run.parameters, + pool: run.pool, + startTime: run.startTime, + taskCount: run.taskCount, + completedCount: tasks.length, + status, + totalDurationMs: + lastEnd !== undefined ? lastEnd - run.startTime : undefined, + latency: + latencies.length > 0 + ? { + p50: percentile(latencies, 50), + p95: percentile(latencies, 95), + p99: percentile(latencies, 99), + max: latencies[latencies.length - 1], + } + : undefined, + }; + }, +}); + +// Time-bucketed throughput. Returns one point per `bucketMs` window from +// the run's start: completed (count finishing in that window) and inFlight +// (enqueued - completed at that t). +export const throughputOverTime = query({ + args: { runId: v.id("runs"), bucketMs: v.optional(v.number()) }, + handler: async (ctx, { runId, bucketMs = 500 }) => { + const run = await ctx.db.get("runs", runId); + if (!run) return null; + const tasks = await ctx.db + .query("tasks") + .withIndex("runId", (q) => q.eq("runId", runId)) + .collect(); + if (tasks.length === 0) return { bucketMs, points: [] }; + + const start = run.startTime; + const lastEnd = Math.max(...tasks.map((t) => t.endTime)); + const totalDurationMs = lastEnd - start; + const numBuckets = Math.max(1, Math.ceil(totalDurationMs / bucketMs) + 1); + + const completedPerBucket = new Array(numBuckets).fill(0); + const enqueuedPerBucket = new Array(numBuckets).fill(0); + for (const t of tasks) { + const cIdx = Math.min( + numBuckets - 1, + Math.floor((t.endTime - start) / bucketMs), + ); + completedPerBucket[cIdx]++; + if (t.enqueuedAt !== undefined) { + const eIdx = Math.max( + 0, + Math.min( + numBuckets - 1, + Math.floor((t.enqueuedAt - start) / bucketMs), + ), + ); + enqueuedPerBucket[eIdx]++; + } + } + const points: Array<{ + tMs: number; + completed: number; + enqueued: number; + inFlight: number; + }> = []; + let cumEnqueued = 0; + let cumCompleted = 0; + for (let i = 0; i < numBuckets; i++) { + cumEnqueued += enqueuedPerBucket[i]; + cumCompleted += completedPerBucket[i]; + points.push({ + tMs: i * bucketMs, + completed: completedPerBucket[i], + enqueued: enqueuedPerBucket[i], + inFlight: Math.max(0, cumEnqueued - cumCompleted), + }); + } + return { bucketMs, points }; + }, +}); + +// Sorted latency array, thinned to ~200 points for CDF plotting. +export const latencyCdf = query({ + args: { runId: v.id("runs"), points: v.optional(v.number()) }, + handler: async (ctx, { runId, points = 200 }) => { + const tasks = await ctx.db + .query("tasks") + .withIndex("runId", (q) => q.eq("runId", runId)) + .collect(); + const latencies = tasks + .filter((t) => t.enqueuedAt !== undefined) + .map((t) => t.endTime - t.enqueuedAt!) + .sort((a, b) => a - b); + if (latencies.length === 0) return []; + const stride = Math.max(1, Math.floor(latencies.length / points)); + const out: Array<{ pct: number; ms: number }> = []; + for (let i = 0; i < latencies.length; i += stride) { + out.push({ + pct: ((i + 1) / latencies.length) * 100, + ms: latencies[i], + }); + } + // ensure the max latency point is present + out.push({ pct: 100, ms: latencies[latencies.length - 1] }); + return out; + }, +}); + +// Live status of the currently-running scenario, if any. +export const latestRunStatus = query({ + args: {}, + handler: async (ctx) => { + const run = await ctx.db.query("runs").order("desc").first(); + if (!run) return null; + const status = await runStatus(ctx, run); + return { runId: run._id, scenario: run.scenario, status }; + }, +}); + +const scenarioName = v.union( + v.literal("burstyBatches"), + v.literal("throughput"), + v.literal("overhead"), + v.literal("sustained"), + v.literal("bigArgs"), + v.literal("bigContext"), + v.literal("bigReturnTypes"), +); + +/** + * Public action so the dashboard can trigger one or more scenario runs by + * name. Multi-launch is sequenced server-side: each non-final entry is + * awaited via `ctx.runAction` (so we know its tasks are done and the + * scenario's poll loop has returned), then we sleep past the 5s + * "previous run started too recently" guard in `run.start` before the + * next launch. The final entry is also awaited so that any `run.start` + * failure surfaces back to the dashboard instead of disappearing into a + * scheduled action's logs. + */ +const GUARD_BUFFER_MS = 5_500; +/** + * Concurrent benchmark: fires the same scenario at both pools simultaneously + * and waits for both runs to fully complete. Useful for testing whether + * scheduler thrash from a competing pool makes individual pool throughput + * worse than running it alone. + */ +export const runConcurrent = action({ + args: { scenario: scenarioName, args: v.any() }, + handler: async (ctx, { scenario, args }) => { + const fn = internal.test.scenarios[scenario].default; + const [,] = await Promise.all([ + ctx.runAction(fn, { ...args, pool: "old" }), + ctx.runAction(fn, { ...args, pool: "new" }), + ]); + }, +}); + +export const runScenarios = action({ + args: { scenario: scenarioName, argsList: v.array(v.any()) }, + handler: async (ctx, { scenario, argsList }) => { + const fn = internal.test.scenarios[scenario].default; + for (let i = 0; i < argsList.length; i++) { + if (i > 0) { + await new Promise((r) => setTimeout(r, GUARD_BUFFER_MS)); + } + await ctx.runAction(fn, argsList[i]); + } + }, +}); diff --git a/example/convex/test/pool.ts b/example/convex/test/pool.ts new file mode 100644 index 00000000..b527e7dc --- /dev/null +++ b/example/convex/test/pool.ts @@ -0,0 +1,62 @@ +import { v } from "convex/values"; +import { components } from "../_generated/api"; +import { Workpool, enqueue, enqueueBatch } from "@convex-dev/workpool"; +import type { ComponentApi } from "@convex-dev/workpool/_generated/component.js"; +import { + Workpool as OldWorkpool, + enqueue as enqueueOld, + enqueueBatch as enqueueBatchOld, +} from "@convex-dev/workpool-old"; +import { ActionCtx, MutationCtx } from "../_generated/server"; + +export const POOL_KINDS = ["new", "old"] as const; +export type PoolKind = (typeof POOL_KINDS)[number]; +export const vPoolKind = v.union(v.literal("new"), v.literal("old")); + +type Ctx = ActionCtx | MutationCtx; + +export async function configurePool( + ctx: Ctx, + kind: PoolKind, + maxParallelism: number, +): Promise { + const component = + kind === "new" ? components.testWorkpool : components.oldWorkpool; + await ctx.runMutation(component.config.update, { maxParallelism }); +} + +/** + * Returns a Workpool instance for the chosen pool. Both classes share the + * same `enqueueMutation` / `enqueueAction` / `enqueueBatch` shape, so the + * union return type works for callers that just need to enqueue. + */ +export function makePool( + kind: PoolKind, + opts: { maxParallelism: number }, +): Workpool | OldWorkpool { + if (kind === "new") return new Workpool(components.testWorkpool, opts); + return new OldWorkpool(components.oldWorkpool, opts); +} + +export function getComponent(kind: PoolKind) { + return kind === "new" ? components.testWorkpool : components.oldWorkpool; +} + +export function enqueueFor(kind: PoolKind): { + component: ComponentApi; + enqueueOne: typeof enqueue; + enqueueMany: typeof enqueueBatch; +} { + if (kind === "new") { + return { + component: components.testWorkpool, + enqueueOne: enqueue, + enqueueMany: enqueueBatch, + }; + } + return { + component: components.oldWorkpool, + enqueueOne: enqueueOld, + enqueueMany: enqueueBatchOld, + }; +} diff --git a/example/convex/test/run.ts b/example/convex/test/run.ts index 8d785781..be49ba88 100644 --- a/example/convex/test/run.ts +++ b/example/convex/test/run.ts @@ -6,7 +6,7 @@ import { import { v } from "convex/values"; import { Id } from "../_generated/dataModel"; import { assert } from "convex-helpers"; -import { components } from "../_generated/api"; +import { getComponent } from "./pool"; export async function runStatus( ctx: QueryCtx, @@ -31,28 +31,32 @@ export const start = internalMutation({ args: { scenario: v.string(), parameters: v.any(), + pool: v.optional(v.union(v.literal("new"), v.literal("old"))), }, handler: async (ctx, args) => { // Check for in-flight tasks from the latest run const latestRun = await ctx.db.query("runs").order("desc").first(); if (latestRun) { - // Check if there are any in-flight tasks + // Check if there are any in-flight tasks. Allow concurrent runs when + // pools differ — runs are scoped to a single pool, so a new "new" + // pool run won't trample an in-flight "old" pool run (or vice versa). const status = await runStatus(ctx, latestRun); + const samePool = (latestRun.pool ?? "new") === (args.pool ?? "new"); - if (["running", "pending"].includes(status)) { + if (samePool && ["running", "pending"].includes(status)) { throw new Error( `Cannot start new run: previous run ${latestRun.scenario} is ${status} (started at ${new Date(latestRun.startTime).toISOString()})`, ); } - if (latestRun.startTime + 5_000 > Date.now()) { + if (samePool && latestRun.startTime + 5_000 > Date.now()) { throw new Error( `Cannot start new run: previous run ${latestRun.scenario} was started less than 5 seconds ago (started at ${new Date(latestRun.startTime).toISOString()})`, ); } } if (args.parameters.maxParallelism !== undefined) { - await ctx.runMutation(components.testWorkpool.config.update, { + await ctx.runMutation(getComponent(args.pool ?? "new").config.update, { maxParallelism: args.parameters.maxParallelism, }); } @@ -63,6 +67,7 @@ export const start = internalMutation({ scenario: args.scenario, parameters: args.parameters, taskCount: args.parameters.taskCount, + pool: args.pool ?? "new", }); return runId; @@ -99,6 +104,46 @@ export const status = internalQuery({ }, }); +// runId-scoped metrics — necessary for concurrent runs where the "latest" +// run is ambiguous. Same payload as `metrics` for the caller's runId. +export const metricsForRun = internalQuery({ + args: { runId: v.id("runs") }, + handler: async (ctx, { runId }) => { + const run = await ctx.db.get("runs", runId); + if (!run) return null; + const tasks = await ctx.db + .query("tasks") + .withIndex("runId", (q) => q.eq("runId", run._id)) + .collect(); + const status = await runStatus(ctx, run); + const completedCount = tasks.length; + const taskCount = run.taskCount ?? 0; + const latencies = tasks + .filter((t) => t.enqueuedAt !== undefined) + .map((t) => t.endTime - t.enqueuedAt!) + .sort((a, b) => a - b); + const endTimes = tasks.map((t) => t.endTime); + const lastEndTime = endTimes.length ? Math.max(...endTimes) : undefined; + const totalDurationMs = lastEndTime + ? lastEndTime - run.startTime + : undefined; + return { + status, + completedCount, + taskCount, + ...(totalDurationMs !== undefined && { totalDurationMs }), + ...(latencies.length > 0 && { + latency: { + p50: percentile(latencies, 50), + p95: percentile(latencies, 95), + p99: percentile(latencies, 99), + max: latencies[latencies.length - 1], + }, + }), + }; + }, +}); + function percentile(sorted: number[], p: number): number { const idx = Math.ceil((p / 100) * sorted.length) - 1; return sorted[Math.max(0, idx)]; diff --git a/example/convex/test/scenarios/bigArgs.ts b/example/convex/test/scenarios/bigArgs.ts index b038e7ce..81c98403 100644 --- a/example/convex/test/scenarios/bigArgs.ts +++ b/example/convex/test/scenarios/bigArgs.ts @@ -4,6 +4,7 @@ import { internal } from "../../_generated/api"; import { generateData, enqueueTasks, TaskType } from "../work"; import { Id } from "../../_generated/dataModel"; import { WorkId } from "@convex-dev/workpool"; +import { vPoolKind } from "../pool"; const parameters = { taskCount: v.optional(v.number()), @@ -11,6 +12,7 @@ const parameters = { taskType: v.optional(v.union(v.literal("mutation"), v.literal("action"))), batchEnqueue: v.optional(v.boolean()), maxParallelism: v.optional(v.number()), + pool: v.optional(vPoolKind), }; export default internalAction({ @@ -23,6 +25,7 @@ export default internalAction({ taskType = "mutation", batchEnqueue = false, maxParallelism = 50, + pool = "new", }, ): Promise<{ workIds: WorkId[]; @@ -38,6 +41,7 @@ export default internalAction({ batchEnqueue, maxParallelism, }, + pool, }); console.log( @@ -68,6 +72,7 @@ export default internalAction({ // Use shared enqueueTasks helper const workIds = await enqueueTasks({ ctx, + pool, taskArgs, taskType, fn, diff --git a/example/convex/test/scenarios/bigContext.ts b/example/convex/test/scenarios/bigContext.ts index 8a332875..e209e636 100644 --- a/example/convex/test/scenarios/bigContext.ts +++ b/example/convex/test/scenarios/bigContext.ts @@ -4,6 +4,7 @@ import { internal } from "../../_generated/api"; import { generateData, enqueueTasks, TaskType } from "../work"; import { Id } from "../../_generated/dataModel"; import { WorkId } from "@convex-dev/workpool"; +import { vPoolKind } from "../pool"; /** * Big Context Scenario @@ -20,6 +21,7 @@ const parameters = { taskType: v.optional(v.union(v.literal("mutation"), v.literal("action"))), batchEnqueue: v.optional(v.boolean()), maxParallelism: v.optional(v.number()), + pool: v.optional(vPoolKind), }; export default internalAction({ @@ -32,6 +34,7 @@ export default internalAction({ taskType = "mutation", batchEnqueue = false, maxParallelism = 50, + pool = "new", }, ): Promise<{ workIds: WorkId[]; @@ -47,6 +50,7 @@ export default internalAction({ batchEnqueue, maxParallelism, }, + pool, }); console.log( @@ -84,6 +88,7 @@ export default internalAction({ // Use shared enqueueTasks helper const workIds = await enqueueTasks({ ctx, + pool, taskArgs, taskType, fn, diff --git a/example/convex/test/scenarios/bigReturnTypes.ts b/example/convex/test/scenarios/bigReturnTypes.ts index e1c9bc5d..623f8232 100644 --- a/example/convex/test/scenarios/bigReturnTypes.ts +++ b/example/convex/test/scenarios/bigReturnTypes.ts @@ -4,6 +4,7 @@ import { internal } from "../../_generated/api"; import { enqueueTasks, TaskType } from "../work"; import { Id } from "../../_generated/dataModel"; import { WorkId } from "@convex-dev/workpool"; +import { vPoolKind } from "../pool"; /** * Big Return Types Scenario @@ -19,6 +20,7 @@ const parameters = { taskType: v.optional(v.union(v.literal("mutation"), v.literal("action"))), batchEnqueue: v.optional(v.boolean()), maxParallelism: v.optional(v.number()), + pool: v.optional(vPoolKind), }; export default internalAction({ @@ -31,6 +33,7 @@ export default internalAction({ taskType = "mutation", batchEnqueue = false, maxParallelism = 50, + pool = "new", }, ): Promise<{ workIds: WorkId[]; @@ -46,6 +49,7 @@ export default internalAction({ batchEnqueue, maxParallelism, }, + pool, }); console.log( @@ -75,6 +79,7 @@ export default internalAction({ // Use shared enqueueTasks helper const workIds = await enqueueTasks({ ctx, + pool, taskArgs, taskType, fn, diff --git a/example/convex/test/scenarios/burstyBatches.ts b/example/convex/test/scenarios/burstyBatches.ts index 22e26ba3..f5793b13 100644 --- a/example/convex/test/scenarios/burstyBatches.ts +++ b/example/convex/test/scenarios/burstyBatches.ts @@ -3,6 +3,7 @@ import { v } from "convex/values"; import { internal } from "../../_generated/api"; import { enqueueTasks, TaskType } from "../work"; import { Id } from "../../_generated/dataModel"; +import { vPoolKind } from "../pool"; const parameters = { waveCount: v.optional(v.number()), @@ -11,6 +12,7 @@ const parameters = { taskType: v.optional(v.union(v.literal("mutation"), v.literal("action"))), maxParallelism: v.optional(v.number()), taskDurationMs: v.optional(v.number()), + pool: v.optional(vPoolKind), }; /** @@ -49,6 +51,7 @@ export default internalAction({ taskType = "mutation", maxParallelism = 50, taskDurationMs = 0, + pool = "new", }, ) => { const taskCount = waveCount * tasksPerWave; @@ -63,6 +66,7 @@ export default internalAction({ maxParallelism, taskDurationMs, }, + pool, }); const scenarioStart = Date.now(); @@ -111,6 +115,7 @@ export default internalAction({ const waveStart = Date.now(); await enqueueTasks({ ctx, + pool, taskArgs, taskType, fn, diff --git a/example/convex/test/scenarios/overhead.ts b/example/convex/test/scenarios/overhead.ts index 8c17306d..e75ba91c 100644 --- a/example/convex/test/scenarios/overhead.ts +++ b/example/convex/test/scenarios/overhead.ts @@ -1,23 +1,20 @@ import { internalAction, internalMutation } from "../../_generated/server"; import { v } from "convex/values"; -import { internal, components } from "../../_generated/api"; -import { Workpool } from "@convex-dev/workpool"; -import { Workpool as OldWorkpool } from "@convex-dev/workpool-old"; +import { internal } from "../../_generated/api"; import { Id } from "../../_generated/dataModel"; +import { makePool, vPoolKind } from "../pool"; /** * Throughput / overhead measurement scenario. * - * mode determines what does the enqueue: - * raw — ctx.scheduler.runAfter(0, recorder). Bare-Convex floor. - * workpool-bare — new workpool, no onComplete (worker is the recorder) - * workpool-oc — new workpool with onComplete (worker is no-op) - * oldpool-bare — old workpool (workpool-old), no onComplete - * oldpool-oc — old workpool with onComplete + * mode: "raw" (bare ctx.scheduler) or "pool" (use a workpool) + * pool: "new" | "old" (only meaningful when mode = "pool") + * onComplete: if true, worker is a no-op and the recorder runs as the + * onComplete callback. If false, the worker itself records. * * Both pool variants test against the same Convex deployment, against the * same tasks table, with the same recorder. The only difference between - * `workpool-*` and `oldpool-*` is which workpool component is used. + * `pool=new` and `pool=old` is which workpool component is used. */ export const recorder = internalMutation({ @@ -58,13 +55,7 @@ export const oncompleteRecorder = internalMutation({ }, }); -const Mode = v.union( - v.literal("raw"), - v.literal("workpool-bare"), - v.literal("workpool-oc"), - v.literal("oldpool-bare"), - v.literal("oldpool-oc"), -); +const Mode = v.union(v.literal("raw"), v.literal("pool")); export default internalAction({ args: { @@ -72,6 +63,8 @@ export default internalAction({ batchSize: v.optional(v.number()), interBatchMs: v.optional(v.number()), mode: v.optional(Mode), + pool: v.optional(vPoolKind), + onComplete: v.optional(v.boolean()), maxParallelism: v.optional(v.number()), pollTimeoutMs: v.optional(v.number()), }, @@ -82,42 +75,35 @@ export default internalAction({ batchSize = 50, interBatchMs = 0, mode = "raw", + pool: poolKind = "new", + onComplete = false, maxParallelism = 50, pollTimeoutMs = 600_000, }, ) => { + const usePool = mode === "pool"; + const scenarioLabel = usePool + ? `overhead-${poolKind}${onComplete ? "-oc" : "-bare"}` + : "overhead-raw"; const runId: Id<"runs"> = await ctx.runMutation(internal.test.run.start, { - scenario: `overhead-${mode}`, - parameters: { taskCount, batchSize, mode, maxParallelism, interBatchMs }, + scenario: scenarioLabel, + parameters: { + taskCount, + batchSize, + mode, + onComplete, + maxParallelism, + interBatchMs, + }, + pool: usePool ? poolKind : undefined, }); const scenarioStart = Date.now(); - - const isWorkpoolNew = mode === "workpool-bare" || mode === "workpool-oc"; - const isWorkpoolOld = mode === "oldpool-bare" || mode === "oldpool-oc"; - const useOnComplete = mode === "workpool-oc" || mode === "oldpool-oc"; - - // Configure the right pool (separate components → no cross-contamination) - if (isWorkpoolNew) { - await ctx.runMutation(components.testWorkpool.config.update, { - maxParallelism, - }); - } - if (isWorkpoolOld) { - await ctx.runMutation(components.oldWorkpool.config.update, { - maxParallelism, - }); - } - - const newPool = isWorkpoolNew - ? new Workpool(components.testWorkpool, { maxParallelism }) - : null; - const oldPool = isWorkpoolOld - ? new OldWorkpool(components.oldWorkpool, { maxParallelism }) - : null; + // run.start already configured the right component's maxParallelism. + const pool = usePool ? makePool(poolKind, { maxParallelism }) : null; console.log( - `overhead[${mode}]: ${taskCount} tasks, batchSize=${batchSize}` + - (newPool || oldPool ? `, max=${maxParallelism}` : ""), + `${scenarioLabel}: ${taskCount} tasks, batchSize=${batchSize}` + + (pool ? `, max=${maxParallelism}` : ""), ); const numBatches = Math.ceil(taskCount / batchSize); @@ -129,7 +115,7 @@ export default internalAction({ const thisBatch = Math.min(batchSize, taskCount - enqueued); const enqueuedAt = Date.now(); const tasks = Array(thisBatch).fill(0); - if (mode === "raw") { + if (!usePool) { await Promise.all( tasks.map(() => ctx.scheduler.runAfter( @@ -139,11 +125,10 @@ export default internalAction({ ), ), ); - } else if (!useOnComplete) { - const pool = newPool ?? oldPool!; + } else if (!onComplete) { await Promise.all( tasks.map(() => - pool.enqueueMutation( + pool!.enqueueMutation( ctx, internal.test.scenarios.overhead.recorder, { runId, enqueuedAt }, @@ -151,10 +136,9 @@ export default internalAction({ ), ); } else { - const pool = newPool ?? oldPool!; await Promise.all( tasks.map(() => - pool.enqueueMutation( + pool!.enqueueMutation( ctx, internal.test.scenarios.overhead.noop, {}, @@ -195,13 +179,15 @@ export default internalAction({ const tps = (completedCount / total) * 1000; const msPerTask = total / completedCount; - console.log(`\n=== overhead[${mode}] ===`); + console.log(`\n=== ${scenarioLabel} ===`); console.log( `${completedCount}/${taskCount} done in ${total}ms ` + `(${tps.toFixed(0)} tps, ${msPerTask.toFixed(1)} ms/task)`, ); return { mode, + pool: usePool ? poolKind : undefined, + onComplete, taskCount: completedCount, totalDurationMs: total, enqueueTotal, diff --git a/example/convex/test/scenarios/sustained.ts b/example/convex/test/scenarios/sustained.ts index 5e40881f..2b952638 100644 --- a/example/convex/test/scenarios/sustained.ts +++ b/example/convex/test/scenarios/sustained.ts @@ -1,9 +1,8 @@ import { internalAction, internalMutation } from "../../_generated/server"; import { v } from "convex/values"; -import { internal, components } from "../../_generated/api"; -import { Workpool } from "@convex-dev/workpool"; -import { Workpool as OldWorkpool } from "@convex-dev/workpool-old"; +import { internal } from "../../_generated/api"; import { Id } from "../../_generated/dataModel"; +import { makePool, vPoolKind } from "../pool"; /** * Sustained, interleaved load scenario. Designed to exercise OCC paths @@ -18,8 +17,11 @@ import { Id } from "../../_generated/dataModel"; * running, completions arriving — exactly the scenario where main / * updateRunStatus / kickMainLoop reads can race with concurrent writes. * - * Modes mirror overhead.ts: workpool-bare/oc and oldpool-bare/oc on the - * same deployment. Workers are actions (so they can actually sleep). + * pool: "new" | "old" + * onComplete: if true, worker is a no-op and the recorder runs via the + * onComplete callback. If false, the worker itself records. + * + * Workers are actions (so they can actually sleep). */ // Worker: an action that sleeps for [minMs, maxMs] then records completion. @@ -83,20 +85,14 @@ export const sleepingNoop = internalAction({ }, }); -const Mode = v.union( - v.literal("workpool-bare"), - v.literal("workpool-oc"), - v.literal("oldpool-bare"), - v.literal("oldpool-oc"), -); - export default internalAction({ args: { targetTps: v.optional(v.number()), // tasks per second durationSec: v.optional(v.number()), // how long to keep enqueuing workerMinMs: v.optional(v.number()), workerMaxMs: v.optional(v.number()), - mode: v.optional(Mode), + pool: v.optional(vPoolKind), + onComplete: v.optional(v.boolean()), maxParallelism: v.optional(v.number()), pollTimeoutMs: v.optional(v.number()), }, @@ -107,49 +103,33 @@ export default internalAction({ durationSec = 20, workerMinMs = 50, workerMaxMs = 500, - mode = "workpool-bare", + pool: poolKind = "new", + onComplete = false, maxParallelism = 100, pollTimeoutMs = 600_000, }, ) => { const totalTasks = targetTps * durationSec; + const scenarioLabel = `sustained-${poolKind}${onComplete ? "-oc" : "-bare"}`; const runId: Id<"runs"> = await ctx.runMutation(internal.test.run.start, { - scenario: `sustained-${mode}`, + scenario: scenarioLabel, parameters: { taskCount: totalTasks, targetTps, durationSec, workerMinMs, workerMaxMs, - mode, + onComplete, maxParallelism, }, + pool: poolKind, }); - const isNew = mode === "workpool-bare" || mode === "workpool-oc"; - const isOld = mode === "oldpool-bare" || mode === "oldpool-oc"; - const useOnComplete = mode === "workpool-oc" || mode === "oldpool-oc"; - - if (isNew) { - await ctx.runMutation(components.testWorkpool.config.update, { - maxParallelism, - }); - } - if (isOld) { - await ctx.runMutation(components.oldWorkpool.config.update, { - maxParallelism, - }); - } - const newPool = isNew - ? new Workpool(components.testWorkpool, { maxParallelism }) - : null; - const oldPool = isOld - ? new OldWorkpool(components.oldWorkpool, { maxParallelism }) - : null; - const pool = newPool ?? oldPool!; + // run.start already configured the right component's maxParallelism. + const pool = makePool(poolKind, { maxParallelism }); console.log( - `sustained[${mode}]: ${totalTasks} tasks @ ${targetTps}/s for ${durationSec}s, ` + + `${scenarioLabel}: ${totalTasks} tasks @ ${targetTps}/s for ${durationSec}s, ` + `worker=${workerMinMs}-${workerMaxMs}ms, max=${maxParallelism}`, ); @@ -170,7 +150,7 @@ export default internalAction({ maxMs: workerMaxMs, }; let p: Promise; - if (useOnComplete) { + if (onComplete) { p = pool.enqueueAction( ctx, internal.test.scenarios.sustained.sleepingNoop, @@ -234,7 +214,7 @@ export default internalAction({ | { p50: number; p95: number; p99: number; max: number } | undefined; - console.log(`\n=== sustained[${mode}] ===`); + console.log(`\n=== ${scenarioLabel} ===`); console.log( `${completedCount}/${enqueued} done in ${total}ms ` + `(${tps.toFixed(0)} tps, ${msPerTask.toFixed(1)} ms/task wall)`, @@ -245,7 +225,8 @@ export default internalAction({ `p99=${latency.p99}ms max=${latency.max}ms`, ); return { - mode, + pool: poolKind, + onComplete, taskCount: completedCount, enqueued, totalDurationMs: total, diff --git a/example/convex/test/scenarios/throughput.ts b/example/convex/test/scenarios/throughput.ts index b27eeb49..186a5fb5 100644 --- a/example/convex/test/scenarios/throughput.ts +++ b/example/convex/test/scenarios/throughput.ts @@ -3,6 +3,7 @@ import { v } from "convex/values"; import { internal } from "../../_generated/api"; import { enqueueTasks, TaskType } from "../work"; import { Id } from "../../_generated/dataModel"; +import { vPoolKind } from "../pool"; /** * Throughput / saturation scenario. @@ -23,6 +24,7 @@ const parameters = { taskDurationMs: v.optional(v.number()), taskType: v.optional(v.union(v.literal("mutation"), v.literal("action"))), pollTimeoutMs: v.optional(v.number()), + pool: v.optional(vPoolKind), }; export default internalAction({ @@ -37,6 +39,7 @@ export default internalAction({ taskDurationMs = 20, taskType = "mutation", pollTimeoutMs = 600_000, // 10 minutes for big runs + pool = "new", }, ) => { const runId: Id<"runs"> = await ctx.runMutation(internal.test.run.start, { @@ -49,6 +52,7 @@ export default internalAction({ taskType, taskDurationMs, }, + pool, }); const fn = @@ -80,6 +84,7 @@ export default internalAction({ const enqueuedAt = Date.now(); await enqueueTasks({ ctx, + pool, taskArgs: Array(thisBatch).fill(baseArgs), taskType, fn, @@ -97,14 +102,14 @@ export default internalAction({ `(${(taskCount / (enqueueTotal / 1000)).toFixed(0)}/s). Waiting...`, ); - // Poll for completion + // Poll for completion — runId-scoped so concurrent runs don't see each + // other's status. const pollStart = Date.now(); let metrics: Record | null = null; while (Date.now() - pollStart < pollTimeoutMs) { - metrics = (await ctx.runQuery(internal.test.run.metrics)) as Record< - string, - unknown - > | null; + metrics = (await ctx.runQuery(internal.test.run.metricsForRun, { + runId, + })) as Record | null; if (metrics && metrics.status === "completed") break; await new Promise((r) => setTimeout(r, 250)); } diff --git a/example/convex/test/work.ts b/example/convex/test/work.ts index 0e090875..cc831b74 100644 --- a/example/convex/test/work.ts +++ b/example/convex/test/work.ts @@ -1,14 +1,9 @@ import { internalMutation, internalAction } from "../_generated/server"; import { v } from "convex/values"; import { DefaultFunctionArgs } from "convex/server"; -import { components } from "../_generated/api"; -import { - vOnCompleteArgs, - WorkId, - enqueueBatch, - enqueue, -} from "@convex-dev/workpool"; +import { vOnCompleteArgs, WorkId, enqueue } from "@convex-dev/workpool"; import { ActionCtx } from "../_generated/server"; +import { PoolKind, enqueueFor } from "./pool"; /** * Generates a string of the specified length. @@ -142,6 +137,7 @@ export type { WorkId }; */ export async function enqueueTasks(options: { ctx: ActionCtx; + pool?: PoolKind; taskArgs: T[]; taskType: TaskType; fn: Parameters[3]; @@ -150,33 +146,28 @@ export async function enqueueTasks(options: { }): Promise { const { ctx, + pool = "new", taskArgs, taskType, fn, onCompleteOpts, batchEnqueue = false, } = options; - - let workIds: WorkId[]; + const { component, enqueueOne, enqueueMany } = enqueueFor(pool); if (batchEnqueue) { - console.log("Using batch enqueue"); - workIds = await enqueueBatch( - components.testWorkpool, + return await enqueueMany( + component, ctx, taskType, fn, taskArgs, onCompleteOpts, ); - } else { - console.log("Using individual enqueue"); - workIds = await Promise.all( - taskArgs.map((a) => - enqueue(components.testWorkpool, ctx, taskType, fn, a, onCompleteOpts), - ), - ); } - - return workIds; + return await Promise.all( + taskArgs.map((a) => + enqueueOne(component, ctx, taskType, fn, a, onCompleteOpts), + ), + ); } diff --git a/example/index.html b/example/index.html index e4b78eae..55883ab3 100644 --- a/example/index.html +++ b/example/index.html @@ -2,9 +2,48 @@ - - Vite + React + TS + + + + Workpool Dashboard — async work, queued & paced + + + + + + + + + + + + + + + + + + +
diff --git a/example/public/favicon.svg b/example/public/favicon.svg new file mode 100644 index 00000000..caf78cb6 --- /dev/null +++ b/example/public/favicon.svg @@ -0,0 +1,10 @@ + + + + + + + + + + diff --git a/example/public/og.png b/example/public/og.png new file mode 100644 index 00000000..24d093f8 Binary files /dev/null and b/example/public/og.png differ diff --git a/example/public/og.svg b/example/public/og.svg new file mode 100644 index 00000000..87e827dd --- /dev/null +++ b/example/public/og.svg @@ -0,0 +1,58 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + workpool + async work, queued & paced — for Convex + + + + + + + + + + + + + + $ npm install @convex-dev/workpool + diff --git a/example/src/App.css b/example/src/App.css index b9d355df..f49f7d42 100644 --- a/example/src/App.css +++ b/example/src/App.css @@ -1,42 +1,225 @@ +:root { + color-scheme: light dark; +} + #root { - max-width: 1280px; + max-width: 1400px; margin: 0 auto; - padding: 2rem; - text-align: center; + padding: 1.5rem; + font-family: + -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif; + text-align: left; } -.logo { - height: 6em; - padding: 1.5em; - will-change: filter; - transition: filter 300ms; +h1 { + font-size: 1.5rem; + margin: 0 0 1rem; } -.logo:hover { - filter: drop-shadow(0 0 2em #646cffaa); + +h2 { + font-size: 1.1rem; + margin: 1.5rem 0 0.5rem; } -.logo.react:hover { - filter: drop-shadow(0 0 2em #61dafbaa); + +nav.tabs { + display: flex; + gap: 0.25rem; + border-bottom: 1px solid #ddd3; + margin-bottom: 1rem; } -@keyframes logo-spin { - from { - transform: rotate(0deg); - } - to { - transform: rotate(360deg); - } +nav.tabs button { + background: none; + border: none; + padding: 0.5rem 0.9rem; + cursor: pointer; + color: inherit; + font-size: 0.95rem; + border-bottom: 2px solid transparent; } -@media (prefers-reduced-motion: no-preference) { - a:nth-of-type(2) .logo { - animation: logo-spin infinite 20s linear; - } +nav.tabs button.active { + border-bottom-color: #4f8cff; + font-weight: 600; } .card { - padding: 2em; + border: 1px solid #ddd3; + border-radius: 6px; + padding: 1rem; + margin-bottom: 1rem; +} + +table { + width: 100%; + border-collapse: collapse; + font-size: 0.85rem; +} + +th, +td { + text-align: left; + padding: 0.4rem 0.6rem; + border-bottom: 1px solid #ddd3; +} + +th { + font-weight: 600; + background: #8881; +} + +tr.clickable { + cursor: pointer; +} + +tr.clickable:hover { + background: #4f8cff15; +} + +.row-actions { + display: flex; + gap: 0.5rem; +} + +button.primary { + background: #4f8cff; + color: white; + border: none; + padding: 0.5rem 1rem; + border-radius: 4px; + cursor: pointer; + font-size: 0.9rem; +} + +button.primary:disabled { + opacity: 0.5; + cursor: not-allowed; +} + +button.secondary { + background: transparent; + border: 1px solid #8884; + padding: 0.4rem 0.8rem; + border-radius: 4px; + cursor: pointer; + color: inherit; + font-size: 0.85rem; +} + +label { + display: flex; + flex-direction: column; + gap: 0.3rem; + font-size: 0.85rem; + margin-bottom: 0.6rem; +} + +input, +select, +textarea { + font: inherit; + padding: 0.4rem 0.5rem; + border: 1px solid #8884; + border-radius: 4px; + background: transparent; + color: inherit; +} + +textarea { + font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace; + font-size: 0.82rem; + min-height: 7rem; +} + +.form-row { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); + gap: 0.6rem 1rem; +} + +.metric-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(120px, 1fr)); + gap: 0.5rem; + margin: 0.5rem 0; +} + +.metric { + background: #8881; + padding: 0.5rem 0.7rem; + border-radius: 4px; +} + +.metric .label { + font-size: 0.72rem; + text-transform: uppercase; + letter-spacing: 0.05em; + opacity: 0.7; +} + +.metric .value { + font-size: 1.05rem; + font-weight: 600; + font-variant-numeric: tabular-nums; +} + +.pool-badge { + display: inline-block; + padding: 0.1rem 0.4rem; + font-size: 0.72rem; + font-weight: 600; + border-radius: 3px; + text-transform: uppercase; +} + +.pool-badge.new { + background: #4f8cff33; + color: #4f8cff; +} + +.pool-badge.old { + background: #ff8c4f33; + color: #ff8c4f; +} + +.pool-badge.none { + background: #8884; +} + +.muted { + opacity: 0.6; +} + +.status-running { + color: #4f8cff; +} + +.status-completed { + color: #5cc97a; +} + +.status-pending { + color: #b8a352; +} + +.delta-positive { + color: #5cc97a; +} + +.delta-negative { + color: #d96363; +} + +.charts { + display: grid; + gap: 1rem; } -.read-the-docs { - color: #888; +pre.params { + font-size: 0.78rem; + background: #8881; + padding: 0.5rem; + border-radius: 4px; + overflow-x: auto; + margin: 0; } diff --git a/example/src/App.tsx b/example/src/App.tsx index de92be12..3a81d13d 100644 --- a/example/src/App.tsx +++ b/example/src/App.tsx @@ -1,17 +1,833 @@ import "./App.css"; +import { useState, useMemo, useEffect } from "react"; +import { useQuery, useAction } from "convex/react"; +import { api } from "../convex/_generated/api"; +import type { Id } from "../convex/_generated/dataModel"; +import { + LineChart, + Line, + AreaChart, + Area, + CartesianGrid, + XAxis, + YAxis, + Tooltip, + Legend, + ResponsiveContainer, +} from "recharts"; + +type RunId = Id<"runs">; +type Tab = "history" | "detail" | "compare" | "run"; + +type PoolKind = "new" | "old"; + +function PoolBadge({ pool }: { pool?: PoolKind }) { + const cls = pool ?? "none"; + return {pool ?? "—"}; +} + +function fmt(ms: number | undefined): string { + if (ms === undefined) return "—"; + if (ms < 1000) return `${Math.round(ms)}ms`; + return `${(ms / 1000).toFixed(2)}s`; +} + +function fmtTime(t: number): string { + return new Date(t).toLocaleString(); +} + +type CompareIds = [RunId | null, RunId | null]; + +type UrlState = { + tab: Tab; + selectedRunId: RunId | null; + compareIds: CompareIds; +}; + +function serializeUrlState(s: UrlState): string { + switch (s.tab) { + case "detail": + return s.selectedRunId ? `detail/${s.selectedRunId}` : "history"; + case "compare": { + const ids = s.compareIds.filter((x): x is RunId => x !== null); + return ids.length > 0 ? `compare/${ids.join(",")}` : "compare"; + } + case "run": + return "new"; + case "history": + default: + return "history"; + } +} + +function parseUrlHash(hash: string): Partial { + const h = hash.replace(/^#\/?/, ""); + if (!h || h === "history") return { tab: "history" }; + if (h === "new") return { tab: "run" }; + if (h === "compare") return { tab: "compare" }; + const detailMatch = h.match(/^detail\/(.+)$/); + if (detailMatch) { + return { tab: "detail", selectedRunId: detailMatch[1] as RunId }; + } + const compareMatch = h.match(/^compare\/(.+)$/); + if (compareMatch) { + const parts = compareMatch[1].split(",").slice(0, 2); + const ids: CompareIds = [null, null]; + parts.forEach((p, i) => { + if (p) ids[i] = p as RunId; + }); + return { tab: "compare", compareIds: ids }; + } + return { tab: "history" }; +} + +function readHashState(): UrlState { + const parsed = parseUrlHash(window.location.hash); + return { + tab: parsed.tab ?? "history", + selectedRunId: parsed.selectedRunId ?? null, + compareIds: parsed.compareIds ?? [null, null], + }; +} function App() { + const initial = readHashState(); + const [tab, setTab] = useState(initial.tab); + const [selectedRunId, setSelectedRunId] = useState( + initial.selectedRunId, + ); + const [compareIds, setCompareIds] = useState(initial.compareIds); + + // Sync state → hash. + useEffect(() => { + const next = serializeUrlState({ tab, selectedRunId, compareIds }); + const current = window.location.hash.replace(/^#\/?/, ""); + if (next !== current) { + const url = `${window.location.pathname}${window.location.search}#${next}`; + window.history.replaceState(null, "", url); + } + }, [tab, selectedRunId, compareIds]); + + // Sync hash → state (back/forward, pasted URLs). + useEffect(() => { + const onHashChange = () => { + const s = readHashState(); + setTab(s.tab); + setSelectedRunId(s.selectedRunId); + setCompareIds(s.compareIds); + }; + window.addEventListener("hashchange", onHashChange); + return () => window.removeEventListener("hashchange", onHashChange); + }, []); + + return ( + <> +

Workpool Dashboard

+ + + {tab === "history" && ( + { + setSelectedRunId(id); + setTab("detail"); + }} + onCompare={(a, b) => { + setCompareIds([a, b]); + setTab("compare"); + }} + /> + )} + {tab === "detail" && selectedRunId && } + {tab === "compare" && } + {tab === "run" && setTab("history")} />} + + ); +} + +function History({ + onPick, + onCompare, +}: { + onPick: (id: RunId) => void; + onCompare: (a: RunId, b: RunId) => void; +}) { + const runs = useQuery(api.test.dashboard.listRuns, { limit: 100 }); + const [compareA, setCompareA] = useState(null); + const [compareB, setCompareB] = useState(null); + + if (runs === undefined) return

Loading…

; + if (runs.length === 0) + return

No runs yet. Use “Run scenario”.

; + + return ( +
+
+ + + {compareA && compareB ? "two runs selected" : "select A and B"} + +
+ + + + + + + + + + + + + + + + + + {runs.map((r) => ( + + ))} + +
ABScenarioPoolStatusTasksDurationp50p95p99Started
+
+ ); +} + +type HistoryRowData = { + _id: RunId; + scenario: string; + pool?: string; + startTime: number; + taskCount?: number; +}; + +function HistoryRow({ + row, + compareA, + compareB, + setCompareA, + setCompareB, + onPick, +}: { + row: HistoryRowData; + compareA: RunId | null; + compareB: RunId | null; + setCompareA: (id: RunId) => void; + setCompareB: (id: RunId) => void; + onPick: (id: RunId) => void; +}) { + const run = useQuery(api.test.dashboard.getRun, { runId: row._id }); + return ( + { + if ((e.target as HTMLElement).tagName === "INPUT") return; + onPick(row._id); + }} + > + + setCompareA(row._id)} + /> + + + setCompareB(row._id)} + /> + + {row.scenario} + + + + + {run ? run.status : "…"} + + + {run ? run.completedCount : "…"}/{row.taskCount ?? "?"} + + {fmt(run?.totalDurationMs)} + {fmt(run?.latency?.p50)} + {fmt(run?.latency?.p95)} + {fmt(run?.latency?.p99)} + {fmtTime(row.startTime)} + + ); +} + +function RunDetail({ runId }: { runId: RunId }) { + const run = useQuery(api.test.dashboard.getRun, { runId }); + const throughput = useQuery(api.test.dashboard.throughputOverTime, { + runId, + bucketMs: 500, + }); + const cdf = useQuery(api.test.dashboard.latencyCdf, { runId }); + + if (run === undefined) return

Loading…

; + if (run === null) return

Run not found.

; + + return ( + <> +
+
+

{run.scenario}

+ + {run.status} + {fmtTime(run.startTime)} +
+
+ + + + + + + +
+
+ parameters +
+            {JSON.stringify(run.parameters, null, 2)}
+          
+
+
+ +
+ + + + + `${(t / 1000).toFixed(1)}s`} + /> + + + `t=${((t as number) / 1000).toFixed(2)}s` + } + /> + + + + + + + + + + + + + + t < 1000 ? `${t}ms` : `${(t / 1000).toFixed(1)}s` + } + /> + + + `${(t as number) < 1000 ? `${t}ms` : `${((t as number) / 1000).toFixed(2)}s`}` + } + /> + + + + +
+ + ); +} + +function Compare({ + ids, + setIds, +}: { + ids: [RunId | null, RunId | null]; + setIds: (ids: [RunId | null, RunId | null]) => void; +}) { + const runs = useQuery(api.test.dashboard.listRuns, { limit: 100 }); + const [a, b] = ids; + const runA = useQuery(api.test.dashboard.getRun, a ? { runId: a } : "skip"); + const runB = useQuery(api.test.dashboard.getRun, b ? { runId: b } : "skip"); + const tA = useQuery( + api.test.dashboard.throughputOverTime, + a ? { runId: a, bucketMs: 500 } : "skip", + ); + const tB = useQuery( + api.test.dashboard.throughputOverTime, + b ? { runId: b, bucketMs: 500 } : "skip", + ); + const cA = useQuery(api.test.dashboard.latencyCdf, a ? { runId: a } : "skip"); + const cB = useQuery(api.test.dashboard.latencyCdf, b ? { runId: b } : "skip"); + + const throughputData = useMemo(() => { + const aPts = tA?.points ?? []; + const bPts = tB?.points ?? []; + const len = Math.max(aPts.length, bPts.length); + const out: Array<{ + tMs: number; + aCompleted?: number; + bCompleted?: number; + }> = []; + for (let i = 0; i < len; i++) { + const tMs = (aPts[i]?.tMs ?? bPts[i]?.tMs ?? i * 500) as number; + out.push({ + tMs, + aCompleted: aPts[i]?.completed, + bCompleted: bPts[i]?.completed, + }); + } + return out; + }, [tA, tB]); + + // Merge CDFs by ms axis: zip both sorted arrays into points with optional aPct/bPct. + const cdfData = useMemo(() => { + const aArr = cA ?? []; + const bArr = cB ?? []; + const points: Array<{ ms: number; aPct?: number; bPct?: number }> = []; + aArr.forEach((p) => points.push({ ms: p.ms, aPct: p.pct })); + bArr.forEach((p) => points.push({ ms: p.ms, bPct: p.pct })); + points.sort((x, y) => x.ms - y.ms); + return points; + }, [cA, cB]); + return ( <> -

Convex Workpool Component Example

-

- See example/convex/example.ts for all the ways to use - this component -

+
+ + +
+
+ + {runA && runB && ( +
+

Summary delta

+ +
+ )} + +
+ + + + + `${(t / 1000).toFixed(1)}s`} + /> + + + + + + + + + + + + + + + t < 1000 ? `${t}ms` : `${(t / 1000).toFixed(1)}s` + } + /> + + + + + + + +
); } +function DeltaTable({ + a, + b, +}: { + a: NonNullable>>; + b: NonNullable>>; +}) { + const rows: Array<{ + label: string; + av?: number; + bv?: number; + lower: boolean; + }> = [ + { + label: "Total duration (ms)", + av: a.totalDurationMs, + bv: b.totalDurationMs, + lower: true, + }, + { label: "p50 (ms)", av: a.latency?.p50, bv: b.latency?.p50, lower: true }, + { label: "p95 (ms)", av: a.latency?.p95, bv: b.latency?.p95, lower: true }, + { label: "p99 (ms)", av: a.latency?.p99, bv: b.latency?.p99, lower: true }, + { label: "max (ms)", av: a.latency?.max, bv: b.latency?.max, lower: true }, + ]; + return ( + + + + + + + + + + + {rows.map((row) => { + const delta = + row.av !== undefined && row.bv !== undefined && row.av !== 0 + ? ((row.bv - row.av) / row.av) * 100 + : undefined; + // For "lower is better": negative delta is good. + const better = + delta !== undefined && (row.lower ? delta < 0 : delta > 0); + return ( + + + + + + + ); + })} + +
Metric + A ({a.scenario} · {a.pool ?? "?"}) + + B ({b.scenario} · {b.pool ?? "?"}) + Δ (B vs A)
{row.label}{row.av ?? "—"}{row.bv ?? "—"} + {delta === undefined + ? "—" + : `${delta > 0 ? "+" : ""}${delta.toFixed(1)}%`} +
+ ); +} + +const SCENARIO_PRESETS = { + burstyBatches: { + waveCount: 10, + tasksPerWave: 20, + delayBetweenWavesMs: 500, + maxParallelism: 50, + taskDurationMs: 0, + }, + throughput: { + taskCount: 1000, + batchSize: 100, + interBatchMs: 50, + maxParallelism: 100, + taskDurationMs: 20, + }, + overhead: { + taskCount: 500, + batchSize: 50, + interBatchMs: 0, + mode: "pool", + onComplete: false, + maxParallelism: 50, + }, + sustained: { + targetTps: 50, + durationSec: 20, + workerMinMs: 50, + workerMaxMs: 500, + onComplete: false, + maxParallelism: 100, + }, + bigArgs: { + taskCount: 30, + argSizeBytes: 800000, + maxParallelism: 30, + }, + bigContext: { + taskCount: 30, + contextSizeBytes: 800000, + maxParallelism: 30, + }, + bigReturnTypes: { + taskCount: 20, + returnSizeBytes: 1000000, + maxParallelism: 20, + }, +} as const satisfies Record>; + +type ScenarioName = keyof typeof SCENARIO_PRESETS; + +function RunScenarioForm({ onStarted }: { onStarted: () => void }) { + const runScenarios = useAction(api.test.dashboard.runScenarios); + const [scenario, setScenario] = useState("burstyBatches"); + const [pool, setPool] = useState<"new" | "old" | "both">("new"); + const [paramsText, setParamsText] = useState( + JSON.stringify(SCENARIO_PRESETS[scenario], null, 2), + ); + const [busy, setBusy] = useState(false); + const [error, setError] = useState(null); + + const updateScenario = (next: ScenarioName) => { + setScenario(next); + setParamsText(JSON.stringify(SCENARIO_PRESETS[next], null, 2)); + setError(null); + }; + + const launch = async () => { + setError(null); + let parsed: Record; + try { + parsed = JSON.parse(paramsText); + } catch (e) { + setError(`Invalid JSON: ${(e as Error).message}`); + return; + } + setBusy(true); + try { + const launches: Array<"new" | "old"> = + pool === "both" ? ["old", "new"] : [pool]; + const argsList = launches.map((p) => ({ + ...parsed, + pool: p, + })); + await runScenarios({ scenario, argsList }); + onStarted(); + } catch (e) { + setError((e as Error).message); + } finally { + setBusy(false); + } + }; + + return ( +
+
+ + +
+