forked from ParinAcharyaGit/workflow-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
222 lines (187 loc) · 8.07 KB
/
Copy pathutils.py
File metadata and controls
222 lines (187 loc) · 8.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# utils.py
import os
import requests
from dotenv import load_dotenv
import json
load_dotenv()
IBM_API_KEY = os.getenv('IBM_API_KEY')
url = "https://iam.cloud.ibm.com/identity/token"
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"grant_type": "urn:ibm:params:oauth:grant-type:apikey",
"apikey": IBM_API_KEY
}
response = requests.post(url, headers=headers, data=data)
watsonx_token = response.json().get("access_token")
if watsonx_token: print("token created successfully")
###############################################################################################
# ReAct Agent V2
params = {
"space_id": "825b15ec-b09f-413c-80ed-4e7fd3fc0bb0"
}
class WorkflowAnalyzer:
def __init__(self):
self.workflow_response = None
def set_workflow_response(self, response):
self.workflow_response = response
def get_workflow_response(self):
return self.workflow_response
# Create a global instance
workflow_analyzer = WorkflowAnalyzer()
def gen_ai_service(context, params=params, **custom):
from langchain_ibm import ChatWatsonx
from ibm_watsonx_ai import APIClient
from langchain_core.messages import AIMessage, HumanMessage
from langchain_community.tools import WikipediaQueryRun, DuckDuckGoSearchRun
from langchain_community.utilities import WikipediaAPIWrapper
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
model = "meta-llama/llama-3-3-70b-instruct"
service_url = "https://us-south.ml.cloud.ibm.com"
credentials = {
"url": service_url,
"token": context.generate_token()
}
client = APIClient(credentials)
space_id = params.get("space_id")
client.set.default_space(space_id)
def create_chat_model(watsonx_client):
parameters = {
"frequency_penalty": 0,
"max_tokens": 7500,
"presence_penalty": 0,
"temperature": 0,
"top_p": 1
}
return ChatWatsonx(
model_id=model,
url=service_url,
space_id=space_id,
params=parameters,
watsonx_client=watsonx_client,
)
def create_tools():
from langchain_community.utilities import WikipediaAPIWrapper
tools = []
tools.append(WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper(top_k_results=2)))
tools.append(DuckDuckGoSearchRun(name='business_metrics_search'))
return tools
def create_agent(model, tools, role_instructions):
memory = MemorySaver()
return create_react_agent(model, tools=tools, checkpointer=memory, state_modifier=role_instructions)
def convert_messages(messages):
return [
HumanMessage(content=msg["content"]) if msg["role"] == "user"
else AIMessage(content=msg["content"])
for msg in messages
]
def generate(context):
# Get workflow response from the analyzer instance
response_data = workflow_analyzer.get_workflow_response()
if not response_data:
print("Warning: No workflow response data available")
response_data = {}
payload = context.get_json()
messages = convert_messages(payload.get("messages", []))
model_instance = create_chat_model(client)
tools = create_tools()
# Initialize agent workers
summarizer = create_agent(model_instance, create_tools(), f"""From the context provided: {response_data}
- Identify, Analyze and number each business workflow step. Never combine steps.
- Highlight inefficiency factors.
- Summarize each business workflow step in 10-15 words.
- Output format:
{{
"steps": [
{{
"step_number": 1,
"summary": "10-15 word description",
"inefficiencies": ["list"]
}}
]
}}
""")
scorer = create_agent(model_instance, [DuckDuckGoSearchRun()], 'Score EACH business workflow step separately from 1 to 10, where 10 indicates highest efficiency, using named industry metrics. Be as critical as possible, do not just score highly without justification. Mention the metric used.')
suggester = create_agent(model_instance, tools, """ Your role is to suggest improvements for each step. Do not use any tools in this step, your output should be JSON only.
- The JSON object output should have the following keys:
- \"step_summary\": the brief 10 to 15 word summary of each business workflow step with the step number in chronological order.
- \"efficiency_score\": the numerical score assigned out of 10 for each business workflow step.
- \"explanation\": a description of specific steps to improve workflow efficiency and an estimated improvement in affected metrics for each business workflow step.
""")
# Chaining the agents together through carried context
try:
summary_result = summarizer.invoke(
{'messages': messages},
{'configurable':{'thread_id': 42 }, 'recursion_limit': 300 }
)
scored_result = scorer.invoke(
{'messages': summary_result['messages']},
{'configurable':{'thread_id': 42 }, 'recursion_limit': 300 }
)
suggestions = suggester.invoke(
{
'messages': [
*summary_result['messages'],
*scored_result['messages'],
{
"role": "user",
"content": "REMINDER: Final output must be JSON array with: step_summary, efficiency_score, improvements and expected_impact for EACH business workflow step."
}
]
},
{'configurable':{'thread_id': 42 }, 'recursion_limit': 300, 'timeout': 1200 }
)
try:
output_data = json.loads(suggestions["messages"][-1].content)
if not isinstance(output_data, list):
output_data = [output_data]
except:
output_data = []
return {
"headers": {"Content-Type": "application/json"},
"body": {
"choices": [{
"message": {
"role": "assistant",
"content": json.dumps(output_data)
}
}]
}
}
except Exception as e:
print(f"Error in generate function: {str(e)}")
return {
"headers": {"Content-Type": "application/json"},
"body": {
"choices": [{
"message": {
"role": "assistant",
"content": json.dumps([{"error": str(e)}])
}
}]
}
}
return generate, None
class RealContext:
def __init__(self, messages):
self._messages = messages
self._token = watsonx_token
if not self._token:
raise Exception("WATSONX_TOKEN environment variable not set.")
def generate_token(self):
return self._token
def get_token(self):
return self.generate_token()
def get_json(self):
return {"messages": self._messages}
if __name__ == "__main__":
messages = [
{"role": "system", "content": "You are an expert multi-agent framework that analyzes of a business workflow. Maintain JSON format throughout the output of the analysis chain and present in a clean, parsable format."},
{"role": "user", "content": "Begin the business workflow analysis chain."}
]
context = RealContext(messages)
generate, _ = gen_ai_service(context)
response = generate(context)
print(json.dumps(response, indent=2))