Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 11 additions & 89 deletions oid4vc/mso_mdoc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
"""MSO_MDOC Credential Handler Plugin."""

import logging
import os
from typing import Optional, Union
from typing import Optional

from acapy_agent.config.injection_context import InjectionContext
from acapy_agent.core.event_bus import EventBus
Expand All @@ -11,91 +10,25 @@

from mso_mdoc.cred_processor import MsoMdocCredProcessor
from mso_mdoc.key_generation import generate_default_keys_and_certs
from mso_mdoc.mdoc.verifier import FileTrustStore, WalletTrustStore
from mso_mdoc.storage import MdocStorageManager
from oid4vc.cred_processor import CredProcessors
from . import routes as routes

LOGGER = logging.getLogger(__name__)

# Trust store type configuration
TRUST_STORE_TYPE_FILE = "file"
TRUST_STORE_TYPE_WALLET = "wallet"

# Store reference to processor for startup initialization
_mso_mdoc_processor: Optional[MsoMdocCredProcessor] = None


def create_trust_store(
profile: Optional[Profile] = None,
) -> Optional[Union[FileTrustStore, WalletTrustStore]]:
"""Create a trust store based on configuration.

Environment variables:
- OID4VC_MDOC_TRUST_STORE_TYPE: "file" or "wallet" (default: "file")
- OID4VC_MDOC_TRUST_ANCHORS_PATH: Path for file-based trust store

Args:
profile: ACA-Py profile for wallet-based trust store
(optional, required for wallet type)

Returns:
Configured trust store instance or None if disabled
"""
trust_store_type = os.getenv(
"OID4VC_MDOC_TRUST_STORE_TYPE", TRUST_STORE_TYPE_FILE
).lower()

if trust_store_type == TRUST_STORE_TYPE_WALLET:
if profile is None:
LOGGER.warning(
"Wallet trust store requires a profile, deferring initialization"
)
return None
LOGGER.info("Using wallet-based trust store")
return WalletTrustStore(profile)
elif trust_store_type == TRUST_STORE_TYPE_FILE:
trust_store_path = os.getenv(
"OID4VC_MDOC_TRUST_ANCHORS_PATH", "/etc/acapy/mdoc/trust-anchors/"
)
LOGGER.info("Using file-based trust store at: %s", trust_store_path)
return FileTrustStore(trust_store_path)
elif trust_store_type == "none" or trust_store_type == "disabled":
LOGGER.info("Trust store disabled")
return None
else:
LOGGER.warning(
"Unknown trust store type '%s', falling back to file-based",
trust_store_type,
)
trust_store_path = os.getenv(
"OID4VC_MDOC_TRUST_ANCHORS_PATH", "/etc/acapy/mdoc/trust-anchors/"
)
return FileTrustStore(trust_store_path)


async def on_startup(profile: Profile, event: object):
"""Handle startup event to initialize profile-dependent resources."""
global _mso_mdoc_processor
"""Handle startup event to initialize profile-dependent resources.

Trust anchors are always wallet-scoped; a fresh WalletTrustStore is
constructed per-request in verify_credential / verify_presentation so
each tenant's Askar partition is queried automatically.
"""
LOGGER.info("MSO_MDOC plugin startup - initializing profile-dependent resources")

trust_store_type = os.getenv(
"OID4VC_MDOC_TRUST_STORE_TYPE", TRUST_STORE_TYPE_FILE
).lower()

# If using wallet trust store, initialize it now that we have a profile
if trust_store_type == TRUST_STORE_TYPE_WALLET and _mso_mdoc_processor is not None:
trust_store = WalletTrustStore(profile)
try:
await trust_store.refresh_cache()
LOGGER.info("Loaded trust anchors from wallet")
except Exception as e:
LOGGER.warning("Failed to load trust anchors from wallet: %s", e)

# Update the processor with the trust store
_mso_mdoc_processor.trust_store = trust_store

# Initialize storage and generate default keys/certs if needed
try:
storage_manager = MdocStorageManager(profile)
Expand Down Expand Up @@ -127,28 +60,17 @@ async def setup(context: InjectionContext):

LOGGER.info("Setting up MSO_MDOC plugin")

# For wallet trust store, we'll initialize the trust store in on_startup
# For file-based trust store, we can initialize now
trust_store_type = os.getenv(
"OID4VC_MDOC_TRUST_STORE_TYPE", TRUST_STORE_TYPE_FILE
).lower()

if trust_store_type == TRUST_STORE_TYPE_WALLET:
# Defer trust store initialization until startup
trust_store = None
LOGGER.info("Wallet-based trust store will be initialized at startup")
else:
# File-based trust store can be initialized immediately
trust_store = create_trust_store()

# Trust anchors are always wallet-scoped. A fresh WalletTrustStore is
# constructed per-request inside verify_credential / verify_presentation
# so each tenant's Askar partition is used automatically.
# Register credential processor
processors = context.inject(CredProcessors)
_mso_mdoc_processor = MsoMdocCredProcessor(trust_store=trust_store)
_mso_mdoc_processor = MsoMdocCredProcessor()
processors.register_issuer("mso_mdoc", _mso_mdoc_processor)
processors.register_cred_verifier("mso_mdoc", _mso_mdoc_processor)
processors.register_pres_verifier("mso_mdoc", _mso_mdoc_processor)

# Register startup event handler for profile-dependent initialization
# Register startup event handler for storage initialization
event_bus = context.inject(EventBus)
event_bus.subscribe(STARTUP_EVENT_PATTERN, on_startup)
LOGGER.info("MSO_MDOC plugin registered startup handler")
26 changes: 6 additions & 20 deletions oid4vc/mso_mdoc/cred_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,15 +746,9 @@ async def verify_credential(
credential: Any,
):
"""Verify an mso_mdoc credential."""
# In wallet trust-store mode, self.trust_store was built at startup
# with the root profile. Sub-wallet credential verification must use
# the calling profile so per-tenant Askar partitions are queried.
# For file- or None-based trust stores the singleton is fine.
if os.getenv("OID4VC_MDOC_TRUST_STORE_TYPE", "file").lower() == "wallet":
trust_store = WalletTrustStore(profile)
else:
trust_store = self.trust_store

# Always build a per-request WalletTrustStore from the calling profile
# so each tenant's Askar partition is queried (wallet-scoped registry).
trust_store = WalletTrustStore(profile)
verifier = MsoMdocCredVerifier(trust_store=trust_store)
return await verifier.verify_credential(profile, credential)

Expand All @@ -765,17 +759,9 @@ async def verify_presentation(
presentation_record: "OID4VPPresentation",
):
"""Verify an mso_mdoc presentation."""
# In wallet trust-store mode, self.trust_store was built at startup
# with the root profile. Sub-wallet VP verification must use the
# calling profile so per-tenant Askar partitions are queried and
# anchors registered via /mso_mdoc/trust-anchors with a sub-wallet
# Bearer token are visible. For file- or None-based trust stores
# the singleton is reused as-is.
if os.getenv("OID4VC_MDOC_TRUST_STORE_TYPE", "file").lower() == "wallet":
trust_store = WalletTrustStore(profile)
else:
trust_store = self.trust_store

# Always build a per-request WalletTrustStore from the calling profile
# so each tenant's Askar partition is queried (wallet-scoped registry).
trust_store = WalletTrustStore(profile)
verifier = MsoMdocPresVerifier(trust_store=trust_store)
return await verifier.verify_presentation(
profile, presentation, presentation_record
Expand Down
25 changes: 0 additions & 25 deletions oid4vc/mso_mdoc/mdoc/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import base64
import json
import logging
import os
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Optional, Protocol
Expand Down Expand Up @@ -87,30 +86,6 @@ def get_trust_anchors(self) -> List[str]:
...


class FileTrustStore:
"""Trust store implementation backed by a directory of PEM files."""

def __init__(self, path: str):
"""Initialize the file trust store."""
self.path = path

def get_trust_anchors(self) -> List[str]:
"""Retrieve trust anchors from the directory."""
anchors = []
if not os.path.isdir(self.path):
LOGGER.warning("Trust store path %s is not a directory.", self.path)
return anchors

for filename in os.listdir(self.path):
if filename.endswith(".pem") or filename.endswith(".crt"):
try:
with open(os.path.join(self.path, filename), "r") as f:
anchors.append(f.read())
except Exception as e:
LOGGER.warning("Failed to read trust anchor %s: %s", filename, e)
return anchors


class WalletTrustStore:
"""Trust store implementation backed by Askar wallet storage.

Expand Down
3 changes: 1 addition & 2 deletions oid4vc/mso_mdoc/tests/test_review_issues.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
# Now import the modules under test.
# ---------------------------------------------------------------------------
from ..mdoc.verifier import ( # noqa: E402
FileTrustStore,
MsoMdocCredVerifier,
MsoMdocPresVerifier,
WalletTrustStore,
Expand Down Expand Up @@ -131,7 +130,7 @@ async def test_no_trust_store_passes_empty_registry(self):
@pytest.mark.asyncio
async def test_empty_trust_store_passes_empty_registry(self):
"""verify_presentation with a trust_store returning [] must also fail-closed."""
mock_store = MagicMock(spec=FileTrustStore)
mock_store = MagicMock()
mock_store.get_trust_anchors.return_value = []
verifier = MsoMdocPresVerifier(trust_store=mock_store)
profile, _ = make_mock_profile()
Expand Down
119 changes: 1 addition & 118 deletions oid4vc/mso_mdoc/tests/test_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

import sys
from contextlib import asynccontextmanager
from unittest.mock import MagicMock, mock_open, patch
from unittest.mock import MagicMock, patch

import pytest

from oid4vc.models.presentation import OID4VPPresentation

from ..mdoc.verifier import (
FileTrustStore,
MsoMdocCredVerifier,
MsoMdocPresVerifier,
PreverifiedMdocClaims,
Expand Down Expand Up @@ -48,122 +47,6 @@ async def mock_session_context():
return profile, mock_session


class TestFileTrustStore:
"""Test FileTrustStore functionality."""

def test_init_stores_path(self):
"""Test that initialization stores the path correctly."""
store = FileTrustStore("/some/path")
assert store.path == "/some/path"

def test_get_trust_anchors_success(self):
"""Test retrieving trust anchors successfully."""
with (
patch("os.path.isdir", return_value=True),
patch("os.listdir", return_value=["cert1.pem", "cert2.crt", "ignore.txt"]),
patch("builtins.open", mock_open(read_data="CERT_CONTENT")),
):
store = FileTrustStore("/path/to/certs")
anchors = store.get_trust_anchors()

assert len(anchors) == 2
assert anchors == ["CERT_CONTENT", "CERT_CONTENT"]

def test_get_trust_anchors_no_dir(self):
"""Test handling of missing directory."""
with patch("os.path.isdir", return_value=False):
store = FileTrustStore("/invalid/path")
anchors = store.get_trust_anchors()
assert anchors == []

def test_get_trust_anchors_read_error(self):
"""Test handling of file read errors."""
with (
patch("os.path.isdir", return_value=True),
patch("os.listdir", return_value=["cert1.pem"]),
patch("builtins.open", side_effect=Exception("Read error")),
):
store = FileTrustStore("/path/to/certs")
anchors = store.get_trust_anchors()
assert anchors == []

def test_get_trust_anchors_empty_directory(self):
"""Test handling of empty directory with no certificate files."""
with (
patch("os.path.isdir", return_value=True),
patch("os.listdir", return_value=[]),
):
store = FileTrustStore("/path/to/empty")
anchors = store.get_trust_anchors()
assert anchors == []

def test_get_trust_anchors_only_non_cert_files(self):
"""Test directory with only non-certificate files."""
with (
patch("os.path.isdir", return_value=True),
patch("os.listdir", return_value=["readme.txt", "config.json", "script.sh"]),
):
store = FileTrustStore("/path/to/certs")
anchors = store.get_trust_anchors()
assert anchors == []

def test_get_trust_anchors_partial_read_failure(self):
"""Test that successful reads continue after a failed read."""

def mock_open_side_effect(path, mode="r"):
if "fail" in path:
raise Exception("Read error")
return mock_open(read_data="CERT_CONTENT")()

with (
patch("os.path.isdir", return_value=True),
patch("os.listdir", return_value=["good1.pem", "fail.pem", "good2.crt"]),
patch("builtins.open", side_effect=mock_open_side_effect),
):
store = FileTrustStore("/path/to/certs")
anchors = store.get_trust_anchors()

# Should have 2 successful reads despite 1 failure
assert len(anchors) == 2
assert all(a == "CERT_CONTENT" for a in anchors)

def test_get_trust_anchors_case_sensitive_extensions(self):
"""Test that file extension matching is case-sensitive."""
with (
patch("os.path.isdir", return_value=True),
patch("os.listdir", return_value=["cert1.PEM", "cert2.CRT", "cert3.pem"]),
patch("builtins.open", mock_open(read_data="CERT_CONTENT")),
):
store = FileTrustStore("/path/to/certs")
anchors = store.get_trust_anchors()

# Only .pem (lowercase) should be matched, not .PEM or .CRT
assert len(anchors) == 1

def test_get_trust_anchors_reads_different_content(self):
"""Test that different certificate files have different content."""
file_contents = {
"/path/to/certs/cert1.pem": "CERT_ONE",
"/path/to/certs/cert2.crt": "CERT_TWO",
}

def mock_open_with_content(path, mode="r"):
content = file_contents.get(path, "UNKNOWN")
return mock_open(read_data=content)()

with (
patch("os.path.isdir", return_value=True),
patch("os.listdir", return_value=["cert1.pem", "cert2.crt"]),
patch("builtins.open", side_effect=mock_open_with_content),
):
store = FileTrustStore("/path/to/certs")
anchors = store.get_trust_anchors()

assert len(anchors) == 2
assert "CERT_ONE" in anchors
assert "CERT_TWO" in anchors


class TestMsoMdocCredVerifier:
"""Test MsoMdocCredVerifier functionality."""

Expand Down
Loading
Loading