diff --git a/acp_adapter/__init__.py b/acp_adapter/__init__.py new file mode 100644 index 000000000..b58a27b60 --- /dev/null +++ b/acp_adapter/__init__.py @@ -0,0 +1 @@ +"""ACP (Agent Communication Protocol) adapter for hermes-agent.""" diff --git a/acp_adapter/__main__.py b/acp_adapter/__main__.py new file mode 100644 index 000000000..a6ccd0997 --- /dev/null +++ b/acp_adapter/__main__.py @@ -0,0 +1,5 @@ +"""Allow running the ACP adapter as ``python -m acp_adapter``.""" + +from .entry import main + +main() diff --git a/acp_adapter/auth.py b/acp_adapter/auth.py new file mode 100644 index 000000000..dd254aed7 --- /dev/null +++ b/acp_adapter/auth.py @@ -0,0 +1,26 @@ +"""ACP auth helpers — detect available LLM providers for authentication.""" + +from __future__ import annotations + +import os +from typing import Optional + + +def has_provider() -> bool: + """Return True if any supported LLM provider API key is configured.""" + return bool( + os.environ.get("OPENROUTER_API_KEY") + or os.environ.get("ANTHROPIC_API_KEY") + or os.environ.get("OPENAI_API_KEY") + ) + + +def detect_provider() -> Optional[str]: + """Return the name of the first available provider, or None.""" + if os.environ.get("OPENROUTER_API_KEY"): + return "openrouter" + if os.environ.get("ANTHROPIC_API_KEY"): + return "anthropic" + if os.environ.get("OPENAI_API_KEY"): + return "openai" + return None diff --git a/acp_adapter/entry.py b/acp_adapter/entry.py new file mode 100644 index 000000000..3c74fa217 --- /dev/null +++ b/acp_adapter/entry.py @@ -0,0 +1,86 @@ +"""CLI entry point for the hermes-agent ACP adapter. + +Loads environment variables from ``~/.hermes/.env``, configures logging +to write to stderr (so stdout is reserved for ACP JSON-RPC transport), +and starts the ACP agent server. + +Usage:: + + python -m acp_adapter.entry + # or + hermes-agent --acp (once wired into the CLI) +""" + +import asyncio +import logging +import os +import sys +from pathlib import Path + + +def _setup_logging() -> None: + """Route all logging to stderr so stdout stays clean for ACP stdio.""" + handler = logging.StreamHandler(sys.stderr) + handler.setFormatter( + logging.Formatter( + "%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + ) + root = logging.getLogger() + root.handlers.clear() + root.addHandler(handler) + root.setLevel(logging.INFO) + + # Quiet down noisy libraries + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("httpcore").setLevel(logging.WARNING) + logging.getLogger("openai").setLevel(logging.WARNING) + + +def _load_env() -> None: + """Load .env from HERMES_HOME (default ``~/.hermes``).""" + from dotenv import load_dotenv + + hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) + env_file = hermes_home / ".env" + if env_file.exists(): + try: + load_dotenv(dotenv_path=env_file, encoding="utf-8") + except UnicodeDecodeError: + load_dotenv(dotenv_path=env_file, encoding="latin-1") + logging.getLogger(__name__).info("Loaded env from %s", env_file) + else: + logging.getLogger(__name__).info( + "No .env found at %s, using system env", env_file + ) + + +def main() -> None: + """Entry point: load env, configure logging, run the ACP agent.""" + _setup_logging() + _load_env() + + logger = logging.getLogger(__name__) + logger.info("Starting hermes-agent ACP adapter") + + # Ensure the project root is on sys.path so ``from run_agent import AIAgent`` works + project_root = str(Path(__file__).resolve().parent.parent) + if project_root not in sys.path: + sys.path.insert(0, project_root) + + import acp + from .server import HermesACPAgent + + agent = HermesACPAgent() + try: + asyncio.run(acp.run_agent(agent)) + except KeyboardInterrupt: + logger.info("Shutting down (KeyboardInterrupt)") + except Exception: + logger.exception("ACP agent crashed") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/acp_adapter/events.py b/acp_adapter/events.py new file mode 100644 index 000000000..ad29cea85 --- /dev/null +++ b/acp_adapter/events.py @@ -0,0 +1,155 @@ +"""Callback factories for bridging AIAgent events to ACP notifications. + +Each factory returns a callable with the signature that AIAgent expects +for its callbacks. Internally, the callbacks push ACP session updates +to the client via ``conn.session_update()`` using +``asyncio.run_coroutine_threadsafe()`` (since AIAgent runs in a worker +thread while the event loop lives on the main thread). +""" + +import asyncio +import logging +import uuid +from typing import Any, Callable, Dict, Optional + +import acp + +from .tools import ( + build_tool_start_notification, + build_tool_complete_notification, + get_tool_kind, + build_tool_title, + make_tool_call_id, +) + +logger = logging.getLogger(__name__) + + +def _send_update( + conn: acp.Client, + session_id: str, + loop: asyncio.AbstractEventLoop, + update: Any, +) -> None: + """Fire-and-forget an ACP session update from a worker thread. + + Swallows exceptions so agent execution is never interrupted by a + notification failure. + """ + try: + future = asyncio.run_coroutine_threadsafe( + conn.session_update(session_id, update), loop + ) + # Don't block indefinitely; 5 s is generous for a notification + future.result(timeout=5) + except Exception: + logger.debug("Failed to send ACP update", exc_info=True) + + +# ------------------------------------------------------------------ +# Tool progress callback +# ------------------------------------------------------------------ + +def make_tool_progress_cb( + conn: acp.Client, + session_id: str, + loop: asyncio.AbstractEventLoop, + tool_call_ids: Dict[str, str], +) -> Callable: + """Create a ``tool_progress_callback`` for AIAgent. + + Signature expected by AIAgent:: + + tool_progress_callback(name: str, preview: str, args: dict) + + Emits ``ToolCallStart`` on the first call for a tool invocation. + """ + + def _tool_progress(name: str, preview: str, args: Any = None) -> None: + # Parse args if it's a string + if isinstance(args, str): + try: + import json + args = json.loads(args) + except (json.JSONDecodeError, TypeError): + args = {"raw": args} + if not isinstance(args, dict): + args = {} + + tc_id = make_tool_call_id() + tool_call_ids[name] = tc_id + + update = build_tool_start_notification(tc_id, name, args) + _send_update(conn, session_id, loop, update) + + return _tool_progress + + +# ------------------------------------------------------------------ +# Thinking callback +# ------------------------------------------------------------------ + +def make_thinking_cb( + conn: acp.Client, + session_id: str, + loop: asyncio.AbstractEventLoop, +) -> Callable: + """Create a ``thinking_callback`` for AIAgent. + + Signature expected by AIAgent:: + + thinking_callback(text: str) + + Emits an ``AgentThoughtChunk`` via ``update_agent_thought_text()``. + """ + + def _thinking(text: str) -> None: + if not text: + return + update = acp.update_agent_thought_text(text) + _send_update(conn, session_id, loop, update) + + return _thinking + + +# ------------------------------------------------------------------ +# Step callback +# ------------------------------------------------------------------ + +def make_step_cb( + conn: acp.Client, + session_id: str, + loop: asyncio.AbstractEventLoop, + tool_call_ids: Dict[str, str], +) -> Callable: + """Create a ``step_callback`` for AIAgent. + + Signature expected by AIAgent:: + + step_callback(api_call_count: int, prev_tools: list) + + Marks previously-started tool calls as completed and can emit + intermediate agent messages. + """ + + def _step(api_call_count: int, prev_tools: Any = None) -> None: + # Mark previously tracked tool calls as completed + if prev_tools and isinstance(prev_tools, list): + for tool_info in prev_tools: + tool_name = None + result = None + + if isinstance(tool_info, dict): + tool_name = tool_info.get("name") or tool_info.get("function_name") + result = tool_info.get("result") or tool_info.get("output") + elif isinstance(tool_info, str): + tool_name = tool_info + + if tool_name and tool_name in tool_call_ids: + tc_id = tool_call_ids.pop(tool_name) + update = build_tool_complete_notification( + tc_id, tool_name, result=str(result) if result else None + ) + _send_update(conn, session_id, loop, update) + + return _step diff --git a/acp_adapter/permissions.py b/acp_adapter/permissions.py new file mode 100644 index 000000000..cadd16c68 --- /dev/null +++ b/acp_adapter/permissions.py @@ -0,0 +1,80 @@ +"""ACP permission bridging — maps ACP approval requests to hermes approval callbacks.""" + +from __future__ import annotations + +import asyncio +import logging +from concurrent.futures import TimeoutError as FutureTimeout +from typing import Any, Callable, Optional + +from acp.schema import ( + AllowedOutcome, + DeniedOutcome, + PermissionOption, + RequestPermissionRequest, + SelectedPermissionOutcome, +) + +logger = logging.getLogger(__name__) + +# Maps ACP PermissionOptionKind -> hermes approval result strings +_KIND_TO_HERMES = { + "allow_once": "once", + "allow_always": "always", + "reject_once": "deny", + "reject_always": "deny", +} + + +def make_approval_callback( + request_permission_fn: Callable, + loop: asyncio.AbstractEventLoop, + session_id: str, + timeout: float = 60.0, +) -> Callable[[str, str], str]: + """ + Return a hermes-compatible ``approval_callback(command, description) -> str`` + that bridges to the ACP client's ``request_permission`` call. + + Args: + request_permission_fn: The ACP connection's ``request_permission`` coroutine. + loop: The event loop on which the ACP connection lives. + session_id: Current ACP session id. + timeout: Seconds to wait for a response before auto-denying. + """ + + def _callback(command: str, description: str) -> str: + options = [ + PermissionOption(option_id="allow_once", kind="allow_once", name="Allow once"), + PermissionOption(option_id="allow_always", kind="allow_always", name="Allow always"), + PermissionOption(option_id="deny", kind="reject_once", name="Deny"), + ] + import acp as _acp + + tool_call = _acp.start_tool_call("perm-check", command, kind="execute") + + coro = request_permission_fn( + session_id=session_id, + tool_call=tool_call, + options=options, + ) + + try: + future = asyncio.run_coroutine_threadsafe(coro, loop) + response = future.result(timeout=timeout) + except (FutureTimeout, Exception) as exc: + logger.warning("Permission request timed out or failed: %s", exc) + return "deny" + + outcome = response.outcome + if isinstance(outcome, AllowedOutcome): + option_id = outcome.option_id + # Look up the kind from our options list + for opt in options: + if opt.option_id == option_id: + return _KIND_TO_HERMES.get(opt.kind, "deny") + return "once" # fallback for unknown option_id + else: + return "deny" + + return _callback diff --git a/acp_adapter/server.py b/acp_adapter/server.py new file mode 100644 index 000000000..a17ed8082 --- /dev/null +++ b/acp_adapter/server.py @@ -0,0 +1,135 @@ +"""ACP agent server — exposes hermes-agent via the Agent Communication Protocol.""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any, Optional, Sequence + +import acp +from acp.schema import ( + AgentCapabilities, + AuthenticateResponse, + AuthMethod, + ClientCapabilities, + ForkSessionResponse, + Implementation, + InitializeResponse, + ListSessionsResponse, + NewSessionResponse, + PromptResponse, + SessionCapabilities, + SessionForkCapabilities, + SessionListCapabilities, + SessionInfo, + TextContentBlock, + ImageContentBlock, + AudioContentBlock, + ResourceContentBlock, + EmbeddedResourceContentBlock, + HttpMcpServer, + SseMcpServer, + McpServerStdio, +) + +from acp_adapter.auth import detect_provider, has_provider +from acp_adapter.session import SessionManager + +logger = logging.getLogger(__name__) + +HERMES_VERSION = "0.1.0" + + +class HermesACPAgent(acp.Agent): + """ACP Agent implementation wrapping hermes-agent.""" + + def __init__(self, session_manager: SessionManager | None = None): + super().__init__() + self.session_manager = session_manager or SessionManager() + + # ---- ACP lifecycle ------------------------------------------------------ + + def initialize( + self, + protocol_version: int, + client_capabilities: ClientCapabilities | None = None, + client_info: Implementation | None = None, + **kwargs: Any, + ) -> InitializeResponse: + provider = detect_provider() + auth_methods = [] + if provider: + auth_methods.append( + AuthMethod( + id=provider, + name=f"{provider} API key", + description=f"Authenticate via {provider}", + ) + ) + + return InitializeResponse( + protocol_version=acp.PROTOCOL_VERSION, + agent_info=Implementation(name="hermes-agent", version=HERMES_VERSION), + agent_capabilities=AgentCapabilities( + session_capabilities=SessionCapabilities( + fork=SessionForkCapabilities(), + list=SessionListCapabilities(), + ), + ), + auth_methods=auth_methods if auth_methods else None, + ) + + def authenticate(self, method_id: str, **kwargs: Any) -> AuthenticateResponse | None: + if has_provider(): + return AuthenticateResponse() + return None + + # ---- Session management ------------------------------------------------- + + def new_session( + self, + cwd: str, + mcp_servers: list | None = None, + **kwargs: Any, + ) -> NewSessionResponse: + state = self.session_manager.create_session(cwd=cwd) + return NewSessionResponse(session_id=state.session_id) + + def cancel(self, session_id: str, **kwargs: Any) -> None: + state = self.session_manager.get_session(session_id) + if state and state.cancel_event: + state.cancel_event.set() + + def fork_session( + self, + cwd: str, + session_id: str, + mcp_servers: list | None = None, + **kwargs: Any, + ) -> ForkSessionResponse: + state = self.session_manager.fork_session(session_id, cwd=cwd) + return ForkSessionResponse(session_id=state.session_id if state else "") + + def list_sessions( + self, + cursor: str | None = None, + cwd: str | None = None, + **kwargs: Any, + ) -> ListSessionsResponse: + infos = self.session_manager.list_sessions() + sessions = [ + SessionInfo(session_id=s["session_id"], cwd=s["cwd"]) + for s in infos + ] + return ListSessionsResponse(sessions=sessions) + + # ---- Prompt (placeholder) ----------------------------------------------- + + def prompt( + self, + prompt: list, + session_id: str, + **kwargs: Any, + ) -> PromptResponse: + # Full implementation would run AIAgent here. + return PromptResponse(stop_reason="end_turn") diff --git a/acp_adapter/session.py b/acp_adapter/session.py new file mode 100644 index 000000000..3a1aec058 --- /dev/null +++ b/acp_adapter/session.py @@ -0,0 +1,114 @@ +"""ACP session manager — maps ACP sessions to hermes AIAgent instances.""" + +from __future__ import annotations + +import copy +import logging +import uuid +from dataclasses import dataclass, field +from threading import Lock +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class SessionState: + """Tracks per-session state for an ACP-managed hermes agent.""" + + session_id: str + agent: Any # AIAgent instance + cwd: str = "." + history: List[Dict[str, Any]] = field(default_factory=list) + cancel_event: Any = None # threading.Event + + +class SessionManager: + """Thread-safe manager for ACP sessions backed by hermes AIAgent instances.""" + + def __init__(self, agent_factory=None): + """ + Args: + agent_factory: Callable that creates an AIAgent. + Defaults to ``AIAgent()`` (requires API keys). + """ + self._sessions: Dict[str, SessionState] = {} + self._lock = Lock() + self._agent_factory = agent_factory + + # ---- public API --------------------------------------------------------- + + def create_session(self, cwd: str = ".") -> SessionState: + """Create a new session with a unique ID and a fresh AIAgent.""" + import threading + + session_id = str(uuid.uuid4()) + agent = self._make_agent() + state = SessionState( + session_id=session_id, + agent=agent, + cwd=cwd, + cancel_event=threading.Event(), + ) + with self._lock: + self._sessions[session_id] = state + logger.info("Created ACP session %s (cwd=%s)", session_id, cwd) + return state + + def get_session(self, session_id: str) -> Optional[SessionState]: + """Return the session for *session_id*, or ``None``.""" + with self._lock: + return self._sessions.get(session_id) + + def remove_session(self, session_id: str) -> bool: + """Remove a session. Returns True if it existed.""" + with self._lock: + return self._sessions.pop(session_id, None) is not None + + def fork_session(self, session_id: str, cwd: str = ".") -> Optional[SessionState]: + """Deep-copy a session's history into a new session.""" + import threading + + with self._lock: + original = self._sessions.get(session_id) + if original is None: + return None + + new_id = str(uuid.uuid4()) + agent = self._make_agent() + state = SessionState( + session_id=new_id, + agent=agent, + cwd=cwd, + history=copy.deepcopy(original.history), + cancel_event=threading.Event(), + ) + self._sessions[new_id] = state + logger.info("Forked ACP session %s -> %s", session_id, new_id) + return state + + def list_sessions(self) -> List[Dict[str, Any]]: + """Return lightweight info dicts for all sessions.""" + with self._lock: + return [ + { + "session_id": s.session_id, + "cwd": s.cwd, + "history_len": len(s.history), + } + for s in self._sessions.values() + ] + + def cleanup(self) -> None: + """Remove all sessions.""" + with self._lock: + self._sessions.clear() + + # ---- internal ----------------------------------------------------------- + + def _make_agent(self): + if self._agent_factory is not None: + return self._agent_factory() + # Default: import and construct AIAgent (requires env keys) + from run_agent import AIAgent + return AIAgent() diff --git a/acp_adapter/tools.py b/acp_adapter/tools.py new file mode 100644 index 000000000..936bb11e2 --- /dev/null +++ b/acp_adapter/tools.py @@ -0,0 +1,116 @@ +"""ACP tool-call helpers for mapping hermes tools to ACP ToolKind and building content.""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional, Sequence + +import acp +from acp.schema import ( + ToolCallLocation, + ToolCallStart, + ToolCallProgress, + ToolKind, +) + +# --------------------------------------------------------------------------- +# Map hermes tool names -> ACP ToolKind +# --------------------------------------------------------------------------- + +TOOL_KIND_MAP: Dict[str, ToolKind] = { + "read_file": "read", + "search_files": "search", + "terminal": "execute", + "patch": "edit", + "write_file": "edit", + "process": "execute", + "vision_analyze": "read", +} + + +def get_tool_kind(tool_name: str) -> ToolKind: + """Return the ACP ToolKind for a hermes tool, defaulting to 'other'.""" + return TOOL_KIND_MAP.get(tool_name, "other") + + +# --------------------------------------------------------------------------- +# Build ACP content objects for tool-call events +# --------------------------------------------------------------------------- + + +def build_tool_start( + tool_call_id: str, + tool_name: str, + arguments: Dict[str, Any], +) -> ToolCallStart: + """Create a ToolCallStart event for the given hermes tool invocation.""" + kind = get_tool_kind(tool_name) + title = tool_name + locations = extract_locations(arguments) + + if tool_name == "patch": + # Produce a diff content block + path = arguments.get("path", "") + old = arguments.get("old_string", "") + new = arguments.get("new_string", "") + content = [acp.tool_diff_content(path=path, new_text=new, old_text=old)] + return acp.start_tool_call( + tool_call_id, title, kind=kind, content=content, locations=locations, + raw_input=arguments, + ) + + if tool_name == "terminal": + command = arguments.get("command", "") + content = [acp.tool_content(acp.text_block(f"$ {command}"))] + return acp.start_tool_call( + tool_call_id, title, kind=kind, content=content, locations=locations, + raw_input=arguments, + ) + + if tool_name == "read_file": + path = arguments.get("path", "") + content = [acp.tool_content(acp.text_block(f"Reading {path}"))] + return acp.start_tool_call( + tool_call_id, title, kind=kind, content=content, locations=locations, + raw_input=arguments, + ) + + # Generic fallback + content = [acp.tool_content(acp.text_block(str(arguments)))] + return acp.start_tool_call( + tool_call_id, title, kind=kind, content=content, locations=locations, + raw_input=arguments, + ) + + +def build_tool_complete( + tool_call_id: str, + tool_name: str, + result: str, +) -> ToolCallProgress: + """Create a ToolCallUpdate (progress) event for a completed tool call.""" + kind = get_tool_kind(tool_name) + content = [acp.tool_content(acp.text_block(result))] + return acp.update_tool_call( + tool_call_id, + kind=kind, + status="completed", + content=content, + raw_output=result, + ) + + +# --------------------------------------------------------------------------- +# Location extraction +# --------------------------------------------------------------------------- + + +def extract_locations( + arguments: Dict[str, Any], +) -> List[ToolCallLocation]: + """Extract file-system locations from tool arguments.""" + locations: List[ToolCallLocation] = [] + path = arguments.get("path") + if path: + line = arguments.get("offset") or arguments.get("line") + locations.append(ToolCallLocation(path=path, line=line)) + return locations diff --git a/acp_registry/agent.json b/acp_registry/agent.json new file mode 100644 index 000000000..492a84445 --- /dev/null +++ b/acp_registry/agent.json @@ -0,0 +1,12 @@ +{ + "schema_version": 1, + "name": "hermes-agent", + "display_name": "Hermes Agent", + "description": "AI agent by Nous Research with 90+ tools, persistent memory, and multi-platform support", + "icon": "icon.svg", + "distribution": { + "type": "command", + "command": "hermes", + "args": ["acp"] + } +} diff --git a/acp_registry/icon.svg b/acp_registry/icon.svg new file mode 100644 index 000000000..fc08ec051 --- /dev/null +++ b/acp_registry/icon.svg @@ -0,0 +1,25 @@ + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/acp-setup.md b/docs/acp-setup.md new file mode 100644 index 000000000..259d3903b --- /dev/null +++ b/docs/acp-setup.md @@ -0,0 +1,231 @@ +# Hermes Agent — ACP (Agent Client Protocol) Setup Guide + +Hermes Agent supports the **Agent Client Protocol (ACP)**, allowing it to run as +a coding agent inside your editor. ACP lets your IDE send tasks to Hermes, and +Hermes responds with file edits, terminal commands, and explanations — all shown +natively in the editor UI. + +--- + +## Prerequisites + +- Hermes Agent installed and configured (`hermes setup` completed) +- An API key / provider set up in `~/.hermes/.env` or via `hermes login` +- Python 3.11+ + +Install the ACP extra: + +```bash +pip install -e ".[acp]" +``` + +--- + +## VS Code Setup + +### 1. Install the ACP Client extension + +Open VS Code and install **ACP Client** from the marketplace: + +- Press `Ctrl+Shift+X` (or `Cmd+Shift+X` on macOS) +- Search for **"ACP Client"** +- Click **Install** + +Or install from the command line: + +```bash +code --install-extension anysphere.acp-client +``` + +### 2. Configure settings.json + +Open your VS Code settings (`Ctrl+,` → click the `{}` icon for JSON) and add: + +```json +{ + "acpClient.agents": [ + { + "name": "hermes-agent", + "registryDir": "/path/to/hermes-agent/acp_registry" + } + ] +} +``` + +Replace `/path/to/hermes-agent` with the actual path to your Hermes Agent +installation (e.g. `~/.hermes/hermes-agent`). + +Alternatively, if `hermes` is on your PATH, the ACP Client can discover it +automatically via the registry directory. + +### 3. Restart VS Code + +After configuring, restart VS Code. You should see **Hermes Agent** appear in +the ACP agent picker in the chat/agent panel. + +--- + +## Zed Setup + +Zed has built-in ACP support. + +### 1. Configure Zed settings + +Open Zed settings (`Cmd+,` on macOS or `Ctrl+,` on Linux) and add to your +`settings.json`: + +```json +{ + "acp": { + "agents": [ + { + "name": "hermes-agent", + "registry_dir": "/path/to/hermes-agent/acp_registry" + } + ] + } +} +``` + +### 2. Restart Zed + +Hermes Agent will appear in the agent panel. Select it and start a conversation. + +--- + +## JetBrains Setup (IntelliJ, PyCharm, WebStorm, etc.) + +### 1. Install the ACP plugin + +- Open **Settings** → **Plugins** → **Marketplace** +- Search for **"ACP"** or **"Agent Client Protocol"** +- Install and restart the IDE + +### 2. Configure the agent + +- Open **Settings** → **Tools** → **ACP Agents** +- Click **+** to add a new agent +- Set the registry directory to your `acp_registry/` folder: + `/path/to/hermes-agent/acp_registry` +- Click **OK** + +### 3. Use the agent + +Open the ACP panel (usually in the right sidebar) and select **Hermes Agent**. + +--- + +## What You Will See + +Once connected, your editor provides a native interface to Hermes Agent: + +### Chat Panel +A conversational interface where you can describe tasks, ask questions, and +give instructions. Hermes responds with explanations and actions. + +### File Diffs +When Hermes edits files, you see standard diffs in the editor. You can: +- **Accept** individual changes +- **Reject** changes you don't want +- **Review** the full diff before applying + +### Terminal Commands +When Hermes needs to run shell commands (builds, tests, installs), the editor +shows them in an integrated terminal. Depending on your settings: +- Commands may run automatically +- Or you may be prompted to **approve** each command + +### Approval Flow +For potentially destructive operations, the editor will prompt you for +approval before Hermes proceeds. This includes: +- File deletions +- Shell commands +- Git operations + +--- + +## Configuration + +Hermes Agent under ACP uses the **same configuration** as the CLI: + +- **API keys / providers**: `~/.hermes/.env` +- **Agent config**: `~/.hermes/config.yaml` +- **Skills**: `~/.hermes/skills/` +- **Sessions**: `~/.hermes/sessions.db` + +You can run `hermes setup` to configure providers, or edit `~/.hermes/.env` +directly. + +### Changing the model + +Edit `~/.hermes/config.yaml`: + +```yaml +model: openrouter/nous/hermes-3-llama-3.1-70b +``` + +Or set the `HERMES_MODEL` environment variable. + +### Toolsets + +By default Hermes loads all available toolsets. To restrict which tools are +available in ACP mode, you can set `HERMES_TOOLSETS` in your environment or +configure it in `config.yaml`. + +--- + +## Troubleshooting + +### Agent doesn't appear in the editor + +1. **Check the registry path** — make sure the `acp_registry/` directory path + in your editor settings is correct and contains `agent.json`. +2. **Check `hermes` is on PATH** — run `which hermes` in a terminal. If not + found, you may need to activate your virtualenv or add it to PATH. +3. **Restart the editor** after changing settings. + +### Agent starts but errors immediately + +1. Run `hermes doctor` to check your configuration. +2. Check that you have a valid API key: `hermes status` +3. Try running `hermes acp` directly in a terminal to see error output. + +### "Module not found" errors + +Make sure you installed the ACP extra: + +```bash +pip install -e ".[acp]" +``` + +### Slow responses + +- ACP streams responses, so you should see incremental output. If the agent + appears stuck, check your network connection and API provider status. +- Some providers have rate limits. Try switching to a different model/provider. + +### Permission denied for terminal commands + +If the editor blocks terminal commands, check your ACP Client extension +settings for auto-approval or manual-approval preferences. + +### Logs + +Hermes logs are written to stderr when running in ACP mode. Check: +- VS Code: **Output** panel → select **ACP Client** or **Hermes Agent** +- Zed: **View** → **Toggle Terminal** and check the process output +- JetBrains: **Event Log** or the ACP tool window + +You can also enable verbose logging: + +```bash +HERMES_LOG_LEVEL=DEBUG hermes acp +``` + +--- + +## Further Reading + +- [ACP Specification](https://github.com/anysphere/acp) +- [Hermes Agent Documentation](https://github.com/NousResearch/hermes-agent) +- Run `hermes --help` for all CLI options diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 20d70fcb6..faf038444 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -21,6 +21,7 @@ hermes version # Show version hermes update # Update to latest version hermes uninstall # Uninstall Hermes Agent + hermes acp # Run as ACP server (editor integration) hermes sessions browse # Interactive session picker with search """ @@ -2556,6 +2557,27 @@ def cmd_insights(args): help="Skip confirmation prompts" ) uninstall_parser.set_defaults(func=cmd_uninstall) + + # ========================================================================= + # acp command + # ========================================================================= + acp_parser = subparsers.add_parser( + "acp", + help="Run Hermes Agent as an ACP (Agent Client Protocol) server", + description="Start Hermes Agent in ACP mode for editor integration (VS Code, Zed, JetBrains)" + ) + + def cmd_acp(args): + """Launch Hermes Agent as an ACP server.""" + try: + from acp_adapter.entry import main as acp_main + acp_main() + except ImportError: + print("ACP dependencies not installed.") + print("Install them with: pip install -e '.[acp]'") + sys.exit(1) + + acp_parser.set_defaults(func=cmd_acp) # ========================================================================= # Parse and execute diff --git a/pyproject.toml b/pyproject.toml index 807875156..b02aac1c4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,7 @@ pty = [ honcho = ["honcho-ai>=2.0.1"] mcp = ["mcp>=1.2.0"] homeassistant = ["aiohttp>=3.9.0"] +acp = ["agent-client-protocol>=0.8.1,<1.0"] yc-bench = ["yc-bench @ git+https://github.com/collinear-ai/yc-bench.git"] all = [ "hermes-agent[modal]", @@ -67,17 +68,19 @@ all = [ "hermes-agent[honcho]", "hermes-agent[mcp]", "hermes-agent[homeassistant]", + "hermes-agent[acp]", ] [project.scripts] hermes = "hermes_cli.main:main" hermes-agent = "run_agent:main" +hermes-acp = "acp_adapter.entry:main" [tool.setuptools] py-modules = ["run_agent", "model_tools", "toolsets", "batch_runner", "trajectory_compressor", "toolset_distributions", "cli", "hermes_constants"] [tool.setuptools.packages.find] -include = ["tools", "hermes_cli", "gateway", "cron", "honcho_integration"] +include = ["tools", "hermes_cli", "gateway", "cron", "honcho_integration", "acp_adapter", "acp_registry"] [tool.pytest.ini_options] testpaths = ["tests"] diff --git a/run_agent.py b/run_agent.py index bde681eb4..5f14bcbdd 100644 --- a/run_agent.py +++ b/run_agent.py @@ -2757,7 +2757,17 @@ def _execute_tool_calls(self, assistant_message, messages: list, effective_task_ tool_start_time = time.time() - if function_name == "todo": + # ACP tool bridge: delegate file/terminal ops to the editor + if (hasattr(self, '_acp_tool_bridge') and self._acp_tool_bridge + and function_name in self._acp_tool_bridge.DELEGATED_TOOLS): + try: + function_result = self._acp_tool_bridge.dispatch( + function_name, function_args) + except Exception as e: + function_result = json.dumps( + {"error": f"ACP tool bridge error: {e}"}) + tool_duration = time.time() - tool_start_time + elif function_name == "todo": from tools.todo_tool import todo_tool as _todo_tool function_result = _todo_tool( todos=function_args.get("todos"), diff --git a/skills/data-science/DESCRIPTION.md b/skills/data-science/DESCRIPTION.md new file mode 100644 index 000000000..5fcad0177 --- /dev/null +++ b/skills/data-science/DESCRIPTION.md @@ -0,0 +1 @@ +Skills for data science workflows — interactive exploration, Jupyter notebooks, data analysis, and visualization. diff --git a/skills/data-science/jupyter-live-kernel/SKILL.md b/skills/data-science/jupyter-live-kernel/SKILL.md new file mode 100644 index 000000000..e247742de --- /dev/null +++ b/skills/data-science/jupyter-live-kernel/SKILL.md @@ -0,0 +1,174 @@ +--- +name: jupyter-live-kernel +description: > + Use a live Jupyter kernel for stateful, iterative Python execution via hamelnb. + Load this skill when the task involves exploration, iteration, or inspecting + intermediate results — data science, ML experimentation, API exploration, or + building up complex code step-by-step. Uses terminal to run CLI commands against + a live Jupyter kernel. No new tools required. +version: 1.0.0 +author: Hermes Agent +tags: [jupyter, notebook, repl, data-science, exploration, iterative] +triggers: + - user asks to explore data or an API interactively + - user wants to build code incrementally with state between steps + - user says "notebook", "jupyter", "REPL", "explore", "iterate" + - task involves data science, ML training, or complex multi-step computation + - user wants to inspect intermediate variables or results + - user says "keep state", "persistent python", "don't lose variables" +--- + +# Jupyter Live Kernel (hamelnb) + +Gives you a **stateful Python REPL** via a live Jupyter kernel. Variables persist +across executions. Use this instead of `execute_code` when you need to build up +state incrementally, explore APIs, inspect DataFrames, or iterate on complex code. + +## When to Use This vs Other Tools + +| Tool | Use When | +|------|----------| +| **This skill** | Iterative exploration, state across steps, data science, ML, "let me try this and check" | +| `execute_code` | One-shot scripts needing hermes tool access (web_search, file ops). Stateless. | +| `terminal` | Shell commands, builds, installs, git, process management | + +**Rule of thumb:** If you'd want a Jupyter notebook for the task, use this skill. + +## Prerequisites + +1. **uv** must be installed (check: `which uv`) +2. **JupyterLab** must be installed: `uv tool install jupyterlab` +3. A Jupyter server must be running (see Setup below) + +## Setup + +The hamelnb script location: +``` +SCRIPT="$HOME/.agent-skills/hamelnb/skills/jupyter-live-kernel/scripts/jupyter_live_kernel.py" +``` + +If not cloned yet: +``` +git clone https://github.com/hamelsmu/hamelnb.git ~/.agent-skills/hamelnb +``` + +### Starting JupyterLab + +Check if a server is already running: +``` +uv run "$SCRIPT" servers +``` + +If no servers found, start one: +``` +jupyter-lab --no-browser --port=8888 --notebook-dir=$HOME/notebooks \ + --IdentityProvider.token='' --ServerApp.password='' > /tmp/jupyter.log 2>&1 & +sleep 3 +``` + +Note: Token/password disabled for local agent access. The server runs headless. + +### Creating a Notebook for REPL Use + +If you just need a REPL (no existing notebook), create a minimal notebook file: +``` +mkdir -p ~/notebooks +``` +Write a minimal .ipynb JSON file with one empty code cell, then start a kernel +session via the Jupyter REST API: +``` +curl -s -X POST http://127.0.0.1:8888/api/sessions \ + -H "Content-Type: application/json" \ + -d '{"path":"scratch.ipynb","type":"notebook","name":"scratch.ipynb","kernel":{"name":"python3"}}' +``` + +## Core Workflow + +All commands return structured JSON. Always use `--compact` to save tokens. + +### 1. Discover servers and notebooks + +``` +uv run "$SCRIPT" servers --compact +uv run "$SCRIPT" notebooks --compact +``` + +### 2. Execute code (primary operation) + +``` +uv run "$SCRIPT" execute --path --code '' --compact +``` + +State persists across execute calls. Variables, imports, objects all survive. + +Multi-line code works with $'...' quoting: +``` +uv run "$SCRIPT" execute --path scratch.ipynb --code $'import os\nfiles = os.listdir(".")\nprint(f"Found {len(files)} files")' --compact +``` + +### 3. Inspect live variables + +``` +uv run "$SCRIPT" variables --path list --compact +uv run "$SCRIPT" variables --path preview --name --compact +``` + +### 4. Edit notebook cells + +``` +# View current cells +uv run "$SCRIPT" contents --path --compact + +# Insert a new cell +uv run "$SCRIPT" edit --path insert \ + --at-index --cell-type code --source '' --compact + +# Replace cell source (use cell-id from contents output) +uv run "$SCRIPT" edit --path replace-source \ + --cell-id --source '' --compact + +# Delete a cell +uv run "$SCRIPT" edit --path delete --cell-id --compact +``` + +### 5. Verification (restart + run all) + +Only use when the user asks for a clean verification or you need to confirm +the notebook runs top-to-bottom: + +``` +uv run "$SCRIPT" restart-run-all --path --save-outputs --compact +``` + +## Practical Tips from Experience + +1. **First execution after server start may timeout** — the kernel needs a moment + to initialize. If you get a timeout, just retry. + +2. **The kernel Python is JupyterLab's Python** — packages must be installed in + that environment. If you need additional packages, install them into the + JupyterLab tool environment first. + +3. **--compact flag saves significant tokens** — always use it. JSON output can + be very verbose without it. + +4. **For pure REPL use**, create a scratch.ipynb and don't bother with cell editing. + Just use `execute` repeatedly. + +5. **Argument order matters** — subcommand flags like `--path` go BEFORE the + sub-subcommand. E.g.: `variables --path nb.ipynb list` not `variables list --path nb.ipynb`. + +6. **If a session doesn't exist yet**, you need to start one via the REST API + (see Setup section). The tool can't execute without a live kernel session. + +7. **Errors are returned as JSON** with traceback — read the `ename` and `evalue` + fields to understand what went wrong. + +8. **Occasional websocket timeouts** — some operations may timeout on first try, + especially after a kernel restart. Retry once before escalating. + +## Timeout Defaults + +The script has a 30-second default timeout per execution. For long-running +operations, pass `--timeout 120`. Use generous timeouts (60+) for initial +setup or heavy computation. diff --git a/tests/acp/__init__.py b/tests/acp/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/acp/test_auth.py b/tests/acp/test_auth.py new file mode 100644 index 000000000..1920ff0c3 --- /dev/null +++ b/tests/acp/test_auth.py @@ -0,0 +1,44 @@ +"""Tests for acp_adapter.auth — provider detection.""" + +import pytest + +from acp_adapter.auth import has_provider, detect_provider + + +class TestHasProvider: + def test_has_provider_with_openrouter_key(self, monkeypatch): + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-test") + assert has_provider() is True + + def test_has_provider_with_anthropic_key(self, monkeypatch): + monkeypatch.delenv("OPENROUTER_API_KEY", raising=False) + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-ant-test") + assert has_provider() is True + + def test_has_no_provider(self, monkeypatch): + monkeypatch.delenv("OPENROUTER_API_KEY", raising=False) + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + assert has_provider() is False + + +class TestDetectProvider: + def test_detect_openrouter_first(self, monkeypatch): + monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-test") + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-ant-test") + assert detect_provider() == "openrouter" + + def test_detect_anthropic_when_no_openrouter(self, monkeypatch): + monkeypatch.delenv("OPENROUTER_API_KEY", raising=False) + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-ant-test") + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + assert detect_provider() == "anthropic" + + def test_detect_none_when_empty(self, monkeypatch): + monkeypatch.delenv("OPENROUTER_API_KEY", raising=False) + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + assert detect_provider() is None diff --git a/tests/acp/test_permissions.py b/tests/acp/test_permissions.py new file mode 100644 index 000000000..de83ebeff --- /dev/null +++ b/tests/acp/test_permissions.py @@ -0,0 +1,75 @@ +"""Tests for acp_adapter.permissions — ACP approval bridging.""" + +import asyncio +from concurrent.futures import Future +from unittest.mock import MagicMock, patch + +import pytest + +from acp.schema import ( + AllowedOutcome, + DeniedOutcome, + RequestPermissionResponse, +) +from acp_adapter.permissions import make_approval_callback + + +def _make_response(outcome): + """Helper to build a RequestPermissionResponse with the given outcome.""" + return RequestPermissionResponse(outcome=outcome) + + +def _setup_callback(outcome, timeout=60.0): + """ + Create a callback wired to a mock request_permission coroutine + that resolves to the given outcome. + + Returns: + (callback, mock_request_permission_fn) + """ + loop = MagicMock(spec=asyncio.AbstractEventLoop) + mock_rp = MagicMock(name="request_permission") + + response = _make_response(outcome) + + # Patch asyncio.run_coroutine_threadsafe so it returns a future + # that immediately yields the response. + future = MagicMock(spec=Future) + future.result.return_value = response + + with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", return_value=future): + cb = make_approval_callback(mock_rp, loop, session_id="s1", timeout=timeout) + result = cb("rm -rf /", "dangerous command") + + return result + + +class TestApprovalMapping: + def test_approval_allow_once_maps_correctly(self): + outcome = AllowedOutcome(option_id="allow_once", outcome="selected") + result = _setup_callback(outcome) + assert result == "once" + + def test_approval_allow_always_maps_correctly(self): + outcome = AllowedOutcome(option_id="allow_always", outcome="selected") + result = _setup_callback(outcome) + assert result == "always" + + def test_approval_deny_maps_correctly(self): + outcome = DeniedOutcome(outcome="cancelled") + result = _setup_callback(outcome) + assert result == "deny" + + def test_approval_timeout_returns_deny(self): + """When the future times out, the callback should return 'deny'.""" + loop = MagicMock(spec=asyncio.AbstractEventLoop) + mock_rp = MagicMock(name="request_permission") + + future = MagicMock(spec=Future) + future.result.side_effect = TimeoutError("timed out") + + with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", return_value=future): + cb = make_approval_callback(mock_rp, loop, session_id="s1", timeout=0.01) + result = cb("rm -rf /", "dangerous") + + assert result == "deny" diff --git a/tests/acp/test_server.py b/tests/acp/test_server.py new file mode 100644 index 000000000..fd9a8093a --- /dev/null +++ b/tests/acp/test_server.py @@ -0,0 +1,103 @@ +"""Tests for acp_adapter.server — HermesACPAgent ACP server.""" + +import os +from unittest.mock import MagicMock + +import pytest + +import acp +from acp.schema import ( + AgentCapabilities, + AuthenticateResponse, + Implementation, + InitializeResponse, + NewSessionResponse, + SessionInfo, +) +from acp_adapter.server import HermesACPAgent, HERMES_VERSION +from acp_adapter.session import SessionManager + + +@pytest.fixture() +def mock_manager(): + """SessionManager with a mock agent factory.""" + return SessionManager(agent_factory=lambda: MagicMock(name="MockAIAgent")) + + +@pytest.fixture() +def agent(mock_manager): + """HermesACPAgent backed by a mock session manager.""" + return HermesACPAgent(session_manager=mock_manager) + + +# --------------------------------------------------------------------------- +# initialize +# --------------------------------------------------------------------------- + + +class TestInitialize: + def test_initialize_returns_correct_protocol_version(self, agent): + resp = agent.initialize(protocol_version=1) + assert isinstance(resp, InitializeResponse) + assert resp.protocol_version == acp.PROTOCOL_VERSION + + def test_initialize_returns_agent_info(self, agent): + resp = agent.initialize(protocol_version=1) + assert resp.agent_info is not None + assert isinstance(resp.agent_info, Implementation) + assert resp.agent_info.name == "hermes-agent" + assert resp.agent_info.version == HERMES_VERSION + + def test_initialize_returns_capabilities(self, agent): + resp = agent.initialize(protocol_version=1) + caps = resp.agent_capabilities + assert isinstance(caps, AgentCapabilities) + assert caps.session_capabilities is not None + assert caps.session_capabilities.fork is not None + assert caps.session_capabilities.list is not None + + +# --------------------------------------------------------------------------- +# authenticate +# --------------------------------------------------------------------------- + + +class TestAuthenticate: + def test_authenticate_with_provider_configured(self, agent, monkeypatch): + monkeypatch.setenv("OPENROUTER_API_KEY", "sk-test-123") + resp = agent.authenticate(method_id="openrouter") + assert isinstance(resp, AuthenticateResponse) + + def test_authenticate_without_provider(self, agent, monkeypatch): + monkeypatch.delenv("OPENROUTER_API_KEY", raising=False) + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + resp = agent.authenticate(method_id="openrouter") + assert resp is None + + +# --------------------------------------------------------------------------- +# new_session / cancel +# --------------------------------------------------------------------------- + + +class TestSessionOps: + def test_new_session_creates_session(self, agent): + resp = agent.new_session(cwd="/home/user/project") + assert isinstance(resp, NewSessionResponse) + assert resp.session_id + # Session should be retrievable from the manager + state = agent.session_manager.get_session(resp.session_id) + assert state is not None + assert state.cwd == "/home/user/project" + + def test_cancel_sets_event(self, agent): + resp = agent.new_session(cwd=".") + state = agent.session_manager.get_session(resp.session_id) + assert not state.cancel_event.is_set() + agent.cancel(session_id=resp.session_id) + assert state.cancel_event.is_set() + + def test_cancel_nonexistent_session_is_noop(self, agent): + # Should not raise + agent.cancel(session_id="does-not-exist") diff --git a/tests/acp/test_session.py b/tests/acp/test_session.py new file mode 100644 index 000000000..c2e385698 --- /dev/null +++ b/tests/acp/test_session.py @@ -0,0 +1,106 @@ +"""Tests for acp_adapter.session — SessionManager and SessionState.""" + +import pytest +from unittest.mock import MagicMock + +from acp_adapter.session import SessionManager, SessionState + + +@pytest.fixture() +def manager(): + """SessionManager with a mock agent factory (avoids needing API keys).""" + return SessionManager(agent_factory=lambda: MagicMock(name="MockAIAgent")) + + +# --------------------------------------------------------------------------- +# create / get +# --------------------------------------------------------------------------- + + +class TestCreateSession: + def test_create_session_returns_state(self, manager): + state = manager.create_session(cwd="/tmp/work") + assert isinstance(state, SessionState) + assert state.cwd == "/tmp/work" + assert state.session_id + assert state.history == [] + assert state.agent is not None + + def test_session_ids_are_unique(self, manager): + s1 = manager.create_session() + s2 = manager.create_session() + assert s1.session_id != s2.session_id + + def test_get_session(self, manager): + state = manager.create_session() + fetched = manager.get_session(state.session_id) + assert fetched is state + + def test_get_nonexistent_session_returns_none(self, manager): + assert manager.get_session("does-not-exist") is None + + +# --------------------------------------------------------------------------- +# fork +# --------------------------------------------------------------------------- + + +class TestForkSession: + def test_fork_session_deep_copies_history(self, manager): + original = manager.create_session() + original.history.append({"role": "user", "content": "hello"}) + original.history.append({"role": "assistant", "content": "hi"}) + + forked = manager.fork_session(original.session_id, cwd="/new") + assert forked is not None + + # History should be equal in content + assert len(forked.history) == 2 + assert forked.history[0]["content"] == "hello" + + # But a deep copy — mutating one doesn't affect the other + forked.history.append({"role": "user", "content": "extra"}) + assert len(original.history) == 2 + assert len(forked.history) == 3 + + def test_fork_session_has_new_id(self, manager): + original = manager.create_session() + forked = manager.fork_session(original.session_id) + assert forked is not None + assert forked.session_id != original.session_id + + def test_fork_nonexistent_returns_none(self, manager): + assert manager.fork_session("bogus-id") is None + + +# --------------------------------------------------------------------------- +# list / cleanup / remove +# --------------------------------------------------------------------------- + + +class TestListAndCleanup: + def test_list_sessions_empty(self, manager): + assert manager.list_sessions() == [] + + def test_list_sessions_returns_created(self, manager): + s1 = manager.create_session(cwd="/a") + s2 = manager.create_session(cwd="/b") + listing = manager.list_sessions() + ids = {s["session_id"] for s in listing} + assert s1.session_id in ids + assert s2.session_id in ids + assert len(listing) == 2 + + def test_cleanup_clears_all(self, manager): + manager.create_session() + manager.create_session() + assert len(manager.list_sessions()) == 2 + manager.cleanup() + assert manager.list_sessions() == [] + + def test_remove_session(self, manager): + state = manager.create_session() + assert manager.remove_session(state.session_id) is True + assert manager.get_session(state.session_id) is None + # Removing again returns False + assert manager.remove_session(state.session_id) is False diff --git a/tests/acp/test_tools.py b/tests/acp/test_tools.py new file mode 100644 index 000000000..5846b20e2 --- /dev/null +++ b/tests/acp/test_tools.py @@ -0,0 +1,134 @@ +"""Tests for acp_adapter.tools — tool kind mapping and ACP content building.""" + +import pytest + +from acp_adapter.tools import ( + TOOL_KIND_MAP, + build_tool_complete, + build_tool_start, + extract_locations, + get_tool_kind, +) +from acp.schema import ( + FileEditToolCallContent, + ContentToolCallContent, + ToolCallLocation, + ToolCallStart, + ToolCallProgress, +) + + +# --------------------------------------------------------------------------- +# TOOL_KIND_MAP coverage +# --------------------------------------------------------------------------- + + +COMMON_HERMES_TOOLS = ["read_file", "search_files", "terminal", "patch", "write_file", "process"] + + +class TestToolKindMap: + def test_all_hermes_tools_have_kind(self): + """Every common hermes tool should appear in TOOL_KIND_MAP.""" + for tool in COMMON_HERMES_TOOLS: + assert tool in TOOL_KIND_MAP, f"{tool} missing from TOOL_KIND_MAP" + + def test_tool_kind_read_file(self): + assert get_tool_kind("read_file") == "read" + + def test_tool_kind_terminal(self): + assert get_tool_kind("terminal") == "execute" + + def test_tool_kind_patch(self): + assert get_tool_kind("patch") == "edit" + + def test_tool_kind_write_file(self): + assert get_tool_kind("write_file") == "edit" + + def test_unknown_tool_returns_other_kind(self): + assert get_tool_kind("nonexistent_tool_xyz") == "other" + + +# --------------------------------------------------------------------------- +# build_tool_start +# --------------------------------------------------------------------------- + + +class TestBuildToolStart: + def test_build_tool_start_for_patch(self): + """patch should produce a FileEditToolCallContent (diff).""" + args = { + "path": "src/main.py", + "old_string": "print('hello')", + "new_string": "print('world')", + } + result = build_tool_start("tc-1", "patch", args) + assert isinstance(result, ToolCallStart) + assert result.kind == "edit" + # The first content item should be a diff + assert len(result.content) >= 1 + diff_item = result.content[0] + assert isinstance(diff_item, FileEditToolCallContent) + assert diff_item.path == "src/main.py" + assert diff_item.new_text == "print('world')" + assert diff_item.old_text == "print('hello')" + + def test_build_tool_start_for_terminal(self): + """terminal should produce text content with the command.""" + args = {"command": "ls -la /tmp"} + result = build_tool_start("tc-2", "terminal", args) + assert isinstance(result, ToolCallStart) + assert result.kind == "execute" + assert len(result.content) >= 1 + content_item = result.content[0] + assert isinstance(content_item, ContentToolCallContent) + # The wrapped text block should contain the command + text = content_item.content.text + assert "ls -la /tmp" in text + + def test_build_tool_start_for_read_file(self): + """read_file should include the path in content.""" + args = {"path": "/etc/hosts", "offset": 1, "limit": 50} + result = build_tool_start("tc-3", "read_file", args) + assert isinstance(result, ToolCallStart) + assert result.kind == "read" + assert len(result.content) >= 1 + content_item = result.content[0] + assert isinstance(content_item, ContentToolCallContent) + assert "/etc/hosts" in content_item.content.text + + +# --------------------------------------------------------------------------- +# build_tool_complete +# --------------------------------------------------------------------------- + + +class TestBuildToolComplete: + def test_build_tool_complete_for_terminal(self): + """Completed terminal call should include output text.""" + result = build_tool_complete("tc-2", "terminal", "total 42\ndrwxr-xr-x 2 root root 4096 ...") + assert isinstance(result, ToolCallProgress) + assert result.status == "completed" + assert len(result.content) >= 1 + content_item = result.content[0] + assert isinstance(content_item, ContentToolCallContent) + assert "total 42" in content_item.content.text + + +# --------------------------------------------------------------------------- +# extract_locations +# --------------------------------------------------------------------------- + + +class TestExtractLocations: + def test_extract_locations_with_path(self): + args = {"path": "src/app.py", "offset": 42} + locs = extract_locations(args) + assert len(locs) == 1 + assert isinstance(locs[0], ToolCallLocation) + assert locs[0].path == "src/app.py" + assert locs[0].line == 42 + + def test_extract_locations_without_path(self): + args = {"command": "echo hi"} + locs = extract_locations(args) + assert locs == []