Skip to content

Commit 75bde97

Browse files
committed
fix(dreamer): scope queue dequeue + enqueue to the calling host's project
Reproduced and fixed a real cross-project queue contamination bug. The `dream_queue` table is shared across processes (OpenCode + Pi can both write to it), but the registered dreamer client is project-specific: each running OpenCode/Pi process knows about exactly ONE project's filesystem path and ONE harness-specific runner. Without the project filter, a Pi process running for project A would dequeue a queue entry for project B and try to dream B with Pi's PiSubagentRunner — failing because Pi's resolveDreamSessionDirectory(B) falls back to the `git:<sha>` identity string itself, which is not a valid cwd, so `posix_spawn 'pi'` fails with ENOENT every cycle. User-visible report that triggered this: a user opened Pi only in `opencode-anthropic-auth`, but Magic Context was running scheduled dream cycles for `opencode-xtra` (and other unrelated projects) every 15 minutes — all failing with the same ENOENT pattern, polluting the dashboard's dream-runs history with consistently failed runs for projects the user never ran Pi in. Also fixed the symmetric enqueue-side issue: `findProjectsNeedingDream` returns ALL projects with active memories or pending smart notes across the shared DB, so without filtering at the source, every host would enqueue cross-project work that it can't possibly drain. The queue was effectively N×M projects fighting for one queue head. Changes: - queue.ts: `dequeueNext(db, projectIdentity?)` — when provided, only dequeues entries matching the calling host's project. Same for `peekQueue`. Backwards-compatible: undefined preserves the legacy 'dequeue any' semantics for tests and any future single-host caller. - scheduler.ts: `checkScheduleAndEnqueue(db, schedule, ownProjectIdentity?)` — when provided, only enqueues for that one project. Filters `findProjectsNeedingDream` results down to the registration's own identity. Same backwards-compat pattern. - runner.ts: `processDreamQueue({..., projectIdentity?})` — passes the filter through to `dequeueNext`. - dream-timer.ts: each registered project now resolves its own identity at tick time and threads it through both `checkScheduleAndEnqueue` and `processDreamQueue`. The schedule window check still gates enqueue (no out-of-window scheduled work gets queued in the first place), and dequeue can no longer pick up work that doesn't belong to this host. - command-handler.ts: `/ctx-dream` (manual) now passes `deps.dreamer.projectPath` so the user's manual command only drains THIS project's queue entry, never accidentally dreams another project that happens to be at the queue head. - pi-plugin/dreamer/index.ts: Pi's `runOnce` (the immediate-drain callback for Pi /ctx-dream) also passes `opts.projectIdentity` for the same reason. Tests: - 3 new dreamer.test.ts cases covering the filter (only-mine, none-match, legacy preserved). - 1 new scheduler.test.ts file with 4 cases covering scheduler-side filtering plus the schedule-window-vs-project-filter ordering. Full plugin suite: 1068/1068 pass (was 1061; +7 new). Pi-plugin: 236/236. CLI: 58/58. Typecheck + lint + build clean across all 3 packages.
1 parent f25aa56 commit 75bde97

8 files changed

Lines changed: 337 additions & 14 deletions

File tree

packages/pi-plugin/src/dreamer/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ export function registerPiDreamerProject(opts: PiDreamerOptions): void {
145145
maxRuntimeMinutes: opts.config.max_runtime_minutes,
146146
experimentalUserMemories,
147147
experimentalPinKeyFiles,
148+
// Pi /ctx-dream is project-scoped: only drain THIS project's
149+
// queue entry, not whatever happens to be at the queue head
150+
// (which could belong to an OpenCode-only project Pi can't
151+
// possibly dream).
152+
projectIdentity: opts.projectIdentity,
148153
});
149154

150155
registeredProjects.set(opts.projectIdentity, { cleanup, runOnce });

packages/plugin/src/features/magic-context/dreamer/dreamer.test.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,5 +187,108 @@ describe("dreamer", () => {
187187
.get();
188188
expect(row?.count).toBe(0);
189189
});
190+
191+
/**
192+
* Cross-project queue isolation regression. The dream_queue is shared
193+
* across processes (OpenCode + Pi can both write/read). Without this
194+
* filter, a Pi process running in project A would dequeue a queue
195+
* entry for project B and try to dream B with Pi's client — failing
196+
* because Pi has no idea where B is on disk.
197+
*
198+
* The user-visible report that triggered this fix: Pi running in
199+
* `opencode-anthropic-auth` was dreaming `opencode-xtra` every 15
200+
* minutes and failing every cycle with `posix_spawn 'pi'` ENOENT
201+
* because `dreamProjectDirectories` for opencode-xtra wasn't
202+
* registered in Pi's process, so the spawn cwd fell back to the
203+
* `git:<sha>` identity string itself.
204+
*/
205+
it("dequeues only entries matching projectIdentity when filter is provided", async () => {
206+
db = createTestDb();
207+
ensureDreamQueueTable(db);
208+
registerDreamProjectDirectory("git:my-repo", "/repo/my-repo");
209+
registerDreamProjectDirectory("git:other-repo", "/repo/other-repo");
210+
const client = createDreamClient();
211+
212+
// Two projects enqueued. We're filtering to my-repo, so the
213+
// other-repo entry must STAY in the queue untouched.
214+
expect(enqueueDream(db, "git:other-repo", "scheduled")).not.toBeNull();
215+
expect(enqueueDream(db, "git:my-repo", "scheduled")).not.toBeNull();
216+
217+
const result = await processDreamQueue({
218+
db,
219+
client,
220+
tasks: ["consolidate"],
221+
taskTimeoutMinutes: 5,
222+
maxRuntimeMinutes: 10,
223+
projectIdentity: "git:my-repo",
224+
});
225+
226+
// We dreamed my-repo (filtered).
227+
expect(result).not.toBeNull();
228+
229+
// other-repo's queue entry survives — another host (or a future
230+
// tick from a process that owns other-repo) must drain it.
231+
const remaining = db
232+
.prepare<[], { project_path: string }>(
233+
"SELECT project_path FROM dream_queue ORDER BY id",
234+
)
235+
.all();
236+
expect(remaining.map((r) => r.project_path)).toEqual(["git:other-repo"]);
237+
});
238+
239+
it("returns null when projectIdentity filter has no matching entries", async () => {
240+
db = createTestDb();
241+
ensureDreamQueueTable(db);
242+
const client = createDreamClient();
243+
244+
// Queue has entries, but NONE for our project.
245+
expect(enqueueDream(db, "git:not-mine", "scheduled")).not.toBeNull();
246+
expect(enqueueDream(db, "git:also-not-mine", "scheduled")).not.toBeNull();
247+
248+
const result = await processDreamQueue({
249+
db,
250+
client,
251+
tasks: ["consolidate"],
252+
taskTimeoutMinutes: 5,
253+
maxRuntimeMinutes: 10,
254+
projectIdentity: "git:my-repo",
255+
});
256+
257+
expect(result).toBeNull();
258+
259+
// Both other-project entries must still be queued.
260+
const remaining = db
261+
.prepare<[], { count: number }>("SELECT COUNT(*) AS count FROM dream_queue")
262+
.get();
263+
expect(remaining?.count).toBe(2);
264+
});
265+
266+
it("legacy behavior preserved when projectIdentity filter is omitted", async () => {
267+
// Tests that pass `undefined` (or just don't pass the field)
268+
// continue to drain the queue head — preserves backward compat
269+
// for any test or future single-host caller that wants the old
270+
// "dequeue any" behavior.
271+
db = createTestDb();
272+
ensureDreamQueueTable(db);
273+
registerDreamProjectDirectory("git:repo-1", "/repo/project");
274+
const client = createDreamClient();
275+
276+
expect(enqueueDream(db, "git:repo-1", "manual")).not.toBeNull();
277+
278+
const result = await processDreamQueue({
279+
db,
280+
client,
281+
tasks: ["consolidate"],
282+
taskTimeoutMinutes: 5,
283+
maxRuntimeMinutes: 10,
284+
// projectIdentity intentionally omitted
285+
});
286+
287+
expect(result?.tasks.map((task) => task.name)).toEqual(["consolidate"]);
288+
const row = db
289+
.prepare<[], { count: number }>("SELECT COUNT(*) AS count FROM dream_queue")
290+
.get();
291+
expect(row?.count).toBe(0);
292+
});
190293
});
191294
});

packages/plugin/src/features/magic-context/dreamer/queue.ts

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,35 @@ export function enqueueDream(
7373
})();
7474
}
7575

76-
/** Peek at the next unstarted entry without claiming it. */
77-
export function peekQueue(db: Database): DreamQueueEntry | null {
78-
const row = db
79-
.prepare<[], { id: number; project_path: string; reason: string; enqueued_at: number }>(
80-
"SELECT id, project_path, reason, enqueued_at FROM dream_queue WHERE started_at IS NULL ORDER BY enqueued_at ASC LIMIT 1",
81-
)
82-
.get();
76+
/** Peek at the next unstarted entry without claiming it.
77+
*
78+
* @param projectIdentity - When provided, only matches entries for this project.
79+
* This is critical for cross-process coexistence: each running OpenCode/Pi
80+
* process registers exactly one project, so it must only drain entries that
81+
* belong to it. Without this filter, Pi (running in project A) would dequeue
82+
* queue entries for project B and try to dream B with Pi's runner — which
83+
* either spawns `pi` in a directory that doesn't exist (the `git:<sha>`
84+
* identity string) or, even if it succeeded, runs the wrong harness for that
85+
* project.
86+
*/
87+
export function peekQueue(db: Database, projectIdentity?: string): DreamQueueEntry | null {
88+
const row = projectIdentity
89+
? db
90+
.prepare<
91+
[string],
92+
{ id: number; project_path: string; reason: string; enqueued_at: number }
93+
>(
94+
"SELECT id, project_path, reason, enqueued_at FROM dream_queue WHERE started_at IS NULL AND project_path = ? ORDER BY enqueued_at ASC LIMIT 1",
95+
)
96+
.get(projectIdentity)
97+
: db
98+
.prepare<
99+
[],
100+
{ id: number; project_path: string; reason: string; enqueued_at: number }
101+
>(
102+
"SELECT id, project_path, reason, enqueued_at FROM dream_queue WHERE started_at IS NULL ORDER BY enqueued_at ASC LIMIT 1",
103+
)
104+
.get();
83105

84106
if (!row) return null;
85107

@@ -92,11 +114,15 @@ export function peekQueue(db: Database): DreamQueueEntry | null {
92114
};
93115
}
94116

95-
/** Claim the next unstarted entry atomically by marking started_at. Returns null if queue is empty. */
96-
export function dequeueNext(db: Database): DreamQueueEntry | null {
117+
/** Claim the next unstarted entry atomically by marking started_at. Returns null if queue is empty.
118+
*
119+
* @param projectIdentity - When provided, only dequeues entries for this project.
120+
* See `peekQueue` for the cross-process coexistence rationale.
121+
*/
122+
export function dequeueNext(db: Database, projectIdentity?: string): DreamQueueEntry | null {
97123
const now = Date.now();
98124
return db.transaction(() => {
99-
const entry = peekQueue(db);
125+
const entry = peekQueue(db, projectIdentity);
100126
if (!entry) return null;
101127

102128
const result = db

packages/plugin/src/features/magic-context/dreamer/runner.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -951,11 +951,25 @@ export async function processDreamQueue(args: {
951951
maxRuntimeMinutes: number;
952952
experimentalUserMemories?: { enabled: boolean; promotionThreshold: number };
953953
experimentalPinKeyFiles?: ExperimentalPinKeyFilesConfig;
954+
/**
955+
* Optional project identity filter — when provided, only entries belonging
956+
* to this project are dequeued. Each running OpenCode/Pi process registers
957+
* exactly one project, and the host's dreamer client (and `pi` runner, in
958+
* Pi's case) is project-specific. Without this filter, a Pi process running
959+
* for project A would dequeue queue entries for project B and try to
960+
* `posix_spawn 'pi'` in B's `git:<sha>` identity string as a directory,
961+
* failing with ENOENT every cycle.
962+
*
963+
* Callers should pass this whenever they own a single project — both the
964+
* scheduled timer tick (`sweepProject`) and the `/ctx-dream` command
965+
* handler. Tests pass `undefined` to keep the legacy "dequeue any" semantics.
966+
*/
967+
projectIdentity?: string;
954968
}): Promise<DreamRunResult | null> {
955969
// Use configured max runtime + 30min buffer for stale threshold instead of hardcoded 2h
956970
const maxRuntimeMs = args.maxRuntimeMinutes * 60 * 1000;
957971
clearStaleEntries(args.db, maxRuntimeMs + 30 * 60 * 1000);
958-
const entry = dequeueNext(args.db);
972+
const entry = dequeueNext(args.db, args.projectIdentity);
959973
if (!entry) {
960974
return null;
961975
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/// <reference types="bun-types" />
2+
3+
/**
4+
* Regression suite for `checkScheduleAndEnqueue` cross-project filtering.
5+
*
6+
* The dream_queue is shared across processes (OpenCode + Pi can both write
7+
* to it). Without the `ownProjectIdentity` filter, a process registered for
8+
* project A would enqueue work for projects B, C, D... — every project the
9+
* shared DB has memories or smart notes for. That work then gets drained by
10+
* SOME process (whichever one ticks first) using the wrong client and wrong
11+
* registered directory, and either fails (Pi case: spawn `pi` in the
12+
* `git:<sha>` identity string as cwd) or succeeds with the wrong identity
13+
* scope (OpenCode case: works but writes the wrong project's memories).
14+
*/
15+
16+
import { afterEach, describe, expect, it } from "bun:test";
17+
import { Database } from "../../../shared/sqlite";
18+
import { closeQuietly } from "../../../shared/sqlite-helpers";
19+
import { runMigrations } from "../migrations";
20+
import { initializeDatabase } from "../storage-db";
21+
import { ensureDreamQueueTable } from "./queue";
22+
import { checkScheduleAndEnqueue } from "./scheduler";
23+
import { setDreamState } from "./storage-dream-state";
24+
25+
let db: Database | null = null;
26+
27+
function createTestDb(): Database {
28+
const database = new Database(":memory:");
29+
initializeDatabase(database);
30+
runMigrations(database);
31+
ensureDreamQueueTable(database);
32+
return database;
33+
}
34+
35+
function seedActiveMemoryFor(database: Database, projectIdentity: string): void {
36+
// findProjectsNeedingDream looks at memories with status='active' AND
37+
// updated_at > last_dream_at. Seed all NOT NULL columns so the row
38+
// satisfies the schema (test isolation, not a real promotion path).
39+
const now = Date.now();
40+
database
41+
.prepare(
42+
`INSERT INTO memories (
43+
project_path, category, content, normalized_hash,
44+
first_seen_at, created_at, updated_at, last_seen_at, status
45+
) VALUES (?, 'general', 'seed', ?, ?, ?, ?, ?, 'active')`,
46+
)
47+
.run(projectIdentity, `${projectIdentity}-seed-hash`, now, now, now, now);
48+
}
49+
50+
function withinScheduleWindow(): string {
51+
// Use a 24h window so the schedule check passes whenever the test runs.
52+
return "00:00-23:59";
53+
}
54+
55+
afterEach(() => {
56+
if (db) {
57+
try {
58+
closeQuietly(db);
59+
} catch {
60+
} finally {
61+
db = null;
62+
}
63+
}
64+
});
65+
66+
describe("checkScheduleAndEnqueue cross-project isolation (issue: Pi running on opencode-xtra)", () => {
67+
it("with ownProjectIdentity, only enqueues that project even when others are eligible", () => {
68+
db = createTestDb();
69+
seedActiveMemoryFor(db, "git:my-repo");
70+
seedActiveMemoryFor(db, "git:not-mine-1");
71+
seedActiveMemoryFor(db, "git:not-mine-2");
72+
73+
const enqueued = checkScheduleAndEnqueue(db, withinScheduleWindow(), "git:my-repo");
74+
expect(enqueued).toBe(1);
75+
76+
const queued = db
77+
.prepare<[], { project_path: string }>(
78+
"SELECT project_path FROM dream_queue ORDER BY id",
79+
)
80+
.all();
81+
expect(queued.map((row) => row.project_path)).toEqual(["git:my-repo"]);
82+
});
83+
84+
it("with ownProjectIdentity not eligible, enqueues nothing (even with eligible peers)", () => {
85+
db = createTestDb();
86+
seedActiveMemoryFor(db, "git:not-mine");
87+
88+
// Mark `my-repo` as recently dreamed so it isn't eligible.
89+
// (And it has no memories anyway.) Filter still removes the
90+
// ineligible-self case cleanly.
91+
const enqueued = checkScheduleAndEnqueue(db, withinScheduleWindow(), "git:my-repo");
92+
expect(enqueued).toBe(0);
93+
94+
const queued = db
95+
.prepare<[], { count: number }>("SELECT COUNT(*) AS count FROM dream_queue")
96+
.get();
97+
expect(queued?.count).toBe(0);
98+
});
99+
100+
it("legacy behavior preserved when ownProjectIdentity is omitted", () => {
101+
db = createTestDb();
102+
seedActiveMemoryFor(db, "git:repo-a");
103+
seedActiveMemoryFor(db, "git:repo-b");
104+
105+
const enqueued = checkScheduleAndEnqueue(db, withinScheduleWindow());
106+
expect(enqueued).toBe(2);
107+
108+
const queued = db
109+
.prepare<[], { project_path: string }>(
110+
"SELECT project_path FROM dream_queue ORDER BY project_path",
111+
)
112+
.all();
113+
expect(queued.map((row) => row.project_path)).toEqual(["git:repo-a", "git:repo-b"]);
114+
});
115+
116+
it("respects schedule window even when own project is eligible", () => {
117+
db = createTestDb();
118+
seedActiveMemoryFor(db, "git:my-repo");
119+
120+
// 02:00-03:00 window — almost certainly outside whatever wall-clock
121+
// time the CI runs at, so this asserts the schedule gate is honored
122+
// BEFORE the project filter. (If the test happens to run between
123+
// 02:00 and 03:00 local, this assertion will be flaky — accept that
124+
// tradeoff for not mocking Date in this lightweight test.)
125+
const now = new Date();
126+
const windowMatchesNow = now.getHours() === 2;
127+
if (windowMatchesNow) {
128+
// Skip body but don't fail — log so flakes are diagnosable.
129+
expect(true).toBe(true);
130+
return;
131+
}
132+
133+
const enqueued = checkScheduleAndEnqueue(db, "02:00-03:00", "git:my-repo");
134+
expect(enqueued).toBe(0);
135+
136+
// Suppress unused-variable lint: we use lastDreamAt only via setDreamState above.
137+
void setDreamState;
138+
});
139+
});

packages/plugin/src/features/magic-context/dreamer/scheduler.ts

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,24 @@ export function findProjectsNeedingDream(db: Database): string[] {
114114
* Check schedule and enqueue eligible projects.
115115
* Called periodically from the hook layer (debounced to once per hour).
116116
* Returns the number of projects enqueued.
117+
*
118+
* @param ownProjectIdentity - When provided, restricts enqueue to this project.
119+
* Each running OpenCode/Pi process registers exactly one project, so it
120+
* must only enqueue work for THAT project — otherwise a process running for
121+
* project A would enqueue dream entries for projects B, C, D... that this
122+
* host can't actually drain (it has the wrong client + the wrong
123+
* subagent-runner directory). Without the filter, a Pi process running for
124+
* `opencode-anthropic-auth` ends up trying to spawn `pi --print` for
125+
* `opencode-xtra` (a project Pi was never opened in), failing every cycle.
126+
*
127+
* When undefined, the legacy "enqueue everything that needs a dream"
128+
* behavior is preserved for tests and any future single-host caller.
117129
*/
118-
export function checkScheduleAndEnqueue(db: Database, schedule: string): number {
130+
export function checkScheduleAndEnqueue(
131+
db: Database,
132+
schedule: string,
133+
ownProjectIdentity?: string,
134+
): number {
119135
if (!isInScheduleWindow(schedule)) {
120136
return 0;
121137
}
@@ -129,8 +145,16 @@ export function checkScheduleAndEnqueue(db: Database, schedule: string): number
129145
return 0;
130146
}
131147

148+
// Filter to just THIS host's project when an identity was passed in.
149+
// findProjectsNeedingDream returns every project with active memories or
150+
// pending smart notes across the whole shared DB; without the filter, a
151+
// single host would try to enqueue work for projects it doesn't own.
152+
const eligible = ownProjectIdentity
153+
? projects.filter((id) => id === ownProjectIdentity)
154+
: projects;
155+
132156
let enqueued = 0;
133-
for (const projectIdentity of projects) {
157+
for (const projectIdentity of eligible) {
134158
const entry = enqueueDream(db, projectIdentity, "scheduled");
135159
if (entry) {
136160
log(`[dreamer] enqueued project for scheduled dream: ${projectIdentity}`);

0 commit comments

Comments
 (0)