a2a_healthcare_agent.py
import asyncio
import os
from typing import Any

from beeai_framework.adapters.a2a.agents import A2AAgent
from beeai_framework.adapters.a2a.serve.server import A2AServer, A2AServerConfig
from beeai_framework.adapters.gemini import GeminiChatModel
from beeai_framework.adapters.vertexai import VertexAIChatModel  # noqa: F401
from beeai_framework.agents.requirement import RequirementAgent
from beeai_framework.agents.requirement.requirements.conditional import (
    ConditionalRequirement,
)
from beeai_framework.memory import UnconstrainedMemory
from beeai_framework.middleware.trajectory import EventMeta, GlobalTrajectoryMiddleware
from beeai_framework.serve.utils import LRUMemoryManager
from beeai_framework.tools import Tool
from beeai_framework.tools.handoff import HandoffTool
from beeai_framework.tools.think import ThinkTool

from helpers import setup_env


# Log only tool calls
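# ConciseGlobalTrajectoryMiddleware trims the default trajectory output: it keeps
# only the event prefix (without the trailing ": ") and drops the payload entirely.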
class ConciseGlobalTrajectoryMiddleware(GlobalTrajectoryMiddleware):
    def _format_prefix(self, meta: EventMeta) -> str:
        prefix = super()._format_prefix(meta)
        return prefix.rstrip(": ")

    def _format_payload(self, value: Any) -> str:
        return ""


def main() -> None:
    print("Running A2A Orchestrator Agent")
    setup_env()

    host = os.getenv("AGENT_HOST")
    policy_agent_port = os.getenv("POLICY_AGENT_PORT")
    research_agent_port = os.getenv("RESEARCH_AGENT_PORT")
    provider_agent_port = os.getenv("PROVIDER_AGENT_PORT")
    healthcare_agent_port = int(os.getenv("HEALTHCARE_AGENT_PORT"))
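    # Host and ports come from the environment (presumably loaded by setup_env from
    # a .env file); HEALTHCARE_AGENT_PORT must be set, otherwise int() raises.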
    # Log only tool calls
    GlobalTrajectoryMiddleware(target=[Tool])
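    # Each downstream agent is a remote A2A service; A2AAgent wraps its URL so the
    # orchestrator can call it over the A2A protocol.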
    policy_agent = A2AAgent(
        url=f"http://{host}:{policy_agent_port}", memory=UnconstrainedMemory()
    )
    # Run `check_agent_exists()` to fetch and populate AgentCard
    asyncio.run(policy_agent.check_agent_exists())
    print("\tℹ️", f"{policy_agent.name} initialized")

    research_agent = A2AAgent(
        url=f"http://{host}:{research_agent_port}", memory=UnconstrainedMemory()
    )
    asyncio.run(research_agent.check_agent_exists())
    print("\tℹ️", f"{research_agent.name} initialized")

    provider_agent = A2AAgent(
        url=f"http://{host}:{provider_agent_port}", memory=UnconstrainedMemory()
    )
    asyncio.run(provider_agent.check_agent_exists())
    print("\tℹ️", f"{provider_agent.name} initialized")
    healthcare_agent = RequirementAgent(
        name="Healthcare Agent",
        description="A personal concierge for Healthcare Information, customized to your policy.",
        llm=GeminiChatModel(
            "gemini-2.5-flash",
            allow_parallel_tool_calls=True,
        ),
        # If using Vertex AI:
        # llm=VertexAIChatModel(
        #     model_id="gemini-2.5-flash",
        #     project=os.environ.get("GOOGLE_CLOUD_PROJECT"),
        #     location="global",
        #     allow_parallel_tool_calls=True,
        # ),
        tools=[
            thinktool := ThinkTool(),
            policy_tool := HandoffTool(
                target=policy_agent,
                name=policy_agent.name,
                description=policy_agent.agent_card.description,
            ),
            research_tool := HandoffTool(
                target=research_agent,
                name=research_agent.name,
                description=research_agent.agent_card.description,
            ),
            provider_tool := HandoffTool(
                target=provider_agent,
                name=provider_agent.name,
                description=provider_agent.agent_card.description,
            ),
        ],
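        # Requirements constrain tool use: think at the first step and again after
        # every tool call, and invoke each remote agent at most once per run.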
        requirements=[
            ConditionalRequirement(
                thinktool,
                force_at_step=1,
                force_after=Tool,
                consecutive_allowed=False,
            ),
            ConditionalRequirement(
                policy_tool,
                consecutive_allowed=False,
                max_invocations=1,
            ),
            ConditionalRequirement(
                research_tool,
                consecutive_allowed=False,
                max_invocations=1,
            ),
            ConditionalRequirement(
                provider_tool,
                consecutive_allowed=False,
                max_invocations=1,
            ),
        ],
        role="Healthcare Concierge",
        instructions=(
            f"""You are a concierge for healthcare services. Your task is to hand off to one or more agents to answer questions and provide a detailed summary of their answers. Be sure that all of their questions are answered before responding.
IMPORTANT: When returning answers about providers, only output providers from `{provider_agent.name}`, and only provide insurance information based on the results from `{policy_agent.name}`.
In your output, say which agent gave you the information!"""
        ),
    )
    print("\tℹ️", f"{healthcare_agent.meta.name} initialized")
    # Register the agent with the A2A server and run the HTTP server.
    # An LRU memory manager keeps only a bounded number of sessions in memory.
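    # protocol="jsonrpc" selects the A2A JSON-RPC transport; LRUMemoryManager evicts
    # the least recently used sessions once more than 100 are held.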
    A2AServer(
        config=A2AServerConfig(
            port=healthcare_agent_port, protocol="jsonrpc", host=host
        ),
        memory_manager=LRUMemoryManager(maxsize=100),
    ).register(healthcare_agent, send_trajectory=True).serve()
if __name__ == "__main__":
    main()
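# --- Usage sketch (not part of the original file) ---------------------------
# Assuming AGENT_HOST and the *_PORT variables are set (e.g. in the .env file read
# by setup_env), start the three downstream agents first, then this orchestrator:
#
#   python a2a_healthcare_agent.py
#
# A client could then reach the orchestrator the same way this script reaches its
# downstream agents. The snippet below is only an illustrative sketch; it assumes
# A2AAgent exposes an awaitable `run()` like other BeeAI framework agents:
#
#   import asyncio, os
#   from beeai_framework.adapters.a2a.agents import A2AAgent
#   from beeai_framework.memory import UnconstrainedMemory
#
#   async def ask() -> None:
#       agent = A2AAgent(
#           url=f"http://{os.getenv('AGENT_HOST')}:{os.getenv('HEALTHCARE_AGENT_PORT')}",
#           memory=UnconstrainedMemory(),
#       )
#       await agent.check_agent_exists()
#       response = await agent.run("Which in-network cardiologists does my policy cover?")
#       print(response)
#
#   asyncio.run(ask())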