From ac1c10b09a2ff8ab012b8c1b25d3014f17c25d47 Mon Sep 17 00:00:00 2001 From: Sean Sinclair <146738689+sean-sinclair@users.noreply.github.com> Date: Fri, 27 Feb 2026 12:01:33 +0100 Subject: [PATCH 1/4] Add host_url option to CLI and Checker initialization for service validation --- src/validate_secrets/cli.py | 10 +++++++++- src/validate_secrets/core/base.py | 10 +++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/validate_secrets/cli.py b/src/validate_secrets/cli.py index 481f8ab..ff7a037 100644 --- a/src/validate_secrets/cli.py +++ b/src/validate_secrets/cli.py @@ -33,8 +33,12 @@ is_flag=True, help="Enable debug logging. To use, add the flag as a first argument!", ) +@click.option( + "--host-url", + help="Base URL of the service to validate against", +) @click.pass_context -def cli(ctx, config, debug): +def cli(ctx, config, debug, host_url): """Extensible secret validation tool.""" ctx.ensure_object(dict) @@ -48,6 +52,7 @@ def cli(ctx, config, debug): ctx.obj["config"].setup_logging() ctx.obj["debug"] = debug + ctx.obj["host_url"] = host_url @cli.command() @@ -125,6 +130,7 @@ def check_file(ctx, file_path, secret_type, output, output_format, file_format, notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], + host_url=ctx.obj.get("host_url"), ) for secret_data in track( @@ -245,6 +251,7 @@ def check_github(ctx, org, repo, secret_type, state, validity, output, output_fo notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], + host_url=ctx.obj.get("host_url"), ) status = validator.check(secret) @@ -334,6 +341,7 @@ def validate(ctx, secret, secret_type, notify): notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], + host_url=ctx.obj.get("host_url"), ) # Validate secret diff --git a/src/validate_secrets/core/base.py b/src/validate_secrets/core/base.py index 44f295c..432eb41 100644 --- a/src/validate_secrets/core/base.py +++ b/src/validate_secrets/core/base.py @@ -54,17 +54,25 @@ class Checker(ABC): name: str = "" description: str = "" - def __init__(self, notify: bool = False, debug: bool = False, timeout: int = 30) -> None: + def __init__( + self, + notify: bool = False, + debug: bool = False, + timeout: int = 30, + host_url: Optional[str] = None, + ) -> None: """Initialize the checker. Args: notify: Whether to send notifications to endpoints debug: Enable debug logging timeout: Timeout in seconds for validation + host_url: Base URL of the service to validate against """ self.notify = notify self.debug = debug self.timeout = timeout + self.host_url = host_url.rstrip("/") if host_url else None if self.debug: logging.getLogger().setLevel(logging.DEBUG) From b1a4f35d7333c5a5a074a10a8de9df693a5fa777 Mon Sep 17 00:00:00 2001 From: Sean Sinclair <146738689+sean-sinclair@users.noreply.github.com> Date: Fri, 27 Feb 2026 12:13:44 +0100 Subject: [PATCH 2/4] Add Databricks token validator and update README with usage instructions --- README.md | 10 +++ .../validators/databricks_token.py | 77 +++++++++++++++++++ tests/test_registry.py | 1 + tests/test_validators.py | 45 +++++++++++ 4 files changed, 133 insertions(+) create mode 100644 src/validate_secrets/validators/databricks_token.py diff --git a/README.md b/README.md index 8a9d771..56aadad 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,7 @@ validate-secrets check-file input/secrets_file.json --file-format json --format | `google_api_key` | Google API Keys | AIza... format | | `microsoft_teams_webhook` | Microsoft Teams/Office 365 Webhooks | webhook.office.com URLs | | `snyk_api_token` | Snyk API Tokens | API tokens | +| `databricks_token` | Databricks Personal Access Tokens | `dapi...` format | Note: Most accurate way to see available validators is to run `validate-secrets list-validators` command. @@ -200,6 +201,15 @@ With the `--output` option you can also specify the file to write the output to: validate-secrets check-file secrets.txt google_api_key --file-format csv --output results.csv ``` +### Databricks Token Validation + +Validate Databricks Personal Access Tokens against a workspace. The `--host-url` flag provides the workspace URL: + +```bash +# Validate a single token +validate-secrets --host-url https://my-workspace.cloud.databricks.com validate "dapi1234abcd..." databricks_token +```` + ## License This project is licensed under the terms of the MIT open source license. Please refer to [LICENSE.md](LICENSE.md) for the full terms. diff --git a/src/validate_secrets/validators/databricks_token.py b/src/validate_secrets/validators/databricks_token.py new file mode 100644 index 0000000..406fa83 --- /dev/null +++ b/src/validate_secrets/validators/databricks_token.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 + +"""Validator for Databricks Personal Access Tokens.""" + +import os +import requests +import logging +from typing import Optional + +from ..core.base import Checker + +LOG = logging.getLogger(__name__) + + +class DatabricksTokenChecker(Checker): + """Class to check if a Databricks Personal Access Token is valid.""" + + name = "databricks_token" + description = "Validates Databricks Personal Access Tokens" + + def __init__( + self, + notify: bool = False, + debug: bool = False, + timeout: int = 30, + host_url: Optional[str] = None, + ) -> None: + super().__init__(notify, debug, timeout, host_url) + self.session = requests.Session() + self.session.headers.update({"Content-Type": "application/json"}) + + # Fall back to DATABRICKS_HOST env var if host_url not provided + if not self.host_url: + env_host = os.environ.get("DATABRICKS_HOST", "").rstrip("/") + if env_host: + self.host_url = env_host + + def check(self, token: str) -> Optional[bool]: + """Check if a Databricks token is still active.""" + token = token.strip() + + if not self.host_url: + LOG.error( + "No host URL configured. Use --host-url or set DATABRICKS_HOST env var." + ) + return None + + if self.notify: + LOG.debug("Cannot notify Databricks tokens") + + try: + api_url = f"{self.host_url}/api/2.0/token/list" + request = self.session.prepare_request( + requests.Request("GET", api_url, headers={"Authorization": f"Bearer {token}"}) + ) + LOG.debug("Request URL: %s", api_url) + LOG.debug("Headers: %s", request.headers) + response = self.session.send(request, timeout=self.timeout) + + LOG.debug("Response status: %s", response.status_code) + LOG.debug("Response text: %s", response.text) + + if response.status_code == 200: + return True + elif response.status_code in (401, 403): + return False + else: + LOG.error( + "Error for token %s: %s; %s", + token[:10] + "...", + response.status_code, + response.text, + ) + return None + except Exception as e: + LOG.error(f"Error validating Databricks token: {e}") + return None diff --git a/tests/test_registry.py b/tests/test_registry.py index f33bd7f..94c6a6f 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -31,6 +31,7 @@ def test_load_validators(self): assert "google_api_key" in validators assert "microsoft_teams_webhook" in validators assert "snyk_api_token" in validators + assert "databricks_token" in validators def test_get_validator(self): """Test getting a specific validator.""" diff --git a/tests/test_validators.py b/tests/test_validators.py index 68c50fc..a49f59e 100644 --- a/tests/test_validators.py +++ b/tests/test_validators.py @@ -12,6 +12,45 @@ from validate_secrets.validators.google_api_keys import GoogleApiKeyChecker from validate_secrets.validators.microsoft_teams_webhook import OfficeWebHookChecker from validate_secrets.validators.snyk_api_token import SnykAPITokenChecker +from validate_secrets.validators.databricks_token import DatabricksTokenChecker + +class TestDatabricksTokenChecker: + """Test the Databricks token validator with host_url parameter.""" + + def test_host_from_named_parameter(self): + """Test that host_url can be set as a named parameter.""" + checker = DatabricksTokenChecker( + host_url="https://my-workspace.databricks.com" + ) + assert checker.host_url == "https://my-workspace.databricks.com" + + def test_host_strips_trailing_slash(self): + """Test that trailing slash is stripped from host.""" + checker = DatabricksTokenChecker( + host_url="https://my-workspace.databricks.com/" + ) + assert checker.host_url == "https://my-workspace.databricks.com" + + def test_host_from_env_var_fallback(self, monkeypatch): + """Test that DATABRICKS_HOST env var is used as fallback.""" + monkeypatch.setenv("DATABRICKS_HOST", "https://env-workspace.databricks.com") + checker = DatabricksTokenChecker() + assert checker.host_url == "https://env-workspace.databricks.com" + + def test_named_param_overrides_env_var(self, monkeypatch): + """Test that host_url parameter takes precedence over env var.""" + monkeypatch.setenv("DATABRICKS_HOST", "https://env-workspace.databricks.com") + checker = DatabricksTokenChecker( + host_url="https://cli-workspace.databricks.com" + ) + assert checker.host_url == "https://cli-workspace.databricks.com" + + def test_missing_host_returns_none(self, monkeypatch): + """Test that check returns None when host is not configured.""" + monkeypatch.delenv("DATABRICKS_HOST", raising=False) + checker = DatabricksTokenChecker() + result = checker.check("dapi_fake_token_123") + assert result is None class TestFodselsNummerChecker: @@ -114,6 +153,9 @@ class TestValidatorMetadata: def test_all_validators_have_names(self): """Test that all validators have proper names.""" validators = [ + DatabricksTokenChecker( + host_url="https://test.databricks.com" + ), FodselsNummerChecker(), GoogleApiKeyChecker(), OfficeWebHookChecker(), @@ -128,6 +170,9 @@ def test_all_validators_have_names(self): def test_all_validators_have_descriptions(self): """Test that all validators have descriptions.""" validators = [ + DatabricksTokenChecker( + host_url="https://test.databricks.com" + ), FodselsNummerChecker(), GoogleApiKeyChecker(), OfficeWebHookChecker(), From 70dd9a280a382167856b85ed4930eeac20b544cd Mon Sep 17 00:00:00 2001 From: Sean Sinclair <146738689+sean-sinclair@users.noreply.github.com> Date: Fri, 27 Feb 2026 12:17:05 +0100 Subject: [PATCH 3/4] Refactor DatabricksTokenChecker test cases for improved readability --- tests/test_validators.py | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/tests/test_validators.py b/tests/test_validators.py index a49f59e..f7baec9 100644 --- a/tests/test_validators.py +++ b/tests/test_validators.py @@ -14,21 +14,18 @@ from validate_secrets.validators.snyk_api_token import SnykAPITokenChecker from validate_secrets.validators.databricks_token import DatabricksTokenChecker + class TestDatabricksTokenChecker: """Test the Databricks token validator with host_url parameter.""" def test_host_from_named_parameter(self): """Test that host_url can be set as a named parameter.""" - checker = DatabricksTokenChecker( - host_url="https://my-workspace.databricks.com" - ) + checker = DatabricksTokenChecker(host_url="https://my-workspace.databricks.com") assert checker.host_url == "https://my-workspace.databricks.com" def test_host_strips_trailing_slash(self): """Test that trailing slash is stripped from host.""" - checker = DatabricksTokenChecker( - host_url="https://my-workspace.databricks.com/" - ) + checker = DatabricksTokenChecker(host_url="https://my-workspace.databricks.com/") assert checker.host_url == "https://my-workspace.databricks.com" def test_host_from_env_var_fallback(self, monkeypatch): @@ -40,9 +37,7 @@ def test_host_from_env_var_fallback(self, monkeypatch): def test_named_param_overrides_env_var(self, monkeypatch): """Test that host_url parameter takes precedence over env var.""" monkeypatch.setenv("DATABRICKS_HOST", "https://env-workspace.databricks.com") - checker = DatabricksTokenChecker( - host_url="https://cli-workspace.databricks.com" - ) + checker = DatabricksTokenChecker(host_url="https://cli-workspace.databricks.com") assert checker.host_url == "https://cli-workspace.databricks.com" def test_missing_host_returns_none(self, monkeypatch): @@ -153,9 +148,7 @@ class TestValidatorMetadata: def test_all_validators_have_names(self): """Test that all validators have proper names.""" validators = [ - DatabricksTokenChecker( - host_url="https://test.databricks.com" - ), + DatabricksTokenChecker(host_url="https://test.databricks.com"), FodselsNummerChecker(), GoogleApiKeyChecker(), OfficeWebHookChecker(), @@ -170,9 +163,7 @@ def test_all_validators_have_names(self): def test_all_validators_have_descriptions(self): """Test that all validators have descriptions.""" validators = [ - DatabricksTokenChecker( - host_url="https://test.databricks.com" - ), + DatabricksTokenChecker(host_url="https://test.databricks.com"), FodselsNummerChecker(), GoogleApiKeyChecker(), OfficeWebHookChecker(), From a2896a545a8f463d8d414abe6c9ec48e3517f647 Mon Sep 17 00:00:00 2001 From: Stefan Petrushevski Date: Tue, 21 Apr 2026 18:16:41 +0200 Subject: [PATCH 4/4] Fix host_url handling: move to subcommand, pass only to supporting validators - Move --host-url from global CLI group to subcommand-level option on validate, check-file, and check-github commands - Add _create_validator() helper using inspect.signature() to only pass kwargs that a validator's __init__ actually accepts, preventing breakage for validators that don't need host_url - Revert host_url from base Checker class - validators that need it (e.g. DatabricksTokenChecker) handle it in their own __init__ - Fix README code fence (4 backticks -> 3) and update example to reflect --host-url as subcommand option - Fix f-string in LOG.error to use %s style for consistency Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- README.md | 4 +- src/validate_secrets/cli.py | 54 +++++++++++++------ src/validate_secrets/core/base.py | 10 +--- .../validators/databricks_token.py | 7 ++- 4 files changed, 47 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 56aadad..bfb4ad2 100644 --- a/README.md +++ b/README.md @@ -207,8 +207,8 @@ Validate Databricks Personal Access Tokens against a workspace. The `--host-url` ```bash # Validate a single token -validate-secrets --host-url https://my-workspace.cloud.databricks.com validate "dapi1234abcd..." databricks_token -```` +validate-secrets validate "dapi1234abcd..." databricks_token --host-url https://my-workspace.cloud.databricks.com +``` ## License diff --git a/src/validate_secrets/cli.py b/src/validate_secrets/cli.py index ff7a037..df1c1d8 100644 --- a/src/validate_secrets/cli.py +++ b/src/validate_secrets/cli.py @@ -3,6 +3,7 @@ """Command line interface for validate-secrets.""" import sys +import inspect import logging import click @@ -25,6 +26,17 @@ console = Console() +def _create_validator(validator_class, **kwargs): + """Create a validator instance, only passing kwargs it accepts. + + This allows validator-specific options like host_url to be passed + without breaking validators that don't accept them. + """ + sig = inspect.signature(validator_class.__init__) + valid_kwargs = {k: v for k, v in kwargs.items() if k in sig.parameters} + return validator_class(**valid_kwargs) + + @click.group() @click.option("--config", "-c", help="Path to .env configuration file") @click.option( @@ -33,12 +45,8 @@ is_flag=True, help="Enable debug logging. To use, add the flag as a first argument!", ) -@click.option( - "--host-url", - help="Base URL of the service to validate against", -) @click.pass_context -def cli(ctx, config, debug, host_url): +def cli(ctx, config, debug): """Extensible secret validation tool.""" ctx.ensure_object(dict) @@ -52,7 +60,6 @@ def cli(ctx, config, debug, host_url): ctx.obj["config"].setup_logging() ctx.obj["debug"] = debug - ctx.obj["host_url"] = host_url @cli.command() @@ -73,8 +80,12 @@ def cli(ctx, config, debug, host_url): help="Input file format", ) @click.option("--notify", "-n", is_flag=True, help="Send notifications to endpoints") +@click.option( + "--host-url", + help="Base URL of the service to validate against (only used by validators that require it)", +) @click.pass_context -def check_file(ctx, file_path, secret_type, output, output_format, file_format, notify): +def check_file(ctx, file_path, secret_type, output, output_format, file_format, notify, host_url): """Check secrets from a file.""" try: config = ctx.obj["config"] @@ -126,11 +137,12 @@ def check_file(ctx, file_path, secret_type, output, output_format, file_format, try: # Get validator for this secret type validator_class = get_validator(current_secret_type) - validator = validator_class( + validator = _create_validator( + validator_class, notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], - host_url=ctx.obj.get("host_url"), + host_url=host_url, ) for secret_data in track( @@ -198,8 +210,14 @@ def check_file(ctx, file_path, secret_type, output, output_format, file_format, help="Output format", ) @click.option("--notify", "-n", is_flag=True, help="Send notifications to endpoints") +@click.option( + "--host-url", + help="Base URL of the service to validate against (only used by validators that require it)", +) @click.pass_context -def check_github(ctx, org, repo, secret_type, state, validity, output, output_format, notify): +def check_github( + ctx, org, repo, secret_type, state, validity, output, output_format, notify, host_url +): """Check secrets from GitHub secret scanning alerts.""" try: config = ctx.obj["config"] @@ -247,11 +265,12 @@ def check_github(ctx, org, repo, secret_type, state, validity, output, output_fo try: # Try to get validator using the GitHub secret type directly validator_class = get_validator(github_secret_type) - validator = validator_class( + validator = _create_validator( + validator_class, notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], - host_url=ctx.obj.get("host_url"), + host_url=host_url, ) status = validator.check(secret) @@ -328,8 +347,12 @@ def list_validators_cmd(): @click.argument("secret") @click.argument("secret_type", type=click.Choice(list_available_validators())) @click.option("--notify", "-n", is_flag=True, help="Send notifications to endpoints") +@click.option( + "--host-url", + help="Base URL of the service to validate against (only used by validators that require it)", +) @click.pass_context -def validate(ctx, secret, secret_type, notify): +def validate(ctx, secret, secret_type, notify, host_url): """Validate a single secret.""" try: config = ctx.obj["config"] @@ -337,11 +360,12 @@ def validate(ctx, secret, secret_type, notify): # Get validator validator_class = get_validator(secret_type) - validator = validator_class( + validator = _create_validator( + validator_class, notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], - host_url=ctx.obj.get("host_url"), + host_url=host_url, ) # Validate secret diff --git a/src/validate_secrets/core/base.py b/src/validate_secrets/core/base.py index 432eb41..44f295c 100644 --- a/src/validate_secrets/core/base.py +++ b/src/validate_secrets/core/base.py @@ -54,25 +54,17 @@ class Checker(ABC): name: str = "" description: str = "" - def __init__( - self, - notify: bool = False, - debug: bool = False, - timeout: int = 30, - host_url: Optional[str] = None, - ) -> None: + def __init__(self, notify: bool = False, debug: bool = False, timeout: int = 30) -> None: """Initialize the checker. Args: notify: Whether to send notifications to endpoints debug: Enable debug logging timeout: Timeout in seconds for validation - host_url: Base URL of the service to validate against """ self.notify = notify self.debug = debug self.timeout = timeout - self.host_url = host_url.rstrip("/") if host_url else None if self.debug: logging.getLogger().setLevel(logging.DEBUG) diff --git a/src/validate_secrets/validators/databricks_token.py b/src/validate_secrets/validators/databricks_token.py index 406fa83..466ab7d 100644 --- a/src/validate_secrets/validators/databricks_token.py +++ b/src/validate_secrets/validators/databricks_token.py @@ -25,10 +25,13 @@ def __init__( timeout: int = 30, host_url: Optional[str] = None, ) -> None: - super().__init__(notify, debug, timeout, host_url) + super().__init__(notify, debug, timeout) self.session = requests.Session() self.session.headers.update({"Content-Type": "application/json"}) + # Handle host_url: strip trailing slash + self.host_url = host_url.rstrip("/") if host_url else None + # Fall back to DATABRICKS_HOST env var if host_url not provided if not self.host_url: env_host = os.environ.get("DATABRICKS_HOST", "").rstrip("/") @@ -73,5 +76,5 @@ def check(self, token: str) -> Optional[bool]: ) return None except Exception as e: - LOG.error(f"Error validating Databricks token: {e}") + LOG.error("Error validating Databricks token: %s", e) return None