diff --git a/CHANGELOG.md b/CHANGELOG.md index 10550c5..1f21b6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.21.0] - 2025-10-10 +### Added +- Support for list and update method in data export +### Updated +- Data export methods to utilize enhanced endpoint and parameters + ## [0.20.2] - 2025-10-06 ### Updated - Data table rows bulk replace larger rows handling. diff --git a/CLI.md b/CLI.md index 0bcbb3d..6237db9 100644 --- a/CLI.md +++ b/CLI.md @@ -657,11 +657,30 @@ secops export log-types --time-window 24 secops export log-types --page-size 50 ``` +List recent data exports: + +```bash +# List all recent exports +secops export list + +# List with pagination +secops export list --page-size 10 +``` + Create a data export: ```bash +# Export a single log type (legacy method) secops export create --gcs-bucket "projects/my-project/buckets/my-bucket" --log-type "WINDOWS" --time-window 24 + +# Export multiple log types +secops export create --gcs-bucket "projects/my-project/buckets/my-bucket" --log-types "WINDOWS,LINUX,GCP_DNS" --time-window 24 + +# Export all log types secops export create --gcs-bucket "projects/my-project/buckets/my-bucket" --all-logs --time-window 24 + +# Export with explicit start and end times +secops export create --gcs-bucket "projects/my-project/buckets/my-bucket" --all-logs --start-time "2025-01-01T00:00:00Z" --end-time "2025-01-02T00:00:00Z" ``` Check export status: @@ -670,6 +689,19 @@ Check export status: secops export status --id "export-123" ``` +Update an export (only for exports in IN_QUEUE state): + +```bash +# Update start time +secops export update --id "export-123" --start-time "2025-01-01T02:00:00Z" + +# Update log types +secops export update --id "export-123" --log-types 
"WINDOWS,LINUX,AZURE" + +# Update the GCS bucket +secops export update --id "export-123" --gcs-bucket "projects/my-project/buckets/my-new-bucket" +``` + Cancel an export: ```bash diff --git a/README.md b/README.md index 29688dd..4dd5a54 100644 --- a/README.md +++ b/README.md @@ -583,26 +583,56 @@ for log_type in available_log_types["available_log_types"]: print(f"{log_type.display_name} ({log_type.log_type.split('/')[-1]})") print(f" Available from {log_type.start_time} to {log_type.end_time}") -# Create a data export for a specific log type +# Create a data export for a single log type (legacy method) export = chronicle.create_data_export( gcs_bucket="projects/my-project/buckets/my-export-bucket", start_time=start_time, end_time=end_time, - log_type="GCP_DNS" # Specify log type to export + log_type="GCP_DNS" # Single log type to export +) + +# Create a data export for multiple log types +export_multiple = chronicle.create_data_export( + gcs_bucket="projects/my-project/buckets/my-export-bucket", + start_time=start_time, + end_time=end_time, + log_types=["WINDOWS", "LINUX", "GCP_DNS"] # Multiple log types to export ) # Get the export ID export_id = export["name"].split("/")[-1] print(f"Created export with ID: {export_id}") -print(f"Status: {export['data_export_status']['stage']}") + print(f"Status: {export['data_export_status']['stage']}") + +# List recent exports +recent_exports = chronicle.list_data_export(page_size=10) +print(f"Found {len(recent_exports.get('dataExports', []))} recent exports") + +# Print details of recent exports +for item in recent_exports.get("dataExports", []): + item_id = item["name"].split("/")[-1] + if "dataExportStatus" in item: + status = item["dataExportStatus"]["stage"] + else: + status = item["data_export_status"]["stage"] + print(f"Export ID: {item_id}, Status: {status}") # Check export status status = chronicle.get_data_export(export_id) -print(f"Export status: {status['data_export_status']['stage']}") -print(f"Progress: 
{status['data_export_status'].get('progress_percentage', 0)}%") + +# Update an export that is in IN_QUEUE state +if status.get("dataExportStatus", {}).get("stage") == "IN_QUEUE": + # Update with a new start time + updated_start = start_time + timedelta(hours=2) + update_result = chronicle.update_data_export( + data_export_id=export_id, + start_time=updated_start, + # Optionally update other parameters like end_time, gcs_bucket, or log_types + ) + print("Export updated successfully") # Cancel an export if needed -if status['data_export_status']['stage'] in ['IN_QUEUE', 'PROCESSING']: +if status.get("dataExportStatus", {}).get("stage") in ["IN_QUEUE", "PROCESSING"]: cancelled = chronicle.cancel_data_export(export_id) print(f"Export has been cancelled. New status: {cancelled['data_export_status']['stage']}") @@ -618,8 +648,10 @@ print(f"Created export for all logs. Status: {export_all['data_export_status'][' ``` The Data Export API supports: -- Exporting one or all log types to Google Cloud Storage +- Exporting one, multiple, or all log types to Google Cloud Storage +- Listing recent exports and filtering results - Checking export status and progress +- Updating exports that are in the queue - Cancelling exports in progress - Fetching available log types for a specific time range diff --git a/api_module_mapping.md b/api_module_mapping.md index a9c1416..ab89dfd 100644 --- a/api_module_mapping.md +++ b/api_module_mapping.md @@ -115,6 +115,8 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/ |dataExports.create |v1alpha|chronicle.data_export.create_data_export |secops export create | |dataExports.fetchavailablelogtypes |v1alpha|chronicle.data_export.fetch_available_log_types |secops export log-types | |dataExports.get |v1alpha|chronicle.data_export.get_data_export |secops export status | +|dataExports.list |v1alpha|chronicle.data_export.list_data_export |secops export list | +|dataExports.patch 
|v1alpha|chronicle.data_export.update_data_export |secops export update | |dataTableOperationErrors.get |v1alpha| | | |dataTables.create |v1alpha|chronicle.data_table.create_data_table |secops data-table create | |dataTables.dataTableRows.bulkCreate |v1alpha|chronicle.data_table.create_data_table_rows |secops data-table add-rows | diff --git a/examples/data_export_example.py b/examples/data_export_example.py index aed73f1..da6804f 100644 --- a/examples/data_export_example.py +++ b/examples/data_export_example.py @@ -20,6 +20,7 @@ import os import sys from datetime import datetime, timedelta, timezone +from time import sleep from secops import SecOpsClient from secops.exceptions import APIError @@ -30,11 +31,15 @@ def parse_args(): parser.add_argument("--project_id", required=True, help="GCP project ID") parser.add_argument("--customer_id", required=True, help="Chronicle customer ID") parser.add_argument("--region", default="us", help="Chronicle region (default: us)") - parser.add_argument("--bucket", required=True, help="GCS bucket name for export") + parser.add_argument("--bucket", help="GCS bucket name for export") parser.add_argument( "--days", type=int, default=1, help="Number of days to look back (default: 1)" ) - parser.add_argument("--log_type", help="Optional specific log type to export") + parser.add_argument("--log_type", help="Single log type to export (deprecated)") + parser.add_argument( + "--log_types", + help="Comma-separated list of log types to export (e.g., WINDOWS,LINUX)" + ) parser.add_argument("--all_logs", action="store_true", help="Export all log types") parser.add_argument( "--list_only", @@ -42,6 +47,31 @@ def parse_args(): help="Only list available log types, don't create export", ) parser.add_argument("--credentials", help="Path to service account JSON key file") + + # Additional options for demonstrating list/update functionality + parser.add_argument( + "--list_exports", + action="store_true", + help="List recent data exports" + ) + 
parser.add_argument( + "--list_count", + type=int, + default=5, + help="Number of exports to list when using --list_exports" + ) + parser.add_argument( + "--update", + help="Update an existing export with the given ID (must be in IN_QUEUE state)" + ) + parser.add_argument( + "--new_bucket", + help="New bucket name when updating an export" + ) + parser.add_argument( + "--new_log_types", + help="New comma-separated list of log types when updating an export" + ) return parser.parse_args() @@ -70,7 +100,79 @@ def main(): ) try: - # Fetch available log types + # Check if we should just list exports + if args.list_exports: + print("\nListing recent data exports...") + list_result = chronicle.list_data_export(page_size=args.list_count) + exports = list_result.get("dataExports", []) + print(f"Found {len(exports)} exports") + + for i, export_item in enumerate(exports, 1): + export_id = export_item["name"].split("/")[-1] + stage = export_item["dataExportStatus"]["stage"] + start = export_item.get("startTime", "N/A") + end = export_item.get("endTime", "N/A") + + print(f"\n{i}. 
Export ID: {export_id}") + print(f" Status: {stage}") + print(f" Time range: {start} to {end}") + print(f" GCS Bucket: {export_item.get('gcsBucket', 'N/A')}") + + # Get the log types + log_types = export_item.get("includeLogTypes", []) + if log_types: + log_type_names = [lt.split("/")[-1] for lt in log_types[:3]] + if len(log_types) <= 3: + print(f" Log types: {', '.join(log_type_names)}") + else: + print(f" Log types: {', '.join(log_type_names)} and {len(log_types) - 3} more") + + if "nextPageToken" in list_result: + print(f"\nNext page token: {list_result['nextPageToken']}") + + return 0 + + # Handle update command if specified + if args.update: + print(f"\nUpdating export ID: {args.update}") + + # Get current status to verify it's in queue state + status = chronicle.get_data_export(args.update) + stage = status["dataExportStatus"]["stage"] + + if stage != "IN_QUEUE": + print(f"Cannot update export: current status is {stage} but must be IN_QUEUE") + return 1 + + update_params = {"data_export_id": args.update} + updated = False + + # Add GCS bucket if provided + if args.new_bucket: + new_gcs_bucket = f"projects/{args.project_id}/buckets/{args.new_bucket}" + update_params["gcs_bucket"] = new_gcs_bucket + print(f"Setting new GCS bucket: {new_gcs_bucket}") + updated = True + + # Add log types if provided + if args.new_log_types: + new_log_types_list = [lt.strip() for lt in args.new_log_types.split(',')] + update_params["log_types"] = new_log_types_list + print(f"Setting new log types: {', '.join(new_log_types_list)}") + updated = True + + if not updated: + print("No update parameters provided. 
Use --new_bucket or --new_log_types") + return 1 + + # Perform the update + result = chronicle.update_data_export(**update_params) + print("\nExport updated successfully!") + print(f"Status: {result['dataExportStatus']['stage']}") + + return 0 + + # Fetch available log types for regular create flow print("\nFetching available log types for export...") result = chronicle.fetch_available_log_types( start_time=start_time, end_time=end_time @@ -94,14 +196,19 @@ def main(): return 0 # Validate export options - if args.all_logs and args.log_type: - print("Error: Cannot specify both --all_logs and --log_type") + option_count = sum([bool(args.all_logs), bool(args.log_type), bool(args.log_types)]) + + if option_count > 1: + print("Error: Can only specify one of: --all_logs, --log_type, or --log_types") return 1 - - if not args.all_logs and not args.log_type: - print("Error: Must specify either --all_logs or --log_type") + + if option_count == 0: + print("Error: Must specify one of: --all_logs, --log_type, or --log_types") return 1 + if not hasattr(args, "bucket") or not args.bucket: + print("Error: Must specify a GCS bucket name") + return 1 # Format GCS bucket path gcs_bucket = f"projects/{args.project_id}/buckets/{args.bucket}" print(f"\nExporting to GCS bucket: {gcs_bucket}") @@ -130,6 +237,18 @@ def main(): end_time=end_time, log_type=args.log_type, ) + elif args.log_types: + # Parse and validate comma-separated log types + log_types_list = [lt.strip() for lt in args.log_types.split(',')] + print(f"Creating data export for log types: {', '.join(log_types_list)}") + + # Create export with multiple log types + export = chronicle.create_data_export( + gcs_bucket=gcs_bucket, + start_time=start_time, + end_time=end_time, + log_types=log_types_list, + ) else: print("Creating data export for ALL log types") export = chronicle.create_data_export( @@ -143,15 +262,24 @@ def main(): export_id = export["name"].split("/")[-1] print(f"\nExport created successfully!") print(f"Export 
ID: {export_id}") - print(f"Status: {export['data_export_status']['stage']}") + + if "dataExportStatus" in export: + print(f"Status: {export['dataExportStatus']['stage']}") + else: + print(f"Status: {export['data_export_status']['stage']}") # Poll for status a few times to show progress print("\nChecking export status:") for i in range(3): status = chronicle.get_data_export(export_id) - stage = status["data_export_status"]["stage"] - progress = status["data_export_status"].get("progress_percentage", 0) + + if "dataExportStatus" in status: + stage = status["dataExportStatus"]["stage"] + progress = status["dataExportStatus"].get("progressPercentage", 0) + else: + stage = status["data_export_status"]["stage"] + progress = status["data_export_status"].get("progress_percentage", 0) print(f" Status: {stage}, Progress: {progress}%") @@ -160,12 +288,15 @@ def main(): if i < 2: # Don't wait after the last check print(" Waiting 5 seconds...") - from time import sleep - sleep(5) - print("\nExport job is running. You can check its status later with:") + print("\nExport job is running. 
You can check its status or manage it with:") + print(f" # Check Status:") print(f" python export_status.py --export_id {export_id} ...") + print(f" # List all exports:") + print(f" python data_export_example.py --project_id={args.project_id} --customer_id={args.customer_id} --list_exports") + print(f" \n # Update the export if still in queue:") + print(f" python data_export_example.py --project_id={args.project_id} --customer_id={args.customer_id} --bucket={args.bucket} --update={export_id} --new_log_types=WINDOWS,LINUX") return 0 diff --git a/pyproject.toml b/pyproject.toml index 088219b..8ee1ab2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "secops" -version = "0.20.2" +version = "0.21.0" description = "Python SDK for wrapping the Google SecOps API for common use cases" readme = "README.md" requires-python = ">=3.7" diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py index 622dcec..37f157f 100644 --- a/src/secops/chronicle/__init__.py +++ b/src/secops/chronicle/__init__.py @@ -42,6 +42,8 @@ create_data_export, fetch_available_log_types, get_data_export, + list_data_export, + update_data_export, ) # Import data table and reference list classes @@ -189,6 +191,8 @@ "create_data_export", "cancel_data_export", "fetch_available_log_types", + "list_data_export", + "update_data_export", "AvailableLogType", "DataExport", "DataExportStatus", diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index 97a152c..720cf5d 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -28,15 +28,15 @@ from secops.chronicle.dashboard import DashboardAccessType, DashboardView from secops.chronicle.dashboard import add_chart as _add_chart from secops.chronicle.dashboard import create_dashboard as _create_dashboard -from secops.chronicle.dashboard import import_dashboard as _import_dashboard -from secops.chronicle.dashboard import 
export_dashboard as _export_dashboard from secops.chronicle.dashboard import delete_dashboard as _delete_dashboard from secops.chronicle.dashboard import ( duplicate_dashboard as _duplicate_dashboard, ) from secops.chronicle.dashboard import edit_chart as _edit_chart +from secops.chronicle.dashboard import export_dashboard as _export_dashboard from secops.chronicle.dashboard import get_chart as _get_chart from secops.chronicle.dashboard import get_dashboard as _get_dashboard +from secops.chronicle.dashboard import import_dashboard as _import_dashboard from secops.chronicle.dashboard import list_dashboards as _list_dashboards from secops.chronicle.dashboard import remove_chart as _remove_chart from secops.chronicle.dashboard import update_dashboard as _update_dashboard @@ -56,6 +56,10 @@ fetch_available_log_types as _fetch_available_log_types, ) from secops.chronicle.data_export import get_data_export as _get_data_export +from secops.chronicle.data_export import list_data_export as _list_data_export +from secops.chronicle.data_export import ( + update_data_export as _update_data_export, +) from secops.chronicle.data_table import DataTableColumnType from secops.chronicle.data_table import create_data_table as _create_data_table from secops.chronicle.data_table import ( @@ -165,11 +169,15 @@ from secops.chronicle.rule import delete_rule as _delete_rule from secops.chronicle.rule import enable_rule as _enable_rule from secops.chronicle.rule import get_rule as _get_rule +from secops.chronicle.rule import get_rule_deployment as _get_rule_deployment +from secops.chronicle.rule import ( + list_rule_deployments as _list_rule_deployments, +) from secops.chronicle.rule import list_rules as _list_rules from secops.chronicle.rule import run_rule_test from secops.chronicle.rule import search_rules as _search_rules -from secops.chronicle.rule import update_rule as _update_rule from secops.chronicle.rule import set_rule_alerting as _set_rule_alerting +from secops.chronicle.rule 
import update_rule as _update_rule from secops.chronicle.rule import ( update_rule_deployment as _update_rule_deployment, ) @@ -228,17 +236,13 @@ fetch_udm_search_csv as _fetch_udm_search_csv, ) from secops.chronicle.udm_search import ( - find_udm_field_values as _find_udm_field_values, + fetch_udm_search_view as _fetch_udm_search_view, ) from secops.chronicle.udm_search import ( - fetch_udm_search_view as _fetch_udm_search_view, + find_udm_field_values as _find_udm_field_values, ) from secops.chronicle.validate import validate_query as _validate_query from secops.exceptions import SecOpsError -from secops.chronicle.rule import get_rule_deployment as _get_rule_deployment -from secops.chronicle.rule import ( - list_rule_deployments as _list_rule_deployments, -) class ValueType(Enum): @@ -2179,6 +2183,7 @@ def create_data_export( start_time: datetime, end_time: datetime, log_type: Optional[str] = None, + log_types: Optional[List[str]] = None, export_all_logs: bool = False, ) -> Dict[str, Any]: """Create a new data export job. @@ -2188,7 +2193,9 @@ def create_data_export( "projects/{project}/buckets/{bucket}" start_time: Start time for the export (inclusive) end_time: End time for the export (exclusive) - log_type: Optional specific log type to export. + log_type: Optional specific log type to export (deprecated). + Use log_types instead. + log_types: Optional list of log types to export. 
If None and export_all_logs is False, no logs will be exported export_all_logs: Whether to export all log types @@ -2206,7 +2213,15 @@ def create_data_export( end_time = datetime.now() start_time = end_time - timedelta(days=1) - # Export a specific log type + # Export specific log types + export = chronicle.create_data_export( + gcs_bucket="projects/my-project/buckets/my-bucket", + start_time=start_time, + end_time=end_time, + log_types=["WINDOWS", "LINUX"] + ) + + # Export a single log type (legacy method) export = chronicle.create_data_export( gcs_bucket="projects/my-project/buckets/my-bucket", start_time=start_time, @@ -2229,6 +2244,7 @@ def create_data_export( start_time=start_time, end_time=end_time, log_type=log_type, + log_types=log_types, export_all_logs=export_all_logs, ) @@ -2304,6 +2320,72 @@ def fetch_available_log_types( page_token=page_token, ) + def update_data_export( + self, + data_export_id: str, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + gcs_bucket: Optional[str] = None, + log_types: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """Update an existing data export job. + + Note: The job must be in the "IN_QUEUE" state to be updated. 
+ + Args: + data_export_id: ID of the data export to update + start_time: Optional new start time for the export + end_time: Optional new end time for the export + gcs_bucket: Optional new GCS bucket path + log_types: Optional new list of log types to export + + Returns: + Dictionary containing details of the updated data export + + Raises: + APIError: If the API request fails + ValueError: If invalid parameters are provided + """ + return _update_data_export( + self, + data_export_id=data_export_id, + start_time=start_time, + end_time=end_time, + gcs_bucket=gcs_bucket, + log_types=log_types, + ) + + def list_data_export( + self, + filters: Optional[str] = None, + page_size: Optional[int] = None, + page_token: Optional[str] = None, + ) -> Dict[str, Any]: + """List data export jobs. + + Args: + filters: Filter string + page_size: Page size + page_token: Page token + + Returns: + Dictionary containing data export list + + Raises: + APIError: If the API request fails + + Example: + ```python + export = chronicle.list_data_export() + ``` + """ + return _list_data_export( + self, + filters=filters, + page_size=page_size, + page_token=page_token, + ) + # Data Table methods def create_data_table( diff --git a/src/secops/chronicle/data_export.py b/src/secops/chronicle/data_export.py index 8ebec2c..42bd973 100644 --- a/src/secops/chronicle/data_export.py +++ b/src/secops/chronicle/data_export.py @@ -18,7 +18,7 @@ allowing users to export Chronicle data to Google Cloud Storage buckets. """ -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, List from datetime import datetime from dataclasses import dataclass from secops.exceptions import APIError @@ -41,6 +41,39 @@ class AvailableLogType: end_time: datetime +def _get_base_url(client) -> str: + """Get the enhanced/new base URL for the Chronicle Data Export API for + region other then dev and staging. 
+ Args: + client: ChronicleClient instance + + Returns: + The base URL for the Chronicle Data Export API + """ + if client.region not in ["dev", "staging"]: + return f"https://chronicle.{client.region}.rep.googleapis.com/v1alpha" + return client.base_url + + +def _get_formatted_log_type(client, log_type: str) -> str: + """Get the formatted log type for the given log type. + + Args: + client: ChronicleClient instance + log_type: The log type to format + + Returns: + The formatted log type + """ + if "/" not in log_type: + return ( + f"projects/{client.project_id}/locations/{client.region}/" + f"instances/{client.customer_id}/logTypes/{log_type}" + ) + + return log_type + + def get_data_export(client, data_export_id: str) -> Dict[str, Any]: """Get information about a specific data export. @@ -60,7 +93,10 @@ def get_data_export(client, data_export_id: str) -> Dict[str, Any]: print(f"Export status: {export['data_export_status']['stage']}") ``` """ - url = f"{client.base_url}/{client.instance_id}/dataExports/{data_export_id}" + url = ( + f"{_get_base_url(client)}/{client.instance_id}/" + f"dataExports/{data_export_id}" + ) response = client.session.get(url) @@ -76,6 +112,7 @@ def create_data_export( start_time: datetime, end_time: datetime, log_type: Optional[str] = None, + log_types: Optional[List[str]] = None, export_all_logs: bool = False, ) -> Dict[str, Any]: """Create a new data export job. @@ -86,7 +123,9 @@ def create_data_export( "projects/{project}/buckets/{bucket}" start_time: Start time for the export (inclusive) end_time: End time for the export (exclusive) - log_type: Optional specific log type to export. + log_type: Optional specific log type to export (deprecated). + Use log_types instead. + log_types: Optional list of log types to export. 
If None and export_all_logs is False, no logs will be exported export_all_logs: Whether to export all log types @@ -103,13 +142,12 @@ def create_data_export( end_time = datetime.now() start_time = end_time - timedelta(days=1) - # Export a specific log type export = chronicle.create_data_export( gcs_bucket="projects/my-project/buckets/my-bucket", start_time=start_time, end_time=end_time, - log_type="WINDOWS" + log_types=["WINDOWS"] ) # Export all logs @@ -119,8 +157,18 @@ def create_data_export( end_time=end_time, export_all_logs=True ) - ``` """ + # Validate that the user hasn't provided both log_type and log_types + if log_type is not None and log_types is not None: + raise ValueError("Use either log_type or log_types, not both") + + # Handle both log_type and log_types for backward compatibility + if log_type is not None: + log_types = [log_type] + + # Initialize log_types if None + log_types = [] if log_types is None else log_types + # Validate parameters if not gcs_bucket: raise ValueError("GCS bucket must be provided") @@ -133,12 +181,12 @@ def create_data_export( if end_time <= start_time: raise ValueError("End time must be after start time") - if not export_all_logs and not log_type: + if not export_all_logs and len(log_types) == 0: raise ValueError( "Either log_type must be specified or export_all_logs must be True" ) - if export_all_logs and log_type: + if export_all_logs and len(log_types) > 0: raise ValueError( "Cannot specify both log_type and export_all_logs=True" ) @@ -149,68 +197,23 @@ def create_data_export( # Construct the request payload payload = { - "start_time": start_time_str, - "end_time": end_time_str, - "gcs_bucket": gcs_bucket, + "startTime": start_time_str, + "endTime": end_time_str, + "gcsBucket": gcs_bucket, } - # Add log_type if provided - if log_type: - # Check if we need to prefix with logTypes - if "/" not in log_type: - # First check if log type exists - try: - # Try to fetch available log types to validate - available_logs = 
fetch_available_log_types( - client, start_time=start_time, end_time=end_time - ) - found = False - for lt in available_logs.get("available_log_types", []): - if lt.log_type.endswith( - "/" + log_type - ) or lt.log_type.endswith("/logTypes/" + log_type): - # If we found the log type in the list, - # use its exact format - payload["log_type"] = lt.log_type - found = True - break - - if not found: - # If log type isn't in the list, try the standard format - # Format log_type as required by the API - - # the complete format - formatted_log_type = ( - f"projects/{client.project_id}/" - f"locations/{client.region}/instances/" - f"{client.customer_id}/logTypes/{log_type}" - ) - payload["log_type"] = formatted_log_type - except Exception: # pylint: disable=broad-exception-caught - # If we can't validate, just use the standard format - formatted_log_type = ( - f"projects/{client.project_id}/locations/" - f"{client.region}/instances/{client.customer_id}/" - f"logTypes/{log_type}" - ) - payload["log_type"] = formatted_log_type - else: - # Log type is already formatted - payload["log_type"] = log_type + # Process log types + payload["includeLogTypes"] = list( + map(lambda x: _get_formatted_log_type(client, x), log_types) + ) # Add export_all_logs if True if export_all_logs: - # Setting log type as ALL TYPES for all log export - payload["log_type"] = ( - f"projects/{client.project_id}/" - f"locations/{client.region}/instances/" - f"{client.customer_id}/logTypes/ALL_TYPES" - ) - # export_all_logs parameter is currently not valid - # TODO: Revert back to export_all_logs once parameter works - # payload["export_all_logs"] = True + # Setting log types as empty list for all log export + payload["includeLogTypes"] = [] # Construct the URL and send the request - url = f"{client.base_url}/{client.instance_id}/dataExports" + url = f"{_get_base_url(client)}/{client.instance_id}/dataExports" response = client.session.post(url, json=payload) @@ -240,7 +243,7 @@ def 
cancel_data_export(client, data_export_id: str) -> Dict[str, Any]: ``` """ url = ( - f"{client.base_url}/{client.instance_id}/dataExports/" + f"{_get_base_url(client)}/{client.instance_id}/dataExports/" f"{data_export_id}:cancel" ) @@ -306,18 +309,18 @@ def fetch_available_log_types( end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") # Construct the request payload - payload = {"start_time": start_time_str, "end_time": end_time_str} + payload = {"startTime": start_time_str, "endTime": end_time_str} # Add optional parameters if provided if page_size: - payload["page_size"] = page_size + payload["pageSize"] = page_size if page_token: - payload["page_token"] = page_token + payload["pageToken"] = page_token # Construct the URL and send the request url = ( - f"{client.base_url}/{client.instance_id}/" + f"{_get_base_url(client)}/{client.instance_id}/" "dataExports:fetchavailablelogtypes" ) @@ -352,3 +355,116 @@ def fetch_available_log_types( "available_log_types": available_log_types, "next_page_token": result.get("next_page_token", ""), } + + +def update_data_export( + client, + data_export_id: str, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + gcs_bucket: Optional[str] = None, + log_types: Optional[List[str]] = None, +) -> Dict[str, Any]: + """Update an existing data export job. + + Note: The job must be in the "IN_QUEUE" state to be updated. 
+ + Args: + client: ChronicleClient instance + data_export_id: ID of the data export to update + start_time: Optional new start time for the export + end_time: Optional new end time for the export + gcs_bucket: Optional new GCS bucket path + log_types: Optional new list of log types to export + + Returns: + Dictionary containing details of the updated data export + + Raises: + APIError: If the API request fails + ValueError: If invalid parameters are provided + """ + # Construct the request payload and update mask + payload = {} + update_mask = [] + + if start_time: + payload["startTime"] = start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + update_mask.append("startTime") + + if end_time: + payload["endTime"] = end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + update_mask.append("endTime") + + if gcs_bucket: + if not gcs_bucket.startswith("projects/"): + raise ValueError( + "GCS bucket must be in format: " + "projects/{project}/buckets/{bucket}" + ) + payload["gcsBucket"] = gcs_bucket + update_mask.append("gcsBucket") + + if log_types is not None: + payload["includeLogTypes"] = list( + map(lambda x: _get_formatted_log_type(client, x), log_types) + ) + update_mask.append("includeLogTypes") + + if not payload: + raise ValueError("At least one field to update must be provided.") + + # Construct the URL and send the request + url = ( + f"{_get_base_url(client)}/{client.instance_id}/dataExports/" + f"{data_export_id}" + ) + params = {"update_mask": ",".join(update_mask)} + + response = client.session.patch(url, json=payload, params=params) + + if response.status_code != 200: + raise APIError(f"Failed to update data export: {response.text}") + + return response.json() + + +def list_data_export( + client, + filters: Optional[str] = None, + page_size: Optional[int] = None, + page_token: Optional[str] = None, +) -> Dict[str, Any]: + """List data export jobs. 
+ + Args: + client: ChronicleClient instance + filters: Filter string + page_size: Page size + page_token: Page token + + Returns: + Dictionary containing data export list + + Raises: + APIError: If the API request fails + + Example: + ```python + export = chronicle.list_data_export() + ``` + """ + url = f"{_get_base_url(client)}/{client.instance_id}/dataExports" + + params = { + "pageSize": page_size, + "pageToken": page_token, + "filter": filters, + } + + response = client.session.get(url, params=params) + + if response.status_code != 200: + raise APIError(f"Failed to get data export: {response.text}") + + return response.json() diff --git a/src/secops/cli.py b/src/secops/cli.py index ffc829e..a8439ae 100644 --- a/src/secops/cli.py +++ b/src/secops/cli.py @@ -2080,7 +2080,16 @@ def setup_export_command(subparsers): help="GCS bucket in format 'projects/PROJECT_ID/buckets/BUCKET_NAME'", ) create_parser.add_argument( - "--log-type", "--log_type", dest="log_type", help="Log type to export" + "--log-type", + "--log_type", + dest="log_type", + help="Single log type to export (deprecated, use --log-types instead)", + ) + create_parser.add_argument( + "--log-types", + "--log_types", + dest="log_types", + help="Comma-separated list of log types to export", ) create_parser.add_argument( "--all-logs", @@ -2092,6 +2101,51 @@ def setup_export_command(subparsers): add_time_range_args(create_parser) create_parser.set_defaults(func=handle_export_create_command) + # List exports command + list_parser = export_subparsers.add_parser("list", help="List data exports") + list_parser.add_argument( + "--filter", dest="filters", help="Filter string for listing exports" + ) + list_parser.add_argument( + "--page-size", + "--page_size", + dest="page_size", + type=int, + help="Page size for results", + ) + list_parser.add_argument( + "--page-token", + "--page_token", + dest="page_token", + help="Page token for pagination", + ) + list_parser.set_defaults(func=handle_export_list_command) + + # 
Update export command + update_parser = export_subparsers.add_parser( + "update", help="Update an existing data export" + ) + update_parser.add_argument( + "--id", required=True, help="Export ID to update" + ) + update_parser.add_argument( + "--gcs-bucket", + "--gcs_bucket", + dest="gcs_bucket", + help=( + "New GCS bucket in format " + "'projects/PROJECT_ID/buckets/BUCKET_NAME'" + ), + ) + update_parser.add_argument( + "--log-types", + "--log_types", + dest="log_types", + help="Comma-separated list of log types to export", + ) + add_time_range_args(update_parser) + update_parser.set_defaults(func=handle_export_update_command) + # Get export status command status_parser = export_subparsers.add_parser( "status", help="Get export status" @@ -2202,15 +2256,28 @@ def handle_export_create_command(args, chronicle): export_all_logs=True, ) elif args.log_type: + # Single log type (legacy method) result = chronicle.create_data_export( gcs_bucket=args.gcs_bucket, start_time=start_time, end_time=end_time, log_type=args.log_type, ) + elif args.log_types: + # Multiple log types + log_types_list = [ + log_type.strip() for log_type in args.log_types.split(",") + ] + result = chronicle.create_data_export( + gcs_bucket=args.gcs_bucket, + start_time=start_time, + end_time=end_time, + log_types=log_types_list, + ) else: print( - "Error: Either --log-type or --all-logs must be specified", + "Error: Either --log-type, --log-types, or --all-logs " + "must be specified", file=sys.stderr, ) sys.exit(1) @@ -2282,6 +2349,49 @@ def handle_export_cancel_command(args, chronicle): sys.exit(1) +def handle_export_list_command(args, chronicle): + """Handle listing data exports command.""" + try: + result = chronicle.list_data_export( + filters=args.filters, + page_size=args.page_size, + page_token=args.page_token, + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def 
handle_export_update_command(args, chronicle): + """Handle updating an existing data export command.""" + # Get the start_time and end_time if provided + start_time = None + end_time = None + if (hasattr(args, "start_time") and args.start_time) or ( + hasattr(args, "time_window") and args.time_window + ): + start_time, end_time = get_time_range(args) + + # Convert log_types string to list if provided + log_types = None + if args.log_types: + log_types = [log_type.strip() for log_type in args.log_types.split(",")] + + try: + result = chronicle.update_data_export( + data_export_id=args.id, + gcs_bucket=args.gcs_bucket if hasattr(args, "gcs_bucket") else None, + start_time=start_time, + end_time=end_time, + log_types=log_types, + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + def setup_gemini_command(subparsers): """Set up the Gemini command parser.""" gemini_parser = subparsers.add_parser( diff --git a/tests/chronicle/test_data_export.py b/tests/chronicle/test_data_export.py index 66801fa..a06d91f 100644 --- a/tests/chronicle/test_data_export.py +++ b/tests/chronicle/test_data_export.py @@ -18,7 +18,11 @@ from unittest.mock import Mock, patch from secops.chronicle.client import ChronicleClient -from secops.chronicle.data_export import AvailableLogType +from secops.chronicle.data_export import ( + AvailableLogType, + update_data_export, + list_data_export +) from secops.exceptions import APIError @@ -65,23 +69,26 @@ def test_get_data_export_error(chronicle_client): chronicle_client.get_data_export("nonexistent-export") -def test_create_data_export(chronicle_client): - """Test creating a data export.""" +def test_create_data_export_with_log_type(chronicle_client): + """Test creating a data export with a single log type (string parameter).""" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { "name": 
"projects/test-project/locations/us/instances/test-customer/dataExports/export123", - "start_time": "2024-01-01T00:00:00.000Z", - "end_time": "2024-01-02T00:00:00.000Z", - "gcs_bucket": "projects/test-project/buckets/my-bucket", - "log_type": "projects/test-project/locations/us/instances/test-customer/logTypes/WINDOWS", - "data_export_status": {"stage": "IN_QUEUE"}, + "startTime": "2024-01-01T00:00:00.000Z", + "endTime": "2024-01-02T00:00:00.000Z", + "gcsBucket": "projects/test-project/buckets/my-bucket", + "includeLogTypes": [ + "projects/test-project/locations/us/instances/test-customer/logTypes/WINDOWS" + ], + "dataExportStatus": {"stage": "IN_QUEUE"}, } - with patch.object(chronicle_client.session, "post", return_value=mock_response): + with patch.object(chronicle_client.session, "post", return_value=mock_response) as mock_post: start_time = datetime(2024, 1, 1, tzinfo=timezone.utc) end_time = datetime(2024, 1, 2, tzinfo=timezone.utc) + # Test with legacy log_type parameter result = chronicle_client.create_data_export( gcs_bucket="projects/test-project/buckets/my-bucket", start_time=start_time, @@ -90,8 +97,54 @@ def test_create_data_export(chronicle_client): ) assert result["name"].endswith("/dataExports/export123") - assert result["log_type"].endswith("/logTypes/WINDOWS") - assert result["data_export_status"]["stage"] == "IN_QUEUE" + assert len(result["includeLogTypes"]) == 1 + assert result["includeLogTypes"][0].endswith("/logTypes/WINDOWS") + assert result["dataExportStatus"]["stage"] == "IN_QUEUE" + + # Check that the API was called correctly + mock_post.assert_called_once() + _, kwargs = mock_post.call_args + assert "includeLogTypes" in kwargs["json"] + assert len(kwargs["json"]["includeLogTypes"]) == 1 + + +def test_create_data_export_with_log_types(chronicle_client): + """Test creating a data export with multiple log types (list parameter).""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "name": 
"projects/test-project/locations/us/instances/test-customer/dataExports/export123", + "startTime": "2024-01-01T00:00:00.000Z", + "endTime": "2024-01-02T00:00:00.000Z", + "gcsBucket": "projects/test-project/buckets/my-bucket", + "includeLogTypes": [ + "projects/test-project/locations/us/instances/test-customer/logTypes/WINDOWS", + "projects/test-project/locations/us/instances/test-customer/logTypes/LINUX" + ], + "dataExportStatus": {"stage": "IN_QUEUE"}, + } + + with patch.object(chronicle_client.session, "post", return_value=mock_response) as mock_post: + start_time = datetime(2024, 1, 1, tzinfo=timezone.utc) + end_time = datetime(2024, 1, 2, tzinfo=timezone.utc) + + # Test with new log_types parameter + result = chronicle_client.create_data_export( + gcs_bucket="projects/test-project/buckets/my-bucket", + start_time=start_time, + end_time=end_time, + log_types=["WINDOWS", "LINUX"], + ) + + assert result["name"].endswith("/dataExports/export123") + assert len(result["includeLogTypes"]) == 2 + assert result["dataExportStatus"]["stage"] == "IN_QUEUE" + + # Check that the API was called correctly + mock_post.assert_called_once() + _, kwargs = mock_post.call_args + assert "includeLogTypes" in kwargs["json"] + assert len(kwargs["json"]["includeLogTypes"]) == 2 def test_create_data_export_validation(chronicle_client): @@ -104,10 +157,10 @@ def test_create_data_export_validation(chronicle_client): gcs_bucket="projects/test-project/buckets/my-bucket", start_time=start_time, end_time=end_time, - log_type="WINDOWS", + log_types=["WINDOWS"], ) - # Test missing log type and export_all_logs + # Test missing log types and export_all_logs start_time = datetime(2024, 1, 1, tzinfo=timezone.utc) end_time = datetime(2024, 1, 2, tzinfo=timezone.utc) @@ -121,7 +174,7 @@ def test_create_data_export_validation(chronicle_client): end_time=end_time, ) - # Test both log_type and export_all_logs specified + # Test both log_types and export_all_logs specified with pytest.raises( ValueError, 
match="Cannot specify both log_type and export_all_logs=True" ): @@ -129,9 +182,21 @@ def test_create_data_export_validation(chronicle_client): gcs_bucket="projects/test-project/buckets/my-bucket", start_time=start_time, end_time=end_time, - log_type="WINDOWS", + log_types=["WINDOWS"], export_all_logs=True, ) + + # Test both legacy log_type and new log_types specified together + with pytest.raises( + ValueError, match="Use either log_type or log_types, not both" + ): + chronicle_client.create_data_export( + gcs_bucket="projects/test-project/buckets/my-bucket", + start_time=start_time, + end_time=end_time, + log_type="WINDOWS", + log_types=["LINUX"], + ) # Test invalid GCS bucket format with pytest.raises(ValueError, match="GCS bucket must be in format"): @@ -174,7 +239,7 @@ def test_create_data_export_with_all_logs(chronicle_client): # Check that the request payload included export_all_logs mock_post.assert_called_once() _, kwargs = mock_post.call_args - assert "ALL_TYPES" in kwargs["json"]["log_type"] + assert kwargs["json"]["includeLogTypes"] == [] def test_cancel_data_export(chronicle_client): @@ -253,7 +318,7 @@ def test_fetch_available_log_types(chronicle_client): # Check that the request payload included page_size mock_post.assert_called_once() args, kwargs = mock_post.call_args - assert kwargs["json"]["page_size"] == 100 + assert kwargs["json"]["pageSize"] == 100 def test_fetch_available_log_types_validation(chronicle_client): @@ -281,3 +346,222 @@ def test_fetch_available_log_types_error(chronicle_client): chronicle_client.fetch_available_log_types( start_time=start_time, end_time=end_time ) + + +def test_update_data_export_success(chronicle_client): + """Test successful update of a data export.""" + # Arrange + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "name": "projects/test-project/locations/us/instances/test-customer/dataExports/export123", + "startTime": "2024-01-02T00:00:00.000Z", + "endTime": 
"2024-01-03T00:00:00.000Z", + "gcsBucket": "projects/test-project/buckets/updated-bucket", + "includeLogTypes": [ + "projects/test-project/locations/us/instances/test-customer/logTypes/WINDOWS", + "projects/test-project/locations/us/instances/test-customer/logTypes/LINUX" + ], + "dataExportStatus": {"stage": "IN_QUEUE"}, + } + + # Act + with patch.object( + chronicle_client.session, "patch", return_value=mock_response + ) as mock_patch: + start_time = datetime(2024, 1, 2, tzinfo=timezone.utc) + end_time = datetime(2024, 1, 3, tzinfo=timezone.utc) + new_log_types = [ + "projects/test-project/locations/us/instances/test-customer/logTypes/WINDOWS", + "projects/test-project/locations/us/instances/test-customer/logTypes/LINUX" + ] + + result = update_data_export( + client=chronicle_client, + data_export_id="export123", + start_time=start_time, + end_time=end_time, + gcs_bucket="projects/test-project/buckets/updated-bucket", + log_types=new_log_types + ) + + # Assert + assert result["name"].endswith("/dataExports/export123") + assert result["startTime"] == "2024-01-02T00:00:00.000Z" + assert result["endTime"] == "2024-01-03T00:00:00.000Z" + assert result["gcsBucket"] == "projects/test-project/buckets/updated-bucket" + assert len(result["includeLogTypes"]) == 2 + + # Check request payload and parameters + mock_patch.assert_called_once() + _, kwargs = mock_patch.call_args + assert "update_mask" in kwargs["params"] + assert "startTime" in kwargs["params"]["update_mask"] + assert "endTime" in kwargs["params"]["update_mask"] + assert "gcsBucket" in kwargs["params"]["update_mask"] + assert "includeLogTypes" in kwargs["params"]["update_mask"] + + +def test_update_data_export_partial_update(chronicle_client): + """Test updating only some fields of a data export.""" + # Arrange + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "name": "projects/test-project/locations/us/instances/test-customer/dataExports/export123", + "gcsBucket": 
"projects/test-project/buckets/updated-bucket", + "dataExportStatus": {"stage": "IN_QUEUE"}, + } + + # Act + with patch.object( + chronicle_client.session, "patch", return_value=mock_response + ) as mock_patch: + result = update_data_export( + client=chronicle_client, + data_export_id="export123", + gcs_bucket="projects/test-project/buckets/updated-bucket" + ) + + # Assert + assert result["name"].endswith("/dataExports/export123") + assert result["gcsBucket"] == "projects/test-project/buckets/updated-bucket" + + # Check request payload and parameters + mock_patch.assert_called_once() + _, kwargs = mock_patch.call_args + assert "update_mask" in kwargs["params"] + assert kwargs["params"]["update_mask"] == "gcsBucket" + assert "gcsBucket" in kwargs["json"] + assert "startTime" not in kwargs["json"] + assert "endTime" not in kwargs["json"] + assert "includeLogTypes" not in kwargs["json"] + + +def test_update_data_export_validation_error(chronicle_client): + """Test validation error when updating a data export with invalid GCS bucket.""" + # Arrange + with pytest.raises(ValueError, match="GCS bucket must be in format"): + update_data_export( + client=chronicle_client, + data_export_id="export123", + gcs_bucket="invalid-bucket-format" + ) + + +def test_update_data_export_no_fields_error(chronicle_client): + """Test error when no fields are provided for update.""" + # Arrange + with pytest.raises(ValueError, match="At least one field to update must be provided"): + update_data_export( + client=chronicle_client, + data_export_id="export123" + ) + + +def test_update_data_export_api_error(chronicle_client): + """Test API error when updating a data export.""" + # Arrange + mock_response = Mock() + mock_response.status_code = 400 + mock_response.text = "Invalid data export ID" + + # Act + with patch.object(chronicle_client.session, "patch", return_value=mock_response): + # Assert + with pytest.raises(APIError, match="Failed to update data export"): + update_data_export( + 
client=chronicle_client, + data_export_id="invalid-id", + gcs_bucket="projects/test-project/buckets/my-bucket" + ) + + +def test_list_data_export_success(chronicle_client): + """Test successful listing of data exports.""" + # Arrange + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "dataExports": [ + { + "name": "projects/test-project/locations/us/instances/test-customer/dataExports/export1", + "dataExportStatus": {"stage": "FINISHED_SUCCESS"}, + }, + { + "name": "projects/test-project/locations/us/instances/test-customer/dataExports/export2", + "dataExportStatus": {"stage": "IN_QUEUE"}, + } + ], + "nextPageToken": "next-page" + } + + # Act + with patch.object( + chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = list_data_export( + client=chronicle_client, + filters="status=IN_QUEUE", + page_size=10, + page_token="current-page" + ) + + # Assert + assert len(result["dataExports"]) == 2 + assert result["nextPageToken"] == "next-page" + + # Check request parameters + mock_get.assert_called_once() + _, kwargs = mock_get.call_args + assert kwargs["params"]["filter"] == "status=IN_QUEUE" + assert kwargs["params"]["pageSize"] == 10 + assert kwargs["params"]["pageToken"] == "current-page" + + +def test_list_data_export_default_params(chronicle_client): + """Test listing data exports with default parameters.""" + # Arrange + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "dataExports": [ + { + "name": "projects/test-project/locations/us/instances/test-customer/dataExports/export1", + "dataExportStatus": {"stage": "FINISHED_SUCCESS"}, + } + ] + } + + # Act + with patch.object( + chronicle_client.session, "get", return_value=mock_response + ) as mock_get: + result = list_data_export(client=chronicle_client) + + # Assert + assert len(result["dataExports"]) == 1 + + # Check default parameters + mock_get.assert_called_once() + _, kwargs = 
mock_get.call_args + assert kwargs["params"]["pageSize"] is None + assert kwargs["params"]["pageToken"] is None + assert kwargs["params"]["filter"] is None + + +def test_list_data_export_error(chronicle_client): + """Test error when listing data exports.""" + # Arrange + mock_response = Mock() + mock_response.status_code = 400 + mock_response.text = "Invalid filter" + + # Act + with patch.object(chronicle_client.session, "get", return_value=mock_response): + # Assert + with pytest.raises(APIError, match="Failed to get data export"): + list_data_export( + client=chronicle_client, + filters="invalid-filter" + ) diff --git a/tests/chronicle/test_data_export_integration.py b/tests/chronicle/test_data_export_integration.py new file mode 100644 index 0000000..a4ee977 --- /dev/null +++ b/tests/chronicle/test_data_export_integration.py @@ -0,0 +1,246 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Integration tests for Chronicle Data Export API functionality.""" + +import json +import os +import uuid +from datetime import datetime, timedelta, timezone +from typing import Dict, List, Optional, Any + +import pytest + +from secops import SecOpsClient +from secops.exceptions import APIError +from ..config import CHRONICLE_CONFIG, SERVICE_ACCOUNT_JSON + +# Get GCS bucket from environment or use default +GCS_BUCKET_NAME = os.environ.get( + "TEST_GCS_BUCKET", "gcs-exports-prober-bucket-us" +) + + +@pytest.mark.integration +def test_fetch_available_log_types(): + """Test fetching available log types for export.""" + if ( + not CHRONICLE_CONFIG["customer_id"] + or not CHRONICLE_CONFIG["project_id"] + ): + pytest.skip( + "CHRONICLE_CUSTOMER_ID and CHRONICLE_PROJECT_ID environment variables must be set" + ) + if not SERVICE_ACCOUNT_JSON: + pytest.skip( + "CHRONICLE_SERVICE_ACCOUNT environment variable must be set" + ) + + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + try: + # Set up time range for testing + end_time = datetime.now(timezone.utc) + start_time = end_time - timedelta(days=14) # Look back 14 days + + # Fetch available log types + result = chronicle.fetch_available_log_types( + start_time=start_time, + end_time=end_time, + page_size=10, # Limit to 10 for testing + ) + + # Verify the response structure + assert "available_log_types" in result + assert isinstance(result["available_log_types"], list) + + print( + f"\nFound {len(result['available_log_types'])} available log types for export" + ) + + # Show some log types if available + if result["available_log_types"]: + for log_type in result["available_log_types"][:3]: # Show first 3 + print( + f" {log_type.display_name} ({log_type.log_type.split('/')[-1]})" + ) + print( + f" Available from {log_type.start_time} to {log_type.end_time}" + ) + + except APIError as e: + # If we get API errors unrelated to configuration, fail the test 
+ pytest.fail(f"API Error during fetch_available_log_types test: {e}") + + +@pytest.mark.integration +def test_data_export_lifecycle(): + """Test the complete lifecycle of a data export.""" + if ( + not CHRONICLE_CONFIG["customer_id"] + or not CHRONICLE_CONFIG["project_id"] + ): + pytest.skip( + "CHRONICLE_CUSTOMER_ID and CHRONICLE_PROJECT_ID environment variables must be set" + ) + if not SERVICE_ACCOUNT_JSON: + pytest.skip( + "CHRONICLE_SERVICE_ACCOUNT environment variable must be set" + ) + + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + # Variables to track resources we need to clean up + export_id = None + + try: + # Set up time range for testing + end_time = datetime.now(timezone.utc) + start_time = end_time - timedelta(days=1) # Look back 1 day + bucket_path = ( + f"projects/{CHRONICLE_CONFIG['project_id']}/" + f"buckets/{GCS_BUCKET_NAME}" + ) + + # Step 1: Create a data export with all logs + print("\nStep 1: Creating data export with all logs") + export = chronicle.create_data_export( + gcs_bucket=bucket_path, + start_time=start_time, + end_time=end_time, + export_all_logs=True, # Using export_all_logs parameter as requested + ) + + # Get the export ID for subsequent operations and cleanup + export_id = export["name"].split("/")[-1] + print(f"Created export with ID: {export_id}") + + # Verify the response structure + assert "name" in export + if "dataExportStatus" in export: + assert "stage" in export["dataExportStatus"] + print(f"Status: {export['dataExportStatus']['stage']}") + else: + assert "stage" in export["data_export_status"] + print(f"Status: {export['data_export_status']['stage']}") + + # Store initial start time for later verification of update + initial_start_time = start_time + + # Step 2: Get the export details + print("\nStep 2: Getting export details") + export_details = chronicle.get_data_export(export_id) + + # Verify the response structure + assert "name" in 
export_details + assert export_details["name"].endswith(export_id) + + # Get status information + if "dataExportStatus" in export_details: + stage = export_details["dataExportStatus"]["stage"] + else: + stage = export_details["data_export_status"]["stage"] + + print(f"Export status: {stage}") + + # Step 3: List exports + print("\nStep 3: Listing recent exports") + list_result = chronicle.list_data_export(page_size=5) + + # Verify the response structure + assert "dataExports" in list_result + assert isinstance(list_result["dataExports"], list) + + # Verify our export is in the list + found_export = False + for item in list_result["dataExports"]: + if item["name"].endswith(export_id): + found_export = True + break + + if found_export: + print(f"Successfully found export {export_id} in list results") + else: + print( + f"Export {export_id} not found in list results." + "Could be in other page of list response" + ) + + # Step 4: Update the export if it's in IN_QUEUE state + if stage == "IN_QUEUE": + print("\nStep 4: Updating export (since it's in IN_QUEUE state)") + + # Update the start time to a newer time (2 hours after original start) + new_start_time = initial_start_time + timedelta(hours=2) + + print( + f"Updating export to use new start time: {new_start_time.isoformat()}" + ) + print(f"Previous start time was: {initial_start_time.isoformat()}") + + update_result = chronicle.update_data_export( + data_export_id=export_id, + start_time=new_start_time, # Update the start time instead of log types + ) + + # Verify the response structure + assert "name" in update_result + assert update_result["name"].endswith(export_id) + + # Get the updated status + if "dataExportStatus" in update_result: + updated_stage = update_result["dataExportStatus"]["stage"] + else: + updated_stage = update_result["data_export_status"]["stage"] + + print(f"Updated export status: {updated_stage}") + else: + print( + f"\nSkipping update test - export status is {stage}, not IN_QUEUE" + ) + + # 
Final Step: Cancel the export (cleanup) + print("\nFinal Step: Cancelling the export") + cancel_result = chronicle.cancel_data_export(export_id) + + # Verify the response structure + assert "name" in cancel_result + assert cancel_result["name"].endswith(export_id) + + # Get the cancelled status + if "dataExportStatus" in cancel_result: + cancelled_stage = cancel_result["dataExportStatus"]["stage"] + else: + cancelled_stage = cancel_result["data_export_status"]["stage"] + + print(f"Cancelled export status: {cancelled_stage}") + assert cancelled_stage in [ + "CANCELLING", + "CANCELLED", + ], f"Expected export to be in CANCELLING or CANCELLED state, got {cancelled_stage}" + + except APIError as e: + print(f"\nAPI Error during data_export_lifecycle test: {e}") + pytest.fail(f"API Error: {e}") + + finally: + # Cleanup: Try to cancel the export if we haven't already + if export_id and stage not in ["CANCELLING", "CANCELLED"]: + try: + print(f"\nCleaning up: Cancelling export {export_id}") + chronicle.cancel_data_export(export_id) + except APIError as e: + print(f"Error during cleanup: {e}") diff --git a/tests/chronicle/test_integration.py b/tests/chronicle/test_integration.py index 98668c2..0dcf89e 100644 --- a/tests/chronicle/test_integration.py +++ b/tests/chronicle/test_integration.py @@ -913,90 +913,6 @@ def test_chronicle_nl_search(): raise -@pytest.mark.integration -def test_chronicle_data_export(): - """Test Chronicle data export functionality with real API.""" - client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) - chronicle = client.chronicle(**CHRONICLE_CONFIG) - - # Set up time range for testing - end_time = datetime.now(timezone.utc) - start_time = end_time - timedelta(days=14) # Look back 1 day - - try: - # First, fetch available log types - log_types_result = chronicle.fetch_available_log_types( - start_time=start_time, - end_time=end_time, - page_size=10, # Limit to 10 for testing - ) - - print( - f"\nFound 
{len(log_types_result['available_log_types'])} available log types for export" - ) - - # If no log types available, skip the test - if not log_types_result["available_log_types"]: - pytest.skip("No log types available for export in the specified time range") - - # Show some of the available log types - for log_type in log_types_result["available_log_types"][:3]: # Show first 3 - print(f" {log_type.display_name} ({log_type.log_type.split('/')[-1]})") - print(f" Available from {log_type.start_time} to {log_type.end_time}") - - # For the actual export test, we'll create an export but not wait for completion - # Choose a log type that's likely to be present - if log_types_result["available_log_types"]: - selected_log_type = log_types_result["available_log_types"][ - 0 - ].log_type.split("/")[-1] - - # Create a data export (this might fail if the GCS bucket isn't properly set up) - try: - # This part would require a valid GCS bucket to work properly - # We'll make the request but catch and report errors without failing the test - bucket_name = "dk-test-export-bucket" - - export = chronicle.create_data_export( - gcs_bucket=f"projects/{CHRONICLE_CONFIG['project_id']}/buckets/{bucket_name}", - start_time=start_time, - end_time=end_time, - log_type=selected_log_type, - ) - - print(f"\nCreated data export for log type: {selected_log_type}") - print(f"Export ID: {export['name'].split('/')[-1]}") - print(f"Status: {export['data_export_status']['stage']}") - - # Test the get_data_export function - export_id = export["name"].split("/")[-1] - export_status = chronicle.get_data_export(export_id) - print( - f"Retrieved export status: {export_status['data_export_status']['stage']}" - ) - - # Cancel the export - cancelled = chronicle.cancel_data_export(export_id) - print( - f"Cancelled export status: {cancelled['data_export_status']['stage']}" - ) - - except APIError as e: - # Don't fail the test if export creation fails due to permissions - # (GCS bucket access, etc.) 
- print(f"\nData export creation failed: {str(e)}") - print( - "This is expected if GCS bucket isn't configured or permissions are missing." - ) - - except APIError as e: - print(f"\nAPI Error details: {str(e)}") # Debug print - # If we get "not found" or permission errors, skip rather than fail - if "permission" in str(e).lower() or "not found" in str(e).lower(): - pytest.skip(f"Skipping due to permission issues: {str(e)}") - raise - - @pytest.mark.integration def test_chronicle_batch_log_ingestion(): """Test batch log ingestion with real API.""" diff --git a/tests/cli/test_export_integration.py b/tests/cli/test_export_integration.py new file mode 100644 index 0000000..7413310 --- /dev/null +++ b/tests/cli/test_export_integration.py @@ -0,0 +1,327 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""Integration tests for the SecOps CLI data export commands."""

import json
import os
import subprocess
from datetime import datetime, timedelta, timezone

import pytest

from ..config import CHRONICLE_CONFIG


def _run_secops(common_args, sub_args, cli_env):
    """Run a ``secops`` CLI command and return the CompletedProcess.

    Args:
        common_args: Shared CLI arguments (auth/instance flags) from fixture.
        sub_args: Command-specific arguments appended after the common ones.
        cli_env: Environment mapping for the subprocess.

    Returns:
        subprocess.CompletedProcess with captured text stdout/stderr.
    """
    cmd = ["secops"] + common_args + sub_args
    return subprocess.run(cmd, env=cli_env, capture_output=True, text=True)


def _export_stage(data):
    """Return the export stage string from an export API response.

    The API may return the status under either a camelCase
    (``dataExportStatus``) or snake_case (``data_export_status``) key
    depending on the serialization path, so accept both.
    """
    status = data.get("dataExportStatus") or data.get("data_export_status")
    return status["stage"]


@pytest.mark.integration
def test_cli_export_list_available_types(cli_env, common_args):
    """Test the export log-types command."""
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=1)  # Look back 1 day
    start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")

    result = _run_secops(
        common_args,
        [
            "export",
            "log-types",
            "--start-time",
            start_time_str,
            "--end-time",
            end_time_str,
        ],
        cli_env,
    )

    # Check that the command executed successfully
    assert result.returncode == 0

    # Try to parse the output as JSON
    try:
        output = json.loads(result.stdout)
        assert "log_types" in output
    except json.JSONDecodeError:
        # If not valid JSON, check for expected error messages
        assert "Error:" not in result.stdout


@pytest.mark.integration
def test_cli_export_lifecycle(cli_env, common_args):
    """Test the complete export command lifecycle.

    This test covers:
    - Creating a data export
    - Listing exports
    - Getting export details
    - Updating an export (if possible)
    - Cancelling the export
    """
    # Track the resource we create so the finally-block can clean it up.
    export_id = None

    # Set up time range for testing (look back 1 day).
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=1)
    start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")

    # Get the bucket from environment or use a test one.
    bucket_name = os.environ.get(
        "TEST_GCS_BUCKET",
        "gcs-exports-prober-bucket-us",
    )
    bucket_path = (
        f"projects/{CHRONICLE_CONFIG['project_id']}/buckets/{bucket_name}"
    )

    try:
        # Step 1: Create a data export with all logs.
        print("\nStep 1: Creating data export with all logs")
        result_create = _run_secops(
            common_args,
            [
                "export",
                "create",
                "--gcs-bucket",
                bucket_path,
                "--all-logs",  # Use all logs instead of a specific log type
                "--start-time",
                start_time_str,
                "--end-time",
                end_time_str,
            ],
            cli_env,
        )
        assert (
            result_create.returncode == 0
        ), f"Create export failed: {result_create.stderr}"

        # Parse the output to get the export ID.
        try:
            create_data = json.loads(result_create.stdout)
            export_id = create_data["name"].split("/")[-1]
        except (json.JSONDecodeError, KeyError) as e:
            pytest.fail(
                f"Could not parse export ID from creation response: {str(e)}"
            )
        print(f"Created export with ID: {export_id}")
        print(f"Initial status: {_export_stage(create_data)}")

        # Step 2: List exports and verify our export is in the list.
        print("\nListing exports")
        result_list = _run_secops(
            common_args,
            ["export", "list", "--page-size", "10"],
            cli_env,
        )
        assert (
            result_list.returncode == 0
        ), f"List exports failed: {result_list.stderr}"

        list_data = json.loads(result_list.stdout)
        assert "dataExports" in list_data
        assert list_data["dataExports"] is not None

        # Our export may legitimately be on a later page, so only report.
        found_in_list = any(
            item["name"].split("/")[-1] == export_id
            for item in list_data["dataExports"]
        )
        if found_in_list:
            print(f"Successfully found export {export_id} in list results")
        else:
            print(
                f"Export {export_id} not found in list results."
                "Could be in other page of list response"
            )

        # Step 3: Get export details.
        print("\nGetting export details")
        result_get = _run_secops(
            common_args,
            ["export", "status", "--id", export_id],
            cli_env,
        )
        assert (
            result_get.returncode == 0
        ), f"Get export failed: {result_get.stderr}"

        get_data = json.loads(result_get.stdout)
        assert get_data["name"].split("/")[-1] == export_id

        current_status = _export_stage(get_data)
        print(f"Current export status: {current_status}")

        # Step 4: Try to update the export; updates are only allowed while
        # the export is still queued.
        if current_status == "IN_QUEUE":
            print("\nUpdating export (since it's in IN_QUEUE state)")

            # Move the start time forward by 2 hours from the original.
            new_start_time = start_time + timedelta(hours=2)
            new_start_time_str = new_start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
            print(
                f"Updating export to use new start time: {new_start_time_str}"
            )
            print(f"Previous start time was: {start_time_str}")

            result_update = _run_secops(
                common_args,
                [
                    "export",
                    "update",
                    "--id",
                    export_id,
                    "--start-time",
                    new_start_time_str,
                ],
                cli_env,
            )
            assert (
                result_update.returncode == 0
            ), f"Update export failed: {result_update.stderr}"

            update_data = json.loads(result_update.stdout)
            assert update_data["name"].split("/")[-1] == export_id
            print("Successfully updated export with new start time")
        else:
            print(
                f"Skipping update test - export status is {current_status},"
                " not IN_QUEUE"
            )

        # Step 5: Cancel the export.
        print("\nCancelling export")
        result_cancel = _run_secops(
            common_args,
            ["export", "cancel", "--id", export_id],
            cli_env,
        )
        assert (
            result_cancel.returncode == 0
        ), f"Cancel export failed: {result_cancel.stderr}"

        cancel_data = json.loads(result_cancel.stdout)
        assert cancel_data["name"].split("/")[-1] == export_id

        cancelled_status = _export_stage(cancel_data)
        print(f"Cancelled export status: {cancelled_status}")
        assert cancelled_status in ["CANCELLING", "CANCELLED"]

    finally:
        # Cleanup: best-effort cancel in case an assertion fired before the
        # explicit cancel step ran (cancelling twice is harmless).
        if export_id:
            try:
                print(f"\nCleaning up: Cancelling export {export_id}")
                _run_secops(
                    common_args,
                    ["export", "cancel", "--id", export_id],
                    cli_env,
                )
            except Exception as e:
                print(f"Error during cleanup: {str(e)}")