Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/agent-sec-core/hermes-plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,21 @@ Hermes 支持的 hook 及其回调签名:

### Skill Ledger

推荐部署模式是 SkillFS + Skill Ledger daemon activation:SkillFS 捕获 skill 变更,daemon 刷新 `.skill-meta/activation.json`/xattr。Hermes `skill-ledger` capability 默认仍注册,默认 `policy = "ask"`:在 Hermes `skill_view` 读取技能前读取统一 exposure summary;Hermes 没有交互确认能力,因此 `ask` 会降级为用户可见 warning 并放行。
当前 Hermes 场景暂不支持 Skill Ledger 安全检查,请自行关注 skill 安全性。Hermes
`skill-ledger` capability 只保留 fail-open 兼容行为:检测到不兼容的 Hermes skill root
时跳过检查、不调用 CLI、不阻断,并在 `policy = "ask"` / `policy = "warn"` 下提示
`暂不支持Hermes场景,请自行关注skill安全性。`

- `enabled = false`:完全不注册 Hermes hook。
- `policy = "ask"`:默认策略;当 summary `message` 非空时缓存为本轮告警,并通过
- `policy = "ask"`:默认策略;未触发暂不支持兜底时,当 summary `message` 非空会缓存为本轮告警,并通过
`transform_llm_output` 追加到最终回复开头,确保用户可见。
- `policy = "debug"`:静默兼容模式;summary `message` 非空、CLI 失败或 JSON 解析失败都 fail-open,只写 debug。
- `policy = "warn"`:warning-only 兼容模式;summary `message` 非空时缓存为本轮告警,并通过
- `policy = "warn"`:warning-only 兼容模式;未触发暂不支持兜底时,summary `message` 非空会缓存为本轮告警,并通过
`transform_llm_output` 追加到最终回复开头,确保用户可见。
- `policy = "block"`:summary `message` 非空时直接返回 Hermes block 结果。
- 检测到暂不支持的 Hermes skill root 时,所有 `policy` 都 fail-open;`ask` / `warn`
会显示 `暂不支持Hermes场景,请自行关注skill安全性。`,`debug` 只写日志,`block`
不阻断。
- `latestStatus = "unmanaged"` 是 Skill Ledger 诊断状态,summary `message` 为 `null`,包括 `block` 在内的所有 policy 都静默放行。
- 未配置 `policy` 的旧配置仍兼容:`enable_block = true` 映射为 `block`,`enable_block = false` 映射为 `warn`。
- 当前版本仅覆盖 Hermes 默认本地技能目录 `~/.hermes/skills`,按 Hermes `skill_view`
Expand All @@ -155,7 +161,8 @@ max_warnings_per_turn = 5
max_warning_contexts = 128
```

无 SkillFS 且希望用户可见提示或阻断时,将 `policy` 显式改为 `ask`、`warn` 或 `block`。
Hermes 场景请不要依赖该 capability 作为 Skill Ledger 安全拦截;如需严格 Skill
安全检查,请在非 Hermes 场景或独立流程中完成。

### observability

Expand Down
118 changes: 107 additions & 11 deletions src/agent-sec-core/hermes-plugin/src/capabilities/skill_ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@
_SKIP_DIRS = frozenset({".git", ".github", ".hub", ".archive", ".skill-meta"})
_CONTEXT_KEY_FIELDS = ("session_id", "task_id", "run_id")
_HERMES_SESSION_ENV = "HERMES_SESSION_ID"
_UNSUPPORTED_STATUS = "unsupported"
_UNSUPPORTED_HERMES_NOTICE = "暂不支持Hermes场景,请自行关注skill安全性。"
_SKILLFS_INPLACE_SENTINELS = (
Path(".skillfs-inbox"),
Path("skill-discover") / _SKILL_MANIFEST,
)


class _UnsupportedHermesSkillRoot(Exception):
"""Internal signal for unsupported Hermes skill root views."""

def __init__(self, root: Path, reason: str):
super().__init__(reason)
self.root = root
self.reason = reason


@dataclass
Expand Down Expand Up @@ -75,13 +90,29 @@ def _on_pre_tool_call(self, tool_name, args, **kwargs):
self._diagnostic("[agent-sec-core] skill-ledger missing args, fail-open")
return None

skill_dir = self._resolve_skill_dir(args)
root = self._resolved_skills_dir()
if root is not None:
unsupported_reason = self._unsupported_reason_for_root(root)
if unsupported_reason is not None:
self._handle_unsupported_hermes(kwargs, root, unsupported_reason)
return None

try:
skill_dir = self._resolve_skill_dir(args, root=root)
except _UnsupportedHermesSkillRoot as exc:
self._handle_unsupported_hermes(kwargs, exc.root, exc.reason)
return None

if skill_dir is None:
self._diagnostic(
"[agent-sec-core] skill-ledger could not resolve skill_dir, fail-open"
)
return None
skill_dir = skill_dir.resolve()
try:
skill_dir = skill_dir.resolve()
except (OSError, ValueError) as exc:
self._handle_unsupported_hermes(kwargs, skill_dir, str(exc))
return None

result = call_agent_sec_cli(
["skill-ledger", "show", str(skill_dir)],
Expand Down Expand Up @@ -150,10 +181,23 @@ def _on_transform_llm_output(
if not warnings:
return None

unsupported_warnings = [
warning for warning in warnings if warning.status == _UNSUPPORTED_STATUS
]
warnings = [
warning for warning in warnings if warning.status != _UNSUPPORTED_STATUS
]

if unsupported_warnings and not warnings:
return f"{_UNSUPPORTED_HERMES_NOTICE}\n\n{response_text}"

lines = [
"[agent-sec-core skill-ledger warning]",
"The following Hermes skills did not pass Skill Ledger checks:",
]
if unsupported_warnings:
lines.insert(0, "")
lines.insert(0, _UNSUPPORTED_HERMES_NOTICE)
for warning in warnings[: self._max_warnings_per_turn]:
lines.append(
f"- {warning.skill_name}: status={warning.status}; {warning.message}"
Expand All @@ -166,14 +210,18 @@ def _on_transform_llm_output(
lines.append(response_text)
return "\n".join(lines)

def _resolve_skill_dir(self, args: dict[str, Any]) -> Path | None:
def _resolve_skill_dir(
self, args: dict[str, Any], *, root: Path | None = None
) -> Path | None:
"""Resolve a Hermes skill_view call to a local skill directory."""
skill_name = self._extract_string(args, "name", "skill", "skill_name")
if not skill_name:
return None
return self._resolve_skill_dir_from_name(skill_name)
return self._resolve_skill_dir_from_name(skill_name, root=root)

def _resolve_skill_dir_from_name(self, skill_name: str) -> Path | None:
def _resolve_skill_dir_from_name(
self, skill_name: str, *, root: Path | None = None
) -> Path | None:
"""Resolve by Hermes local directory name or category/name."""
wanted = skill_name.strip()
if not wanted:
Expand All @@ -185,9 +233,15 @@ def _resolve_skill_dir_from_name(self, skill_name: str) -> Path | None:
)
return None

root = self._resolved_skills_dir()
if root is None or not root.is_dir():
if root is None:
root = self._resolved_skills_dir()
if root is None:
return None
try:
if not root.is_dir():
return None
except OSError as exc:
raise _UnsupportedHermesSkillRoot(root, str(exc)) from exc

candidates: list[Path] = []
seen: set[Path] = set()
Expand All @@ -196,7 +250,9 @@ def record(skill_dir: Path, skill_file: Path) -> None:
try:
resolved_file = skill_file.resolve()
resolved_dir = skill_dir.resolve()
except (OSError, ValueError):
except OSError as exc:
raise _UnsupportedHermesSkillRoot(root, str(exc)) from exc
except ValueError:
return
if not self._is_under_root(resolved_file, root):
return
Expand All @@ -209,7 +265,11 @@ def record(skill_dir: Path, skill_file: Path) -> None:
if relative_name is not None:
direct_path = root / relative_name
direct_skill_file = direct_path / _SKILL_MANIFEST
if direct_path.is_dir() and direct_skill_file.is_file():
try:
is_direct_skill = direct_path.is_dir() and direct_skill_file.is_file()
except OSError as exc:
raise _UnsupportedHermesSkillRoot(root, str(exc)) from exc
if is_direct_skill:
record(direct_path, direct_skill_file)

if "/" not in wanted:
Expand Down Expand Up @@ -238,15 +298,51 @@ def _resolved_skills_dir(self) -> Path | None:

def _iter_skill_files(self, root: Path):
"""Yield SKILL.md files under the default Hermes local skills dir."""
for skill_file in sorted(root.rglob(_SKILL_MANIFEST)):
try:
skill_files = sorted(root.rglob(_SKILL_MANIFEST))
except OSError as exc:
raise _UnsupportedHermesSkillRoot(root, str(exc)) from exc

for skill_file in skill_files:
try:
resolved = skill_file.resolve()
except (OSError, ValueError):
except OSError as exc:
raise _UnsupportedHermesSkillRoot(root, str(exc)) from exc
except ValueError:
continue
if self._is_ignored_path(resolved, root):
continue
yield resolved

def _unsupported_reason_for_root(self, root: Path) -> str | None:
for sentinel in _SKILLFS_INPLACE_SENTINELS:
Comment thread
edonyzpc marked this conversation as resolved.
sentinel_path = root / sentinel
try:
if sentinel_path.exists():
return f"SkillFS in-place sentinel found: {sentinel}"
except OSError as exc:
return str(exc)
return None

def _handle_unsupported_hermes(
self, kwargs: dict[str, Any], root: Path, reason: str
) -> None:
log_message = "[agent-sec-core] skill-ledger %s root=%s reason=%s"
if self._policy == _POLICY_DEBUG:
logger.debug(log_message, _UNSUPPORTED_HERMES_NOTICE, root, reason)
return

logger.warning(log_message, _UNSUPPORTED_HERMES_NOTICE, root, reason)
if self._policy not in {_POLICY_ASK, _POLICY_WARN}:
return
self._remember_warning(
kwargs,
"Hermes",
root,
_UNSUPPORTED_STATUS,
_UNSUPPORTED_HERMES_NOTICE,
)

@staticmethod
def _is_ignored_path(path: Path, root: Path) -> bool:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,100 @@ def test_default_config_registers_skill_ledger_hooks():
class TestSkillLedgerHooks:
"""Behavior tests for pre_tool_call and transform_llm_output."""

@pytest.mark.parametrize(
"sentinel",
[
".skillfs-inbox",
"skill-discover/SKILL.md",
],
)
@patch("src.capabilities.skill_ledger.call_agent_sec_cli")
def test_skillfs_inplace_root_fails_open_with_short_notice(
self, mock_cli, tmp_path, sentinel
):
root = tmp_path / "skills"
_make_skill(root, "devops/risky")
sentinel_path = root / sentinel
if sentinel_path.name == "SKILL.md":
sentinel_path.parent.mkdir(parents=True)
sentinel_path.write_text(
"---\nname: skill-discover\n---\n", encoding="utf-8"
)
else:
sentinel_path.mkdir(parents=True)
cap = _make_capability(root, policy="warn")

result = cap._on_pre_tool_call("skill_view", {"name": "risky"}, session_id="s1")
output = cap._on_transform_llm_output(
response_text="assistant response", session_id="s1"
)

assert result is None
mock_cli.assert_not_called()
assert (
output
== "暂不支持Hermes场景,请自行关注skill安全性。\n\nassistant response"
)

@patch("src.capabilities.skill_ledger.call_agent_sec_cli")
def test_skillfs_inplace_root_debug_policy_only_logs(
self, mock_cli, tmp_path, caplog
):
root = tmp_path / "skills"
_make_skill(root, "devops/risky")
(root / ".skillfs-inbox").mkdir()
cap = _make_capability(root, policy="debug")
caplog.set_level(logging.DEBUG, logger="agent-sec-core")

result = cap._on_pre_tool_call("skill_view", {"name": "risky"}, session_id="s1")
output = cap._on_transform_llm_output(
response_text="assistant response", session_id="s1"
)

assert result is None
mock_cli.assert_not_called()
assert output is None
assert not cap._warnings_by_context
assert any("暂不支持Hermes场景" in record.message for record in caplog.records)

@patch("src.capabilities.skill_ledger.call_agent_sec_cli")
def test_skillfs_inplace_root_block_policy_does_not_block(self, mock_cli, tmp_path):
root = tmp_path / "skills"
_make_skill(root, "devops/risky")
(root / ".skillfs-inbox").mkdir()
cap = _make_capability(root, policy="block")

result = cap._on_pre_tool_call("skill_view", {"name": "risky"}, session_id="s1")
output = cap._on_transform_llm_output(
response_text="assistant response", session_id="s1"
)

assert result is None
mock_cli.assert_not_called()
assert output is None

@patch("src.capabilities.skill_ledger.Path.rglob")
@patch("src.capabilities.skill_ledger.call_agent_sec_cli")
def test_skill_file_traversal_loop_fails_open_with_short_notice(
self, mock_cli, mock_rglob, tmp_path
):
root = tmp_path / "skills"
root.mkdir()
cap = _make_capability(root, policy="warn")
mock_rglob.side_effect = OSError("File system loop detected")

result = cap._on_pre_tool_call("skill_view", {"name": "risky"}, session_id="s1")
output = cap._on_transform_llm_output(
response_text="assistant response", session_id="s1"
)

assert result is None
mock_cli.assert_not_called()
assert (
output
== "暂不支持Hermes场景,请自行关注skill安全性。\n\nassistant response"
)

@patch("src.capabilities.skill_ledger.call_agent_sec_cli")
def test_pass_allows_without_warning(self, mock_cli, tmp_path):
root = tmp_path / "skills"
Expand Down
Loading