Skip to content

Commit 5611a63

Browse files
author
IM.codes
committed
Fix transport queued message handling
1 parent 64e090d commit 5611a63

File tree

10 files changed

+299
-20
lines changed

10 files changed

+299
-20
lines changed

src/agent/transport-session-runtime.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,6 @@ export class TransportSessionRuntime implements SessionRuntime {
155155
if (!this._providerSessionId) {
156156
throw new Error('TransportSessionRuntime not initialized — call initialize() first');
157157
}
158-
// Clear pending — user cancelled, they don't want queued messages either
159-
this._pendingMessages = [];
160158
if (!this.provider.cancel) return;
161159
await this.provider.cancel(this._providerSessionId);
162160
}

src/daemon/command-handler.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1353,14 +1353,16 @@ async function handleSend(cmd: Record<string, unknown>, serverLink: ServerLink):
13531353
try { serverLink.send({ type: 'command.ack', commandId: effectiveId, status: isLegacy ? 'accepted_legacy' : 'accepted', session: sessionName }); } catch {}
13541354
return;
13551355
}
1356-
timelineEmitter.emit(sessionName, 'user.message', { text });
13571356
if (record?.agentType === 'qwen' && record.qwenAuthType === 'qwen-oauth') {
13581357
recordQwenOAuthRequest();
13591358
refreshQwenQuotaUsageLabels(serverLink);
13601359
}
13611360
// send() is synchronous: dispatches immediately if idle, queues if busy.
13621361
// Status changes come from transport runtime's onStatusChange callback.
13631362
const result = transportRuntime.send(text);
1363+
if (result === 'sent') {
1364+
timelineEmitter.emit(sessionName, 'user.message', { text });
1365+
}
13641366
if (result === 'queued') {
13651367
timelineEmitter.emit(sessionName, 'session.state', { state: 'queued', pendingCount: transportRuntime.pendingCount }, { source: 'daemon', confidence: 'high' });
13661368
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
import { beforeEach, describe, expect, it, vi } from 'vitest';
2+
3+
const { getSessionMock, getTransportRuntimeMock, emitMock } = vi.hoisted(() => ({
4+
getSessionMock: vi.fn(),
5+
getTransportRuntimeMock: vi.fn(),
6+
emitMock: vi.fn(),
7+
}));
8+
9+
vi.mock('../../src/store/session-store.js', () => ({
10+
listSessions: vi.fn(() => []),
11+
getSession: getSessionMock,
12+
upsertSession: vi.fn(),
13+
removeSession: vi.fn(),
14+
updateSessionState: vi.fn(),
15+
}));
16+
17+
vi.mock('../../src/agent/session-manager.js', () => ({
18+
startProject: vi.fn(),
19+
stopProject: vi.fn(),
20+
teardownProject: vi.fn(),
21+
getTransportRuntime: getTransportRuntimeMock,
22+
launchTransportSession: vi.fn(),
23+
isProviderSessionBound: vi.fn(() => false),
24+
persistSessionRecord: vi.fn(),
25+
relaunchSessionWithSettings: vi.fn(),
26+
}));
27+
28+
vi.mock('../../src/agent/tmux.js', () => ({
29+
sendKeys: vi.fn(),
30+
sendKeysDelayedEnter: vi.fn(),
31+
sendRawInput: vi.fn(),
32+
resizeSession: vi.fn(),
33+
sendKey: vi.fn(),
34+
getPaneStartCommand: vi.fn(),
35+
}));
36+
37+
vi.mock('../../src/router/message-router.js', () => ({
38+
routeMessage: vi.fn(),
39+
}));
40+
41+
vi.mock('../../src/daemon/terminal-streamer.js', () => ({
42+
terminalStreamer: {
43+
subscribe: vi.fn(),
44+
unsubscribe: vi.fn(),
45+
start: vi.fn(),
46+
stop: vi.fn(),
47+
},
48+
}));
49+
50+
vi.mock('../../src/daemon/timeline-emitter.js', () => ({
51+
timelineEmitter: {
52+
emit: emitMock,
53+
on: vi.fn(() => () => {}),
54+
off: vi.fn(),
55+
epoch: 0,
56+
replay: vi.fn(() => ({ events: [], truncated: false })),
57+
},
58+
}));
59+
60+
vi.mock('../../src/daemon/timeline-store.js', () => ({
61+
timelineStore: {
62+
append: vi.fn(),
63+
read: vi.fn(() => []),
64+
clear: vi.fn(),
65+
},
66+
}));
67+
68+
vi.mock('../../src/daemon/subsession-manager.js', () => ({
69+
startSubSession: vi.fn(),
70+
stopSubSession: vi.fn(),
71+
rebuildSubSessions: vi.fn(),
72+
detectShells: vi.fn().mockResolvedValue([]),
73+
readSubSessionResponse: vi.fn(),
74+
subSessionName: (id: string) => `deck_sub_${id}`,
75+
}));
76+
77+
vi.mock('../../src/daemon/p2p-orchestrator.js', () => ({
78+
startP2pRun: vi.fn(),
79+
cancelP2pRun: vi.fn(),
80+
getP2pRun: vi.fn(() => undefined),
81+
listP2pRuns: vi.fn(() => []),
82+
serializeP2pRun: vi.fn(),
83+
}));
84+
85+
vi.mock('../../src/daemon/repo-handler.js', () => ({
86+
handleRepoCommand: vi.fn(),
87+
}));
88+
89+
vi.mock('../../src/daemon/file-transfer-handler.js', () => ({
90+
handleFileUpload: vi.fn(),
91+
handleFileDownload: vi.fn(),
92+
createProjectFileHandle: vi.fn(),
93+
lookupAttachment: vi.fn(() => undefined),
94+
}));
95+
96+
vi.mock('../../src/daemon/preview-relay.js', () => ({
97+
handlePreviewCommand: vi.fn(),
98+
}));
99+
100+
vi.mock('../../src/daemon/provider-sessions.js', () => ({
101+
listProviderSessions: vi.fn(() => []),
102+
}));
103+
104+
vi.mock('../../src/util/logger.js', () => ({
105+
default: {
106+
info: vi.fn(),
107+
warn: vi.fn(),
108+
error: vi.fn(),
109+
debug: vi.fn(),
110+
},
111+
}));
112+
113+
vi.mock('../../src/util/imc-dir.js', () => ({
114+
ensureImcDir: vi.fn().mockResolvedValue('/tmp/imc'),
115+
imcSubDir: vi.fn((dir: string, sub: string) => `${dir}/.imc/${sub}`),
116+
}));
117+
118+
import { handleWebCommand } from '../../src/daemon/command-handler.js';
119+
120+
const flushAsync = () => new Promise<void>((resolve) => setTimeout(resolve, 0));
121+
122+
describe('handleWebCommand transport queue behavior', () => {
123+
const serverLink = {
124+
send: vi.fn(),
125+
sendBinary: vi.fn(),
126+
sendTimelineEvent: vi.fn(),
127+
daemonVersion: '0.1.0',
128+
};
129+
130+
beforeEach(() => {
131+
vi.clearAllMocks();
132+
getSessionMock.mockReturnValue({
133+
name: 'deck_transport_brain',
134+
projectName: 'transport',
135+
role: 'brain',
136+
agentType: 'claude-code-sdk',
137+
runtimeType: 'transport',
138+
state: 'running',
139+
});
140+
});
141+
142+
it('does not emit a user.message for queued transport sends', async () => {
143+
getTransportRuntimeMock.mockReturnValue({
144+
send: vi.fn(() => 'queued'),
145+
pendingCount: 2,
146+
});
147+
148+
handleWebCommand({ type: 'session.send', session: 'deck_transport_brain', text: 'queued msg', commandId: 'cmd-queued' }, serverLink as any);
149+
await flushAsync();
150+
151+
expect(emitMock).not.toHaveBeenCalledWith('deck_transport_brain', 'user.message', { text: 'queued msg' });
152+
expect(emitMock).toHaveBeenCalledWith(
153+
'deck_transport_brain',
154+
'session.state',
155+
{ state: 'queued', pendingCount: 2 },
156+
expect.any(Object),
157+
);
158+
expect(emitMock).toHaveBeenCalledWith('deck_transport_brain', 'command.ack', { commandId: 'cmd-queued', status: 'accepted' });
159+
});
160+
161+
it('emits a user.message immediately for dispatched transport sends', async () => {
162+
getTransportRuntimeMock.mockReturnValue({
163+
send: vi.fn(() => 'sent'),
164+
pendingCount: 0,
165+
});
166+
167+
handleWebCommand({ type: 'session.send', session: 'deck_transport_brain', text: 'sent msg', commandId: 'cmd-sent' }, serverLink as any);
168+
await flushAsync();
169+
170+
expect(emitMock).toHaveBeenCalledWith('deck_transport_brain', 'user.message', { text: 'sent msg' });
171+
expect(emitMock).not.toHaveBeenCalledWith(
172+
'deck_transport_brain',
173+
'session.state',
174+
expect.objectContaining({ state: 'queued' }),
175+
expect.anything(),
176+
);
177+
});
178+
});

test/daemon/transport-session-runtime.test.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,27 @@ describe('TransportSessionRuntime', () => {
9898
expect(runtime.sending).toBe(false);
9999
});
100100

101-
it('cancel() delegates to provider.cancel and clears pending', () => {
101+
it('cancel() delegates to provider.cancel and preserves pending', () => {
102102
runtime.send('first');
103103
runtime.send('queued1');
104104
runtime.send('queued2');
105105
expect(runtime.pendingCount).toBe(2);
106106

107107
runtime.cancel();
108108
expect(mock.provider.cancel).toHaveBeenCalledWith('sess-1');
109+
expect(runtime.pendingCount).toBe(2);
110+
});
111+
112+
it('cancelled turns drain pending messages into the next turn', () => {
113+
runtime.send('first');
114+
runtime.send('queued1');
115+
runtime.send('queued2');
116+
117+
runtime.cancel();
118+
mock.fireError('sess-1', { code: 'CANCELLED', message: 'cancelled', recoverable: true });
119+
120+
expect(mock.provider.send).toHaveBeenCalledTimes(2);
121+
expect(mock.provider.send).toHaveBeenNthCalledWith(2, 'sess-1', 'queued1\n\nqueued2', undefined, undefined);
109122
expect(runtime.pendingCount).toBe(0);
110123
});
111124

test/daemon/transport-status-lifecycle.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -200,18 +200,18 @@ describe('batched queuing', () => {
200200
expect(runtime.getStatus()).toBe('thinking');
201201
});
202202

203-
it('on cancel, pending messages are cleared (not drained)', () => {
203+
it('on cancel, pending messages are drained into the next turn', () => {
204204
runtime.send('first');
205-
runtime.send('will-be-dropped');
206-
runtime.send('also-dropped');
205+
runtime.send('still-queued');
206+
runtime.send('send-after-stop');
207207

208208
runtime.cancel();
209209
mock.fireCancelled('sess-1');
210210

211-
// Cancel clears pending → no drain, no second send
212-
expect(mock.provider.send).toHaveBeenCalledTimes(1);
211+
expect(mock.provider.send).toHaveBeenCalledTimes(2);
212+
expect(mock.provider.send).toHaveBeenNthCalledWith(2, 'sess-1', 'still-queued\n\nsend-after-stop', undefined, undefined);
213213
expect(runtime.pendingCount).toBe(0);
214-
expect(runtime.getStatus()).toBe('idle');
214+
expect(runtime.getStatus()).toBe('thinking');
215215
});
216216

217217
it('multiple turns with queuing: correct history order', () => {

web/src/components/SessionControls.tsx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -837,7 +837,9 @@ export function SessionControls({ ws, activeSession, inputRef, onAfterAction, on
837837

838838
const finalizeSend = useCallback((payload: PendingSendPayload, options?: { clearComposer?: boolean }) => {
839839
if (!ws || !activeSession) return;
840-
// Transport sessions queue messages internally — no frontend notice needed
840+
if (activeSession.runtimeType === 'transport' && activeSession.state === 'running') {
841+
setQueuedNoticeVisible(true);
842+
}
841843
quickData.recordHistory(payload.text, activeSession.name);
842844
try {
843845
ws.sendSessionCommand('send', { sessionName: activeSession.name, text: payload.text, ...payload.extra });

web/src/components/SessionPane.tsx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,9 @@ export function SessionPane({
270270
inputRef={inputRef}
271271
onAfterAction={onAfterAction}
272272
onSend={(_name, text) => {
273-
addOptimisticUserMessage(text);
273+
if (session.runtimeType !== 'transport') {
274+
addOptimisticUserMessage(text);
275+
}
274276
scrollToBottom();
275277
}}
276278
onStopProject={onStopProject}

web/src/styles.css

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,22 @@ body {
124124
.p2p-settings-icon { font-size: 12px; line-height: 1; }
125125
.p2p-settings-label { font-size: 11px; font-weight: 600; letter-spacing: 0.01em; }
126126
.controls { padding: 6px 8px; display: flex; gap: 6px; align-items: center; }
127-
.controls-queued-hint { padding: 0 12px 8px; color: #fbbf24; font-size: 12px; line-height: 1.35; }
127+
.controls-queued-hint {
128+
position: absolute;
129+
left: 12px;
130+
right: 12px;
131+
bottom: calc(100% + 6px);
132+
padding: 8px 10px;
133+
color: #fbbf24;
134+
font-size: 12px;
135+
line-height: 1.35;
136+
background: rgba(15, 23, 42, 0.92);
137+
border: 1px solid rgba(251, 191, 36, 0.35);
138+
border-radius: 10px;
139+
box-shadow: 0 8px 24px rgba(0, 0, 0, 0.35);
140+
pointer-events: none;
141+
z-index: 3;
142+
}
128143
.attachment-badges { display: flex; flex-wrap: wrap; gap: 4px; padding: 4px 2px 2px; }
129144
.attachment-badge { display: inline-flex; align-items: center; gap: 4px; background: #1e293b; border: 1px solid #334155; border-radius: 12px; padding: 2px 8px 2px 6px; font-size: 12px; color: #94a3b8; max-width: 200px; }
130145
.attachment-badge-icon { font-size: 11px; flex-shrink: 0; }

web/test/components/SessionControls.test.tsx

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,7 @@ describe('SessionControls', () => {
688688
});
689689

690690

691-
it('sends message to running transport session without blocking (queuing is daemon-side)', () => {
691+
it('shows a bottom queued notice when sending to a running transport session', () => {
692692
const ws = makeWs();
693693
const runningSession = makeSession({
694694
name: 'qwen-session',
@@ -714,8 +714,7 @@ describe('SessionControls', () => {
714714
sessionName: 'qwen-session',
715715
text: 'queued send',
716716
});
717-
// No frontend queued notice — transport runtime queues internally
718-
expect(screen.queryByText('transport_send_queued')).toBeNull();
717+
expect(screen.getByText('transport_send_queued')).toBeDefined();
719718
});
720719

721720
it('pressing Escape in a running transport input sends /stop command', () => {

0 commit comments

Comments
 (0)