diff --git a/src/agent-sec-core/hermes-plugin/README.md b/src/agent-sec-core/hermes-plugin/README.md index 505d6cb4a..003c2a1d6 100644 --- a/src/agent-sec-core/hermes-plugin/README.md +++ b/src/agent-sec-core/hermes-plugin/README.md @@ -122,15 +122,21 @@ Hermes 支持的 hook 及其回调签名: ### Skill Ledger -推荐部署模式是 SkillFS + Skill Ledger daemon activation:SkillFS 捕获 skill 变更,daemon 刷新 `.skill-meta/activation.json`/xattr。Hermes `skill-ledger` capability 默认仍注册,默认 `policy = "ask"`:在 Hermes `skill_view` 读取技能前读取统一 exposure summary;Hermes 没有交互确认能力,因此 `ask` 会降级为用户可见 warning 并放行。 +当前 Hermes 场景暂不支持 Skill Ledger 安全检查,请自行关注 skill 安全性。Hermes +`skill-ledger` capability 只保留 fail-open 兼容行为:检测到不兼容的 Hermes skill root +时跳过检查、不调用 CLI、不阻断,并在 `policy = "ask"` / `policy = "warn"` 下提示 +`暂不支持Hermes场景,请自行关注skill安全性。` - `enabled = false`:完全不注册 Hermes hook。 -- `policy = "ask"`:默认策略;当 summary `message` 非空时缓存为本轮告警,并通过 +- `policy = "ask"`:默认策略;未触发暂不支持兜底时,当 summary `message` 非空会缓存为本轮告警,并通过 `transform_llm_output` 追加到最终回复开头,确保用户可见。 - `policy = "debug"`:静默兼容模式;summary `message` 非空、CLI 失败或 JSON 解析失败都 fail-open,只写 debug。 -- `policy = "warn"`:warning-only 兼容模式;summary `message` 非空时缓存为本轮告警,并通过 +- `policy = "warn"`:warning-only 兼容模式;未触发暂不支持兜底时,summary `message` 非空会缓存为本轮告警,并通过 `transform_llm_output` 追加到最终回复开头,确保用户可见。 - `policy = "block"`:summary `message` 非空时直接返回 Hermes block 结果。 +- 检测到暂不支持的 Hermes skill root 时,所有 `policy` 都 fail-open;`ask` / `warn` + 会显示 `暂不支持Hermes场景,请自行关注skill安全性。`,`debug` 只写日志,`block` + 不阻断。 - `latestStatus = "unmanaged"` 是 Skill Ledger 诊断状态,summary `message` 为 `null`,包括 `block` 在内的所有 policy 都静默放行。 - 未配置 `policy` 的旧配置仍兼容:`enable_block = true` 映射为 `block`,`enable_block = false` 映射为 `warn`。 - 当前版本仅覆盖 Hermes 默认本地技能目录 `~/.hermes/skills`,按 Hermes `skill_view` @@ -155,7 +161,8 @@ max_warnings_per_turn = 5 max_warning_contexts = 128 ``` -无 SkillFS 且希望用户可见提示或阻断时,将 `policy` 显式改为 `ask`、`warn` 或 `block`。 +Hermes 场景请不要依赖该 capability 作为 Skill Ledger 安全拦截;如需严格 Skill +安全检查,请在非 Hermes 场景或独立流程中完成。 ### observability diff --git a/src/agent-sec-core/hermes-plugin/src/capabilities/skill_ledger.py b/src/agent-sec-core/hermes-plugin/src/capabilities/skill_ledger.py index da9be07a9..79215af48 100644 --- a/src/agent-sec-core/hermes-plugin/src/capabilities/skill_ledger.py +++ b/src/agent-sec-core/hermes-plugin/src/capabilities/skill_ledger.py @@ -26,6 +26,21 @@ _SKIP_DIRS = frozenset({".git", ".github", ".hub", ".archive", ".skill-meta"}) _CONTEXT_KEY_FIELDS = ("session_id", "task_id", "run_id") _HERMES_SESSION_ENV = "HERMES_SESSION_ID" +_UNSUPPORTED_STATUS = "unsupported" +_UNSUPPORTED_HERMES_NOTICE = "暂不支持Hermes场景,请自行关注skill安全性。" +_SKILLFS_INPLACE_SENTINELS = ( + Path(".skillfs-inbox"), + Path("skill-discover") / _SKILL_MANIFEST, +) + + +class _UnsupportedHermesSkillRoot(Exception): + """Internal signal for unsupported Hermes skill root views.""" + + def __init__(self, root: Path, reason: str): + super().__init__(reason) + self.root = root + self.reason = reason @dataclass @@ -75,13 +90,29 @@ def _on_pre_tool_call(self, tool_name, args, **kwargs): self._diagnostic("[agent-sec-core] skill-ledger missing args, fail-open") return None - skill_dir = self._resolve_skill_dir(args) + root = self._resolved_skills_dir() + if root is not None: + unsupported_reason = self._unsupported_reason_for_root(root) + if unsupported_reason is not None: + self._handle_unsupported_hermes(kwargs, root, unsupported_reason) + return None + + try: + skill_dir = self._resolve_skill_dir(args, root=root) + except _UnsupportedHermesSkillRoot as exc: + self._handle_unsupported_hermes(kwargs, exc.root, exc.reason) + return None + if skill_dir is None: self._diagnostic( "[agent-sec-core] skill-ledger could not resolve skill_dir, fail-open" ) return None - skill_dir = skill_dir.resolve() + try: + skill_dir = skill_dir.resolve() + except (OSError, ValueError) as exc: + self._handle_unsupported_hermes(kwargs, skill_dir, str(exc)) + return None result = call_agent_sec_cli( ["skill-ledger", "show", str(skill_dir)], @@ -150,10 +181,23 @@ def _on_transform_llm_output( if not warnings: return None + unsupported_warnings = [ + warning for warning in warnings if warning.status == _UNSUPPORTED_STATUS + ] + warnings = [ + warning for warning in warnings if warning.status != _UNSUPPORTED_STATUS + ] + + if unsupported_warnings and not warnings: + return f"{_UNSUPPORTED_HERMES_NOTICE}\n\n{response_text}" + lines = [ "[agent-sec-core skill-ledger warning]", "The following Hermes skills did not pass Skill Ledger checks:", ] + if unsupported_warnings: + lines.insert(0, "") + lines.insert(0, _UNSUPPORTED_HERMES_NOTICE) for warning in warnings[: self._max_warnings_per_turn]: lines.append( f"- {warning.skill_name}: status={warning.status}; {warning.message}" @@ -166,14 +210,18 @@ def _on_transform_llm_output( lines.append(response_text) return "\n".join(lines) - def _resolve_skill_dir(self, args: dict[str, Any]) -> Path | None: + def _resolve_skill_dir( + self, args: dict[str, Any], *, root: Path | None = None + ) -> Path | None: """Resolve a Hermes skill_view call to a local skill directory.""" skill_name = self._extract_string(args, "name", "skill", "skill_name") if not skill_name: return None - return self._resolve_skill_dir_from_name(skill_name) + return self._resolve_skill_dir_from_name(skill_name, root=root) - def _resolve_skill_dir_from_name(self, skill_name: str) -> Path | None: + def _resolve_skill_dir_from_name( + self, skill_name: str, *, root: Path | None = None + ) -> Path | None: """Resolve by Hermes local directory name or category/name.""" wanted = skill_name.strip() if not wanted: @@ -185,9 +233,15 @@ def _resolve_skill_dir_from_name(self, skill_name: str) -> Path | None: ) return None - root = self._resolved_skills_dir() - if root is None or not root.is_dir(): + if root is None: + root = self._resolved_skills_dir() + if root is None: return None + try: + if not root.is_dir(): + return None + except OSError as exc: + raise _UnsupportedHermesSkillRoot(root, str(exc)) from exc candidates: list[Path] = [] seen: set[Path] = set() @@ -196,7 +250,9 @@ def record(skill_dir: Path, skill_file: Path) -> None: try: resolved_file = skill_file.resolve() resolved_dir = skill_dir.resolve() - except (OSError, ValueError): + except OSError as exc: + raise _UnsupportedHermesSkillRoot(root, str(exc)) from exc + except ValueError: return if not self._is_under_root(resolved_file, root): return @@ -209,7 +265,11 @@ def record(skill_dir: Path, skill_file: Path) -> None: if relative_name is not None: direct_path = root / relative_name direct_skill_file = direct_path / _SKILL_MANIFEST - if direct_path.is_dir() and direct_skill_file.is_file(): + try: + is_direct_skill = direct_path.is_dir() and direct_skill_file.is_file() + except OSError as exc: + raise _UnsupportedHermesSkillRoot(root, str(exc)) from exc + if is_direct_skill: record(direct_path, direct_skill_file) if "/" not in wanted: @@ -238,15 +298,51 @@ def _resolved_skills_dir(self) -> Path | None: def _iter_skill_files(self, root: Path): """Yield SKILL.md files under the default Hermes local skills dir.""" - for skill_file in sorted(root.rglob(_SKILL_MANIFEST)): + try: + skill_files = sorted(root.rglob(_SKILL_MANIFEST)) + except OSError as exc: + raise _UnsupportedHermesSkillRoot(root, str(exc)) from exc + + for skill_file in skill_files: try: resolved = skill_file.resolve() - except (OSError, ValueError): + except OSError as exc: + raise _UnsupportedHermesSkillRoot(root, str(exc)) from exc + except ValueError: continue if self._is_ignored_path(resolved, root): continue yield resolved + def _unsupported_reason_for_root(self, root: Path) -> str | None: + for sentinel in _SKILLFS_INPLACE_SENTINELS: + sentinel_path = root / sentinel + try: + if sentinel_path.exists(): + return f"SkillFS in-place sentinel found: {sentinel}" + except OSError as exc: + return str(exc) + return None + + def _handle_unsupported_hermes( + self, kwargs: dict[str, Any], root: Path, reason: str + ) -> None: + log_message = "[agent-sec-core] skill-ledger %s root=%s reason=%s" + if self._policy == _POLICY_DEBUG: + logger.debug(log_message, _UNSUPPORTED_HERMES_NOTICE, root, reason) + return + + logger.warning(log_message, _UNSUPPORTED_HERMES_NOTICE, root, reason) + if self._policy not in {_POLICY_ASK, _POLICY_WARN}: + return + self._remember_warning( + kwargs, + "Hermes", + root, + _UNSUPPORTED_STATUS, + _UNSUPPORTED_HERMES_NOTICE, + ) + @staticmethod def _is_ignored_path(path: Path, root: Path) -> bool: try: diff --git a/src/agent-sec-core/tests/unit-test/hermes-plugin/test_skill_ledger.py b/src/agent-sec-core/tests/unit-test/hermes-plugin/test_skill_ledger.py index 5a3ebb6b0..fcf9e366b 100644 --- a/src/agent-sec-core/tests/unit-test/hermes-plugin/test_skill_ledger.py +++ b/src/agent-sec-core/tests/unit-test/hermes-plugin/test_skill_ledger.py @@ -131,6 +131,100 @@ def test_default_config_registers_skill_ledger_hooks(): class TestSkillLedgerHooks: """Behavior tests for pre_tool_call and transform_llm_output.""" + @pytest.mark.parametrize( + "sentinel", + [ + ".skillfs-inbox", + "skill-discover/SKILL.md", + ], + ) + @patch("src.capabilities.skill_ledger.call_agent_sec_cli") + def test_skillfs_inplace_root_fails_open_with_short_notice( + self, mock_cli, tmp_path, sentinel + ): + root = tmp_path / "skills" + _make_skill(root, "devops/risky") + sentinel_path = root / sentinel + if sentinel_path.name == "SKILL.md": + sentinel_path.parent.mkdir(parents=True) + sentinel_path.write_text( + "---\nname: skill-discover\n---\n", encoding="utf-8" + ) + else: + sentinel_path.mkdir(parents=True) + cap = _make_capability(root, policy="warn") + + result = cap._on_pre_tool_call("skill_view", {"name": "risky"}, session_id="s1") + output = cap._on_transform_llm_output( + response_text="assistant response", session_id="s1" + ) + + assert result is None + mock_cli.assert_not_called() + assert ( + output + == "暂不支持Hermes场景,请自行关注skill安全性。\n\nassistant response" + ) + + @patch("src.capabilities.skill_ledger.call_agent_sec_cli") + def test_skillfs_inplace_root_debug_policy_only_logs( + self, mock_cli, tmp_path, caplog + ): + root = tmp_path / "skills" + _make_skill(root, "devops/risky") + (root / ".skillfs-inbox").mkdir() + cap = _make_capability(root, policy="debug") + caplog.set_level(logging.DEBUG, logger="agent-sec-core") + + result = cap._on_pre_tool_call("skill_view", {"name": "risky"}, session_id="s1") + output = cap._on_transform_llm_output( + response_text="assistant response", session_id="s1" + ) + + assert result is None + mock_cli.assert_not_called() + assert output is None + assert not cap._warnings_by_context + assert any("暂不支持Hermes场景" in record.message for record in caplog.records) + + @patch("src.capabilities.skill_ledger.call_agent_sec_cli") + def test_skillfs_inplace_root_block_policy_does_not_block(self, mock_cli, tmp_path): + root = tmp_path / "skills" + _make_skill(root, "devops/risky") + (root / ".skillfs-inbox").mkdir() + cap = _make_capability(root, policy="block") + + result = cap._on_pre_tool_call("skill_view", {"name": "risky"}, session_id="s1") + output = cap._on_transform_llm_output( + response_text="assistant response", session_id="s1" + ) + + assert result is None + mock_cli.assert_not_called() + assert output is None + + @patch("src.capabilities.skill_ledger.Path.rglob") + @patch("src.capabilities.skill_ledger.call_agent_sec_cli") + def test_skill_file_traversal_loop_fails_open_with_short_notice( + self, mock_cli, mock_rglob, tmp_path + ): + root = tmp_path / "skills" + root.mkdir() + cap = _make_capability(root, policy="warn") + mock_rglob.side_effect = OSError("File system loop detected") + + result = cap._on_pre_tool_call("skill_view", {"name": "risky"}, session_id="s1") + output = cap._on_transform_llm_output( + response_text="assistant response", session_id="s1" + ) + + assert result is None + mock_cli.assert_not_called() + assert ( + output + == "暂不支持Hermes场景,请自行关注skill安全性。\n\nassistant response" + ) + @patch("src.capabilities.skill_ledger.call_agent_sec_cli") def test_pass_allows_without_warning(self, mock_cli, tmp_path): root = tmp_path / "skills"