Skip to content

Commit 018bf86

Browse files
Kasper Jungeclaude
authored andcommitted
refactor: move FanoutEmitter to _events module with other emitter implementations
FanoutEmitter is event infrastructure, not manager-specific logic. Grouping it with EventEmitter, NullEmitter, and QueueEmitter in _events.py improves module cohesion and makes it available without depending on the UI layer. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b23e9b9 commit 018bf86

File tree

4 files changed

+18
-17
lines changed

4 files changed

+18
-17
lines changed

agent_docs/CODEBASE_MAP.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ The run loop communicates via structured events (`_events.py`). Each event has a
9898
- **`EventEmitter`** — protocol that any listener implements (just an `emit(event)` method)
9999
- **`NullEmitter`** — discards events (used in tests)
100100
- **`QueueEmitter`** — pushes events into a `queue.Queue` for async consumption (used by the UI)
101+
- **`FanoutEmitter`** — broadcasts events to multiple emitters (used by the manager for fan-out to queue + persistence)
101102

102103
The CLI uses a `ConsoleEmitter` (defined in `_console_emitter.py`) that renders events to the terminal with Rich formatting.
103104

@@ -107,7 +108,7 @@ The CLI uses a `ConsoleEmitter` (defined in `_console_emitter.py`) that renders
107108
- Creates runs with unique IDs and wraps them in `ManagedRun` (config + state + emitter + thread)
108109
- Starts each run in a daemon thread via `engine.run_loop()`
109110
- Supports pause/resume/stop per run via `RunState` thread-safe control methods
110-
- Uses `_FanoutEmitter` to broadcast events to multiple listeners (e.g., queue + persistence)
111+
- Uses `FanoutEmitter` to broadcast events to multiple listeners (e.g., queue + persistence)
111112

112113
## Key files to understand first
113114

src/ralphify/_events.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,14 @@ def __init__(self, q: queue.Queue[Event] | None = None) -> None:
7777

7878
def emit(self, event: Event) -> None:
7979
self.queue.put(event)
80+
81+
82+
class FanoutEmitter:
83+
"""Broadcasts events to multiple emitters."""
84+
85+
def __init__(self, emitters: list[EventEmitter]) -> None:
86+
self._emitters = emitters
87+
88+
def emit(self, event: Event) -> None:
89+
for e in self._emitters:
90+
e.emit(event)

src/ralphify/manager.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import uuid
1010
from dataclasses import dataclass, field
1111

12-
from ralphify._events import Event, EventEmitter, QueueEmitter
12+
from ralphify._events import EventEmitter, FanoutEmitter, QueueEmitter
1313
from ralphify.engine import RunConfig, RunState, run_loop
1414

1515

@@ -28,17 +28,6 @@ def add_listener(self, emitter: EventEmitter) -> None:
2828
self._extra_emitters.append(emitter)
2929

3030

31-
class _FanoutEmitter:
32-
"""Emits to multiple emitters."""
33-
34-
def __init__(self, emitters: list[EventEmitter]) -> None:
35-
self._emitters = emitters
36-
37-
def emit(self, event: Event) -> None:
38-
for e in self._emitters:
39-
e.emit(event)
40-
41-
4231
class RunManager:
4332
"""Registry of runs. Thread-safe."""
4433

@@ -73,7 +62,7 @@ def start_run(self, run_id: str) -> None:
7362
"""
7463
managed = self._get_run(run_id)
7564
all_emitters: list[EventEmitter] = [managed.emitter] + managed._extra_emitters
76-
fanout = _FanoutEmitter(all_emitters)
65+
fanout = FanoutEmitter(all_emitters)
7766
thread = threading.Thread(
7867
target=run_loop,
7968
args=(managed.config, managed.state, fanout),

tests/test_manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
from dataclasses import replace
77
from unittest.mock import patch
88

9-
from ralphify._events import Event, EventType, QueueEmitter
9+
from ralphify._events import Event, EventType, FanoutEmitter, QueueEmitter
1010
from ralphify.engine import RunConfig, RunStatus
11-
from ralphify.manager import ManagedRun, RunManager, _FanoutEmitter
11+
from ralphify.manager import ManagedRun, RunManager
1212

1313
_MOCK_SUBPROCESS = "ralphify.engine.subprocess.run"
1414

@@ -213,7 +213,7 @@ class TestFanoutEmitter:
213213
def test_fanout_emits_to_all(self):
214214
q1 = QueueEmitter()
215215
q2 = QueueEmitter()
216-
fanout = _FanoutEmitter([q1, q2])
216+
fanout = FanoutEmitter([q1, q2])
217217

218218
event = Event(type=EventType.LOG_MESSAGE, run_id="test", data={"msg": "hi"})
219219
fanout.emit(event)

0 commit comments

Comments
 (0)