Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 89 additions & 19 deletions src/orchestration/plan-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import {
CompiledStep,
PlanErrorHandler,
PlanExecutionResult,
PlanStepExecutionRecord,
} from '../types/plan-cache';
import * as crypto from 'node:crypto';
import { withTimeout } from '../utils/with-timeout';

/**
Expand Down Expand Up @@ -42,6 +44,53 @@ function substituteParams(value: unknown, params: Record<string, unknown>): unkn
}


function stableHash(value: unknown): string {
return crypto.createHash('sha256').update(canonicalJson(value)).digest('hex');
}

function canonicalJson(value: unknown): string {
return JSON.stringify(sortKeys(value));
}

function sortKeys(value: unknown): unknown {
if (Array.isArray(value)) return value.map(sortKeys);
if (value !== null && typeof value === 'object') {
const out: Record<string, unknown> = {};
for (const key of Object.keys(value as Record<string, unknown>).sort()) {
out[key] = sortKeys((value as Record<string, unknown>)[key]);
}
return out;
}
return value;
}

function computeKnownGoodPrefix(ledger: PlanStepExecutionRecord[]): number {
let prefix = 0;
for (const entry of ledger.filter((step) => step.phase === 'main').sort((a, b) => a.order - b.order)) {
if (entry.order !== prefix + 1 || entry.status !== 'success') break;
prefix = entry.order;
}
return prefix;
}

function withLedger(
result: Omit<PlanExecutionResult, 'ledger'>,
ledger: PlanStepExecutionRecord[],
frontierStepOrder?: number,
invalidationReason?: string,
): PlanExecutionResult {
return {
...result,
ledger: {
steps: ledger,
knownGoodPrefixLength: computeKnownGoodPrefix(ledger),
...(frontierStepOrder !== undefined && { frontierStepOrder }),
...(invalidationReason && { invalidationReason }),
},
};
}


/**
* Extract result data from an MCPResult according to parseResult spec.
* Returns the extracted value (raw text, parsed JSON, or a specific field).
Expand Down Expand Up @@ -144,6 +193,7 @@ export class PlanExecutor {
): Promise<PlanExecutionResult> {
const startTime = Date.now();
let stepsExecuted = 0;
const ledger: PlanStepExecutionRecord[] = [];

// 1. Build params map: plan defaults first, runtime overrides on top
const params: Record<string, unknown> = {};
Expand All @@ -154,14 +204,14 @@ export class PlanExecutor {
}
Object.assign(params, runtimeParams);

const failure = (error: string): PlanExecutionResult => ({
const failure = (error: string, frontierStepOrder?: number): PlanExecutionResult => withLedger({
success: false,
planId: plan.id,
error,
durationMs: Date.now() - startTime,
stepsExecuted,
totalSteps: plan.steps.length,
});
}, ledger, frontierStepOrder, error);

// 2. Execute each step sequentially
for (const step of plan.steps) {
Expand All @@ -172,11 +222,14 @@ export class PlanExecutor {
if (!handler) {
const msg = `No handler found for tool "${step.tool}" at ${stepLabel}`;
console.error(`[PlanExecutor] ${msg}`);
return failure(msg);
ledger.push({ order: step.order, tool: step.tool, argsHash: stableHash(step.args), phase: 'main', status: 'failed', durationMs: 0, reason: msg });
return failure(msg, step.order);
}

// b. Substitute template variables in args
const substitutedArgs = substituteParams(step.args, params) as Record<string, unknown>;
const stepStartedAt = Date.now();
const argsHash = stableHash(substitutedArgs);

// c. Call handler with timeout
let mcpResult: MCPResult;
Expand All @@ -190,6 +243,7 @@ export class PlanExecutor {
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
console.error(`[PlanExecutor] Step failed at ${stepLabel}: ${errMsg}`);
ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'main', status: 'failed', durationMs: Date.now() - stepStartedAt, reason: errMsg });

// Check for a matching error handler
const conditionKey = `step${step.order}_error`;
Expand All @@ -198,7 +252,8 @@ export class PlanExecutor {
plan.errorHandlers,
sessionId,
params,
stepsExecuted
stepsExecuted,
ledger
);
if (recovered !== null) {
stepsExecuted = recovered.stepsExecuted;
Expand All @@ -207,40 +262,44 @@ export class PlanExecutor {
continue;
}

return failure(`Step ${step.order} (${step.tool}) failed: ${errMsg}`);
return failure(`Step ${step.order} (${step.tool}) failed: ${errMsg}`, step.order);
}

// d. Check for error result
if (mcpResult.isError) {
const errMsg = mcpResult.content?.[0]?.text ?? 'Unknown tool error';
console.error(`[PlanExecutor] Tool returned error at ${stepLabel}: ${errMsg}`);
ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'main', status: 'failed', durationMs: Date.now() - stepStartedAt, reason: errMsg });

const conditionKey = `step${step.order}_error`;
const recovered = await this.tryRecovery(
conditionKey,
plan.errorHandlers,
sessionId,
params,
stepsExecuted
stepsExecuted,
ledger
);
if (recovered !== null) {
stepsExecuted = recovered.stepsExecuted;
Object.assign(params, recovered.params);
continue;
}

return failure(`Step ${step.order} (${step.tool}) returned error: ${errMsg}`);
return failure(`Step ${step.order} (${step.tool}) returned error: ${errMsg}`, step.order);
}

// e. Check for empty result (before storing) — may trigger empty_result handler
if (isEmptyResult(mcpResult)) {
ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'main', status: 'empty_result', durationMs: Date.now() - stepStartedAt, reason: 'empty result' });
const conditionKey = `step${step.order}_empty_result`;
const recovered = await this.tryRecovery(
conditionKey,
plan.errorHandlers,
sessionId,
params,
stepsExecuted
stepsExecuted,
ledger
);
if (recovered !== null) {
stepsExecuted = recovered.stepsExecuted;
Expand All @@ -251,10 +310,12 @@ export class PlanExecutor {
}

// f. Parse and store result if requested
let storedAs: string | undefined;
if (step.parseResult && step.parseResult.storeAs) {
try {
const extracted = extractResult(mcpResult, step.parseResult);
params[step.parseResult.storeAs] = extracted;
storedAs = step.parseResult.storeAs;
} catch (err) {
console.error(
`[PlanExecutor] Failed to extract result at ${stepLabel}: ${
Expand All @@ -264,31 +325,34 @@ export class PlanExecutor {
// Non-fatal: continue without storing
}
}
if (!isEmptyResult(mcpResult)) {
ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'main', status: 'success', durationMs: Date.now() - stepStartedAt, storedAs });
}
}

// 3. Validate success criteria
const criteriaError = validateSuccessCriteria(plan.successCriteria, params);
if (criteriaError) {
console.error(`[PlanExecutor] Success criteria failed for plan=${plan.id}: ${criteriaError}`);
return {
return withLedger({
success: false,
planId: plan.id,
error: `Success criteria not met: ${criteriaError}`,
durationMs: Date.now() - startTime,
stepsExecuted,
totalSteps: plan.steps.length,
};
}, ledger, computeKnownGoodPrefix(ledger) + 1, `Success criteria not met: ${criteriaError}`);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Do not point frontier to a non-existent step

When success criteria fail after step execution, frontierStepOrder is set to computeKnownGoodPrefix(ledger) + 1, which can exceed totalSteps (for example, all steps succeed but required fields are missing). That produces a frontier that does not map to any main-plan step, so any consumer that retries or highlights the frontier step will target an invalid index. This path should leave frontierStepOrder unset (or map it to a real step) when the invalidation is post-step validation rather than a specific step failure.

Useful? React with 👍 / 👎.

}

// 4. Return success with all collected params as data
return {
return withLedger({
success: true,
planId: plan.id,
data: params,
durationMs: Date.now() - startTime,
stepsExecuted,
totalSteps: plan.steps.length,
};
}, ledger);
}

/**
Expand All @@ -300,7 +364,8 @@ export class PlanExecutor {
errorHandlers: PlanErrorHandler[],
sessionId: string,
params: Record<string, unknown>,
currentStepsExecuted: number
currentStepsExecuted: number,
ledger: PlanStepExecutionRecord[]
): Promise<{ stepsExecuted: number; params: Record<string, unknown> } | null> {
const handler = errorHandlers.find((h) => h.condition === conditionKey);
if (!handler) return null;
Expand All @@ -321,6 +386,8 @@ export class PlanExecutor {
}

const substitutedArgs = substituteParams(step.args, params) as Record<string, unknown>;
const recoveryStartedAt = Date.now();
const argsHash = stableHash(substitutedArgs);

let mcpResult: MCPResult;
try {
Expand All @@ -331,27 +398,29 @@ export class PlanExecutor {
);
stepsExecuted++;
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
console.error(
`[PlanExecutor] Recovery step failed at ${stepLabel}: ${
err instanceof Error ? err.message : String(err)
}`
`[PlanExecutor] Recovery step failed at ${stepLabel}: ${errMsg}`
);
ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'recovery', recoveryCondition: conditionKey, status: 'failed', durationMs: Date.now() - recoveryStartedAt, reason: errMsg });
continue;
}

if (mcpResult.isError) {
const errMsg = mcpResult.content?.[0]?.text ?? 'unknown';
console.error(
`[PlanExecutor] Recovery step returned error at ${stepLabel}: ${
mcpResult.content?.[0]?.text ?? 'unknown'
}`
`[PlanExecutor] Recovery step returned error at ${stepLabel}: ${errMsg}`
);
ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'recovery', recoveryCondition: conditionKey, status: 'failed', durationMs: Date.now() - recoveryStartedAt, reason: errMsg });
continue;
}

let storedAs: string | undefined;
if (step.parseResult && step.parseResult.storeAs) {
try {
const extracted = extractResult(mcpResult, step.parseResult);
params[step.parseResult.storeAs] = extracted;
storedAs = step.parseResult.storeAs;
} catch (err) {
console.error(
`[PlanExecutor] Recovery: failed to extract result at ${stepLabel}: ${
Expand All @@ -360,6 +429,7 @@ export class PlanExecutor {
);
}
}
ledger.push({ order: step.order, tool: step.tool, argsHash, phase: 'recovery', recoveryCondition: conditionKey, status: isEmptyResult(mcpResult) ? 'empty_result' : 'success', durationMs: Date.now() - recoveryStartedAt, storedAs });
}

return { stepsExecuted, params };
Expand Down
36 changes: 36 additions & 0 deletions src/types/plan-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,40 @@ export interface PlanRegistryData {
updatedAt: number;
}


/** Per-step ledger entry emitted by PlanExecutor for recovery/resume diagnostics. */
export interface PlanStepExecutionRecord {
/** Original plan step order, or recovery step order within a handler. */
order: number;
/** Tool executed for this step. */
tool: string;
/** Stable hash of substituted arguments; never stores raw args. */
argsHash: string;
/** Whether this entry came from the main plan or a recovery handler. */
phase: 'main' | 'recovery';
/** Recovery condition when phase === 'recovery'. */
recoveryCondition?: string;
/** Step outcome. */
status: 'success' | 'failed' | 'empty_result' | 'skipped';
/** Wall-clock time spent in this step. */
durationMs: number;
/** Optional concise reason for failure/empty/skipped status. */
reason?: string;
/** Stored output variable name when parseResult.storeAs was applied. */
storedAs?: string;
}

/** Known-good prefix / invalidated frontier summary for a plan run. */
export interface PlanExecutionLedger {
steps: PlanStepExecutionRecord[];
/** Count of contiguous successful main-plan steps from the start. */
knownGoodPrefixLength: number;
/** First main-plan step that invalidated forward progress, if any. */
frontierStepOrder?: number;
/** Human-readable reason for the frontier. */
invalidationReason?: string;
}

/** Result of plan execution */
export interface PlanExecutionResult {
/** Whether the plan executed successfully */
Expand All @@ -125,4 +159,6 @@ export interface PlanExecutionResult {
stepsExecuted: number;
/** Total steps in plan */
totalSteps: number;
/** Per-step recovery metadata for known-good prefix/frontier diagnostics. */
ledger?: PlanExecutionLedger;
}
Loading
Loading