diff --git a/src/orchestration/plan-executor.ts b/src/orchestration/plan-executor.ts index 3138164f..03b149d0 100644 --- a/src/orchestration/plan-executor.ts +++ b/src/orchestration/plan-executor.ts @@ -11,7 +11,9 @@ import { CompiledStep, PlanErrorHandler, PlanExecutionResult, + PlanStepExecutionRecord, } from '../types/plan-cache'; +import * as crypto from 'node:crypto'; import { withTimeout } from '../utils/with-timeout'; /** @@ -42,6 +44,53 @@ function substituteParams(value: unknown, params: Record): unkn } +function stableHash(value: unknown): string { + return crypto.createHash('sha256').update(canonicalJson(value)).digest('hex'); +} + +function canonicalJson(value: unknown): string { + return JSON.stringify(sortKeys(value)); +} + +function sortKeys(value: unknown): unknown { + if (Array.isArray(value)) return value.map(sortKeys); + if (value !== null && typeof value === 'object') { + const out: Record = {}; + for (const key of Object.keys(value as Record).sort()) { + out[key] = sortKeys((value as Record)[key]); + } + return out; + } + return value; +} + +function computeKnownGoodPrefix(ledger: PlanStepExecutionRecord[]): number { + let prefix = 0; + for (const entry of ledger.filter((step) => step.phase === 'main').sort((a, b) => a.order - b.order)) { + if (entry.order !== prefix + 1 || entry.status !== 'success') break; + prefix = entry.order; + } + return prefix; +} + +function withLedger( + result: Omit, + ledger: PlanStepExecutionRecord[], + frontierStepOrder?: number, + invalidationReason?: string, +): PlanExecutionResult { + return { + ...result, + ledger: { + steps: ledger, + knownGoodPrefixLength: computeKnownGoodPrefix(ledger), + ...(frontierStepOrder !== undefined && { frontierStepOrder }), + ...(invalidationReason && { invalidationReason }), + }, + }; +} + + /** * Extract result data from an MCPResult according to parseResult spec. * Returns the extracted value (raw text, parsed JSON, or a specific field). @@ -144,6 +193,7 @@ export class PlanExecutor { ): Promise { const startTime = Date.now(); let stepsExecuted = 0; + const ledger: PlanStepExecutionRecord[] = []; // 1. Build params map: plan defaults first, runtime overrides on top const params: Record = {}; @@ -154,14 +204,14 @@ export class PlanExecutor { } Object.assign(params, runtimeParams); - const failure = (error: string): PlanExecutionResult => ({ + const failure = (error: string, frontierStepOrder?: number): PlanExecutionResult => withLedger({ success: false, planId: plan.id, error, durationMs: Date.now() - startTime, stepsExecuted, totalSteps: plan.steps.length, - }); + }, ledger, frontierStepOrder, error); // 2. Execute each step sequentially for (const step of plan.steps) { @@ -172,11 +222,14 @@ export class PlanExecutor { if (!handler) { const msg = `No handler found for tool "${step.tool}" at ${stepLabel}`; console.error(`[PlanExecutor] ${msg}`); - return failure(msg); + ledger.push({ order: step.order, tool: step.tool, argsHash: stableHash(step.args), phase: 'main', status: 'failed', durationMs: 0, reason: msg }); + return failure(msg, step.order); } // b. Substitute template variables in args const substitutedArgs = substituteParams(step.args, params) as Record; + const stepStartedAt = Date.now(); + const argsHash = stableHash(substitutedArgs); // c. Call handler with timeout let mcpResult: MCPResult; @@ -190,6 +243,7 @@ export class PlanExecutor { } catch (err) { const errMsg = err instanceof Error ? err.message : String(err); console.error(`[PlanExecutor] Step failed at ${stepLabel}: ${errMsg}`); + ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'main', status: 'failed', durationMs: Date.now() - stepStartedAt, reason: errMsg }); // Check for a matching error handler const conditionKey = `step${step.order}_error`; @@ -198,7 +252,8 @@ export class PlanExecutor { plan.errorHandlers, sessionId, params, - stepsExecuted + stepsExecuted, + ledger ); if (recovered !== null) { stepsExecuted = recovered.stepsExecuted; @@ -207,13 +262,14 @@ export class PlanExecutor { continue; } - return failure(`Step ${step.order} (${step.tool}) failed: ${errMsg}`); + return failure(`Step ${step.order} (${step.tool}) failed: ${errMsg}`, step.order); } // d. Check for error result if (mcpResult.isError) { const errMsg = mcpResult.content?.[0]?.text ?? 'Unknown tool error'; console.error(`[PlanExecutor] Tool returned error at ${stepLabel}: ${errMsg}`); + ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'main', status: 'failed', durationMs: Date.now() - stepStartedAt, reason: errMsg }); const conditionKey = `step${step.order}_error`; const recovered = await this.tryRecovery( @@ -221,7 +277,8 @@ export class PlanExecutor { plan.errorHandlers, sessionId, params, - stepsExecuted + stepsExecuted, + ledger ); if (recovered !== null) { stepsExecuted = recovered.stepsExecuted; @@ -229,18 +286,20 @@ export class PlanExecutor { continue; } - return failure(`Step ${step.order} (${step.tool}) returned error: ${errMsg}`); + return failure(`Step ${step.order} (${step.tool}) returned error: ${errMsg}`, step.order); } // e. Check for empty result (before storing) — may trigger empty_result handler if (isEmptyResult(mcpResult)) { + ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'main', status: 'empty_result', durationMs: Date.now() - stepStartedAt, reason: 'empty result' }); const conditionKey = `step${step.order}_empty_result`; const recovered = await this.tryRecovery( conditionKey, plan.errorHandlers, sessionId, params, - stepsExecuted + stepsExecuted, + ledger ); if (recovered !== null) { stepsExecuted = recovered.stepsExecuted; @@ -251,10 +310,12 @@ export class PlanExecutor { } // f. Parse and store result if requested + let storedAs: string | undefined; if (step.parseResult && step.parseResult.storeAs) { try { const extracted = extractResult(mcpResult, step.parseResult); params[step.parseResult.storeAs] = extracted; + storedAs = step.parseResult.storeAs; } catch (err) { console.error( `[PlanExecutor] Failed to extract result at ${stepLabel}: ${ @@ -264,31 +325,34 @@ export class PlanExecutor { // Non-fatal: continue without storing } } + if (!isEmptyResult(mcpResult)) { + ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'main', status: 'success', durationMs: Date.now() - stepStartedAt, storedAs }); + } } // 3. Validate success criteria const criteriaError = validateSuccessCriteria(plan.successCriteria, params); if (criteriaError) { console.error(`[PlanExecutor] Success criteria failed for plan=${plan.id}: ${criteriaError}`); - return { + return withLedger({ success: false, planId: plan.id, error: `Success criteria not met: ${criteriaError}`, durationMs: Date.now() - startTime, stepsExecuted, totalSteps: plan.steps.length, - }; + }, ledger, computeKnownGoodPrefix(ledger) + 1, `Success criteria not met: ${criteriaError}`); } // 4. Return success with all collected params as data - return { + return withLedger({ success: true, planId: plan.id, data: params, durationMs: Date.now() - startTime, stepsExecuted, totalSteps: plan.steps.length, - }; + }, ledger); } /** @@ -300,7 +364,8 @@ export class PlanExecutor { errorHandlers: PlanErrorHandler[], sessionId: string, params: Record, - currentStepsExecuted: number + currentStepsExecuted: number, + ledger: PlanStepExecutionRecord[] ): Promise<{ stepsExecuted: number; params: Record } | null> { const handler = errorHandlers.find((h) => h.condition === conditionKey); if (!handler) return null; @@ -321,6 +386,8 @@ export class PlanExecutor { } const substitutedArgs = substituteParams(step.args, params) as Record; + const recoveryStartedAt = Date.now(); + const argsHash = stableHash(substitutedArgs); let mcpResult: MCPResult; try { @@ -331,27 +398,29 @@ export class PlanExecutor { ); stepsExecuted++; } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); console.error( - `[PlanExecutor] Recovery step failed at ${stepLabel}: ${ - err instanceof Error ? err.message : String(err) - }` + `[PlanExecutor] Recovery step failed at ${stepLabel}: ${errMsg}` ); + ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'recovery', recoveryCondition: conditionKey, status: 'failed', durationMs: Date.now() - recoveryStartedAt, reason: errMsg }); continue; } if (mcpResult.isError) { + const errMsg = mcpResult.content?.[0]?.text ?? 'unknown'; console.error( - `[PlanExecutor] Recovery step returned error at ${stepLabel}: ${ - mcpResult.content?.[0]?.text ?? 'unknown' - }` + `[PlanExecutor] Recovery step returned error at ${stepLabel}: ${errMsg}` ); + ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'recovery', recoveryCondition: conditionKey, status: 'failed', durationMs: Date.now() - recoveryStartedAt, reason: errMsg }); continue; } + let storedAs: string | undefined; if (step.parseResult && step.parseResult.storeAs) { try { const extracted = extractResult(mcpResult, step.parseResult); params[step.parseResult.storeAs] = extracted; + storedAs = step.parseResult.storeAs; } catch (err) { console.error( `[PlanExecutor] Recovery: failed to extract result at ${stepLabel}: ${ @@ -360,6 +429,7 @@ export class PlanExecutor { ); } } + ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'recovery', recoveryCondition: conditionKey, status: isEmptyResult(mcpResult) ? 'empty_result' : 'success', durationMs: Date.now() - recoveryStartedAt, storedAs }); } return { stepsExecuted, params }; diff --git a/src/types/plan-cache.ts b/src/types/plan-cache.ts index a52af178..7b01accb 100644 --- a/src/types/plan-cache.ts +++ b/src/types/plan-cache.ts @@ -109,6 +109,40 @@ export interface PlanRegistryData { updatedAt: number; } + +/** Per-step ledger entry emitted by PlanExecutor for recovery/resume diagnostics. */ +export interface PlanStepExecutionRecord { + /** Original plan step order, or recovery step order within a handler. */ + order: number; + /** Tool executed for this step. */ + tool: string; + /** Stable hash of substituted arguments; never stores raw args. */ + argsHash: string; + /** Whether this entry came from the main plan or a recovery handler. */ + phase: 'main' | 'recovery'; + /** Recovery condition when phase === 'recovery'. */ + recoveryCondition?: string; + /** Step outcome. */ + status: 'success' | 'failed' | 'empty_result' | 'skipped'; + /** Wall-clock time spent in this step. */ + durationMs: number; + /** Optional concise reason for failure/empty/skipped status. */ + reason?: string; + /** Stored output variable name when parseResult.storeAs was applied. */ + storedAs?: string; +} + +/** Known-good prefix / invalidated frontier summary for a plan run. */ +export interface PlanExecutionLedger { + steps: PlanStepExecutionRecord[]; + /** Count of contiguous successful main-plan steps from the start. */ + knownGoodPrefixLength: number; + /** First main-plan step that invalidated forward progress, if any. */ + frontierStepOrder?: number; + /** Human-readable reason for the frontier. */ + invalidationReason?: string; +} + /** Result of plan execution */ export interface PlanExecutionResult { /** Whether the plan executed successfully */ @@ -125,4 +159,6 @@ export interface PlanExecutionResult { stepsExecuted: number; /** Total steps in plan */ totalSteps: number; + /** Per-step recovery metadata for known-good prefix/frontier diagnostics. */ + ledger?: PlanExecutionLedger; } diff --git a/tests/orchestration/plan-cache.test.ts b/tests/orchestration/plan-cache.test.ts index 616398ee..844a55df 100644 --- a/tests/orchestration/plan-cache.test.ts +++ b/tests/orchestration/plan-cache.test.ts @@ -783,3 +783,65 @@ describeIntegration('Integration: PlanRegistry + PlanExecutor', () => { expect(after).toBeGreaterThan(before); }); }); + +describe('PlanExecutor known-good prefix ledger', () => { + const SESSION_ID = 'ledger-session'; + + function handler(text: string, isError = false): ToolHandler { + return jest.fn(async () => ({ content: [{ type: 'text' as const, text }], isError })); + } + + function resolver(handlers: Record) { + return (toolName: string): ToolHandler | null => handlers[toolName] ?? null; + } + + test('records all-success known-good prefix', async () => { + const executor = new PlanExecutor(resolver({ a: handler('A'), b: handler('B') })); + const plan = buildPlan({ + id: 'ledger-success', + steps: [buildStep({ order: 1, tool: 'a', args: { b: 2, a: 1 } }), buildStep({ order: 2, tool: 'b', args: {} })], + successCriteria: {}, + }); + + const result = await executor.execute(plan, SESSION_ID, {}); + + expect(result.success).toBe(true); + expect(result.ledger?.knownGoodPrefixLength).toBe(2); + expect(result.ledger?.frontierStepOrder).toBeUndefined(); + expect(result.ledger?.steps.map((s) => s.status)).toEqual(['success', 'success']); + expect(result.ledger?.steps[0].argsHash).toMatch(/^[a-f0-9]{64}$/); + }); + + test('records failed frontier after known-good prefix', async () => { + const executor = new PlanExecutor(resolver({ a: handler('A'), b: jest.fn(async () => { throw new Error('boom'); }) })); + const plan = buildPlan({ + id: 'ledger-fail', + steps: [buildStep({ order: 1, tool: 'a', args: {} }), buildStep({ order: 2, tool: 'b', args: {} })], + successCriteria: {}, + }); + + const result = await executor.execute(plan, SESSION_ID, {}); + + expect(result.success).toBe(false); + expect(result.ledger?.knownGoodPrefixLength).toBe(1); + expect(result.ledger?.frontierStepOrder).toBe(2); + expect(result.ledger?.invalidationReason).toContain('boom'); + }); + + test('records recovery handler metadata for empty result', async () => { + const executor = new PlanExecutor(resolver({ empty: handler('{}'), recover: handler('{"ok":true}') })); + const plan = buildPlan({ + id: 'ledger-recovery', + steps: [buildStep({ order: 1, tool: 'empty', args: {} })], + errorHandlers: [{ condition: 'step1_empty_result', action: 'recover', steps: [buildStep({ order: 1, tool: 'recover', args: {} })] }], + successCriteria: {}, + }); + + const result = await executor.execute(plan, SESSION_ID, {}); + + expect(result.ledger?.steps).toEqual(expect.arrayContaining([ + expect.objectContaining({ phase: 'main', status: 'empty_result' }), + expect.objectContaining({ phase: 'recovery', recoveryCondition: 'step1_empty_result', status: 'success' }), + ])); + }); +});