|
| 1 | +# Request Flows - EventQueue Processing |
| 2 | + |
| 3 | +> Deep-dive on streaming vs non-streaming request handling |
| 4 | +
|
| 5 | +## Non-Streaming Flow (`onMessageSend()`) |
| 6 | + |
| 7 | +**Location**: `DefaultRequestHandler.java` |
| 8 | + |
| 9 | +``` |
| 10 | +1. initMessageSend() |
| 11 | + → Create TaskManager & RequestContext |
| 12 | +
|
| 13 | +2. queueManager.createOrTap(taskId) |
| 14 | + → Get/create EventQueue (MainQueue or ChildQueue) |
| 15 | +
|
| 16 | +3. registerAndExecuteAgentAsync() |
| 17 | + → Start AgentExecutor in background thread |
| 18 | +
|
| 19 | +4. resultAggregator.consumeAndBreakOnInterrupt(consumer) |
| 20 | + → Poll queue until terminal event or AUTH_REQUIRED |
| 21 | + → Blocking wait for events |
| 22 | +
|
| 23 | +5. cleanup(queue, task, async) |
| 24 | + → Close queue immediately OR in background |
| 25 | +
|
| 26 | +6. Return Task/Message to client |
| 27 | +``` |
| 28 | + |
| 29 | +### Terminal Events |
| 30 | + |
| 31 | +Events that cause polling loop exit: |
| 32 | +- `TaskStatusUpdateEvent` with `isFinal() == true` |
| 33 | +- `Message` (legacy) |
| 34 | +- `Task` with state: COMPLETED, CANCELED, FAILED, REJECTED, UNKNOWN |
| 35 | + |
| 36 | +### AUTH_REQUIRED Special Case |
| 37 | + |
| 38 | +**Behavior**: |
| 39 | +- Returns current task to client immediately |
| 40 | +- Agent continues running in background |
| 41 | +- Queue stays open, cleanup happens async |
| 42 | +- Future events update TaskStore |
| 43 | + |
| 44 | +**Why**: Allows client to handle authentication prompt while agent waits for credentials. |
| 45 | + |
| 46 | +--- |
| 47 | + |
| 48 | +## Streaming Flow (`onMessageSendStream()`) |
| 49 | + |
| 50 | +**Location**: `DefaultRequestHandler.java` |
| 51 | + |
| 52 | +``` |
| 53 | +1. initMessageSend() |
| 54 | + → Same as non-streaming |
| 55 | +
|
| 56 | +2. queueManager.createOrTap(taskId) |
| 57 | + → Same |
| 58 | +
|
| 59 | +3. registerAndExecuteAgentAsync() |
| 60 | + → Same |
| 61 | +
|
| 62 | +4. resultAggregator.consumeAndEmit(consumer) |
| 63 | + → Returns Flow.Publisher<Event> immediately |
| 64 | + → Non-blocking |
| 65 | +
|
| 66 | +5. processor() wraps publisher: |
| 67 | + - Validates task ID |
| 68 | + - Adds task to QueueManager |
| 69 | + - Stores push notification config |
| 70 | + - Sends push notifications |
| 71 | +
|
| 72 | +6. cleanup(queue, task, true) |
| 73 | + → ALWAYS async for streaming |
| 74 | +
|
| 75 | +7. Return Flow.Publisher<StreamingEventKind> |
| 76 | +``` |
| 77 | + |
| 78 | +### Key Difference |
| 79 | + |
| 80 | +**Non-Streaming**: Blocks until terminal event, then returns Task/Message |
| 81 | +**Streaming**: Returns Flow.Publisher immediately, client receives events as they arrive |
| 82 | + |
| 83 | +**Cleanup**: Streaming ALWAYS uses async cleanup (background thread) |
| 84 | + |
| 85 | +--- |
| 86 | + |
| 87 | +## EventConsumer Details |
| 88 | + |
| 89 | +**Location**: `server-common/.../events/EventConsumer.java` |
| 90 | + |
| 91 | +**Purpose**: Consumes events from EventQueue and exposes as reactive stream |
| 92 | + |
| 93 | +**Key Methods**: |
| 94 | +- `consume()` → Returns `Flow.Publisher<Event>` |
| 95 | +- Polls queue with 500ms timeout |
| 96 | +- Closes queue on final event |
| 97 | +- Thread-safe concurrent consumption |
| 98 | + |
| 99 | +**Usage**: |
| 100 | +```java |
| 101 | +EventConsumer consumer = new EventConsumer(eventQueue); |
| 102 | +Flow.Publisher<Event> publisher = consumer.consume(); |
| 103 | +// Subscribe to receive events as they arrive |
| 104 | +``` |
| 105 | + |
| 106 | +--- |
| 107 | + |
| 108 | +## ResultAggregator Modes |
| 109 | + |
| 110 | +**Location**: `server-common/.../tasks/ResultAggregator.java` |
| 111 | + |
| 112 | +Bridges EventConsumer and DefaultRequestHandler with three consumption modes: |
| 113 | + |
| 114 | +### 1. consumeAndBreakOnInterrupt() |
| 115 | + |
| 116 | +**Used by**: `onMessageSend()` (non-streaming) |
| 117 | + |
| 118 | +**Behavior**: |
| 119 | +- Polls queue until terminal event or AUTH_REQUIRED |
| 120 | +- Returns `EventTypeAndInterrupt(event, interrupted)` |
| 121 | +- Blocking operation |
| 122 | +- Exits early on AUTH_REQUIRED (interrupted = true) |
| 123 | + |
| 124 | +**Use Case**: Non-streaming requests that need single final response |
| 125 | + |
| 126 | +### 2. consumeAndEmit() |
| 127 | + |
| 128 | +**Used by**: `onMessageSendStream()` (streaming) |
| 129 | + |
| 130 | +**Behavior**: |
| 131 | +- Returns all events as `Flow.Publisher<Event>` |
| 132 | +- Non-blocking, immediate return |
| 133 | +- Client subscribes to stream |
| 134 | +- Events delivered as they arrive |
| 135 | + |
| 136 | +**Use Case**: Streaming requests where client wants all events in real-time |
| 137 | + |
| 138 | +### 3. consumeAll() |
| 139 | + |
| 140 | +**Used by**: `onCancelTask()` |
| 141 | + |
| 142 | +**Behavior**: |
| 143 | +- Consumes all events from queue |
| 144 | +- Returns first `Message` or final `Task` found |
| 145 | +- Simple consumption without streaming |
| 146 | +- Blocks until queue exhausted |
| 147 | + |
| 148 | +**Use Case**: Task cancellation where final state matters |
| 149 | + |
| 150 | +--- |
| 151 | + |
| 152 | +## Flow Comparison Table |
| 153 | + |
| 154 | +| Aspect | Non-Streaming | Streaming | |
| 155 | +|--------|---------------|-----------| |
| 156 | +| **ResultAggregator Mode** | consumeAndBreakOnInterrupt | consumeAndEmit | |
| 157 | +| **Return Type** | Task/Message | Flow.Publisher | |
| 158 | +| **Blocking** | Yes (until terminal event) | No (immediate return) | |
| 159 | +| **Cleanup** | Immediate or async | Always async | |
| 160 | +| **AUTH_REQUIRED** | Early exit, return task | Continue streaming | |
| 161 | +| **Use Case** | Simple request/response | Real-time event updates | |
| 162 | + |
| 163 | +--- |
| 164 | + |
| 165 | +## Cleanup Integration |
| 166 | + |
| 167 | +### Non-Streaming Cleanup Decision |
| 168 | + |
| 169 | +```java |
| 170 | +if (event instanceof Message || isFinalEvent(event)) { |
| 171 | + if (!interrupted) { |
| 172 | + cleanup(queue, task, false); // Immediate: wait for agent, close queue |
| 173 | + } else { |
| 174 | + cleanup(queue, task, true); // Async: close in background (AUTH_REQUIRED case) |
| 175 | + } |
| 176 | +} |
| 177 | +``` |
| 178 | + |
| 179 | +**Logic**: |
| 180 | +- Terminal event + not interrupted → Immediate cleanup (wait for agent, close queue) |
| 181 | +- Terminal event + interrupted (AUTH_REQUIRED) → Async cleanup (agent still running) |
| 182 | + |
| 183 | +### Streaming Cleanup |
| 184 | + |
| 185 | +```java |
| 186 | +cleanup(queue, task, true); // ALWAYS async for streaming |
| 187 | +``` |
| 188 | + |
| 189 | +**Logic**: Streaming always uses async cleanup because: |
| 190 | +- Publisher already returned to client |
| 191 | +- Events may still be processing |
| 192 | +- Queue cleanup happens in background |
| 193 | + |
| 194 | +--- |
| 195 | + |
| 196 | +## Thread Model |
| 197 | + |
| 198 | +### Agent Execution Thread |
| 199 | +- `CompletableFuture.runAsync(agentExecutor::execute, executor)` |
| 200 | +- Agent runs in background thread pool |
| 201 | +- Enqueues events to MainQueue |
| 202 | + |
| 203 | +### MainEventBusProcessor Thread |
| 204 | +- Single background thread: "MainEventBusProcessor" |
| 205 | +- Processes events from MainEventBus |
| 206 | +- Persists to TaskStore, distributes to ChildQueues |
| 207 | + |
| 208 | +### Consumer Thread |
| 209 | +- Non-streaming: Request handler thread (blocking) |
| 210 | +- Streaming: Subscriber thread (reactive) |
| 211 | +- Polls ChildQueue for events |
| 212 | + |
| 213 | +### Cleanup Thread |
| 214 | +- Async cleanup: Background thread pool |
| 215 | +- Immediate cleanup: Request handler thread |
| 216 | + |
| 217 | +--- |
| 218 | + |
| 219 | +## Related Documentation |
| 220 | + |
| 221 | +- **[Main Overview](../EVENTQUEUE.md)** - Architecture and components |
| 222 | +- **[Lifecycle](LIFECYCLE.md)** - Queue lifecycle and cleanup |
| 223 | +- **[Scenarios](SCENARIOS.md)** - Real-world usage patterns |
0 commit comments