From e13fa4b95ca8ce1de199aac8ad467b62e2420b17 Mon Sep 17 00:00:00 2001 From: shaun0927 <70629228+shaun0927@users.noreply.github.com> Date: Wed, 13 May 2026 00:42:30 +0900 Subject: [PATCH 1/2] Guard repetitive tasks from premature completion Add an opt-in bulk progress contract that records item progress, cursor state, stop-condition satisfaction, and machine-readable completion guard results. TaskRun completion now rejects incomplete linked contracts unless the caller forces completion with an explicit reason. Constraint: Stack on #1039 TaskRun lifecycle and avoid duplicating async ledger, scheduler, dashboard, or progress notification work. Rejected: Changing default completion behavior for all TaskRuns | short non-bulk tasks must remain unaffected. Confidence: high Scope-risk: moderate Directive: Keep future workflow/crawl integrations opt-in by attaching contracts rather than embedding new schedulers. Tested: npm test -- --runTestsByPath tests/core/progress-contract/storage.test.ts tests/tools/bulk-progress-tools.test.ts tests/core/task-run/storage.test.ts tests/tools/task-run-tools.test.ts --runInBand Tested: npm run build Tested: npm run lint:changed Co-authored-by: OmX --- src/core/progress-contract/index.ts | 2 + src/core/progress-contract/storage.ts | 264 +++++++++++++++++++ src/core/progress-contract/types.ts | 60 +++++ src/core/task-run/storage.ts | 4 + src/core/task-run/types.ts | 1 + src/hints/rules/error-recovery.ts | 4 + src/tools/bulk-progress.ts | 137 ++++++++++ src/tools/index.ts | 9 +- src/tools/task-run.ts | 38 +++ tests/core/progress-contract/storage.test.ts | 100 +++++++ tests/tools/bulk-progress-tools.test.ts | 72 +++++ 11 files changed, 686 insertions(+), 5 deletions(-) create mode 100644 src/core/progress-contract/index.ts create mode 100644 src/core/progress-contract/storage.ts create mode 100644 src/core/progress-contract/types.ts create mode 100644 src/tools/bulk-progress.ts create mode 100644 tests/core/progress-contract/storage.test.ts create mode 100644 tests/tools/bulk-progress-tools.test.ts diff --git a/src/core/progress-contract/index.ts b/src/core/progress-contract/index.ts new file mode 100644 index 000000000..32c197aa0 --- /dev/null +++ b/src/core/progress-contract/index.ts @@ -0,0 +1,2 @@ +export * from './types'; +export * from './storage'; diff --git a/src/core/progress-contract/storage.ts b/src/core/progress-contract/storage.ts new file mode 100644 index 000000000..67780fd72 --- /dev/null +++ b/src/core/progress-contract/storage.ts @@ -0,0 +1,264 @@ +import * as crypto from 'crypto'; +import * as os from 'os'; +import * as path from 'path'; +import { writeFileAtomicSafe, readFileSafe } from '../../utils/atomic-file'; +import { redactValue } from '../trace/redactor'; +import { TaskRunStore } from '../task-run'; +import { + BulkProgressContract, + BulkProgressFailedItem, + BulkProgressScope, + CompletionGuardResult, + StartBulkProgressInput, + UpdateBulkProgressInput, +} from './types'; + +const ID_BYTES = 8; +const MAX_ITEMS = 500; +const MAX_TEXT = 2048; +const MAX_STOP = 2048; + +export interface BulkProgressStoreOptions { + rootDir?: string; + taskRunStore?: TaskRunStore; + now?: () => number; +} + +export class BulkProgressNotFoundError extends Error { + code = 'bulk_progress_not_found'; +} + +export class BulkProgressInputError extends Error { + code = 'invalid_bulk_progress_input'; +} + +export class BulkProgressStore { + readonly rootDir: string; + private readonly taskRunStore: TaskRunStore; + private readonly now: () => number; + + constructor(opts: BulkProgressStoreOptions = {}) { + const openchromeHome = process.env.OPENCHROME_HOME || path.join(os.homedir(), '.openchrome'); + this.rootDir = opts.rootDir || path.join(openchromeHome, 'progress-contracts'); + this.taskRunStore = opts.taskRunStore || new TaskRunStore(); + this.now = opts.now || (() => Date.now()); + } + + async start(input: StartBulkProgressInput): Promise { + const ts = this.now(); + const stopCondition = optionalText(input.stop_condition, MAX_STOP); + const itemKey = optionalText(input.item_key, MAX_TEXT); + if (stopCondition === undefined) throw new BulkProgressInputError('stop_condition is required'); + if (itemKey === undefined) throw new BulkProgressInputError('item_key is required'); + + const merged = mergeItems([], [], input.completed, input.failed); + const contract: BulkProgressContract = pruneUndefined({ + contract_id: this.createId(`${input.run_id || ''}\0${stopCondition}\0${itemKey}`, ts), + run_id: optionalText(input.run_id, MAX_TEXT), + scope: normalizeScope(input.scope), + expected_total: optionalNonNegativeInt(input.expected_total), + min_completed: optionalNonNegativeInt(input.min_completed), + stop_condition: stopCondition, + stop_satisfied: input.stop_satisfied === true, + item_key: itemKey, + cursor: optionalText(input.cursor, MAX_TEXT), + completed: merged.completed, + failed: merged.failed, + completed_truncated: merged.completedTruncated, + failed_truncated: merged.failedTruncated, + last_progress_at: ts, + created_at: ts, + updated_at: ts, + }); + await this.write(contract); + if (contract.run_id !== undefined) { + await this.taskRunStore.update(contract.run_id, { + bulk_contract_id: contract.contract_id, + current_cursor: contract.cursor, + completed_items: contract.completed, + failed_items: contract.failed.map(item => ({ item: item.item, reason: item.reason })), + }); + } + return contract; + } + + async get(contractId: string): Promise { + const result = await readFileSafe(this.contractPath(contractId)); + if (result.success === false || result.data === undefined) { + throw new BulkProgressNotFoundError(`Bulk progress contract ${contractId} not found`); + } + return result.data; + } + + async update(contractId: string, input: UpdateBulkProgressInput): Promise { + const current = await this.get(contractId); + const ts = this.now(); + const merged = mergeItems(current.completed, current.failed, input.completed, input.failed); + const contract: BulkProgressContract = pruneUndefined({ + ...current, + expected_total: input.expected_total === undefined ? current.expected_total : optionalNonNegativeInt(input.expected_total), + min_completed: input.min_completed === undefined ? current.min_completed : optionalNonNegativeInt(input.min_completed), + stop_satisfied: input.stop_satisfied === undefined ? current.stop_satisfied : input.stop_satisfied === true, + cursor: input.cursor === undefined ? current.cursor : optionalText(input.cursor, MAX_TEXT), + completed: merged.completed, + failed: merged.failed, + completed_truncated: merged.completedTruncated || current.completed_truncated, + failed_truncated: merged.failedTruncated || current.failed_truncated, + last_progress_at: (input.completed || input.failed || input.cursor !== undefined || input.stop_satisfied !== undefined) ? ts : current.last_progress_at, + updated_at: ts, + }); + await this.write(contract); + if (contract.run_id !== undefined) { + await this.taskRunStore.update(contract.run_id, { + current_cursor: contract.cursor, + completed_items: contract.completed, + failed_items: contract.failed.map(item => ({ item: item.item, reason: item.reason })), + }); + } + return contract; + } + + checkCompletionGuard(contract: BulkProgressContract): CompletionGuardResult { + const completedCount = contract.completed.length; + const failedCount = contract.failed.length; + if (contract.expected_total !== undefined) { + const observed = completedCount + failedCount; + if (observed < contract.expected_total) { + return { + allowed: false, + reason: `Bulk progress incomplete: ${observed}/${contract.expected_total} ${contract.item_key} items observed.`, + missing_count: contract.expected_total - observed, + failed_count: failedCount, + completed_count: completedCount, + expected_total: contract.expected_total, + min_completed: contract.min_completed, + stop_satisfied: contract.stop_satisfied, + suggested_next_action: `Continue processing ${contract.item_key} items from cursor ${contract.cursor || '(none)'}.`, + }; + } + } + if (contract.min_completed !== undefined && completedCount < contract.min_completed) { + return { + allowed: false, + reason: `Bulk progress incomplete: min_completed ${contract.min_completed} not met.`, + missing_count: contract.min_completed - completedCount, + failed_count: failedCount, + completed_count: completedCount, + expected_total: contract.expected_total, + min_completed: contract.min_completed, + stop_satisfied: contract.stop_satisfied, + suggested_next_action: `Complete at least ${contract.min_completed - completedCount} more ${contract.item_key} item(s), or use force with a reason if the task must end early.`, + }; + } + if (contract.expected_total === undefined && contract.stop_satisfied === false) { + return { + allowed: false, + reason: `Bulk progress incomplete: stop condition is not satisfied (${contract.stop_condition}).`, + failed_count: failedCount, + completed_count: completedCount, + min_completed: contract.min_completed, + stop_satisfied: false, + suggested_next_action: `Continue until stop condition is satisfied: ${contract.stop_condition}.`, + }; + } + return { + allowed: true, + failed_count: failedCount, + completed_count: completedCount, + expected_total: contract.expected_total, + min_completed: contract.min_completed, + stop_satisfied: contract.stop_satisfied, + }; + } + + async check(contractId: string): Promise { + return this.checkCompletionGuard(await this.get(contractId)); + } + + private createId(seed: string, ts: number): string { + return crypto.createHash('sha256') + .update(seed) + .update('\0') + .update(String(ts)) + .update('\0') + .update(crypto.randomBytes(ID_BYTES)) + .digest('hex') + .slice(0, 16); + } + + private async write(contract: BulkProgressContract): Promise { + await writeFileAtomicSafe(this.contractPath(contract.contract_id), contract); + } + + private contractPath(contractId: string): string { + assertSafeId(contractId); + return path.join(this.rootDir, `${contractId}.json`); + } +} + +function mergeItems( + currentCompleted: string[], + currentFailed: BulkProgressFailedItem[], + completedInput?: string[], + failedInput?: BulkProgressFailedItem[], +): { completed: string[]; failed: BulkProgressFailedItem[]; completedTruncated: number; failedTruncated: number } { + const completed = Array.from(new Set([...currentCompleted, ...sanitizeCompleted(completedInput)])); + const failedByItem = new Map(); + for (const item of [...currentFailed, ...sanitizeFailed(failedInput)]) { + failedByItem.set(item.item, item); + } + const failed = Array.from(failedByItem.values()); + return { + completed: completed.slice(-MAX_ITEMS), + failed: failed.slice(-MAX_ITEMS), + completedTruncated: Math.max(0, completed.length - MAX_ITEMS), + failedTruncated: Math.max(0, failed.length - MAX_ITEMS), + }; +} + +function sanitizeCompleted(values: unknown): string[] { + if (Array.isArray(values) === false) return []; + return values.map(item => optionalText(item, MAX_TEXT)).filter((item): item is string => item !== undefined); +} + +function sanitizeFailed(values: unknown): BulkProgressFailedItem[] { + if (Array.isArray(values) === false) return []; + return values + .filter((item): item is BulkProgressFailedItem => Boolean(item) && typeof item === 'object' && typeof (item as BulkProgressFailedItem).item === 'string') + .map(item => pruneUndefined({ + item: optionalText(item.item, MAX_TEXT) || '', + reason: optionalText(item.reason, MAX_TEXT) || '', + retryable: typeof item.retryable === 'boolean' ? item.retryable : undefined, + })) + .filter(item => item.item.length > 0); +} + +function normalizeScope(scope: unknown): BulkProgressScope { + return scope === 'workflow' || scope === 'batch' || scope === 'crawl' ? scope : 'task_run'; +} + +function optionalText(value: unknown, max: number): string | undefined { + if (typeof value !== 'string') return undefined; + const trimmed = value.trim(); + if (trimmed.length === 0) return undefined; + const redacted = String(redactValue(trimmed)); + return redacted.length > max ? redacted.slice(0, max) : redacted; +} + +function optionalNonNegativeInt(value: unknown): number | undefined { + if (typeof value !== 'number' || Number.isFinite(value) === false) return undefined; + return Math.max(0, Math.floor(value)); +} + +function assertSafeId(id: string): void { + if (/^[a-f0-9]{16}$/i.test(id) === false) { + throw new Error(`Invalid bulk progress contract id: ${id}`); + } +} + +function pruneUndefined>(obj: T): T { + for (const key of Object.keys(obj)) { + if (obj[key] === undefined) delete obj[key]; + } + return obj; +} diff --git a/src/core/progress-contract/types.ts b/src/core/progress-contract/types.ts new file mode 100644 index 000000000..caae3c111 --- /dev/null +++ b/src/core/progress-contract/types.ts @@ -0,0 +1,60 @@ +export type BulkProgressScope = 'task_run' | 'workflow' | 'batch' | 'crawl'; + +export interface BulkProgressFailedItem { + item: string; + reason: string; + retryable?: boolean; +} + +export interface BulkProgressContract { + contract_id: string; + run_id?: string; + scope: BulkProgressScope; + expected_total?: number; + min_completed?: number; + stop_condition: string; + stop_satisfied: boolean; + item_key: string; + cursor?: string; + completed: string[]; + failed: BulkProgressFailedItem[]; + completed_truncated?: number; + failed_truncated?: number; + last_progress_at: number; + created_at: number; + updated_at: number; +} + +export interface CompletionGuardResult { + allowed: boolean; + reason?: string; + missing_count?: number; + failed_count?: number; + completed_count: number; + expected_total?: number; + min_completed?: number; + stop_satisfied?: boolean; + suggested_next_action?: string; +} + +export interface StartBulkProgressInput { + run_id?: string; + scope?: BulkProgressScope; + expected_total?: number; + min_completed?: number; + stop_condition: string; + stop_satisfied?: boolean; + item_key: string; + cursor?: string; + completed?: string[]; + failed?: BulkProgressFailedItem[]; +} + +export interface UpdateBulkProgressInput { + cursor?: string; + completed?: string[]; + failed?: BulkProgressFailedItem[]; + stop_satisfied?: boolean; + expected_total?: number; + min_completed?: number; +} diff --git a/src/core/task-run/storage.ts b/src/core/task-run/storage.ts index e5ef0d13e..bc5c376e9 100644 --- a/src/core/task-run/storage.ts +++ b/src/core/task-run/storage.ts @@ -34,6 +34,7 @@ export interface StartTaskRunInput { success_criteria?: string[]; session_id?: string; workflow_id?: string; + bulk_contract_id?: string; ledger_task_ids?: string[]; } @@ -47,6 +48,7 @@ export interface UpdateTaskRunInput { last_evidence?: EvidencePointer[]; ledger_task_ids?: string[]; workflow_id?: string; + bulk_contract_id?: string; } export interface NeedsHelpInput { @@ -91,6 +93,7 @@ export class TaskRunStore { success_criteria: sanitizeStringArray(input.success_criteria, MAX_CRITERIA, MAX_CRITERION_CHARS), session_id: optionalString(input.session_id), workflow_id: optionalString(input.workflow_id), + bulk_contract_id: optionalString(input.bulk_contract_id), ledger_task_ids: uniqueStrings(input.ledger_task_ids), created_at: ts, updated_at: ts, @@ -151,6 +154,7 @@ export class TaskRunStore { ...current, status: nextStatus, workflow_id: optionalString(input.workflow_id) || current.workflow_id, + bulk_contract_id: optionalString(input.bulk_contract_id) || current.bulk_contract_id, ledger_task_ids: uniqueStrings([...(current.ledger_task_ids || []), ...(input.ledger_task_ids || [])]), progress_summary: input.progress_summary !== undefined ? limit(scrub(input.progress_summary), MAX_SUMMARY_CHARS) diff --git a/src/core/task-run/types.ts b/src/core/task-run/types.ts index b15cfb640..402f7edda 100644 --- a/src/core/task-run/types.ts +++ b/src/core/task-run/types.ts @@ -26,6 +26,7 @@ export interface TaskRunMeta { success_criteria?: string[]; session_id?: string; workflow_id?: string; + bulk_contract_id?: string; ledger_task_ids: string[]; progress_summary?: string; completed_items?: string[]; diff --git a/src/hints/rules/error-recovery.ts b/src/hints/rules/error-recovery.ts index 13f5b33d2..2103ad277 100644 --- a/src/hints/rules/error-recovery.ts +++ b/src/hints/rules/error-recovery.ts @@ -14,6 +14,10 @@ import type { HintRule } from '../hint-engine'; const UID_EVICTED_PREFIX = /^\s*uid_evicted\b/i; const patterns: Array<{ test: RegExp; hint: string }> = [ + { + test: /bulk_completion_guard_failed|Bulk progress incomplete/i, + hint: 'Hint: Bulk progress is incomplete. Use oc_bulk_progress_update to record remaining completed or failed items, then retry oc_task_run_complete.', + }, { test: /ref\b.+not found|invalid ref|stale ref/i, hint: 'Hint: Refs expire after page changes. Use read_page or find for fresh refs.', diff --git a/src/tools/bulk-progress.ts b/src/tools/bulk-progress.ts new file mode 100644 index 000000000..82e951187 --- /dev/null +++ b/src/tools/bulk-progress.ts @@ -0,0 +1,137 @@ +import { MCPServer } from '../mcp-server'; +import { MCPResult, MCPToolDefinition, ToolHandler } from '../types/mcp'; +import { + BulkProgressInputError, + BulkProgressNotFoundError, + BulkProgressStore, + StartBulkProgressInput, + UpdateBulkProgressInput, +} from '../core/progress-contract'; + +export const bulkProgressStore = new BulkProgressStore(); + +type Json = Record; + +function jsonResult(value: Json): MCPResult { + return { + structuredContent: value, + content: [{ type: 'text', text: JSON.stringify(value, null, 2) }], + }; +} + +function errorResult(error: unknown): MCPResult { + const code = error instanceof BulkProgressNotFoundError || error instanceof BulkProgressInputError + ? error.code + : 'bulk_progress_error'; + const message = error instanceof Error ? error.message : String(error); + return { + isError: true, + structuredContent: { error: { code, message } }, + content: [{ type: 'text', text: JSON.stringify({ error: { code, message } }, null, 2) }], + }; +} + +const failedSchema = { + type: 'array', + items: { + type: 'object', + properties: { + item: { type: 'string' }, + reason: { type: 'string' }, + retryable: { type: 'boolean' }, + }, + required: ['item', 'reason'], + }, +}; + +const startDefinition: MCPToolDefinition = { + name: 'oc_bulk_progress_start', + description: 'Create an opt-in bulk progress contract for repetitive TaskRun/workflow/batch/crawl work. Use it before long item lists to prevent premature completion.', + inputSchema: { + type: 'object', + properties: { + run_id: { type: 'string' }, + scope: { type: 'string', enum: ['task_run', 'workflow', 'batch', 'crawl'] }, + expected_total: { type: 'number' }, + min_completed: { type: 'number' }, + stop_condition: { type: 'string' }, + stop_satisfied: { type: 'boolean' }, + item_key: { type: 'string' }, + cursor: { type: 'string' }, + completed: { type: 'array', items: { type: 'string' } }, + failed: failedSchema, + }, + required: ['stop_condition', 'item_key'], + }, +}; + +const updateDefinition: MCPToolDefinition = { + name: 'oc_bulk_progress_update', + description: 'Record completed/failed item ids, cursor movement, stop satisfaction, or threshold changes on a bulk progress contract.', + inputSchema: { + type: 'object', + properties: { + contract_id: { type: 'string' }, + cursor: { type: 'string' }, + completed: { type: 'array', items: { type: 'string' } }, + failed: failedSchema, + stop_satisfied: { type: 'boolean' }, + expected_total: { type: 'number' }, + min_completed: { type: 'number' }, + }, + required: ['contract_id'], + }, +}; + +const checkDefinition: MCPToolDefinition = { + name: 'oc_bulk_progress_check', + description: 'Evaluate whether a bulk progress contract currently allows task completion and return machine-readable recovery guidance.', + inputSchema: { + type: 'object', + properties: { + contract_id: { type: 'string' }, + }, + required: ['contract_id'], + }, +}; + +const startHandler: ToolHandler = async (_sessionId, args) => { + try { + const contract = await bulkProgressStore.start(args as unknown as StartBulkProgressInput); + return jsonResult({ bulk_progress_contract: contract, completion_guard: bulkProgressStore.checkCompletionGuard(contract) }); + } catch (error) { + return errorResult(error); + } +}; + +const updateHandler: ToolHandler = async (_sessionId, args) => { + try { + const contractId = String(args.contract_id || ''); + const contract = await bulkProgressStore.update(contractId, args as UpdateBulkProgressInput); + return jsonResult({ bulk_progress_contract: contract, completion_guard: bulkProgressStore.checkCompletionGuard(contract) }); + } catch (error) { + return errorResult(error); + } +}; + +const checkHandler: ToolHandler = async (_sessionId, args) => { + try { + const contractId = String(args.contract_id || ''); + const contract = await bulkProgressStore.get(contractId); + return jsonResult({ bulk_progress_contract: contract, completion_guard: bulkProgressStore.checkCompletionGuard(contract) }); + } catch (error) { + return errorResult(error); + } +}; + +export function registerBulkProgressTools(server: MCPServer): void { + server.registerTool('oc_bulk_progress_start', startHandler, startDefinition); + server.registerTool('oc_bulk_progress_update', updateHandler, updateDefinition); + server.registerTool('oc_bulk_progress_check', checkHandler, checkDefinition); +} + +export const bulkProgressToolHandlers = { + startHandler, + updateHandler, + checkHandler, +}; diff --git a/src/tools/index.ts b/src/tools/index.ts index 1e7b9e781..8a7adaf81 100644 --- a/src/tools/index.ts +++ b/src/tools/index.ts @@ -149,10 +149,10 @@ import { registerHandoffTools } from './handoff'; import { registerOcNormalizeActionTool } from './oc-normalize-action'; import { isRunHarnessEnabled } from '../run-harness/flags'; import { registerRunHarnessTools } from '../run-harness/tools'; -// Goal-level TaskRun lifecycle (#1039) -import { registerTaskRunTools } from './task-run'; // Read-only progress diagnostics (#1060). import { registerOcProgressStatusTool } from './oc-progress-status'; +// Bulk progress contracts (#1041) — opt-in guard for repetitive tasks. +import { registerBulkProgressTools } from './bulk-progress'; export function registerAllTools(server: MCPServer): void { // Core browser tools @@ -360,9 +360,8 @@ export function registerAllTools(server: MCPServer): void { if (isRunHarnessEnabled()) { registerRunHarnessTools(server); } - - // Goal-level TaskRun lifecycle (#1039) — opt-in, no effect on existing tools. - registerTaskRunTools(server); + // Bulk progress contracts (#1041) — opt-in guard for repetitive tasks. + registerBulkProgressTools(server); console.error(`[Tools] Registered ${server.getToolNames().length} tools`); } diff --git a/src/tools/task-run.ts b/src/tools/task-run.ts index b331ccbc7..a1afe56c6 100644 --- a/src/tools/task-run.ts +++ b/src/tools/task-run.ts @@ -10,6 +10,7 @@ import { TaskRunTransitionError, UpdateTaskRunInput, } from '../core/task-run'; +import { bulkProgressStore } from './bulk-progress'; const store = new TaskRunStore(); @@ -69,6 +70,7 @@ const startDefinition: MCPToolDefinition = { success_criteria: { type: 'array', items: { type: 'string' }, description: 'Optional concrete success criteria.' }, session_id: { type: 'string', description: 'Optional OpenChrome session id to associate.' }, workflow_id: { type: 'string', description: 'Optional workflow id to associate.' }, + bulk_contract_id: { type: 'string', description: 'Optional active bulk progress contract id.' }, ledger_task_ids: { type: 'array', items: { type: 'string' }, description: 'Optional #855 async ledger task ids to link when available.' }, }, required: ['goal'], @@ -91,6 +93,7 @@ const updateDefinition: MCPToolDefinition = { last_evidence: evidenceSchema, ledger_task_ids: { type: 'array', items: { type: 'string' } }, workflow_id: { type: 'string' }, + bulk_contract_id: { type: 'string' }, }, required: ['run_id'], }, @@ -139,6 +142,9 @@ const completeDefinition: MCPToolDefinition = { completed_items: { type: 'array', items: { type: 'string' } }, failed_items: failedItemsSchema, last_evidence: evidenceSchema, + bulk_contract_id: { type: 'string', description: 'Override the TaskRun active bulk progress contract for this completion attempt.' }, + force: { type: 'boolean', description: 'Allow completion despite an incomplete bulk contract only when force_reason is supplied.' }, + force_reason: { type: 'string' }, }, required: ['run_id'], }, @@ -217,6 +223,38 @@ const needsHelpHandler: ToolHandler = async (_sessionId, args) => { const completeHandler: ToolHandler = async (_sessionId, args) => { try { const runId = String(args.run_id || ''); + const current = await store.get(runId); + const contractId = typeof args.bulk_contract_id === 'string' && args.bulk_contract_id.trim() + ? args.bulk_contract_id + : current.bulk_contract_id; + if (contractId) { + const guard = await bulkProgressStore.check(contractId); + const forced = args.force === true && typeof args.force_reason === 'string' && args.force_reason.trim().length > 0; + if (guard.allowed === false && forced === false) { + const value = { + error: { + code: 'bulk_completion_guard_failed', + message: guard.reason || 'Bulk progress contract does not allow completion.', + }, + completion_guard: guard, + _hint: 'Hint: Bulk progress is incomplete. Continue from the cursor and record remaining completed or failed items before calling oc_task_run_complete again.', + _hintMeta: { + severity: 'warning', + rule: 'bulk-progress-premature-completion', + fireCount: 1, + suggestion: { + tool: 'oc_bulk_progress_update', + reason: guard.suggested_next_action || 'Record remaining item progress before completion.', + }, + }, + }; + return { + isError: true, + structuredContent: value, + content: [{ type: 'text', text: JSON.stringify(value, null, 2) }], + }; + } + } const meta = await store.complete(runId, args as CompleteInput); return jsonResult({ task_run: meta }); } catch (error) { diff --git a/tests/core/progress-contract/storage.test.ts b/tests/core/progress-contract/storage.test.ts new file mode 100644 index 000000000..b55182fc1 --- /dev/null +++ b/tests/core/progress-contract/storage.test.ts @@ -0,0 +1,100 @@ +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import { BulkProgressStore } from '../../../src/core/progress-contract'; +import { TaskRunStore } from '../../../src/core/task-run'; + +describe('BulkProgressStore', () => { + let dir: string; + let now = 1_700_000_000_000; + let taskRuns: TaskRunStore; + let store: BulkProgressStore; + + beforeEach(() => { + dir = fs.mkdtempSync(path.join(os.tmpdir(), 'openchrome-bulk-progress-')); + now = 1_700_000_000_000; + taskRuns = new TaskRunStore({ rootDir: path.join(dir, 'task-runs'), now: () => now++ }); + store = new BulkProgressStore({ + rootDir: path.join(dir, 'progress-contracts'), + taskRunStore: taskRuns, + now: () => now++, + }); + }); + + afterEach(() => { + fs.rmSync(dir, { recursive: true, force: true }); + }); + + it('blocks completion until expected total is observed', async () => { + const contract = await store.start({ + expected_total: 3, + stop_condition: 'processed all input urls', + item_key: 'url', + completed: ['https://example.com'], + }); + + const blocked = store.checkCompletionGuard(contract); + expect(blocked.allowed).toBe(false); + expect(blocked.missing_count).toBe(2); + + const updated = await store.update(contract.contract_id, { + completed: ['https://news.ycombinator.com'], + failed: [{ item: 'https://www.iana.org/domains/reserved', reason: 'timeout', retryable: true }], + }); + const allowed = store.checkCompletionGuard(updated); + expect(allowed.allowed).toBe(true); + expect(allowed.failed_count).toBe(1); + }); + + it('blocks completion when min_completed is unmet', async () => { + const contract = await store.start({ + expected_total: 3, + min_completed: 2, + stop_condition: 'processed all input urls', + item_key: 'url', + completed: ['a'], + failed: [{ item: 'b', reason: 'not found' }, { item: 'c', reason: 'not found' }], + }); + + const guard = store.checkCompletionGuard(contract); + expect(guard.allowed).toBe(false); + expect(guard.reason).toContain('min_completed'); + expect(guard.missing_count).toBe(1); + }); + + it('requires unknown-total stop condition satisfaction', async () => { + const contract = await store.start({ + stop_condition: 'no next page', + item_key: 'page', + completed: ['page-1'], + cursor: 'page-2', + }); + + expect(store.checkCompletionGuard(contract).allowed).toBe(false); + + const stopped = await store.update(contract.contract_id, { stop_satisfied: true }); + expect(store.checkCompletionGuard(stopped).allowed).toBe(true); + }); + + it('links to TaskRun and bounds large item arrays', async () => { + const run = await taskRuns.start({ goal: 'Process many rows' }); + const contract = await store.start({ + run_id: run.run_id, + expected_total: 505, + stop_condition: 'processed all rows', + item_key: 'row', + completed: Array.from({ length: 505 }, (_, i) => `row-${i}`), + failed: Array.from({ length: 503 }, (_, i) => ({ item: `bad-${i}`, reason: 'failed' })), + }); + + expect(contract.completed).toHaveLength(500); + expect(contract.failed).toHaveLength(500); + expect(contract.completed_truncated).toBe(5); + expect(contract.failed_truncated).toBe(3); + + const runAfterLink = await taskRuns.get(run.run_id); + expect(runAfterLink.bulk_contract_id).toBe(contract.contract_id); + expect(runAfterLink.completed_items).toHaveLength(500); + expect(runAfterLink.failed_items).toHaveLength(500); + }); +}); diff --git a/tests/tools/bulk-progress-tools.test.ts b/tests/tools/bulk-progress-tools.test.ts new file mode 100644 index 000000000..515dc432e --- /dev/null +++ b/tests/tools/bulk-progress-tools.test.ts @@ -0,0 +1,72 @@ +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import { MCPServer } from '../../src/mcp-server'; +import { registerAllTools } from '../../src/tools'; + +describe('Bulk progress tool registration', () => { + it('registers bulk progress tools', () => { + const server = new MCPServer(undefined as any); + registerAllTools(server); + const names = server.getToolNames(); + expect(names).toEqual(expect.arrayContaining([ + 'oc_bulk_progress_start', + 'oc_bulk_progress_update', + 'oc_bulk_progress_check', + ])); + }); +}); + +describe('TaskRun bulk completion guard tool integration', () => { + let dir: string; + let previousHome: string | undefined; + + beforeEach(() => { + dir = fs.mkdtempSync(path.join(os.tmpdir(), 'openchrome-bulk-tools-')); + previousHome = process.env.OPENCHROME_HOME; + process.env.OPENCHROME_HOME = dir; + jest.resetModules(); + }); + + afterEach(() => { + if (previousHome === undefined) { + delete process.env.OPENCHROME_HOME; + } else { + process.env.OPENCHROME_HOME = previousHome; + } + fs.rmSync(dir, { recursive: true, force: true }); + jest.resetModules(); + }); + + it('rejects premature oc_task_run_complete and allows completion after progress is recorded', async () => { + const { taskRunToolHandlers } = require('../../src/tools/task-run') as typeof import('../../src/tools/task-run'); + const { bulkProgressToolHandlers } = require('../../src/tools/bulk-progress') as typeof import('../../src/tools/bulk-progress'); + + const started = await taskRunToolHandlers.startHandler('default', { goal: 'Visit three URLs' }, undefined as any); + const runId = (started.structuredContent?.task_run as any).run_id as string; + + const bulk = await bulkProgressToolHandlers.startHandler('default', { + run_id: runId, + expected_total: 3, + stop_condition: 'processed all input urls', + item_key: 'url', + completed: ['https://example.com'], + }, undefined as any); + const contractId = (bulk.structuredContent?.bulk_progress_contract as any).contract_id as string; + + const blocked = await taskRunToolHandlers.completeHandler('default', { run_id: runId }, undefined as any); + expect(blocked.isError).toBe(true); + expect((blocked.structuredContent?.error as any).code).toBe('bulk_completion_guard_failed'); + expect((blocked.structuredContent?.completion_guard as any).missing_count).toBe(2); + expect((blocked.structuredContent?._hintMeta as any).severity).toBe('warning'); + + await bulkProgressToolHandlers.updateHandler('default', { + contract_id: contractId, + completed: ['https://news.ycombinator.com', 'https://www.iana.org/domains/reserved'], + }, undefined as any); + + const completed = await taskRunToolHandlers.completeHandler('default', { run_id: runId }, undefined as any); + expect(completed.isError).toBeUndefined(); + expect((completed.structuredContent?.task_run as any).status).toBe('COMPLETED'); + }); +}); From fb6cbcb34bdace56612ce900cea28b9faa153b26 Mon Sep 17 00:00:00 2001 From: shaun0927 <70629228+shaun0927@users.noreply.github.com> Date: Wed, 13 May 2026 19:42:07 +0900 Subject: [PATCH 2/2] fix(1116): strip conflict markers from storage.ts --- src/core/task-run/storage.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/task-run/storage.ts b/src/core/task-run/storage.ts index bc5c376e9..0cd296128 100644 --- a/src/core/task-run/storage.ts +++ b/src/core/task-run/storage.ts @@ -332,6 +332,7 @@ export class TaskRunStore { return text.split('\n').filter(Boolean).length + 1; } + private runDir(runId: string): string { assertSafeId(runId); return path.join(this.rootDir, runId);