diff --git a/README.md b/README.md index 8a9d771..bfb4ad2 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,7 @@ validate-secrets check-file input/secrets_file.json --file-format json --format | `google_api_key` | Google API Keys | AIza... format | | `microsoft_teams_webhook` | Microsoft Teams/Office 365 Webhooks | webhook.office.com URLs | | `snyk_api_token` | Snyk API Tokens | API tokens | +| `databricks_token` | Databricks Personal Access Tokens | `dapi...` format | Note: Most accurate way to see available validators is to run `validate-secrets list-validators` command. @@ -200,6 +201,15 @@ With the `--output` option you can also specify the file to write the output to: validate-secrets check-file secrets.txt google_api_key --file-format csv --output results.csv ``` +### Databricks Token Validation + +Validate Databricks Personal Access Tokens against a workspace. The `--host-url` flag provides the workspace URL: + +```bash +# Validate a single token +validate-secrets validate "dapi1234abcd..." databricks_token --host-url https://my-workspace.cloud.databricks.com +``` + ## License This project is licensed under the terms of the MIT open source license. Please refer to [LICENSE.md](LICENSE.md) for the full terms. diff --git a/src/validate_secrets/cli.py b/src/validate_secrets/cli.py index 481f8ab..df1c1d8 100644 --- a/src/validate_secrets/cli.py +++ b/src/validate_secrets/cli.py @@ -3,6 +3,7 @@ """Command line interface for validate-secrets.""" import sys +import inspect import logging import click @@ -25,6 +26,17 @@ console = Console() +def _create_validator(validator_class, **kwargs): + """Create a validator instance, only passing kwargs it accepts. + + This allows validator-specific options like host_url to be passed + without breaking validators that don't accept them. 
+ """ + sig = inspect.signature(validator_class.__init__) + valid_kwargs = {k: v for k, v in kwargs.items() if k in sig.parameters} + return validator_class(**valid_kwargs) + + @click.group() @click.option("--config", "-c", help="Path to .env configuration file") @click.option( @@ -68,8 +80,12 @@ def cli(ctx, config, debug): help="Input file format", ) @click.option("--notify", "-n", is_flag=True, help="Send notifications to endpoints") +@click.option( + "--host-url", + help="Base URL of the service to validate against (only used by validators that require it)", +) @click.pass_context -def check_file(ctx, file_path, secret_type, output, output_format, file_format, notify): +def check_file(ctx, file_path, secret_type, output, output_format, file_format, notify, host_url): """Check secrets from a file.""" try: config = ctx.obj["config"] @@ -121,10 +137,12 @@ def check_file(ctx, file_path, secret_type, output, output_format, file_format, try: # Get validator for this secret type validator_class = get_validator(current_secret_type) - validator = validator_class( + validator = _create_validator( + validator_class, notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], + host_url=host_url, ) for secret_data in track( @@ -192,8 +210,14 @@ def check_file(ctx, file_path, secret_type, output, output_format, file_format, help="Output format", ) @click.option("--notify", "-n", is_flag=True, help="Send notifications to endpoints") +@click.option( + "--host-url", + help="Base URL of the service to validate against (only used by validators that require it)", +) @click.pass_context -def check_github(ctx, org, repo, secret_type, state, validity, output, output_format, notify): +def check_github( + ctx, org, repo, secret_type, state, validity, output, output_format, notify, host_url +): """Check secrets from GitHub secret scanning alerts.""" try: config = ctx.obj["config"] @@ -241,10 +265,12 @@ def check_github(ctx, org, 
class DatabricksTokenChecker(Checker):
    """Check whether a Databricks Personal Access Token is valid.

    The token is presented as a Bearer credential on a GET request to
    ``<host_url>/api/2.0/token/list``; the HTTP status code decides the result.
    The workspace URL comes from the ``host_url`` argument or, when that is
    empty, from the ``DATABRICKS_HOST`` environment variable.
    """

    name = "databricks_token"
    description = "Validates Databricks Personal Access Tokens"

    def __init__(
        self,
        notify: bool = False,
        debug: bool = False,
        timeout: int = 30,
        host_url: Optional[str] = None,
    ) -> None:
        """Set up the HTTP session and resolve the workspace URL.

        Args:
            notify: Forwarded to the base ``Checker``.
            debug: Forwarded to the base ``Checker``.
            timeout: Forwarded to the base ``Checker``; ``check`` passes
                ``self.timeout`` as the request timeout.
            host_url: Base URL of the workspace. Trailing slashes are removed.
                When empty, ``DATABRICKS_HOST`` is used instead.
        """
        super().__init__(notify, debug, timeout)
        self.session = requests.Session()
        self.session.headers.update({"Content-Type": "application/json"})

        # Strip trailing slashes so the API path can be appended verbatim.
        self.host_url = host_url.rstrip("/") if host_url else None

        # Fall back to DATABRICKS_HOST env var if host_url not provided.
        if not self.host_url:
            env_host = os.environ.get("DATABRICKS_HOST", "").rstrip("/")
            if env_host:
                self.host_url = env_host

    def check(self, token: str) -> Optional[bool]:
        """Check if a Databricks token is still active.

        Args:
            token: The token to test; surrounding whitespace is ignored.

        Returns:
            ``True`` on HTTP 200, ``False`` on HTTP 401 or 403, and ``None``
            when no host URL is configured, the status code is anything else,
            or the request raises.
        """
        token = token.strip()

        if not self.host_url:
            LOG.error(
                "No host URL configured. Use --host-url or set DATABRICKS_HOST env var."
            )
            return None

        if self.notify:
            LOG.debug("Cannot notify Databricks tokens")

        try:
            api_url = f"{self.host_url}/api/2.0/token/list"
            request = self.session.prepare_request(
                requests.Request("GET", api_url, headers={"Authorization": f"Bearer {token}"})
            )
            LOG.debug("Request URL: %s", api_url)
            # The Authorization header carries the secret under test; never
            # write it to the log, even at debug level.
            loggable_headers = {
                header: "<redacted>" if header.lower() == "authorization" else value
                for header, value in request.headers.items()
            }
            LOG.debug("Headers: %s", loggable_headers)
            response = self.session.send(request, timeout=self.timeout)

            LOG.debug("Response status: %s", response.status_code)
            LOG.debug("Response text: %s", response.text)

            if response.status_code == 200:
                return True
            if response.status_code in (401, 403):
                return False

            # Any other status is inconclusive. Only a short token prefix is
            # logged so the failing secret can be correlated.
            LOG.error(
                "Error for token %s: %s; %s",
                token[:10] + "...",
                response.status_code,
                response.text,
            )
            return None
        except Exception as e:
            # Deliberately broad: any failure while building or sending the
            # request is reported as "unknown" rather than aborting the run.
            LOG.error("Error validating Databricks token: %s", e)
            return None
"microsoft_teams_webhook" in validators assert "snyk_api_token" in validators + assert "databricks_token" in validators def test_get_validator(self): """Test getting a specific validator.""" diff --git a/tests/test_validators.py b/tests/test_validators.py index 68c50fc..f7baec9 100644 --- a/tests/test_validators.py +++ b/tests/test_validators.py @@ -12,6 +12,40 @@ from validate_secrets.validators.google_api_keys import GoogleApiKeyChecker from validate_secrets.validators.microsoft_teams_webhook import OfficeWebHookChecker from validate_secrets.validators.snyk_api_token import SnykAPITokenChecker +from validate_secrets.validators.databricks_token import DatabricksTokenChecker + + +class TestDatabricksTokenChecker: + """Test the Databricks token validator with host_url parameter.""" + + def test_host_from_named_parameter(self): + """Test that host_url can be set as a named parameter.""" + checker = DatabricksTokenChecker(host_url="https://my-workspace.databricks.com") + assert checker.host_url == "https://my-workspace.databricks.com" + + def test_host_strips_trailing_slash(self): + """Test that trailing slash is stripped from host.""" + checker = DatabricksTokenChecker(host_url="https://my-workspace.databricks.com/") + assert checker.host_url == "https://my-workspace.databricks.com" + + def test_host_from_env_var_fallback(self, monkeypatch): + """Test that DATABRICKS_HOST env var is used as fallback.""" + monkeypatch.setenv("DATABRICKS_HOST", "https://env-workspace.databricks.com") + checker = DatabricksTokenChecker() + assert checker.host_url == "https://env-workspace.databricks.com" + + def test_named_param_overrides_env_var(self, monkeypatch): + """Test that host_url parameter takes precedence over env var.""" + monkeypatch.setenv("DATABRICKS_HOST", "https://env-workspace.databricks.com") + checker = DatabricksTokenChecker(host_url="https://cli-workspace.databricks.com") + assert checker.host_url == "https://cli-workspace.databricks.com" + + def 
test_missing_host_returns_none(self, monkeypatch): + """Test that check returns None when host is not configured.""" + monkeypatch.delenv("DATABRICKS_HOST", raising=False) + checker = DatabricksTokenChecker() + result = checker.check("dapi_fake_token_123") + assert result is None class TestFodselsNummerChecker: @@ -114,6 +148,7 @@ class TestValidatorMetadata: def test_all_validators_have_names(self): """Test that all validators have proper names.""" validators = [ + DatabricksTokenChecker(host_url="https://test.databricks.com"), FodselsNummerChecker(), GoogleApiKeyChecker(), OfficeWebHookChecker(), @@ -128,6 +163,7 @@ def test_all_validators_have_names(self): def test_all_validators_have_descriptions(self): """Test that all validators have descriptions.""" validators = [ + DatabricksTokenChecker(host_url="https://test.databricks.com"), FodselsNummerChecker(), GoogleApiKeyChecker(), OfficeWebHookChecker(),