Skip to content

fix(stream): replay active run activity on reconnect#3371

Open
AJV20 wants to merge 2 commits into
nesquena:masterfrom
AJV20:fix/gateway-activity-reattach
Open

fix(stream): replay active run activity on reconnect#3371
AJV20 wants to merge 2 commits into
nesquena:masterfrom
AJV20:fix/gateway-activity-reattach

Conversation

@AJV20
Copy link
Copy Markdown
Contributor

@AJV20 AJV20 commented Jun 2, 2026

Summary

  • replay already-journaled run events when a browser attaches to an active WebUI stream
  • keep stale-worker interruption synthesis disabled for still-active streams
  • add a regression test covering late subscribers seeing prior tool activity plus future live events

Tests

  • python3 -m pytest tests/test_issue_1584_multitab_sse.py tests/test_webui_gateway_chat_backend.py tests/test_gateway_sse_reconnect_dedupe.py -q
  • python3 -m py_compile api/routes.py api/run_journal.py api/gateway_chat.py
  • git diff --check origin/master...HEAD

@nesquena-hermes
Copy link
Copy Markdown
Collaborator

Thanks for tackling the empty-Activity-on-late-attach problem (#1584 follow-up) — replaying journaled run activity for a browser that attaches to an already-running stream is the right idea. I held this from the v0.51.210 batch because, as written, it can deliver the same event twice to an active stream, producing visibly duplicated Activity (and, for token frames, duplicated assistant text).

Root cause. For a still-active WebUI-owned stream, _handle_sse_stream now does:

  1. subscriber = stream.subscribe() — and StreamChannel.subscribe() (api/config.py) preloads the _offline_buffer into the new subscriber's queue. The offline buffer holds the live events that were broadcast while no tab was attached.
  2. then this PR replays the journal (_replay_run_journal(..., include_stale=False)) for the same stream.

Any event that arrived during the no-subscriber gap is therefore in both places: the journal (replayed in step 2) and the offline buffer (drained by the live loop). The browser renders it twice. Tool events append unconditionally on the front end, so the duplicate is user-visible.

Empirical repro against this branch (a single tool event that is both journaled and offline-buffered):

append_run_event(session_id, stream_id, "tool", {"name":"terminal","preview":"terminal: pytest","tid":"call-1"})
stream = create_stream_channel()
stream.put_nowait(("tool", {"name":"terminal","preview":"terminal: pytest","tid":"call-1"}))  # -> offline_buffer
STREAMS[stream_id] = stream
# attach a late subscriber via _handle_sse_stream, then stream_end

Result on this branch: event: tool appears on the wire (expected 1). Both our regression gates (Codex and an independent Opus review) flagged the same duplication path independently.

What a safe version needs: make the active-stream replay single-sourced / cursor-safe so journal replay and the offline-buffer drain can't both emit the same event. A couple of viable shapes:

  • Carry the event seq/id on the queued StreamChannel items and, after replaying the journal up to cursor N, drop any queued item whose seq ≤ N before entering the live loop; and have active reconnect/reattach EventSource URLs send _runJournalReplayParams() (the after_event_id/after_seq cursor) so the journal replay starts from the right point instead of seq 0. (See static/messages.js reconnect URL construction — the active reconnect paths currently omit the journal-replay params.)
  • Or: for an active stream, don't replay the journal at all when the StreamChannel already has the tail buffered — only replay journal history that predates the buffer.

The regression test should assert that an event present in both the journal and the offline buffer is delivered exactly once to a late subscriber, and that a second concurrent tab reconnecting mid-run doesn't re-render the already-streamed turn.

Marking hold + changes-requested. The feature is welcome once the replay is de-duplicated against the offline buffer.

@nesquena-hermes nesquena-hermes added hold changes-requested Maintainer left detailed feedback requesting changes; PR is waiting on author to address labels Jun 2, 2026
@AJV20
Copy link
Copy Markdown
Contributor Author

AJV20 commented Jun 2, 2026

Thanks — fixed the duplicate active reattach path. Active stream subscribers now skip the StreamChannel offline-buffer preload when the handler is replaying the run journal, so a gap event that is both journaled and buffered is emitted once. Reconnect EventSource URLs now include the journal cursor params for active streams too.

Verification:

  • python3.11 -m py_compile api/config.py api/routes.py tests/test_issue_1584_multitab_sse.py tests/test_run_journal_frontend_static.py
  • pytest tests/test_issue_1584_multitab_sse.py tests/test_run_journal_frontend_static.py -q (9 passed)
  • git diff --check

@nesquena-hermes
Copy link
Copy Markdown
Collaborator

Thanks for the quick turnaround on the dup-event blocker, @AJV20 — the replay_buffer=False + active-stream journal replay direction is right, and the empty-Activity-on-late-attach problem this fixes is real. I picked this up to take it over the finish line and ran it through the full release gate (full suite + Opus + the Codex regression gate). The suite is green (7281/0), but the regression gate found a residual duplicate-event race that I then verified, and closing it properly needs one more design step on your side. Holding with changes-requested so it doesn't churn through re-triage.

The residual race (CORE)

The active path subscribes (replay_buffer=False), then replays the run journal. But the producer's put() does journal-then-queue as two non-atomic steps (api/streaming.py:4154-4174, and the gateway twin at api/gateway_chat.py:229-238). So for a frame emitted in the subscribe()→_replay_run_journal() window:

  1. producer appends it to the journal,
  2. handler's replay reads + emits it from the journal,
  3. producer then broadcasts the same frame to the (already-subscribed) live queue,
  4. handler's live loop emits it again → duplicated Activity row / duplicated assistant token text.

Repro: a journaled token/tool frame whose event_id seq is at or below the replayed high-water emits twice (id: <stream>:2 appears in the SSE body twice).

Why the "obvious" fix is blocked, and what I tried

The clean fix is per-frame-id dedup: skip a live frame whose journal event_id was already replayed. The natural way to carry that id is a 3-tuple (event, data, event_id) on the queue — but the queue item shape is a pinned 2-tuple contract. tests/test_stage364_opus_live_sse_event_id.py::test_queue_tuple_shape_preserved_as_two_tuple documents that several direct consumers unpack event, data = q.get() (test_cancel_interrupt, test_sprint42, test_sprint51, test_issue1857_usage_overwrite), and a 3-tuple breaks them with ValueError.

I tried a contract-preserving content-identity dedup (snapshot subscriber.qsize() as the overlap window, skip queued frames whose (event, json(payload)) matches a replayed journal entry). The gate correctly rejected it as still-racy:

  • qsize() is itself a timing snapshot — a genuinely-new post-replay frame with an identical payload (e.g. a repeated token) can land inside the budget and be wrongly dropped.
  • If replay reads a journal entry before its queued copy reaches the queue, that copy is outside the qsize() snapshot and still double-emits.

So content-identity + qsize can't close it deterministically. It needs the actual per-frame id.

Suggested design (satisfies the gate AND the 2-tuple contract)

Carry the per-frame journal id out-of-band, not in the queue tuple — keep q.put_nowait((event, data)) unchanged so all existing consumers keep working:

  1. Producer (put() in streaming.py and put_gateway_event() in gateway_chat.py): you already capture event_id from append_sse_event() and write the stream-global STREAM_LAST_EVENT_ID[stream_id]. Add a per-frame side channel keyed by frame identity or a monotonic counter — e.g. a per-stream collections.deque of event_ids pushed in the same order frames are queued, or a small dict the consumer can pop in FIFO order. The producer pushes the id right next to q.put_nowait((event, data)) under the same ordering.
  2. Consumer (_handle_sse_stream active path): _replay_run_journal should return the set/high-water of replayed event_ids (seq). For each dequeued live frame, pop its id from the side-channel deque; if that id's seq was already replayed, skip it; otherwise emit and stamp id: from the frame's own id (not the stream-global latest, which can be newer than an older queued frame and corrupts the browser's reconnect cursor — static/messages.js:919/2283/2515 send the cursor params on st.active).
  3. Offline-buffer fallback: when active journal replay returns no durable events or raises, fall back to replaying the StreamChannel offline buffer for that subscriber (origin's subscribe() did this; replay_buffer=False currently bypasses it with no fallback). Either make journal replay mandatory before disabling replay_buffer, or replay the offline buffer on the failure path.

A per-stream FIFO id side-channel cleaned up in the worker's finally (alongside the existing STREAM_LAST_EVENT_ID.pop) keeps memory bounded and the public queue shape intact.

Tests to add

  • The race repro: a frame both journaled and queued in the overlap window emits exactly once, AND two legitimately-repeated identical frames (e.g. identical tokens) both emit (no under-emission).
  • Live id: equals the dequeued frame's own journal id, not the stream-global latest.
  • Offline-buffer fallback fires when the journal is unavailable.

Happy to review as soon as you push — this is close, it's just the last bit of getting the dedup keyed on the real per-frame id while keeping the 2-tuple queue contract. The full-suite + both gates will need to come back clean (Codex SAFE) before it ships.

@nesquena-hermes
Copy link
Copy Markdown
Collaborator

Keeping this on hold + changes-requested after a deep release-gate review — there's a real (but subtle) duplicate-event race left, with a verified trace. The direction is right and the replay_buffer=False + active-journal-replay approach is the correct shape; this is one more concurrency edge to close before it's safe to ship.

The race (verified by tracing the producer path)

On active-stream reconnect, _handle_sse_stream does, in order:

  1. subscribe(replay_buffer=False)
  2. _replay_run_journal(..., include_stale=False) → reads the journal as of now and emits those events
  3. the live loop emits everything queued on the subscriber

But the producers (api/streaming.py:4276-4293, api/gateway_chat.py:224-236) journal each event before they queue it. So an event produced in the window after step 1's subscribe but before step 2's read_run_events() completes is BOTH:

  • replayed from the journal (step 2), and
  • later emitted from the live queue (step 3)

→ the late subscriber sees that tool/token frame twice. The frontend doesn't save it from this: static/messages.js only records the cursor after handlers run (≈line 2350), and the token/tool handlers (≈1611 / 1671) apply without deduping first.

The two dedupe tests in the PR cover the already-buffered-before-subscribe case, but not this produced-during-the-replay-window case.

Suggested fix (cross-file — why this is a kick-back, not an inline tweak)

  1. Carry each journaled event's event_id/seq with the queued item in BOTH producers (api/streaming.py, api/gateway_chat.py) — enqueue (event, data, event_id) rather than (event, data).
  2. Make _handle_sse_stream unpack 2- or 3-tuples, and have _replay_run_journal return the highest replayed seq.
  3. In the live loop, skip any queued event whose seq is <= max(after_seq, replayed_seq).
  4. Add a regression that forces an event to be journaled+queued between the active subscribe and read_run_events(), asserting it is emitted exactly once.

(Don't lean on STREAM_LAST_EVENT_ID for this — it's a per-stream latest side channel, not the id of the specific queued item being emitted.)

When picking this back up: note that a straight cherry-pick of the current branch onto fresh master silently drops the _replay_run_journal signature hunk (the include_stale kwarg) and two test imports — they need re-applying, and the PR's own tests catch it. Thanks @AJV20 — really close; this is the last race to nail.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

changes-requested Maintainer left detailed feedback requesting changes; PR is waiting on author to address hold

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants