Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions apps/miroflow-agent/src/core/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,9 @@ async def run_sub_agent(
tool_calls_data = []
all_tool_results_content_with_id = []
should_rollback_turn = False
# Buffer successful queries so a later rollback in the same turn
# does not pollute duplicate-query state.
successful_queries_buffer = []

for call in tool_calls:
server_name = call["server_name"]
Expand Down Expand Up @@ -531,9 +534,9 @@ async def run_sub_agent(
sub_agent_name
].execute_tool_call(server_name, tool_name, arguments)

# Update query count if successful
# Buffer query if successful (deferred commit)
if "error" not in tool_result:
await self._record_query(cache_name, tool_name, arguments)
successful_queries_buffer.append((cache_name, tool_name, arguments))

# Post-process result
tool_result = self.tool_executor.post_process_tool_call_result(
Expand Down Expand Up @@ -617,6 +620,10 @@ async def run_sub_agent(
if should_rollback_turn:
continue

# Commit buffered successful queries to cache
for cache_name, tool_name, arguments in successful_queries_buffer:
await self._record_query(cache_name, tool_name, arguments)

# Reset consecutive rollbacks on successful execution
if consecutive_rollbacks > 0:
self.task_log.log_step(
Expand Down Expand Up @@ -905,6 +912,9 @@ async def run_main_agent(
tool_calls_data = []
all_tool_results_content_with_id = []
should_rollback_turn = False
# Buffer successful queries so a later rollback in the same turn
# does not pollute duplicate-query state.
successful_queries_buffer = []
main_agent_last_call_tokens = self.llm_client.last_call_tokens

for call in tool_calls:
Expand Down Expand Up @@ -954,8 +964,8 @@ async def run_main_agent(
arguments["subtask"],
)

# Update query count
await self._record_query(cache_name, tool_name, arguments)
# Buffer sub-agent query if successful (deferred commit)
successful_queries_buffer.append((cache_name, tool_name, arguments))

tool_result = {
"server_name": server_name,
Expand Down Expand Up @@ -1002,9 +1012,9 @@ async def run_main_agent(
)
)

# Update query count if successful
# Buffer query if successful (deferred commit)
if "error" not in tool_result:
await self._record_query(cache_name, tool_name, arguments)
successful_queries_buffer.append((cache_name, tool_name, arguments))

# Post-process result
tool_result = self.tool_executor.post_process_tool_call_result(
Expand Down Expand Up @@ -1092,6 +1102,10 @@ async def run_main_agent(
if should_rollback_turn:
continue

# Commit buffered successful queries to cache
for cache_name, tool_name, arguments in successful_queries_buffer:
await self._record_query(cache_name, tool_name, arguments)

# Reset consecutive rollbacks on successful execution
if consecutive_rollbacks > 0:
self.task_log.log_step(
Expand Down