Skip to content

Commit 5d1c3bd

Browse files
committed
Gemini review
1 parent b7ee230 commit 5d1c3bd

File tree

3 files changed

+53
-27
lines changed

3 files changed

+53
-27
lines changed

.claude/architecture/eventqueue/FLOWS.md

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -164,21 +164,26 @@ Bridges EventConsumer and DefaultRequestHandler with three consumption modes:
164164

165165
## Cleanup Integration
166166

167-
### Non-Streaming Cleanup Decision
167+
### Actual Implementation: Always Asynchronous
168+
169+
**Reality**: Cleanup is ALWAYS asynchronous in both streaming and non-streaming flows. The cleanup happens in the `finally` block via `cleanupProducer()`, which runs in a background thread.
168170

169171
```java
170-
if (event instanceof Message || isFinalEvent(event)) {
171-
if (!interrupted) {
172-
cleanup(queue, task, false); // Immediate: wait for agent, close queue
173-
} else {
174-
cleanup(queue, task, true); // Async: close in background (AUTH_REQUIRED case)
175-
}
176-
}
172+
// Both flows (in finally block):
173+
cleanupProducer(agentFuture, consumptionFuture, taskId, queue, isStreaming)
174+
.whenComplete((res, err) -> {
175+
if (err != null) {
176+
LOGGER.error("Error during async cleanup for task {}", taskId, err);
177+
}
178+
});
177179
```
178180

179-
**Logic**:
180-
- Terminal event + not interrupted → Immediate cleanup (wait for agent, close queue)
181-
- Terminal event + interrupted (AUTH_REQUIRED) → Async cleanup (agent still running)
181+
**Key Points**:
182+
- Cleanup is initiated in `finally` block regardless of flow outcome
183+
- `cleanupProducer()` waits for both agent and consumption futures to complete
184+
- Queue closure happens in background, never blocking the request thread
185+
- For streaming: EventConsumer manages queue lifecycle via `agentCompleted` flag
186+
- For non-streaming: Queue is closed directly after agent completes
182187

183188
### Streaming Cleanup
184189

.claude/architecture/eventqueue/LIFECYCLE.md

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -130,17 +130,32 @@ cleanup(queue, task, true); // Background cleanup after streaming completes
130130
### Cleanup Implementation
131131

132132
```java
133-
private void cleanup(EventQueue queue, Task task, boolean async) {
134-
Runnable cleanupTask = () -> {
135-
agentFuture.join(); // Wait for agent to finish
136-
queue.close(); // Close ChildQueue (triggers Level 2 check)
137-
};
138-
139-
if (async) {
140-
CompletableFuture.runAsync(cleanupTask, executor);
141-
} else {
142-
cleanupTask.run();
133+
private CompletableFuture<Void> cleanupProducer(
134+
@Nullable CompletableFuture<Void> agentFuture,
135+
@Nullable CompletableFuture<Void> consumptionFuture,
136+
String taskId,
137+
EventQueue queue,
138+
boolean isStreaming) {
139+
140+
if (agentFuture == null) {
141+
return CompletableFuture.completedFuture(null);
142+
}
143+
144+
// Wait for BOTH agent AND consumption to complete before cleanup
145+
CompletableFuture<Void> bothComplete = agentFuture;
146+
if (consumptionFuture != null) {
147+
bothComplete = CompletableFuture.allOf(agentFuture, consumptionFuture);
143148
}
149+
150+
return bothComplete.whenComplete((v, t) -> {
151+
if (isStreaming) {
152+
// EventConsumer manages queue lifecycle via agentCompleted flag
153+
LOGGER.debug("Streaming: queue lifecycle managed by EventConsumer");
154+
} else {
155+
// Close ChildQueue directly (triggers Level 2 check)
156+
queue.close(false, true);
157+
}
158+
});
144159
}
145160
```
146161

.claude/architecture/eventqueue/SCENARIOS.md

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,19 @@ EventQueue mainQueue = queueManager.createOrTap(taskId);
105105
// Second client taps into existing MainQueue
106106
EventQueue childQueue = queueManager.tap(taskId);
107107

108-
// Event distribution (MainQueue.enqueueEvent)
108+
// Event distribution (ASYNCHRONOUS via MainEventBus)
109+
// NOTE: Distribution is NOT immediate!
109110
public void enqueueEvent(Event event) {
110-
super.enqueueEvent(event); // Enqueue to MainEventBus
111-
children.forEach(child -> child.internalEnqueueEvent(event)); // Copy to ChildQueues
112-
if (enqueueHook != null) {
113-
enqueueHook.onEnqueue(event); // Replication hook
114-
}
111+
// Step 1: Submit to MainEventBus (async processing)
112+
mainEventBus.submit(event);
113+
114+
// Step 2: MainEventBusProcessor thread (separate background thread):
115+
// - Persists event to TaskStore
116+
// - Distributes to all ChildQueues via child.internalEnqueueItem(item)
117+
// - Invokes replication hook if configured
118+
119+
// Key Point: Events are NOT immediately in ChildQueues!
120+
// There's a delay while MainEventBusProcessor persists and distributes.
115121
}
116122
```
117123

0 commit comments

Comments
 (0)