diff --git a/package.json b/package.json index cda43e71..38de5b8d 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,8 @@ "lint:tool-schemas": "node dist/index.js serve --introspect-tools-list | node scripts/lint-tool-schemas.mjs -", "clean": "rimraf dist", "prepare": "npm run build", - "lint:changed": "node scripts/lint-changed-src.js" + "lint:changed": "node scripts/lint-changed-src.js", + "harness:parallel-smoke": "ts-node tests/harness/parallel-smoke.ts" }, "keywords": [ "openchrome", diff --git a/tests/harness/parallel-runner.test.ts b/tests/harness/parallel-runner.test.ts new file mode 100644 index 00000000..3a0ad7f8 --- /dev/null +++ b/tests/harness/parallel-runner.test.ts @@ -0,0 +1,129 @@ +/// + +import * as fs from 'node:fs/promises'; +import * as os from 'node:os'; +import * as path from 'node:path'; +import { HarnessParallelRunner, HarnessScenario, sleep } from './parallel-runner'; + +describe('HarnessParallelRunner', () => { + test('enforces concurrency limit', async () => { + let active = 0; + let maxActive = 0; + const scenarios: HarnessScenario[] = Array.from({ length: 6 }, (_, i) => ({ + id: `s${i}`, + run: async () => { + active++; + maxActive = Math.max(maxActive, active); + await sleep(20); + active--; + return i; + }, + })); + + const runner = new HarnessParallelRunner({ + concurrency: 2, + scenarioTimeoutMs: 500, + maxErrors: 10, + stragglerAfterMs: 250, + }); + + const result = await runner.run(scenarios); + + expect(result.completed).toHaveLength(6); + expect(maxActive).toBeLessThanOrEqual(2); + expect(result.failed).toHaveLength(0); + expect(result.timedOut).toHaveLength(0); + }); + + test('records stragglers separately from timed out scenarios', async () => { + const runner = new HarnessParallelRunner({ + concurrency: 1, + scenarioTimeoutMs: 500, + maxErrors: 5, + stragglerAfterMs: 20, + }); + + const result = await runner.run([{ id: 'slow-ok', run: async () => { await sleep(60); return 'ok'; } }]); + + expect(result.completed).toHaveLength(1); + expect(result.timedOut).toHaveLength(0); + expect(result.stragglers.map((s) => s.id)).toContain('slow-ok'); + }); + + test('times out scenarios and invokes cleanup', async () => { + let cleaned = false; + const runner = new HarnessParallelRunner({ + concurrency: 1, + scenarioTimeoutMs: 20, + maxErrors: 5, + stragglerAfterMs: 5, + }); + + const result = await runner.run([{ id: 'timeout', run: async (signal) => { await sleep(1_000, signal); return 'late'; }, cleanup: () => { cleaned = true; } }]); + + expect(result.completed).toHaveLength(0); + expect(result.timedOut.map((t) => t.id)).toContain('timeout'); + expect(cleaned).toBe(true); + }); + + test('cancels queued scenarios after maxErrors while preserving partial evidence', async () => { + const runner = new HarnessParallelRunner({ + concurrency: 1, + scenarioTimeoutMs: 200, + maxErrors: 1, + stragglerAfterMs: 100, + }); + + const result = await runner.run([ + { id: 'ok', run: async () => 'done' }, + { id: 'fail', run: async () => { throw new Error('boom'); } }, + { id: 'queued', run: async () => 'not-run' }, + ]); + + expect(result.completed.map((r) => r.id)).toContain('ok'); + expect(result.failed.map((r) => r.id)).toContain('fail'); + expect(result.cancelled).toBe(true); + expect(result.results.find((r) => r.id === 'queued')?.status).toBe('cancelled'); + }); + + + test('aborts active scenarios when maxErrors is reached', async () => { + let slowCleaned = false; + const runner = new HarnessParallelRunner({ + concurrency: 2, + scenarioTimeoutMs: 1_000, + maxErrors: 1, + stragglerAfterMs: 500, + }); + + const result = await runner.run([ + { id: 'fail-fast', run: async () => { throw new Error('boom'); } }, + { id: 'slow-active', run: async (signal) => { await sleep(1_000, signal); return 'late'; }, cleanup: () => { slowCleaned = true; } }, + { id: 'queued', run: async () => 'not-run' }, + ]); + + expect(result.cancelled).toBe(true); + expect(result.failed.map((r) => r.id)).toContain('fail-fast'); + expect(result.results.find((r) => r.id === 'slow-active')?.status).toBe('cancelled'); + expect(result.results.find((r) => r.id === 'queued')?.status).toBe('cancelled'); + expect(slowCleaned).toBe(true); + }); + + test('writes partial result JSON', async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'oc-harness-')); + const output = path.join(dir, 'partial.json'); + const runner = new HarnessParallelRunner({ + concurrency: 1, + scenarioTimeoutMs: 200, + maxErrors: 5, + stragglerAfterMs: 100, + partialWritePath: output, + }); + + await runner.run([{ id: 'ok', run: async () => 'done' }]); + const parsed = JSON.parse(await fs.readFile(output, 'utf8')); + + expect(parsed.parallel.completed).toHaveLength(1); + expect(parsed.parallel.results[0].id).toBe('ok'); + }); +}); diff --git a/tests/harness/parallel-runner.ts b/tests/harness/parallel-runner.ts new file mode 100644 index 00000000..a5cfb151 --- /dev/null +++ b/tests/harness/parallel-runner.ts @@ -0,0 +1,284 @@ +import * as fs from 'node:fs/promises'; +import * as path from 'node:path'; + +export interface HarnessScenario { + id: string; + run(signal: AbortSignal): Promise; + cleanup?(): Promise | void; +} + +export interface HarnessParallelRunnerOptions { + concurrency: number; + scenarioTimeoutMs: number; + maxErrors: number; + stragglerAfterMs: number; + partialWritePath?: string; + now?: () => number; +} + +export interface HarnessCompleted { + id: string; + result: T; + durationMs: number; +} + +export interface HarnessFailure { + id: string; + error: string; + durationMs: number; +} + +export interface HarnessTimeout { + id: string; + durationMs: number; +} + +export interface HarnessStraggler { + id: string; + durationMs: number; + stragglerAfterMs: number; +} + +export interface HarnessScenarioSummary { + id: string; + status: 'completed' | 'failed' | 'timedOut' | 'cancelled'; + durationMs: number; + result?: T; + error?: string; +} + +export interface HarnessRunResult { + completed: Array>; + failed: HarnessFailure[]; + timedOut: HarnessTimeout[]; + cancelled: boolean; + stragglers: HarnessStraggler[]; + results: Array>; + concurrency: number; + maxErrors: number; + scenarioTimeoutMs: number; + stragglerAfterMs: number; + startedAt: string; + endedAt: string; + durationMs: number; +} + +interface NormalizedOptions extends Required> { + partialWritePath?: string; +} + +export class HarnessParallelRunner { + private readonly options: NormalizedOptions; + private writeChain: Promise = Promise.resolve(); + private writeSeq = 0; + private readonly activeControllers = new Set(); + private readonly cancellationControllers = new Set(); + + constructor(options: HarnessParallelRunnerOptions) { + if (!Number.isInteger(options.concurrency) || options.concurrency < 1) { + throw new Error('concurrency must be a positive integer'); + } + if (!Number.isFinite(options.scenarioTimeoutMs) || options.scenarioTimeoutMs < 1) { + throw new Error('scenarioTimeoutMs must be a positive number'); + } + if (!Number.isInteger(options.maxErrors) || options.maxErrors < 1) { + throw new Error('maxErrors must be a positive integer'); + } + if (!Number.isFinite(options.stragglerAfterMs) || options.stragglerAfterMs < 1) { + throw new Error('stragglerAfterMs must be a positive number'); + } + + this.options = { + concurrency: options.concurrency, + scenarioTimeoutMs: options.scenarioTimeoutMs, + maxErrors: options.maxErrors, + stragglerAfterMs: options.stragglerAfterMs, + partialWritePath: options.partialWritePath, + now: options.now ?? Date.now, + }; + } + + async run(scenarios: Array>): Promise> { + const startedAtMs = this.options.now(); + const result: HarnessRunResult = { + completed: [], + failed: [], + timedOut: [], + cancelled: false, + stragglers: [], + results: [], + concurrency: this.options.concurrency, + maxErrors: this.options.maxErrors, + scenarioTimeoutMs: this.options.scenarioTimeoutMs, + stragglerAfterMs: this.options.stragglerAfterMs, + startedAt: new Date(startedAtMs).toISOString(), + endedAt: new Date(startedAtMs).toISOString(), + durationMs: 0, + }; + + let nextIndex = 0; + let active = 0; + let errorCount = 0; + + await new Promise((resolve) => { + const launchMore = () => { + if (result.cancelled && active === 0) { + resolve(); + return; + } + + while (!result.cancelled && active < this.options.concurrency && nextIndex < scenarios.length) { + const scenario = scenarios[nextIndex++]; + active++; + void this.runOne(scenario, result) + .then((status) => { + if (status === 'failed' || status === 'timedOut') { + errorCount++; + if (errorCount >= this.options.maxErrors) { + result.cancelled = nextIndex < scenarios.length || active > 1; + this.abortActiveScenarios(); + } + } + }) + .finally(() => { + active--; + void this.writePartial(result) + .catch(() => undefined) + .finally(() => { + if ((nextIndex >= scenarios.length || result.cancelled) && active === 0) { + resolve(); + } else { + launchMore(); + } + }); + }); + } + + if (nextIndex >= scenarios.length && active === 0) { + resolve(); + } + }; + + launchMore(); + }); + + if (result.cancelled) { + for (let i = nextIndex; i < scenarios.length; i++) { + result.results.push({ id: scenarios[i].id, status: 'cancelled', durationMs: 0 }); + } + } + + const endedAtMs = this.options.now(); + result.endedAt = new Date(endedAtMs).toISOString(); + result.durationMs = Math.max(0, endedAtMs - startedAtMs); + await this.writePartial(result); + return result; + } + + private async runOne( + scenario: HarnessScenario, + aggregate: HarnessRunResult, + ): Promise<'completed' | 'failed' | 'timedOut' | 'cancelled'> { + const startedAt = this.options.now(); + const controller = new AbortController(); + this.activeControllers.add(controller); + let stragglerRecorded = false; + let settled = false; + + const recordStraggler = () => { + if (stragglerRecorded || settled) return; + stragglerRecorded = true; + aggregate.stragglers.push({ + id: scenario.id, + durationMs: Math.max(0, this.options.now() - startedAt), + stragglerAfterMs: this.options.stragglerAfterMs, + }); + }; + + const stragglerTimer = setTimeout(recordStraggler, this.options.stragglerAfterMs); + const timeoutTimer = setTimeout(() => controller.abort(), this.options.scenarioTimeoutMs); + stragglerTimer.unref?.(); + timeoutTimer.unref?.(); + + const finish = async () => { + settled = true; + this.activeControllers.delete(controller); + clearTimeout(stragglerTimer); + clearTimeout(timeoutTimer); + try { + await scenario.cleanup?.(); + } catch { + // Cleanup is best-effort for harness evidence. The original scenario + // result remains the source of truth. + } + }; + + try { + const value = await scenario.run(controller.signal); + const durationMs = Math.max(0, this.options.now() - startedAt); + await finish(); + aggregate.completed.push({ id: scenario.id, result: value, durationMs }); + aggregate.results.push({ id: scenario.id, status: 'completed', durationMs, result: value }); + return 'completed'; + } catch (error) { + const durationMs = Math.max(0, this.options.now() - startedAt); + await finish(); + if (controller.signal.aborted) { + if (this.cancellationControllers.has(controller)) { + this.cancellationControllers.delete(controller); + aggregate.results.push({ id: scenario.id, status: 'cancelled', durationMs, error: stringifyError(error) }); + return 'cancelled'; + } + aggregate.timedOut.push({ id: scenario.id, durationMs }); + aggregate.results.push({ id: scenario.id, status: 'timedOut', durationMs, error: stringifyError(error) }); + return 'timedOut'; + } + aggregate.failed.push({ id: scenario.id, error: stringifyError(error), durationMs }); + aggregate.results.push({ id: scenario.id, status: 'failed', durationMs, error: stringifyError(error) }); + return 'failed'; + } + } + + private abortActiveScenarios(): void { + for (const controller of this.activeControllers) { + this.cancellationControllers.add(controller); + controller.abort(); + } + } + + private async writePartial(result: HarnessRunResult): Promise { + if (!this.options.partialWritePath) return; + const filePath = this.options.partialWritePath; + const snapshot = JSON.stringify({ parallel: result }, null, 2); + this.writeChain = this.writeChain.then(async () => { + await fs.mkdir(path.dirname(filePath), { recursive: true }); + const tmp = `${filePath}.tmp-${process.pid}-${Date.now()}-${this.writeSeq++}`; + await fs.writeFile(tmp, snapshot); + await fs.rename(tmp, filePath); + }); + await this.writeChain; + } +} + +export function stringifyError(error: unknown): string { + if (error instanceof Error) return error.message; + return String(error); +} + +export function sleep(ms: number, signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + if (signal?.aborted) { + reject(new Error('aborted')); + return; + } + const onAbort = () => { + clearTimeout(timer); + reject(new Error('aborted')); + }; + const timer = setTimeout(() => { + signal?.removeEventListener('abort', onAbort); + resolve(); + }, ms); + signal?.addEventListener('abort', onAbort, { once: true }); + }); +} diff --git a/tests/harness/parallel-smoke.ts b/tests/harness/parallel-smoke.ts new file mode 100644 index 00000000..bbda9c98 --- /dev/null +++ b/tests/harness/parallel-smoke.ts @@ -0,0 +1,102 @@ +import { HarnessParallelRunner, HarnessScenario, sleep } from './parallel-runner'; + +interface SmokeResult { + ok: boolean; + kind: string; +} + +function parseArgs(argv: string[]): Record { + const out: Record = {}; + for (let i = 2; i < argv.length; i++) { + const arg = argv[i]; + if (arg.startsWith('--')) { + const key = arg.slice(2); + const next = argv[i + 1]; + if (next && !next.startsWith('--')) { + out[key] = next; + i++; + } else { + out[key] = true; + } + } + } + return out; +} + +function intArg(args: Record, key: string, fallback: number): number { + const raw = args[key]; + if (typeof raw !== 'string') return fallback; + const parsed = Number.parseInt(raw, 10); + return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback; +} + +function makeScenarios(args: Record): HarnessScenario[] { + const scenarios: HarnessScenario[] = [ + { id: 'fast-one', run: async () => ({ ok: true, kind: 'fast' }) }, + { id: 'fast-two', run: async () => ({ ok: true, kind: 'fast' }) }, + { id: 'fast-three', run: async () => ({ ok: true, kind: 'fast' }) }, + ]; + + if (args['include-straggler-fixture']) { + scenarios.push({ + id: 'intentional-straggler', + run: async (signal) => { + await sleep(150, signal); + return { ok: true, kind: 'straggler' }; + }, + }); + } + + if (args['include-timeout-fixture']) { + scenarios.push({ + id: 'intentional-timeout', + run: async (signal) => { + await sleep(10_000, signal); + return { ok: true, kind: 'timeout-unexpected' }; + }, + }); + } + + if (args['include-failing-fixtures']) { + scenarios.push({ + id: 'intentional-failure', + run: async () => { + throw new Error('intentional fixture failure'); + }, + }); + scenarios.push({ id: 'queued-after-failure', run: async () => ({ ok: true, kind: 'should-cancel' }) }); + } + + return scenarios; +} + +async function main(): Promise { + const args = parseArgs(process.argv); + const output = typeof args.output === 'string' ? args.output : 'artifacts/harness-parallel/latest.json'; + const runner = new HarnessParallelRunner({ + concurrency: intArg(args, 'concurrency', 2), + scenarioTimeoutMs: intArg(args, 'scenario-timeout-ms', 1_000), + maxErrors: intArg(args, 'max-errors', 5), + stragglerAfterMs: intArg(args, 'straggler-after-ms', 50), + partialWritePath: output, + }); + + const result = await runner.run(makeScenarios(args)); + console.log(JSON.stringify({ + completed: result.completed.length, + failed: result.failed.length, + timedOut: result.timedOut.length, + cancelled: result.cancelled, + stragglers: result.stragglers.length, + output, + }, null, 2)); + + if (result.failed.length > 0 || result.timedOut.length > 0 || result.cancelled) { + process.exitCode = 1; + } +} + +main().catch((error) => { + console.error(error); + process.exit(1); +});