From e048501750a4d5b8316698e296f4ee4b2eae666c Mon Sep 17 00:00:00 2001 From: Brad Haas Date: Mon, 13 Apr 2026 22:03:54 -0400 Subject: [PATCH 1/2] security: bump time crate to 0.3.47 (RUSTSEC-2026-0009) Updates Cargo.lock to pull in time >=0.3.47, addressing the DoS via stack exhaustion advisory flagged by cargo audit. The vulnerable version was pulled in transitively through x509-parser 0.16.0. Requires rustc >=1.88.0, which matches what the CI rust job already installs via dtolnay/rust-toolchain@stable. --- Cargo.lock | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1f237aa..01443f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -90,9 +90,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.4.0" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ "powerfmt", ] @@ -181,9 +181,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.1.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" [[package]] name = "num-integer" @@ -321,19 +321,19 @@ dependencies = [ ] [[package]] -name = "serde" -version = "1.0.219" +name = "serde_core" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -390,30 +390,30 @@ dependencies = [ [[package]] name = "time" -version = "0.3.41" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", "itoa", "num-conv", "powerfmt", - "serde", + "serde_core", "time-core", "time-macros", ] [[package]] name = "time-core" -version = "0.1.4" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.22" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" dependencies = [ "num-conv", "time-core", From 25f1e88ad50eca4b505313c1cb2da10a8b6d21e3 Mon Sep 17 00:00:00 2001 From: Brad Haas Date: Mon, 13 Apr 2026 22:04:23 -0400 Subject: [PATCH 2/2] feat: dynamic validator args dispatch (#18) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the hardcoded subject_alt_names special case in core.validate() with a generic dispatch that discovers each validator's user arguments from its validate() signature. New validators automatically participate in argument passing without any core changes. Contract for validator authors: - The first three positional params of validate() are framework- supplied (cert/cipher data, host, port). Their names don't matter. - Any additional user-configurable arguments must be keyword-only, annotated, and have a default value. - Enforcement runs in BaseCertValidator/BaseCipherValidator __init_subclass__ at import time; malformed signatures raise TypeError before the class can be used. Performance: signature inspection happens once, at class definition, via __init_subclass__. The per-call dispatch hot path is a frozen-set difference and a dict unpack — sub-microsecond, zero inspect calls. User-facing API: - Canonical form: validate(validator_args={"name": {"arg": value}}) - The bare-list shorthand (subject_alt_names=[...]) still works for one-arg validators but now emits a DeprecationWarning. - New CertMonitor.describe_validators() returns every validator's name, doc, and argument schema (annotation, default) for introspection — reads the cached _user_params populated at class definition time. Validator migrations (signature changes only, no behavior changes): - subject_alt_names: alternate_names is now keyword-only. - sensitive_date: *args: SensitiveDate -> *, dates: Optional[List[ SensitiveDate]] = None. Weekend/leap-day flagging, return shape, and the internal isinstance check are unchanged. Existing tests that asserted positional dispatch were updated to the kwargs form. New tests cover enforcement (well-formed, missing annotation, missing default, *args, **kwargs, cipher parity), dispatch (canonical dict form, deprecation shim with pytest.warns, unknown arg, invalid arg type, validator raising TypeError), and describe_validators (shape, per-validator args, plain-class annotation rendering). Coverage: 98.67% (gate 95%); validators/base.py and all new core dispatch code are at 100%. --- certmonitor/core.py | 146 +++++++++++++++-- certmonitor/validators/base.py | 138 +++++++++++++--- certmonitor/validators/sensitive_date.py | 40 ++--- certmonitor/validators/subject_alt_names.py | 1 + docs/usage/validator_args.md | 120 ++++++++++---- tests/test_core/test_utility_methods.py | 65 ++++++++ tests/test_core/test_validation.py | 155 ++++++++++++++++-- tests/test_validators/test_base.py | 115 +++++++++++++ tests/test_validators/test_sensitive_date.py | 26 ++- .../test_validators/test_subject_alt_names.py | 15 +- 10 files changed, 699 insertions(+), 122 deletions(-) diff --git a/certmonitor/core.py b/certmonitor/core.py index 383d36b..5dd8e71 100644 --- a/certmonitor/core.py +++ b/certmonitor/core.py @@ -6,7 +6,8 @@ import socket import ssl import tempfile -from typing import Any, Dict, List, Optional, Union, cast, Tuple +import warnings +from typing import Any, Dict, FrozenSet, List, Mapping, Optional, Union, cast, Tuple from certmonitor import certinfo, config from certmonitor.cipher_algorithms import parse_cipher_suite @@ -606,15 +607,9 @@ def validate( } else: for validator in cert_validators: - args = [cert_data, self.host, self.port] - # Pass additional arguments if any - if validator_args and validator.name in validator_args: - if validator.name == "subject_alt_names": - args.append(validator_args[validator.name]) - else: - args.extend(validator_args[validator.name]) - - results[validator.name] = validator.validate(*args) + results[validator.name] = self._invoke_validator( + validator, (cert_data, self.host, self.port), validator_args + ) # Cipher-based validations if cipher_validators: @@ -625,15 +620,81 @@ def validate( ) else: for validator in cipher_validators: - args = [cipher_info, self.host, self.port] - # Pass additional arguments if any - if validator_args and validator.name in validator_args: - args.extend(validator_args[validator.name]) - - results[validator.name] = validator.validate(*args) + results[validator.name] = self._invoke_validator( + validator, (cipher_info, self.host, self.port), validator_args + ) return results + def _invoke_validator( + self, + validator: Any, + framework_args: Tuple[Any, ...], + validator_args: Optional[Dict[str, Any]], + ) -> Dict[str, Any]: + """Resolve user kwargs from ``validator_args`` and call ``validator.validate``. + + Looks up the validator's cached ``_user_param_names`` (built at class + definition time by ``BaseCertValidator.__init_subclass__``) and projects + the per-validator entry of ``validator_args`` onto them. Returns a + structured error dict if the user passed unknown keys; otherwise calls + the validator and returns its result. + """ + raw = (validator_args or {}).get(validator.name) + kwargs: Mapping[str, Any] + + if raw is None: + kwargs = {} + elif isinstance(raw, dict): + kwargs = raw + else: + # Backwards-compatibility shim: pre-#18, ``subject_alt_names`` accepted + # a bare list of alternate names. Map a bare list to the validator's + # single user param if (and only if) it has exactly one. Emit a + # ``DeprecationWarning`` so callers can migrate to the named form. + user_param_names: FrozenSet[str] = getattr( + validator, "_user_param_names", frozenset() + ) + if isinstance(raw, list) and len(user_param_names) == 1: + only_param = next(iter(user_param_names)) + warnings.warn( + ( + f"Passing a bare list to validator_args[{validator.name!r}] " + f"is deprecated; use {{'{validator.name}': " + f"{{'{only_param}': [...]}}}} instead." + ), + DeprecationWarning, + stacklevel=3, + ) + kwargs = {only_param: raw} + else: + return { + "is_valid": False, + "reason": ( + f"Invalid args for validator {validator.name!r}: " + f"expected a dict of keyword arguments, got {type(raw).__name__}." + ), + } + + user_param_names = getattr(validator, "_user_param_names", frozenset()) + unknown = set(kwargs) - set(user_param_names) + if unknown: + return { + "is_valid": False, + "reason": ( + f"Unknown args for validator {validator.name!r}: " + f"{sorted(unknown)}. Accepted args: {sorted(user_param_names)}." + ), + } + + try: + return cast(Dict[str, Any], validator.validate(*framework_args, **kwargs)) + except TypeError as exc: + return { + "is_valid": False, + "reason": f"Validator {validator.name!r} rejected args: {exc}", + } + def get_enabled_validators(self) -> List[str]: """ Get the list of validators enabled for this CertMonitor instance. @@ -655,3 +716,56 @@ def list_validators(self) -> List[str]: from .validators import list_validators as _list_validators return _list_validators() + + def describe_validators(self) -> Dict[str, Dict[str, Any]]: + """Describe every registered validator and the user args it accepts. + + Reads each validator's cached ``_user_params`` (built by + ``BaseCertValidator.__init_subclass__`` / ``BaseCipherValidator.__init_subclass__`` + at class definition time) and renders a serializable description suitable + for printing, logging, or feeding into a CLI ``--help`` page. + + Returns: + dict: Keyed by validator name. Each value contains: + + - ``validator_type``: ``"cert"`` or ``"cipher"``. + - ``doc``: the validator class docstring (first line). + - ``args``: dict keyed by user arg name, each with ``annotation`` + (string), ``default`` (the literal default value), and + ``required`` (always ``False`` — every user arg must declare a + default). + + Example: + ```python + with CertMonitor("example.com") as monitor: + for name, info in monitor.describe_validators().items(): + print(name, info["args"]) + ``` + """ + import inspect + + described: Dict[str, Dict[str, Any]] = {} + for name, validator in self.validators.items(): + user_params = getattr(validator, "_user_params", {}) or {} + args_info: Dict[str, Dict[str, Any]] = {} + for param_name, param in user_params.items(): + # ``str()`` renders both plain classes and parameterized + # generics; only plain classes need the ```` wrapper + # unwrapped. Enforcement in __init_subclass__ guarantees every + # user param has an annotation, so no empty-annotation path. + rendered = str(param.annotation) + if rendered.startswith(""): + rendered = rendered[len("")] + args_info[param_name] = { + "annotation": rendered.replace("typing.", ""), + "default": param.default, + "required": False, + } + + doc = inspect.getdoc(validator.__class__) or "" + described[name] = { + "validator_type": getattr(validator, "validator_type", "cert"), + "doc": doc.splitlines()[0] if doc else "", + "args": args_info, + } + return described diff --git a/certmonitor/validators/base.py b/certmonitor/validators/base.py index ecd2ce9..47e7f60 100644 --- a/certmonitor/validators/base.py +++ b/certmonitor/validators/base.py @@ -1,54 +1,140 @@ # validators/base.py +"""Base classes for certmonitor validators. + +Contributors writing a new validator should subclass :class:`BaseCertValidator` +(for validators that inspect certificate data) or :class:`BaseCipherValidator` +(for validators that inspect cipher suite data) and implement ``validate``. + +The first three positional parameters of ``validate`` are supplied by the +dispatcher — the parsed cert or cipher data, the host, and the port. Any +additional user-configurable arguments must be declared as **keyword-only** +parameters, each with a **type annotation** and a **default value**. The +class-level ``__init_subclass__`` hook enforces this at import time, caches the +discovered user parameters, and exposes them for dispatch and introspection. +""" + +import inspect from abc import ABC, abstractmethod -from typing import Any, Dict +from typing import Any, ClassVar, Dict, FrozenSet, Mapping class BaseValidator(ABC): - """ - Abstract base class for certificate validators. - """ + """Abstract base class for certificate and cipher validators.""" @property @abstractmethod def name(self) -> str: - """ - Returns the name of the validator. - - Returns: - str: The name of the validator. - """ + """Return the name used to register and look up this validator.""" @abstractmethod - def validate(self, cert: Dict[str, Any], host: str, port: int) -> Dict[str, Any]: - """ - Validates the given certificate. + def validate(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + """Run the validator and return a result dict.""" + + +class _ValidatorBase(BaseValidator): + """Internal base that handles user-arg discovery for concrete validators. + + Subclasses set ``_framework_arity`` to the number of positional parameters + the dispatcher supplies to ``validate`` (typically 3: the parsed data, the + host, and the port). Everything keyword-only after those positional + parameters is treated as a user-configurable argument and must be + annotated and have a default value. + """ + + _framework_arity: ClassVar[int] = 0 + _user_params: ClassVar[Mapping[str, inspect.Parameter]] = {} + _user_param_names: ClassVar[FrozenSet[str]] = frozenset() + + def __init_subclass__(cls, **kwargs: Any) -> None: + super().__init_subclass__(**kwargs) + + # Only inspect ``validate`` if this class actually defines one. The + # intermediate ``BaseCertValidator`` / ``BaseCipherValidator`` bases + # define stubs and run through here; concrete subclasses also do. + if "validate" not in cls.__dict__: + return + + sig = inspect.signature(cls.validate) - Args: - cert (dict): The certificate data. - host (str): The hostname or IP address. - port (int): The port number. + # Framework params are the first ``_framework_arity`` non-``self`` + # positional parameters, regardless of what they are named. + user_params: Dict[str, inspect.Parameter] = {} + problems = [] + positional_seen = 0 + arity = cls._framework_arity - Returns: - dict: The validation result. - """ + for param_name, param in sig.parameters.items(): + if param_name == "self": + continue + if param.kind in ( + inspect.Parameter.POSITIONAL_ONLY, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + ): + positional_seen += 1 + if positional_seen <= arity: + # Framework-supplied positional parameter. Skip. + continue + # A positional-or-keyword param beyond the framework arity is + # a user arg declared the wrong way. + problems.append( + f"{param_name!r}: must be keyword-only (declare after `*`)" + ) + continue + if param.kind is inspect.Parameter.VAR_POSITIONAL: + problems.append( + f"{param_name!r}: *args is not allowed for user args; " + "declare keyword-only parameters instead" + ) + continue + if param.kind is inspect.Parameter.VAR_KEYWORD: + problems.append( + f"{param_name!r}: **kwargs is not allowed for user args; " + "declare each argument explicitly" + ) + continue + # Keyword-only parameter — a user arg. + if param.annotation is inspect.Parameter.empty: + problems.append(f"{param_name!r}: missing type annotation") + if param.default is inspect.Parameter.empty: + problems.append(f"{param_name!r}: missing default value") + user_params[param_name] = param + if problems: + raise TypeError( + f"Validator {cls.__name__}.validate() has malformed user args:\n - " + + "\n - ".join(problems) + + "\n\nUser args must be keyword-only, annotated, and defaulted. " + "Example:\n" + " def validate(self, cert, host, port, *, " + "alternate_names: Optional[List[str]] = None) -> Dict[str, Any]: ..." + ) + + cls._user_params = user_params + cls._user_param_names = frozenset(user_params) + + +class BaseCertValidator(_ValidatorBase): + """Base class for validators that inspect parsed certificate data.""" -class BaseCertValidator(BaseValidator): validator_type: str = "cert" + _framework_arity: ClassVar[int] = 3 def validate( self, cert_info: Dict[str, Any], host: str, port: int ) -> Dict[str, Any]: - # Default implementation - subclasses should override this - return None # type: ignore + # Default implementation — subclasses override. + return None # type: ignore[return-value] + +class BaseCipherValidator(_ValidatorBase): + """Base class for validators that inspect negotiated cipher suite data.""" -class BaseCipherValidator(BaseValidator): validator_type: str = "cipher" + _framework_arity: ClassVar[int] = 3 def validate( self, cipher_info: Dict[str, Any], host: str, port: int ) -> Dict[str, Any]: - # Default implementation - subclasses should override this - return None # type: ignore + # Default implementation — subclasses override. + return None # type: ignore[return-value] diff --git a/certmonitor/validators/sensitive_date.py b/certmonitor/validators/sensitive_date.py index c67f454..44c719a 100644 --- a/certmonitor/validators/sensitive_date.py +++ b/certmonitor/validators/sensitive_date.py @@ -5,7 +5,7 @@ """ from datetime import datetime, date -from typing import Any, Dict, List, NamedTuple +from typing import Any, Dict, List, NamedTuple, Optional from .base import BaseCertValidator @@ -34,12 +34,13 @@ class SensitiveDateValidator(BaseCertValidator): name = "sensitive_date" - def validate( # pylint: disable=arguments-differ + def validate( self, cert: Dict[str, Any], host: str, port: int, - *args: SensitiveDate, # Accepts any number of SensitiveDate objects from unpacked validator_args + *, + dates: Optional[List[SensitiveDate]] = None, ) -> Dict[str, Any]: """ Validates the sensitivity of the expiry date of the provided SSL certificate. @@ -48,18 +49,16 @@ def validate( # pylint: disable=arguments-differ cert (dict): The SSL certificate. host (str): The hostname (not used in this validator). port (int): The port number (not used in this validator). - *args (SensitiveDate, optional): Zero or more SensitiveDate objects passed as positional - arguments, representing sensitive dates to check against the certificate's - expiration date. + dates (list, optional): A list of ``SensitiveDate`` objects to check + against the certificate's expiration date. Defaults to ``None`` + (no sensitive-date matching, only weekend/leap-day checks). Returns: dict: A dictionary containing the validation results, including whether the certificate - expires on a weekend, a leap day, or on any of the dates passed in as args. + expires on a weekend, a leap day, or on any of the passed-in dates. Examples: Example output (success): - This example shows a certificate that is valid and does not expire on any of the - passed-in dates, so no warnings are present. ```json { @@ -71,8 +70,6 @@ def validate( # pylint: disable=arguments-differ ``` Example output (failure): - This example shows a certificate that expires both on a weekend, and also on one of - the passed-in dates, so validation fails and a warning is included. ```json { @@ -80,19 +77,19 @@ def validate( # pylint: disable=arguments-differ "leapday_expiry": false, "weekend_expiry": true, "warnings": [ - 'Certificate is due to expire on sensitive date "Busy Sunday" (2025/11/16)' + 'Certificate is due to expire on sensitive date "Busy Sunday" (2025-11-16)' ] } ``` """ - for sd in args: + sensitive_dates: List[SensitiveDate] = list(dates) if dates else [] + + for sd in sensitive_dates: if not isinstance(sd, SensitiveDate): raise TypeError( f"Expected SensitiveDate, got {type(sd).__name__}: {sd!r}" ) - sensitive_dates: List[SensitiveDate] = list(args) - not_after = datetime.strptime( cert["cert_info"]["notAfter"], "%b %d %H:%M:%S %Y GMT" ) @@ -101,13 +98,12 @@ def validate( # pylint: disable=arguments-differ weekend_expiry = not_after.weekday() in (5, 6) warnings = [] - if sensitive_dates: - for sensitive_date in sensitive_dates: - if not_after.date() == sensitive_date.date: - warnings.append( - f'Certificate is due to expire on sensitive date "{sensitive_date.name}"' - f" ({sensitive_date.date.isoformat()})" - ) + for sensitive_date in sensitive_dates: + if not_after.date() == sensitive_date.date: + warnings.append( + f'Certificate is due to expire on sensitive date "{sensitive_date.name}"' + f" ({sensitive_date.date.isoformat()})" + ) is_valid = not (leapday_expiry or warnings or weekend_expiry) diff --git a/certmonitor/validators/subject_alt_names.py b/certmonitor/validators/subject_alt_names.py index d07c8ac..131719f 100644 --- a/certmonitor/validators/subject_alt_names.py +++ b/certmonitor/validators/subject_alt_names.py @@ -22,6 +22,7 @@ def validate( cert: Dict[str, Any], host: str, port: int, + *, alternate_names: Optional[List[str]] = None, ) -> Dict[str, Any]: """ diff --git a/docs/usage/validator_args.md b/docs/usage/validator_args.md index 0c49c58..8801310 100644 --- a/docs/usage/validator_args.md +++ b/docs/usage/validator_args.md @@ -1,55 +1,109 @@ # Passing Arguments to Validators -Some validators accept additional arguments to customize their behavior. You can pass these arguments as a dictionary to the `validate()` method. +Some validators accept additional arguments to customize their behavior. Pass +them as a dict to the `validate()` method's `validator_args` parameter, keyed by +validator name. Each validator's entry is itself a dict mapping argument names +to values. -## Example: Passing Alternate Names to the subject_alt_names Validator +## Canonical form ```python from certmonitor import CertMonitor with CertMonitor("example.com") as monitor: - results = monitor.validate({ - "subject_alt_names": ["example.com", "www.example.com", "test.example.com"] - }) + results = monitor.validate( + validator_args={ + "subject_alt_names": { + "alternate_names": ["example.com", "www.example.com"], + }, + } + ) print(results["subject_alt_names"]) ``` -### Example Output - -```json -{ - "is_valid": true, - "sans": {"DNS": ["example.com", "www.example.com"], "IP Address": []}, - "count": 2, - "contains_host": {"name": "example.com", "is_valid": true, "reason": "Matched DNS SAN"}, - "contains_alternate": { - "www.example.com": {"name": "www.example.com", "is_valid": true, "reason": "Matched DNS SAN"}, - "test.example.com": {"name": "test.example.com", "is_valid": false, "reason": "No match found for test.example.com in DNS SANs: example.com, www.example.com"} - }, - "warnings": [ - "The alternate name test.example.com is not included in the SANs: No match found for test.example.com in DNS SANs: example.com, www.example.com" - ] -} +The keys inside each per-validator dict must match the validator's user +parameter names exactly. Unknown keys are reported as a structured error in the +result, so typos are caught loudly: + +```python +results = monitor.validate( + validator_args={"subject_alt_names": {"alt_names": ["example.com"]}} +) +# results["subject_alt_names"] == { +# "is_valid": False, +# "reason": "Unknown args for validator 'subject_alt_names': ['alt_names']. " +# "Accepted args: ['alternate_names']." +# } ``` ---- +## Discovering what a validator accepts -## Example: Passing Arguments to a Custom Validator +Use `describe_validators()` to introspect every registered validator and the +arguments it accepts: -If you implement your own validator that accepts arguments, you can pass them in the same way: +```python +with CertMonitor("example.com") as monitor: + for name, info in monitor.describe_validators().items(): + if info["args"]: + print(name, info["args"]) +``` + +Each entry includes the argument's annotation, default value, and the +validator's class docstring — useful for building CLI help, dashboards, or +config validators. + +## Deprecated bare-list shorthand + +Earlier releases of CertMonitor accepted a bare list for validators with a +single user argument: ```python -def my_custom_validator(cert, host, port, my_arg): - # ... - return {"is_valid": True, "custom": my_arg} +# Deprecated — emits DeprecationWarning, will be removed in a future release. +monitor.validate(validator_args={"subject_alt_names": ["example.com"]}) +``` -with CertMonitor("example.com") as monitor: - results = monitor.validate({ - "my_custom_validator": ["my-value"] - }) - print(results["my_custom_validator"]) +This shorthand still works for backwards compatibility but emits a +`DeprecationWarning`. Migrate to the canonical dict form shown above. + +## Custom validators + +When you write your own validator, declare each user argument as a +**keyword-only** parameter on `validate()` with a **type annotation** and a +**default value**. CertMonitor enforces this at class definition time and will +raise `TypeError` at import if a validator is malformed. + +```python +from typing import Optional, List +from certmonitor.validators.base import BaseCertValidator + +class MyCustomValidator(BaseCertValidator): + name = "my_custom" + + def validate( + self, + cert, + host, + port, + *, + threshold: int = 0, + labels: Optional[List[str]] = None, + ): + return {"is_valid": True, "threshold": threshold, "labels": labels or []} ``` +Once registered via `register_validator()`, the new validator picks up the +dynamic args dispatch automatically: + +```python +results = monitor.validate( + validator_args={"my_custom": {"threshold": 5, "labels": ["prod"]}} +) +``` + +No core changes needed — the framework discovers the validator's user arguments +by reading its `validate()` signature once, at class definition time. + --- -> **Tip:** See the [Validators Reference](../validators/index.md) for details on which validators accept arguments and the expected format. +> **Tip:** See the [Validators Reference](../validators/index.md) for details on +> which built-in validators accept arguments and the expected format. diff --git a/tests/test_core/test_utility_methods.py b/tests/test_core/test_utility_methods.py index 439ab87..a2733cf 100644 --- a/tests/test_core/test_utility_methods.py +++ b/tests/test_core/test_utility_methods.py @@ -71,3 +71,68 @@ def test_to_structured_dict_invalid_tuple_length(self): assert isinstance(result, list) assert len(result) == 1 assert result[0] == ["single_value"] + + +class TestDescribeValidators: + """Test CertMonitor.describe_validators() introspection helper.""" + + def test_describe_validators_returns_all_registered(self): + """Every registered validator appears in the description.""" + monitor = CertMonitor("www.example.com") + described = monitor.describe_validators() + assert set(described.keys()) >= { + "expiration", + "hostname", + "key_info", + "subject_alt_names", + "root_certificate", + "sensitive_date", + "tls_version", + "weak_cipher", + } + + def test_describe_validators_subject_alt_names_args(self): + """subject_alt_names exposes ``alternate_names`` with annotation and default.""" + monitor = CertMonitor("www.example.com") + described = monitor.describe_validators() + + san = described["subject_alt_names"] + assert san["validator_type"] == "cert" + assert "alternate_names" in san["args"] + arg = san["args"]["alternate_names"] + assert arg["default"] is None + assert arg["required"] is False + assert "List" in arg["annotation"] and "str" in arg["annotation"] + + def test_describe_validators_validator_with_no_args(self): + """Validators without user args report an empty args dict.""" + monitor = CertMonitor("www.example.com") + described = monitor.describe_validators() + + assert described["expiration"]["args"] == {} + assert described["hostname"]["args"] == {} + + def test_describe_validators_includes_doc(self): + """Each entry includes the first line of the validator's class docstring.""" + monitor = CertMonitor("www.example.com") + described = monitor.describe_validators() + assert described["subject_alt_names"]["doc"] + assert isinstance(described["subject_alt_names"]["doc"], str) + + def test_describe_validators_renders_plain_class_annotations(self): + """Plain-class annotations like ``int`` render without ````.""" + from certmonitor.validators.base import BaseCertValidator + + class PlainAnnotationValidator(BaseCertValidator): + @property + def name(self): + return "plain_annotation" + + def validate(self, cert_info, host, port, *, threshold: int = 0): + return {"is_valid": True} + + monitor = CertMonitor("www.example.com") + monitor.validators = {"plain_annotation": PlainAnnotationValidator()} + described = monitor.describe_validators() + assert described["plain_annotation"]["args"]["threshold"]["annotation"] == "int" + assert described["plain_annotation"]["args"]["threshold"]["default"] == 0 diff --git a/tests/test_core/test_validation.py b/tests/test_core/test_validation.py index 7aef7ef..d60c0ce 100644 --- a/tests/test_core/test_validation.py +++ b/tests/test_core/test_validation.py @@ -2,6 +2,7 @@ from unittest.mock import MagicMock, patch +import pytest from certmonitor.core import CertMonitor @@ -25,26 +26,29 @@ def test_validate(self, cert_monitor, sample_cert): assert "mock_validator" in result def test_validate_with_args(self, cert_monitor, sample_cert): - """Test validate function with additional arguments.""" + """Validator args passed in canonical dict form are forwarded as kwargs.""" cert_monitor.cert_info = sample_cert # Not wrapped cert_monitor.cert_data = {"cert_info": sample_cert} # Needed for validate() mock_validator = MagicMock(name="subject_alt_names") mock_validator.name = "subject_alt_names" mock_validator.validator_type = "cert" + mock_validator._user_param_names = frozenset({"alternate_names"}) mock_validator.validate.return_value = {"is_valid": True} with patch.object( cert_monitor, "validators", {"subject_alt_names": mock_validator} ): cert_monitor.enabled_validators = ["subject_alt_names"] result = cert_monitor.validate( - validator_args={"subject_alt_names": ["example.com"]} + validator_args={ + "subject_alt_names": {"alternate_names": ["example.com"]} + } ) assert "subject_alt_names" in result mock_validator.validate.assert_called_once_with( {"cert_info": sample_cert}, cert_monitor.host, cert_monitor.port, - ["example.com"], + alternate_names=["example.com"], ) @@ -130,20 +134,24 @@ def test_validate_cert_validators_success(self): monitor.cert_data, monitor.host, monitor.port ) - def test_validate_cert_validators_with_subject_alt_names_args(self): - """Test validate() with subject_alt_names validator arguments.""" + def test_validate_cert_validators_with_subject_alt_names_args_dict(self): + """Canonical dict form for subject_alt_names args is forwarded as kwargs.""" monitor = CertMonitor("www.example.com") monitor.enabled_validators = ["subject_alt_names"] monitor.cert_data = {"cert_info": {"subject": {"commonName": "example.com"}}} - # Mock subject_alt_names validator mock_validator = MagicMock() mock_validator.name = "subject_alt_names" mock_validator.validator_type = "cert" + mock_validator._user_param_names = frozenset({"alternate_names"}) mock_validator.validate.return_value = {"is_valid": True} with patch.object(monitor, "validators", {"subject_alt_names": mock_validator}): - validator_args = {"subject_alt_names": ["example.com", "www.example.com"]} + validator_args = { + "subject_alt_names": { + "alternate_names": ["example.com", "www.example.com"] + } + } result = monitor.validate(validator_args=validator_args) assert "subject_alt_names" in result @@ -151,30 +159,142 @@ def test_validate_cert_validators_with_subject_alt_names_args(self): monitor.cert_data, monitor.host, monitor.port, - ["example.com", "www.example.com"], + alternate_names=["example.com", "www.example.com"], + ) + + def test_validate_cert_validators_with_subject_alt_names_legacy_bare_list(self): + """Bare-list form still works for one-arg validators but emits DeprecationWarning.""" + monitor = CertMonitor("www.example.com") + monitor.enabled_validators = ["subject_alt_names"] + monitor.cert_data = {"cert_info": {"subject": {"commonName": "example.com"}}} + + mock_validator = MagicMock() + mock_validator.name = "subject_alt_names" + mock_validator.validator_type = "cert" + mock_validator._user_param_names = frozenset({"alternate_names"}) + mock_validator.validate.return_value = {"is_valid": True} + + with patch.object(monitor, "validators", {"subject_alt_names": mock_validator}): + with pytest.warns(DeprecationWarning, match="bare list"): + validator_args = { + "subject_alt_names": ["example.com", "www.example.com"] + } + result = monitor.validate(validator_args=validator_args) + + assert "subject_alt_names" in result + mock_validator.validate.assert_called_once_with( + monitor.cert_data, + monitor.host, + monitor.port, + alternate_names=["example.com", "www.example.com"], ) def test_validate_cert_validators_with_other_args(self): - """Test validate() with other validator arguments (non-subject_alt_names).""" + """Custom validators receive args as kwargs from the canonical dict form.""" monitor = CertMonitor("www.example.com") monitor.enabled_validators = ["custom_validator"] monitor.cert_data = {"cert_info": {"subject": {"commonName": "example.com"}}} - # Mock custom validator mock_validator = MagicMock() mock_validator.name = "custom_validator" mock_validator.validator_type = "cert" + mock_validator._user_param_names = frozenset({"threshold", "mode"}) mock_validator.validate.return_value = {"is_valid": True} with patch.object(monitor, "validators", {"custom_validator": mock_validator}): - validator_args = {"custom_validator": ["arg1", "arg2"]} + validator_args = {"custom_validator": {"threshold": 5, "mode": "strict"}} result = monitor.validate(validator_args=validator_args) assert "custom_validator" in result mock_validator.validate.assert_called_once_with( - monitor.cert_data, monitor.host, monitor.port, "arg1", "arg2" + monitor.cert_data, + monitor.host, + monitor.port, + threshold=5, + mode="strict", + ) + + def test_validate_cert_validators_unknown_arg_returns_error(self): + """Unknown user args produce a structured error dict, not an exception.""" + monitor = CertMonitor("www.example.com") + monitor.enabled_validators = ["custom_validator"] + monitor.cert_data = {"cert_info": {"subject": {"commonName": "example.com"}}} + + mock_validator = MagicMock() + mock_validator.name = "custom_validator" + mock_validator.validator_type = "cert" + mock_validator._user_param_names = frozenset({"threshold"}) + mock_validator.validate.return_value = {"is_valid": True} + + with patch.object(monitor, "validators", {"custom_validator": mock_validator}): + validator_args = {"custom_validator": {"bogus": 1}} + result = monitor.validate(validator_args=validator_args) + + assert result["custom_validator"]["is_valid"] is False + assert "Unknown args" in result["custom_validator"]["reason"] + mock_validator.validate.assert_not_called() + + def test_validate_cert_validators_invalid_args_type_returns_error(self): + """Non-dict, non-list args (e.g. a string) produce a structured error.""" + monitor = CertMonitor("www.example.com") + monitor.enabled_validators = ["custom_validator"] + monitor.cert_data = {"cert_info": {"subject": {"commonName": "example.com"}}} + + mock_validator = MagicMock() + mock_validator.name = "custom_validator" + mock_validator.validator_type = "cert" + mock_validator._user_param_names = frozenset({"threshold"}) + mock_validator.validate.return_value = {"is_valid": True} + + with patch.object(monitor, "validators", {"custom_validator": mock_validator}): + result = monitor.validate(validator_args={"custom_validator": "not a dict"}) + + assert result["custom_validator"]["is_valid"] is False + assert "expected a dict" in result["custom_validator"]["reason"] + mock_validator.validate.assert_not_called() + + def test_validate_cert_validators_bare_list_multi_arg_returns_error(self): + """Bare list shorthand only works for one-arg validators; otherwise error.""" + monitor = CertMonitor("www.example.com") + monitor.enabled_validators = ["multi_arg_validator"] + monitor.cert_data = {"cert_info": {"subject": {"commonName": "example.com"}}} + + mock_validator = MagicMock() + mock_validator.name = "multi_arg_validator" + mock_validator.validator_type = "cert" + mock_validator._user_param_names = frozenset({"a", "b"}) + mock_validator.validate.return_value = {"is_valid": True} + + with patch.object( + monitor, "validators", {"multi_arg_validator": mock_validator} + ): + result = monitor.validate(validator_args={"multi_arg_validator": [1, 2, 3]}) + + assert result["multi_arg_validator"]["is_valid"] is False + assert "expected a dict" in result["multi_arg_validator"]["reason"] + mock_validator.validate.assert_not_called() + + def test_validate_cert_validators_validator_raises_typeerror(self): + """If the validator itself raises TypeError, dispatch returns a structured error.""" + monitor = CertMonitor("www.example.com") + monitor.enabled_validators = ["raising_validator"] + monitor.cert_data = {"cert_info": {"subject": {"commonName": "example.com"}}} + + mock_validator = MagicMock() + mock_validator.name = "raising_validator" + mock_validator.validator_type = "cert" + mock_validator._user_param_names = frozenset({"flag"}) + mock_validator.validate.side_effect = TypeError("flag must be bool") + + with patch.object(monitor, "validators", {"raising_validator": mock_validator}): + result = monitor.validate( + validator_args={"raising_validator": {"flag": "yes"}} ) + assert result["raising_validator"]["is_valid"] is False + assert "rejected args" in result["raising_validator"]["reason"] + assert "flag must be bool" in result["raising_validator"]["reason"] + class TestCipherValidators: """Test cipher-specific validator scenarios.""" @@ -233,14 +353,14 @@ def test_validate_cipher_validators_cipher_error(self): assert "weak_cipher" not in result or len(result) == 0 def test_validate_cipher_validators_with_args(self): - """Test validate() cipher validators with additional arguments.""" + """Cipher validators receive args as kwargs from the canonical dict form.""" monitor = CertMonitor("www.example.com") monitor.enabled_validators = ["custom_cipher_validator"] - # Mock cipher validator mock_validator = MagicMock() mock_validator.name = "custom_cipher_validator" mock_validator.validator_type = "cipher" + mock_validator._user_param_names = frozenset({"min_strength"}) mock_validator.validate.return_value = {"is_valid": True} mock_cipher_info = {"cipher_suite": {"name": "test"}} @@ -251,12 +371,15 @@ def test_validate_cipher_validators_with_args(self): with patch.object( monitor, "get_cipher_info", return_value=mock_cipher_info ): - validator_args = {"custom_cipher_validator": ["arg1", "arg2"]} + validator_args = {"custom_cipher_validator": {"min_strength": 256}} result = monitor.validate(validator_args=validator_args) assert "custom_cipher_validator" in result mock_validator.validate.assert_called_once_with( - mock_cipher_info, monitor.host, monitor.port, "arg1", "arg2" + mock_cipher_info, + monitor.host, + monitor.port, + min_strength=256, ) diff --git a/tests/test_validators/test_base.py b/tests/test_validators/test_base.py index b469da2..e4df8bd 100644 --- a/tests/test_validators/test_base.py +++ b/tests/test_validators/test_base.py @@ -189,5 +189,120 @@ def validate(self, cert_info, host, port): assert validator.name == "custom_name" +class TestUserArgEnforcement: + """Test the __init_subclass__ enforcement of user-arg declarations.""" + + def test_validator_with_no_user_args_is_allowed(self): + """A validator with only framework params and no user args passes.""" + + class NoUserArgsValidator(BaseCertValidator): + @property + def name(self): + return "no_user_args" + + def validate(self, cert_info, host, port): + return {"is_valid": True} + + v = NoUserArgsValidator() + assert v._user_param_names == frozenset() + + def test_validator_with_well_formed_user_arg_is_allowed(self): + """A keyword-only, annotated, defaulted user arg is accepted.""" + from typing import Optional, List + + class WellFormedValidator(BaseCertValidator): + @property + def name(self): + return "well_formed" + + def validate( + self, + cert_info, + host, + port, + *, + names: Optional[List[str]] = None, + ): + return {"is_valid": True, "names": names} + + v = WellFormedValidator() + assert v._user_param_names == frozenset({"names"}) + assert "names" in v._user_params + + def test_validator_with_positional_user_arg_is_rejected(self): + """A positional-or-keyword user arg fails enforcement at class creation.""" + with pytest.raises(TypeError, match="must be keyword-only"): + + class BadPositionalValidator(BaseCertValidator): + @property + def name(self): + return "bad_positional" + + def validate(self, cert_info, host, port, names=None): + return {"is_valid": True} + + def test_validator_with_unannotated_user_arg_is_rejected(self): + """A keyword-only user arg without a type annotation fails.""" + with pytest.raises(TypeError, match="missing type annotation"): + + class BadUnannotatedValidator(BaseCertValidator): + @property + def name(self): + return "bad_unannotated" + + def validate(self, cert_info, host, port, *, names=None): + return {"is_valid": True} + + def test_validator_with_no_default_user_arg_is_rejected(self): + """A keyword-only annotated user arg without a default fails.""" + from typing import List + + with pytest.raises(TypeError, match="missing default value"): + + class BadNoDefaultValidator(BaseCertValidator): + @property + def name(self): + return "bad_no_default" + + def validate(self, cert_info, host, port, *, names: List[str]): + return {"is_valid": True} + + def test_validator_with_var_positional_is_rejected(self): + """``*args`` for user args is rejected.""" + with pytest.raises(TypeError, match=r"\*args is not allowed"): + + class BadVarPositionalValidator(BaseCertValidator): + @property + def name(self): + return "bad_varargs" + + def validate(self, cert_info, host, port, *names): + return {"is_valid": True} + + def test_validator_with_var_keyword_is_rejected(self): + """``**kwargs`` for user args is rejected — every arg must be explicit.""" + with pytest.raises(TypeError, match=r"\*\*kwargs is not allowed"): + + class BadVarKeywordValidator(BaseCertValidator): + @property + def name(self): + return "bad_kwargs" + + def validate(self, cert_info, host, port, **opts): + return {"is_valid": True} + + def test_cipher_validator_enforcement(self): + """Cipher validators get the same enforcement as cert validators.""" + with pytest.raises(TypeError, match="must be keyword-only"): + + class BadCipherValidator(BaseCipherValidator): + @property + def name(self): + return "bad_cipher" + + def validate(self, cipher_info, host, port, threshold=0): + return {"is_valid": True} + + if __name__ == "__main__": pytest.main([__file__]) diff --git a/tests/test_validators/test_sensitive_date.py b/tests/test_validators/test_sensitive_date.py index 2dfa0b4..a8ead59 100644 --- a/tests/test_validators/test_sensitive_date.py +++ b/tests/test_validators/test_sensitive_date.py @@ -64,7 +64,10 @@ def test_sensitive_date_warning_and_invalid(sample_cert): sensitive_date = SensitiveDate("Busy Sunday", date(2025, 11, 16)) result = validator.validate( - {"cert_info": sample_cert}, "www.example.com", 443, sensitive_date + {"cert_info": sample_cert}, + "www.example.com", + 443, + dates=[sensitive_date], ) expected_warning = ( @@ -84,7 +87,10 @@ def test_sensitive_date_no_warning_and_valid(sample_cert): sensitive_date = SensitiveDate("Busy Tuesday", date(2025, 11, 18)) result = validator.validate( - {"cert_info": sample_cert}, "www.example.com", 443, sensitive_date + {"cert_info": sample_cert}, + "www.example.com", + 443, + dates=[sensitive_date], ) assert result["is_valid"] @@ -107,7 +113,10 @@ def test_many_sensitive_dates_no_match(sample_cert): ) result = validator.validate( - {"cert_info": sample_cert}, "www.example.com", 443, *sensitive_dates + {"cert_info": sample_cert}, + "www.example.com", + 443, + dates=sensitive_dates, ) assert result["is_valid"] @@ -129,7 +138,10 @@ def test_many_sensitive_dates_match(sample_cert): ) result = validator.validate( - {"cert_info": sample_cert}, "www.example.com", 443, *sensitive_dates + {"cert_info": sample_cert}, + "www.example.com", + 443, + dates=sensitive_dates, ) expected_warning_1 = ( @@ -159,8 +171,10 @@ def test_sensitive_date_validator_type_check(sample_cert): {"cert_info": sample_cert}, "www.example.com", 443, - SensitiveDate("Valid SensitiveDate", date(2025, 1, 1)), - "A string not a SensitiveDate", + dates=[ + SensitiveDate("Valid SensitiveDate", date(2025, 1, 1)), + "A string not a SensitiveDate", # type: ignore[list-item] + ], ) assert "Expected SensitiveDate, got str" in str(excinfo.value) diff --git a/tests/test_validators/test_subject_alt_names.py b/tests/test_validators/test_subject_alt_names.py index f7230b1..c72578e 100644 --- a/tests/test_validators/test_subject_alt_names.py +++ b/tests/test_validators/test_subject_alt_names.py @@ -15,7 +15,10 @@ def test_san_validation_success_with_alternates(self, sample_cert): """Test SAN validation success with alternate names.""" validator = SubjectAltNamesValidator() result = validator.validate( - {"cert_info": sample_cert}, "www.example.com", 443, ["example.com"] + {"cert_info": sample_cert}, + "www.example.com", + 443, + alternate_names=["example.com"], ) assert result["is_valid"] is True assert result["contains_host"]["is_valid"] is True @@ -26,7 +29,10 @@ def test_san_validation_partial_mismatch(self, sample_cert): """Test SAN validation with partial mismatch.""" validator = SubjectAltNamesValidator() result = validator.validate( - {"cert_info": sample_cert}, "www.example.com", 443, ["invalid.com"] + {"cert_info": sample_cert}, + "www.example.com", + 443, + alternate_names=["invalid.com"], ) assert result["is_valid"] is True # Host is valid assert result["contains_host"]["is_valid"] is True @@ -180,7 +186,10 @@ def test_alternate_names_validation(self): validator = SubjectAltNamesValidator() alternate_names = ["www.example.com", "192.168.1.1", "invalid.com"] result = validator.validate( - {"cert_info": cert_info}, "example.com", 443, alternate_names + {"cert_info": cert_info}, + "example.com", + 443, + alternate_names=alternate_names, ) assert result["is_valid"] is True