diff --git a/ix-cli/src/cli/__tests__/ingest-retry.test.ts b/ix-cli/src/cli/__tests__/ingest-retry.test.ts new file mode 100644 index 0000000..1204884 --- /dev/null +++ b/ix-cli/src/cli/__tests__/ingest-retry.test.ts @@ -0,0 +1,40 @@ +import { describe, expect, it } from 'vitest'; +import { isAbortError, isRetryableCommitConflict } from '../commands/ingest.js'; + +describe('isAbortError', () => { + it('detects AbortError and TimeoutError by name', () => { + const abort = Object.assign(new Error('The operation was aborted'), { name: 'AbortError' }); + const timeout = Object.assign(new Error('signal timed out'), { name: 'TimeoutError' }); + expect(isAbortError(abort)).toBe(true); + expect(isAbortError(timeout)).toBe(true); + }); + + it('detects an abort by message when the name is generic', () => { + expect(isAbortError(new Error('This operation was aborted'))).toBe(true); + }); + + it('is false for ordinary errors', () => { + expect(isAbortError(new Error('500: internal error'))).toBe(false); + expect(isAbortError('write-write conflict')).toBe(false); + }); +}); + +describe('isRetryableCommitConflict', () => { + it('still retries Arango lock conflicts and transport drops', () => { + expect(isRetryableCommitConflict('write-write conflict')).toBe(true); + expect(isRetryableCommitConflict(new Error('Error: 1200 timeout waiting to lock key'))).toBe(true); + expect(isRetryableCommitConflict(new Error('fetch failed'))).toBe(true); + expect(isRetryableCommitConflict(new Error('read ECONNRESET'))).toBe(true); + }); + + it('does NOT retry a deadline/timeout abort (the deadline must stop work)', () => { + const abort = Object.assign(new Error('The operation was aborted'), { name: 'AbortError' }); + const timeout = Object.assign(new Error('signal timed out'), { name: 'TimeoutError' }); + expect(isRetryableCommitConflict(abort)).toBe(false); + expect(isRetryableCommitConflict(timeout)).toBe(false); + }); + + it('does not retry a plain 500', () => { + expect(isRetryableCommitConflict(new Error('500: internal server error'))).toBe(false); + }); +}); diff --git a/ix-cli/src/cli/__tests__/map-auto-skip.test.ts b/ix-cli/src/cli/__tests__/map-auto-skip.test.ts new file mode 100644 index 0000000..8e1861d --- /dev/null +++ b/ix-cli/src/cli/__tests__/map-auto-skip.test.ts @@ -0,0 +1,23 @@ +import { afterEach, describe, expect, it } from 'vitest'; +import { shouldSkipAutoMap } from '../commands/map.js'; + +describe('shouldSkipAutoMap', () => { + afterEach(() => { delete process.env.IX_AUTO_MAP_CLOUD; }); + + it('skips an automatic map against a remote backend', () => { + expect(shouldSkipAutoMap({ auto: true, cloudReady: true })).toBe(true); + }); + + it('never skips a manual map (auto=false), even against a remote backend', () => { + expect(shouldSkipAutoMap({ auto: false, cloudReady: true })).toBe(false); + }); + + it('never skips an automatic map against a local backend', () => { + expect(shouldSkipAutoMap({ auto: true, cloudReady: false })).toBe(false); + }); + + it('honors the IX_AUTO_MAP_CLOUD opt-in to allow remote auto-refresh', () => { + process.env.IX_AUTO_MAP_CLOUD = '1'; + expect(shouldSkipAutoMap({ auto: true, cloudReady: true })).toBe(false); + }); +}); diff --git a/ix-cli/src/cli/__tests__/single-flight.test.ts b/ix-cli/src/cli/__tests__/single-flight.test.ts new file mode 100644 index 0000000..be03f4a --- /dev/null +++ b/ix-cli/src/cli/__tests__/single-flight.test.ts @@ -0,0 +1,94 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { mkdtempSync, rmSync, writeFileSync, readdirSync, existsSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir, hostname } from 'node:os'; + +import { acquireMapLock, isStaleForTest, lockPathForTest } from '../single-flight.js'; + +describe('acquireMapLock single-flight', () => { + let dir: string; + + beforeEach(() => { + dir = mkdtempSync(join(tmpdir(), 'ix-lock-')); + process.env.IX_LOCK_DIR = dir; + delete process.env.IX_MAP_LOCK_MAX_MS; + }); + + afterEach(() => { + delete process.env.IX_LOCK_DIR; + delete process.env.IX_MAP_LOCK_MAX_MS; + rmSync(dir, { recursive: true, force: true }); + }); + + it('grants the lock to the first caller and denies a concurrent caller', () => { + const first = acquireMapLock('/work/repo', 'ix map /work/repo'); + expect(first).not.toBeNull(); + + const second = acquireMapLock('/work/repo', 'ix map /work/repo'); + expect(second).toBeNull(); // coalesce — a live holder owns it + + first!.release(); + }); + + it('re-grants after release', () => { + const first = acquireMapLock('/work/repo', 'm'); + expect(first).not.toBeNull(); + first!.release(); + + const again = acquireMapLock('/work/repo', 'm'); + expect(again).not.toBeNull(); + again!.release(); + }); + + it('isolates different workspaces', () => { + const a = acquireMapLock('/work/a', 'm'); + const b = acquireMapLock('/work/b', 'm'); + expect(a).not.toBeNull(); + expect(b).not.toBeNull(); // different key → different lockfile + a!.release(); + b!.release(); + }); + + it('release is idempotent and removes the lockfile', () => { + const h = acquireMapLock('/work/repo', 'm'); + expect(readdirSync(dir).length).toBe(1); + h!.release(); + h!.release(); // no throw + expect(readdirSync(dir).length).toBe(0); + }); + + it('steals a stale lock left by a dead holder', () => { + const path = lockPathForTest('/work/repo'); + // A lock owned by a PID that cannot be alive on this host. + writeFileSync(path, JSON.stringify({ pid: 2 ** 30, host: hostname(), startedAt: Date.now(), label: 'dead' })); + const h = acquireMapLock('/work/repo', 'm'); + expect(h).not.toBeNull(); // stale holder → stolen + h!.release(); + }); + + it('steals a lock older than the max age even if the holder looks alive', () => { + process.env.IX_MAP_LOCK_MAX_MS = '1000'; + const path = lockPathForTest('/work/repo'); + writeFileSync(path, JSON.stringify({ pid: process.pid, host: hostname(), startedAt: Date.now() - 60_000, label: 'old' })); + const h = acquireMapLock('/work/repo', 'm'); + expect(h).not.toBeNull(); // aged out → stolen + h!.release(); + }); + + it('treats an unparseable lockfile as abandoned', () => { + const path = lockPathForTest('/work/repo'); + writeFileSync(path, 'not json'); + expect(isStaleForTest(path)).toBe(true); + const h = acquireMapLock('/work/repo', 'm'); + expect(h).not.toBeNull(); + h!.release(); + }); + + it('does not leak across a missing lock dir (fail-open is best-effort)', () => { + // Sanity: a brand-new dir yields a clean acquire. + expect(existsSync(dir)).toBe(true); + const h = acquireMapLock('/work/fresh', 'm'); + expect(h).not.toBeNull(); + h!.release(); + }); +}); diff --git a/ix-cli/src/cli/commands/ingest.ts b/ix-cli/src/cli/commands/ingest.ts index da3eb86..10a4203 100644 --- a/ix-cli/src/cli/commands/ingest.ts +++ b/ix-cli/src/cli/commands/ingest.ts @@ -287,7 +287,19 @@ const COMMIT_CONFLICT_RETRY_PATTERNS = [ 'econnrefused', ]; +// An abort from the shared wall-clock deadline (or a per-request timeout) must +// NOT be retried — the whole point of the deadline is to stop work. Retrying +// here would defeat it and add 6 more backed-off requests to an already-late +// run. AbortSignal.timeout raises a TimeoutError; an aborted deadline raises an +// AbortError; both also stringify with "aborted". +export function isAbortError(err: unknown): boolean { + const name = (err as { name?: string } | null)?.name; + if (name === "AbortError" || name === "TimeoutError") return true; + return String(err).toLowerCase().includes("aborted"); +} + export function isRetryableCommitConflict(err: unknown): boolean { + if (isAbortError(err)) return false; const message = String(err).toLowerCase(); return COMMIT_CONFLICT_RETRY_PATTERNS.some(pattern => message.includes(pattern)); } @@ -399,7 +411,7 @@ export function registerIngestCommand(program: Command): void { export async function ingestFiles( path: string, - opts: { recursive?: boolean; force?: boolean; format: string; root?: string; debug?: boolean; printSummary?: boolean; suppressOutput?: boolean; lang?: string; mapMode?: boolean } + opts: { recursive?: boolean; force?: boolean; format: string; root?: string; debug?: boolean; printSummary?: boolean; suppressOutput?: boolean; lang?: string; mapMode?: boolean; deadlineSignal?: AbortSignal } ): Promise { const debug = opts.debug || process.env.IX_DEBUG === '1'; const mapMode = opts.mapMode === true; @@ -576,7 +588,7 @@ export async function ingestFiles( process.stderr.write(`[multi-repo] system "${detectedSystem!.name}" (${systemId}) members=${detectedSystem!.members.join(', ')} packages=${Object.keys(packageRegistry).length} declaredDeps=${depCount}\n`); } - const client = new IxClient(getEndpoint()); + const client = new IxClient(getEndpoint(), opts.deadlineSignal); // Schema-version check forces a clean re-ingest when the backend's graph // format has changed in a way that invalidates existing node IDs (e.g. the diff --git a/ix-cli/src/cli/commands/map.ts b/ix-cli/src/cli/commands/map.ts index 9e0fd17..6594fd3 100644 --- a/ix-cli/src/cli/commands/map.ts +++ b/ix-cli/src/cli/commands/map.ts @@ -10,6 +10,32 @@ import { formatFetchError } from "../errors.js"; import { ingestFiles } from "./ingest.js"; import { detectSystem } from "../system.js"; import { getRemoteRunner, isCloudReady } from "../remote.js"; +import { acquireMapLock } from "../single-flight.js"; + +// Hard wall-clock budget for a single `ix map`. Past this, the shared deadline +// signal aborts every in-flight request and the command exits, so a single +// invocation can never grind for hours against an unhealthy backend. Tunable +// via IX_MAP_DEADLINE_MS; 0 disables the budget. +const DEFAULT_MAP_DEADLINE_MS = 15 * 60 * 1000; + +function mapDeadlineSignal(): AbortSignal | undefined { + const raw = process.env.IX_MAP_DEADLINE_MS; + const budget = raw !== undefined ? Number.parseInt(raw, 10) : DEFAULT_MAP_DEADLINE_MS; + if (!Number.isFinite(budget) || budget <= 0) return undefined; // disabled + return AbortSignal.timeout(budget); +} + +// Whether an automatically-triggered map should be skipped. Background refresh +// (the editor/agent hooks that re-map on change) is a local-only convenience: +// against a remote backend it would push a write on every change from every +// client, so it is skipped there and remote ingestion stays deliberate. Manual +// `ix map` never sets IX_AUTO_MAP, so it is never skipped. IX_AUTO_MAP_CLOUD=1 +// opts the automatic path back in for users who do want remote auto-refresh. +export function shouldSkipAutoMap(opts: { auto: boolean; cloudReady: boolean }): boolean { + if (!opts.auto || !opts.cloudReady) return false; + if (process.env.IX_AUTO_MAP_CLOUD === "1") return false; + return true; +} export interface MapRegion { id: string; @@ -133,11 +159,47 @@ Examples: ) .action(async (pathArg: string | undefined, opts: { format: string; level?: string; minConfidence: string; maxItems: string; allItems?: boolean; sort: string; graph?: boolean; list?: boolean; full?: boolean; verbose?: boolean; silent?: boolean }) => { const cwd = pathArg ? resolve(pathArg) : process.cwd(); + + const silent = opts.silent === true || opts.format === "silent"; + + // Single-flight: refuse to stack. Background refresh can fire `ix map` + // repeatedly (e.g. once per change); if a map is slow or the backend is + // unhealthy, those invocations would otherwise pile up and run concurrently + // against the backend. The first map for a workspace holds the lock; any + // concurrent one coalesces and exits 0 here. The lock auto-releases on + // process exit (see single-flight.ts) and a stale lock from a crashed map + // is stolen, so this never wedges. + const mapLock = acquireMapLock(cwd, `ix map ${cwd}`); + if (!mapLock) { + if (!silent && opts.format !== "json" && opts.format !== "llm") { + process.stderr.write(chalk.dim(" Another ix map is already running for this workspace — skipping.\n")); + } + return; // coalesce; the in-flight map will refresh the graph + } + + // Background refresh is a local-only convenience. When invoked + // automatically (IX_AUTO_MAP=1, set by the editor/agent hooks) against a + // remote backend, skip: a remote graph should be fed deliberately, not by + // a write on every change from every client. Manual `ix map` is never + // skipped; opt the automatic path back in with IX_AUTO_MAP_CLOUD=1. + const autoMap = process.env.IX_AUTO_MAP === "1"; + const cloudReady = await isCloudReady(); + if (shouldSkipAutoMap({ auto: autoMap, cloudReady })) { + if (!silent && opts.format !== "json" && opts.format !== "llm") { + process.stderr.write(chalk.dim(" Skipping automatic map: active backend is remote (run `ix map` manually to refresh it).\n")); + } + return; // lock releases on process exit + } + + // Shared wall-clock deadline applied to every backend request this command + // makes (ingest + map), so the whole operation is bounded even if the + // backend stalls on individual long per-request timeouts. + const deadlineSignal = mapDeadlineSignal(); + // Auto-detect a multi-repo system (>= 2 child repo roots). When present we // scope the map to its system_id; otherwise it's an ordinary single-repo map. const systemId = detectSystem(cwd)?.systemId; - const silent = opts.silent === true || opts.format === "silent"; // json and llm are machine formats: suppress progress chatter and route // ingestion through the quiet path so stdout carries only the result. const machineFormat = opts.format === "json" || opts.format === "llm"; @@ -166,7 +228,6 @@ Examples: // The local backend bootstrap below only runs on the local path — // cloud ingestion doesn't require a local Ix backend. const ingestStart = performance.now(); - const cloudReady = await isCloudReady(); if (cloudReady) { const runner = getRemoteRunner()!; // isCloudReady guarantees non-null try { @@ -195,6 +256,7 @@ Examples: printSummary: false, suppressOutput: true, mapMode: true, + deadlineSignal, }); } catch (err: any) { emitError(formatFetchError(err)); @@ -204,7 +266,7 @@ Examples: } const ingestMs = Math.round(performance.now() - ingestStart); - const client = new IxClient(getEndpoint()); + const client = new IxClient(getEndpoint(), deadlineSignal); const mapBarWidth = 25; const mapStart = performance.now(); diff --git a/ix-cli/src/cli/single-flight.ts b/ix-cli/src/cli/single-flight.ts new file mode 100644 index 0000000..f292a18 --- /dev/null +++ b/ix-cli/src/cli/single-flight.ts @@ -0,0 +1,152 @@ +import { mkdirSync, writeFileSync, readFileSync, rmSync, openSync, closeSync } from "node:fs"; +import { join } from "node:path"; +import { homedir, hostname } from "node:os"; +import { createHash } from "node:crypto"; + +// --------------------------------------------------------------------------- +// CLI-level single-flight lock for `ix map` / ingest. +// +// Background: a graph-refresh hook or watcher can fire `ix map` many times in +// quick succession (e.g. once per change). If a map is slow, or the backend is +// unhealthy and requests stall on their long per-request timeouts, those +// invocations stack: many concurrent `ix map` processes each hold a connection +// and retry, overwhelming the backend and wasting local resources. +// +// The robust fix is to make `ix map` single-flight at the CLI layer, so the +// guarantee holds no matter what launches it (hook, watcher, manual, CI). The +// first invocation for a workspace takes the lock; any concurrent invocation +// sees a live holder and exits quietly (coalesces) instead of piling on. A +// stale lock (dead holder, or older than IX_MAP_LOCK_MAX_MS) is stolen so a +// crashed map never wedges future runs. +// +// Keeping the authority in the CLI (rather than only in a shell-hook lock) +// means even an external watcher, an old hook, or two manual runs cannot stack. +// --------------------------------------------------------------------------- + +// Lock directory. Overridable via IX_LOCK_DIR (used by tests, and handy if +// ~/.ix is read-only). Read per call so the override can change between runs. +function lockDir(): string { + return process.env.IX_LOCK_DIR || join(homedir(), ".ix", "locks"); +} + +// Default: a held lock older than this is presumed stale (its holder crashed +// without cleanup, or is a zombie). Generous enough to outlast a legitimately +// slow map on a large repo, short enough that a wedge self-heals within a turn. +const DEFAULT_LOCK_MAX_MS = 20 * 60 * 1000; + +interface LockMeta { + pid: number; + host: string; + startedAt: number; // epoch ms + label: string; // e.g. "ix map " +} + +export interface LockHandle { + /** Release the lock. Idempotent; safe to call from multiple exit paths. */ + release(): void; +} + +function lockMaxMs(): number { + const raw = process.env.IX_MAP_LOCK_MAX_MS; + if (!raw) return DEFAULT_LOCK_MAX_MS; + const n = Number.parseInt(raw, 10); + return Number.isFinite(n) && n > 0 ? n : DEFAULT_LOCK_MAX_MS; +} + +function lockPathFor(key: string): string { + const h = createHash("sha256").update(key).digest("hex").slice(0, 16); + return join(lockDir(), `map-${h}.lock`); +} + +/** True when a PID is alive on this host. signal 0 = existence check, no-op. */ +function pidAlive(pid: number): boolean { + if (!Number.isInteger(pid) || pid <= 0) return false; + try { + process.kill(pid, 0); + return true; + } catch (err: any) { + // EPERM means the process exists but is owned by another user — alive. + return err?.code === "EPERM"; + } +} + +function readMeta(path: string): LockMeta | null { + try { + return JSON.parse(readFileSync(path, "utf-8")) as LockMeta; + } catch { + return null; + } +} + +/** A held lock is stale if its holder is gone, on another host, or too old. */ +function isStale(meta: LockMeta | null): boolean { + if (!meta) return true; // unparseable/empty → treat as abandoned + if (meta.host === hostname() && !pidAlive(meta.pid)) return true; + if (Date.now() - meta.startedAt > lockMaxMs()) return true; + return false; +} + +/** + * Try to acquire the single-flight lock for `key` (the workspace root). + * + * Returns a LockHandle on success, or null if another live invocation already + * holds it — in which case the caller should coalesce (skip its own run). + * + * Acquisition is atomic via O_CREAT|O_EXCL ('wx'); the classic create-exclusive + * lockfile. On contention we inspect the holder: a stale lock is removed and + * acquisition retried once. + */ +export function acquireMapLock(workspaceRoot: string, label: string): LockHandle | null { + try { mkdirSync(lockDir(), { recursive: true }); } catch { /* best effort */ } + const path = lockPathFor(workspaceRoot); + const meta: LockMeta = { pid: process.pid, host: hostname(), startedAt: Date.now(), label }; + + const tryCreate = (): boolean => { + try { + // 'wx' = O_CREAT | O_EXCL: fails if the file already exists. mode 0600 — + // the lock carries no secrets but matches the rest of ~/.ix. + const fd = openSync(path, "wx", 0o600); + try { writeFileSync(fd, JSON.stringify(meta)); } finally { closeSync(fd); } + return true; + } catch (err: any) { + if (err?.code === "EEXIST") return false; + // Any other error (e.g. permission, read-only FS): fail open rather than + // block the user's map. Single-flight is an optimization, not correctness. + return true; + } + }; + + if (tryCreate()) return makeHandle(path); + + // Contended — is the holder still alive? + if (isStale(readMeta(path))) { + try { rmSync(path, { force: true }); } catch { /* best effort */ } + if (tryCreate()) return makeHandle(path); + } + return null; // a live holder owns it — caller should coalesce +} + +function makeHandle(path: string): LockHandle { + let released = false; + const release = (): void => { + if (released) return; + released = true; + try { rmSync(path, { force: true }); } catch { /* best effort */ } + }; + // Release on normal exit and on the common termination signals so a killed + // map (hook timeout, Ctrl-C) does not leave a lock that blocks the next run + // until it ages out. + process.once("exit", release); + process.once("SIGINT", () => { release(); process.exit(130); }); + process.once("SIGTERM", () => { release(); process.exit(143); }); + return { release }; +} + +// ── Test-only surface ────────────────────────────────────────────────────── +// Exported for unit tests; not part of the public CLI API. +export function lockPathForTest(workspaceRoot: string): string { + return lockPathFor(workspaceRoot); +} +export function isStaleForTest(path: string): boolean { + return isStale(readMeta(path)); +} diff --git a/ix-cli/src/client/api.ts b/ix-cli/src/client/api.ts index 4dfb88f..acfefe8 100644 --- a/ix-cli/src/client/api.ts +++ b/ix-cli/src/client/api.ts @@ -19,7 +19,24 @@ export interface ListSubsystemsOptions { } export class IxClient { - constructor(private endpoint: string = "http://localhost:8090") {} + // An optional deadline signal shared across every request this client makes. + // `ix map` sets it to a hard wall-clock budget so that, even when the backend + // is unhealthy and individual requests would otherwise sit on their long + // per-request timeouts (5-30 min each), the whole operation aborts and the + // process exits instead of grinding for hours and stacking with re-launches. + constructor( + private endpoint: string = "http://localhost:8090", + private deadlineSignal?: AbortSignal, + ) {} + + // Combine a per-request timeout with the optional shared deadline. Whichever + // fires first aborts the fetch. AbortSignal.any propagates the first abort. + private signalFor(perRequestMs: number): AbortSignal { + const perRequest = AbortSignal.timeout(perRequestMs); + return this.deadlineSignal + ? AbortSignal.any([perRequest, this.deadlineSignal]) + : perRequest; + } async query( question: string, @@ -33,7 +50,7 @@ export class IxClient { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ path, recursive, force: force || undefined }), - signal: AbortSignal.timeout(30 * 60 * 1000), // 30 minute timeout for large repos + signal: this.signalFor(30 * 60 * 1000), // 30 minute timeout for large repos }); if (!resp.ok) { const text = await resp.text(); @@ -207,7 +224,7 @@ export class IxClient { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(patch), - signal: AbortSignal.timeout(5 * 60 * 1000), // 5 min — matches commitPatchBulk + signal: this.signalFor(5 * 60 * 1000), // 5 min — matches commitPatchBulk }); if (!resp.ok) { const text = await resp.text(); @@ -266,7 +283,7 @@ export class IxClient { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(body), - signal: AbortSignal.timeout(30 * 60 * 1000), // 30 minute timeout + signal: this.signalFor(30 * 60 * 1000), // 30 minute timeout }); if (!resp.ok) { const text = await resp.text(); @@ -280,7 +297,7 @@ export class IxClient { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ patches }), - signal: AbortSignal.timeout(5 * 60 * 1000), // 5 min — prevents hang when k8s ingress closes idle connections + signal: this.signalFor(5 * 60 * 1000), // 5 min — prevents hang when k8s ingress closes idle connections }); if (!resp.ok) { const text = await resp.text(); @@ -504,6 +521,10 @@ export class IxClient { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(body), + // These small reads/writes (source-hashes, stitch, list, ...) had no + // timeout, so a stalled connection could hang the process indefinitely. + // 2 min per request, also bounded by the shared deadline when set. + signal: this.signalFor(2 * 60 * 1000), }); if (!resp.ok) { const text = await resp.text(); @@ -513,7 +534,9 @@ export class IxClient { } private async get(path: string): Promise { - const resp = await fetch(`${this.endpoint}${path}`); + const resp = await fetch(`${this.endpoint}${path}`, { + signal: this.signalFor(2 * 60 * 1000), + }); if (!resp.ok) { const text = await resp.text(); throw new Error(`${resp.status}: ${text}`);