Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@
## [v0.51.289] — 2026-06-06 — Release JE (hotfix — sidebar ReferenceError #3696 + scope-undef prevention gate)

### Fixed
- **Live stream turns show the bottom timer immediately after starting.** The
first chat-start path now places the live footer timer as soon as the backend
returns `stream_id` and `pending_started_at`, instead of only restoring it
after a session switch or reconnect. Reloading a currently running session
also recreates the live worklog shell after the transcript DOM rebuild, so the
stream no longer stays invisible until you switch away and back. Live replay
cursors now stay tied to each queued SSE frame instead of the latest global
stream event, so reconnects cannot skip queued live output. (#3401,
@franksong2702)
- **Sidebar no longer crashes with `ReferenceError: _sessionAttentionState is not defined`.** The session-attention helper was declared *inside* `renderSessionListFromCache()` and relied on function hoisting, but the top-level `_sidebarRowHasVisibleMessages` (reached via `renderSessionListFromCache` → `_partitionSidebarSessionRows`) called it bare — and hoisting is scoped to the enclosing function, so every sidebar cache-render threw and the session list went blank. `_sessionAttentionState` is now a top-level function reachable by both call sites. Regressed in #3672 (v0.51.269). (#3696)
- **Stale-stream terminal events no longer risk a `ReferenceError: source is not defined`.** `_bailOutOfTerminalEventsFromStaleStream` (declared inside `attachLiveStream`) called `_closeSource(source)` against a `source` that was not in its lexical scope — it would have thrown on the late-finalizing-stream path when the user is back in an active session. `source` is now threaded as an explicit parameter. Found by the new scope gate below during review. (#3696)

Expand Down
8 changes: 8 additions & 0 deletions DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ A tool card is a debug event row, not a chat message. Show icon, name, short tar

Same visual family as tool-call metadata. They should be quieter than assistant prose and should not use bright tinted full cards unless the user expands them.

Automatic compression follows a quiet live-only divider treatment rather than a
tool-card row. Use `Compressing context` for the active barrier and
`Context auto-compressed` after continuation/completion; render both as centered
non-interactive text with horizontal rules. Do not give it a caret, click
target, distinct accent color, special leading dot, or separate card identity.
Once the final answer is settled, omit the live-only compression row unless it is
needed to explain a visible recovery or error state.

### Composer

The composer is the command surface. Keep it legible and focused: modest radius, subtle border, transparent inactive chips, no theatrical hover scaling.
Expand Down
25 changes: 22 additions & 3 deletions api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4871,8 +4871,13 @@ def __init__(self):
self._lock = threading.Lock()
self._subscribers: list[queue.Queue] = []
self._offline_buffer: list[tuple[str, object]] = []
self._last_event_id: str | None = None

def subscribe(self) -> queue.Queue:
    """Register a new subscriber and return only its queue.

    Thin wrapper around ``subscribe_with_snapshot`` for callers that do not
    need the snapshot half of the returned ``(queue, snapshot)`` pair.
    """
    subscriber_queue, _ = self.subscribe_with_snapshot()
    return subscriber_queue

def subscribe_with_snapshot(self) -> tuple[queue.Queue, dict[str, object]]:
q: queue.Queue = queue.Queue()
with self._lock:
# Replay buffered events to the new subscriber INSIDE the lock so a
Expand All @@ -4882,8 +4887,12 @@ def subscribe(self) -> queue.Queue:
# is safe. Per Opus advisor on stage-292.
for item in self._offline_buffer:
q.put_nowait(item)
snapshot = {
"offline_buffered_events": len(self._offline_buffer),
"last_event_id": self._last_event_id,
}
self._subscribers.append(q)
return q
return q, snapshot

def unsubscribe(self, q: queue.Queue) -> None:
with self._lock:
Expand All @@ -4892,8 +4901,18 @@ def unsubscribe(self, q: queue.Queue) -> None:
except ValueError:
pass

def put_nowait(self, item: tuple[str, object]) -> None:
def note_last_event_id(self, event_id: str | None) -> None:
"""Record the latest journal event id without changing the queue shape."""
if not event_id:
return
with self._lock:
self._last_event_id = event_id

def put_nowait(self, item: tuple[str, object] | tuple[str, object, str | None]) -> None:
event_id = item[2] if len(item) >= 3 else None
with self._lock:
if event_id:
self._last_event_id = event_id
subscribers = list(self._subscribers)
if not subscribers:
self._offline_buffer.append(item)
Expand All @@ -4902,7 +4921,7 @@ def put_nowait(self, item: tuple[str, object]) -> None:
for q in subscribers:
q.put_nowait(item)

def diagnostic_snapshot(self) -> dict[str, int]:
def diagnostic_snapshot(self) -> dict[str, object]:
"""Return non-sensitive stream observation counters for health checks."""
with self._lock:
return {
Expand Down
9 changes: 8 additions & 1 deletion api/gateway_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def _run_gateway_chat_streaming(
def put_gateway_event(event, data):
if cancel_event.is_set() and event not in ("cancel", "error", "apperror"):
return
event_id = None
if run_journal is not None:
try:
journaled = run_journal.append_sse_event(event, data)
Expand All @@ -232,8 +233,14 @@ def put_gateway_event(event, data):
STREAM_LAST_EVENT_ID[stream_id] = event_id
except Exception:
logger.debug("Failed to append gateway event %s for stream %s", event, stream_id, exc_info=True)
if event_id and hasattr(q, "note_last_event_id"):
try:
q.note_last_event_id(event_id)
except Exception:
logger.debug("Failed to note gateway event_id %s for stream %s", event_id, stream_id, exc_info=True)
try:
q.put_nowait((event, data))
queue_item = (event, data, event_id) if event_id and hasattr(q, "subscribe_with_snapshot") else (event, data)
q.put_nowait(queue_item)
except Exception:
logger.debug("Failed to put gateway event to queue")

Expand Down
149 changes: 126 additions & 23 deletions api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2987,8 +2987,8 @@ def _keep_latest_messaging_session_per_source(
get_state_db_session_messages,
get_state_db_session_summary,
merge_session_messages_append_only,
_session_message_merge_key,
_active_stream_ids,
_session_message_merge_key,
_is_empty_partial_activity_message,
_hide_from_default_sidebar,
prune_session_from_index,
Expand Down Expand Up @@ -8516,14 +8516,22 @@ def _parse_run_journal_after_seq(qs: dict, stream_id: str | None = None) -> int
return 0


def _replay_run_journal(handler, stream_id: str, after_seq: int | None) -> bool:
def _replay_run_journal(
handler,
stream_id: str,
after_seq: int | None,
*,
max_seq: int | None = None,
include_stale: bool = True,
) -> bool:
summary = find_run_summary(stream_id)
if not summary:
return False
journal = read_run_events(
str(summary.get("session_id") or ""),
stream_id,
after_seq=after_seq,
max_seq=max_seq,
)
for entry in journal.get("events") or []:
_sse_with_id(
Expand All @@ -8532,7 +8540,7 @@ def _replay_run_journal(handler, stream_id: str, after_seq: int | None) -> bool:
entry.get("payload"),
entry.get("event_id"),
)
if not summary.get("terminal"):
if include_stale and not summary.get("terminal"):
stale = stale_interrupted_event(
str(summary.get("session_id") or ""),
stream_id,
Expand All @@ -8543,6 +8551,13 @@ def _replay_run_journal(handler, stream_id: str, after_seq: int | None) -> bool:
return True


def _run_journal_same_run_seq(event_id: str | None, stream_id: str) -> int | None:
    """Return the sequence parsed from ``event_id`` only if it belongs to ``stream_id``.

    ``event_id`` is split by ``_parse_run_journal_event_id`` into a run id and
    a sequence value. The sequence is returned when the run id equals
    ``stream_id``; otherwise ``None`` is returned so callers never compare
    sequence numbers across different runs.
    """
    parsed_run_id, parsed_seq = _parse_run_journal_event_id(event_id)
    return parsed_seq if parsed_run_id == stream_id else None


def _runner_stream_cursor_from_query(qs: dict) -> str | None:
cursor = str(qs.get("cursor", [""])[0] or "").strip()
if cursor:
Expand Down Expand Up @@ -8660,26 +8675,58 @@ def _handle_sse_stream(handler, parsed):
except _CLIENT_DISCONNECT_ERRORS:
pass
return True
subscriber = stream.subscribe() if hasattr(stream, "subscribe") else stream
if hasattr(stream, "subscribe_with_snapshot"):
subscriber, stream_snapshot = stream.subscribe_with_snapshot()
else:
subscriber = stream.subscribe() if hasattr(stream, "subscribe") else stream
stream_snapshot = {}
handler.send_response(200)
handler.send_header("Content-Type", "text/event-stream; charset=utf-8")
handler.send_header("Cache-Control", "no-cache")
handler.send_header("X-Accel-Buffering", "no")
handler.send_header("Connection", "close")
handler.end_headers()
replay_cutoff_seq = None
if qs.get("replay", [""])[0] or qs.get("after_seq", [None])[0] not in (None, "") or qs.get("after_event_id", [None])[0]:
snapshot_cutoff_seq = _run_journal_same_run_seq(
str(stream_snapshot.get("last_event_id") or ""),
stream_id,
)
try:
replayed = _replay_run_journal(
handler,
stream_id,
_parse_run_journal_after_seq(qs, stream_id),
max_seq=snapshot_cutoff_seq,
include_stale=False,
)
if replayed:
replay_cutoff_seq = snapshot_cutoff_seq
except _CLIENT_DISCONNECT_ERRORS:
raise
except Exception:
logger.debug("Failed to replay active run journal for stream %s", stream_id, exc_info=True)
try:
while True:
try:
event, data = subscriber.get(timeout=_SSE_HEARTBEAT_INTERVAL_SECONDS)
item = subscriber.get(timeout=_SSE_HEARTBEAT_INTERVAL_SECONDS)
except queue.Empty:
handler.wfile.write(b": heartbeat\n\n")
handler.wfile.flush()
continue
if len(item) >= 3:
event, data, queued_event_id = item[0], item[1], item[2]
else:
event, data = item
queued_event_id = STREAM_LAST_EVENT_ID.get(stream_id)
# Stage-364: emit `id:` with every live frame so the frontend's
# `_lastRunJournalSeq` cursor advances during live streaming. Prefer the
# event id queued with this frame; fall back to the STREAM_LAST_EVENT_ID
# side-channel when the frame carries no id of its own. Without this,
# mid-stream error→replay would arrive with after_seq=0 and double-render
# every journaled event.
event_id = STREAM_LAST_EVENT_ID.get(stream_id)
event_id = queued_event_id or STREAM_LAST_EVENT_ID.get(stream_id)
event_seq = _run_journal_same_run_seq(event_id, stream_id)
if replay_cutoff_seq is not None and event_seq is not None and event_seq <= replay_cutoff_seq:
continue
Comment thread
greptile-apps[bot] marked this conversation as resolved.
if event_id:
_sse_with_id(handler, event, data, event_id)
else:
Expand Down Expand Up @@ -10943,6 +10990,41 @@ def _is_hidden_empty_session(s) -> bool:
)


def _active_stream_blocks_chat_start(session, stream_id: str | None) -> bool:
"""Return whether an active_stream_id still owns this session's next turn.

``active_stream_id`` is written before the SSE channel is registered, so a
very fresh pending turn must also block duplicate chat_start requests. If we
only check STREAMS here, a second request can race through the registration
gap and overwrite the sidecar owner.
"""
if not stream_id:
return False
with STREAMS_LOCK:
if stream_id in STREAMS:
return True
try:
from api import config as _live_config
with _live_config.ACTIVE_RUNS_LOCK:
if stream_id in (_live_config.ACTIVE_RUNS or {}):
return True
except Exception:
pass
if getattr(session, "pending_user_message", None):
try:
from api.models import _REPAIR_STALE_PENDING_GRACE_SECONDS
grace_seconds = float(_REPAIR_STALE_PENDING_GRACE_SECONDS)
except Exception:
grace_seconds = 30.0
try:
pending_started_at = float(getattr(session, "pending_started_at", None) or 0)
except Exception:
pending_started_at = 0.0
if pending_started_at and time.time() - pending_started_at < grace_seconds:
return True
return False


def _start_chat_stream_for_session(
s,
*,
Expand All @@ -10963,10 +11045,7 @@ def _start_chat_stream_for_session(
diag.stage("active_stream_check") if diag else None
current_stream_id = getattr(s, "active_stream_id", None)
if current_stream_id:
diag.stage("active_stream_lock_wait") if diag else None
with STREAMS_LOCK:
current_active = current_stream_id in STREAMS
if current_active:
if _active_stream_blocks_chat_start(s, current_stream_id):
diag.stage("response_write") if diag else None
return {
"error": "session already has an active stream",
Expand All @@ -10984,21 +11063,45 @@ def _start_chat_stream_for_session(
goal_related = True
PENDING_GOAL_CONTINUATION.discard(s.session_id)

stream_id = uuid.uuid4().hex
session_lock = _get_session_agent_lock(s.session_id)
diag.stage("session_lock_wait") if diag else None
with session_lock:
diag.stage("save_pending_state") if diag else None
was_hidden_empty_session = _is_hidden_empty_session(s)
_prepare_chat_start_session_for_stream(
s,
msg=msg,
attachments=attachments,
workspace=workspace,
model=model,
model_provider=model_provider,
stream_id=stream_id,
)
while True:
with session_lock:
locked_stream_id = getattr(s, "active_stream_id", None)
if locked_stream_id:
if _active_stream_blocks_chat_start(s, locked_stream_id):
diag.stage("response_write") if diag else None
return {
"error": "session already has an active stream",
"active_stream_id": locked_stream_id,
"_status": 409,
}
needs_stale_cleanup = True
else:
needs_stale_cleanup = False
stream_id = uuid.uuid4().hex
diag.stage("save_pending_state") if diag else None
was_hidden_empty_session = _is_hidden_empty_session(s)
_prepare_chat_start_session_for_stream(
s,
msg=msg,
attachments=attachments,
workspace=workspace,
model=model,
model_provider=model_provider,
stream_id=stream_id,
)
break
if needs_stale_cleanup:
diag.stage("stale_stream_cleanup") if diag else None
cleared = _clear_stale_stream_state(s)
if not cleared and getattr(s, "active_stream_id", None):
diag.stage("response_write") if diag else None
return {
"error": "session already has an active stream",
"active_stream_id": getattr(s, "active_stream_id", None),
"_status": 409,
}
if was_hidden_empty_session:
publish_session_list_changed("session_new", profile=getattr(s, "profile", None))
diag.stage("turn_journal_submitted") if diag else None
Expand Down
3 changes: 3 additions & 0 deletions api/run_journal.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,15 @@ def read_run_events(
run_id: str,
*,
after_seq: int | None = None,
max_seq: int | None = None,
session_dir: Path | None = None,
) -> dict:
path = _run_path(session_id, run_id, session_dir=session_dir)
events, malformed = _read_jsonl(path)
if after_seq is not None:
events = [event for event in events if int(event.get("seq") or 0) > int(after_seq)]
if max_seq is not None:
events = [event for event in events if int(event.get("seq") or 0) <= int(max_seq)]
return {
"session_id": str(session_id),
"run_id": str(run_id),
Expand Down
Loading
Loading