diff --git a/.gitignore b/.gitignore index bac09d4..49cbdde 100644 --- a/.gitignore +++ b/.gitignore @@ -78,3 +78,7 @@ tmp/ .ruff_cache/ logs/ + +# Output files from nac-collector +*.zip +*.json diff --git a/README.md b/README.md index bc89bcf..2a71c66 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,8 @@ Options: [env var: NAC_PASSWORD] * --url TEXT Base URL for the service (required for controller-based solutions) [env var: NAC_URL] + --op-item TEXT 1Password item reference to retrieve credentials + [env var: NAC_OP_ITEM] -v, --verbosity [CRITICAL|ERROR|WARNING|INFO|DEBUG] Log level [default: WARNING] -f, --fetch-latest Fetch the latest endpoint definitions from @@ -53,7 +55,36 @@ Options: --help Show this message and exit ``` -Set environment variables pointing to supported solution instance: +### Authentication Options + +nac-collector supports three methods of providing credentials (in order of precedence): + +1. **1Password CLI Integration** (recommended for security) +2. **Command-line options** (`--username`, `--password`, `--url`) +3. **Environment variables** (`NAC_USERNAME`, `NAC_PASSWORD`, `NAC_URL`) + +#### Using 1Password CLI + +Store credentials in 1Password and reference them with `--op-item`: + +```shell +# Using item name +nac-collector -s SDWAN --op-item "vManage Production" + +# Using environment variable +export NAC_OP_ITEM="vManage Production" +nac-collector -s SDWAN + +# Override specific fields from 1Password +nac-collector -s SDWAN --op-item "vManage" --url "https://custom-url.com" +``` + +**Prerequisites:** +- Install [1Password CLI](https://developer.1password.com/docs/cli/get-started/) +- Sign in with `op signin` +- Ensure your 1Password item contains `username`, `password`, and `url` fields + +#### Using Environment Variables ```shell export NAC_USERNAME=admin @@ -61,6 +92,12 @@ export NAC_PASSWORD=Cisco123 export NAC_URL=https://10.1.1.1 ``` +#### Using Command-line Options + +```shell +nac-collector -s SDWAN --username admin --password Cisco123 --url https://10.1.1.1 +``` + ## Examples ### SDWAN diff --git a/nac_collector/cli/main.py b/nac_collector/cli/main.py index 799d820..6f9dec9 100644 --- a/nac_collector/cli/main.py +++ b/nac_collector/cli/main.py @@ -20,6 +20,10 @@ from nac_collector.device.nxos import CiscoClientNXOS from nac_collector.device_inventory import load_devices_from_file from nac_collector.endpoint_resolver import EndpointResolver +from nac_collector.onepassword_helper import ( + OnePasswordError, + get_credentials_from_op, +) console = Console() logger = logging.getLogger("main") @@ -131,6 +135,14 @@ def main( help="Base URL for the service", ), ] = None, + op_item: Annotated[ + str | None, + typer.Option( + "--op-item", + envvar="NAC_OP_ITEM", + help="1Password item reference (name, UUID, or share link) to retrieve credentials", + ), + ] = None, verbosity: Annotated[ LogLevel, typer.Option("-v", "--verbosity", help="Log level"), @@ -177,6 +189,26 @@ def main( configure_logging(verbosity) + # Retrieve credentials from 1Password if op_item is provided + if op_item: + try: + op_username, op_password, op_url = get_credentials_from_op(op_item) + + # Use 1Password credentials if not overridden by explicit options + if op_username and not username: + username = op_username + logger.debug("Using username from 1Password") + if op_password and not password: + password = op_password + logger.debug("Using password from 1Password") + if op_url and not url: + url = op_url + logger.debug("Using URL from 1Password") + + except OnePasswordError as e: + console.print(f"[red]1Password error: {e}[/red]") + raise typer.Exit(1) from e + # Define device-based solutions DEVICE_BASED_SOLUTIONS = [Solution.IOSXE, Solution.IOSXR, Solution.NXOS] diff --git a/nac_collector/controller/sdwan.py b/nac_collector/controller/sdwan.py index 6bdd9b8..ab7a84c 100644 --- a/nac_collector/controller/sdwan.py +++ b/nac_collector/controller/sdwan.py @@ -44,6 +44,9 @@ def authenticate(self) -> bool: """ Perform token-based authentication. + Handles URLs with SSO bypass paths (e.g., /login.html) by using them + for authentication but extracting the base URL for API calls. + Returns: bool: True if authentication is successful, False otherwise. """ @@ -63,8 +66,19 @@ def authenticate(self) -> bool: logger.error("No valid JSESSION ID returned") jsessionid = None + # Extract base URL for API calls, removing common login paths + # This handles SSO bypass URLs like https://vmanage.../login.html + api_base_url = self.base_url + for login_path in ["/login.html", "/login", "/index.html"]: + if api_base_url.endswith(login_path): + api_base_url = api_base_url[: -len(login_path)] + logger.debug( + f"Detected SSO bypass path, using base URL: {api_base_url}" + ) + break + headers = {"Cookie": jsessionid} if jsessionid else {} - url = self.base_url + "/dataservice/client/token" + url = api_base_url + "/dataservice/client/token" response = httpx.get( url=url, headers=headers, verify=self.ssl_verify, timeout=self.timeout ) @@ -84,7 +98,7 @@ def authenticate(self) -> bool: "X-XSRF-TOKEN": response.text, } ) - self.base_url = self.base_url + "/dataservice" + self.base_url = api_base_url + "/dataservice" return True logger.error( diff --git a/nac_collector/onepassword_helper.py b/nac_collector/onepassword_helper.py new file mode 100644 index 0000000..7feb040 --- /dev/null +++ b/nac_collector/onepassword_helper.py @@ -0,0 +1,152 @@ +"""Helper module for 1Password CLI integration.""" + +import json +import logging +import subprocess +from typing import Any + +logger = logging.getLogger(__name__) + + +class OnePasswordError(Exception): + """Exception raised for 1Password CLI errors.""" + + pass + + +def check_op_cli_available() -> bool: + """ + Check if 1Password CLI is available and accessible. + + Returns: + bool: True if op CLI is available, False otherwise. + """ + try: + result = subprocess.run( + ["op", "--version"], + capture_output=True, + text=True, + check=False, + timeout=5, + ) + return result.returncode == 0 + except (subprocess.SubprocessError, FileNotFoundError): + return False + + +def get_op_item(item_reference: str) -> dict[str, Any]: + """ + Retrieve an item from 1Password using the CLI. + + Args: + item_reference: The item reference (item name, UUID, or share link). + + Returns: + Dictionary containing the item data. + + Raises: + OnePasswordError: If retrieval fails or op CLI is not available. + """ + if not check_op_cli_available(): + raise OnePasswordError( + "1Password CLI (op) is not available. " + "Please install from https://developer.1password.com/docs/cli/get-started/" + ) + + try: + result = subprocess.run( + ["op", "item", "get", item_reference, "--format", "json"], + capture_output=True, + text=True, + check=True, + timeout=30, + ) + item_dict: dict[str, Any] = json.loads(result.stdout) + return item_dict + except subprocess.CalledProcessError as e: + logger.error(f"Failed to retrieve 1Password item: {e.stderr}") + raise OnePasswordError( + f"Failed to retrieve item '{item_reference}': {e.stderr.strip()}" + ) from e + except subprocess.TimeoutExpired as e: + raise OnePasswordError( + f"Timeout while retrieving item '{item_reference}'" + ) from e + except json.JSONDecodeError as e: + raise OnePasswordError( + f"Failed to parse 1Password CLI output for '{item_reference}'" + ) from e + + +def extract_credentials( + item_data: dict[str, Any] +) -> tuple[str | None, str | None, str | None]: + """ + Extract username, password, and URL from 1Password item data. + + Args: + item_data: The 1Password item data dictionary. + + Returns: + Tuple of (username, password, url). Any field may be None if not found. + Note: URL paths like /login.html are preserved for SSO bypass requirements. + """ + username = None + password = None + url = None + + # Extract from fields array + fields = item_data.get("fields", []) + for field in fields: + field_id = field.get("id", "").lower() + field_label = field.get("label", "").lower() + field_value = field.get("value") + + # Match username field + if field_id == "username" or "username" in field_label: + username = field_value + # Match password field + elif field_id == "password" or "password" in field_label: + password = field_value + # Match URL field + elif field_id in ["url", "website"] or any( + term in field_label for term in ["url", "website", "address"] + ): + url = field_value + + # Try to extract URL from urls array if not found in fields + if not url: + urls = item_data.get("urls", []) + if urls and len(urls) > 0: + url = urls[0].get("href") + + return username, password, url + + +def get_credentials_from_op( + item_reference: str, +) -> tuple[str | None, str | None, str | None]: + """ + Get credentials from 1Password for a given item reference. + + Args: + item_reference: The 1Password item reference. + + Returns: + Tuple of (username, password, url). + + Raises: + OnePasswordError: If retrieval or parsing fails. + """ + logger.debug(f"Retrieving credentials from 1Password item: {item_reference}") + + item_data = get_op_item(item_reference) + username, password, url = extract_credentials(item_data) + + logger.debug( + f"Extracted from 1Password - username: {'***' if username else 'None'}, " + f"password: {'***' if password else 'None'}, " + f"url: {url or 'None'}" + ) + + return username, password, url diff --git a/tests/test_onepassword_helper.py b/tests/test_onepassword_helper.py new file mode 100644 index 0000000..33b2257 --- /dev/null +++ b/tests/test_onepassword_helper.py @@ -0,0 +1,210 @@ +"""Tests for 1Password helper module.""" + +import json +import subprocess +from unittest.mock import MagicMock, patch + +import pytest + +from nac_collector.onepassword_helper import ( + OnePasswordError, + check_op_cli_available, + extract_credentials, + get_credentials_from_op, + get_op_item, +) + + +class TestCheckOpCliAvailable: + """Tests for check_op_cli_available function.""" + + @patch("subprocess.run") + def test_op_cli_available(self, mock_run: MagicMock) -> None: + """Test when op CLI is available.""" + mock_run.return_value = MagicMock(returncode=0) + assert check_op_cli_available() is True + + @patch("subprocess.run") + def test_op_cli_not_available(self, mock_run: MagicMock) -> None: + """Test when op CLI is not available.""" + mock_run.return_value = MagicMock(returncode=1) + assert check_op_cli_available() is False + + @patch("subprocess.run") + def test_op_cli_file_not_found(self, mock_run: MagicMock) -> None: + """Test when op command is not found.""" + mock_run.side_effect = FileNotFoundError() + assert check_op_cli_available() is False + + @patch("subprocess.run") + def test_op_cli_timeout(self, mock_run: MagicMock) -> None: + """Test when op command times out.""" + mock_run.side_effect = subprocess.TimeoutExpired("op", 5) + assert check_op_cli_available() is False + + +class TestGetOpItem: + """Tests for get_op_item function.""" + + @patch("nac_collector.onepassword_helper.check_op_cli_available") + def test_op_cli_not_available(self, mock_check: MagicMock) -> None: + """Test when op CLI is not available.""" + mock_check.return_value = False + with pytest.raises(OnePasswordError, match="not available"): + get_op_item("test-item") + + @patch("subprocess.run") + @patch("nac_collector.onepassword_helper.check_op_cli_available") + def test_successful_retrieval( + self, mock_check: MagicMock, mock_run: MagicMock + ) -> None: + """Test successful item retrieval.""" + mock_check.return_value = True + item_data = {"id": "test123", "title": "Test Item"} + mock_run.return_value = MagicMock( + returncode=0, stdout=json.dumps(item_data), stderr="" + ) + + result = get_op_item("test-item") + assert result == item_data + mock_run.assert_called_once_with( + ["op", "item", "get", "test-item", "--format", "json"], + capture_output=True, + text=True, + check=True, + timeout=30, + ) + + @patch("subprocess.run") + @patch("nac_collector.onepassword_helper.check_op_cli_available") + def test_item_not_found(self, mock_check: MagicMock, mock_run: MagicMock) -> None: + """Test when item is not found.""" + mock_check.return_value = True + mock_run.side_effect = subprocess.CalledProcessError( + 1, "op", stderr="[ERROR] item not found" + ) + + with pytest.raises(OnePasswordError, match="Failed to retrieve"): + get_op_item("nonexistent-item") + + @patch("subprocess.run") + @patch("nac_collector.onepassword_helper.check_op_cli_available") + def test_timeout(self, mock_check: MagicMock, mock_run: MagicMock) -> None: + """Test timeout during retrieval.""" + mock_check.return_value = True + mock_run.side_effect = subprocess.TimeoutExpired("op", 30) + + with pytest.raises(OnePasswordError, match="Timeout"): + get_op_item("test-item") + + @patch("subprocess.run") + @patch("nac_collector.onepassword_helper.check_op_cli_available") + def test_invalid_json(self, mock_check: MagicMock, mock_run: MagicMock) -> None: + """Test invalid JSON response.""" + mock_check.return_value = True + mock_run.return_value = MagicMock(returncode=0, stdout="not valid json") + + with pytest.raises(OnePasswordError, match="Failed to parse"): + get_op_item("test-item") + + +class TestExtractCredentials: + """Tests for extract_credentials function.""" + + def test_extract_all_fields(self) -> None: + """Test extracting all credential fields.""" + item_data = { + "fields": [ + {"id": "username", "label": "username", "value": "admin"}, + {"id": "password", "label": "password", "value": "secret123"}, + {"id": "url", "label": "url", "value": "https://example.com"}, + ] + } + username, password, url = extract_credentials(item_data) + assert username == "admin" + assert password == "secret123" + assert url == "https://example.com" + + def test_extract_from_labels(self) -> None: + """Test extracting from field labels.""" + item_data = { + "fields": [ + {"id": "field1", "label": "Username", "value": "testuser"}, + {"id": "field2", "label": "Password", "value": "testpass"}, + {"id": "field3", "label": "Website Address", "value": "https://test.com"}, + ] + } + username, password, url = extract_credentials(item_data) + assert username == "testuser" + assert password == "testpass" + assert url == "https://test.com" + + def test_extract_url_from_urls_array(self) -> None: + """Test extracting URL from urls array.""" + item_data = { + "fields": [ + {"id": "username", "value": "admin"}, + {"id": "password", "value": "secret"}, + ], + "urls": [{"href": "https://fallback.com"}], + } + username, password, url = extract_credentials(item_data) + assert username == "admin" + assert password == "secret" + assert url == "https://fallback.com" + + def test_missing_fields(self) -> None: + """Test with missing fields.""" + item_data = {"fields": [{"id": "username", "value": "admin"}]} + username, password, url = extract_credentials(item_data) + assert username == "admin" + assert password is None + assert url is None + + def test_empty_item(self) -> None: + """Test with empty item data.""" + item_data: dict[str, list[dict[str, str]]] = {"fields": []} + username, password, url = extract_credentials(item_data) + assert username is None + assert password is None + assert url is None + + def test_case_insensitive_matching(self) -> None: + """Test case-insensitive field matching.""" + item_data = { + "fields": [ + {"id": "USERNAME", "label": "USERNAME", "value": "admin"}, + {"id": "PASSWORD", "label": "PASSWORD", "value": "secret"}, + ] + } + username, password, url = extract_credentials(item_data) + assert username == "admin" + assert password == "secret" + + +class TestGetCredentialsFromOp: + """Tests for get_credentials_from_op function.""" + + @patch("nac_collector.onepassword_helper.get_op_item") + def test_successful_credential_retrieval(self, mock_get_item: MagicMock) -> None: + """Test successful credential retrieval.""" + mock_get_item.return_value = { + "fields": [ + {"id": "username", "value": "admin"}, + {"id": "password", "value": "secret123"}, + {"id": "url", "value": "https://vmanage.example.com"}, + ] + } + + username, password, url = get_credentials_from_op("my-item") + assert username == "admin" + assert password == "secret123" + assert url == "https://vmanage.example.com" + + @patch("nac_collector.onepassword_helper.get_op_item") + def test_op_error_propagation(self, mock_get_item: MagicMock) -> None: + """Test that OnePasswordError is propagated.""" + mock_get_item.side_effect = OnePasswordError("Test error") + + with pytest.raises(OnePasswordError, match="Test error"): + get_credentials_from_op("my-item")