diff --git a/CHANGELOG.md b/CHANGELOG.md index c3116d3..766f8b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.15.0] - 2025-09-04 +### Added +- Support for following forwarder methods: + - Patch forwarder + - Delete forwarder +- CLI command for following forwarder: + - Create forwarder + - Get forwarder + - List Forwarder + - Get Or Create forwarder +- Chronicle client methods for forwarder: + - Create forwarder + - Get forwarder + - List forwarder + ## [0.14.2] - 2025-09-03 ### Added - Support for list basis and time window params in list detections method. diff --git a/CLI.md b/CLI.md index 6341814..a2e58d3 100644 --- a/CLI.md +++ b/CLI.md @@ -204,6 +204,63 @@ secops log types --search "windows" > **Note:** Chronicle uses parsers to process and normalize raw log data into UDM format. If you're ingesting logs for a custom format, you may need to create or configure parsers. See the [Parser Management](#parser-management) section for details on managing parsers. +### Forwarder Management + +Log forwarders in Chronicle are used to ingest logs with specific configurations. The CLI provides commands for creating and managing forwarders. 
+ +#### Create a new forwarder: + +```bash +# Create a basic forwarder +secops forwarder create --display-name "my-custom-forwarder" + +# Create a forwarder with metadata and http settings +secops forwarder create --display-name "my-forwarder" --metadata '{"environment":"prod","team":"security"}' --upload-compression true --enable-server true --http-settings '{"port":80,"host":"example.com"}' +``` + +#### List all forwarders: + +```bash +# List forwarders with default page size (50) +secops forwarder list + +# List forwarders with custom page size +secops forwarder list --page-size 100 +``` + +#### Get forwarder details: + +```bash +# Get a specific forwarder by ID +secops forwarder get --id "1234567890" +``` + +#### Get or create a forwarder: + +```bash +# Get an existing forwarder by display name or create a new one if it doesn't exist +secops forwarder get-or-create --display-name "my-app-forwarder" +``` + +#### Update a forwarder: + +```bash +# Update a forwarder's display name +secops forwarder update --id "1234567890" --display-name "updated-forwarder-name" + +# Update a forwarder with multiple properties +secops forwarder update --id "1234567890" --display-name "prod-forwarder" --upload-compression true --http-settings '{"port":80,"host":"example.com"}' + +# Update specific fields using update mask +secops forwarder update --id "1234567890" --display-name "prod-forwarder" --update-mask "display_name" +``` + +#### Delete a forwarder: + +```bash +# Delete a forwarder by ID +secops forwarder delete --id "1234567890" +``` ### Generate UDM Key/Value Mapping diff --git a/README.md b/README.md index 36a6b37..918d03f 100644 --- a/README.md +++ b/README.md @@ -314,6 +314,121 @@ result = chronicle.ingest_log( ) ``` +### Forwarder Management + +Chronicle log forwarders are essential for handling log ingestion with specific configurations. 
The SDK provides comprehensive methods for creating and managing forwarders:
+
+#### Create a new forwarder
+
+```python
+# Create a basic forwarder with just a display name
+forwarder = chronicle.create_forwarder(display_name="MyAppForwarder")
+
+# Create a forwarder with optional configuration
+forwarder = chronicle.create_forwarder(
+    display_name="ProductionForwarder",
+    metadata={"labels": {"env": "prod"}},
+    upload_compression=True,  # Enable upload compression for efficiency
+    enable_server=False,  # Server functionality disabled
+    http_settings={
+        "port": 8080,
+        "host": "192.168.0.100",
+        "routeSettings": {
+            "availableStatusCode": 200,
+            "readyStatusCode": 200,
+            "unreadyStatusCode": 500
+        }
+    }
+)
+
+print(f"Created forwarder with ID: {forwarder['name'].split('/')[-1]}")
+```
+
+#### List all forwarders
+
+Retrieve all forwarders in your Chronicle environment with pagination support:
+
+```python
+# List forwarders (all pages are fetched when no page size is given)
+forwarders = chronicle.list_forwarders()
+
+# Get forwarders with custom page size
+forwarders = chronicle.list_forwarders(page_size=100)
+
+# Process the forwarders
+for forwarder in forwarders.get("forwarders", []):
+    forwarder_id = forwarder.get("name", "").split("/")[-1]
+    display_name = forwarder.get("displayName", "")
+    create_time = forwarder.get("createTime", "")
+    print(f"Forwarder ID: {forwarder_id}, Name: {display_name}, Created: {create_time}")
+```
+
+#### Get forwarder details
+
+Retrieve details about a specific forwarder using its ID:
+
+```python
+# Get a specific forwarder using its ID
+forwarder_id = "1234567890"
+forwarder = chronicle.get_forwarder(forwarder_id=forwarder_id)
+
+# Access forwarder properties
+display_name = forwarder.get("displayName", "")
+metadata = forwarder.get("metadata", {})
+server_enabled = forwarder.get("enableServer", False)
+
+print(f"Forwarder {display_name} details:")
+print(f"  Metadata: {metadata}")
+print(f"  Server enabled: {server_enabled}")
+```
+
+#### Get or create a forwarder
+ +Retrieve an existing forwarder by display name or create a new one if it doesn't exist: + +```python +# Try to find a forwarder with the specified display name +# If not found, create a new one with that display name +forwarder = chronicle.get_or_create_forwarder(display_name="ApplicationLogForwarder") + +# Extract the forwarder ID for use in log ingestion +forwarder_id = forwarder["name"].split("/")[-1] +``` + +#### Update a forwarder + +Update an existing forwarder's configuration with specific properties: + +```python +# Update a forwarder with new properties +forwarder = chronicle.update_forwarder( + forwarder_id="1234567890", + display_name="UpdatedForwarderName", + metadata={"labels": {"env": "prod"}}, + upload_compression=True +) + +# Update specific fields using update mask +forwarder = chronicle.update_forwarder( + forwarder_id="1234567890", + display_name="ProdForwarder", + update_mask=["display_name"] +) + +print(f"Updated forwarder: {forwarder['name']}") +``` + +#### Delete a forwarder + +Delete an existing forwarder by its ID: + +```python +# Delete a forwarder by ID +chronicle.delete_forwarder(forwarder_id="1234567890") + +print("Forwarder deleted successfully") +``` + 5. 
Use custom timestamps:
 ```python
 from datetime import datetime, timedelta, timezone
diff --git a/api_module_mapping.md b/api_module_mapping.md
index 6dbaa2f..bef2b8f 100644
--- a/api_module_mapping.md
+++ b/api_module_mapping.md
@@ -197,13 +197,13 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/
 |forwarders.collectors.get |v1alpha| | |
 |forwarders.collectors.list |v1alpha| | |
 |forwarders.collectors.patch |v1alpha| | |
-|forwarders.create |v1alpha|chronicle.log_ingest.create_forwarder | |
-|forwarders.delete |v1alpha| | |
+|forwarders.create |v1alpha|chronicle.log_ingest.create_forwarder |secops forwarder create |
+|forwarders.delete |v1alpha|chronicle.log_ingest.delete_forwarder |secops forwarder delete |
 |forwarders.generateForwarderFiles |v1alpha| | |
-|forwarders.get |v1alpha|chronicle.log_ingest.get_forwarder | |
+|forwarders.get |v1alpha|chronicle.log_ingest.get_forwarder |secops forwarder get |
 |forwarders.importStatsEvents |v1alpha| | |
-|forwarders.list |v1alpha|chronicle.log_ingest.list_forwarders | |
-|forwarders.patch |v1alpha| | |
+|forwarders.list |v1alpha|chronicle.log_ingest.list_forwarders |secops forwarder list |
+|forwarders.patch |v1alpha|chronicle.log_ingest.update_forwarder |secops forwarder update |
 |generateCollectionAgentAuth |v1alpha| | |
 |generateSoarAuthJwt |v1alpha| | |
 |generateUdmKeyValueMappings |v1alpha| | |
diff --git a/examples/forwarder_example.py b/examples/forwarder_example.py
new file mode 100644
index 0000000..41038e6
--- /dev/null
+++ b/examples/forwarder_example.py
@@ -0,0 +1,242 @@
+#!/usr/bin/env python3
+"""Example usage of Google SecOps SDK for Chronicle Forwarder Management.
+
+This example demonstrates how to use the Chronicle Forwarder Management API
+via the Google SecOps SDK to create, list, get, update, and delete forwarders.
+""" + +import argparse +import json +import time +from datetime import datetime + +from secops import SecOpsClient +from secops.chronicle import ChronicleClient +from secops.exceptions import APIError + + +def get_client(project_id, customer_id, region): + """Initialize and return the Chronicle client. + + Args: + project_id: Google Cloud Project ID. + customer_id: Chronicle Customer ID (UUID). + region: Chronicle region (us or eu). + + Returns: + Chronicle client instance. + """ + client = SecOpsClient() + chronicle = client.chronicle( + customer_id=customer_id, project_id=project_id, region=region + ) + return chronicle + + +def example_create_forwarder(chronicle: ChronicleClient): + """Example: Create a new log forwarder in Chronicle. + + Args: + chronicle: Initialized Chronicle client. + + Returns: + The created forwarder ID. + """ + print("\n=== Example: Create a log forwarder ===") + + # Generate a unique name for our example forwarder using current timestamp + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + display_name = f"Example-Forwarder-{timestamp}" + + try: + # Create a forwarder with some basic configuration + forwarder = chronicle.create_forwarder( + display_name=display_name, + metadata={"labels": {"env": "test", "purpose": "sdk-example"}}, + upload_compression=True, + enable_server=False, + http_settings={"port": 8080, "host": "192.168.0.100"}, + ) + + forwarder_id = forwarder["name"].split("/")[-1] + print(f"Successfully created forwarder:") + print(f" ID: {forwarder_id}") + print(f" Display Name: {forwarder.get('displayName')}") + print(f" Upload Compression: {forwarder.get('uploadCompression')}") + + # Return forwarder ID for use in other examples + return forwarder_id + + except APIError as e: + print(f"Error creating forwarder: {e}") + return None + + +def example_get_forwarder(chronicle: ChronicleClient, forwarder_id: str): + """Example: Get a forwarder by ID. + + Args: + chronicle: Initialized Chronicle client. 
+ forwarder_id: ID of the forwarder to retrieve. + """ + print("\n=== Example: Get a forwarder by ID ===") + + try: + forwarder = chronicle.get_forwarder(forwarder_id=forwarder_id) + + print(f"Retrieved forwarder details:") + print(f" ID: {forwarder_id}") + print(f" Display Name: {forwarder.get('displayName')}") + print(f" Create Time: {forwarder.get('createTime')}") + print(f" Config: {json.dumps(forwarder.get('config', {}), indent=2)}") + + except APIError as e: + print(f"Error retrieving forwarder: {e}") + + +def example_list_forwarders(chronicle: ChronicleClient): + """Example: List all forwarders in Chronicle. + + Args: + chronicle: Initialized Chronicle client. + """ + print("\n=== Example: List all forwarders ===") + + try: + # Get first page of forwarders with a small page size for demo + response = chronicle.list_forwarders(page_size=5) + forwarders = response.get("forwarders", []) + + print(f"Retrieved {len(forwarders)} forwarders:") + + for idx, forwarder in enumerate(forwarders, 1): + forwarder_id = forwarder["name"].split("/")[-1] + print( + f" {idx}. {forwarder.get('displayName')} (ID: {forwarder_id})" + ) + + # Check if there are more pages + if "nextPageToken" in response: + print( + f"\nMore forwarders available. Next page token: {response['nextPageToken']}" + ) + + except APIError as e: + print(f"Error listing forwarders: {e}") + + +def example_update_forwarder(chronicle: ChronicleClient, forwarder_id: str): + """Example: Update an existing forwarder. + + Args: + chronicle: Initialized Chronicle client. + forwarder_id: ID of the forwarder to update. 
+ """ + print("\n=== Example: Update a forwarder ===") + + try: + # First, get current forwarder to show before/after + before = chronicle.get_forwarder(forwarder_id=forwarder_id) + print("Current forwarder configuration:") + print(f" Display Name: {before.get('displayName')}") + print(f" Config: {json.dumps(before.get('config', {}), indent=2)}") + + # Update the forwarder with new metadata + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + updated_name = f"{before.get('displayName')}-Updated-{timestamp}" + + # Update specific fields + updated = chronicle.update_forwarder( + forwarder_id=forwarder_id, + display_name=updated_name, + metadata={ + "labels": { + "env": "test", + "purpose": "sdk-example", + "updated": "true", + } + }, + ) + + print("\nUpdated forwarder configuration:") + print(f" Display Name: {updated.get('displayName')}") + print(f" Config: {updated.get('config')}") + + except APIError as e: + print(f"Error updating forwarder: {e}") + + +def example_delete_forwarder(chronicle: ChronicleClient, forwarder_id: str): + """Example: Delete a forwarder. + + Args: + chronicle: Initialized Chronicle client. + forwarder_id: ID of the forwarder to delete. 
+ """ + print("\n=== Example: Delete a forwarder ===") + + try: + # Delete the forwarder + chronicle.delete_forwarder(forwarder_id=forwarder_id) + print(f"Successfully deleted forwarder with ID: {forwarder_id}") + + # Verify deletion by trying to get the forwarder (should fail) + print("\nVerifying deletion...") + try: + chronicle.get_forwarder(forwarder_id=forwarder_id) + print("Error: Forwarder still exists!") + except APIError as e: + if "not found" in str(e).lower(): + print("Verification successful: Forwarder no longer exists") + else: + print(f"Error during verification: {e}") + + except APIError as e: + print(f"Error deleting forwarder: {e}") + + +def main(): + """Run the forwarder management examples.""" + parser = argparse.ArgumentParser( + description="Example usage of Google SecOps SDK for Chronicle Forwarder Management" + ) + parser.add_argument("--project-id", required=True, help="GCP project ID") + parser.add_argument( + "--customer-id", required=True, help="Chronicle customer ID" + ) + parser.add_argument( + "--region", default="us", choices=["us", "eu"], help="Chronicle region" + ) + args = parser.parse_args() + + # Initialize the Chronicle client + chronicle = get_client(args.project_id, args.customer_id, args.region) + + print("Google SecOps SDK - Chronicle Forwarder Management Examples") + print("----------------------------------------------------------") + + # Run the example to create a new test forwarder + forwarder_id = example_create_forwarder(chronicle) + if not forwarder_id: + print("Failed to create test forwarder. 
Exiting.") + return + + # Wait a moment for the forwarder to be fully created + print("\nWaiting for forwarder to be ready...") + time.sleep(2) + + # Run the example to get a forwarder by ID + example_get_forwarder(chronicle, forwarder_id) + + # Run the example to list all forwarders + example_list_forwarders(chronicle) + + # Run the example to update the forwarder + example_update_forwarder(chronicle, forwarder_id) + + # Finally, run the example to delete the forwarder + example_delete_forwarder(chronicle, forwarder_id) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 5a65626..0f659e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "secops" -version = "0.14.2" +version = "0.15.0" description = "Python SDK for wrapping the Google SecOps API for common use cases" readme = "README.md" requires-python = ">=3.7" diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py index b43d05f..5ba8118 100644 --- a/src/secops/chronicle/__init__.py +++ b/src/secops/chronicle/__init__.py @@ -46,8 +46,8 @@ # Import data table and reference list classes from secops.chronicle.data_table import ( DataTableColumnType, - update_data_table, replace_data_table_rows, + update_data_table, ) from secops.chronicle.entity import summarize_entity from secops.chronicle.gemini import ( @@ -59,11 +59,13 @@ from secops.chronicle.ioc import list_iocs from secops.chronicle.log_ingest import ( create_forwarder, + delete_forwarder, extract_forwarder_id, get_forwarder, get_or_create_forwarder, ingest_log, list_forwarders, + update_forwarder, ) from secops.chronicle.log_types import ( LogType, @@ -165,10 +167,12 @@ # Log Ingestion "ingest_log", "create_forwarder", + "delete_forwarder", "get_or_create_forwarder", "list_forwarders", "get_forwarder", "extract_forwarder_id", + "update_forwarder", # Log Types "LogType", "get_all_log_types", diff --git 
a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index 44eefc6..ee48367 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -87,7 +87,12 @@ from secops.chronicle.gemini import query_gemini as _query_gemini from secops.chronicle.ioc import list_iocs as _list_iocs from secops.chronicle.log_ingest import ( + delete_forwarder as _delete_forwarder, + create_forwarder as _create_forwarder, + get_forwarder as _get_forwarder, get_or_create_forwarder as _get_or_create_forwarder, + list_forwarders as _list_forwarders, + update_forwarder as _update_forwarder, ) from secops.chronicle.log_ingest import ingest_log as _ingest_log from secops.chronicle.log_ingest import ingest_udm as _ingest_udm @@ -1848,6 +1853,151 @@ def ingest_log( labels=labels, ) + def create_forwarder( + self, + display_name: str, + metadata: Optional[Dict[str, Any]] = None, + upload_compression: bool = False, + enable_server: bool = False, + regex_filters: Optional[List[Dict[str, Any]]] = None, + graceful_timeout: Optional[str] = None, + drain_timeout: Optional[str] = None, + http_settings: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """Create a new forwarder in Chronicle. 
+ + Args: + display_name: User-specified name for the forwarder + metadata: Optional forwarder metadata (asset_namespace, labels) + upload_compression: Whether uploaded data should be compressed + enable_server: Whether server functionality is enabled on + the forwarder + regex_filters: Regex filters applied at the forwarder level + graceful_timeout: Timeout, after which the forwarder returns a bad + readiness/health check and still accepts new connections + drain_timeout: Timeout, after which the forwarder waits for active + connections to successfully close on their own before being + closed by the server + http_settings: HTTP-specific server settings + + Returns: + Dictionary containing the created forwarder details + + Raises: + APIError: If the API request fails + """ + return _create_forwarder( + self, + display_name=display_name, + metadata=metadata, + upload_compression=upload_compression, + enable_server=enable_server, + regex_filters=regex_filters, + graceful_timeout=graceful_timeout, + drain_timeout=drain_timeout, + http_settings=http_settings, + ) + + def list_forwarders( + self, + page_size: Optional[int] = None, + page_token: Optional[str] = None, + ) -> Dict[str, Any]: + """List forwarders in Chronicle. + + Args: + page_size: Maximum number of forwarders to return (1-1000) + page_token: Token for pagination + + Returns: + Dictionary containing list of forwarders and next page token + + Raises: + APIError: If the API request fails + """ + return _list_forwarders( + self, + page_size=page_size, + page_token=page_token, + ) + + def get_forwarder(self, forwarder_id: str) -> Dict[str, Any]: + """Get a forwarder by ID. 
+ + Args: + forwarder_id: ID of the forwarder to retrieve + + Returns: + Dictionary containing the forwarder details + + Raises: + APIError: If the API request fails + """ + return _get_forwarder(self, forwarder_id=forwarder_id) + + def update_forwarder( + self, + forwarder_id: str, + display_name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + upload_compression: Optional[bool] = None, + enable_server: Optional[bool] = None, + regex_filters: Optional[List[Dict[str, Any]]] = None, + graceful_timeout: Optional[str] = None, + drain_timeout: Optional[str] = None, + http_settings: Optional[Dict[str, Any]] = None, + update_mask: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """Update a forwarder in Chronicle. + + Args: + forwarder_id: ID of the forwarder to update + display_name: New display name for the forwarder + metadata: New metadata key-value pairs for the forwarder + upload_compression: New upload compression setting + enable_server: New server enabled setting + regex_filters: New regex filter patterns and actions + graceful_timeout: New graceful timeout duration for server + drain_timeout: New drain timeout duration for server + http_settings: New HTTP server settings + update_mask: List of field paths to update. + If not provided, all fields with non-None values + will be updated. + + Returns: + Dictionary containing the updated forwarder details + + Raises: + APIError: If the API request fails + """ + return _update_forwarder( + self, + forwarder_id=forwarder_id, + display_name=display_name, + metadata=metadata, + upload_compression=upload_compression, + enable_server=enable_server, + regex_filters=regex_filters, + graceful_timeout=graceful_timeout, + drain_timeout=drain_timeout, + http_settings=http_settings, + update_mask=update_mask, + ) + + def delete_forwarder(self, forwarder_id: str) -> Dict[str, Any]: + """Delete a forwarder from Chronicle. 
+ + Args: + forwarder_id: ID of the forwarder to delete + + Returns: + Dictionary containing the empty response (usually {}) + + Raises: + APIError: If the API request fails + """ + return _delete_forwarder(self, forwarder_id=forwarder_id) + def get_or_create_forwarder( self, display_name: str = "Wrapper-SDK-Forwarder" ) -> Dict[str, Any]: diff --git a/src/secops/chronicle/log_ingest.py b/src/secops/chronicle/log_ingest.py index 28fe1b0..070a0ae 100644 --- a/src/secops/chronicle/log_ingest.py +++ b/src/secops/chronicle/log_ingest.py @@ -315,6 +315,10 @@ def create_forwarder( metadata: Optional[Dict[str, Any]] = None, upload_compression: bool = False, enable_server: bool = False, + regex_filters: Optional[List[Dict[str, Any]]] = None, + graceful_timeout: Optional[str] = None, + drain_timeout: Optional[str] = None, + http_settings: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Create a new forwarder in Chronicle. @@ -324,6 +328,13 @@ def create_forwarder( metadata: Optional forwarder metadata (asset_namespace, labels) upload_compression: Whether uploaded data should be compressed enable_server: Whether server functionality is enabled on the forwarder + regex_filters: Regex filters applied at the forwarder level + graceful_timeout: Timeout, after which the forwarder returns a bad + readiness/health check and still accepts new connections + drain_timeout: Timeout, after which the forwarder waits for active + connections to successfully close on their own before being closed + by the server + http_settings: HTTP-specific server settings Returns: Dictionary containing the created forwarder details @@ -341,11 +352,27 @@ def create_forwarder( "metadata": metadata or {}, "serverSettings": { "enabled": enable_server, + "gracefulTimeout": graceful_timeout, + "drainTimeout": drain_timeout, "httpSettings": {"routeSettings": {}}, }, }, } + if regex_filters: + payload["config"]["regexFilters"] = regex_filters + + if graceful_timeout: + 
payload["config"]["serverSettings"][ + "gracefulTimeout" + ] = graceful_timeout + + if drain_timeout: + payload["config"]["serverSettings"]["drainTimeout"] = drain_timeout + + if http_settings: + payload["config"]["serverSettings"]["httpSettings"] = http_settings + # Send the request response = client.session.post(url, json=payload) @@ -358,7 +385,7 @@ def create_forwarder( def list_forwarders( client: "ChronicleClient", - page_size: int = 50, + page_size: Optional[int] = None, page_token: Optional[str] = None, ) -> Dict[str, Any]: """List forwarders in Chronicle. @@ -393,7 +420,7 @@ def list_forwarders( result = response.json() # If there's a next page token, fetch additional pages and combine results - if "nextPageToken" in result and result["nextPageToken"]: + if not page_size and "nextPageToken" in result and result["nextPageToken"]: next_page = list_forwarders(client, page_size, result["nextPageToken"]) if "forwarders" in next_page and next_page["forwarders"]: # Combine the forwarders from both pages @@ -431,6 +458,160 @@ def get_forwarder( return response.json() +def update_forwarder( + client: "ChronicleClient", + forwarder_id: str, + display_name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + upload_compression: Optional[bool] = None, + enable_server: Optional[bool] = None, + regex_filters: Optional[List[Dict[str, Any]]] = None, + graceful_timeout: Optional[str] = None, + drain_timeout: Optional[str] = None, + http_settings: Optional[Dict[str, Any]] = None, + update_mask: Optional[List[str]] = None, +) -> Dict[str, Any]: + """Update an existing forwarder. + + Args: + client: The initialized Chronicle client. + forwarder_id: ID of the forwarder to update. + display_name: Display name for the forwarder. + metadata: Metadata key-value pairs for the forwarder. + upload_compression: Upload compression setting. + enable_server: Server enabled setting. + regex_filters: Regex filter patterns and actions. 
+ graceful_timeout: Graceful timeout duration for server. + drain_timeout: Drain timeout duration for server. + http_settings: HTTP server settings. + update_mask: List of field paths to update. If not provided, all fields + with non-None values will be updated. + + Returns: + Dict containing the updated forwarder details. + + Raises: + APIError: If the API returns an error response. + """ + url = f"{client.base_url}/{client.instance_id}/forwarders/{forwarder_id}" + + auto_mask = [] # Update mask if not provided in argument + payload = {} + + if display_name is not None: + payload["displayName"] = display_name + auto_mask.append("display_name") + + # Check if we need to include config and its fields + has_config = any( + param is not None + for param in [ + metadata, + upload_compression, + regex_filters, + enable_server, + graceful_timeout, + drain_timeout, + http_settings, + ] + ) + + if has_config: + payload["config"] = {} + + # Add metadata if provided + if metadata: + payload["config"]["metadata"] = metadata + auto_mask.append("config.metadata") + + # Add upload compression if provided + if upload_compression is not None: + payload["config"]["uploadCompression"] = upload_compression + auto_mask.append("config.upload_compression") + + # Add regex filters if provided + if regex_filters: + payload["config"]["regexFilters"] = regex_filters + auto_mask.append("config.regex_filters") + + # Initialize serverSettings if any server-related fields are provided + if any( + param is not None + for param in [ + enable_server, + graceful_timeout, + drain_timeout, + http_settings, + ] + ): + payload["config"]["serverSettings"] = {} + + if enable_server is not None: + payload["config"]["serverSettings"]["enabled"] = enable_server + auto_mask.append("config.server_settings.enabled") + + if graceful_timeout: + payload["config"]["serverSettings"][ + "gracefulTimeout" + ] = graceful_timeout + auto_mask.append("config.server_settings.graceful_timeout") + + if drain_timeout: + 
payload["config"]["serverSettings"][ + "drainTimeout" + ] = drain_timeout + auto_mask.append("config.server_settings.drain_timeout") + + if http_settings: + payload["config"]["serverSettings"][ + "httpSettings" + ] = http_settings + auto_mask.append("config.server_settings.http_settings") + + # Prepare query parameters for update mask + params = {} + if update_mask: + # Use user-provided update mask + params["updateMask"] = ",".join(update_mask) + else: + params["updateMask"] = ",".join(auto_mask) + + # Send the request + response = client.session.patch(url, json=payload, params=params) + + # Check for errors + if response.status_code != 200: + raise APIError(f"Failed to update forwarder: {response.text}") + + return response.json() + + +def delete_forwarder( + client: "ChronicleClient", + forwarder_id: str, +) -> Dict[str, Any]: + """Delete a forwarder from Chronicle. + + Args: + client: ChronicleClient instance + forwarder_id: ID of the forwarder to delete + + Returns: + Dict containing the empty response (usually {}) + + Raises: + APIError: If the API returns an error response. 
+ """ + url = f"{client.base_url}/{client.instance_id}/forwarders/{forwarder_id}" + + response = client.session.delete(url) + + if response.status_code != 200: + raise APIError(f"Failed to delete forwarder: {response.text}") + + return response.json() + + def _find_forwarder_by_display_name( client: "ChronicleClient", display_name: str ) -> Optional[Dict[str, Any]]: diff --git a/src/secops/cli.py b/src/secops/cli.py index b031b2b..e6384f2 100644 --- a/src/secops/cli.py +++ b/src/secops/cli.py @@ -3839,6 +3839,380 @@ def handle_dashboard_query_get_command(args, chronicle): sys.exit(1) +def setup_forwarder_command(subparsers): + """Set up the forwarder command parser.""" + forwarder_parser = subparsers.add_parser( + "forwarder", help="Manage log forwarders" + ) + forwarder_subparsers = forwarder_parser.add_subparsers( + dest="forwarder_command", help="Forwarder command" + ) + + # Create forwarder command + create_parser = forwarder_subparsers.add_parser( + "create", help="Create a new forwarder" + ) + create_parser.add_argument( + "--display-name", + "--display_name", + dest="display_name", + required=True, + help="Display name for the new forwarder", + ) + create_parser.add_argument( + "--metadata", help="JSON string of metadata to attach to the forwarder" + ) + create_parser.add_argument( + "--upload-compression", + "--upload_compression", + dest="upload_compression", + choices=["true", "false"], + help="Enable upload compression", + ) + create_parser.add_argument( + "--enable-server", + "--enable_server", + dest="enable_server", + choices=["true", "false"], + help="Enable server functionality on the forwarder", + ) + create_parser.add_argument( + "--regex-filters", + "--regex_filters", + dest="regex_filters", + help="JSON string of regex filters to apply at the forwarder level", + ) + create_parser.add_argument( + "--graceful-timeout", + "--graceful_timeout", + dest="graceful_timeout", + help="Timeout after which the forwarder returns a bad readiness check", + ) + 
create_parser.add_argument( + "--drain-timeout", + "--drain_timeout", + dest="drain_timeout", + help="Timeout after which the forwarder waits for connections to close", + ) + create_parser.add_argument( + "--http-settings", + "--http_settings", + dest="http_settings", + help="JSON string of HTTP-specific server settings", + ) + create_parser.set_defaults(func=handle_forwarder_create_command) + + # Update forwarder command + patch_parser = forwarder_subparsers.add_parser( + "update", help="Update an existing forwarder" + ) + patch_parser.add_argument( + "--id", required=True, help="ID of the forwarder to update" + ) + patch_parser.add_argument( + "--display-name", + "--display_name", + dest="display_name", + help="New display name for the forwarder", + ) + patch_parser.add_argument( + "--metadata", help="JSON string of metadata to attach to the forwarder" + ) + patch_parser.add_argument( + "--upload-compression", + "--upload_compression", + dest="upload_compression", + choices=["true", "false"], + help="Whether uploaded data should be compressed", + ) + patch_parser.add_argument( + "--enable-server", + "--enable_server", + dest="enable_server", + choices=["true", "false"], + help="Enable server functionality on the forwarder", + ) + patch_parser.add_argument( + "--regex-filters", + "--regex_filters", + dest="regex_filters", + help="JSON string of regex filters to apply at the forwarder level", + ) + patch_parser.add_argument( + "--graceful-timeout", + "--graceful_timeout", + dest="graceful_timeout", + help="Timeout after which the forwarder returns a bad readiness check", + ) + patch_parser.add_argument( + "--drain-timeout", + "--drain_timeout", + dest="drain_timeout", + help="Timeout after which the forwarder waits for connections to close", + ) + patch_parser.add_argument( + "--http-settings", + "--http_settings", + dest="http_settings", + help="JSON string of HTTP-specific server settings", + ) + patch_parser.add_argument( + "--update-mask", + "--update_mask", + 
dest="update_mask", + help="Comma-separated list of field paths to update", + ) + patch_parser.set_defaults(func=handle_forwarder_patch_command) + + # List forwarders command + list_parser = forwarder_subparsers.add_parser( + "list", help="List all forwarders" + ) + list_parser.add_argument( + "--page-size", + "--page_size", + dest="page_size", + type=int, + help="Maximum number of forwarders to return (1-1000)", + ) + list_parser.add_argument( + "--page-token", + "--page_token", + dest="page_token", + type=str, + help="Page token for pagination", + ) + list_parser.set_defaults(func=handle_forwarder_list_command) + + # Get forwarder command + get_parser = forwarder_subparsers.add_parser( + "get", help="Get details of a specific forwarder" + ) + get_parser.add_argument( + "--id", required=True, help="ID of the forwarder to retrieve" + ) + get_parser.set_defaults(func=handle_forwarder_get_command) + + # Get or create forwarder command + get_or_create_parser = forwarder_subparsers.add_parser( + "get-or-create", help="Get an existing forwarder or create a new one" + ) + get_or_create_parser.add_argument( + "--display-name", + "--display_name", + dest="display_name", + default="Wrapper-SDK-Forwarder", + help="Display name to find or create (default: Wrapper-SDK-Forwarder)", + ) + get_or_create_parser.set_defaults( + func=handle_forwarder_get_or_create_command + ) + + # Delete forwarder command + delete_parser = forwarder_subparsers.add_parser( + "delete", help="Delete a specific forwarder" + ) + delete_parser.add_argument( + "--id", required=True, help="ID of the forwarder to delete" + ) + delete_parser.set_defaults(func=handle_forwarder_delete_command) + + +def handle_forwarder_create_command(args, chronicle): + """Handle creating a new forwarder.""" + try: + # Parse JSON strings into Python objects + metadata = None + regex_filters = None + http_settings = None + + if args.metadata: + try: + metadata = json.loads(args.metadata) + except json.JSONDecodeError: + 
print("Error: Metadata must be valid JSON", file=sys.stderr) + sys.exit(1) + + if args.regex_filters: + try: + regex_filters = json.loads(args.regex_filters) + except json.JSONDecodeError: + print( + "Error: Regex filters must be valid JSON", file=sys.stderr + ) + sys.exit(1) + + if args.http_settings: + try: + http_settings = json.loads(args.http_settings) + except json.JSONDecodeError: + print( + "Error: HTTP settings must be valid JSON", file=sys.stderr + ) + sys.exit(1) + + # Convert string values to appropriate types + upload_compression = None + if args.upload_compression: + upload_compression = args.upload_compression.lower() == "true" + + enable_server = None + if args.enable_server: + enable_server = args.enable_server.lower() == "true" + + result = chronicle.create_forwarder( + display_name=args.display_name, + metadata=metadata, + upload_compression=upload_compression, + enable_server=enable_server, + regex_filters=regex_filters, + graceful_timeout=args.graceful_timeout, + drain_timeout=args.drain_timeout, + http_settings=http_settings, + ) + + print(json.dumps(result, indent=2)) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_forwarder_list_command(args, chronicle): + """Handle listing all forwarders.""" + try: + result = chronicle.list_forwarders( + page_size=args.page_size, page_token=args.page_token + ) + print(json.dumps(result, indent=2)) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_forwarder_get_command(args, chronicle): + """Handle getting a specific forwarder.""" + try: + result = chronicle.get_forwarder(forwarder_id=args.id) + print(json.dumps(result, indent=2)) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_forwarder_get_or_create_command(args, chronicle): + """Handle getting or creating a forwarder.""" + try: + result = chronicle.get_or_create_forwarder( + display_name=args.display_name + ) + 
        print(json.dumps(result, indent=2))
+    except APIError as e:
+        print(f"Error: {e}", file=sys.stderr)
+        sys.exit(1)
+    except Exception as e:  # pylint: disable=broad-exception-caught
+        print(f"Error getting or creating forwarder: {e}", file=sys.stderr)
+        sys.exit(1)
+
+
+def handle_forwarder_patch_command(args, chronicle):
+    """Handle updating an existing forwarder."""
+    try:
+        # Process metadata if provided
+        metadata = None
+        if args.metadata:
+            try:
+                metadata = json.loads(args.metadata)
+            except json.JSONDecodeError:
+                print(
+                    f"Error: Invalid JSON in metadata: {args.metadata}",
+                    file=sys.stderr,
+                )
+                sys.exit(1)
+
+        # Process regex filters if provided
+        regex_filters = None
+        if args.regex_filters:
+            try:
+                regex_filters = json.loads(args.regex_filters)
+            except json.JSONDecodeError:
+                print(
+                    "Error: Invalid JSON in regex_filters: "
+                    f"{args.regex_filters}",
+                    file=sys.stderr,
+                )
+                sys.exit(1)
+
+        # Process HTTP settings if provided
+        http_settings = None
+        if args.http_settings:
+            try:
+                http_settings = json.loads(args.http_settings)
+            except json.JSONDecodeError:
+                print(
+                    "Error: Invalid JSON in http_settings: "
+                    f"{args.http_settings}",
+                    file=sys.stderr,
+                )
+                sys.exit(1)
+
+        # Process boolean flags
+        upload_compression = None
+        if args.upload_compression:
+            upload_compression = args.upload_compression.lower() == "true"
+
+        enable_server = None
+        if args.enable_server:
+            enable_server = args.enable_server.lower() == "true"
+
+        # Process update_mask
+        update_mask = None
+        if args.update_mask:
+            update_mask = [
+                field.strip() for field in args.update_mask.split(",")
+            ]
+
+        result = chronicle.update_forwarder(
+            forwarder_id=args.id,
+            display_name=args.display_name,
+            metadata=metadata,
+            upload_compression=upload_compression,
+            enable_server=enable_server,
+            regex_filters=regex_filters,
+            graceful_timeout=args.graceful_timeout,
+            drain_timeout=args.drain_timeout,
+            http_settings=http_settings,
+            update_mask=update_mask,
+        )
+        print(json.dumps(result, indent=2))
+
except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error patching forwarder: {e}", file=sys.stderr) + sys.exit(1) + + +def handle_forwarder_delete_command(args, chronicle): + """Handle deleting a specific forwarder.""" + try: + chronicle.delete_forwarder(forwarder_id=args.id) + print( + json.dumps( + { + "success": True, + "message": f"Forwarder {args.id} deleted successfully", + }, + indent=2, + ) + ) + except APIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error deleting forwarder: {e}", file=sys.stderr) + sys.exit(1) + + def main() -> None: """Main entry point for the CLI.""" parser = argparse.ArgumentParser(description="Google SecOps CLI") @@ -3869,6 +4243,7 @@ def main() -> None: setup_data_table_command(subparsers) # Add data table command setup_reference_list_command(subparsers) # Add reference list command setup_rule_exclusion_command(subparsers) # Add rule exclusion command + setup_forwarder_command(subparsers) # Add forwarder command setup_config_command(subparsers) setup_help_command(subparsers) setup_dashboard_command(subparsers) @@ -3898,6 +4273,7 @@ def main() -> None: "export", "gemini", "rule-exclusion", + "forwarder", "dashboard", ] requires_chronicle = any(cmd in args.command for cmd in chronicle_commands) diff --git a/tests/chronicle/test_forwarder_integration.py b/tests/chronicle/test_forwarder_integration.py new file mode 100644 index 0000000..83511aa --- /dev/null +++ b/tests/chronicle/test_forwarder_integration.py @@ -0,0 +1,183 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Integration tests for Chronicle forwarder methods.""" +import pytest +import uuid + +from secops import SecOpsClient +from secops.exceptions import APIError +from ..config import CHRONICLE_CONFIG, SERVICE_ACCOUNT_JSON + + +@pytest.mark.integration +def test_forwarder_lifecycle(): + """Test complete lifecycle of forwarders with API. + + Tests the following client methods: + - create_forwarder + - list_forwarders + - get_forwarder + - update_forwarder + - delete_forwarder + - get_or_create_forwarder (both for existing and new) + """ + # Initialize client + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Generate unique forwarder name to avoid conflicts + unique_id = str(uuid.uuid4())[:8] + display_name = f"Test-Forwarder-{unique_id}" + + # Tracking created forwarder IDs for cleanup + created_forwarders = [] + + try: + print(f"\n--- Testing Forwarder Lifecycle ---") + print(f"Using display_name: {display_name}") + + # Step 1: Create a new forwarder + print("\n1. 
Testing create_forwarder()") + forwarder = chronicle.create_forwarder(display_name=display_name) + + # Verify forwarder was created + assert forwarder is not None + assert "name" in forwarder + assert "displayName" in forwarder + assert forwarder["displayName"] == display_name + + # Store forwarder ID for cleanup + forwarder_id = forwarder["name"].split("/")[-1] + created_forwarders.append(forwarder_id) + print(f"Created forwarder with ID: {forwarder_id}") + + # Step 2: List forwarders and verify our forwarder is in the list + print("\n2. Testing list_forwarders()") + forwarders = chronicle.list_forwarders() + + # Verify list returned results + assert forwarders is not None + assert "forwarders" in forwarders + + # Find our forwarder in the list + found = False + for f in forwarders["forwarders"]: + if f["name"].split("/")[-1] == forwarder_id: + found = True + break + + assert ( + found + ), f"Created forwarder {forwarder_id} not found in list results" + print(f"Successfully found forwarder {forwarder_id} in list results") + + # Step 3: Get specific forwarder + print("\n3. Testing get_forwarder()") + get_result = chronicle.get_forwarder(forwarder_id=forwarder_id) + + # Verify get returned the correct forwarder + assert get_result is not None + assert get_result["name"].split("/")[-1] == forwarder_id + assert get_result["displayName"] == display_name + print(f"Successfully retrieved forwarder {forwarder_id}") + + # Step 4: Update forwarder + print("\n4. 
Testing update_forwarder()") + updated_display_name = f"{display_name}-updated" + updated_forwarder = chronicle.update_forwarder( + forwarder_id=forwarder_id, + display_name=updated_display_name, + metadata={"labels": {"env": "test"}}, + ) + + # Verify update was successful + assert updated_forwarder is not None + assert updated_forwarder["name"].split("/")[-1] == forwarder_id + assert updated_forwarder["displayName"] == updated_display_name + print(f"Successfully updated forwarder to {updated_display_name}") + + # Verify the update was applied by getting the forwarder again + updated_get_result = chronicle.get_forwarder(forwarder_id=forwarder_id) + assert updated_get_result["displayName"] == updated_display_name + assert ( + updated_get_result["config"]["metadata"]["labels"]["env"] == "test" + ) + print("Verified update was applied correctly") + + # Step 5: Test get_or_create_forwarder for existing forwarder + print("\n5. Testing get_or_create_forwarder() with existing forwarder") + existing_result = chronicle.get_or_create_forwarder( + display_name=updated_display_name + ) + + # Verify we got the existing forwarder + assert existing_result is not None + assert existing_result["name"].split("/")[-1] == forwarder_id + assert existing_result["displayName"] == updated_display_name + print(f"Successfully retrieved existing forwarder {forwarder_id}") + + # Step 6: Test get_or_create_forwarder for new forwarder + print("\n6. 
Testing get_or_create_forwarder() with new forwarder") + new_display_name = f"New-Test-Forwarder-{unique_id}" + new_result = chronicle.get_or_create_forwarder( + display_name=new_display_name + ) + + # Verify we created a new forwarder + assert new_result is not None + assert "name" in new_result + assert new_result["displayName"] == new_display_name + + # Store forwarder ID for cleanup + new_forwarder_id = new_result["name"].split("/")[-1] + created_forwarders.append(new_forwarder_id) + print(f"Created new forwarder with ID: {new_forwarder_id}") + + # Step 7: Test delete_forwarder + print("\n7. Testing delete_forwarder()") + delete_result = chronicle.delete_forwarder(forwarder_id=forwarder_id) + + # Verify delete was successful + assert delete_result is not None + print(f"Successfully deleted forwarder {forwarder_id}") + + # Verify forwarder was actually deleted by trying to get it + try: + chronicle.get_forwarder(forwarder_id=forwarder_id) + pytest.fail(f"Forwarder {forwarder_id} still exists after deletion") + except APIError as e: + # Expected error for deleted resource + assert "not found" in str(e).lower() or "404" in str(e) + print("Verified deletion by confirming forwarder no longer exists") + + # Remove from cleanup list since it's already deleted + created_forwarders.remove(forwarder_id) + + except APIError as e: + print(f"\nAPI Error details: {str(e)}") + # Skip the test rather than fail if permissions are not available + if "permission" in str(e).lower(): + pytest.skip("Insufficient permissions to manage forwarders") + raise + + finally: + # Cleanup any remaining created forwarders + print("\n--- Cleanup Phase ---") + for fwd_id in created_forwarders: + try: + print(f"Cleaning up forwarder {fwd_id}") + chronicle.delete_forwarder(forwarder_id=fwd_id) + except Exception as e: + print(f"Error during cleanup of forwarder {fwd_id}: {str(e)}") diff --git a/tests/chronicle/test_log_ingest.py b/tests/chronicle/test_log_ingest.py index a777aaf..8753c71 100644 
--- a/tests/chronicle/test_log_ingest.py +++ b/tests/chronicle/test_log_ingest.py @@ -25,8 +25,10 @@ get_or_create_forwarder, list_forwarders, create_forwarder, + update_forwarder, extract_forwarder_id, ingest_udm, + delete_forwarder, ) from secops.exceptions import APIError @@ -53,7 +55,17 @@ def mock_forwarder_response(): "displayName": "Wrapper-SDK-Forwarder", "createTime": "2025-01-01T00:00:00.000Z", "updateTime": "2025-01-01T00:00:00.000Z", - "config": {"uploadCompression": False, "metadata": {}}, + "config": { + "uploadCompression": False, + "metadata": {"environment": "test"}, + "regexFilters": [{"pattern": ".*error.*", "action": "INCLUDE"}], + "serverSettings": { + "enabled": True, + "gracefulTimeout": "30s", + "drainTimeout": "60s", + "httpSettings": {"routeSettings": {}}, + }, + }, } return mock @@ -77,6 +89,41 @@ def mock_forwarders_list_response(): return mock +@pytest.fixture +def mock_patch_forwarder_response(): + """Create a mock patch forwarder API response.""" + mock = Mock() + mock.status_code = 200 + mock.json.return_value = { + "name": "projects/test-project/locations/us/instances/test-customer/forwarders/test-forwarder-id", + "displayName": "Updated-Forwarder-Name", + "createTime": "2025-01-01T00:00:00.000Z", + "updateTime": "2025-01-02T00:00:00.000Z", + "config": { + "uploadCompression": True, + "metadata": {"environment": "production"}, + "regexFilters": [{"pattern": ".*error.*", "action": "INCLUDE"}], + "serverSettings": { + "enabled": True, + "gracefulTimeout": "45s", + "drainTimeout": "90s", + "httpSettings": {"routeSettings": {"port": 8080}}, + }, + }, + } + return mock + + +@pytest.fixture +def mock_delete_forwarder_response(): + """Create a mock delete forwarder API response.""" + mock = Mock() + mock.status_code = 200 + # DELETE operations typically return an empty response + mock.json.return_value = {} + return mock + + @pytest.fixture def mock_ingest_response(): """Create a mock log ingestion API response.""" @@ -145,6 +192,13 @@ def 
test_create_forwarder(chronicle_client, mock_forwarder_response): ) assert result["displayName"] == "Wrapper-SDK-Forwarder" + # Verify the request was called with default parameters + call_args = chronicle_client.session.post.call_args + assert call_args is not None + payload = call_args[1]["json"] + assert payload["displayName"] == "Wrapper-SDK-Forwarder" + assert payload["config"]["uploadCompression"] is False + def test_create_forwarder_error(chronicle_client): """Test error handling when creating a forwarder.""" @@ -152,13 +206,67 @@ def test_create_forwarder_error(chronicle_client): error_response.status_code = 400 error_response.text = "Invalid request" - with patch.object(chronicle_client.session, "post", return_value=error_response): + with patch.object( + chronicle_client.session, "post", return_value=error_response + ): with pytest.raises(APIError, match="Failed to create forwarder"): create_forwarder( client=chronicle_client, display_name="Wrapper-SDK-Forwarder" ) +def test_create_forwarder_with_config( + chronicle_client, mock_forwarder_response +): + """Test creating a forwarder with advanced configuration parameters.""" + # Define test values for the advanced configuration + metadata = {"environment": "production", "team": "security"} + regex_filters = [{"pattern": ".*error.*", "action": "INCLUDE"}] + graceful_timeout = "45s" + drain_timeout = "90s" + http_settings = {"routeSettings": {"port": 8080}} + + with patch.object( + chronicle_client.session, "post", return_value=mock_forwarder_response + ): + result = create_forwarder( + client=chronicle_client, + display_name="Advanced-Forwarder", + metadata=metadata, + upload_compression=True, + enable_server=True, + regex_filters=regex_filters, + graceful_timeout=graceful_timeout, + drain_timeout=drain_timeout, + http_settings=http_settings, + ) + + # Verify the result + assert result["displayName"] == "Wrapper-SDK-Forwarder" # From the mock + + # Verify the request payload contains all parameters + 
call_args = chronicle_client.session.post.call_args + assert call_args is not None + payload = call_args[1]["json"] + + # Check all parameters were passed correctly + assert payload["displayName"] == "Advanced-Forwarder" + assert payload["config"]["uploadCompression"] is True + assert payload["config"]["metadata"] == metadata + assert payload["config"]["regexFilters"] == regex_filters + assert payload["config"]["serverSettings"]["enabled"] is True + assert ( + payload["config"]["serverSettings"]["gracefulTimeout"] + == graceful_timeout + ) + assert ( + payload["config"]["serverSettings"]["drainTimeout"] == drain_timeout + ) + assert ( + payload["config"]["serverSettings"]["httpSettings"] == http_settings + ) + + def test_list_forwarders(chronicle_client, mock_forwarders_list_response): """Test listing forwarders.""" with patch.object( @@ -595,3 +703,269 @@ def test_ingest_log_backward_compatibility( assert "data" in log_entry decoded_data = base64.b64decode(log_entry["data"]).decode("utf-8") assert json.loads(decoded_data) == {"test": "log", "message": "Test message"} + + +def test_patch_forwarder(chronicle_client, mock_patch_forwarder_response): + """Test basic patch forwarder functionality.""" + with patch.object( + chronicle_client.session, + "patch", + return_value=mock_patch_forwarder_response, + ): + result = update_forwarder( + client=chronicle_client, + forwarder_id="test-forwarder-id", + display_name="Updated-Forwarder-Name", + ) + + # Verify the result + assert result["displayName"] == "Updated-Forwarder-Name" + assert ( + result["name"] + == "projects/test-project/locations/us/instances/test-customer/forwarders/test-forwarder-id" + ) + + # Verify the request was made correctly + call_args = chronicle_client.session.patch.call_args + assert call_args is not None + + # Check URL format + url = call_args[0][0] + assert ( + "test-project/locations/us/instances/test-customer/forwarders/test-forwarder-id" + in url + ) + + # Check payload + payload = 
call_args[1]["json"] + assert payload["displayName"] == "Updated-Forwarder-Name" + assert len(payload.keys()) == 1 # Only displayName should be in payload + + # Verify auto-generated update_mask query param + assert "params" in call_args[1] + assert call_args[1]["params"]["updateMask"] == "display_name" + + +def test_patch_forwarder_error(chronicle_client): + """Test error handling when patching a forwarder.""" + error_response = Mock() + error_response.status_code = 400 + error_response.text = "Invalid request" + + with patch.object( + chronicle_client.session, "patch", return_value=error_response + ): + with pytest.raises(APIError, match="Failed to update forwarder"): + update_forwarder( + client=chronicle_client, + forwarder_id="test-forwarder-id", + display_name="Updated-Name", + ) + + +def test_patch_forwarder_with_all_options( + chronicle_client, mock_patch_forwarder_response +): + """Test patching a forwarder with all available options.""" + # Define test values for configuration + metadata = {"environment": "production", "team": "security"} + regex_filters = [{"pattern": ".*error.*", "action": "INCLUDE"}] + graceful_timeout = "45s" + drain_timeout = "90s" + http_settings = {"routeSettings": {"port": 8080}} + + with patch.object( + chronicle_client.session, + "patch", + return_value=mock_patch_forwarder_response, + ): + result = update_forwarder( + client=chronicle_client, + forwarder_id="test-forwarder-id", + display_name="Updated-Forwarder-Name", + metadata=metadata, + upload_compression=True, + enable_server=True, + regex_filters=regex_filters, + graceful_timeout=graceful_timeout, + drain_timeout=drain_timeout, + http_settings=http_settings, + ) + + # Verify the result + assert result["displayName"] == "Updated-Forwarder-Name" + + # Verify the request payload contains all parameters + call_args = chronicle_client.session.patch.call_args + payload = call_args[1]["json"] + + # Check all parameters were passed correctly + assert payload["displayName"] == 
"Updated-Forwarder-Name" + assert payload["config"]["uploadCompression"] is True + assert payload["config"]["metadata"] == metadata + assert payload["config"]["regexFilters"] == regex_filters + assert payload["config"]["serverSettings"]["enabled"] is True + assert ( + payload["config"]["serverSettings"]["gracefulTimeout"] + == graceful_timeout + ) + assert ( + payload["config"]["serverSettings"]["drainTimeout"] == drain_timeout + ) + assert ( + payload["config"]["serverSettings"]["httpSettings"] == http_settings + ) + + +def test_patch_forwarder_with_update_mask( + chronicle_client, mock_patch_forwarder_response +): + """Test patching a forwarder with update mask.""" + update_mask = ["display_name", "config.metadata"] + metadata = {"environment": "staging"} + + with patch.object( + chronicle_client.session, + "patch", + return_value=mock_patch_forwarder_response, + ): + result = update_forwarder( + client=chronicle_client, + forwarder_id="test-forwarder-id", + display_name="Updated-Name", + metadata=metadata, + upload_compression=True, # This should be ignored by update_mask + update_mask=update_mask, + ) + + # Verify the result + assert ( + result["displayName"] == "Updated-Forwarder-Name" + ) # From the mock + + # Verify update_mask query parameter + call_args = chronicle_client.session.patch.call_args + assert "params" in call_args[1] + assert ( + call_args[1]["params"]["updateMask"] + == "display_name,config.metadata" + ) + + # Verify payload contains all fields regardless of update_mask + # (API should respect update_mask parameter) + payload = call_args[1]["json"] + assert payload["displayName"] == "Updated-Name" + assert payload["config"]["metadata"] == metadata + assert payload["config"]["uploadCompression"] is True + + +def test_patch_forwarder_partial_update( + chronicle_client, mock_patch_forwarder_response +): + """Test patching only specific fields of a forwarder.""" + with patch.object( + chronicle_client.session, + "patch", + 
return_value=mock_patch_forwarder_response, + ): + result = update_forwarder( + client=chronicle_client, + forwarder_id="test-forwarder-id", + upload_compression=True, + ) + + # Verify the result + assert ( + result["displayName"] == "Updated-Forwarder-Name" + ) # From the mock + + # Verify the request payload contains only the specified parameter + call_args = chronicle_client.session.patch.call_args + payload = call_args[1]["json"] + + assert "displayName" not in payload + assert "config" in payload + assert payload["config"]["uploadCompression"] is True + assert len(payload["config"]) == 1 # Only uploadCompression + + # Verify auto-generated update_mask contains the correct field path + assert "params" in call_args[1] + assert ( + call_args[1]["params"]["updateMask"] == "config.upload_compression" + ) + + +def test_auto_generated_update_mask_multiple_fields( + chronicle_client, mock_patch_forwarder_response +): + """Test auto-generation of update_mask with multiple fields.""" + metadata = {"environment": "staging"} + http_settings = {"routeSettings": {"port": 8080}} + + with patch.object( + chronicle_client.session, + "patch", + return_value=mock_patch_forwarder_response, + ): + result = update_forwarder( + client=chronicle_client, + forwarder_id="test-forwarder-id", + display_name="New-Name", + metadata=metadata, + http_settings=http_settings, + ) + + # Verify the result + assert ( + result["displayName"] == "Updated-Forwarder-Name" + ) # From the mock + + # Verify auto-generated update_mask contains all modified fields + call_args = chronicle_client.session.patch.call_args + assert "params" in call_args[1] + update_mask = call_args[1]["params"]["updateMask"] + + # Check that all fields are included in update_mask + assert "display_name" in update_mask + assert "config.metadata" in update_mask + assert "config.server_settings.http_settings" in update_mask + + # Verify payload contains all specified fields + payload = call_args[1]["json"] + assert 
payload["displayName"] == "New-Name" + assert payload["config"]["metadata"] == metadata + assert ( + payload["config"]["serverSettings"]["httpSettings"] == http_settings + ) + +def test_delete_forwarder(chronicle_client, mock_delete_forwarder_response): + """Test deleting a forwarder.""" + with patch.object( + chronicle_client.session, "delete", return_value=mock_delete_forwarder_response + ): + result = delete_forwarder(client=chronicle_client, forwarder_id="test-forwarder-id") + + # Verify the result (should be empty for delete operations) + assert result == {} + + # Verify the request was made with the correct URL + call_args = chronicle_client.session.delete.call_args + assert call_args is not None + + # Check URL format contains the forwarder ID + url = call_args[0][0] + assert ( + "test-project/locations/us/instances/test-customer/forwarders/test-forwarder-id" + in url + ) + + +def test_delete_forwarder_error(chronicle_client): + """Test error handling when deleting a forwarder.""" + error_response = Mock() + error_response.status_code = 400 + error_response.text = "Invalid request" + + with patch.object(chronicle_client.session, "delete", return_value=error_response): + with pytest.raises(APIError, match="Failed to delete forwarder"): + delete_forwarder(client=chronicle_client, forwarder_id="test-forwarder-id") diff --git a/tests/cli/test_forwarder_integration.py b/tests/cli/test_forwarder_integration.py new file mode 100644 index 0000000..02b5556 --- /dev/null +++ b/tests/cli/test_forwarder_integration.py @@ -0,0 +1,335 @@ +"""Integration tests for the SecOps CLI forwarder commands.""" + +import pytest +import subprocess +import json +import uuid + + +@pytest.mark.integration +def test_cli_forwarder_lifecycle(cli_env, common_args): + """Test forwarder creation, update, deletion, and retrieval (full lifecycle).""" + # Generate a unique display name for testing + test_display_name = f"test-forwarder-{uuid.uuid4().hex[:8]}" + forwarder_ids = [] + + try: + # 1. 
Create a forwarder + create_cmd = ( + [ + "secops", + ] + + common_args + + [ + "forwarder", + "create", + "--display-name", + test_display_name, + "--metadata", + json.dumps( + { + "labels": {"environment": "integration_test"}, + } + ), + "--upload-compression", + "true", + "--enable-server", + "true", + ] + ) + + create_result = subprocess.run( + create_cmd, env=cli_env, capture_output=True, text=True + ) + + # Check that the command executed successfully + assert ( + create_result.returncode == 0 + ), f"Forwarder creation failed: {create_result.stderr}\n{create_result.stdout}" + + # Parse the output to get the forwarder ID + try: + created_data = json.loads(create_result.stdout) + forwarder_id = created_data.get("name").split("/")[-1] + assert ( + forwarder_id + ), "Failed to get forwarder ID from creation response" + forwarder_ids.append(forwarder_id) + print(f"Created forwarder with ID: {forwarder_id}") + except json.JSONDecodeError: + pytest.fail( + f"Could not parse JSON from create command output: {create_result.stdout}" + ) + + # 2. List forwarders and verify our created forwarder is in the list + list_cmd = ( + [ + "secops", + ] + + common_args + + ["forwarder", "list"] + ) + + list_result = subprocess.run( + list_cmd, env=cli_env, capture_output=True, text=True + ) + + assert ( + list_result.returncode == 0 + ), f"List forwarders failed: {list_result.stderr}\n{list_result.stdout}" + + listed_forwarders = json.loads(list_result.stdout) + forwarders = listed_forwarders.get("forwarders", []) + + assert len(forwarders) > 0, "No forwarders returned in list response" + + # Find our created forwarder in the list + found_in_list = any( + f.get("name").split("/")[-1] == forwarder_id for f in forwarders + ) + assert ( + found_in_list + ), f"Created forwarder {forwarder_id} not found in listed forwarders" + + # 3. 
Get the specific forwarder + get_cmd = ( + [ + "secops", + ] + + common_args + + ["forwarder", "get", "--id", forwarder_id] + ) + + get_result = subprocess.run( + get_cmd, env=cli_env, capture_output=True, text=True + ) + + # Check that the get command executed successfully + assert ( + get_result.returncode == 0 + ), f"Get forwarder failed: {get_result.stderr}\n{get_result.stdout}" + + get_data = json.loads(get_result.stdout) + assert ( + get_data["name"].split("/")[-1] == forwarder_id + ), "Retrieved forwarder ID doesn't match created ID" + assert ( + get_data["displayName"] == test_display_name + ), "Display name mismatch" + assert get_data["config"]["metadata"], "Metadata not properly set" + + # 4. Update the forwarder (patch) + updated_display_name = f"{test_display_name}-updated" + updated_metadata = json.dumps( + {"labels": {"environment": "updated_test"}} + ) + + patch_cmd = ( + [ + "secops", + ] + + common_args + + [ + "forwarder", + "update", + "--id", + forwarder_id, + "--display-name", + updated_display_name, + "--metadata", + updated_metadata, + "--upload-compression", + "false", + ] + ) + + patch_result = subprocess.run( + patch_cmd, env=cli_env, capture_output=True, text=True + ) + + # Check that the update command executed successfully + assert ( + patch_result.returncode == 0 + ), f"Update forwarder failed: {patch_result.stderr}\n{patch_result.stdout}" + + # Verify the update was successful + get_updated_cmd = ( + [ + "secops", + ] + + common_args + + ["forwarder", "get", "--id", forwarder_id] + ) + + get_updated_result = subprocess.run( + get_updated_cmd, env=cli_env, capture_output=True, text=True + ) + + # Check that the get command executed successfully + assert ( + get_updated_result.returncode == 0 + ), f"Get updated forwarder failed: {get_updated_result.stderr}\n{get_updated_result.stdout}" + + get_updated_data = json.loads(get_updated_result.stdout) + assert ( + get_updated_data["displayName"] == updated_display_name + ), "Updated display 
name not applied" + assert get_updated_data["config"][ + "metadata" + ], "Metadata not properly set" + + # 5. Test get-or-create with the same display name (should retrieve existing) + get_or_create_cmd = ( + [ + "secops", + ] + + common_args + + [ + "forwarder", + "get-or-create", + "--display-name", + updated_display_name, + ] + ) + + get_or_create_result = subprocess.run( + get_or_create_cmd, env=cli_env, capture_output=True, text=True + ) + + # Check that the command executed successfully + assert ( + get_or_create_result.returncode == 0 + ), f"Get-or-create forwarder failed: {get_or_create_result.stderr}\n{get_or_create_result.stdout}" + + get_or_create_data = json.loads(get_or_create_result.stdout) + assert ( + get_or_create_data["name"].split("/")[-1] == forwarder_id + ), "Get-or-create retrieved a different forwarder ID" + + # 6. Test get-or-create with a new display name (should create new) + new_display_name = f"test-forwarder-new-{uuid.uuid4().hex[:8]}" + get_or_create_new_cmd = ( + [ + "secops", + ] + + common_args + + ["forwarder", "get-or-create", "--display-name", new_display_name] + ) + + get_or_create_new_result = subprocess.run( + get_or_create_new_cmd, env=cli_env, capture_output=True, text=True + ) + + # Check that the command executed successfully + assert ( + get_or_create_new_result.returncode == 0 + ), f"Get-or-create new forwarder failed: {get_or_create_new_result.stderr}\n{get_or_create_new_result.stdout}" + + get_or_create_new_data = json.loads(get_or_create_new_result.stdout) + new_forwarder_id = get_or_create_new_data["name"].split("/")[-1] + forwarder_ids.append(new_forwarder_id) + assert ( + new_forwarder_id != forwarder_id + ), "Should have created a new forwarder" + assert ( + get_or_create_new_data["displayName"] == new_display_name + ), "New display name not set correctly" + + # 7. 
Delete the forwarders + for f_id in forwarder_ids: + delete_cmd = ( + [ + "secops", + ] + + common_args + + ["forwarder", "delete", "--id", f_id] + ) + + delete_result = subprocess.run( + delete_cmd, env=cli_env, capture_output=True, text=True + ) + + # Check that the delete command executed successfully + assert ( + delete_result.returncode == 0 + ), f"Delete forwarder failed: {delete_result.stderr}\n{delete_result.stdout}" + + # Verify the forwarder was actually deleted by trying to get it (should fail) + verify_delete_cmd = ( + [ + "secops", + ] + + common_args + + ["forwarder", "get", "--id", f_id] + ) + + verify_delete_result = subprocess.run( + verify_delete_cmd, env=cli_env, capture_output=True, text=True + ) + + # Should fail with an error since the forwarder was deleted + assert ( + verify_delete_result.returncode != 0 + ), "Forwarder still exists after deletion" + + if verify_delete_result.returncode == 0: + forwarder_ids.remove(f_id) + print(f"Successfully deleted forwarder with ID: {f_id}") + + except Exception as e: + # Attempt to clean up any created forwarders if test fails + for f_id in forwarder_ids: + try: + cleanup_cmd = ( + [ + "secops", + ] + + common_args + + ["forwarder", "delete", "--id", f_id] + ) + subprocess.run( + cleanup_cmd, env=cli_env, capture_output=True, text=True + ) + print( + f"Cleaned up forwarder with ID: {f_id} " + "during exception handling" + ) + except Exception: + print( + f"Failed to clean up forwarder with ID: {f_id} " + "during exception handling" + ) + pass + + pytest.fail(f"Unexpected error in CLI forwarder test: {str(e)}") + + +@pytest.mark.integration +def test_cli_forwarder_list_pagination(cli_env, common_args): + """Test the forwarder list command with pagination.""" + # Execute the CLI command with a small page size + cmd = ( + [ + "secops", + ] + + common_args + + ["forwarder", "list", "--page-size", "1"] + ) + + result = subprocess.run(cmd, env=cli_env, capture_output=True, text=True) + + # Check that the 
command executed successfully + assert ( + result.returncode == 0 + ), f"Command failed: {result.stderr}\n{result.stdout}" + + # Try to parse the output as JSON + try: + output = json.loads(result.stdout) + assert "forwarders" in output + # With page size 1, we should have only 1 forwarder or empty list if none exist + if "forwarders" in output and output["forwarders"]: + assert len(output["forwarders"]) == 1 + except json.JSONDecodeError: + # If not valid JSON, check for expected error messages + assert "Error:" not in result.stdout