Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/core/progress-contract/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './types';
export * from './storage';
264 changes: 264 additions & 0 deletions src/core/progress-contract/storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
import * as crypto from 'crypto';
import * as os from 'os';
import * as path from 'path';
import { writeFileAtomicSafe, readFileSafe } from '../../utils/atomic-file';
import { redactValue } from '../trace/redactor';
import { TaskRunStore } from '../task-run';
import {
BulkProgressContract,
BulkProgressFailedItem,
BulkProgressScope,
CompletionGuardResult,
StartBulkProgressInput,
UpdateBulkProgressInput,
} from './types';

const ID_BYTES = 8;
const MAX_ITEMS = 500;
const MAX_TEXT = 2048;
const MAX_STOP = 2048;

export interface BulkProgressStoreOptions {
rootDir?: string;
taskRunStore?: TaskRunStore;
now?: () => number;
}

export class BulkProgressNotFoundError extends Error {
code = 'bulk_progress_not_found';
}

export class BulkProgressInputError extends Error {
code = 'invalid_bulk_progress_input';
}

export class BulkProgressStore {
readonly rootDir: string;
private readonly taskRunStore: TaskRunStore;
private readonly now: () => number;

constructor(opts: BulkProgressStoreOptions = {}) {
const openchromeHome = process.env.OPENCHROME_HOME || path.join(os.homedir(), '.openchrome');
this.rootDir = opts.rootDir || path.join(openchromeHome, 'progress-contracts');
this.taskRunStore = opts.taskRunStore || new TaskRunStore();
this.now = opts.now || (() => Date.now());
}

async start(input: StartBulkProgressInput): Promise<BulkProgressContract> {
const ts = this.now();
const stopCondition = optionalText(input.stop_condition, MAX_STOP);
const itemKey = optionalText(input.item_key, MAX_TEXT);
if (stopCondition === undefined) throw new BulkProgressInputError('stop_condition is required');
if (itemKey === undefined) throw new BulkProgressInputError('item_key is required');

const merged = mergeItems([], [], input.completed, input.failed);
const contract: BulkProgressContract = pruneUndefined({
contract_id: this.createId(`${input.run_id || ''}\0${stopCondition}\0${itemKey}`, ts),
run_id: optionalText(input.run_id, MAX_TEXT),
scope: normalizeScope(input.scope),
expected_total: optionalNonNegativeInt(input.expected_total),
min_completed: optionalNonNegativeInt(input.min_completed),
stop_condition: stopCondition,
stop_satisfied: input.stop_satisfied === true,
item_key: itemKey,
cursor: optionalText(input.cursor, MAX_TEXT),
completed: merged.completed,
failed: merged.failed,
completed_truncated: merged.completedTruncated,
failed_truncated: merged.failedTruncated,
last_progress_at: ts,
created_at: ts,
updated_at: ts,
});
await this.write(contract);
if (contract.run_id !== undefined) {
await this.taskRunStore.update(contract.run_id, {
bulk_contract_id: contract.contract_id,
current_cursor: contract.cursor,
completed_items: contract.completed,
failed_items: contract.failed.map(item => ({ item: item.item, reason: item.reason })),
});
}
return contract;
}

async get(contractId: string): Promise<BulkProgressContract> {
const result = await readFileSafe<BulkProgressContract>(this.contractPath(contractId));
if (result.success === false || result.data === undefined) {
throw new BulkProgressNotFoundError(`Bulk progress contract ${contractId} not found`);
}
return result.data;
}

async update(contractId: string, input: UpdateBulkProgressInput): Promise<BulkProgressContract> {
const current = await this.get(contractId);
const ts = this.now();
const merged = mergeItems(current.completed, current.failed, input.completed, input.failed);
const contract: BulkProgressContract = pruneUndefined({
...current,
expected_total: input.expected_total === undefined ? current.expected_total : optionalNonNegativeInt(input.expected_total),
min_completed: input.min_completed === undefined ? current.min_completed : optionalNonNegativeInt(input.min_completed),
stop_satisfied: input.stop_satisfied === undefined ? current.stop_satisfied : input.stop_satisfied === true,
cursor: input.cursor === undefined ? current.cursor : optionalText(input.cursor, MAX_TEXT),
completed: merged.completed,
failed: merged.failed,
completed_truncated: merged.completedTruncated || current.completed_truncated,
failed_truncated: merged.failedTruncated || current.failed_truncated,
last_progress_at: (input.completed || input.failed || input.cursor !== undefined || input.stop_satisfied !== undefined) ? ts : current.last_progress_at,
updated_at: ts,
});
await this.write(contract);
if (contract.run_id !== undefined) {
await this.taskRunStore.update(contract.run_id, {
current_cursor: contract.cursor,
completed_items: contract.completed,
failed_items: contract.failed.map(item => ({ item: item.item, reason: item.reason })),
});
}
return contract;
}

checkCompletionGuard(contract: BulkProgressContract): CompletionGuardResult {
const completedCount = contract.completed.length;
const failedCount = contract.failed.length;
if (contract.expected_total !== undefined) {
const observed = completedCount + failedCount;
if (observed < contract.expected_total) {
return {
allowed: false,
reason: `Bulk progress incomplete: ${observed}/${contract.expected_total} ${contract.item_key} items observed.`,
missing_count: contract.expected_total - observed,
failed_count: failedCount,
completed_count: completedCount,
expected_total: contract.expected_total,
min_completed: contract.min_completed,
stop_satisfied: contract.stop_satisfied,
suggested_next_action: `Continue processing ${contract.item_key} items from cursor ${contract.cursor || '(none)'}.`,
};
}
}
if (contract.min_completed !== undefined && completedCount < contract.min_completed) {
return {
allowed: false,
reason: `Bulk progress incomplete: min_completed ${contract.min_completed} not met.`,
missing_count: contract.min_completed - completedCount,
failed_count: failedCount,
completed_count: completedCount,
expected_total: contract.expected_total,
min_completed: contract.min_completed,
stop_satisfied: contract.stop_satisfied,
suggested_next_action: `Complete at least ${contract.min_completed - completedCount} more ${contract.item_key} item(s), or use force with a reason if the task must end early.`,
};
}
if (contract.expected_total === undefined && contract.stop_satisfied === false) {
return {
allowed: false,
reason: `Bulk progress incomplete: stop condition is not satisfied (${contract.stop_condition}).`,
failed_count: failedCount,
completed_count: completedCount,
min_completed: contract.min_completed,
stop_satisfied: false,
suggested_next_action: `Continue until stop condition is satisfied: ${contract.stop_condition}.`,
};
}
return {
allowed: true,
failed_count: failedCount,
completed_count: completedCount,
expected_total: contract.expected_total,
min_completed: contract.min_completed,
stop_satisfied: contract.stop_satisfied,
};
}

async check(contractId: string): Promise<CompletionGuardResult> {
return this.checkCompletionGuard(await this.get(contractId));
}

private createId(seed: string, ts: number): string {
return crypto.createHash('sha256')
.update(seed)
.update('\0')
.update(String(ts))
.update('\0')
.update(crypto.randomBytes(ID_BYTES))
.digest('hex')
.slice(0, 16);
}

private async write(contract: BulkProgressContract): Promise<void> {
await writeFileAtomicSafe(this.contractPath(contract.contract_id), contract);
}

private contractPath(contractId: string): string {
assertSafeId(contractId);
return path.join(this.rootDir, `${contractId}.json`);
}
}

function mergeItems(
currentCompleted: string[],
currentFailed: BulkProgressFailedItem[],
completedInput?: string[],
failedInput?: BulkProgressFailedItem[],
): { completed: string[]; failed: BulkProgressFailedItem[]; completedTruncated: number; failedTruncated: number } {
const completed = Array.from(new Set([...currentCompleted, ...sanitizeCompleted(completedInput)]));
const failedByItem = new Map<string, BulkProgressFailedItem>();
for (const item of [...currentFailed, ...sanitizeFailed(failedInput)]) {
failedByItem.set(item.item, item);
}
const failed = Array.from(failedByItem.values());
return {
completed: completed.slice(-MAX_ITEMS),
failed: failed.slice(-MAX_ITEMS),
completedTruncated: Math.max(0, completed.length - MAX_ITEMS),
failedTruncated: Math.max(0, failed.length - MAX_ITEMS),
};
}

function sanitizeCompleted(values: unknown): string[] {
if (Array.isArray(values) === false) return [];
return values.map(item => optionalText(item, MAX_TEXT)).filter((item): item is string => item !== undefined);
}

function sanitizeFailed(values: unknown): BulkProgressFailedItem[] {
if (Array.isArray(values) === false) return [];
return values
.filter((item): item is BulkProgressFailedItem => Boolean(item) && typeof item === 'object' && typeof (item as BulkProgressFailedItem).item === 'string')
.map(item => pruneUndefined({
item: optionalText(item.item, MAX_TEXT) || '',
reason: optionalText(item.reason, MAX_TEXT) || '',
retryable: typeof item.retryable === 'boolean' ? item.retryable : undefined,
}))
.filter(item => item.item.length > 0);
}

function normalizeScope(scope: unknown): BulkProgressScope {
return scope === 'workflow' || scope === 'batch' || scope === 'crawl' ? scope : 'task_run';
}

function optionalText(value: unknown, max: number): string | undefined {
if (typeof value !== 'string') return undefined;
const trimmed = value.trim();
if (trimmed.length === 0) return undefined;
const redacted = String(redactValue(trimmed));
return redacted.length > max ? redacted.slice(0, max) : redacted;
}

function optionalNonNegativeInt(value: unknown): number | undefined {
if (typeof value !== 'number' || Number.isFinite(value) === false) return undefined;
return Math.max(0, Math.floor(value));
}

function assertSafeId(id: string): void {
if (/^[a-f0-9]{16}$/i.test(id) === false) {
throw new Error(`Invalid bulk progress contract id: ${id}`);
}
}

function pruneUndefined<T extends Record<string, unknown>>(obj: T): T {
for (const key of Object.keys(obj)) {
if (obj[key] === undefined) delete obj[key];
}
return obj;
}
60 changes: 60 additions & 0 deletions src/core/progress-contract/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
export type BulkProgressScope = 'task_run' | 'workflow' | 'batch' | 'crawl';

export interface BulkProgressFailedItem {
item: string;
reason: string;
retryable?: boolean;
}

export interface BulkProgressContract {
contract_id: string;
run_id?: string;
scope: BulkProgressScope;
expected_total?: number;
min_completed?: number;
stop_condition: string;
stop_satisfied: boolean;
item_key: string;
cursor?: string;
completed: string[];
failed: BulkProgressFailedItem[];
completed_truncated?: number;
failed_truncated?: number;
last_progress_at: number;
created_at: number;
updated_at: number;
}

export interface CompletionGuardResult {
allowed: boolean;
reason?: string;
missing_count?: number;
failed_count?: number;
completed_count: number;
expected_total?: number;
min_completed?: number;
stop_satisfied?: boolean;
suggested_next_action?: string;
}

export interface StartBulkProgressInput {
run_id?: string;
scope?: BulkProgressScope;
expected_total?: number;
min_completed?: number;
stop_condition: string;
stop_satisfied?: boolean;
item_key: string;
cursor?: string;
completed?: string[];
failed?: BulkProgressFailedItem[];
}

export interface UpdateBulkProgressInput {
cursor?: string;
completed?: string[];
failed?: BulkProgressFailedItem[];
stop_satisfied?: boolean;
expected_total?: number;
min_completed?: number;
}
5 changes: 5 additions & 0 deletions src/core/task-run/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export interface StartTaskRunInput {
success_criteria?: string[];
session_id?: string;
workflow_id?: string;
bulk_contract_id?: string;
ledger_task_ids?: string[];
}

Expand All @@ -47,6 +48,7 @@ export interface UpdateTaskRunInput {
last_evidence?: EvidencePointer[];
ledger_task_ids?: string[];
workflow_id?: string;
bulk_contract_id?: string;
}

export interface NeedsHelpInput {
Expand Down Expand Up @@ -91,6 +93,7 @@ export class TaskRunStore {
success_criteria: sanitizeStringArray(input.success_criteria, MAX_CRITERIA, MAX_CRITERION_CHARS),
session_id: optionalString(input.session_id),
workflow_id: optionalString(input.workflow_id),
bulk_contract_id: optionalString(input.bulk_contract_id),
ledger_task_ids: uniqueStrings(input.ledger_task_ids),
created_at: ts,
updated_at: ts,
Expand Down Expand Up @@ -151,6 +154,7 @@ export class TaskRunStore {
...current,
status: nextStatus,
workflow_id: optionalString(input.workflow_id) || current.workflow_id,
bulk_contract_id: optionalString(input.bulk_contract_id) || current.bulk_contract_id,
ledger_task_ids: uniqueStrings([...(current.ledger_task_ids || []), ...(input.ledger_task_ids || [])]),
progress_summary: input.progress_summary !== undefined
? limit(scrub(input.progress_summary), MAX_SUMMARY_CHARS)
Expand Down Expand Up @@ -328,6 +332,7 @@ export class TaskRunStore {
return text.split('\n').filter(Boolean).length + 1;
}


private runDir(runId: string): string {
assertSafeId(runId);
return path.join(this.rootDir, runId);
Expand Down
1 change: 1 addition & 0 deletions src/core/task-run/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export interface TaskRunMeta {
success_criteria?: string[];
session_id?: string;
workflow_id?: string;
bulk_contract_id?: string;
ledger_task_ids: string[];
progress_summary?: string;
completed_items?: string[];
Expand Down
4 changes: 4 additions & 0 deletions src/hints/rules/error-recovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import type { HintRule } from '../hint-engine';
const UID_EVICTED_PREFIX = /^\s*uid_evicted\b/i;

const patterns: Array<{ test: RegExp; hint: string }> = [
{
test: /bulk_completion_guard_failed|Bulk progress incomplete/i,
hint: 'Hint: Bulk progress is incomplete. Use oc_bulk_progress_update to record remaining completed or failed items, then retry oc_task_run_complete.',
},
{
test: /ref\b.+not found|invalid ref|stale ref/i,
hint: 'Hint: Refs expire after page changes. Use read_page or find for fresh refs.',
Expand Down
Loading