From 998b744eca6dbe6c42d940a65f6b99c4ec07baac Mon Sep 17 00:00:00 2001 From: Dan Dye Date: Thu, 20 Mar 2025 16:43:32 -0400 Subject: [PATCH 1/4] add logs:import --- ingestion/v1alpha/logs_import.py | 143 +++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 ingestion/v1alpha/logs_import.py diff --git a/ingestion/v1alpha/logs_import.py b/ingestion/v1alpha/logs_import.py new file mode 100644 index 0000000..e8d231b --- /dev/null +++ b/ingestion/v1alpha/logs_import.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""API Sample to import logs.""" +import argparse +import base64 +import datetime +import json +import logging +from typing import Mapping, Any + +from common import chronicle_auth +from common import project_instance +from common import project_id +from common import regions +from google.auth.transport import requests + +SCOPES = [ + "https://www.googleapis.com/auth/cloud-platform", +] + + +def logs_import( + http_session: requests.AuthorizedSession, + logs_file, + proj_id: str, + region: str, + project_instance: str, + forwarder_id: str) -> Mapping[str, Any]: + """Imports logs to Chronicle using the GCP CLOUDAUDIT log type. + + Args: + http_session: Authorized session for HTTP requests. + logs_file: File-like object containing the logs to import. + proj_id: Google Cloud project ID. + region: Chronicle region. + project_instance: Chronicle instance. + forwarder_id: UUID4 of the forwarder. + + Returns: + dict: JSON response from the API. + + Raises: + requests.HTTPError: If the request fails. + """ + log_type = "GCP_CLOUDAUDIT" + parent = (f"projects/{proj_id}/" + f"locations/{region}/" + f"instances/{project_instance}/" + f"logTypes/{log_type}") + url = (f"https://{region}-chronicle.googleapis.com/" + f"v1alpha/{parent}/logs:import") + logs = logs_file.read() + # Reset file pointer to beginning in case it needs to be read again + logs_file.seek(0) + logs = base64.b64encode(logs.encode("utf-8")).decode("utf-8") + now = datetime.datetime.now(datetime.timezone.utc).isoformat() + body = { + "inline_source": { + "logs": [ + { + "data": logs, + "log_entry_time": now, + "collection_time": now, + } + ], + "forwarder": (f"projects/{proj_id}/" + f"locations/{region}/" + f"instances/{project_instance}/" + f"forwarders/{forwarder_id}") + } + } + response = http_session.request("POST", url, json=body) + if response.status_code >= 400: + logging.error("Error response: %s", response.text) + response.raise_for_status() + logging.info("Request successful with status code: %d", response.status_code) + return response.json() + + +def main(): + """Main entry point for the logs import script.""" + # Configure logging + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + logger = logging.getLogger(__name__) + + parser = argparse.ArgumentParser(description="Import logs to Chronicle.") + # common + chronicle_auth.add_argument_credentials_file(parser) + project_instance.add_argument_project_instance(parser) + project_id.add_argument_project_id(parser) + regions.add_argument_region(parser) + # local + parser.add_argument( + "--forwarder_id", + type=str, + required=True, + help="UUID4 of the forwarder") + parser.add_argument( + "--logs_file", + type=argparse.FileType("r"), + required=True, + help="path to a log file (or \"-\" for STDIN)") + args = parser.parse_args() + auth_session = chronicle_auth.initialize_http_session( + args.credentials_file, + SCOPES, + ) + try: + result = logs_import( + auth_session, + args.logs_file, + args.project_id, + args.region, + args.project_instance, + args.forwarder_id + ) + logging.info("Import operation completed successfully") + print(json.dumps(result, indent=2)) + except Exception as e: # pylint: disable=broad-except + logging.error("Import operation failed: %s", str(e)) + return 1 + return 0 + + +if __name__ == "__main__": + main() From bf283498e54ebcf44cde12b210f928e543341126 Mon Sep 17 00:00:00 2001 From: Dan Dye Date: Fri, 21 Mar 2025 11:36:31 -0400 Subject: [PATCH 2/4] sort imports --- ingestion/v1alpha/logs_import.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/v1alpha/logs_import.py b/ingestion/v1alpha/logs_import.py index e8d231b..dd49867 100644 --- a/ingestion/v1alpha/logs_import.py +++ b/ingestion/v1alpha/logs_import.py @@ -23,8 +23,8 @@ from typing import Mapping, Any from common import chronicle_auth -from common import project_instance from common import project_id +from common import project_instance from common import regions from google.auth.transport import requests From aa0ded6957cf034620acb1d245a794ebe63e90ff Mon Sep 17 00:00:00 2001 From: Dan Dye Date: Fri, 21 Mar 2025 12:16:47 -0400 Subject: [PATCH 3/4] presubmit lint fixes --- ingestion/v1alpha/logs_import.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ingestion/v1alpha/logs_import.py b/ingestion/v1alpha/logs_import.py index dd49867..b0111d3 100644 --- a/ingestion/v1alpha/logs_import.py +++ b/ingestion/v1alpha/logs_import.py @@ -38,7 +38,7 @@ def logs_import( logs_file, proj_id: str, region: str, - project_instance: str, + proj_instance: str, forwarder_id: str) -> Mapping[str, Any]: """Imports logs to Chronicle using the GCP CLOUDAUDIT log type. @@ -47,7 +47,7 @@ def logs_import( logs_file: File-like object containing the logs to import. proj_id: Google Cloud project ID. region: Chronicle region. - project_instance: Chronicle instance. + proj_instance: Chronicle instance. forwarder_id: UUID4 of the forwarder. Returns: @@ -59,7 +59,7 @@ def logs_import( log_type = "GCP_CLOUDAUDIT" parent = (f"projects/{proj_id}/" f"locations/{region}/" - f"instances/{project_instance}/" + f"instances/{proj_instance}/" f"logTypes/{log_type}") url = (f"https://{region}-chronicle.googleapis.com/" f"v1alpha/{parent}/logs:import") @@ -79,7 +79,7 @@ def logs_import( ], "forwarder": (f"projects/{proj_id}/" f"locations/{region}/" - f"instances/{project_instance}/" + f"instances/{proj_instance}/" f"forwarders/{forwarder_id}") } } @@ -131,13 +131,13 @@ def main(): args.project_instance, args.forwarder_id ) - logging.info("Import operation completed successfully") + logger.info("Import operation completed successfully") print(json.dumps(result, indent=2)) except Exception as e: # pylint: disable=broad-except - logging.error("Import operation failed: %s", str(e)) + logger.error("Import operation failed: %s", str(e)) return 1 return 0 if __name__ == "__main__": - main() + main() From 689d4439595d8acac796aa09446069d701582908 Mon Sep 17 00:00:00 2001 From: Dan Dye Date: Mon, 24 Mar 2025 12:23:22 -0400 Subject: [PATCH 4/4] arg for log_type --- ingestion/v1alpha/logs_import.py | 37 ++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/ingestion/v1alpha/logs_import.py b/ingestion/v1alpha/logs_import.py index b0111d3..d347e6b 100644 --- a/ingestion/v1alpha/logs_import.py +++ b/ingestion/v1alpha/logs_import.py @@ -28,6 +28,7 @@ from common import regions from google.auth.transport import requests +CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com" SCOPES = [ "https://www.googleapis.com/auth/cloud-platform", ] @@ -35,19 +36,21 @@ def logs_import( http_session: requests.AuthorizedSession, - logs_file, proj_id: str, - region: str, proj_instance: str, + proj_region: str, + log_type: str, + logs_file: str, forwarder_id: str) -> Mapping[str, Any]: """Imports logs to Chronicle using the GCP CLOUDAUDIT log type. Args: http_session: Authorized session for HTTP requests. - logs_file: File-like object containing the logs to import. proj_id: Google Cloud project ID. - region: Chronicle region. proj_instance: Chronicle instance. + proj_region: Chronicle region. + log_type: Log type. + logs_file: File-like object containing the logs to import. forwarder_id: UUID4 of the forwarder. Returns: @@ -56,13 +59,16 @@ def logs_import( Raises: requests.HTTPError: If the request fails. """ - log_type = "GCP_CLOUDAUDIT" parent = (f"projects/{proj_id}/" - f"locations/{region}/" + f"locations/{proj_region}/" f"instances/{proj_instance}/" f"logTypes/{log_type}") - url = (f"https://{region}-chronicle.googleapis.com/" - f"v1alpha/{parent}/logs:import") + + base_url_with_region = regions.url_always_prepend_region( + CHRONICLE_API_BASE_URL, + proj_region + ) + url = (f"{base_url_with_region}/v1alpha/{parent}/logs:import") logs = logs_file.read() # Reset file pointer to beginning in case it needs to be read again logs_file.seek(0) @@ -75,10 +81,13 @@ def logs_import( "data": logs, "log_entry_time": now, "collection_time": now, + "labels": { + "forwarder_id": {"value": forwarder_id} + } } ], "forwarder": (f"projects/{proj_id}/" - f"locations/{region}/" + f"locations/{proj_region}/" f"instances/{proj_instance}/" f"forwarders/{forwarder_id}") } @@ -112,6 +121,11 @@ def main(): type=str, required=True, help="UUID4 of the forwarder") + parser.add_argument( + "--log_type", + type=str, + required=True, + help="Log type") parser.add_argument( "--logs_file", type=argparse.FileType("r"), @@ -125,10 +139,11 @@ def main(): try: result = logs_import( auth_session, - args.logs_file, args.project_id, - args.region, args.project_instance, + args.region, + args.log_type, + args.logs_file, args.forwarder_id ) logger.info("Import operation completed successfully")