Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2380114
feat: update AG2 example using 0.10 A2A builtins
Lancetnik Oct 22, 2025
0aea2a3
Update samples/python/agents/ag2/README.md
Lancetnik Oct 22, 2025
87654e4
Update samples/python/agents/ag2/README.md
Lancetnik Oct 22, 2025
82a60e9
Update samples/python/agents/ag2/a2a_python_reviewer.py
Lancetnik Oct 22, 2025
7d95e84
Update samples/python/agents/ag2/fastapi_codegen_a2a_client.py
Lancetnik Oct 22, 2025
148d8f0
Update samples/python/agents/ag2/fastapi_codegen_a2a_client.py
Lancetnik Oct 22, 2025
f177462
Update samples/python/agents/ag2/cli_codegen_a2a_client.py
Lancetnik Oct 22, 2025
dc1a651
Update samples/python/agents/ag2/a2a_python_reviewer.py
Lancetnik Oct 22, 2025
6994cfc
Update samples/python/agents/ag2/cli_codegen_a2a_client.py
Lancetnik Oct 22, 2025
8b10786
fix: correct mypy api call processing
Lancetnik Oct 22, 2025
9c8196f
Merge branch 'main' of github.com:Lancetnik/a2a-samples
Lancetnik Oct 22, 2025
8df3afd
ci: fix linter
Lancetnik Oct 22, 2025
4b65cbc
ci: fix md linter
Lancetnik Oct 22, 2025
0ec25bd
ci: fix warning linelen
Lancetnik Oct 22, 2025
b2ce345
Merge branch 'main' into main
Lancetnik Nov 2, 2025
f49bf00
chore(ag2): submit AG2 sample folder (includes demo)
elCaptnCode Dec 17, 2025
bc2781d
Update samples/python/agents/ag2/a2a_python_reviewer.py
elCaptnCode Dec 17, 2025
8cd2f86
Update samples/python/agents/ag2/a2a_python_reviewer.py
elCaptnCode Dec 17, 2025
d82bad2
Update samples/python/agents/ag2/README.md
elCaptnCode Dec 17, 2025
d292a3d
Update samples/python/agents/ag2/demo/websocket.py
elCaptnCode Dec 17, 2025
8a92c2f
Feat/agp simulation cross framework (#388)
gulliantonio Nov 4, 2025
8c8b28f
feat: update AG2 example using 0.10 A2A builtins (#392)
Lancetnik Nov 12, 2025
422a8fc
refactor: migrate to new client API and update dependencies (#384)
martimfasantos Nov 12, 2025
c43d816
chore: Upgrading to A2A java SDK 0.3.2.Final (#401)
ehsavoie Nov 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 20 additions & 21 deletions extensions/agp/src/agp_protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class CapabilityAnnouncement(BaseModel):
description="The function or skill provided (e.g., 'financial_analysis:quarterly').",
)
version: str = Field(..., description='Version of the capability schema.')
cost: float | None = Field(None, description='Estimated cost metric.')
cost: Optional[float] = Field(None, description='Estimated cost metric.')
policy: dict[str, Any] = Field(
...,
description='Key-value pairs defining required security/data policies.',
Expand All @@ -34,14 +34,13 @@ class IntentPayload(BaseModel):
payload: dict[str, Any] = Field(
..., description='The core data arguments required for the task.'
)
# FIX APPLIED: Renaming internal field to policy_constraints for clarity
# This field holds the constraints the client demands (e.g., security_level: 5)
policy_constraints: dict[str, Any] = Field(
default_factory=dict,
description='Client-defined constraints that must be matched against the announced policy.',
alias='policy_constraints',
)

model_config = ConfigDict(extra='forbid', populate_by_name=True)
model_config = ConfigDict(extra='forbid')


# --- AGP Routing Structures ---
Expand Down Expand Up @@ -102,18 +101,14 @@ def announce_capability(
f'[{self.squad_name}] ANNOUNCED: {capability_key} routed via {path}'
)

# Protected method containing the core, overridable routing logic
def _select_best_route(self, intent: IntentPayload) -> RouteEntry | None:
# Private method containing the core, *unmodified* routing logic
def __select_best_route(
self, intent: IntentPayload
) -> Optional[RouteEntry]:
"""
Performs Policy-Based Routing to find the best available squad.

Routing Logic:
1. Find all routes matching the target_capability.
2. Filter routes based on matching all policy constraints (PBR).
3. Select the lowest-cost route among the compliant options.
[Private Logic] Performs Policy-Based Routing to find the best available squad.
"""
target_cap = intent.target_capability
# CRITICAL CHANGE: Use the correct snake_case attribute name for constraints
intent_constraints = intent.policy_constraints

if target_cap not in self.agp_table.routes:
Expand All @@ -129,15 +124,12 @@ def _select_best_route(self, intent: IntentPayload) -> RouteEntry | None:
route
for route in possible_routes
if all(
# Check if the constraint key exists in the route policy AND the values are sufficient.
key in route.policy
and (
# If the key is 'security_level' and both values are numeric, check for >= sufficiency.
route.policy[key] >= value
if key == 'security_level'
and isinstance(route.policy.get(key), (int, float))
and isinstance(value, (int, float))
# Otherwise (e.g., boolean flags like 'requires_PII'), require exact equality.
else route.policy[key] == value
)
for key, value in intent_constraints.items()
Expand All @@ -155,13 +147,20 @@ def _select_best_route(self, intent: IntentPayload) -> RouteEntry | None:

return best_route

# Public method that is typically called by the A2A endpoint
def route_intent(self, intent: IntentPayload) -> RouteEntry | None:
# Public, overridable method for core routing logic (used by external components)
def select_best_route(self, intent: IntentPayload) -> Optional[RouteEntry]:
"""
Public entry point for external components (like DelegationRouter)
to retrieve the best route *without side effects*.
"""
return self.__select_best_route(intent)

# Public method that is typically called by the A2A endpoint (includes side effects)
def route_intent(self, intent: IntentPayload) -> Optional[RouteEntry]:
"""
Public entry point for routing an Intent payload.
Calls the internal selection logic and prints the result.
Public entry point for routing an Intent payload, including printing side effects.
"""
best_route = self._select_best_route(intent)
best_route = self.__select_best_route(intent)

if best_route:
print(
Expand Down
119 changes: 119 additions & 0 deletions extensions/agp/src/agp_protocol/agp_delegation_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import logging

from typing import Any

from pydantic import BaseModel, ConfigDict, Field


# Get a module-level logger instance
logger = logging.getLogger(__name__)

# NOTE: Since this file is now in the src/agp_protocol package,
# we use relative import to pull necessary classes from the sibling file (__init__.py).
from .__init__ import (
AgentGatewayProtocol,
IntentPayload,
)


# --- NEW DELEGATION INTENT STRUCTURES ---


class SubIntent(BaseModel):
"""An atomic, routable sub-task created during decomposition.

This structure uses 'policy_constraints' for clarity.
"""

target_capability: str = Field(
...,
description="The specific AGP capability to route (e.g., 'infra:provision').",
)
payload: dict[str, Any] = Field(
...,
description='Data specific to the sub-intent (e.g., VM type, budget amount).',
)
policy_constraints: dict[str, Any] = Field(
default_factory=dict,
description='Specific security/geo constraints for this individual sub-task.',
)

model_config = ConfigDict(extra='forbid')


class DelegationIntent(BaseModel):
"""A high-level meta-task requiring decomposition and routing to multiple squads."""

meta_task: str = Field(
..., description="High-level goal (e.g., 'Setup Project Alpha')."
)
sub_intents: list[SubIntent] = Field(
..., description='List of atomic tasks to be decomposed and routed.'
)
origin_squad: str = Field(
..., description="The squad initiating the request (e.g., 'HR')."
)

model_config = ConfigDict(extra='forbid')


# --- SIMULATION SPECIFIC DELEGATION ROUTER ---


class DelegationRouter:
"""Manages the overall decomposition of a meta-task into routable SubIntents
and aggregates the final results from the AGP Gateway.
"""

def __init__(self, central_gateway: AgentGatewayProtocol):
self.central_gateway = central_gateway
# Access the squad_name attribute from the Gateway instance
self.squad_name = central_gateway.squad_name

def route_delegation_intent(self, delegation_intent: DelegationIntent):
"""Simulates the Central Gateway receiving a meta-task, decomposing it, and routing
each component through the core AGP Policy-Based Router.
"""
# Replaced print() statements with logger.info()
logger.info(
f"\n[{self.squad_name}] RECEIVED DELEGATION: '{delegation_intent.meta_task}' from {delegation_intent.origin_squad}"
)
logger.info(
'--------------------------------------------------------------------------------'
)

results = {}

for i, sub_intent_data in enumerate(delegation_intent.sub_intents):
# --- CRITICAL DECOMPOSITION STEP ---
# Synthesize a simple AGP IntentPayload from the SubIntent data
sub_intent = IntentPayload(
target_capability=sub_intent_data.target_capability,
payload=sub_intent_data.payload,
# Use the correct keyword for the core IntentPayload
policy_constraints=sub_intent_data.policy_constraints,
)

# Route the synthesized Intent via the core AGP router
route = self.central_gateway.select_best_route(sub_intent)

status = 'SUCCESS' if route else 'FAILED'
path = route.path if route else 'N/A'
cost = route.cost if route else 'N/A'

# Logging the result for each sub-task
logger.info(
f'[{i + 1}/{len(delegation_intent.sub_intents)}] TASK: {sub_intent.target_capability}'
)
logger.info(f' STATUS: {status}')
logger.info(f' ROUTE: {path} (Cost: {cost})')

results[sub_intent.target_capability] = status

logger.info(
'--------------------------------------------------------------------------------'
)
logger.info(
f'[{self.squad_name}] DELEGATION COMPLETE: Processed {len(results)} sub-tasks.'
)
return results
171 changes: 171 additions & 0 deletions extensions/agp/tests/simulation/enterprise/enterprise_sim_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import logging

from agp_protocol import AGPTable, AgentGatewayProtocol, CapabilityAnnouncement
from agp_protocol.agp_delegation_models import (
DelegationIntent,
DelegationRouter,
SubIntent,
)


# Set logging level to WARNING so only our custom routing failures are visible
logging.basicConfig(level=logging.WARNING)


# --- DATA DEFINITION: Centralized Capabilities List ---
# This list defines all squads and their announced policies and costs in a data-driven structure.
ENTERPRISE_CAPABILITIES = [
# 1. FINANCE (Policy-Critical / High-Trust / Google ADK)
{
'capability': 'budget:authorize',
'version': '1.0',
'cost': 0.12,
'path': 'Finance_ADK/budget_gateway',
'policy': {
'security_level': 5,
'requires_role': 'finance_admin',
'geo': 'US',
},
},
# 2. ENGINEERING (Cost-Sensitive / LangChain)
{
'capability': 'infra:provision',
'version': '1.5',
'cost': 0.04,
'path': 'Engineering_LC/provisioner_tool',
'policy': {'security_level': 3, 'geo': 'US'},
},
# 3. HR (Strict Compliance / PII / LangChain)
{
'capability': 'onboarding:initiate',
'version': '1.0',
'cost': 0.15,
'path': 'HR_LC/onboarding_service',
'policy': {'security_level': 4, 'requires_pii': True, 'geo': 'US'},
},
# 4. MARKETING (High Volume / Low Security / LangGraph)
{
'capability': 'content:draft',
'version': '2.0',
'cost': 0.08,
'path': 'Marketing_LG/content_tool',
'policy': {'security_level': 2, 'geo': 'US'},
},
# 5. COMPLIANCE (Zero-Trust/RBAC / Google ADK)
{
'capability': 'policy:audit',
'version': '1.0',
'cost': 0.20,
'path': 'Compliance_ADK/audit_service',
'policy': {'security_level': 5, 'requires_role': 'exec', 'geo': 'US'},
},
# 6. CHEAP EXTERNAL VENDOR (Non-compliant competitor for 'infra:provision')
{
'capability': 'infra:provision',
'version': '1.0',
'cost': 0.03, # CHEAPER than Engineering, but fails security check
'path': 'External_Vendor/vm_tool',
'policy': {'security_level': 1, 'geo': 'US'},
},
]


def setup_enterprise_agp_table(gateway: AgentGatewayProtocol):
"""Simulates Capability Announcements from five specialized, multi-framework Squads.
Refactored to be data-driven.
"""
# CORRECTED: Replaced print() with logging.info()
logging.info(
'--- 1. Announcing Capabilities (Building AGP Routing Table) ---'
)

for item in ENTERPRISE_CAPABILITIES:
announcement = CapabilityAnnouncement(
capability=item['capability'],
version=item['version'],
cost=item['cost'],
policy=item['policy'],
)
gateway.announce_capability(announcement, path=item['path'])


def run_enterprise_simulation():
"""Executes the simulation of an Executive Project Launch delegation
through the Central AGP Gateway.
"""
# Initialize the AGP Gateway
agp_table = AGPTable()
central_gateway = AgentGatewayProtocol(
squad_name='Central_AGP_Router', agp_table=agp_table
)

# Build the routing table
setup_enterprise_agp_table(central_gateway)

logging.info(
'\n--- 2. Building Delegation Task (HR Initiates Project Setup) ---'
)

# Define the Complex Delegation Task (Executive Project Launch)
project_delegation_intent = DelegationIntent(
meta_task='Executive Project Launch: Q4 Strategy Initiative',
origin_squad='HR_Squad_Orchestrator',
sub_intents=[
# TASK 1: Finance Authorization (L5 Security Required)
SubIntent(
target_capability='budget:authorize',
payload={'project_id': 'Q4-STRAT', 'amount': 50000},
policy_constraints={
'security_level': 5,
'requires_role': 'finance_admin',
},
),
# TASK 2: Infrastructure Provisioning (Cost Sensitive, L3 Security Required)
SubIntent(
target_capability='infra:provision',
payload={'vm_type': 'standard_compute'},
policy_constraints={'security_level': 3, 'cost_max': 0.05},
),
# TASK 3: Personnel Onboarding (PII Mandatory)
SubIntent(
target_capability='onboarding:initiate',
payload={
'role': 'Lead Architect',
'candidate_name': 'Jane Doe',
},
policy_constraints={'requires_pii': True},
),
# TASK 4: Compliance Audit Check (RBAC Restriction)
SubIntent(
target_capability='policy:audit',
payload={'report': 'Q4'},
policy_constraints={'requires_role': 'exec'},
),
# TASK 5: Marketing Content Draft (Low Security, High Volume)
SubIntent(
target_capability='content:draft',
payload={'topic': 'Strategy Launch PR'},
policy_constraints={'security_level': 2},
),
],
)

# Initialize the Delegation Router
router = DelegationRouter(central_gateway=central_gateway)

logging.info('\n--- 3. Executing Delegation and Policy Routing ---')
logging.info('Routing Agent: Central_AGP_Router')

# Execute the decomposition and routing
final_status = router.route_delegation_intent(project_delegation_intent)

logging.info('\n--- 4. Final Aggregation Status ---')
for task, status in final_status.items():
logging.info(f"Task '{task}': {status}")


if __name__ == '__main__':
run_enterprise_simulation()

if __name__ == '__main__':
run_enterprise_simulation()
Loading