Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

## [Unreleased]

### Fixed

- **`stream_end` no longer clears an active live assistant row before the settled transcript is ready.** When `stream_end` arrives while live text, Worklog, Thinking, or tool activity is still displayed, the browser now waits for the persisted session to leave its active/pending state before replacing the live DOM. This avoids a terminal-settle race where the assistant area could briefly go blank or only recover after switching sessions. (refs #3877)

## [v0.51.346] — 2026-06-09 — Release LJ (PWA notification controls)

### Added
Expand Down
109 changes: 93 additions & 16 deletions static/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,72 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){
// Close the given EventSource on behalf of this attachment by delegating to
// closeLiveStream, passing the session id and stream id that the enclosing
// attachLiveStream call was invoked with.
function _closeSource(source){
closeLiveStream(activeSid, streamId, source);
}
// Reset all deferred stream_end recovery bookkeeping: drop the pending flag,
// zero the retry counter, and cancel the recovery timer when one is armed.
function _clearStreamEndRecovery(){
  _pendingStreamEndRecovery=false;
  _streamEndRecoveryAttempts=0;
  // Nothing armed: leave the timer slot exactly as it is.
  if(!_streamEndRecoveryTimer) return;
  clearTimeout(_streamEndRecoveryTimer);
  _streamEndRecoveryTimer=null;
}
// Report whether this stream still has live content on display, so that
// stream_end should defer cleanup instead of replacing the DOM right away.
// Checks, in order: tracked assistant text/row, non-blank reasoning text,
// in-flight tool calls recorded for this session, and finally live markers
// inside the liveAssistantTurn element.
function _liveStreamEndScenePresent(){
  if(assistantText||assistantRow) return true;
  const reasoning=String(liveReasoningText||reasoningText||'');
  if(reasoning.trim()) return true;
  const inflight=INFLIGHT[activeSid];
  const hasToolCalls=!!inflight&&Array.isArray(inflight.toolCalls)&&inflight.toolCalls.length>0;
  if(hasToolCalls) return true;
  // The DOM probe is skipped when this session is not the active one or when
  // no document is available.
  if(!_isActiveSession()) return false;
  if(typeof document==='undefined') return false;
  const turn=$('liveAssistantTurn');
  if(!turn) return false;
  const liveSelector=[
    '[data-live-assistant="1"]',
    '.live-worklog[data-live-worklog-shell="1"]',
    '.tool-card-row[data-live-tid]',
    '.agent-activity-thinking[data-thinking-active="1"]',
  ].join(',');
  return !!turn.querySelector(liveSelector);
}
// Arm (or re-arm) the deferred stream_end recovery timer. Any timer that is
// already armed is cancelled first, the pending flag is raised, and after
// `delay` milliseconds (180 by default) _runStreamEndRecovery is started for
// `source` without awaiting its promise.
function _scheduleStreamEndRecovery(source, delay=180){
  const previousTimer=_streamEndRecoveryTimer;
  if(previousTimer) clearTimeout(previousTimer);
  _pendingStreamEndRecovery=true;
  const startRecovery=()=>{
    void _runStreamEndRecovery(source);
  };
  _streamEndRecoveryTimer=setTimeout(startRecovery,delay);
}
// Last-resort terminal cleanup for a stream_end that could not be settled from
// the persisted session: the restore reported something other than 'restored',
// or the retry budget ran out while the session still looked active.
//
// Teardown of this attachment's own state always runs: recovery bookkeeping,
// persist timer, terminal flags, pending render, markdown parser, and the
// owner-scoped inflight/approval/clarify helpers.
//
// The shared pane state (S.activeStreamId, live tool cards, the rendered
// transcript) is only touched while this stream still owns the active session.
// This function is reached after a timer delay and/or an awaited API roundtrip,
// so a newer stream may already be attached to the same session by then;
// clearing S.activeStreamId or re-rendering at that point would wipe the newer
// stream's live UI. The condition matches the ownership check at the top of
// _restoreSettledSession.
//
// source: the EventSource owned by this attachment; it is closed last.
function _finalizeStreamEndFallback(source){
  _clearStreamEndRecovery();
  if(_persistTimer){clearTimeout(_persistTimer);_persistTimer=null;}
  _terminalStateReached=true;
  _streamFinalized=true;
  _cancelAnimationFramePendingStreamRender();
  _streamFadeCleanupReduceMotionListener();
  _smdEndParser();
  if(typeof finalizeThinkingCard==='function') finalizeThinkingCard();
  _clearOwnerInflightState();
  _clearApprovalForOwner();
  _clearClarifyForOwner('terminal');
  // Ownership guard: being the active session is not enough, the pane must
  // still be bound to this stream id.
  const ownsActivePane=_isActiveSession()&&S.activeStreamId===streamId;
  if(ownsActivePane){
    S.activeStreamId=null;
    clearLiveToolCards();if(!assistantText)removeThinking();
    renderMessages({preserveScroll:true});
  }
  renderSessionList();
  _setActivePaneIdleIfOwner();
  _closeSource(source);
}
// Timer callback for deferred stream_end settlement. Tries to replace the live
// scene with the persisted session via _restoreSettledSession and then:
//   - 'restored': clears the recovery bookkeeping and stops;
//   - 'active' with fewer than 10 retries used: counts the attempt and re-arms
//     the timer for another 200 ms;
//   - anything else: runs _finalizeStreamEndFallback.
//
// source: the EventSource owned by this attachment.
// Resolves to undefined; the caller does not await it.
//
// NOTE(review): the stream_end listener appears to set _terminalStateReached
// before it schedules this recovery. If so, the entry guard below bails out on
// every run and the deferred path never settles. Confirm against the listener.
async function _runStreamEndRecovery(source){
  if(_streamFinalized || _terminalStateReached || !_pendingStreamEndRecovery){
    _clearStreamEndRecovery();
    return;
  }
  // The timer that invoked us has fired; drop the handle before awaiting.
  _streamEndRecoveryTimer=null;
  const status=await _restoreSettledSession(source,{status:true});
  if(status==='restored'){
    _clearStreamEndRecovery();
    return;
  }
  // A done, cancel, apperror or error handler can run during the await above.
  // Each of them clears the recovery state and finalizes the stream itself, so
  // retrying or running the fallback afterwards would act on a stream that has
  // already been settled by its rightful handler.
  if(_streamFinalized || !_pendingStreamEndRecovery){
    _clearStreamEndRecovery();
    return;
  }
  if(status==='active'&&_streamEndRecoveryAttempts<10){
    _streamEndRecoveryAttempts+=1;
    _scheduleStreamEndRecovery(source,200);
    return;
  }
  _finalizeStreamEndFallback(source);
}
function _stripLiveVisibleAssistantEchoFromThinking(text, snippets){
let out=String(text||'');
(Array.isArray(snippets)?snippets:[]).forEach(snippet=>{
Expand Down Expand Up @@ -1739,6 +1805,9 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){
let _reconnectAttempted=false;
let _terminalStateReached=false;
let _deferredStreamRecoveryBound=false;
let _pendingStreamEndRecovery=false;
let _streamEndRecoveryTimer=null;
let _streamEndRecoveryAttempts=0;

function _pageHiddenForStreamError(){
return (typeof document!=='undefined'&&document.visibilityState==='hidden')||
Expand Down Expand Up @@ -3042,6 +3111,7 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){

source.addEventListener('done',e=>{
if(_streamFinalized) return;
_clearStreamEndRecovery();
if(_bailOutOfTerminalEventsFromStaleStream(source)) return;
// Set _streamFinalized IMMEDIATELY — before any fade delay. Without this,
// a stream_end event arriving during the fade window sees
Expand Down Expand Up @@ -3266,27 +3336,30 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){
_closeSource(source);
return;
}
_clearStreamEndRecovery();
if(_bailOutOfTerminalEventsFromStaleStream(source)) return;
_terminalStateReached=true;
try{
const d=JSON.parse(e.data||'{}');
if((d.session_id||activeSid)!==activeSid) return;
}catch(_){}
if(S.activeStreamId===streamId && _liveStreamEndScenePresent()){
_scheduleStreamEndRecovery(source);
return;
}
// Some replay/journal paths can deliver stream_end without a preceding
// done event. In that case closing the EventSource is not enough: the
// live DOM/inflight state remains projected and can duplicate Thinking or
// assistant content until a later session switch. Settle from the persisted
// session before closing so the pane converges on canonical state.
if(await _restoreSettledSession(source)){
const status=await _restoreSettledSession(source,{status:true});
if(status==='restored'){
return;
}
if(_persistTimer){clearTimeout(_persistTimer);_persistTimer=null;}
_streamFinalized=true;
_cancelAnimationFramePendingStreamRender();
_streamFadeCleanupReduceMotionListener();
_smdEndParser();
if(typeof finalizeThinkingCard==='function') finalizeThinkingCard();
_closeSource(source);
if(status==='active'&&S.activeStreamId===streamId){
_scheduleStreamEndRecovery(source,200);
return;
}
_finalizeStreamEndFallback(source);
});

source.addEventListener('pending_steer_leftover',e=>{
Expand Down Expand Up @@ -3398,6 +3471,7 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){

source.addEventListener('apperror',e=>{
if(_bailOutOfTerminalEventsFromStaleStream(source)) return;
_clearStreamEndRecovery();
_terminalStateReached=true;
if(_persistTimer){clearTimeout(_persistTimer);_persistTimer=null;}
_streamFinalized=true;
Expand Down Expand Up @@ -3541,6 +3615,7 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){

source.addEventListener('cancel',e=>{
if(_bailOutOfTerminalEventsFromStaleStream(source)) return;
_clearStreamEndRecovery();
_terminalStateReached=true;
if(_persistTimer){clearTimeout(_persistTimer);_persistTimer=null;}
_streamFinalized=true;
Expand Down Expand Up @@ -3631,19 +3706,20 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){
window._carryForwardEphemeralTurnFields=_carryForwardEphemeralTurnFields;
}

async function _restoreSettledSession(source){
async function _restoreSettledSession(source, options=null){
const returnStatus=!!(options&&options.status);
if(_isActiveSession() && S.activeStreamId!==streamId){
_closeSource(source);
return false;
return returnStatus?'stale':false;
}
try{
const data=await api(`/api/session?session_id=${encodeURIComponent(activeSid)}`);
// Opus #2852 race-fix: if a late `done` event ran the finalize path while
// we were awaiting the network roundtrip, bail out — done already settled.
if(_streamFinalized) return true;
if(_streamFinalized) return returnStatus?'restored':true;
const session=data&&data.session;
if(!session) return false;
if(session.active_stream_id||session.pending_user_message) return false;
if(!session) return returnStatus?'missing':false;
if(session.active_stream_id||session.pending_user_message) return returnStatus?'active':false;
if(_persistTimer){clearTimeout(_persistTimer);_persistTimer=null;}
_streamFinalized=true;
_cancelAnimationFramePendingStreamRender();
Expand Down Expand Up @@ -3701,9 +3777,9 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){
if(_isActiveSession()) _queueDrainSid=activeSid;
renderSessionList();
_setActivePaneIdleIfOwner();
return true;
return returnStatus?'restored':true;
}catch(_){
return false;
return returnStatus?'error':false;
}
}

Expand All @@ -3712,6 +3788,7 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){
_closeSource(source);
return;
}
_clearStreamEndRecovery();
// Opus review Q1: mirror done/apperror/cancel finalization so any pending rAF
// cannot fire after renderMessages() has settled the DOM with the error message.
if(_persistTimer){clearTimeout(_persistTimer);_persistTimer=null;}
Expand Down
14 changes: 10 additions & 4 deletions tests/test_1694_terminal_cleanup_ownership.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,15 @@ def test_stream_end_without_done_restores_settled_session_before_closing():
never replaces the pane with the persisted transcript when done is missing.
"""
body = _event_body("stream_end")
restore_idx = body.find("_restoreSettledSession(source)")
close_idx = body.rfind("_closeSource(source)")
finalized_idx = body.find("_streamFinalized=true")
restore_idx = body.find("_restoreSettledSession(source,{status:true})")
if restore_idx == -1:
restore_idx = body.find("_restoreSettledSession(source)")
close_idx = body.find("_closeSource(source)", restore_idx)
if close_idx == -1:
close_idx = body.find("_finalizeStreamEndFallback(source)", restore_idx)
finalized_idx = body.find("_streamFinalized=true", restore_idx)
if finalized_idx == -1:
finalized_idx = body.find("_finalizeStreamEndFallback(source)", restore_idx)
assert restore_idx != -1, "stream_end handler must restore settled session when done is absent"
assert close_idx != -1, "stream_end handler must still close the owning EventSource"
assert restore_idx < close_idx, "restore must be attempted before closing the stream"
Expand All @@ -113,7 +119,7 @@ def test_settled_restore_and_error_close_only_the_event_source_owner():
restore_body = _function_body("_restoreSettledSession")
error_body = _function_body("_handleStreamError")
event_body = _event_body("error")
assert "async function _restoreSettledSession(source)" in MESSAGES_JS
assert "async function _restoreSettledSession(source, options=null)" in MESSAGES_JS
assert "function _handleStreamError(source)" in MESSAGES_JS
assert "_closeSource(source);" in restore_body
assert "_closeSource(source);" in error_body
Expand Down
10 changes: 7 additions & 3 deletions tests/test_issue2863_session_index_prime.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ def test_missing_index_starts_background_rebuild_while_preserving_first_scan(mon
assert {row["session_id"] for row in rows} == {"issue28630", "issue28631", "issue28632"}

thread = models._SESSION_INDEX_REBUILD_THREAD
assert thread is not None
thread.join(timeout=5)
assert not thread.is_alive()
# Fast runners can complete the background rebuild and clear the global
# thread slot before this assertion observes it. The invariant is that the
# first scan remains correct and the index is rebuilt, not that the transient
# thread object is still visible.
if thread is not None:
thread.join(timeout=5)
assert not thread.is_alive()

index = json.loads(models.SESSION_INDEX_FILE.read_text(encoding="utf-8"))
assert {row["session_id"] for row in index} == {"issue28630", "issue28631", "issue28632"}
Expand Down
5 changes: 3 additions & 2 deletions tests/test_issue856_active_session_read_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ def test_done_path_marks_active_session_as_viewed():
def test_cancel_path_marks_active_session_as_viewed():
cancel_idx = MESSAGES_JS.find("source.addEventListener('cancel'")
assert cancel_idx != -1, "cancel handler not found in messages.js"
cancel_block = MESSAGES_JS[cancel_idx:MESSAGES_JS.find("async function _restoreSettledSession(source)", cancel_idx)]
restore_marker = "async function _restoreSettledSession(source"
cancel_block = MESSAGES_JS[cancel_idx:MESSAGES_JS.find(restore_marker, cancel_idx)]
assert "_markSessionViewed(activeSid" in cancel_block, (
"cancel handler must mark the active session as viewed after settling messages"
)


def test_restore_and_error_paths_mark_active_session_as_viewed():
restore_idx = MESSAGES_JS.find("async function _restoreSettledSession(source)")
restore_idx = MESSAGES_JS.find("async function _restoreSettledSession(source")
assert restore_idx != -1, "_restoreSettledSession(source) not found in messages.js"
restore_block = MESSAGES_JS[restore_idx:MESSAGES_JS.find("function _handleStreamError(source)", restore_idx)]
assert "const completedSid=session.session_id||activeSid;" in restore_block
Expand Down
2 changes: 1 addition & 1 deletion tests/test_issue856_background_completion_unread.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def test_switching_away_counts_as_background_completion():


def test_restore_settled_background_stream_marks_completion_unread():
restore_idx = MESSAGES_JS.find("async function _restoreSettledSession(source)")
restore_idx = MESSAGES_JS.find("async function _restoreSettledSession(source")
assert restore_idx != -1, "_restoreSettledSession(source) not found"
restore_block = MESSAGES_JS[restore_idx:MESSAGES_JS.find("function _handleStreamError", restore_idx)]

Expand Down
121 changes: 121 additions & 0 deletions tests/test_stream_end_recovery_gating.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""Regression coverage for stream-end recovery ordering.

#3877-style recovery relies on one subtle path:
when `stream_end` arrives while the active live assistant row is still
present, cleanup should be deferred briefly to allow pending final SSE updates to
settle, then performed through the shared terminal recovery helper.
"""

from pathlib import Path


# Repository root: two directory levels above this test module.
REPO_ROOT = Path(__file__).resolve().parent.parent
# Full text of static/messages.js, read once at import time. The tests in this
# module assert on literal substrings of this source rather than executing it.
MESSAGES_JS = (REPO_ROOT / "static" / "messages.js").read_text(encoding="utf-8")


def _braced_span(text: str, start: int, missing_msg: str, unclosed_msg: str) -> str:
    """Return the brace-delimited span opening at the first ``{`` at or after ``start``.

    The returned text includes both the opening brace and its matching closing
    brace. Braces are counted naively: any that appear inside strings, comments
    or regex literals of the scanned text are counted as well.

    Raises:
        AssertionError: with ``missing_msg`` when no ``{`` follows ``start``, or
            with ``unclosed_msg`` when the opening brace is never balanced.
    """
    opening = text.find("{", start)
    assert opening >= 0, missing_msg
    depth = 0
    for index in range(opening, len(text)):
        char = text[index]
        if char == "{":
            depth += 1
        elif char == "}":
            depth -= 1
            if depth == 0:
                return text[opening : index + 1]
    raise AssertionError(unclosed_msg)


def _event_block(event_name: str) -> str:
    """Return the ``{...}`` body of the first ``source.addEventListener`` call for ``event_name``.

    Raises:
        AssertionError: when the listener, its body, or the closing brace of
            that body cannot be found in ``MESSAGES_JS``.
    """
    marker = f"source.addEventListener('{event_name}'"
    start = MESSAGES_JS.find(marker)
    assert start >= 0, f"missing {event_name} listener"
    return _braced_span(
        MESSAGES_JS,
        start,
        f"missing {event_name} listener body",
        f"unclosed {event_name} listener body",
    )


def _function_body(name: str) -> str:
    """Return the ``{...}`` body of the JavaScript function ``name``.

    An ``async function`` declaration is looked up first; a plain ``function``
    declaration is used only when no async one exists.

    Raises:
        AssertionError: when the declaration, its body, or the closing brace of
            that body cannot be found in ``MESSAGES_JS``.
    """
    start = MESSAGES_JS.find(f"async function {name}(")
    if start < 0:
        start = MESSAGES_JS.find(f"function {name}(")
    assert start >= 0, f"missing function: {name}"
    return _braced_span(
        MESSAGES_JS,
        start,
        f"missing {name} body",
        f"unclosed function body: {name}",
    )


def test_stream_end_defers_settlement_when_live_assistant_still_present():
    """The stream_end listener must defer cleanup via the recovery helper while a live scene is shown."""
    handler = _event_block("stream_end")
    expectations = (
        (
            "if(S.activeStreamId===streamId && _liveStreamEndScenePresent())",
            "stream_end should defer terminal cleanup while active live scene content is still present",
        ),
        (
            "_scheduleStreamEndRecovery(source);",
            "stream_end should schedule the deferred recovery timer before returning",
        ),
        (
            "_scheduleStreamEndRecovery(source)",
            "stream_end must delegate deferred cleanup to helper",
        ),
    )
    for snippet, reason in expectations:
        assert snippet in handler, reason


def test_stream_end_fallback_does_not_finalize_when_session_is_still_active():
    """The stream_end listener must branch on the restore status and retry while 'active'."""
    handler = _event_block("stream_end")
    required_snippets = (
        "const status=await _restoreSettledSession(source,{status:true});",
        "if(status==='active'&&S.activeStreamId===streamId)",
        "_scheduleStreamEndRecovery(source,200);",
        "_finalizeStreamEndFallback(source);",
    )
    for snippet in required_snippets:
        assert snippet in handler


def test_stream_end_recovery_helper_retries_while_session_is_still_active():
    """_runStreamEndRecovery must guard on entry, retry up to the cap, then fall back."""
    helper_source = _function_body("_runStreamEndRecovery")
    required_snippets = (
        "if(_streamFinalized || _terminalStateReached || !_pendingStreamEndRecovery)",
        "_restoreSettledSession(source,{status:true})",
        "if(status==='active'&&_streamEndRecoveryAttempts<10)",
        "_scheduleStreamEndRecovery(source,200);",
        "_finalizeStreamEndFallback(source);",
    )
    for snippet in required_snippets:
        assert snippet in helper_source


def test_stream_end_fallback_helper_clears_owner_state_before_closing():
    """_finalizeStreamEndFallback must contain every terminal cleanup step and the source close."""
    helper_source = _function_body("_finalizeStreamEndFallback")
    required_snippets = (
        "_terminalStateReached=true;",
        "_streamFinalized=true;",
        "_clearOwnerInflightState();",
        "_clearApprovalForOwner();",
        "_clearClarifyForOwner('terminal');",
        "renderMessages({preserveScroll:true});",
        "_setActivePaneIdleIfOwner();",
        "_closeSource(source)",
    )
    for snippet in required_snippets:
        assert snippet in helper_source


def test_stream_end_live_scene_detection_includes_empty_text_activity():
    """_liveStreamEndScenePresent must look at text, reasoning, tool calls and live DOM markers."""
    detector_source = _function_body("_liveStreamEndScenePresent")
    required_snippets = (
        "if(assistantText||assistantRow) return true;",
        "liveReasoningText||reasoningText",
        "inflight.toolCalls.length",
        "data-live-worklog-shell",
        "data-thinking-active",
    )
    for snippet in required_snippets:
        assert snippet in detector_source


def test_restore_settled_session_can_report_active_pending_status():
    """_restoreSettledSession must take an explicit options parameter and return status strings on request."""
    restore_source = _function_body("_restoreSettledSession")
    assert "async function _restoreSettledSession(source, options=null)" in MESSAGES_JS
    # The options object must be a named parameter, not read positionally.
    assert "arguments[1]" not in restore_source
    required_snippets = (
        "const returnStatus=!!(options&&options.status);",
        "return returnStatus?'active':false;",
        "return returnStatus?'restored':true;",
    )
    for snippet in required_snippets:
        assert snippet in restore_source


def test_stream_end_recovery_state_is_cleared_on_done_and_terminal_events():
    """Each of the done, stream_end, cancel and apperror listeners must clear the recovery state."""
    for event_name in ("done", "stream_end", "cancel", "apperror"):
        assert "_clearStreamEndRecovery();" in _event_block(event_name)
Loading