Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 180 additions & 64 deletions src/orchestration/plan-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
import { MCPResult, ToolHandler } from '../types/mcp';
import {
CompiledPlan,
CompiledStep,
PlanErrorHandler,
PlanExecutionResult,
} from '../types/plan-cache';
import { withTimeout } from '../utils/with-timeout';
CompiledStep,
PlanErrorHandler,
PlanExecutionOptions,
PlanExecutionResult,
PlanRecoveryAttempt,
} from '../types/plan-cache';
import { rankRecoveryCandidates } from '../recovery';
import { withTimeout } from '../utils/with-timeout';

/**
* Recursively substitute ${varName} templates in a value using the params map.
Expand Down Expand Up @@ -137,13 +140,15 @@ export class PlanExecutor {
this.toolResolver = toolResolver;
}

async execute(
plan: CompiledPlan,
sessionId: string,
runtimeParams: Record<string, unknown>
): Promise<PlanExecutionResult> {
const startTime = Date.now();
let stepsExecuted = 0;
async execute(
plan: CompiledPlan,
sessionId: string,
runtimeParams: Record<string, unknown>,
options: PlanExecutionOptions = {}
): Promise<PlanExecutionResult> {
const startTime = Date.now();
let stepsExecuted = 0;
const recoveryAttempts: PlanRecoveryAttempt[] = [];

// 1. Build params map: plan defaults first, runtime overrides on top
const params: Record<string, unknown> = {};
Expand All @@ -154,14 +159,25 @@ export class PlanExecutor {
}
Object.assign(params, runtimeParams);

const failure = (error: string): PlanExecutionResult => ({
success: false,
planId: plan.id,
error,
durationMs: Date.now() - startTime,
stepsExecuted,
totalSteps: plan.steps.length,
});
const withRecovery = <T extends PlanExecutionResult>(result: T): T => {
if (options.boundedRecovery?.enabled) {
result.recovery = {
enabled: true,
attempts: recoveryAttempts,
exhausted: countExecutedRecoveryAttempts(recoveryAttempts) >= (options.boundedRecovery.maxToolCalls ?? 2),
};
}
return result;
};

const failure = (error: string): PlanExecutionResult => withRecovery({
success: false,
planId: plan.id,
error,
durationMs: Date.now() - startTime,
stepsExecuted,
totalSteps: plan.steps.length,
});

// 2. Execute each step sequentially
for (const step of plan.steps) {
Expand Down Expand Up @@ -200,14 +216,18 @@ export class PlanExecutor {
params,
stepsExecuted
);
if (recovered !== null) {
stepsExecuted = recovered.stepsExecuted;
// Merge any params updates from recovery into our params
Object.assign(params, recovered.params);
continue;
}

return failure(`Step ${step.order} (${step.tool}) failed: ${errMsg}`);
if (recovered !== null) {
stepsExecuted = recovered.stepsExecuted;
// Merge any params updates from recovery into our params
Object.assign(params, recovered.params);
continue;
}

const bounded = await this.tryBoundedRecovery(step, errMsg, sessionId, params, options, recoveryAttempts);
stepsExecuted += bounded.stepsExecuted;
if (bounded.recovered) continue;

return failure(`Step ${step.order} (${step.tool}) failed: ${errMsg}`);
}

// d. Check for error result
Expand All @@ -223,13 +243,17 @@ export class PlanExecutor {
params,
stepsExecuted
);
if (recovered !== null) {
stepsExecuted = recovered.stepsExecuted;
Object.assign(params, recovered.params);
continue;
}

return failure(`Step ${step.order} (${step.tool}) returned error: ${errMsg}`);
if (recovered !== null) {
stepsExecuted = recovered.stepsExecuted;
Object.assign(params, recovered.params);
continue;
}

const bounded = await this.tryBoundedRecovery(step, errMsg, sessionId, params, options, recoveryAttempts);
stepsExecuted += bounded.stepsExecuted;
if (bounded.recovered) continue;

return failure(`Step ${step.order} (${step.tool}) returned error: ${errMsg}`);
}

// e. Check for empty result (before storing) — may trigger empty_result handler
Expand All @@ -242,13 +266,16 @@ export class PlanExecutor {
params,
stepsExecuted
);
if (recovered !== null) {
stepsExecuted = recovered.stepsExecuted;
Object.assign(params, recovered.params);
continue;
}
// No handler for empty — treat as non-fatal, just skip storing
}
if (recovered !== null) {
stepsExecuted = recovered.stepsExecuted;
Object.assign(params, recovered.params);
continue;
}
const bounded = await this.tryBoundedRecovery(step, 'empty result', sessionId, params, options, recoveryAttempts);
stepsExecuted += bounded.stepsExecuted;
if (bounded.recovered) continue;
// No handler for empty — treat as non-fatal, just skip storing
}

// f. Parse and store result if requested
if (step.parseResult && step.parseResult.storeAs) {
Expand All @@ -270,29 +297,103 @@ export class PlanExecutor {
const criteriaError = validateSuccessCriteria(plan.successCriteria, params);
if (criteriaError) {
console.error(`[PlanExecutor] Success criteria failed for plan=${plan.id}: ${criteriaError}`);
return {
success: false,
planId: plan.id,
error: `Success criteria not met: ${criteriaError}`,
durationMs: Date.now() - startTime,
stepsExecuted,
totalSteps: plan.steps.length,
};
return withRecovery({
success: false,
planId: plan.id,
error: `Success criteria not met: ${criteriaError}`,
durationMs: Date.now() - startTime,
stepsExecuted,
totalSteps: plan.steps.length,
});
}

// 4. Return success with all collected params as data
return {
success: true,
planId: plan.id,
data: params,
durationMs: Date.now() - startTime,
stepsExecuted,
totalSteps: plan.steps.length,
};
}

/**
* Attempt to find and run a recovery handler for a given condition.
return withRecovery({
success: true,
planId: plan.id,
data: params,
durationMs: Date.now() - startTime,
stepsExecuted,
totalSteps: plan.steps.length,
});
}

private async tryBoundedRecovery(
failedStep: CompiledStep,
errorText: string,
sessionId: string,
params: Record<string, unknown>,
options: PlanExecutionOptions,
recoveryAttempts: PlanRecoveryAttempt[],
): Promise<{ recovered: boolean; stepsExecuted: number }> {
const config = options.boundedRecovery;
if (!config?.enabled) return { recovered: false, stepsExecuted: 0 };

const maxToolCalls = Math.max(0, config.maxToolCalls ?? 2);
if (countExecutedRecoveryAttempts(recoveryAttempts) >= maxToolCalls) {
return { recovered: false, stepsExecuted: 0 };
}

const candidates = rankRecoveryCandidates({
toolName: failedStep.tool,
resultText: errorText,
isError: true,
recentCalls: [{ toolName: failedStep.tool, result: 'error', error: errorText }],
maxCandidates: config.maxCandidates ?? 3,
});

let executed = 0;
for (const candidate of candidates) {
if (countExecutedRecoveryAttempts(recoveryAttempts) >= maxToolCalls) break;
if (candidate.risk !== 'read_only' || candidate.blockedReason) {
recoveryAttempts.push({
tool: candidate.tool,
status: 'blocked',
reason: candidate.blockedReason ?? `risk ${candidate.risk} is not allowed for bounded recovery`,
});
continue;
}

const handler = this.toolResolver(candidate.tool);
if (!handler) {
recoveryAttempts.push({ tool: candidate.tool, status: 'failed', reason: 'tool handler not found' });
continue;
}

const args = buildSafeRecoveryArgs(candidate.tool, params);
if (!args) {
recoveryAttempts.push({ tool: candidate.tool, status: 'blocked', reason: 'no safe argument template available' });
continue;
}

try {
const result = await withTimeout(
handler(sessionId, args),
config.perCandidateTimeoutMs ?? 5000,
`bounded recovery tool=${candidate.tool} for step=${failedStep.order}`,
);
executed++;
if (!result.isError && !isEmptyResult(result)) {
recoveryAttempts.push({ tool: candidate.tool, status: 'success', reason: candidate.reason });
return { recovered: true, stepsExecuted: executed };
}
recoveryAttempts.push({ tool: candidate.tool, status: 'failed', reason: 'candidate returned empty or error result' });
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
recoveryAttempts.push({
tool: candidate.tool,
status: /timed out/i.test(message) ? 'timeout' : 'failed',
reason: candidate.reason,
error: message,
});
}
}

return { recovered: false, stepsExecuted: executed };
}

/**
* Attempt to find and run a recovery handler for a given condition.
* Returns updated stepsExecuted + params snapshot on success, null if no handler.
*/
private async tryRecovery(
Expand Down Expand Up @@ -363,5 +464,20 @@ export class PlanExecutor {
}

return { stepsExecuted, params };
}
}
}
}

function countExecutedRecoveryAttempts(attempts: PlanRecoveryAttempt[]): number {
return attempts.filter((attempt) => attempt.status !== 'blocked').length;
}

function buildSafeRecoveryArgs(tool: string, params: Record<string, unknown>): Record<string, unknown> | null {
const tabId = typeof params.tabId === 'string' ? params.tabId : undefined;
switch (tool) {
case 'read_page':
case 'tabs_context':
return tabId ? { tabId } : {};
default:
return null;
}
}
12 changes: 11 additions & 1 deletion src/tools/orchestration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,10 @@ const executePlanDefinition: MCPToolDefinition = {
type: 'string',
description: 'Tab ID to execute the plan against',
},
boundedRecovery: {
type: 'boolean',
description: 'Opt-in: try a small, safety-gated read-only recovery search when a plan step fails',
},
params: {
type: 'object',
description: 'Runtime params merged with plan defaults',
Expand All @@ -684,6 +688,7 @@ const executePlanHandler: ToolHandler = async (
const planId = args.planId as string;
const tabId = args.tabId as string;
const runtimeParams = (args.params as Record<string, unknown>) || {};
const boundedRecovery = args.boundedRecovery === true;

if (!planId || !tabId) {
return {
Expand Down Expand Up @@ -749,7 +754,11 @@ const executePlanHandler: ToolHandler = async (

// Execute the plan
const mergedParams = { tabId, ...runtimeParams };
const result = await executor.execute(plan, sessionId, mergedParams);
const result = await executor.execute(plan, sessionId, mergedParams, {
boundedRecovery: boundedRecovery
? { enabled: true, maxCandidates: 3, maxToolCalls: 2, perCandidateTimeoutMs: 5000 }
: undefined,
});

// Update stats
registry.updateStats(planId, result.success, result.durationMs);
Expand All @@ -765,6 +774,7 @@ const executePlanHandler: ToolHandler = async (
durationMs: result.durationMs,
data: result.data,
error: result.error,
recovery: result.recovery,
message: result.success
? `Plan "${planId}" executed successfully in ${result.durationMs}ms (${result.stepsExecuted}/${result.totalSteps} steps)`
: `Plan "${planId}" failed: ${result.error}. Consider manual execution.`,
Expand Down
35 changes: 30 additions & 5 deletions src/types/plan-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,31 @@ export interface PlanRegistryData {
updatedAt: number;
}

/** Result of plan execution */
export interface PlanExecutionResult {
/** Bounded recovery telemetry for plan execution. */
export interface PlanRecoveryAttempt {
tool: string;
status: 'success' | 'failed' | 'blocked' | 'timeout';
reason: string;
error?: string;
}

export interface PlanRecoverySummary {
enabled: boolean;
attempts: PlanRecoveryAttempt[];
exhausted: boolean;
}

export interface PlanExecutionOptions {
boundedRecovery?: {
enabled: boolean;
maxCandidates?: number;
maxToolCalls?: number;
perCandidateTimeoutMs?: number;
};
}

/** Result of plan execution */
export interface PlanExecutionResult {
/** Whether the plan executed successfully */
success: boolean;
/** Plan ID that was executed */
Expand All @@ -123,6 +146,8 @@ export interface PlanExecutionResult {
durationMs: number;
/** Number of steps executed */
stepsExecuted: number;
/** Total steps in plan */
totalSteps: number;
}
/** Total steps in plan */
totalSteps: number;
/** Optional bounded recovery summary when opt-in recovery is enabled. */
recovery?: PlanRecoverySummary;
}
Loading