Skip to content

Commit fd3acf7

Browse files
Merge pull request #67 from pattern-tech/feat/heartbeat
Streaming Heartbeat
2 parents 998621e + 9ed2cc9 commit fd3acf7

File tree

1 file changed

+18
-1
lines changed

1 file changed

+18
-1
lines changed

src/agent/services/agent_service.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ async def stream(self, message: str) -> AsyncGenerator[str, None]:
153153
Notes:
154154
This method uses an efficient NDJSON streaming protocol for reliable parsing.
155155
It supports both memory and non-memory modes, adapting the execution method accordingly.
156+
Includes a heartbeat mechanism to keep the connection alive during long processing.
156157
"""
157158
if not self.streaming or not self.streaming_handler:
158159
raise ValueError("Streaming is not enabled")
@@ -177,6 +178,8 @@ async def stream(self, message: str) -> AsyncGenerator[str, None]:
177178
)
178179

179180
buffer = "" # Initialize an empty buffer for accumulating incomplete JSON
181+
last_activity = asyncio.get_event_loop().time() # Track the last activity time
182+
heartbeat_interval = 15.0 # Send heartbeat every 15 seconds
180183

181184
# Continue processing while the task is running or queue has items
182185
while not task.done() or not self.streaming_handler.queue.empty():
@@ -187,6 +190,9 @@ async def stream(self, message: str) -> AsyncGenerator[str, None]:
187190
timeout=self.token_timeout
188191
)
189192

193+
# Update the last activity time when we receive a token
194+
last_activity = asyncio.get_event_loop().time()
195+
190196
# Add the new token to our buffer
191197
buffer += token
192198

@@ -197,7 +203,18 @@ async def stream(self, message: str) -> AsyncGenerator[str, None]:
197203
yield json_str
198204

199205
except asyncio.TimeoutError:
200-
# No new tokens available, wait a bit before checking again
206+
# No new tokens available, check if we should send a heartbeat
207+
current_time = asyncio.get_event_loop().time()
208+
if current_time - last_activity >= heartbeat_interval:
209+
# Send a heartbeat to keep the connection alive
210+
heartbeat_event = {
211+
"type": "heartbeat",
212+
"data": "still_processing"
213+
}
214+
yield json.dumps(heartbeat_event) + "\n"
215+
last_activity = current_time # Reset the activity timer
216+
217+
# Wait a bit before checking again
201218
await asyncio.sleep(self.poll_interval)
202219
continue
203220
except asyncio.CancelledError:

0 commit comments

Comments
 (0)