Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,21 @@ npm i
npm run dev
```

## Running the example dashboard

The `example/` app includes a dashboard for launching benchmark scenarios,
inspecting individual runs (throughput, latency CDF), and comparing two runs
side-by-side. To use it, run the backend and the frontend together:

```sh
npm run dev # in one terminal: convex dev + library codegen watch
npm run dev:dashboard # in another: vite dev server for example/
```

Then open the URL vite prints (typically <http://localhost:5173>). The "Run
scenario" tab launches presets; "History" lists past runs; "Compare" diffs two
of them. See `example/README.md` for more.

## Testing

```sh
Expand Down
1 change: 1 addition & 0 deletions eslint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export default [
{
ignores: [
"dist/**",
"example/dist/**",
"eslint.config.js",
"vitest.config.ts",
"**/_generated/",
Expand Down
48 changes: 46 additions & 2 deletions example/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,48 @@
# Example app

Components need an app that uses them in order to run codegen. An example app is
also useful for testing and documentation.
Components need an app that uses them in order to run codegen. This example app
also doubles as a benchmark dashboard for the workpool component itself — it
exercises the API and surfaces throughput and latency metrics for the scenarios
in `convex/test/scenarios/`.

## Running the dashboard

From the repo root, in two terminals:

```sh
npm run dev # backend: convex dev + workpool codegen watch
npm run dev:dashboard # frontend: vite dev server (defaults to http://localhost:5173)
```

The first run of `npm run dev` writes `.env.local` with `VITE_CONVEX_URL`, which
the vite config reads from the repo root (`envDir: "../"`).

## What's in the dashboard

- **Run scenario** — pick a preset (`burstyBatches`, `throughput`, `overhead`,
`sustained`, `bigArgs`, `bigContext`, `bigReturnTypes`), tweak the JSON
parameters, and launch it against the "new" pool (this branch), the "old" pool
(`workpool@0.4.6`, installed as `@convex-dev/workpool-old`), or both
back-to-back.
- **History** — every run is persisted to the `runs` table. Pick A and B to diff
them.
- **Detail** — per-run throughput-over-time and latency CDF charts.
- **Compare** — side-by-side throughput and CDF for two runs, plus a summary
delta table (p50/p95/p99/max/duration).

URL state is encoded in the hash (`#detail/<id>`, `#compare/<id1>,<id2>`), so
links are shareable.

## Deploying it as a static site (optional)

The example is wired to `@convex-dev/static-hosting`, so you can publish the
dashboard to your dev deployment with:

```sh
npm run deploy:dashboard # uploads to dev
npm run deploy:dashboard:prod # uploads to prod
```

It will be served at `https://<your-deployment>.convex.site/`. See the component
setup in `convex/convex.config.ts`, `convex/http.ts`, and
`convex/staticHosting.ts`.
9 changes: 9 additions & 0 deletions example/convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@

import type * as crons from "../crons.js";
import type * as example from "../example.js";
import type * as http from "../http.js";
import type * as staticHosting from "../staticHosting.js";
import type * as test_dashboard from "../test/dashboard.js";
import type * as test_nonRetryable from "../test/nonRetryable.js";
import type * as test_pool from "../test/pool.js";
import type * as test_run from "../test/run.js";
import type * as test_scenarios_bigArgs from "../test/scenarios/bigArgs.js";
import type * as test_scenarios_bigContext from "../test/scenarios/bigContext.js";
Expand All @@ -30,7 +34,11 @@ import type {
declare const fullApi: ApiFromModules<{
crons: typeof crons;
example: typeof example;
http: typeof http;
staticHosting: typeof staticHosting;
"test/dashboard": typeof test_dashboard;
"test/nonRetryable": typeof test_nonRetryable;
"test/pool": typeof test_pool;
"test/run": typeof test_run;
"test/scenarios/bigArgs": typeof test_scenarios_bigArgs;
"test/scenarios/bigContext": typeof test_scenarios_bigContext;
Expand Down Expand Up @@ -74,4 +82,5 @@ export declare const components: {
serializedPool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"serializedPool">;
testWorkpool: import("@convex-dev/workpool/_generated/component.js").ComponentApi<"testWorkpool">;
oldWorkpool: import("@convex-dev/workpool-old/_generated/component.js").ComponentApi<"oldWorkpool">;
selfHosting: import("@convex-dev/static-hosting/_generated/component.js").ComponentApi<"selfHosting">;
};
2 changes: 2 additions & 0 deletions example/convex/convex.config.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { defineApp } from "convex/server";
import workpool from "@convex-dev/workpool/convex.config";
import workpoolOld from "@convex-dev/workpool-old/convex.config";
import staticHosting from "@convex-dev/static-hosting/convex.config";

const app = defineApp();
app.use(workpool, { name: "smallPool" });
app.use(workpool, { name: "bigPool" });
app.use(workpool, { name: "serializedPool" });
app.use(workpool, { name: "testWorkpool" });
app.use(workpoolOld, { name: "oldWorkpool" });
app.use(staticHosting);

export default app;
9 changes: 9 additions & 0 deletions example/convex/http.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { httpRouter } from "convex/server";
import { registerStaticRoutes } from "@convex-dev/static-hosting";
import { components } from "./_generated/api";

const http = httpRouter();

registerStaticRoutes(http, components.selfHosting);

export default http;
1 change: 1 addition & 0 deletions example/convex/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export default defineSchema({
parameters: v.any(),
taskCount: v.optional(v.number()),
endTime: v.optional(v.number()),
pool: v.optional(v.union(v.literal("new"), v.literal("old"))),
}),
tasks: defineTable({
runId: v.id("runs"),
Expand Down
18 changes: 18 additions & 0 deletions example/convex/staticHosting.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import {
exposeDeploymentQuery,
exposeUploadApi,
} from "@convex-dev/static-hosting";
import { components } from "./_generated/api";

export const {
generateUploadUrl,
generateUploadUrls,
recordAsset,
recordAssets,
gcOldAssets,
listAssets,
} = exposeUploadApi(components.selfHosting);

export const { getCurrentDeployment } = exposeDeploymentQuery(
components.selfHosting,
);
215 changes: 215 additions & 0 deletions example/convex/test/dashboard.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import { v } from "convex/values";
import { query, action } from "../_generated/server";
import { internal } from "../_generated/api";
import { runStatus } from "./run";

function percentile(sorted: number[], p: number): number {
const idx = Math.ceil((p / 100) * sorted.length) - 1;
return sorted[Math.max(0, idx)];
}

// Just the list of run docs — no per-row tasks aggregation. Each row
// subscribes to `getRun` separately so we don't .collect() the entire
// tasks table on every history poll.
export const listRuns = query({
args: { limit: v.optional(v.number()) },
handler: async (ctx, { limit = 50 }) => {
const runs = await ctx.db.query("runs").order("desc").take(limit);
return runs.map((run) => ({
_id: run._id,
scenario: run.scenario,
pool: run.pool,
startTime: run.startTime,
taskCount: run.taskCount,
}));
},
});

export const getRun = query({
args: { runId: v.id("runs") },
handler: async (ctx, { runId }) => {
const run = await ctx.db.get("runs", runId);
if (!run) return null;
const tasks = await ctx.db
.query("tasks")
.withIndex("runId", (q) => q.eq("runId", run._id))
.collect();
const status = await runStatus(ctx, run);
const latencies = tasks
.filter((t) => t.enqueuedAt !== undefined)
.map((t) => t.endTime - t.enqueuedAt!)
.sort((a, b) => a - b);
const endTimes = tasks.map((t) => t.endTime);
const lastEnd = endTimes.length ? Math.max(...endTimes) : undefined;
return {
_id: run._id,
scenario: run.scenario,
parameters: run.parameters,
pool: run.pool,
startTime: run.startTime,
taskCount: run.taskCount,
completedCount: tasks.length,
status,
totalDurationMs:
lastEnd !== undefined ? lastEnd - run.startTime : undefined,
latency:
latencies.length > 0
? {
p50: percentile(latencies, 50),
p95: percentile(latencies, 95),
p99: percentile(latencies, 99),
max: latencies[latencies.length - 1],
}
: undefined,
};
},
});

// Time-bucketed throughput. Returns one point per `bucketMs` window from
// the run's start: completed (count finishing in that window) and inFlight
// (enqueued - completed at that t).
export const throughputOverTime = query({
args: { runId: v.id("runs"), bucketMs: v.optional(v.number()) },
handler: async (ctx, { runId, bucketMs = 500 }) => {
const run = await ctx.db.get("runs", runId);
if (!run) return null;
const tasks = await ctx.db
.query("tasks")
.withIndex("runId", (q) => q.eq("runId", runId))
.collect();
if (tasks.length === 0) return { bucketMs, points: [] };

const start = run.startTime;
const lastEnd = Math.max(...tasks.map((t) => t.endTime));
const totalDurationMs = lastEnd - start;
const numBuckets = Math.max(1, Math.ceil(totalDurationMs / bucketMs) + 1);

const completedPerBucket = new Array<number>(numBuckets).fill(0);
const enqueuedPerBucket = new Array<number>(numBuckets).fill(0);
for (const t of tasks) {
const cIdx = Math.min(
numBuckets - 1,
Math.floor((t.endTime - start) / bucketMs),
);
completedPerBucket[cIdx]++;
if (t.enqueuedAt !== undefined) {
const eIdx = Math.max(
0,
Math.min(
numBuckets - 1,
Math.floor((t.enqueuedAt - start) / bucketMs),
),
);
Comment on lines +96 to +102

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Defensive clamping may hide data integrity issues.

The Math.max(0, ...) at line 96 silently clamps negative bucket indices when t.enqueuedAt < start. This would occur if a task was enqueued before the run started, indicating a data integrity problem.

Consider adding a warning or assertion to surface this condition rather than silently accepting it:

🛡️ Suggested improvement
       if (t.enqueuedAt !== undefined) {
+        if (t.enqueuedAt < start) {
+          console.warn(`Task ${t.workId} enqueuedAt (${t.enqueuedAt}) is before run start (${start})`);
+        }
         const eIdx = Math.max(
           0,
           Math.min(
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@example/convex/test/dashboard.ts` around lines 96 - 102, The current eIdx
computation silently clamps negative indices (Math.max(0, ...)) when
t.enqueuedAt < start; update the code around the eIdx calculation to first
detect this condition (check if t.enqueuedAt < start) and surface it (e.g.,
console.warn/processLogger.warn or throw/assert with identifying info from t
such as an id or enqueuedAt) before performing the clamping, then compute eIdx
using the same Math.floor/min logic; keep the final clamping to avoid crashes
but ensure the warning/assertion makes the data-integrity issue visible.

enqueuedPerBucket[eIdx]++;
}
}
const points: Array<{
tMs: number;
completed: number;
enqueued: number;
inFlight: number;
}> = [];
let cumEnqueued = 0;
let cumCompleted = 0;
for (let i = 0; i < numBuckets; i++) {
cumEnqueued += enqueuedPerBucket[i];
cumCompleted += completedPerBucket[i];
points.push({
tMs: i * bucketMs,
completed: completedPerBucket[i],
enqueued: enqueuedPerBucket[i],
inFlight: Math.max(0, cumEnqueued - cumCompleted),
});
}
return { bucketMs, points };
},
});

// Sorted latency array, thinned to ~200 points for CDF plotting.
export const latencyCdf = query({
args: { runId: v.id("runs"), points: v.optional(v.number()) },
handler: async (ctx, { runId, points = 200 }) => {
const tasks = await ctx.db
.query("tasks")
.withIndex("runId", (q) => q.eq("runId", runId))
.collect();
const latencies = tasks
.filter((t) => t.enqueuedAt !== undefined)
.map((t) => t.endTime - t.enqueuedAt!)
.sort((a, b) => a - b);
if (latencies.length === 0) return [];
const stride = Math.max(1, Math.floor(latencies.length / points));
const out: Array<{ pct: number; ms: number }> = [];
for (let i = 0; i < latencies.length; i += stride) {
out.push({
pct: ((i + 1) / latencies.length) * 100,
ms: latencies[i],
});
}
// ensure the max latency point is present
out.push({ pct: 100, ms: latencies[latencies.length - 1] });
return out;
},
});

// Live status of the currently-running scenario, if any.
export const latestRunStatus = query({
args: {},
handler: async (ctx) => {
const run = await ctx.db.query("runs").order("desc").first();
if (!run) return null;
const status = await runStatus(ctx, run);
return { runId: run._id, scenario: run.scenario, status };
},
});

const scenarioName = v.union(
v.literal("burstyBatches"),
v.literal("throughput"),
v.literal("overhead"),
v.literal("sustained"),
v.literal("bigArgs"),
v.literal("bigContext"),
v.literal("bigReturnTypes"),
);

/**
* Public action so the dashboard can trigger one or more scenario runs by
* name. Multi-launch is sequenced server-side: each non-final entry is
* awaited via `ctx.runAction` (so we know its tasks are done and the
* scenario's poll loop has returned), then we sleep past the 5s
* "previous run started too recently" guard in `run.start` before the
* next launch. The final entry is also awaited so that any `run.start`
* failure surfaces back to the dashboard instead of disappearing into a
* scheduled action's logs.
*/
const GUARD_BUFFER_MS = 5_500;
/**
* Concurrent benchmark: fires the same scenario at both pools simultaneously
* and waits for both runs to fully complete. Useful for testing whether
* scheduler thrash from a competing pool makes individual pool throughput
* worse than running it alone.
*/
export const runConcurrent = action({
args: { scenario: scenarioName, args: v.any() },
handler: async (ctx, { scenario, args }) => {
const fn = internal.test.scenarios[scenario].default;
const [,] = await Promise.all([
ctx.runAction(fn, { ...args, pool: "old" }),
ctx.runAction(fn, { ...args, pool: "new" }),
]);
},
});

export const runScenarios = action({
args: { scenario: scenarioName, argsList: v.array(v.any()) },
handler: async (ctx, { scenario, argsList }) => {
const fn = internal.test.scenarios[scenario].default;
for (let i = 0; i < argsList.length; i++) {
if (i > 0) {
await new Promise((r) => setTimeout(r, GUARD_BUFFER_MS));
}
await ctx.runAction(fn, argsList[i]);
}
},
});
Loading
Loading