-
Notifications
You must be signed in to change notification settings - Fork 3k
Event Loop Architecture: Streaming Multi-Turn Agent Nodes #3423
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…-wp2-wp6-combined
Brings in upstream changes: email tool, csv/pdf fixes, docs updates, agent builder export atomicity fix, JSON extraction validation bugfix. No conflicts. Co-Authored-By: Claude Opus 4.5 <[email protected]>
Feature/event loop wp8
implemented clientIO gateway [WP-9]
…ider Feature/aden sync by provider
(micro-fix): added graph validation for client-facing nodes [WP-10]
Feat/integration tests
# Conflicts: # .claude/settings.local.json
- Fix max_node_visits blocking executor retries: the visit count was incremented on every loop iteration including retries, causing nodes with max_node_visits=1 (default) to be skipped on retry. Added _is_retry flag to distinguish retries from new visits via edge traversal. - Fix 20 UP042 lint errors: replace (str, Enum) with StrEnum across 14 files. Python 3.11+ StrEnum is preferred and enforced by ruff. Co-Authored-By: Claude Opus 4.5 <[email protected]>
Resolve conflict in tools/mcp_server.py: take main's CredentialStoreAdapter.default() which encapsulates the same CompositeStorage logic our branch had inline. Co-Authored-By: Claude Opus 4.5 <[email protected]>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description
PR closes #2500
Summary
Adds
EventLoopNode— a new node type that runs a streaming multi-turn LLM loop with tool execution, judge-based evaluation, and native support for client-facing (HITL) interactions. This is the core execution primitive for building production agent pipelines in Hive.88 files changed, ~19,800 lines added
What's included
1. EventLoopNode (
core/framework/graph/event_loop_node.py)The main contribution. A
NodeProtocolimplementation that runs a streaming event loop:LiteLLMProvider.stream()with real-time text delta and tool call eventsJudgeProtocol) to decide ACCEPT/RETRY/ESCALATE after each turnclient_facing=Truenatively block for user input between conversational turns, no custom judge neededinject_event()set_outputsynthetic tool for structured data extraction2. NodeConversation (
core/framework/graph/conversation.py)Message history manager for graph nodes:
ConversationStoreprotocolusage_ratio()andneeds_compaction()prune_old_tool_results()— replaces old tool result content with compact placeholders while preserving message structure and spillover file referencescompact()— replaces all messages except recent ones with a summary3. LLM Streaming (
core/framework/llm/)LiteLLMProvider.stream()— async generator yieldingTextDeltaEvent,ToolCallEvent,FinishEventstream_events.py— typed event dataclasses for the streaming protocolReasoningDeltaEvent)4. EventBus Extensions (
core/framework/runtime/event_bus.py)Convenience publisher methods for the full node lifecycle:
NODE_LOOP_STARTED/ITERATION/COMPLETED— loop lifecycleLLM_TEXT_DELTA/CLIENT_OUTPUT_DELTA— text streaming (internal vs client-facing)TOOL_CALL_STARTED/COMPLETED— tool executionCLIENT_INPUT_REQUESTED— HITL blocking signalNODE_STALLED,NODE_INTERNAL_OUTPUT5. Graph Executor Enhancements (
core/framework/graph/executor.py)asyncio.gather()max_node_visits)nullable_output_keysfor nodes with mutually exclusive outputs (e.g., approve vs reject)6. Client I/O Gateway (
core/framework/graph/client_io.py)ActiveNodeClientIO— manages input request/response for client-facing nodesCLIENT_INPUT_REQUESTED, waits onasyncio.Event7. Context Handoff (
core/framework/graph/context_handoff.py)8. MCP Tools: GitHub + Email (
tools/)EMAIL_OVERRIDE_TOtesting redirectCompositeStorage(encrypted store + env var fallback) in MCP server9. Demos
Four working demos showcasing the full framework:
event_loop_wss_demo.pyhandoff_demo.pyorg_demo.pygithub_outreach_demo.pyThe GitHub outreach demo exercises: fan-out/fan-in, feedback edges, client-facing blocking, tool result spillover/pruning, MCP tool integration, and iterative batch workflows.
10. Test Suite
12 new test files, ~6,800 lines:
test_event_loop_node.pytest_event_loop_integration.pytest_event_loop_wiring.pytest_node_conversation.pytest_executor_feedback_edges.pytest_client_facing_validation.pytest_client_io.pytest_context_handoff.pytest_stream_events.pytest_litellm_streaming.pytest_event_type_extension.pytest_concurrent_storage.pyArchitecture
For client-facing nodes, the loop blocks between conversational turns:
Key design decisions
Blocking is a node concern, not a judge concern —
client_facing=Truenodes natively block for user input. Judges only evaluate output quality.Tool result spillover — Results exceeding
max_tool_result_charsare written to disk as pretty-printed JSON. The in-context message gets a truncated preview with aload_data()reference.Tiered compaction — Pruning (zero-cost, no LLM) → normal compaction (LLM summary) → aggressive → emergency. Tool call history survives compaction to prevent re-calling tools.
Fan-out detection — Multiple
ON_SUCCESSedges from the same source trigger parallel execution viaasyncio.gather().Feedback edges — Lower priority than forward edges.
max_node_visitsprevents infinite loops.Test plan
python -m pytest core/tests/ -x -q— all core tests passpython -m pytest tools/tests/ -x -q— all tool tests passpython core/demos/event_loop_wss_demo.py— single-node chat workspython core/demos/github_outreach_demo.py— full pipeline completes end-to-end