Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ def effective_skill_dir_entries(config: dict[str, Any]) -> list[str]:
return _compact_skill_dirs(entries)


def managed_skill_dir_entries(config: dict[str, Any]) -> list[str]:
"""Return only explicitly managed skill directory entries."""
return _compact_skill_dirs([str(v) for v in config.get("managedSkillDirs", [])])


def deprecated_skill_dir_entries(config: dict[str, Any]) -> list[str]:
"""Return deprecated skillDirs entries retained only for diagnostics."""
entries = config.get(_DEPRECATED_SKILL_DIRS_KEY)
Expand Down Expand Up @@ -196,11 +201,22 @@ def resolve_skill_dirs(config: dict[str, Any] | None = None) -> list[Path]:
"""
if config is None:
config = load_config()
return _resolve_skill_dir_entries(effective_skill_dir_entries(config))


def resolve_managed_skill_dirs(config: dict[str, Any] | None = None) -> list[Path]:
"""Expand only explicitly managed source/backing skill directories."""
if config is None:
config = load_config()
return _resolve_skill_dir_entries(managed_skill_dir_entries(config))


def _resolve_skill_dir_entries(entries: list[str]) -> list[Path]:
"""Expand skill directory entries into concrete directories."""
skill_dirs: list[Path] = []
seen: set[Path] = set()

for entry in effective_skill_dir_entries(config):
for entry in entries:
entry = str(entry)
expanded = Path(entry).expanduser()

Expand Down Expand Up @@ -301,6 +317,15 @@ def is_covered(skill_dir: Path, config: dict[str, Any] | None = None) -> bool:
return any(d.resolve() == resolved_target for d in all_dirs)


def is_managed_covered(skill_dir: Path, config: dict[str, Any] | None = None) -> bool:
"""Return ``True`` if *skill_dir* is explicitly covered by managedSkillDirs."""
if config is None:
config = load_config()
resolved_target = skill_dir.resolve()
managed_dirs = resolve_managed_skill_dirs(config)
return any(d.resolve() == resolved_target for d in managed_dirs)


def remember_skill_dir(
skill_dir: Path, config: dict[str, Any] | None = None
) -> str | None:
Expand All @@ -319,7 +344,7 @@ def remember_skill_dir(
if config is None:
config = load_config()

if is_covered(skill_dir, config):
if is_managed_covered(skill_dir, config):
return None

parent = skill_dir.parent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
compute_file_hashes,
diff_file_hashes,
)
from agent_sec_cli.skill_ledger.core.live_root import require_live_skill_dir
from agent_sec_cli.skill_ledger.core.manifest_integrity import (
manifest_hash_error,
verify_manifest_integrity,
Expand Down Expand Up @@ -465,6 +466,7 @@ def scan_skill(
force: bool = False,
) -> dict[str, Any]:
"""Run built-in scanners as needed and record signed scan results."""
skill_dir = str(require_live_skill_dir(skill_dir, backend))
validate_skill_dir(skill_dir)
_remember_skill_dir_best_effort(skill_dir)

Expand Down Expand Up @@ -579,6 +581,7 @@ def certify(
"--findings is required for certify; use 'skill-ledger scan' for built-in scanners",
)

skill_dir = str(require_live_skill_dir(skill_dir, backend))
validate_skill_dir(skill_dir)
_remember_skill_dir_best_effort(skill_dir)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
compute_file_hashes,
diff_file_hashes,
)
from agent_sec_cli.skill_ledger.core.live_root import require_live_skill_dir
from agent_sec_cli.skill_ledger.core.manifest_helpers import (
snapshot_matches_manifest,
)
from agent_sec_cli.skill_ledger.core.manifest_integrity import (
MISSING_SIGNATURE_ERROR,
manifest_hash_error,
Expand All @@ -26,6 +30,7 @@
from agent_sec_cli.skill_ledger.core.version_chain import (
latest_json_path,
load_latest_manifest,
snapshot_dir_path,
)
from agent_sec_cli.skill_ledger.models.manifest import (
SignedManifest,
Expand Down Expand Up @@ -64,7 +69,8 @@ def check(skill_dir: str, backend: SigningBackend) -> dict[str, Any]:
``skillName``, ``versionId``, ``createdAt``, ``updatedAt``, ``fileCount``,
``manifestHash``.
"""
# Step 0: Validate skill directory
# Step 0: Validate skill directory and avoid SkillFS runtime views.
skill_dir = str(require_live_skill_dir(skill_dir, backend))
validate_skill_dir(skill_dir)
skill_name = Path(skill_dir).name

Expand Down Expand Up @@ -160,6 +166,72 @@ def check(skill_dir: str, backend: SigningBackend) -> dict[str, Any]:
return {**meta, "status": "pass"}


def manifest_only_status(skill_dir: str, backend: SigningBackend) -> dict[str, Any]:
"""Return latest trusted manifest status without hashing root files."""
validate_skill_dir(skill_dir)
skill_name = Path(skill_dir).name
try:
manifest = load_latest_manifest(skill_dir)
except (json.JSONDecodeError, ValueError) as exc:
if latest_json_path(skill_dir).is_file():
return {
"status": "tampered",
"skillName": skill_name,
"versionId": None,
"createdAt": None,
"updatedAt": None,
"fileCount": None,
"manifestHash": None,
"reason": f"manifest file is corrupted: {exc}",
}
manifest = None
if manifest is None:
return {
"status": "none",
"skillName": skill_name,
"versionId": None,
"createdAt": None,
"updatedAt": None,
"fileCount": None,
"manifestHash": None,
}

meta = _manifest_metadata(manifest, skill_dir)
hash_error = manifest_hash_error(manifest)
if hash_error is not None:
return {**meta, "status": "tampered", "reason": hash_error}

signature_valid, signature_error = verify_manifest_signature(manifest, backend)
if not signature_valid and signature_error == MISSING_SIGNATURE_ERROR:
return {
**meta,
"status": "none",
"reason": "manifest has no signature (legacy)",
}
if not signature_valid:
return {**meta, "status": "tampered", "reason": signature_error}

if not snapshot_matches_manifest(
snapshot_dir_path(skill_dir, manifest.versionId),
manifest,
):
return {
**meta,
"status": "tampered",
"reason": "snapshot does not match manifest",
}

if manifest.scanStatus in {"deny", "warn"}:
return {
**meta,
"status": manifest.scanStatus,
"findings": _collect_findings(manifest),
}
if manifest.scanStatus == "none":
return {**meta, "status": "none"}
return {**meta, "status": "pass"}


def _collect_findings(manifest: SignedManifest) -> list[dict[str, Any]]:
"""Extract findings from all scans in the manifest."""
return [f for scan in manifest.scans for f in scan.findings]
Expand Down
Loading
Loading