Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions acp_adapter/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""ACP (Agent Communication Protocol) adapter for hermes-agent."""
5 changes: 5 additions & 0 deletions acp_adapter/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Allow running the ACP adapter as ``python -m acp_adapter``."""

from .entry import main

main()
26 changes: 26 additions & 0 deletions acp_adapter/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""ACP auth helpers — detect available LLM providers for authentication."""

from __future__ import annotations

import os
from typing import Optional


def has_provider() -> bool:
"""Return True if any supported LLM provider API key is configured."""
return bool(
os.environ.get("OPENROUTER_API_KEY")
or os.environ.get("ANTHROPIC_API_KEY")
or os.environ.get("OPENAI_API_KEY")
)


def detect_provider() -> Optional[str]:
"""Return the name of the first available provider, or None."""
if os.environ.get("OPENROUTER_API_KEY"):
return "openrouter"
if os.environ.get("ANTHROPIC_API_KEY"):
return "anthropic"
if os.environ.get("OPENAI_API_KEY"):
return "openai"
return None
86 changes: 86 additions & 0 deletions acp_adapter/entry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""CLI entry point for the hermes-agent ACP adapter.

Loads environment variables from ``~/.hermes/.env``, configures logging
to write to stderr (so stdout is reserved for ACP JSON-RPC transport),
and starts the ACP agent server.

Usage::

python -m acp_adapter.entry
# or
hermes-agent --acp (once wired into the CLI)
"""

import asyncio
import logging
import os
import sys
from pathlib import Path


def _setup_logging() -> None:
"""Route all logging to stderr so stdout stays clean for ACP stdio."""
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(
logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
root = logging.getLogger()
root.handlers.clear()
root.addHandler(handler)
root.setLevel(logging.INFO)

# Quiet down noisy libraries
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("openai").setLevel(logging.WARNING)


def _load_env() -> None:
"""Load .env from HERMES_HOME (default ``~/.hermes``)."""
from dotenv import load_dotenv

hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
env_file = hermes_home / ".env"
if env_file.exists():
try:
load_dotenv(dotenv_path=env_file, encoding="utf-8")
except UnicodeDecodeError:
load_dotenv(dotenv_path=env_file, encoding="latin-1")
logging.getLogger(__name__).info("Loaded env from %s", env_file)
else:
logging.getLogger(__name__).info(
"No .env found at %s, using system env", env_file
)


def main() -> None:
"""Entry point: load env, configure logging, run the ACP agent."""
_setup_logging()
_load_env()

logger = logging.getLogger(__name__)
logger.info("Starting hermes-agent ACP adapter")

# Ensure the project root is on sys.path so ``from run_agent import AIAgent`` works
project_root = str(Path(__file__).resolve().parent.parent)
if project_root not in sys.path:
sys.path.insert(0, project_root)

import acp
from .server import HermesACPAgent

agent = HermesACPAgent()
try:
asyncio.run(acp.run_agent(agent))
except KeyboardInterrupt:
logger.info("Shutting down (KeyboardInterrupt)")
except Exception:
logger.exception("ACP agent crashed")
sys.exit(1)


if __name__ == "__main__":
main()
155 changes: 155 additions & 0 deletions acp_adapter/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""Callback factories for bridging AIAgent events to ACP notifications.

Each factory returns a callable with the signature that AIAgent expects
for its callbacks. Internally, the callbacks push ACP session updates
to the client via ``conn.session_update()`` using
``asyncio.run_coroutine_threadsafe()`` (since AIAgent runs in a worker
thread while the event loop lives on the main thread).
"""

import asyncio
import logging
import uuid
from typing import Any, Callable, Dict, Optional

import acp

from .tools import (
build_tool_start_notification,
build_tool_complete_notification,
get_tool_kind,
build_tool_title,
make_tool_call_id,
)

logger = logging.getLogger(__name__)


def _send_update(
conn: acp.Client,
session_id: str,
loop: asyncio.AbstractEventLoop,
update: Any,
) -> None:
"""Fire-and-forget an ACP session update from a worker thread.

Swallows exceptions so agent execution is never interrupted by a
notification failure.
"""
try:
future = asyncio.run_coroutine_threadsafe(
conn.session_update(session_id, update), loop
)
# Don't block indefinitely; 5 s is generous for a notification
future.result(timeout=5)
except Exception:
logger.debug("Failed to send ACP update", exc_info=True)


# ------------------------------------------------------------------
# Tool progress callback
# ------------------------------------------------------------------

def make_tool_progress_cb(
conn: acp.Client,
session_id: str,
loop: asyncio.AbstractEventLoop,
tool_call_ids: Dict[str, str],
) -> Callable:
"""Create a ``tool_progress_callback`` for AIAgent.

Signature expected by AIAgent::

tool_progress_callback(name: str, preview: str, args: dict)

Emits ``ToolCallStart`` on the first call for a tool invocation.
"""

def _tool_progress(name: str, preview: str, args: Any = None) -> None:
# Parse args if it's a string
if isinstance(args, str):
try:
import json
args = json.loads(args)
except (json.JSONDecodeError, TypeError):
args = {"raw": args}
if not isinstance(args, dict):
args = {}

tc_id = make_tool_call_id()
tool_call_ids[name] = tc_id

update = build_tool_start_notification(tc_id, name, args)
_send_update(conn, session_id, loop, update)

return _tool_progress


# ------------------------------------------------------------------
# Thinking callback
# ------------------------------------------------------------------

def make_thinking_cb(
conn: acp.Client,
session_id: str,
loop: asyncio.AbstractEventLoop,
) -> Callable:
"""Create a ``thinking_callback`` for AIAgent.

Signature expected by AIAgent::

thinking_callback(text: str)

Emits an ``AgentThoughtChunk`` via ``update_agent_thought_text()``.
"""

def _thinking(text: str) -> None:
if not text:
return
update = acp.update_agent_thought_text(text)
_send_update(conn, session_id, loop, update)

return _thinking


# ------------------------------------------------------------------
# Step callback
# ------------------------------------------------------------------

def make_step_cb(
conn: acp.Client,
session_id: str,
loop: asyncio.AbstractEventLoop,
tool_call_ids: Dict[str, str],
) -> Callable:
"""Create a ``step_callback`` for AIAgent.

Signature expected by AIAgent::

step_callback(api_call_count: int, prev_tools: list)

Marks previously-started tool calls as completed and can emit
intermediate agent messages.
"""

def _step(api_call_count: int, prev_tools: Any = None) -> None:
# Mark previously tracked tool calls as completed
if prev_tools and isinstance(prev_tools, list):
for tool_info in prev_tools:
tool_name = None
result = None

if isinstance(tool_info, dict):
tool_name = tool_info.get("name") or tool_info.get("function_name")
result = tool_info.get("result") or tool_info.get("output")
elif isinstance(tool_info, str):
tool_name = tool_info

if tool_name and tool_name in tool_call_ids:
tc_id = tool_call_ids.pop(tool_name)
update = build_tool_complete_notification(
tc_id, tool_name, result=str(result) if result else None
)
_send_update(conn, session_id, loop, update)

return _step
80 changes: 80 additions & 0 deletions acp_adapter/permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""ACP permission bridging — maps ACP approval requests to hermes approval callbacks."""

from __future__ import annotations

import asyncio
import logging
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Any, Callable, Optional

from acp.schema import (
AllowedOutcome,
DeniedOutcome,
PermissionOption,
RequestPermissionRequest,
SelectedPermissionOutcome,
)

logger = logging.getLogger(__name__)

# Maps ACP PermissionOptionKind -> hermes approval result strings
_KIND_TO_HERMES = {
"allow_once": "once",
"allow_always": "always",
"reject_once": "deny",
"reject_always": "deny",
}


def make_approval_callback(
request_permission_fn: Callable,
loop: asyncio.AbstractEventLoop,
session_id: str,
timeout: float = 60.0,
) -> Callable[[str, str], str]:
"""
Return a hermes-compatible ``approval_callback(command, description) -> str``
that bridges to the ACP client's ``request_permission`` call.

Args:
request_permission_fn: The ACP connection's ``request_permission`` coroutine.
loop: The event loop on which the ACP connection lives.
session_id: Current ACP session id.
timeout: Seconds to wait for a response before auto-denying.
"""

def _callback(command: str, description: str) -> str:
options = [
PermissionOption(option_id="allow_once", kind="allow_once", name="Allow once"),
PermissionOption(option_id="allow_always", kind="allow_always", name="Allow always"),
PermissionOption(option_id="deny", kind="reject_once", name="Deny"),
]
import acp as _acp

tool_call = _acp.start_tool_call("perm-check", command, kind="execute")

coro = request_permission_fn(
session_id=session_id,
tool_call=tool_call,
options=options,
)

try:
future = asyncio.run_coroutine_threadsafe(coro, loop)
response = future.result(timeout=timeout)
except (FutureTimeout, Exception) as exc:
logger.warning("Permission request timed out or failed: %s", exc)
return "deny"

outcome = response.outcome
if isinstance(outcome, AllowedOutcome):
option_id = outcome.option_id
# Look up the kind from our options list
for opt in options:
if opt.option_id == option_id:
return _KIND_TO_HERMES.get(opt.kind, "deny")
return "once" # fallback for unknown option_id
else:
return "deny"

return _callback
Loading
Loading