Skip to content

Commit 7cfa697

Browse files
IM.codesclaude
andcommitted
feat: event-driven idle detection for P2P hops instead of pure polling
P2P waitForIdle and dispatchHop now use dual strategy: - Hook event callback (notifySessionIdle) fires immediately on idle - Polling fallback still runs as safety net - waitForIdle does quick initial check then waits for event - dispatchHop polls for file growth, then races event vs poll for idle This eliminates the ~3s polling delay between hops. When the hook fires, the next hop starts instantly. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a053458 commit 7cfa697

File tree

3 files changed

+102
-32
lines changed

3 files changed

+102
-32
lines changed

src/daemon/lifecycle.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { detectMemoryBackend } from '../memory/detector.js';
55
import { ServerLink } from './server-link.js';
66
import { handleWebCommand, setRouterContext } from './command-handler.js';
77
import { initFileTransfer, startCleanupTimer } from './file-transfer-handler.js';
8+
import { notifySessionIdle } from './p2p-orchestrator.js';
89
import { timelineEmitter } from './timeline-emitter.js';
910
import { timelineStore } from './timeline-store.js';
1011
import { startHookServer } from './hook-server.js';
@@ -279,6 +280,7 @@ export async function startup(): Promise<DaemonContext> {
279280
const record = listSessions().find((s) => s.name === payload.session);
280281
const projectName = record?.projectName ?? payload.session;
281282
if (payload.event === 'idle') {
283+
notifySessionIdle(payload.session);
282284
serverLink.send({ type: 'session.idle', session: payload.session, project: projectName, agentType: payload.agentType });
283285
} else if (payload.event === 'notification') {
284286
serverLink.send({ type: 'session.notification', session: payload.session, project: projectName, title: payload.title, message: payload.message });

src/daemon/p2p-orchestrator.ts

Lines changed: 96 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,52 @@ let IDLE_POLL_MS = 3_000;
7373
/** Override poll interval for tests. */
7474
export function _setIdlePollMs(ms: number): void { IDLE_POLL_MS = ms; }
7575

76+
// ── Idle event registry (callback-driven, no polling) ─────────────────────
77+
78+
type IdleResolver = () => void;
79+
const idleWaiters = new Map<string, Set<IdleResolver>>();
80+
81+
/**
82+
* Called by lifecycle hook when a session becomes idle.
83+
* Resolves any P2P waiters for that session immediately.
84+
*/
85+
export function notifySessionIdle(sessionName: string): void {
86+
const waiters = idleWaiters.get(sessionName);
87+
if (waiters && waiters.size > 0) {
88+
logger.info({ session: sessionName, waiters: waiters.size }, 'P2P: idle event received, resolving waiters');
89+
for (const resolve of waiters) resolve();
90+
waiters.clear();
91+
}
92+
}
93+
94+
function waitForIdleEvent(session: string, timeoutMs: number): Promise<boolean> {
95+
return new Promise((resolve) => {
96+
let resolved = false;
97+
const timer = setTimeout(() => {
98+
if (!resolved) { resolved = true; cleanup(); resolve(false); }
99+
}, timeoutMs);
100+
101+
const resolver: IdleResolver = () => {
102+
if (!resolved) { resolved = true; clearTimeout(timer); cleanup(); resolve(true); }
103+
};
104+
105+
function cleanup() {
106+
const set = idleWaiters.get(session);
107+
if (set) {
108+
set.delete(resolver);
109+
if (set.size === 0) idleWaiters.delete(session);
110+
}
111+
}
112+
113+
let set = idleWaiters.get(session);
114+
if (!set) {
115+
set = new Set();
116+
idleWaiters.set(session, set);
117+
}
118+
set.add(resolver);
119+
});
120+
}
121+
76122
// ── Start a P2P run ───────────────────────────────────────────────────────
77123

78124
export async function startP2pRun(
@@ -271,34 +317,49 @@ async function dispatchHop(run: P2pRun, session: string, prompt: string, serverL
271317
}
272318

273319
// Wait for: file grows + agent idle
320+
// Uses dual strategy: poll for file growth + idle, AND listen for idle hook event.
321+
// Whichever fires first wins.
274322
const deadline = Date.now() + run.timeoutMs;
275323
let fileGrew = false;
324+
let idleEventReceived = false;
325+
326+
// Start listening for idle event immediately
327+
const idlePromise = waitForIdleEvent(session, run.timeoutMs).then((ok) => {
328+
idleEventReceived = ok;
329+
});
276330

277331
while (Date.now() < deadline) {
278332
if (run._cancelled) return;
279333
await sleep(IDLE_POLL_MS);
280334
if (run._cancelled) return;
281335

282336
// Check file growth
283-
try {
284-
const currentSize = (await stat(run.contextFilePath)).size;
285-
if (currentSize > sizeBefore) {
286-
fileGrew = true;
287-
if (run.status === 'dispatched') {
288-
transition(run, 'running', serverLink);
337+
if (!fileGrew) {
338+
try {
339+
const currentSize = (await stat(run.contextFilePath)).size;
340+
if (currentSize > sizeBefore) {
341+
fileGrew = true;
342+
if (run.status === 'dispatched') {
343+
transition(run, 'running', serverLink);
344+
}
289345
}
290-
}
291-
} catch { /* ignore */ }
346+
} catch { /* ignore */ }
347+
}
292348

293-
// Check agent idle
349+
// Check idle — either via hook event or poll
294350
if (fileGrew) {
351+
if (idleEventReceived) {
352+
if (run.remainingTargets.length > 0 || session !== run.initiatorSession) {
353+
transition(run, 'awaiting_next_hop', serverLink);
354+
}
355+
return;
356+
}
357+
// Fallback: poll detectStatus
295358
try {
296359
const lines = await capturePane(session);
297360
const record = getSession(session);
298361
const agentType = (record?.agentType ?? 'claude-code') as import('../agent/detect.js').AgentType;
299-
const status = detectStatus(lines, agentType);
300-
if (status === 'idle') {
301-
// Hop complete
362+
if (detectStatus(lines, agentType) === 'idle') {
302363
if (run.remainingTargets.length > 0 || session !== run.initiatorSession) {
303364
transition(run, 'awaiting_next_hop', serverLink);
304365
}
@@ -308,6 +369,9 @@ async function dispatchHop(run: P2pRun, session: string, prompt: string, serverL
308369
}
309370
}
310371

372+
// Cleanup: the idle event listener will auto-timeout
373+
void idlePromise;
374+
311375
// Timeout
312376
if (!run._cancelled) {
313377
failRun(run, 'timed_out', `Hop timed out after ${run.timeoutMs}ms`, serverLink);
@@ -318,28 +382,28 @@ async function dispatchHop(run: P2pRun, session: string, prompt: string, serverL
318382

319383
async function waitForIdle(run: P2pRun, session: string, serverLink: ServerLink | null): Promise<void> {
320384
logger.info({ runId: run.id, session }, 'P2P: waiting for target session to become idle');
321-
const deadline = Date.now() + run.timeoutMs;
322-
let pollCount = 0;
323-
while (Date.now() < deadline) {
324-
if (run._cancelled) return;
325-
try {
326-
const lines = await capturePane(session);
327-
const record = getSession(session);
328-
const agentType = (record?.agentType ?? 'claude-code') as import('../agent/detect.js').AgentType;
329-
const status = detectStatus(lines, agentType);
330-
if (status === 'idle') {
331-
logger.info({ runId: run.id, session, pollCount }, 'P2P: target session is idle, proceeding');
332-
return;
333-
}
334-
if (pollCount % 10 === 0) {
335-
logger.debug({ runId: run.id, session, status, pollCount }, 'P2P: target not idle yet');
336-
}
337-
} catch (err) {
338-
logger.debug({ runId: run.id, session, err: String(err) }, 'P2P: capturePane/detectStatus failed');
385+
386+
// Quick check: already idle?
387+
try {
388+
const lines = await capturePane(session);
389+
const record = getSession(session);
390+
const agentType = (record?.agentType ?? 'claude-code') as import('../agent/detect.js').AgentType;
391+
if (detectStatus(lines, agentType) === 'idle') {
392+
logger.info({ runId: run.id, session }, 'P2P: target already idle, proceeding immediately');
393+
return;
339394
}
340-
pollCount++;
341-
await sleep(IDLE_POLL_MS);
395+
} catch { /* proceed to wait */ }
396+
397+
// Event-driven wait: resolve on idle hook callback or timeout
398+
const gotIdle = await waitForIdleEvent(session, run.timeoutMs);
399+
if (run._cancelled) return;
400+
401+
if (gotIdle) {
402+
logger.info({ runId: run.id, session }, 'P2P: target idle (via hook event), proceeding');
403+
return;
342404
}
405+
406+
// Timeout
343407
if (!run._cancelled) {
344408
failRun(run, 'timed_out', `Target ${session} never became idle after ${run.timeoutMs}ms`, serverLink);
345409
}

test/daemon/p2p-orchestrator.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import {
5555
_setIdlePollMs,
5656
type P2pRun,
5757
type P2pRunStatus,
58+
notifySessionIdle,
5859
} from '../../src/daemon/p2p-orchestrator.js';
5960
import { getP2pMode, BUILT_IN_MODES } from '../../src/shared/p2p-modes.js';
6061

@@ -127,13 +128,16 @@ beforeEach(() => {
127128
detectStatusMock.mockReturnValue('idle');
128129
capturePaneMock.mockResolvedValue(['$']);
129130
// When sendKeys is called, simulate the agent writing to the context file
131+
// then firing an idle hook after a short delay
130132
sendKeysDelayedEnterMock.mockImplementation(async (session: string, prompt: string) => {
131133
// Extract the context file path from the prompt and append a section
132134
const pathMatch = prompt.match(/\/tmp\/imcodes-p2p\/[^\s]+\.md/);
133135
if (pathMatch) {
134136
const { appendFile } = await import('node:fs/promises');
135137
await appendFile(pathMatch[0], `\n## Output from ${session}\n\nSome analysis.\n`);
136138
}
139+
// Simulate idle hook firing after agent finishes (small delay for file poll to detect growth)
140+
setTimeout(() => notifySessionIdle(session), 150);
137141
});
138142
getSessionMock.mockReturnValue({ agentType: 'claude-code', projectDir: '/tmp/proj' });
139143
});

0 commit comments

Comments
 (0)