feat: add Settlement Extension — escrow-based payment for A2A agents#442
feat: add Settlement Extension — escrow-based payment for A2A agents#442widrss wants to merge 5 commits intoa2aproject:mainfrom
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces the A2A Settlement Extension (A2A-SE), a new mechanism for enabling escrow-based token settlement between agents in the A2A protocol. It allows agents to define pricing for their services and clients to secure payments in escrow, ensuring providers are paid upon successful task completion and refunded upon failure. The extension integrates seamlessly with the existing A2A framework by leveraging extension capabilities and metadata, requiring no modifications to the core protocol. This enhancement fills a critical gap in agent economics, moving beyond discovery to robust financial transactions. Highlights
Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces the A2A Settlement Extension, a significant feature for enabling escrow-based payments between agents. The submission is well-structured, including a detailed specification, a Python reference implementation, and supporting documentation. My review focuses on improving the clarity of the specification and enhancing the robustness and correctness of the Python code. I've identified a few areas for improvement, such as overly broad exception handling and a potential omission in the client wrapper's auto-settlement logic. Addressing these points will strengthen the new extension.
| async def resubscribe( | ||
| self, | ||
| request: TaskIdParams, | ||
| *, | ||
| context: ClientCallContext | None = None, | ||
| ) -> AsyncIterator[ClientEvent]: | ||
| async for event in self._delegate.resubscribe(request, context=context): | ||
| yield event |
There was a problem hiding this comment.
The resubscribe method streams events but appears to be missing the auto-settlement logic that is present in send_message, get_task, and cancel_task. If a task reaches a terminal state and this event is received via resubscribe, the escrow won't be settled. To ensure consistent behavior, consider adding logic to process events from this stream for auto-settlement, similar to how other client methods are handled.
extensions/settlement/v1/spec.md
Outdated
| | `POST` | `/exchange/escrow` | Create an escrow | | ||
| | `POST` | `/exchange/release` | Release escrowed tokens to provider | | ||
| | `POST` | `/exchange/refund` | Refund escrowed tokens to requester | | ||
| | `POST` | `/exchange/dispute` | Flag an escrow as disputed | |
There was a problem hiding this comment.
The Core Endpoints table could be more specific about how release, refund, and dispute operations identify the target escrow. The current paths (/exchange/release, etc.) do not include an escrow ID. For improved clarity, I suggest either including the ID in the path (e.g., POST /exchange/escrows/{id}/release) or adding a note in the description to clarify that the escrow_id is expected in the request body.
| except Exception: | ||
| logger.warning("Failed to verify escrow %s", escrow_id, exc_info=True) |
There was a problem hiding this comment.
Catching a broad Exception can hide underlying issues and make debugging more difficult. It would be better to catch more specific exceptions that _client.get_escrow might raise (e.g., network errors, specific API errors from the a2a-settlement library). This would allow for more granular error handling and logging.
| except Exception: | ||
| logger.warning( | ||
| "Auto-settle failed for escrow %s", escrow_id, exc_info=True | ||
| ) |
| "depends_on": ["$0"] | ||
| } |
| from a2a.types import ( | ||
| AgentCard, | ||
| AgentExtension, | ||
| GetTaskPushNotificationConfigParams, | ||
| Message, | ||
| SendMessageRequest, | ||
| SendStreamingMessageRequest, | ||
| Task, | ||
| TaskArtifactUpdateEvent, | ||
| TaskIdParams, | ||
| TaskPushNotificationConfig, | ||
| TaskQueryParams, | ||
| TaskStatusUpdateEvent, | ||
| ) | ||
|
|
||
| from a2a_settlement import SettlementExchangeClient | ||
| from a2a_settlement.agentcard import build_settlement_extension | ||
| from a2a_settlement.lifecycle import settle_for_task_state | ||
| from a2a_settlement.metadata import ( | ||
| attach_settlement_metadata, | ||
| build_settlement_metadata, | ||
| get_settlement_block, | ||
| ) |
abd6350 to
425a5f6
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces the A2A Settlement Extension, enabling escrow-based token settlement for agent-to-agent interactions within the A2A protocol. It includes new specification documents, a Python implementation, and configuration updates, integrating via capabilities.extensions for AgentCard declarations and the metadata field for settlement context. While the implementation is well-structured and provides flexible integration options, several critical security issues were identified in the Python reference implementation. These include broken access control in escrow verification and potential bypasses of the settlement requirement. Specifically, the agent fails to verify that it is the intended recipient of an escrow, and the executor does not enforce settlement when it is supposed to be mandatory. Addressing these vulnerabilities is crucial to ensure the integrity and security of the escrow-based settlement process.
| escrow = self._ext.verify_escrow(escrow_id) | ||
| if not escrow: | ||
| return False | ||
| return escrow.get('status') == 'held' |
There was a problem hiding this comment.
The _verify method checks if an escrow is in 'held' status but fails to verify that the current agent is the intended provider of that escrow. An attacker could provide a valid escrow_id belonging to a different agent, causing the current agent to perform work without a valid payment guarantee for themselves. This is a form of broken access control (IDOR) where the agent fails to verify ownership of the escrow resource.
| return escrow.get('status') == 'held' | |
| return escrow.get('status') == 'held' and escrow.get('provider_id') == self._ext._account_id |
| if self._ext.activate(context) and self._ext.auto_verify: | ||
| se_block = self._extract_settlement(context) | ||
| if se_block and not self._verify(se_block): |
There was a problem hiding this comment.
The execute method in _SettledAgentExecutor only performs escrow verification if the settlement extension is 'activated' by the client and if the settlement metadata (se_block) is present. If a client sends a request without activating the extension (e.g., by omitting the A2A-Extensions header) or without providing the metadata, the executor proceeds to execute the task without any payment verification. This bypasses the required: true policy that an agent might declare in its Agent Card, allowing attackers to use paid services for free. The executor should be aware of whether settlement is mandatory and reject requests that do not comply.
| exchange_urls=self._exchange_url, | ||
| account_ids=self._account_id, | ||
| pricing=pricing, | ||
| required=required, | ||
| ) |
There was a problem hiding this comment.
The build_settlement_extension function likely expects exchange_urls to be a list of strings and account_ids to be a dictionary mapping exchange URLs to account IDs. Currently, self._exchange_url (a single string) and self._account_id (a single string) are passed directly. This could lead to a runtime error or incorrect AgentCard generation if the underlying function expects different types. Please ensure the arguments match the expected types of build_settlement_extension.
ext_dict = build_settlement_extension(
exchange_urls=[self._exchange_url],
account_ids={self._exchange_url: self._account_id},
pricing=pricing,
required=required,
)| if state in _TERMINAL_RELEASE | _TERMINAL_REFUND: | ||
| self._escrows.pop(task_id, None) |
There was a problem hiding this comment.
The _escrows.pop(task_id, None) operation occurs within the try block. If settle_for_task_state fails due to an exception (e.g., network error), the escrow ID is still removed from tracking. This means that subsequent attempts to settle for this task will fail because the escrow_id is no longer available. The pop operation should ideally only happen if the settlement is successful.
if state in _TERMINAL_RELEASE | _TERMINAL_REFUND:
self._escrows.pop(task_id, None)
except (OSError, ValueError, KeyError):
logger.warning(
'Auto-settle failed for escrow %s',
escrow_id,
exc_info=True,
)
else:
if state in _TERMINAL_RELEASE | _TERMINAL_REFUND:
self._escrows.pop(task_id, None)| expires_at=escrow.get('expires_at', ''), | ||
| ) |
There was a problem hiding this comment.
The expires_at field defaults to an empty string ''. If the build_settlement_metadata function expects a valid ISO 8601 datetime string, an empty string might cause parsing issues or be considered invalid. It would be safer to pass None if the value is optional or ensure a properly formatted default string is provided.
expires_at=escrow.get('expires_at') # Pass None if not present, or ensure valid format
)| if self._ext.auto_settle and last_event: | ||
| self._try_settle(last_event) |
There was a problem hiding this comment.
In send_message, _try_settle is only called on the last_event. If send_message yields multiple events and an earlier event indicates a terminal state, the settlement might be delayed or missed. For consistency with resubscribe, consider calling _try_settle on each event yielded, or ensure that only the final event can trigger settlement.
if self._ext.auto_settle:
self._try_settle(event)f1da7d6 to
8e68085
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces the A2A Settlement Extension (A2A-SE), a significant and well-designed feature for enabling escrow-based payments between agents. While the implementation is well-structured, security and performance concerns have been identified. Specifically, the use of synchronous network calls within asynchronous methods could lead to Denial of Service (DoS) by blocking the event loop. Furthermore, the client-side wrapper lacks a mechanism to prune the task-to-escrow mapping, potentially causing unbounded memory growth if tasks do not reach a terminal state. Minor suggestions are also provided to improve the clarity of the specification and the readability of the Python code.
| if self._ext.activate(context) and self._ext.auto_verify: | ||
| se_block = self._extract_settlement(context) | ||
| reject = False | ||
| if se_block and not self._verify(se_block): |
There was a problem hiding this comment.
The _SettledAgentExecutor and _SettledClient classes perform synchronous network-bound operations (via self._ext.verify_escrow and self._try_settle) within async methods. In an asyncio-based application, calling synchronous blocking functions directly on the event loop prevents other tasks from executing. This can lead to significant performance degradation and can be exploited to cause a Denial of Service (DoS) by making the agent or client unresponsive.
Remediation: Use an asynchronous version of the settlement SDK if available, or wrap the synchronous calls in asyncio.to_thread() (Python 3.9+) or loop.run_in_executor() to run them in a separate thread without blocking the event loop.
| def __init__(self, delegate: Client, ext: SettlementExtension) -> None: | ||
| self._delegate = delegate | ||
| self._ext = ext | ||
| self._escrows: dict[str, str] = {} |
There was a problem hiding this comment.
The _SettledClient class maintains a mapping of task_id to escrow_id in the self._escrows dictionary. Entries are only removed when a task reaches a terminal state (COMPLETED, FAILED, etc.). If a large number of tasks are initiated but never reach a terminal state (e.g., due to network issues, agent failure, or malicious intent), this dictionary will grow indefinitely, leading to excessive memory consumption and potential process crash (DoS).
Remediation: Implement a mechanism to prune old or stale entries from the _escrows dictionary, such as using a Time-To-Live (TTL) for entries or a fixed-size cache with an LRU (Least Recently Used) eviction policy.
extensions/settlement/v1/spec.md
Outdated
| | `POST` | `/exchange/release` | Release escrowed tokens to provider (`escrow_id` in body) | | ||
| | `POST` | `/exchange/refund` | Refund escrowed tokens to requester (`escrow_id` in body) | | ||
| | `POST` | `/exchange/dispute` | Flag an escrow as disputed (`escrow_id` in body) | |
There was a problem hiding this comment.
For better clarity in the API specification, consider making the description of the request body more explicit. The current description (escrow_id in body) is slightly ambiguous.
Specifying the exact JSON format, e.g., Body: {"escrow_id": "string"}, would make it easier for developers to implement clients for the exchange API.
| if not (exts := card.capabilities.extensions): | ||
| exts = card.capabilities.extensions = [] | ||
| exts.append(self.agent_extension(pricing, required=required)) |
There was a problem hiding this comment.
The use of the walrus operator and chained assignment here is a bit dense and can be hard to read. A more conventional and clearer approach would be to check for a falsy value and initialize the list before appending. This improves readability without changing the logic.
| if not (exts := card.capabilities.extensions): | |
| exts = card.capabilities.extensions = [] | |
| exts.append(self.agent_extension(pricing, required=required)) | |
| if not card.capabilities.extensions: | |
| card.capabilities.extensions = [] | |
| card.capabilities.extensions.append(self.agent_extension(pricing, required=required)) |
| if not escrow.get('expires_at'): | ||
| meta.get('a2a-se', {}).pop('expiresAt', None) |
There was a problem hiding this comment.
The logic to remove the expiresAt key is a bit indirect. The meta.get('a2a-se', {}) call is redundant because the a2a-se key is guaranteed to exist at this point. A more direct approach would be to access the nested dictionary directly, which makes the code's intent clearer.
| if not escrow.get('expires_at'): | |
| meta.get('a2a-se', {}).pop('expiresAt', None) | |
| if not escrow.get('expires_at'): | |
| meta['a2a-se'].pop('expiresAt', None) |
95eb465 to
69c4490
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces the "A2A Settlement Extension", providing a comprehensive specification and a Python reference implementation for escrow-based payment for agents. While the documentation is clear and the overall architecture is sound, a security audit identified critical business logic flaws in the Python implementation. These include a lack of amount validation and protection against escrow reuse (double spending) in server-side escrow verification, which could lead to financial loss, and unreliable client-side settlement logic that risks locking tokens in escrow during network failures. Additionally, the code contains a bug preventing a key feature from working, a usability issue in the client wrapper, and a minor inconsistency in the specification. Addressing these issues is crucial for a production-ready and secure settlement system.
| async def _verify(self, se_block: dict) -> bool: | ||
| escrow_id = se_block.get('escrowId') | ||
| if not escrow_id: | ||
| return False | ||
| escrow = await asyncio.to_thread(self._ext.verify_escrow, escrow_id) | ||
| if not escrow: | ||
| return False | ||
| if escrow.get('provider_id') != self._ext.account_id: | ||
| logger.warning( | ||
| 'Escrow %s provider mismatch: expected %s, got %s', | ||
| escrow_id, | ||
| self._ext.account_id, | ||
| escrow.get('provider_id'), | ||
| ) | ||
| return False | ||
| return escrow.get('status') == 'held' |
There was a problem hiding this comment.
The _verify method does not ensure that an escrowId is uniquely associated with a single task. Since an escrow remains in the 'held' status until it is explicitly released or refunded, an attacker can reuse the same escrowId across multiple concurrent task requests. The agent's executor wrapper will verify the escrow as valid for each request and proceed with execution, but the agent will only receive payment once when the escrow is eventually released.
| def __init__( | ||
| self, | ||
| exchange_url: str, | ||
| api_key: str, | ||
| account_id: str, | ||
| *, | ||
| auto_verify: bool = True, | ||
| auto_settle: bool = True, | ||
| ) -> None: | ||
| self._exchange_url = exchange_url | ||
| self._api_key = api_key | ||
| self.account_id = account_id | ||
| self.auto_verify = auto_verify | ||
| self.auto_settle = auto_settle | ||
| self.settlement_required = False | ||
| self.exchange_client = SettlementExchangeClient( | ||
| base_url=exchange_url, | ||
| api_key=api_key, | ||
| ) |
There was a problem hiding this comment.
The settlement_required attribute is hardcoded to False, which prevents the server-side logic in _SettledAgentExecutor from ever enforcing that settlement is required. This makes the required: true setting in the Agent Card ineffective.
To allow this behavior to be configured, you should add settlement_required as a parameter to the __init__ method. This will enable the agent's server-side behavior to be aligned with what it advertises in its Agent Card.
| def __init__( | |
| self, | |
| exchange_url: str, | |
| api_key: str, | |
| account_id: str, | |
| *, | |
| auto_verify: bool = True, | |
| auto_settle: bool = True, | |
| ) -> None: | |
| self._exchange_url = exchange_url | |
| self._api_key = api_key | |
| self.account_id = account_id | |
| self.auto_verify = auto_verify | |
| self.auto_settle = auto_settle | |
| self.settlement_required = False | |
| self.exchange_client = SettlementExchangeClient( | |
| base_url=exchange_url, | |
| api_key=api_key, | |
| ) | |
| def __init__( | |
| self, | |
| exchange_url: str, | |
| api_key: str, | |
| account_id: str, | |
| *, | |
| auto_verify: bool = True, | |
| auto_settle: bool = True, | |
| settlement_required: bool = False, | |
| ) -> None: | |
| self._exchange_url = exchange_url | |
| self._api_key = api_key | |
| self.account_id = account_id | |
| self.auto_verify = auto_verify | |
| self.auto_settle = auto_settle | |
| self.settlement_required = settlement_required | |
| self.exchange_client = SettlementExchangeClient( | |
| base_url=exchange_url, | |
| api_key=api_key, | |
| ) |
| async def _verify(self, se_block: dict) -> bool: | ||
| escrow_id = se_block.get('escrowId') | ||
| if not escrow_id: | ||
| return False | ||
| escrow = await asyncio.to_thread(self._ext.verify_escrow, escrow_id) | ||
| if not escrow: | ||
| return False | ||
| if escrow.get('provider_id') != self._ext.account_id: | ||
| logger.warning( | ||
| 'Escrow %s provider mismatch: expected %s, got %s', | ||
| escrow_id, | ||
| self._ext.account_id, | ||
| escrow.get('provider_id'), | ||
| ) | ||
| return False | ||
| return escrow.get('status') == 'held' |
There was a problem hiding this comment.
The _verify method in _SettledAgentExecutor checks if an escrow exists and is in the 'held' status on the exchange, but it fails to verify that the amount of tokens in the escrow matches the amount specified in the message metadata or the agent's pricing requirements. An attacker can provide an escrowId for an escrow containing a minimal amount (e.g., 1 token) while claiming a higher amount in the metadata, leading the agent to perform work for an insufficient payment.
async def _verify(self, se_block: dict) -> bool:
escrow_id = se_block.get('escrowId')
expected_amount = se_block.get('amount')
if not escrow_id:
return False
escrow = await asyncio.to_thread(self._ext.verify_escrow, escrow_id)
if not escrow:
return False
if escrow.get('provider_id') != self._ext.account_id:
logger.warning(
'Escrow %s provider mismatch: expected %s, got %s',
escrow_id,
self._ext.account_id,
escrow.get('provider_id'),
)
return False
if expected_amount is not None and escrow.get('amount') != expected_amount:
logger.warning(
'Escrow %s amount mismatch: expected %s, got %s',
escrow_id,
expected_amount,
escrow.get('amount'),
)
return False
return escrow.get('status') == 'held'| try: | ||
| await asyncio.to_thread( | ||
| settle_for_task_state, | ||
| self._ext.exchange_client, | ||
| task_state=state, | ||
| escrow_id=escrow_id, | ||
| ) | ||
| except (OSError, ValueError, KeyError): | ||
| logger.warning( | ||
| 'Auto-settle failed for escrow %s', | ||
| escrow_id, | ||
| exc_info=True, | ||
| ) | ||
| return | ||
|
|
||
| if state in _TERMINAL_RELEASE | _TERMINAL_REFUND: | ||
| self._escrows.pop(task_id, None) |
There was a problem hiding this comment.
In _SettledClient._try_settle, if the call to settle_for_task_state fails (e.g., due to a temporary network error), the task is still removed from the _escrows tracking dictionary if it has reached a terminal state. This prevents any subsequent attempts to release or refund the escrow for that task, leaving the tokens locked in escrow until the TTL expires.
| try: | |
| await asyncio.to_thread( | |
| settle_for_task_state, | |
| self._ext.exchange_client, | |
| task_state=state, | |
| escrow_id=escrow_id, | |
| ) | |
| except (OSError, ValueError, KeyError): | |
| logger.warning( | |
| 'Auto-settle failed for escrow %s', | |
| escrow_id, | |
| exc_info=True, | |
| ) | |
| return | |
| if state in _TERMINAL_RELEASE | _TERMINAL_REFUND: | |
| self._escrows.pop(task_id, None) | |
| try: | |
| await asyncio.to_thread( | |
| settle_for_task_state, | |
| self._ext.exchange_client, | |
| task_state=state, | |
| escrow_id=escrow_id, | |
| ) | |
| if state in _TERMINAL_RELEASE | _TERMINAL_REFUND: | |
| self._escrows.pop(task_id, None) | |
| except (OSError, ValueError, KeyError): | |
| logger.warning( | |
| 'Auto-settle failed for escrow %s', | |
| escrow_id, | |
| exc_info=True, | |
| ) | |
| return |
| | Method | Path | Description | | ||
| |--------|--------------------------|-----------------------------------------------------------------------| | ||
| | `POST` | `/exchange/escrow` | Create an escrow | | ||
| | `POST` | `/exchange/release` | Release escrowed tokens to provider. Body: `{"escrow_id": "<id>"}` | | ||
| | `POST` | `/exchange/refund` | Refund escrowed tokens to requester. Body: `{"escrow_id": "<id>"}` | | ||
| | `POST` | `/exchange/dispute` | Flag an escrow as disputed. Body: `{"escrow_id": "<id>"}` | | ||
| | `GET` | `/exchange/escrows/{id}` | Look up an escrow by ID | | ||
| | `GET` | `/exchange/balance` | Get agent's token balance | |
There was a problem hiding this comment.
There's an inconsistency in the JSON key naming convention within the specification. The Settlement Metadata sections (lines 103-143) use camelCase (e.g., escrowId, exchangeUrl), which is common for JSON APIs. However, this Core Endpoints table and the Batch Escrow for Pipelines section (lines 175-202) specify snake_case keys (e.g., escrow_id, provider_id).
To ensure consistency and prevent implementation errors, it would be best to use a single convention throughout the spec. I'd recommend standardizing on camelCase, as it's already used in other parts of the A2A protocol.
| async def send_message( | ||
| self, | ||
| request: Message, | ||
| *, | ||
| context: ClientCallContext | None = None, | ||
| ) -> AsyncIterator[ClientEvent | Message]: | ||
| async for event in self._delegate.send_message( | ||
| request, context=context | ||
| ): | ||
| yield event | ||
| if self._ext.auto_settle: | ||
| await self._try_settle(event) |
There was a problem hiding this comment.
The client wrapper aims to 'auto-settle' tasks, but it requires a manual call to track_escrow(task_id, escrow_id) to associate a task with an escrow. This is not fully automatic and can be error-prone for the user.
The wrapper could automate this association by extracting the escrowId from the outgoing message and the taskId from an incoming event. This would make the 'auto-settle' feature more robust and easier to use, better aligning with the 'fully managed' integration level described in the documentation.
async def send_message(
self,
request: Message,
*,
context: ClientCallContext | None = None,
) -> AsyncIterator[ClientEvent | Message]:
se_block = self._ext.read_metadata(request)
escrow_id = se_block.get('escrowId') if se_block else None
is_tracked = False
async for event in self._delegate.send_message(
request, context=context
):
yield event
if escrow_id and not is_tracked:
task_id, _ = self._extract_task_state(event)
if task_id:
self.track_escrow(task_id, escrow_id)
is_tracked = True
if self._ext.auto_settle:
await self._try_settle(event)Adds the A2A Settlement Extension (A2A-SE), which provides escrow-based token settlement mapped to A2A TaskState transitions. Extension spec (extensions/settlement/v1/spec.md): - AgentCard integration with skill-level pricing - Settlement metadata in A2A message metadata field - Escrow lifecycle mapped to TaskState (create/release/refund) - Batch escrow with pipeline dependencies and cascade refund - Dispute resolution flow Python implementation (samples/python/extensions/settlement/): - SettlementExtension class following the timestamp extension pattern - AgentCard helpers, executor wrapper (server), client wrapper (client) - Delegates to the a2a-settlement SDK for exchange communication Reference exchange running at https://exchange.a2a-settlement.org/api/v1 Made-with: Cursor
69c4490 to
07f844d
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces the A2A Settlement Extension, providing escrow-based token settlement for A2A agents. While the implementation is well-structured and integrates seamlessly, several security issues were identified in the Python implementation. These include a critical logic flaw allowing providers to trigger their own payments, a memory leak in escrow tracking, a race condition enabling replay attacks, and insufficient verification of escrow amounts. Addressing these vulnerabilities is crucial for the security and reliability of the settlement extension.
| auto_settle: bool = True, | ||
| ) -> None: | ||
| self._exchange_url = exchange_url | ||
| self._api_key = api_key |
There was a problem hiding this comment.
Storing the api_key directly as an instance variable (self._api_key = api_key) might be acceptable for a sample, but in a production environment, consider more secure ways to handle API keys, such as environment variables, a secrets management service, or ensuring it's not directly accessible after initialization. If the SettlementExchangeClient handles it securely internally, this might be less of a concern for SettlementExtension itself.
| if self._ext.auto_settle: | ||
| await self._try_settle(task) | ||
| return task |
There was a problem hiding this comment.
The _SettledClient wrapper automatically triggers the settlement process (which includes releasing funds to the provider on COMPLETED status) as soon as it receives a task update. In the get_task method, _try_settle is called immediately after fetching the task from the provider and before returning it to the client. Since the provider is the one who sets the task state to COMPLETED, they can effectively trigger their own payment without the client having any opportunity to review the work or dispute the outcome. This bypasses the intended escrow protection where the requester should be the one to authorize the release of funds. It is recommended to remove automatic release on COMPLETED and instead require a manual client action, or at least provide a hook for client-side verification before settlement.
| expires_at=escrow.get('expires_at', ''), | ||
| ) | ||
| if not escrow.get('expires_at'): | ||
| meta['a2a-se'].pop('expiresAt', None) |
There was a problem hiding this comment.
The line meta['a2a-se'].pop('expiresAt', None) removes the expiresAt field if escrow.get('expires_at') is falsy. It might be cleaner to handle this directly within build_settlement_metadata by passing None if the value is not present, allowing build_settlement_metadata to decide whether to include the field or not, rather than adding it and then immediately removing it.
| _TERMINAL_RELEASE = {'completed', 'TASK_STATE_COMPLETED'} | ||
| _TERMINAL_REFUND = { | ||
| 'failed', | ||
| 'canceled', | ||
| 'rejected', | ||
| 'TASK_STATE_FAILED', | ||
| 'TASK_STATE_CANCELED', | ||
| 'TASK_STATE_REJECTED', | ||
| } |
There was a problem hiding this comment.
The terminal task states (_TERMINAL_RELEASE, _TERMINAL_REFUND) and messaging methods (_MESSAGING_METHODS) are defined using string literals. If a2a.types provides an enum or constants for these states (e.g., TaskState.COMPLETED), it would be more robust and maintainable to use those constants to prevent typos and ensure consistency with the core A2A protocol definitions.
extensions/settlement/README.md
Outdated
| - [Python SDK](https://github.com/a2a-settlement/a2a-settlement/tree/main/sdk) | ||
| - [TypeScript SDK](https://github.com/a2a-settlement/a2a-settlement/tree/main/sdk-ts) |
There was a problem hiding this comment.
The links for the Python and TypeScript SDKs point to the source code directories (tree/main/sdk and tree/main/sdk-ts), while the text implies installation via pip install and npm install. This could be confusing. Consider updating the links to point to the respective package repositories (e.g., PyPI, npm) or clarifying that these are the SDKs for the settlement exchange, not the A2A framework itself, if that's the case. Consistency with extensions/settlement/v1/spec.md is also important.
| """ | ||
| try: | ||
| return self.exchange_client.get_escrow(escrow_id=escrow_id) | ||
| except (OSError, ValueError, KeyError): |
There was a problem hiding this comment.
The except block catches a broad set of exceptions (OSError, ValueError, KeyError). While exc_info=True helps with debugging, it's generally better to catch more specific exceptions if possible, or at least add a comment explaining why such broad exceptions are being caught here (e.g., to handle various potential issues from the external exchange_client).
| description = "A2A Settlement Extension — escrow-based payment for A2A agents" | ||
| readme = "README.md" | ||
| requires-python = ">=3.10" | ||
| dependencies = ["a2a-sdk>=0.3.0", "a2a-settlement>=0.8.0"] |
There was a problem hiding this comment.
The dependencies are pinned to specific versions (a2a-sdk>=0.3.0, a2a-settlement>=0.8.0). For a library or extension, it's often better to use more flexible version specifiers (e.g., ~=0.3.0 or >=0.3.0, <0.4.0) to allow for minor updates and bug fixes without requiring manual intervention, unless strict pinning is explicitly desired for stability or compatibility reasons.
| dependencies = ["a2a-sdk>=0.3.0", "a2a-settlement>=0.8.0"] | |
| dependencies = ["a2a-sdk~=0.3.0", "a2a-settlement~=0.8.0"] |
| api_key="ate_your_key", | ||
| account_id="your-agent-uuid", |
There was a problem hiding this comment.
The api_key and account_id values ("ate_your_key", "your-agent-uuid") are placeholders. It would be beneficial to explicitly mention in the comments that these should be replaced with actual values, for example, by adding # Replace with your actual API key.
| api_key="ate_your_key", | |
| account_id="your-agent-uuid", | |
| api_key="ate_your_key", # Replace with your actual API key | |
| account_id="your-agent-uuid", # Replace with your actual account ID |
| # ── Client-side wrapper ──────────────────────────────────────── | ||
|
|
||
|
|
||
| _ESCROW_TTL_S = 30 * 60 # 30 minutes, matches default exchange escrow TTL |
There was a problem hiding this comment.
The _ESCROW_TTL_S constant is hardcoded to 30 minutes. While it matches the default exchange TTL, it might be beneficial to make this configurable (e.g., via SettlementExtension constructor) or derive it dynamically if the exchange API provides this information, to allow for more flexibility and easier adjustments.
| | `preferredExchange` | string | No | The agent's preferred exchange from the list | | ||
| | `accountIds` | object | Yes | Map of exchange URL to agent's account ID on that exchange | | ||
| | `pricing` | object | No | Map of skill ID to pricing configuration | | ||
| | `currency` | string | No | Default currency (default: `ATE`) | |
There was a problem hiding this comment.
For the currency field in the Extension Params table, the 'Required' column states 'No', but the description mentions 'Default currency (default: ATE)'. While this is technically correct, it might be clearer to indicate that it's optional but has a default value, or add a note about the default in the 'Required' column itself for better readability.
Adds the A2A Settlement Extension (A2A-SE) with Python reference implementation and specification for escrow-based token settlement in the A2A task lifecycle. Made-with: Cursor
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces the A2A Settlement Extension, enabling escrow-based token settlement for agent-to-agent interactions, complete with comprehensive documentation and a Python implementation for escrow management. A significant concern is a resource exhaustion vulnerability identified in the server-side executor wrapper (_SettledAgentExecutor), where the _used_escrows set grows indefinitely, potentially leading to a Denial of Service (DoS) via memory exhaustion. Implementing a pruning mechanism, similar to the client-side wrapper, is crucial to address this. Further improvements are needed regarding the robustness of task state comparisons, the configuration of the settlement_required flag, and the transient nature of escrow tracking in the Python client and executor wrappers.
| _TERMINAL_RELEASE = {TaskState.completed, 'TASK_STATE_COMPLETED'} | ||
| _TERMINAL_REFUND = { | ||
| TaskState.failed, | ||
| TaskState.canceled, | ||
| TaskState.rejected, | ||
| 'TASK_STATE_FAILED', | ||
| 'TASK_STATE_CANCELED', | ||
| 'TASK_STATE_REJECTED', | ||
| } |
There was a problem hiding this comment.
The _TERMINAL_RELEASE and _TERMINAL_REFUND sets contain a mix of TaskState enum members and string literals. Assuming event.status.state (from _extract_task_state) returns a string representation of the task state (e.g., 'TASK_STATE_COMPLETED'), comparing an enum member like TaskState.completed directly to this string will always evaluate to False if TaskState is a standard Python Enum.
To ensure correct comparison, it's best to consistently use either the enum members or their string values. If TaskState is a StrEnum (where TaskState.completed == 'TASK_STATE_COMPLETED' is true), then the current approach is fine. However, if it's a regular Enum, you should compare against the enum's value or name, or convert the incoming state string to an enum member before comparison.
For robustness, I recommend converting the TaskState enum members to their string representation for comparison, assuming event.status.state is always a string.
_TERMINAL_RELEASE = {TaskState.completed.value, 'TASK_STATE_COMPLETED'}
_TERMINAL_REFUND = {
TaskState.failed.value,
TaskState.canceled.value,
TaskState.rejected.value,
'TASK_STATE_FAILED',
'TASK_STATE_CANCELED',
'TASK_STATE_REJECTED',
}| ) -> None: | ||
| self._delegate = delegate | ||
| self._ext = ext | ||
| self._used_escrows: set[str] = set() |
There was a problem hiding this comment.
The _SettledAgentExecutor class uses the _used_escrows set to prevent replay attacks, but this set grows indefinitely as escrowId entries are never removed. Since escrowId is untrusted client input, an attacker can exploit this to cause a Denial of Service (DoS) via memory exhaustion (CWE-400). This resource exhaustion vulnerability is critical. Additionally, as an in-memory store, _used_escrows state is lost on agent restarts, which could also lead to replay attacks if not properly handled by the settlement exchange. A pruning mechanism, similar to the client-side wrapper's _prune_stale, is needed to bound memory growth. For production systems, consider persisting this state to ensure robustness against restarts and enhance security.
| if reason: | ||
| logger.warning('Escrow %s rejected: %s', escrow_id, reason) | ||
| return False | ||
| self._used_escrows.add(escrow_id) |
There was a problem hiding this comment.
Every unique escrow_id that passes initial verification is added to the _used_escrows set without any pruning or size limits. To remediate this, implement a pruning mechanism similar to the one used in _SettledClient._escrows (line 464). Since escrows have a limited TTL (default 30 minutes), the agent only needs to track IDs for a reasonable duration to prevent replays while work is in progress.
self._used_escrows: dict[str, float] = {}
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
self._prune_stale()
# ... rest of the method ...
def _prune_stale(self) -> None:
cutoff = time.monotonic() - _ESCROW_TTL_S
stale = [k for k, ts in self._used_escrows.items() if ts < cutoff]
for k in stale:
self._used_escrows.pop(k, None)| self.account_id = account_id | ||
| self.auto_verify = auto_verify | ||
| self.auto_settle = auto_settle | ||
| self.settlement_required = False |
There was a problem hiding this comment.
The settlement_required attribute is initialized to False and is not directly configurable via the SettlementExtension constructor. However, the _SettledAgentExecutor (lines 287-289) relies on self._ext.settlement_required to determine if settlement metadata is mandatory.
The required parameter in agent_extension and add_to_card only affects the AgentCard declaration, not the runtime behavior of the SettlementExtension instance itself. This means that even if an agent declares required=True in its AgentCard, the executor will not enforce it unless self.settlement_required is manually set on the SettlementExtension instance after initialization.
To ensure the settlement_required logic functions as intended for the server-side executor, it should be a configurable parameter in the SettlementExtension constructor.
auto_verify: bool = True,
auto_settle: bool = False,
settlement_required: bool = False,
) -> None:
self._exchange_url = exchange_url
self._api_key = api_key
self.account_id = account_id
self.auto_verify = auto_verify
self.auto_settle = auto_settle
self.settlement_required = settlement_required| def __init__(self, delegate: Client, ext: SettlementExtension) -> None: | ||
| self._delegate = delegate | ||
| self._ext = ext | ||
| self._escrows: dict[str, tuple[str, float]] = {} |
There was a problem hiding this comment.
Similar to _used_escrows in the executor, the _escrows dictionary in the client wrapper is an in-memory store. If the client process restarts, it loses track of active escrows. This could lead to escrows remaining unsettled, requiring manual intervention to release or refund them.
For a more robust client, especially in scenarios where tasks can be long-running or the client might restart, consider persisting this escrow tracking state. This could involve storing it in a local database or a persistent cache.
| """ | ||
| try: | ||
| return self.exchange_client.get_escrow(escrow_id=escrow_id) | ||
| except (OSError, ValueError, KeyError): |
There was a problem hiding this comment.
Catching broad exceptions like OSError, ValueError, and KeyError can mask underlying issues. While it prevents crashes, it makes debugging harder as the specific cause of failure is lost.
Consider catching more specific exceptions if possible, or at least logging the exception type and traceback for better diagnostics. For example, if get_escrow can raise a specific ExchangeAPIError, catching that would be more precise.
| if not escrow.get('expires_at'): | ||
| meta['a2a-se'].pop('expiresAt', None) | ||
| return meta |
There was a problem hiding this comment.
The logic to remove expiresAt if it's an empty string is a bit indirect. build_settlement_metadata already sets expires_at to an empty string if escrow.get('expires_at') is not provided.
It would be cleaner to pass None to build_settlement_metadata if expires_at is not available, and let build_settlement_metadata handle whether to include the field or not, or to explicitly set it to None if it should be omitted.
meta = build_settlement_metadata(
escrow_id=escrow['escrow_id'],
amount=escrow['amount'],
fee_amount=escrow.get('fee_amount', 0),
exchange_url=self._exchange_url,
expires_at=escrow.get('expires_at')
)| # ── Client-side wrapper ──────────────────────────────────────── | ||
|
|
||
|
|
||
| _ESCROW_TTL_S = 30 * 60 # 30 minutes, matches default exchange escrow TTL |
There was a problem hiding this comment.
The _ESCROW_TTL_S constant is hardcoded to 30 minutes. While this matches the default exchange escrow TTL, it might be beneficial to make this configurable, especially if different exchanges or use cases require varying TTLs.
Consider making this a parameter in the SettlementExtension constructor or deriving it dynamically from the exchange's capabilities if available.
_ESCROW_TTL_S = 30 * 60 # 30 minutes, matches default exchange escrow TTL
class _SettledClient(Client):
"""Manages escrow around task messages.
Keeps a mapping of task_id to escrow_id so it can settle
automatically when the task reaches a terminal state. Entries
are pruned after ``_ESCROW_TTL_S`` to bound memory growth.
"""
def __init__(self, delegate: Client, ext: SettlementExtension, escrow_ttl_s: int = _ESCROW_TTL_S) -> None:
self._delegate = delegate
self._ext = ext
self._escrows: dict[str, tuple[str, float]] = {}
self._escrow_ttl_s = escrow_ttl_sAdds the A2A Settlement Extension (A2A-SE) with Python reference implementation and specification for escrow-based token settlement in the A2A task lifecycle. Made-with: Cursor
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces the A2A Settlement Extension (A2A-SE), a significant and well-designed feature for enabling escrow-based payments between agents. However, a security audit identified critical issues related to improper input validation, uncontrolled resource consumption (DoS), and log injection. Specifically, untrusted escrow IDs from message metadata are used without sufficient validation, which could lead to memory exhaustion, forged log entries, or unauthorized API requests to the settlement exchange. Additionally, the code review suggests improvements to the specification's clarity and enhancements to error reporting in the Python implementation.
| ) -> None: | ||
| self._delegate = delegate | ||
| self._ext = ext | ||
| self._used_escrows: dict[str, float] = {} |
There was a problem hiding this comment.
The _used_escrows and _escrows dictionaries grow as new IDs are processed. While there is a pruning mechanism that removes entries older than 30 minutes, there is no limit on the total number of entries or the size of the keys (IDs) stored. An attacker can send a large number of unique, oversized IDs within the 30-minute window, leading to excessive memory consumption and a potential Denial of Service (DoS).
| escrow_id = se_block.get('escrowId') | ||
| if not escrow_id or escrow_id in self._used_escrows: | ||
| if escrow_id: | ||
| logger.warning('Escrow %s already used', escrow_id) | ||
| return False | ||
| escrow = await asyncio.to_thread(self._ext.verify_escrow, escrow_id) |
There was a problem hiding this comment.
The escrow_id from untrusted message metadata is passed directly to the settlement exchange client to fetch escrow details. If the underlying client or the exchange API does not properly sanitize this ID when constructing the request URL (e.g., GET /exchange/escrows/{id}), an attacker could use path traversal sequences (like ../) to make the agent perform unauthorized requests to other API endpoints using its own credentials. Additionally, malformed input (e.g., a list instead of a string) could cause unhandled exceptions and DoS.
| | Method | Path | Description | | ||
| |--------|--------------------------|-----------------------------------------------------------------------| | ||
| | `POST` | `/exchange/escrow` | Create an escrow | | ||
| | `POST` | `/exchange/release` | Release escrowed tokens to provider. Body: `{"escrow_id": "<id>"}` | | ||
| | `POST` | `/exchange/refund` | Refund escrowed tokens to requester. Body: `{"escrow_id": "<id>"}` | | ||
| | `POST` | `/exchange/dispute` | Flag an escrow as disputed. Body: `{"escrow_id": "<id>"}` | | ||
| | `GET` | `/exchange/escrows/{id}` | Look up an escrow by ID | | ||
| | `GET` | `/exchange/balance` | Get agent's token balance | |
There was a problem hiding this comment.
The API endpoint paths in the "Core Endpoints" table (e.g., /exchange/escrow, /exchange/release) appear inconsistent with RESTful practices and the linked reference implementation (https://exchange.a2a-settlement.org/docs). The reference implementation uses paths like /api/v1/escrow and /api/v1/escrow/{escrow_id}/release. To avoid confusion for implementers, the specification should align with the reference implementation.
A more conventional RESTful design would look like this:
| Method | Path | Description |
|--------|---------------------------------|-------------------------------------------|
| `POST` | `/escrows` | Create an escrow |
| `POST` | `/escrows/{id}/release` | Release escrowed tokens to provider |
| `POST` | `/escrows/{id}/refund` | Refund escrowed tokens to requester |
| `POST` | `/escrows/{id}/dispute` | Flag an escrow as disputed |
| `GET` | `/escrows/{id}` | Look up an escrow by ID |
| `GET` | `/balance` | Get agent's token balance || escrow_id = se_block.get('escrowId') | ||
| if not escrow_id or escrow_id in self._used_escrows: | ||
| if escrow_id: | ||
| logger.warning('Escrow %s already used', escrow_id) |
There was a problem hiding this comment.
| if reject: | ||
| await event_queue.enqueue_event( | ||
| TaskStatusUpdateEvent( | ||
| taskId=context.task_id or '', | ||
| contextId=context.context_id or '', | ||
| final=True, | ||
| status=TaskStatusUpdateEvent.Status( | ||
| state=TaskState.failed, | ||
| message=Message( | ||
| messageId='settlement-reject', | ||
| role='agent', | ||
| parts=[], | ||
| ), | ||
| ), | ||
| ) | ||
| ) | ||
| return |
There was a problem hiding this comment.
When a task is rejected due to a settlement failure, a generic TaskStatusUpdateEvent is sent with an empty parts array in the message. This is a missed opportunity to provide a more descriptive error message to the client, which would aid in debugging. Consider determining the specific reason for failure (e.g., 'Escrow verification failed' or 'Settlement metadata missing') from the logic preceding this block and including it in the message parts.
| # ── Client-side wrapper ──────────────────────────────────────── | ||
|
|
||
|
|
||
| _ESCROW_TTL_S = 30 * 60 # 30 minutes, matches default exchange escrow TTL |
There was a problem hiding this comment.
Adds the A2A Settlement Extension (A2A-SE) with Python reference implementation and specification for escrow-based token settlement in the A2A task lifecycle. Made-with: Cursor
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces the A2A Settlement Extension, enabling escrow-based token settlement for agent-to-agent interactions. While the design effectively integrates settlement logic and the Python implementation demonstrates robust handling of escrow verification and automated settlement, two security issues were identified: a race condition in the escrow verification logic that could allow for replay attacks, and potential Denial of Service vulnerabilities due to insufficient type checking of untrusted metadata input. Addressing these security concerns will enhance the reliability and security of the extension.
| return bool(value and _ESCROW_ID_RE.fullmatch(value)) | ||
|
|
||
|
|
||
| class SettlementExtension: | ||
| """A2A Settlement Extension implementation. | ||
|
|
||
| Mirrors the pattern of the timestamp extension, offering multiple | ||
| integration levels from fully manual to fully managed. | ||
|
|
||
| Attributes: | ||
| auto_verify: If True, the executor wrapper rejects tasks whose | ||
| escrow cannot be verified on the exchange. | ||
| auto_settle: If True, the client wrapper releases/refunds escrow | ||
| automatically when a task reaches a terminal state. | ||
| **Security note**: when enabled, the provider controls when | ||
| payment is released by setting the task to COMPLETED. Set to | ||
| False and call ``release()`` explicitly when the client must | ||
| review the outcome before payment. | ||
| settlement_required: If True, the executor wrapper rejects tasks | ||
| that activate settlement but do not include settlement | ||
| metadata. Default False (freemium — unpaid requests allowed). | ||
|
|
||
| Args: | ||
| exchange_url: Base URL of the settlement exchange. | ||
| api_key: Bearer token for the exchange. | ||
| account_id: This agent's account ID on the exchange. | ||
| auto_verify: If True, the executor wrapper rejects tasks whose | ||
| escrow cannot be verified on the exchange. Default True. | ||
| auto_settle: If True, the client wrapper releases/refunds escrow | ||
| automatically when a task reaches a terminal state. | ||
| Default False. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| exchange_url: str, | ||
| api_key: str, | ||
| account_id: str, | ||
| *, | ||
| auto_verify: bool = True, | ||
| auto_settle: bool = False, | ||
| ) -> None: | ||
| self._exchange_url = exchange_url | ||
| self._api_key = api_key | ||
| self.account_id = account_id | ||
| self.auto_verify = auto_verify | ||
| self.auto_settle = auto_settle | ||
| self.settlement_required = False | ||
| self.exchange_client = SettlementExchangeClient( | ||
| base_url=exchange_url, | ||
| api_key=api_key, | ||
| ) | ||
|
|
||
| # ── AgentCard integration ────────────────────────────────── | ||
|
|
||
| def agent_extension( | ||
| self, | ||
| pricing: dict[str, Any] | None = None, | ||
| *, | ||
| required: bool = False, | ||
| ) -> AgentExtension: | ||
| """Build the AgentExtension object for this extension.""" | ||
| ext_dict = build_settlement_extension( | ||
| exchange_urls=[self._exchange_url], | ||
| account_ids={self._exchange_url: self.account_id}, | ||
| pricing=pricing, | ||
| required=required, | ||
| ) | ||
| return AgentExtension(**ext_dict) | ||
|
|
||
| def add_to_card( | ||
| self, | ||
| card: AgentCard, | ||
| pricing: dict[str, Any] | None = None, | ||
| *, | ||
| required: bool = False, | ||
| ) -> AgentCard: | ||
| """Add the settlement extension to an AgentCard.""" | ||
| if not card.capabilities.extensions: | ||
| card.capabilities.extensions = [] | ||
| card.capabilities.extensions.append( | ||
| self.agent_extension(pricing, required=required) | ||
| ) | ||
| return card | ||
|
|
||
| def is_supported(self, card: AgentCard | None) -> bool: | ||
| """Check whether an AgentCard advertises settlement.""" | ||
| if card: | ||
| return find_extension_by_uri(card, URI) is not None | ||
| return False | ||
|
|
||
| # ── Extension activation ─────────────────────────────────── | ||
|
|
||
| def activate(self, context: RequestContext) -> bool: | ||
| """Activate the extension if the client requested it.""" | ||
| if URI in context.requested_extensions: | ||
| context.add_activated_extension(URI) | ||
| return True | ||
| return False | ||
|
|
||
| # ── Manual helpers (Option 1) ────────────────────────────── | ||
|
|
||
| def create_escrow( | ||
| self, | ||
| *, | ||
| provider_id: str, | ||
| amount: int, | ||
| task_id: str | None = None, | ||
| task_type: str | None = None, | ||
| ttl_minutes: int | None = None, | ||
| ) -> dict[str, Any]: | ||
| """Create an escrow on the exchange.""" | ||
| return self.exchange_client.create_escrow( | ||
| provider_id=provider_id, | ||
| amount=amount, | ||
| task_id=task_id, | ||
| task_type=task_type, | ||
| ttl_minutes=ttl_minutes, | ||
| ) | ||
|
|
||
| def release(self, escrow_id: str) -> dict[str, Any]: | ||
| """Release an escrow (pay the provider).""" | ||
| return self.exchange_client.release_escrow(escrow_id=escrow_id) | ||
|
|
||
| def refund( | ||
| self, | ||
| escrow_id: str, | ||
| reason: str | None = None, | ||
| ) -> dict[str, Any]: | ||
| """Refund an escrow (return tokens to requester).""" | ||
| return self.exchange_client.refund_escrow( | ||
| escrow_id=escrow_id, reason=reason | ||
| ) | ||
|
|
||
| def verify_escrow(self, escrow_id: str) -> dict[str, Any] | None: | ||
| """Look up an escrow on the exchange. | ||
|
|
||
| Returns: | ||
| The escrow dict, or None if the lookup fails. | ||
| """ | ||
| try: | ||
| return self.exchange_client.get_escrow(escrow_id=escrow_id) | ||
| except (OSError, ValueError, KeyError): | ||
| logger.warning( | ||
| 'Failed to verify escrow %s', | ||
| escrow_id, | ||
| exc_info=True, | ||
| ) | ||
| return None | ||
|
|
||
| def build_metadata(self, escrow: dict[str, Any]) -> dict[str, Any]: | ||
| """Build the a2a-se metadata block from an escrow.""" | ||
| meta = build_settlement_metadata( | ||
| escrow_id=escrow['escrow_id'], | ||
| amount=escrow['amount'], | ||
| fee_amount=escrow.get('fee_amount', 0), | ||
| exchange_url=self._exchange_url, | ||
| expires_at=escrow.get('expires_at', ''), | ||
| ) | ||
| if not escrow.get('expires_at'): | ||
| meta['a2a-se'].pop('expiresAt', None) | ||
| return meta | ||
|
|
||
| @staticmethod | ||
| def read_metadata( | ||
| message: Message | Task | dict, | ||
| ) -> dict[str, Any] | None: | ||
| """Extract the a2a-se block from a message or task.""" | ||
| return get_settlement_block(message) | ||
|
|
||
| # ── Server-side: executor wrapper (Option 2) ─────────────── | ||
|
|
||
| def wrap_executor(self, executor: AgentExecutor) -> AgentExecutor: | ||
| """Wrap an executor to verify escrow before execution. | ||
|
|
||
| If the settlement extension is activated and the incoming | ||
| message contains an escrowId, the wrapper verifies the | ||
| escrow exists and is in 'held' status on the exchange | ||
| before delegating to the real executor. If verification | ||
| fails, the task is rejected. | ||
| """ | ||
| return _SettledAgentExecutor(executor, self) | ||
|
|
||
| # ── Client-side: client wrapper (Option 3) ───────────────── | ||
|
|
||
| def wrap_client(self, client: Client) -> Client: | ||
| """Wrap a client to auto-manage escrow lifecycle. | ||
|
|
||
| Outgoing messages get escrow metadata injected. When the | ||
| task reaches a terminal state, escrow is released or | ||
| refunded. | ||
| """ | ||
| return _SettledClient(client, self) | ||
|
|
||
| def client_interceptor(self) -> ClientCallInterceptor: | ||
| """Get a client interceptor that activates settlement.""" | ||
| return _SettlementClientInterceptor(self) | ||
|
|
||
|
|
||
| # ── Server-side wrapper ──────────────────────────────────────── | ||
|
|
||
|
|
||
| class _SettledAgentExecutor(AgentExecutor): | ||
| """Verifies escrow before delegating to the wrapped executor.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| delegate: AgentExecutor, | ||
| ext: SettlementExtension, | ||
| ) -> None: | ||
| self._delegate = delegate | ||
| self._ext = ext | ||
| self._used_escrows: dict[str, float] = {} | ||
|
|
||
| async def execute( | ||
| self, | ||
| context: RequestContext, | ||
| event_queue: EventQueue, | ||
| ) -> None: | ||
| if self._ext.activate(context) and self._ext.auto_verify: | ||
| se_block = self._extract_settlement(context) | ||
| reject = False | ||
| if se_block and not await self._verify(se_block): | ||
| reject = True | ||
| elif not se_block and self._ext.settlement_required: | ||
| logger.warning('Settlement required but no metadata provided') | ||
| reject = True | ||
|
|
||
| if reject: | ||
| await event_queue.enqueue_event( | ||
| TaskStatusUpdateEvent( | ||
| taskId=context.task_id or '', | ||
| contextId=context.context_id or '', | ||
| final=True, | ||
| status=TaskStatusUpdateEvent.Status( | ||
| state=TaskState.failed, | ||
| message=Message( | ||
| messageId='settlement-reject', | ||
| role='agent', | ||
| parts=[], | ||
| ), | ||
| ), | ||
| ) | ||
| ) | ||
| return | ||
| await self._delegate.execute(context, event_queue) | ||
|
|
||
| async def cancel( | ||
| self, | ||
| context: RequestContext, | ||
| event_queue: EventQueue, | ||
| ) -> None: | ||
| await self._delegate.cancel(context, event_queue) | ||
|
|
||
| def _extract_settlement(self, context: RequestContext) -> dict | None: | ||
| msg = context.message | ||
| if msg and msg.metadata: | ||
| return msg.metadata.get(METADATA_KEY) | ||
| return None | ||
|
|
||
| async def _verify(self, se_block: dict) -> bool: | ||
| self._prune_used() | ||
| escrow_id = se_block.get('escrowId') | ||
| if not _valid_escrow_id(escrow_id): |
There was a problem hiding this comment.
This section contains a Denial of Service vulnerability in the _verify method of _SettledAgentExecutor and the _valid_escrow_id helper function. These methods are vulnerable to malformed input, specifically non-dictionary se_block or non-string/None escrowId, which can cause AttributeError or TypeError and lead to agent crashes. Additionally, for better clarity and explicit configuration, consider exposing the settlement_required attribute as an optional parameter in the __init__ method of SettlementExtension.
| if escrow_id in self._used_escrows: | ||
| logger.warning('Escrow already used') | ||
| return False | ||
| escrow = await asyncio.to_thread(self._ext.verify_escrow, escrow_id) | ||
| if not escrow: | ||
| return False | ||
|
|
||
| reason = self._check_escrow(se_block, escrow, escrow_id) | ||
| if reason: | ||
| logger.warning('Escrow %s rejected: %s', escrow_id, reason) | ||
| return False | ||
| self._used_escrows[escrow_id] = time.monotonic() |
There was a problem hiding this comment.
The _verify method in _SettledAgentExecutor checks if an escrow_id has already been used by looking it up in self._used_escrows. However, it only adds the escrow_id to the _used_escrows set after performing an asynchronous network call to verify the escrow on the exchange. If multiple requests with the same escrow_id are processed concurrently, they may all pass the initial check before any of them can update the _used_escrows set, leading to a double-spending or replay attack where the agent performs work multiple times for a single escrow.
if escrow_id in self._used_escrows:
logger.warning('Escrow already used')
return False
# Mark as used before the await to prevent concurrent reuse
self._used_escrows[escrow_id] = time.monotonic()
try:
escrow = await asyncio.to_thread(self._ext.verify_escrow, escrow_id)
if not escrow:
self._used_escrows.pop(escrow_id, None)
return False
reason = self._check_escrow(se_block, escrow, escrow_id)
if reason:
logger.warning('Escrow %s rejected: %s', escrow_id, reason)
self._used_escrows.pop(escrow_id, None)
return False
return True
except Exception:
self._used_escrows.pop(escrow_id, None)
raise| "amount": 10, | ||
| "feeAmount": 1, | ||
| "exchangeUrl": "https://exchange.a2a-settlement.org/api/v1", | ||
| "expiresAt": "2026-02-17T12:30:00Z" |
There was a problem hiding this comment.
The expiresAt field is shown in the example client message metadata. However, the Python implementation's build_metadata function explicitly removes this field if its value is falsy (e.g., None or empty string). To avoid potential confusion, please clarify in the specification whether expiresAt is an optional field in the a2a-se metadata block.
Adds the A2A Settlement Extension (A2A-SE) with Python reference implementation and specification for escrow-based token settlement in the A2A task lifecycle. Made-with: Cursor
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces the A2A Settlement Extension, providing a mechanism for escrow-based payments between agents. The changes include the extension specification, a Python sample implementation, and associated documentation. The implementation is well-structured and follows existing patterns in the codebase. My review focuses on improving the API design in the specification for better REST compliance and enhancing error reporting in the Python implementation to provide more context to clients upon failure. I've also included a minor suggestion for code organization.
| | `POST` | `/exchange/escrow` | Create an escrow | | ||
| | `POST` | `/exchange/release` | Release escrowed tokens to provider. Body: `{"escrow_id": "<id>"}` | | ||
| | `POST` | `/exchange/refund` | Refund escrowed tokens to requester. Body: `{"escrow_id": "<id>"}` | | ||
| | `POST` | `/exchange/dispute` | Flag an escrow as disputed. Body: `{"escrow_id": "<id>"}` | | ||
| | `GET` | `/exchange/escrows/{id}` | Look up an escrow by ID | |
There was a problem hiding this comment.
The API endpoint design could be more aligned with REST conventions for better clarity and predictability.
-
Inconsistent Pluralization: The path for creating an escrow is
POST /exchange/escrow(singular), while looking one up isGET /exchange/escrows/{id}(plural). It's conventional to use the plural form for the resource collection. I suggestPOST /exchange/escrows. -
RPC-style Actions: The
release,refund, anddisputeoperations are modeled as top-level endpoints taking anescrow_idin the body. A more RESTful approach would be to model these as actions on a specific escrow resource, for example:POST /exchange/escrows/{id}/release.
Here's a suggested revision of the endpoints table:
| Method | Path | Description |
|---|---|---|
POST |
/exchange/escrows |
Create an escrow |
POST |
/exchange/escrows/{id}/release |
Release escrowed tokens to provider |
POST |
/exchange/escrows/{id}/refund |
Refund escrowed tokens to requester |
POST |
/exchange/escrows/{id}/dispute |
Flag an escrow as disputed |
GET |
/exchange/escrows/{id} |
Look up an escrow by ID |
This would make the API more predictable and easier to use.
| se_block = self._extract_settlement(context) | ||
| reject = False | ||
| if se_block and not await self._verify(se_block): | ||
| reject = True | ||
| elif not se_block and self._ext.settlement_required: | ||
| logger.warning('Settlement required but no metadata provided') | ||
| reject = True | ||
|
|
||
| if reject: | ||
| await event_queue.enqueue_event( | ||
| TaskStatusUpdateEvent( | ||
| taskId=context.task_id or '', | ||
| contextId=context.context_id or '', | ||
| final=True, | ||
| status=TaskStatusUpdateEvent.Status( | ||
| state=TaskState.failed, | ||
| message=Message( | ||
| messageId='settlement-reject', | ||
| role='agent', | ||
| parts=[], | ||
| ), | ||
| ), | ||
| ) | ||
| ) | ||
| return |
There was a problem hiding this comment.
When a task is rejected due to a settlement issue, the client receives a generic failure message with no details. It would be more helpful for the client to receive a reason for the rejection. Consider adding a descriptive message to the TaskStatusUpdateEvent.
You could refactor the logic to capture the reason and include it in the failure message.
se_block = self._extract_settlement(context)
rejection_reason: str | None = None
if se_block:
if not await self._verify(se_block):
rejection_reason = 'Escrow verification failed.'
elif self._ext.settlement_required:
rejection_reason = 'Settlement required but no metadata provided.'
if rejection_reason:
logger.warning(
'Rejecting task %s: %s',
context.task_id,
rejection_reason,
)
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
taskId=context.task_id or '',
contextId=context.context_id or '',
final=True,
status=TaskStatusUpdateEvent.Status(
state=TaskState.failed,
message=Message(
messageId='settlement-reject',
role='agent',
parts=[{'text': rejection_reason}],
),
),
)
)
return| # ── Client-side wrapper ──────────────────────────────────────── | ||
|
|
||
|
|
||
| _ESCROW_TTL_S = 30 * 60 # 30 minutes, matches default exchange escrow TTL |
Summary
Adds the A2A Settlement Extension (A2A-SE), which provides escrow-based token settlement mapped to A2A TaskState transitions. This fills the gap between agent discovery (registry/catalog) and agent economics (how agents pay each other for work).
What it does:
per-request,per-unit,per-minute,negotiable)COMPLETED→ escrow released (provider gets paid). OnFAILED/CANCELED→ escrow refundeddepends_onfor cascade refund on upstream failureWhat's included:
extensions/settlement/v1/spec.mdsamples/python/extensions/settlement/SettlementExtensionclass with executor wrapper (server-side escrow verification) and client wrapper (auto-settle on terminal task state)How it fits with A2A:
capabilities.extensionsfor AgentCard declaration (no core protocol changes)metadata["a2a-se"]on Messages/Tasks for settlement contextTaskStatetransitions — no new states neededA2A-ExtensionsHTTP headerProduction reference:
Complementary to x402: x402 handles pay-per-request access gating (pay before the door opens). A2A-SE handles task-lifecycle settlement (escrow while work is in progress, release/refund based on outcome). They can be used together.
Test plan
extensions/settlement/v1/spec.mdrenders correctlysamples/python/extensions/settlement/installs withpip install -e .SettlementExtensionclass follows the same pattern asTimestampExtensionMade with Cursor