-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkflow.py
More file actions
451 lines (374 loc) · 18.2 KB
/
workflow.py
File metadata and controls
451 lines (374 loc) · 18.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
"""
WorkflowController
GRASP Controller responsible for reading the workflow configuration from
the YAML file and executing the declared node sequence on behalf of any
agentic framework.
Each framework instantiates one WorkflowController in __init__ and reuses
it across all run() calls, ensuring configuration is read once and the
comparison across frameworks remains fair.
"""
import os
import re
import json
import logging
from pathlib import Path
from typing import Dict, List, Callable, Optional, Tuple, Set
from domain_model import ReasoningStep
from config import ConfigExpert
from tool_pool import ToolPool
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _load_prompt(template_path: str) -> str:
    """
    Load a prompt template from a file path.

    Candidate locations are tried in order:
      1. relative to the config file's directory (or the current working
         directory when no config path is set),
      2. relative to the current working directory,
      3. the path exactly as given.

    Args:
        template_path: Path of the template file as declared in the config.

    Returns:
        The template file's contents, decoded as UTF-8.

    Raises:
        FileNotFoundError: If none of the candidate locations is a file.
    """
    config = ConfigExpert.get_instance()
    config_path = config.get_config_path()
    config_dir = Path(config_path).parent if config_path else Path.cwd()
    candidates = [
        config_dir / template_path,
        Path.cwd() / template_path,
        Path(template_path),
    ]
    for path in candidates:
        # is_file() rather than exists(): a directory that happens to share
        # the template's name must be skipped, not selected and then fail
        # inside read_text().
        if path.is_file():
            return path.read_text(encoding="utf-8")
    raise FileNotFoundError(
        f"Prompt template not found: '{template_path}'. "
        f"Searched in: {[str(p) for p in candidates]}"
    )
def _render(template: str, variables: Dict) -> str:
"""
Render a prompt template by substituting {placeholders} with values.
Missing keys are left as-is so partial templates still render safely.
"""
result = template
for key, value in variables.items():
result = result.replace(f"{{{key}}}", str(value) if value is not None else "")
return result
# ---------------------------------------------------------------------------
# ReAct Parser — parse and execute plain-text ReAct format responses
# ---------------------------------------------------------------------------
def _parse_react_actions(response: str) -> List[Tuple[str, str]]:
"""
Parse plain-text ReAct format responses to extract (action, action_input) pairs.
Looks for patterns like:
Action: tool_name
Action Input: {"key": "value"}
Returns list of (action_name, action_input_json_string) tuples.
"""
actions = []
# Match Action: tool_name followed by Action Input: {...}
pattern = r'Action:\s*(\w+)\s*\n\s*Action Input:\s*(.+?)(?=\n(?:Thought:|Observation:|Action:|Final Answer:|$))'
matches = re.finditer(pattern, response, re.DOTALL | re.IGNORECASE)
for match in matches:
action_name = match.group(1).strip()
action_input_str = match.group(2).strip()
actions.append((action_name, action_input_str))
return actions
def _execute_react_actions(response: str, max_iterations: int = 3) -> str:
"""
Execute tools found in ReAct-format response and append observations.
If the response contains Action/Action Input pairs, execute them,
collect observations, and append them back to the response.
Args:
response: The model's text response (may contain ReAct format)
max_iterations: Max tool execution rounds to prevent infinite loops
Returns:
Enhanced response with tool execution results appended as Observations
"""
for iteration in range(max_iterations):
actions = _parse_react_actions(response)
if not actions:
# No more Actions to execute
break
# Execute each action
for action_name, action_input_str in actions:
try:
# Parse the action input (try JSON, fall back to string)
try:
action_args = json.loads(action_input_str)
if isinstance(action_args, dict):
result = ToolPool.invoke(action_name, **action_args)
else:
result = ToolPool.invoke(action_name, action_args)
except json.JSONDecodeError:
# Not JSON, pass as string
result = ToolPool.invoke(action_name, action_input_str)
# Append observation to response
observation_text = f"\nObservation: {result}"
response += observation_text
except Exception as e:
error_msg = f"\nObservation: Tool execution error: {str(e)}"
response += error_msg
return response
# ---------------------------------------------------------------------------
# NodeConfig — lightweight data class parsed from YAML
# ---------------------------------------------------------------------------
class NodeConfig:
    """
    One workflow node as declared in the YAML configuration.

    The mapping must provide "name", "prompt_template" and "output_key"
    (a missing entry raises KeyError); "loop" is optional and defaults to
    False. The prompt template file is not read at construction time.
    """

    def __init__(self, cfg: dict):
        # Required entries are indexed directly, so absence surfaces as KeyError.
        self.name: str = cfg["name"]
        self.prompt_template: str = cfg["prompt_template"]
        self.output_key: str = cfg["output_key"]
        # Optional flag marking the node as repeatable.
        self.loop: bool = cfg.get("loop", False)
        # Filled on first access of `template_text`.
        self._template_text: Optional[str] = None

    @property
    def template_text(self) -> str:
        """Return the prompt template's text, loading it on first access."""
        cached = self._template_text
        if cached is not None:
            return cached
        cached = _load_prompt(self.prompt_template)
        self._template_text = cached
        return cached
# ---------------------------------------------------------------------------
# WorkflowController
# ---------------------------------------------------------------------------
class WorkflowController:
    """
    GRASP Controller for agentic workflow execution.

    Reads the workflow node sequence once from the YAML configuration and
    exposes a single run() method that all framework agents delegate to.

    Instantiate once per agent instance (in __init__) to ensure the
    configuration snapshot is identical across all frameworks throughout
    an experiment run.

    Supported workflows include linear workflows and hierarchical workflows.

    For linear workflows, nodes execute sequentially in declaration order,
    with each node's output key available to the prompts of later nodes.

    For hierarchical workflows, nodes store output on {research_results},
    which accumulates a list of results that can be referenced in subsequent
    nodes, e.g., an orchestrator that assesses the results of previous nodes.

    Looping behavior is supported via a loop flag on any node. A loop node
    always runs once and is then repeated while state["step"] <= max_steps.
    The step counter is incremented once per execution of a loop node and
    once per execution of any node whose output_key is "research_results".
    """

    # Class-level cache of already-validated config paths.
    # Validation runs once per unique YAML file per process, not once per
    # framework instantiation — since all frameworks share the same config.
    _validated_configs: Set[str] = set()

    def __init__(self, framework_name: str, generate_fn: Callable, get_tool_payload_fn: Callable):
        """
        Args:
            framework_name: Human-readable framework label used in prompts
                (e.g. "LangChain", "CrewAI").
            generate_fn: Callable(prompt, **kwargs) -> str — the adapter's
                generate method bound to the current agent instance.
            get_tool_payload_fn: Callable() -> list — returns the OpenAI-schema
                tool list for the current agent's registered tools.

        Raises:
            ValueError: If no nodes are configured, two nodes share a name,
                the exit node does not name a configured node, or a prompt
                placeholder cannot be satisfied.
            FileNotFoundError: If a prompt template file cannot be found
                during validation.
        """
        self.framework_name = framework_name
        self._generate = generate_fn
        self._get_tool_payload = get_tool_payload_fn

        config = ConfigExpert.get_instance()
        # "or {}" tolerates an explicit `workflow: null` in the YAML.
        workflow_cfg = config.get("workflow", {}) or {}
        self.max_steps: int = config.get("max_steps", 2)
        self.max_tokens: int = config.get("max_tokens", 1024)

        raw_nodes: list = workflow_cfg.get("nodes", [])
        if not raw_nodes:
            raise ValueError(
                "No nodes defined under 'workflow.nodes' in the configuration file."
            )

        # Insertion order of this dict is the execution order used by run().
        self._nodes: Dict[str, "NodeConfig"] = {}
        for raw in raw_nodes:
            node = NodeConfig(raw)
            if node.name in self._nodes:
                # A dict keyed by name would silently drop the earlier node.
                raise ValueError(
                    f"Duplicate node name '{node.name}' under 'workflow.nodes' "
                    f"in the configuration file."
                )
            self._nodes[node.name] = node

        self._entry_node: str = workflow_cfg.get("entry_node", raw_nodes[0]["name"])
        self._exit_node: str = workflow_cfg.get("exit_node", raw_nodes[-1]["name"])

        if self._entry_node not in self._nodes:
            logger.warning(
                "Workflow note: 'workflow.entry_node' is '%s', which is not a "
                "configured node. Configured nodes: %s.",
                self._entry_node,
                sorted(self._nodes),
            )
        if self._exit_node not in self._nodes:
            # The exit node decides which node runs without tools and which
            # output_key final_output_key reports, so it must exist.
            raise ValueError(
                f"'workflow.exit_node' is '{self._exit_node}', which is not a "
                f"configured node. Configured nodes: {sorted(self._nodes)}."
            )

        # Validate the workflow placeholders once per unique config file.
        config_path = config.get_config_path() or ""
        if config_path not in WorkflowController._validated_configs:
            self._validate_placeholders()
            WorkflowController._validated_configs.add(config_path)

    def _validate_placeholders(self):
        """
        Validates workflow continuity at startup, before any API calls are made.

        First pass — hard errors: a placeholder required by a node's prompt
        is not yet available at the point that node executes.

        Second pass — soft warnings: a node other than the exit node declares
        an output_key that no prompt ever consumes (dead state).

        Raises:
            ValueError: If a required placeholder is missing.
            FileNotFoundError: If a prompt template file cannot be found,
                re-raised with the node name for context.
        """
        # Keys always available to any prompt (injected by _execute_node).
        # NOTE: Unlike standard keys that perform a simple assignment,
        # {research_results} triggers list-based accumulation: when a node
        # defines output_key: research_results, the controller appends the
        # model's response to a list rather than overwriting the previous
        # value. Before any node has produced it, it renders as an empty
        # string.
        available_keys: Set[str] = {
            "framework", "role", "tools_text",
            "task", "step", "max_steps", "research_results",
        }
        # Track which keys are actually consumed by at least one prompt.
        consumed_keys: Set[str] = set()
        node_list = list(self._nodes.values())

        # Placeholder regex — requires the identifier to start with a letter
        # or underscore, eliminating false positives from numeric format
        # specs like {0} or {1:.2f} that may appear in example output
        # sections of prompt templates.
        placeholder_re = re.compile(r"\{([a-zA-Z_]\w*)\}")

        # --------------------------------------------------------------
        # First pass — missing input errors
        # --------------------------------------------------------------
        for node in node_list:
            try:
                template_text = node.template_text
            except FileNotFoundError as e:
                raise FileNotFoundError(
                    f"Cannot validate node '{node.name}': prompt template not found. "
                    f"Original error: {e}"
                ) from e

            # Sorted so the reported placeholder is deterministic when a
            # template has several unsatisfied ones.
            for p in sorted(set(placeholder_re.findall(template_text))):
                if p not in available_keys:
                    raise ValueError(
                        f"Workflow configuration error in node '{node.name}': "
                        f"placeholder '{{{p}}}' is required by "
                        f"'{node.prompt_template}' but has not been produced "
                        f"by any preceding node. "
                        f"Available keys at this point: {sorted(available_keys)}."
                    )
                consumed_keys.add(p)

            # Make this node's output available to all subsequent nodes.
            available_keys.add(node.output_key)

        # --------------------------------------------------------------
        # Second pass — unused output warnings
        # --------------------------------------------------------------
        for node in node_list:
            if node.output_key not in consumed_keys and node.name != self._exit_node:
                logger.warning(
                    "Workflow note: node '%s' declares output_key '%s' but it is "
                    "never referenced as a placeholder in any subsequent prompt. "
                    "The value will only appear in the final state export.",
                    node.name,
                    node.output_key,
                )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def run(
        self,
        task: str,
        tools: dict,
        role: str,
        reasoning_steps: List["ReasoningStep"],
    ) -> dict:
        """
        Execute the configured workflow and return the final state dict.

        Args:
            task: The task text made available to prompts as {task}.
            tools: The agent's tool dict; used to build {tools_text} and to
                decide whether a tool payload is passed to generate.
            role: The agent role made available to prompts as {role}.
            reasoning_steps: List that trace entries are appended to in place.

        Returns:
            The state dict holding "task", "step", "tools", "max_steps" and
            one entry per output_key written by the executed nodes.
        """
        # Initialize state with core variables
        state: dict = {
            "task": task,
            "step": 0,
            "tools": tools,
            "max_steps": self.max_steps,
        }

        # Walk nodes in declaration order
        for node in list(self._nodes.values()):
            # Every node runs at least once.
            state = self._execute_node(node, state, role, reasoning_steps)
            if not node.loop:
                continue

            # Loop nodes repeat while step <= max_steps.
            while state["step"] <= state["max_steps"]:
                reasoning_steps.append(ReasoningStep(
                    step_number=len(reasoning_steps) + 1,
                    thought=f"Continuing loop node '{node.name}' (step {state['step']})",
                    observation="More iterations needed"
                ))
                state = self._execute_node(node, state, role, reasoning_steps)

        return state

    def _execute_node(
        self,
        node: "NodeConfig",
        state: dict,
        role: str,
        reasoning_steps: List["ReasoningStep"],
    ) -> dict:
        """
        Render the node's prompt dynamically, call generate, and update state.

        Any exception raised while generating or executing tools is recorded
        in the state (as "<node name> error: <message>") and in the trace
        instead of being propagated.
        """
        reasoning_steps.append(ReasoningStep(
            step_number=len(reasoning_steps) + 1,
            thought=f"Executing node '{node.name}'",
            action=node.name,
            action_input=state["task"][:200]
        ))

        # Render the template using the dynamic variable set
        variables = self._build_variables(state, role)
        prompt = _render(node.template_text, variables)

        # Nodes that use tools get the tool payload; the exit node does not
        use_tools = bool(state["tools"]) and node.name != self._exit_node
        kwargs = {"max_tokens": self.max_tokens}
        if use_tools:
            kwargs["tools"] = self._get_tool_payload()

        try:
            output = self._generate(prompt, **kwargs)
            output = "" if output is None else output

            # Parse and execute tools if response contains ReAct format
            if use_tools and ("Action:" in output or "Thought:" in output):
                output = _execute_react_actions(output)

            self._store_output(node, state, output)
            reasoning_steps.append(ReasoningStep(
                step_number=len(reasoning_steps) + 1,
                thought=f"Node '{node.name}' completed",
                observation=f"Output stored in '{node.output_key}'"
            ))
        except Exception as e:
            # Best effort: keep the workflow going, but leave a log record so
            # the failure is not visible only inside the state dict.
            logger.exception("Workflow node '%s' failed", node.name)
            self._store_output(node, state, f"{node.name} error: {str(e)}")
            reasoning_steps.append(ReasoningStep(
                step_number=len(reasoning_steps) + 1,
                thought=f"Error in node '{node.name}'",
                observation=str(e)
            ))

        return state

    def _build_variables(self, state: dict, role: str) -> dict:
        """
        Build the placeholder -> value mapping for rendering a node's prompt.

        Starts from the framework/agent variables and then injects everything
        currently in the state dict, which makes any previous output_key
        available to the current node's prompt template.
        """
        variables = {
            "framework": self.framework_name,
            "role": role,
            "tools_text": self._build_tools_text(state["tools"]),
        }
        for key, value in state.items():
            if key == "tools":
                continue  # Skip raw tool objects
            if key == "research_results" and isinstance(value, list):
                # History accumulation: join non-empty entries into one string
                variables[key] = "\n\n".join(str(r) for r in value if r)
            else:
                variables[key] = value

        # {research_results} passes validation as always-available, so it
        # must render (as empty text) even before any node has produced it;
        # otherwise the literal placeholder would leak into the prompt.
        variables.setdefault("research_results", "")
        return variables

    @staticmethod
    def _store_output(node: "NodeConfig", state: dict, value) -> None:
        """
        Write a node's result (or error message) into the state dict.

        "research_results" accumulates into a list and always advances the
        step counter; any other output_key is overwritten, and the step
        counter advances only for loop nodes.
        """
        if node.output_key == "research_results":
            # List-based accumulation for history
            state["research_results"] = state.get("research_results", []) + [value]
            state["step"] = state.get("step", 0) + 1
            return

        # Standard assignment for single-step results
        state[node.output_key] = value
        if node.loop:
            state["step"] = state.get("step", 0) + 1

    def _build_tools_text(self, tools: dict) -> str:
        """
        Build a human-readable tools listing from the agent's tool dict.

        Each tool contributes one "- name: description" line. The name is
        taken from the tool's `name` attribute, then `tool_name`, then
        str(tool). Returns "None" when there are no tools.
        """
        if not tools:
            return "None"
        lines = []
        for tool in tools.values():
            name = getattr(tool, "name", None) or getattr(tool, "tool_name", str(tool))
            desc = getattr(tool, "description", "No description available.")
            lines.append(f"- {name}: {desc}")
        return "\n".join(lines)

    @property
    def final_output_key(self) -> str:
        """Returns the output_key of the configured exit node."""
        return self._nodes[self._exit_node].output_key