Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Chronicle API CLI Configuration
CHRONICLE_CREDENTIALS_FILE=path/to/credentials.json
CHRONICLE_PROJECT_ID=your-project-id
CHRONICLE_INSTANCE=your-instance-id
CHRONICLE_REGION=your-region
45 changes: 45 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -4,3 +4,48 @@ __pycache__/
venv/

node_modules/

# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Virtual Environment
venv/
env/
ENV/

# IDE
.idea/
.vscode/
*.swp
*.swo

# Environment Variables
.env
.env.*
!.env.example

# Credentials
*credentials*.json
*creds*.json

# Logs
*.log
4 changes: 4 additions & 0 deletions .style.yapf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[style]
based_on_style = google
indent_width = 2
column_limit = 80
13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.PHONY: install dist clean

build:
python -m build

install:
python setup.py install --force

dist:
python setup.py bdist_wheel

clean:
rm -rf build/ dist/ *.egg-info/
170 changes: 160 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -30,15 +30,7 @@ samples try to use the file `.chronicle_credentials.json` in the user's home
directory. If this file is not found, you need to specify it explicitly by
adding the following argument to the sample's command-line:

```shell
-c <file_path>
```

or

```shell
--credentials_file <file_path>
```
`shell -c <file_path>` or `shell --credentials_file <file_path>`

## Usage

@@ -60,8 +52,166 @@ python3 -m lists.<sample_name> -h

### Lists API v1alpha

```
```shell
python -m lists.v1alpha.create_list -h
python -m lists.v1alpha.get_list -h
python -m lists.v1alpha.patch_list -h
```

## Installing the Chronicle REST API CLI

Install the CLI from source
```
python setup.py install
```

Alternatively, install the CLI from source using make
```
make install
```

Build the wheel file
```
make dist
```

## Using the Chronicle REST API CLI

The CLI provides a unified command-line interface for Chronicle APIs.
The CLI follows this pattern:
```
chronicle [common options] COMMAND_GROUP COMMAND [command options]
```

### Common Options

Common options can be provided either via command-line arguments or environment
variables:

| CLI Option | Environment Variable | Description |
|--------------------|----------------------------|--------------------------------|
| --credentials-file | CHRONICLE_CREDENTIALS_FILE | Path to service account file |
| --project-id | CHRONICLE_PROJECT_ID | GCP project id or number |
| --project-instance | CHRONICLE_INSTANCE | Chronicle instance ID (uuid) |
| --region | CHRONICLE_REGION | Region where project is located|

You can set these options in a `.env` file in your project root:

```bash
# .env file
CHRONICLE_CREDENTIALS_FILE=path/to/credentials.json
CHRONICLE_PROJECT_ID=your-project-id
CHRONICLE_INSTANCE=your-instance-id
CHRONICLE_REGION=your-region
```

The CLI will use values from the `.env` file or a file provided with the
`--env-file` parameter. Command-line options take precedence over environment
variables.

### Command Groups

#### Detection API
```bash
chronicle detect <command-group> <command> [options]
```

Available command groups:

- `alerts`
- `get <alert-id>`: Get alert by ID
- `update <alert-id>`: Update an alert
- `bulk-update`: Bulk update alerts matching a filter

- `detections`
- `get <detection-id>`: Get detection by ID
- `list [--filter <filter>]`: List detections

- `rules`
- `create`: Create a new rule
- `get <rule-id>`: Get rule by ID
- `delete <rule-id>`: Delete a rule
- `enable <rule-id>`: Enable a rule
- `list [--filter <filter>]`: List rules

- `retrohunts`
- `create`: Create a new retrohunt
- `get <retrohunt-id>`: Get retrohunt by ID

- `errors`
- `list [--filter <filter>]`: List errors

- `rulesets`
- `batch-update`: Batch update rule set deployments

#### Ingestion API
```bash
chronicle ingestion <command> [options]
```

Available commands:

- `import-events`: Import events into Chronicle
- `get-event <event-id>`: Get event details
- `batch-get-events`: Batch retrieve events

#### Search API
```bash
chronicle search <command> [options]
```

Available commands:

- `find-asset-events [--filter <filter>]`: Find events for an asset
- `find-raw-logs [--filter <filter>]`: Search raw logs
- `find-udm-events [--filter <filter>]`: Find UDM events

#### Lists API
```bash
chronicle lists <command> [options]
```

Available commands:

- `create <name> [--description <desc>] --lines <json-array>`: Create a new list
- `get <list-id>`: Get list by ID
- `patch <list-id> [--description <desc>]
[--lines-to-add <json-array>] \
[--lines-to-remove <json-array>]`: Update an existing list

### Examples

Using environment variables (after setting up .env):
```bash
# Get an alert
chronicle detect alerts get --alert-id ABC123 --env-file=.env

# Create a list
chronicle lists create --name "blocklist" --description "Blocked IPs" \
--lines '["1.1.1.1", "2.2.2.2"]' \
--env-file=.env

# Search for events
chronicle search find-raw-logs --filter "timestamp.seconds > 1600000000" \
--env-file=.env

# Override a specific environment variable
chronicle --region us-central1 detect alerts get --alert-id ABC123 \
--env-file=.env
```

## Running Individual Scripts

You can also run individual API sample scripts directly.
Each script supports the `-h` flag to show available options:

```bash
# Get help for a specific script
python -m detect.v1alpha.get_alert -h
python -m search.v1alpha.find_asset_events -h
python -m lists.v1alpha.patch_list -h
```

## License

Apache 2.0 - See [LICENSE](LICENSE) for more information.
13 changes: 13 additions & 0 deletions __init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
19 changes: 19 additions & 0 deletions chronicle_api.egg-info/PKG-INFO
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Metadata-Version: 2.2
Name: chronicle-api
Version: 0.1.3
Summary: CLI for Chronicle API
Author: Google LLC
Author-email: [email protected]
License: Apache 2.0
Requires-Python: >=3.10
License-File: LICENSE
Requires-Dist: click>=8.0.0
Requires-Dist: google-auth>=2.0.0
Requires-Dist: requests>=2.25.0
Requires-Dist: python-dotenv>=1.0.0
Dynamic: author
Dynamic: author-email
Dynamic: license
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary
66 changes: 66 additions & 0 deletions chronicle_api.egg-info/SOURCES.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
LICENSE
README.md
pyproject.toml
setup.py
chronicle_api.egg-info/PKG-INFO
chronicle_api.egg-info/SOURCES.txt
chronicle_api.egg-info/dependency_links.txt
chronicle_api.egg-info/entry_points.txt
chronicle_api.egg-info/requires.txt
chronicle_api.egg-info/top_level.txt
common/__init__.py
common/chronicle_auth.py
common/chronicle_auth_test.py
common/datetime_converter.py
common/datetime_converter_test.py
common/project_id.py
common/project_instance.py
common/regions.py
common/regions_test.py
detect/v1alpha/__init__.py
detect/v1alpha/batch_update_curated_rule_set_deployments.py
detect/v1alpha/bulk_update_alerts.py
detect/v1alpha/create_retrohunt.py
detect/v1alpha/create_rule.py
detect/v1alpha/delete_rule.py
detect/v1alpha/enable_rule.py
detect/v1alpha/get_alert.py
detect/v1alpha/get_detection.py
detect/v1alpha/get_retrohunt.py
detect/v1alpha/get_rule.py
detect/v1alpha/list_detections.py
detect/v1alpha/list_errors.py
detect/v1alpha/list_rules.py
detect/v1alpha/update_alert.py
detect/v1alpha/update_rule.py
ingestion/v1alpha/__init__.py
ingestion/v1alpha/create_udm_events.py
ingestion/v1alpha/event_import.py
ingestion/v1alpha/events_batch_get.py
ingestion/v1alpha/events_get.py
ingestion/v1alpha/get_udm_event.py
iocs/v1alpha/__init__.py
iocs/v1alpha/batch_get_iocs.py
iocs/v1alpha/get_ioc.py
iocs/v1alpha/get_ioc_state.py
lists/v1alpha/__init__.py
lists/v1alpha/create_list.py
lists/v1alpha/get_list.py
lists/v1alpha/patch_list.py
lists/v1alpha/patch_list_test.py
cli/__init__.py
cli/cli.py
cli/commands/__init__.py
cli/commands/common.py
cli/commands/detect.py
cli/commands/ingestion.py
cli/commands/iocs.py
cli/commands/lists.py
cli/commands/search.py
search/v1alpha/__init__.py
search/v1alpha/asset_events_find.py
search/v1alpha/client.py
search/v1alpha/raw_logs_find.py
search/v1alpha/search_queries_list.py
search/v1alpha/search_query_get.py
search/v1alpha/udm_events_find.py
1 change: 1 addition & 0 deletions chronicle_api.egg-info/dependency_links.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

2 changes: 2 additions & 0 deletions chronicle_api.egg-info/entry_points.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[console_scripts]
chronicle = cli.cli:cli
4 changes: 4 additions & 0 deletions chronicle_api.egg-info/requires.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
click>=8.0.0
google-auth>=2.0.0
requests>=2.25.0
python-dotenv>=1.0.0
7 changes: 7 additions & 0 deletions chronicle_api.egg-info/top_level.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
common
detect
ingestion
iocs
lists
cli
search
13 changes: 13 additions & 0 deletions cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
72 changes: 72 additions & 0 deletions cli/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Chronicle API Command Line Interface.
This module provides a unified CLI for interacting with Chronicle APIs.
"""

from cli.commands import detect
from cli.commands import ingestion
from cli.commands import iocs
from cli.commands import lists
from cli.commands import search
import click


def add_common_options(func):
"""Add common options to a command."""
func = click.option(
"--credentials-file",
required=True,
help="Path to service account credentials file.",
)(func)
func = click.option(
"--project-id",
required=True,
help="GCP project id or number to which the target instance belongs.",
)(func)
func = click.option(
"--project-instance",
required=True,
help="Customer ID (uuid with dashes) for the Chronicle instance.",
)(func)
func = click.option(
"--region",
required=True,
help="Region in which the target project is located.",
)(func)
return func


@click.group()
def cli():
"""Chronicle API Command Line Interface.
This CLI provides access to Chronicle's detection, ingestion, IoCs,
search, and lists APIs.
"""
pass


# Add command groups
cli.add_command(detect.detect)
cli.add_command(ingestion.ingestion)
cli.add_command(iocs.iocs)
cli.add_command(lists.lists)
cli.add_command(search.search)

if __name__ == "__main__":
cli()
13 changes: 13 additions & 0 deletions cli/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
123 changes: 123 additions & 0 deletions cli/commands/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common utilities for CLI commands."""

import functools
import os

import click
import dotenv


def get_env_value(key, default=None):
"""Gets value from environment variable with Chronicle prefix.
Args:
key: The environment variable key (without CHRONICLE_ prefix).
default: Default value if environment variable is not set.
Returns:
The value of the environment variable or the default value.
"""
return os.getenv(f"CHRONICLE_{key}", default)


def add_common_options(func):
"""Adds common CLI options to a command.
Adds standard Chronicle CLI options for region, project instance, project ID,
credentials file, and environment file. Values can be provided via command
line arguments or environment variables.
Args:
func: The function to wrap with common options.
Returns:
A decorated function that includes common Chronicle CLI options.
"""

# Add CLI options first
@click.option(
"--region",
required=False,
help="Region in which the target project is located. Can also be set "
"via CHRONICLE_REGION env var.",
)
@click.option(
"--project-instance",
required=False,
help="Customer ID (uuid with dashes) for the Chronicle instance. "
"Can also be set via CHRONICLE_INSTANCE env var.",
)
@click.option(
"--project-id",
required=False,
help="GCP project id or number. Can also be set via CHRONICLE_PROJECT_ID "
"env var.",
)
@click.option(
"--credentials-file",
required=False,
help="Path to service account credentials file. Can also be set via "
"CHRONICLE_CREDENTIALS_FILE env var.",
)
@click.option(
"--env-file",
required=False,
help="Path to .env file containing configuration variables.",
)
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Load environment variables from .env file
env_file = kwargs.pop("env_file", None)
if env_file:
dotenv.load_dotenv(env_file)
else:
# Look for .env in the current working directory
cwd_env = os.path.join(os.getcwd(), ".env")
dotenv.load_dotenv(cwd_env)

# Now validate required options
missing = []
# pylint: disable=line-too-long
if not kwargs.get("credentials_file") and not get_env_value(
"CREDENTIALS_FILE"):
# pylint: enable=line-too-long
missing.append("credentials-file (or CHRONICLE_CREDENTIALS_FILE)")
if not kwargs.get("project_id") and not get_env_value("PROJECT_ID"):
missing.append("project-id (or CHRONICLE_PROJECT_ID)")
if not kwargs.get("project_instance") and not get_env_value("INSTANCE"):
missing.append("project-instance (or CHRONICLE_INSTANCE)")
if not kwargs.get("region") and not get_env_value("REGION"):
missing.append("region (or CHRONICLE_REGION)")

if missing:
raise click.UsageError(
f"Missing required options: {', '.join(missing)}\n"
"These can be provided via command line options or environment "
"variables.")

# If options not provided via CLI, get from environment
if not kwargs.get("credentials_file"):
kwargs["credentials_file"] = get_env_value("CREDENTIALS_FILE")
if not kwargs.get("project_id"):
kwargs["project_id"] = get_env_value("PROJECT_ID")
if not kwargs.get("project_instance"):
kwargs["project_instance"] = get_env_value("INSTANCE")
if not kwargs.get("region"):
kwargs["region"] = get_env_value("REGION")

return func(*args, **kwargs)

return wrapper
491 changes: 491 additions & 0 deletions cli/commands/detect.py

Large diffs are not rendered by default.

99 changes: 99 additions & 0 deletions cli/commands/ingestion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Chronicle Ingestion API commands."""

import click
from common import chronicle_auth

from ingestion.v1alpha import event_import
from ingestion.v1alpha import events_batch_get
from ingestion.v1alpha import events_get

SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


@click.group()
def ingestion():
"""Ingestion API commands."""
pass


@ingestion.command("import-events")
@click.option(
"--json-events",
required=True,
help="Events in (serialized) JSON format.",
)
@click.pass_context
def import_events_cmd(ctx, json_events):
"""Import events into Chronicle."""
auth_session = chronicle_auth.initialize_http_session(
ctx.obj["credentials_file"],
SCOPES,
)
event_import.import_events(
auth_session,
ctx.obj["project_id"],
ctx.obj["project_instance"],
ctx.obj["region"],
json_events,
)


@ingestion.command("get-event")
@click.option(
"--event-id",
required=True,
help="The ID of the event to retrieve.",
)
@click.pass_context
def get_event_cmd(ctx, event_id):
"""Get event details by ID."""
auth_session = chronicle_auth.initialize_http_session(
ctx.obj["credentials_file"],
SCOPES,
)
events_get.get_event(
auth_session,
ctx.obj["project_id"],
ctx.obj["project_instance"],
ctx.obj["region"],
event_id,
)


@ingestion.command("batch-get-events")
@click.option(
"--event-ids",
required=True,
help="JSON string containing a list of event IDs to retrieve.",
)
@click.pass_context
def batch_get_events_cmd(ctx, event_ids):
"""Batch get events by IDs."""
auth_session = chronicle_auth.initialize_http_session(
ctx.obj["credentials_file"],
SCOPES,
)
events_batch_get.batch_get_events(
auth_session,
ctx.obj["project_id"],
ctx.obj["project_instance"],
ctx.obj["region"],
event_ids,
)
162 changes: 162 additions & 0 deletions cli/commands/iocs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Chronicle IoCs API commands."""

import json

from cli.commands.common import add_common_options
import click
from common import chronicle_auth
from iocs.v1alpha import batch_get_iocs
from iocs.v1alpha import get_ioc
from iocs.v1alpha import get_ioc_state


SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


@click.group()
def iocs():
"""IoCs API commands."""
pass


@iocs.command("batch-get")
@add_common_options
@click.option(
"--ioc-values",
required=True,
help="JSON array of IoC values to retrieve.",
)
@click.option(
"--ioc-type",
type=click.Choice([
"IOC_TYPE_UNSPECIFIED",
"DOMAIN",
"IP",
"FILE_HASH",
"URL",
"USER_EMAIL",
"MUTEX",
"FILE_HASH_MD5",
"FILE_HASH_SHA1",
"FILE_HASH_SHA256",
"IOC_TYPE_RESOURCE",
]),
required=True,
help="Type of IoCs being requested.",
)
def batch_get_iocs_cmd(credentials_file, project_id, project_instance, region,
ioc_values, ioc_type):
"""Get multiple IoCs by their values."""
auth_session = chronicle_auth.initialize_http_session(
credentials_file,
SCOPES,
)

ioc_values_list = json.loads(ioc_values)

result = batch_get_iocs.batch_get_iocs(
auth_session,
project_id,
project_instance,
region,
ioc_values_list,
ioc_type,
)
print(json.dumps(result, indent=2))


@iocs.command("get")
@add_common_options
@click.option(
"--ioc-value",
required=True,
help="Value of the IoC to retrieve.",
)
@click.option(
"--ioc-type",
type=click.Choice([
"IOC_TYPE_UNSPECIFIED",
"DOMAIN",
"IP",
"FILE_HASH",
"URL",
"USER_EMAIL",
"MUTEX",
"FILE_HASH_MD5",
"FILE_HASH_SHA1",
"FILE_HASH_SHA256",
"IOC_TYPE_RESOURCE",
]),
required=True,
help="Type of IoC being requested.",
)
def get_ioc_cmd(credentials_file, project_id, project_instance, region,
ioc_value, ioc_type):
"""Get a single IoC by its value."""
auth_session = chronicle_auth.initialize_http_session(
credentials_file,
SCOPES,
)

result = get_ioc.get_ioc(
auth_session,
project_id,
project_instance,
region,
ioc_value,
ioc_type,
)
print(json.dumps(result, indent=2))


@iocs.command("get-state")
@add_common_options
@click.option(
"--ioc-value",
required=True,
help="Value of the IoC to get state for.",
)
@click.option(
"--ioc-type",
type=click.Choice([
"IOC_TYPE_UNSPECIFIED", "DOMAIN", "IP", "FILE_HASH", "URL",
"USER_EMAIL", "MUTEX", "FILE_HASH_MD5", "FILE_HASH_SHA1",
"FILE_HASH_SHA256", "IOC_TYPE_RESOURCE"
]),
required=True,
help="Type of IoC being requested.",
)
def get_ioc_state_cmd(credentials_file, project_id, project_instance, region,
ioc_value, ioc_type):
"""Get the state of an IoC by its value."""
auth_session = chronicle_auth.initialize_http_session(
credentials_file,
SCOPES,
)

result = get_ioc_state.get_ioc_state(
auth_session,
project_id,
project_instance,
region,
ioc_value,
ioc_type,
)
print(json.dumps(result, indent=2))
139 changes: 139 additions & 0 deletions cli/commands/lists.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Chronicle Lists API commands."""

import json

from cli.commands.common import add_common_options
import click
from common import chronicle_auth
from lists.v1alpha import create_list
from lists.v1alpha import get_list
from lists.v1alpha import patch_list

SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


@click.group()
def lists():
"""Lists API commands."""
pass


@lists.command("create")
@add_common_options
@click.option(
"--name",
required=True,
help="Name of the list to create.",
)
@click.option(
"--description",
help="Description of the list.",
)
@click.option(
"--lines",
required=True,
help="JSON array of strings to add to the list.",
)
def create_list_cmd(credentials_file, project_id, project_instance, region,
name, description, lines):
"""Create a new list."""
auth_session = chronicle_auth.initialize_http_session(
credentials_file,
SCOPES,
)
result = create_list.create_list(
auth_session,
project_id,
project_instance,
region,
name,
description,
lines,
)
print(json.dumps(result, indent=2))


@lists.command("get")
@add_common_options
@click.option(
"--list-id",
required=True,
help="ID of the list to retrieve.",
)
def get_list_cmd(credentials_file, project_id, project_instance, region,
list_id):
"""Get a list by ID."""
auth_session = chronicle_auth.initialize_http_session(
credentials_file,
SCOPES,
)
result = get_list.get_list(
auth_session,
project_id,
project_instance,
region,
list_id,
)
print(json.dumps(result, indent=2))


@lists.command("patch")
@add_common_options
@click.option(
"--list-id",
required=True,
help="ID of the list to update.",
)
@click.option(
"--description",
help="New description for the list.",
)
@click.option(
"--lines-to-add",
help="JSON array of strings to add to the list.",
)
@click.option(
"--lines-to-remove",
help="JSON array of strings to remove from the list.",
)
def patch_list_cmd(credentials_file, project_id, project_instance, region,
list_id, description, lines_to_add, lines_to_remove):
"""Update an existing list."""
auth_session = chronicle_auth.initialize_http_session(
credentials_file,
SCOPES,
)
result = patch_list.patch_list(
auth_session,
project_id,
project_instance,
region,
list_id,
description,
lines_to_add,
lines_to_remove,
)
print(json.dumps(result, indent=2))






204 changes: 204 additions & 0 deletions cli/commands/search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Chronicle Search API commands."""

import click
from common import chronicle_auth

from search.v1alpha import asset_events_find
from search.v1alpha import raw_logs_find
from search.v1alpha import search_query_get
from search.v1alpha import udm_events_find

SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


@click.group()
def search():
"""Search API commands."""
pass


@search.command("find-asset-events")
@click.option(
"--asset-indicator",
required=True,
help="The asset indicator to search for.",
)
@click.option(
"--start-time",
required=True,
help="RFC3339 formatted timestamp for the start time.",
)
@click.option(
"--end-time",
required=True,
help="RFC3339 formatted timestamp for the end time.",
)
@click.option(
"--page-size",
type=int,
help="Optional maximum number of results to return.",
)
@click.option(
"--page-token",
help="Optional page token from a previous response.",
)
@click.pass_context
def find_asset_events_cmd(ctx, asset_indicator, start_time, end_time, page_size,
page_token):
"""Find asset events within a time range."""
auth_session = chronicle_auth.initialize_http_session(
ctx.obj["credentials_file"],
SCOPES,
)
asset_events_find.find_asset_events(
auth_session,
ctx.obj["project_id"],
ctx.obj["project_instance"],
ctx.obj["region"],
asset_indicator,
start_time,
end_time,
page_size,
page_token,
)


@search.command("find-raw-logs")
@click.option(
"--query",
required=True,
help="Search parameters that expand or restrict the search.",
)
@click.option(
"--batch-tokens",
multiple=True,
help="Optional list of tokens that should be downloaded.",
)
@click.option(
"--log-ids",
multiple=True,
help="Optional list of raw log ids that should be downloaded.",
)
@click.option(
"--regex-search",
is_flag=True,
help="Treat query as regex.",
)
@click.option(
"--case-sensitive",
is_flag=True,
help="Make search case-sensitive.",
)
@click.option(
"--max-response-size",
type=int,
help="Optional maximum response size in bytes.",
)
@click.pass_context
def find_raw_logs_cmd(ctx, query, batch_tokens, log_ids, regex_search,
case_sensitive, max_response_size):
"""Find raw logs based on search criteria."""
auth_session = chronicle_auth.initialize_http_session(
ctx.obj["credentials_file"],
SCOPES,
)
raw_logs_find.find_raw_logs(
auth_session,
ctx.obj["project_id"],
ctx.obj["project_instance"],
ctx.obj["region"],
query,
list(batch_tokens) if batch_tokens else None,
list(log_ids) if log_ids else None,
regex_search,
case_sensitive,
max_response_size,
)


@search.command("find-udm-events")
@click.option(
"--tokens",
multiple=True,
help=
"Optional list of tokens, with each token referring to a group of "
"UDM/Entity events.",
)
@click.option(
"--event-ids",
multiple=True,
help="Optional list of UDM/Entity event ids that should be returned.",
)
@click.option(
"--return-unenriched-data",
is_flag=True,
help="Return unenriched data.",
)
@click.option(
"--return-all-events-for-log",
is_flag=True,
help="Return all events generated from the ingested log.",
)
@click.pass_context
def find_udm_events_cmd(ctx, tokens, event_ids, return_unenriched_data,
return_all_events_for_log):
"""Find UDM events based on tokens or event IDs."""
auth_session = chronicle_auth.initialize_http_session(
ctx.obj["credentials_file"],
SCOPES,
)
udm_events_find.find_udm_events(
auth_session,
ctx.obj["project_id"],
ctx.obj["project_instance"],
ctx.obj["region"],
list(tokens) if tokens else None,
list(event_ids) if event_ids else None,
return_unenriched_data,
return_all_events_for_log,
)


@search.command("get-search-query")
@click.option(
"--user-id",
required=True,
help="ID of the user who owns the search query.",
)
@click.option(
"--query-id",
required=True,
help="ID of the search query to retrieve.",
)
@click.pass_context
def get_search_query_cmd(ctx, user_id, query_id):
"""Get a search query by ID."""
auth_session = chronicle_auth.initialize_http_session(
ctx.obj["credentials_file"],
SCOPES,
)
search_query_get.get_search_query(
auth_session,
ctx.obj["project_id"],
ctx.obj["project_instance"],
ctx.obj["region"],
user_id,
query_id,
)
2 changes: 1 addition & 1 deletion common/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2021 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
13 changes: 13 additions & 0 deletions detect/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
2 changes: 1 addition & 1 deletion detect/v1alpha/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
151 changes: 79 additions & 72 deletions detect/v1alpha/batch_update_curated_rule_set_deployments.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,155 +14,162 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Executable sample for batch updating curated rule sets deployments.
Sample Commands (run from api_samples_python dir):
# Modify the script to update the constants that point to deployments.
python3 -m detect.v1alpha.batch_update_curated_rule_set_deployments \
-r=<region> -p=<project_id> -i=<instance_id>
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for batch updating curated rule set deployments.
Usage:
python -m detect.v1alpha.batch_update_curated_rule_set_deployments \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--region=<REGION>
# The script contains example category/rule_set/precision IDs that need to be updated:
# Category A: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa
# Rule Set A: bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb
# Precision A: broad
#
# Category B: cccccccc-cccc-cccc-cccc-cccccccccccc
# Rule Set B: dddddddd-dddd-dddd-dddd-dddddddddddd
# Precision B: precise
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments/batchUpdate
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments#CuratedRuleSetDeployment
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments/batchUpdate
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.curatedRuleSetCategories.curatedRuleSets.curatedRuleSetDeployments#CuratedRuleSetDeployment
"""
# pylint: enable=line-too-long

import argparse
import json
from typing import Any, Mapping

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"

SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def batch_update_curated_rule_set_deployments(
http_session: requests.AuthorizedSession,
proj_region: str,
proj_id: str,
proj_instance: str,
proj_region: str,
) -> Mapping[str, Any]:
"""Batch update curated rule set deployments.
"""Batch updates multiple curated rule set deployments.
Args:
http_session: Authorized session for HTTP requests.
proj_region: region in which the target project is located
proj_id: GCP project id or number which the target instance belongs to
proj_instance: uuid of the instance (with dashes)
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the Chronicle instance.
proj_region: region in which the target project is located.
Returns:
an object with information about the modified deployments
Dictionary containing information about the modified deployments.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
"""
Requires the following IAM permission on the parent resource:
chronicle.curatedRuleSetDeployments.update
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
args.region
)
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable-next=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"

# We use "-" in the URL because we provide category and rule_set IDs
# in the request data.
# in the request data
# pylint: disable-next=line-too-long
url = f"{base_url_with_region}/v1alpha/{parent}/curatedRuleSetCategories/-/curatedRuleSets/-/curatedRuleSetDeployments:batchUpdate"

# Helper function for making a deployment name. Use this as the
# curated_rule_set_deployment.name field in the request data below.
def make_deployment_name(category, rule_set, precision):
def make_deployment_name(category: str, rule_set: str, precision: str) -> str:
"""Helper function to create a deployment name."""
# pylint: disable-next=line-too-long
return f"{parent}/curatedRuleSetCategories/{category}/curatedRuleSets/{rule_set}/curatedRuleSetDeployments/{precision}"

# Note that IDs are hard-coded below, as examples.
print("\nCategories, rule sets, and precisions are hard-coded as " +
"examples. Update the script to provide actual IDs.\n"
)

# Modify the category/rule_set/precision for each deployment below.
# Deployment A.
# Example deployment configurations - update these with actual IDs
# Deployment A
category_a = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
rule_set_a = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
precision_a = "broad"

# Deployment B.
# Deployment B
category_b = "cccccccc-cccc-cccc-cccc-cccccccccccc"
rule_set_b = "dddddddd-dddd-dddd-dddd-dddddddddddd"
precision_b = "precise"

# Modify the data below to change the behavior of the request.
# - Add elements to `requests` to batch update multiple deployments
# - Change the enabled and alerting fields as needed
# - Change the update_mask to modify only certain properties
print("\nNOTE: Using example category/rule_set/precision IDs.")
print("Please update the script with actual IDs before use.\n")

json_data = {
"parent": f"{parent}/curatedRuleSetCategories/-/curatedRuleSets/-",
"parent":
f"{parent}/curatedRuleSetCategories/-/curatedRuleSets/-",
"requests": [
{
"curated_rule_set_deployment": {
"name": make_deployment_name(
category_a,
rule_set_a,
precision_a,
),
"enabled": True,
"alerting": False,
"name":
make_deployment_name(
category_a,
rule_set_a,
precision_a,
),
"enabled":
True,
"alerting":
False,
},
"update_mask": {
"paths": ["alerting", "enabled"],
},
},
{
"curated_rule_set_deployment": {
"name": make_deployment_name(
category_b,
rule_set_b,
precision_b,
),
"enabled": True,
"alerting": True,
"name":
make_deployment_name(
category_b,
rule_set_b,
precision_b,
),
"enabled":
True,
"alerting":
True,
},
"update_mask": {
"paths": ["alerting", "enabled"],
},
},
],
}
}

# See API reference links at top of this file, for response format.
response = http_session.request("POST", url, json=json_data)

if response.status_code >= 400:
print(response.text)
response.raise_for_status()

return response.json()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
regions.add_argument_region(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
project_instance.add_argument_project_instance(parser)
regions.add_argument_region(parser)

args = parser.parse_args()
auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES
)
session = chronicle_auth.initialize_http_session(args.credentials_file)
print(
json.dumps(
batch_update_curated_rule_set_deployments(
auth_session,
args.region,
args.project_id,
args.project_instance,
),
indent=2,
)
)

auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)
result = batch_update_curated_rule_set_deployments(auth_session,
args.project_id,
args.project_instance,
args.region)
print(json.dumps(result, indent=2))
2 changes: 1 addition & 1 deletion detect/v1alpha/bulk_update_alerts.py
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@
#
r"""Executable and reusable sample for bulk updating alerts.
The file provided to the --alert_ids_file parameter should have one alert
The file provided to the --alert_ids_file parameter should have one alert
ID per line like so:
```
de_ad9d2771-a567-49ee-6452-1b2db13c1d33
114 changes: 53 additions & 61 deletions detect/v1alpha/create_retrohunt.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,21 +14,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Executable sample for creating a retrohunt.
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for creating a retrohunt.
Sample Commands (run from api_samples_python dir):
python3 -m detect.v1alpha.create_retrohunt \
-r=<region> -p=<project_id> -i=<instance_id> -rid=<rule_id> \
-st="2023-10-02T18:00:00Z" -et="2023-10-02T20:00:00Z"
Usage:
python -m detect.v1alpha.create_retrohunt \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--region=<REGION> \
--rule_id=ru_<UUID> \
--start_time=2023-10-02T18:00:00Z \
--end_time=2023-10-02T20:00:00Z
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules.retrohunts/create
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.operations#Operation
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules.retrohunts/create
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.operations#Operation
"""
# pylint: enable=line-too-long

import argparse
import datetime
import json
from typing import Any, Mapping

from common import chronicle_auth
from common import datetime_converter
from common import project_id
@@ -37,106 +45,90 @@
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"

SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def create_retrohunt(
http_session: requests.AuthorizedSession,
proj_region: str,
proj_id: str,
proj_instance: str,
proj_region: str,
rule_id: str,
start_time: datetime.datetime,
end_time: datetime.datetime,
) -> Mapping[str, Any]:
"""Creates a retrohunt.
"""Creates a retrohunt to run a detection rule over historical data.
Args:
http_session: Authorized session for HTTP requests.
proj_region: region in which the target project is located
proj_id: GCP project id or number which the target instance belongs to
proj_instance: uuid of the instance (with dashes)
rule_id: Unique ID of the detection rule to retrieve ("ru_<UUID>").
start_time: the start time of the event time range this retrohunt will be
executed over
end_time: the end time of the event time range this retrohunt will be
executed over
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the Chronicle instance.
proj_region: region in which the target project is located.
rule_id: Unique ID of the detection rule to run (in the format "ru_<UUID>").
start_time: Start time of the event time range for the retrohunt.
end_time: End time of the event time range for the retrohunt.
Returns:
an Operation resource object containing relevant retrohunt's information
Dictionary containing the Operation resource for the retrohunt.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.retrohunts.create
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
args.region
)
# pylint: disable-next=line-too-long
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/rules/{rule_id}/retrohunts"
# pylint: enable=line-too-long

body = {
"process_interval": {
"start_time": datetime_converter.strftime(start_time),
"end_time": datetime_converter.strftime(end_time),
},
}

# See API reference links at top of this file, for response format.
response = http_session.request("POST", url, json=body)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

return response.json()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
regions.add_argument_region(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
# local
parser.add_argument(
"-rid",
"--rule_id",
type=str,
required=True,
help='rule ID to create retrohunt for. In the form of "ru_<UUID>"',
)
parser.add_argument(
"-st",
"--start_time",
type=datetime_converter.iso8601_datetime_utc,
required=True,
help="Retrohunt start time in UTC ('yyyy-mm-ddThh:mm:ssZ')",
)
parser.add_argument(
"-et",
"--end_time",
type=datetime_converter.iso8601_datetime_utc,
required=True,
help="Retrohunt end time in UTC ('yyyy-mm-ddThh:mm:ssZ')",
)
help='ID of rule to create retrohunt for (format: "ru_<UUID>")')
parser.add_argument("--start_time",
type=datetime_converter.iso8601_datetime_utc,
required=True,
help="Start time in UTC (format: yyyy-mm-ddThh:mm:ssZ)")
parser.add_argument("--end_time",
type=datetime_converter.iso8601_datetime_utc,
required=True,
help="End time in UTC (format: yyyy-mm-ddThh:mm:ssZ)")

args = parser.parse_args()
auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES
)
print(
json.dumps(
create_retrohunt(
auth_session,
args.region,
args.project_id,
args.project_instance,
args.rule_id,
args.start_time,
args.end_time,
),
indent=2,
)
)

auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)
result = create_retrohunt(auth_session, args.project_id,
args.project_instance, args.region, args.rule_id,
args.start_time, args.end_time)
print(json.dumps(result, indent=2))
69 changes: 32 additions & 37 deletions detect/v1alpha/create_rule.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,27 +14,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Executable and reusable sample for creating a detection rule.
Sample Commands (run from api_samples_python dir):
# From file
python3 -m detect.v1alpha.create_rule \
--region $region \
--project_instance $project_instance \
--project_id $PROJECT_ID \
--rule_file=./path/to/rule/rulename.yaral
# From stdin
cat ./path/rulename.yaral | python3 -m detect.v1alpha.create_rule \
--region $region \
--project_instance $project_instance \
--project_id $PROJECT_ID \
--rule_file -
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for creating a detection rule.
Usage:
python -m detect.v1alpha.create_rule \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--region=<REGION> \
--rule_file=./path/to/rule/rulename.yaral
# Or from stdin:
cat ./path/rulename.yaral | python -m detect.v1alpha.create_rule \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--region=<REGION> \
--rule_file=-
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules/create
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules#Rule
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules/create
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules#Rule
"""
# pylint: enable=line-too-long

import argparse
import json
@@ -47,7 +48,6 @@
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"

SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]
@@ -70,16 +70,17 @@ def create_rule(
rule_file_path: Content of the new detection rule, used to evaluate logs.
Returns:
New detection rule.
Dictionary containing the newly created detection rule.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.rules.create
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
args.region
)
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable-next=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/rules"
@@ -88,11 +89,11 @@ def create_rule(
"text": rule_file_path.read(),
}

# See API reference links at top of this file, for response format.
response = http_session.request("POST", url, json=body)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

return response.json()


@@ -105,21 +106,15 @@ def create_rule(
regions.add_argument_region(parser)
# local
parser.add_argument(
"-f",
"--rule_file",
type=argparse.FileType("r"),
required=True,
help="path of a file with the desired rule's content, or - for STDIN",
)
help="Path to file containing the rule content, or - for STDIN")

args = parser.parse_args()

auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES
)
new_rule = create_rule(auth_session,
args.project_id,
args.project_instance,
args.region,
args.rule_file)
auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)
new_rule = create_rule(auth_session, args.project_id, args.project_instance,
args.region, args.rule_file)
print(json.dumps(new_rule, indent=2))
15 changes: 5 additions & 10 deletions detect/v1alpha/delete_rule.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -66,9 +66,7 @@ def delete_rule(
(response.status_code >= 400).
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
args.region
)
CHRONICLE_API_BASE_URL, args.region)
# pylint: disable-next=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/rules/{rule_id}"
@@ -95,10 +93,8 @@ def delete_rule(
help='ID of rule to be deleted. In the form of "ru_<UUID>"',
)
args = parser.parse_args()
auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES
)
auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)
print(
json.dumps(
delete_rule(
@@ -109,5 +105,4 @@ def delete_rule(
args.rule_id,
),
indent=2,
)
)
))
88 changes: 42 additions & 46 deletions detect/v1alpha/enable_rule.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,105 +14,101 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Executable sample for enabling a rule.
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for enabling a detection rule.
Sample Commands (run from api_samples_python dir):
python3 detect.v1alpha.enable_rule -r=<region> \
-p=<project_id> -i=<instance_id> \
-rid=<rule_id>
Usage:
python -m detect.v1alpha.enable_rule \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--region=<REGION> \
--rule_id=ru_<UUID>
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules/updateDeployment
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/RuleDeployment
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules/updateDeployment
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/RuleDeployment
"""
# pylint: enable=line-too-long

import argparse
import json
from typing import Any, Mapping

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"

SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def enable_rule(
http_session: requests.AuthorizedSession,
proj_region: str,
proj_id: str,
proj_instance: str,
proj_region: str,
rule_id: str,
) -> Mapping[str, Any]:
"""Enables a rule.
"""Enables a detection rule.
Args:
http_session: Authorized session for HTTP requests.
proj_region: region in which the target project is located
proj_id: GCP project id or number which the target instance belongs to
proj_instance: uuid of the instance whose rules are being
created (with dashes)
rule_id: Unique ID of the detection rule to retrieve ("ru_<UUID>").
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the Chronicle instance.
proj_region: region in which the target project is located.
rule_id: Unique ID of the detection rule to enable
(in the format "ru_<UUID>").
Returns:
a rule deployment object containing relevant rule's deployment information
Dictionary containing the rule's deployment information.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.rules.updateDeployment
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
args.region
)
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable-next=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/rules/{rule_id}/deployment"

body = {
# You can set enabled to False to disable a rule.
"enabled": True,
"enabled": True, # Set to False to disable the rule
}
params = {"update_mask": "enabled"}

# See API reference links at top of this file, for response format.
response = http_session.request("PATCH", url, params=params, json=body)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

return response.json()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
parser.add_argument(
"-rid",
"--rule_id",
type=str,
required=True,
help='ID of rule to be enabled. In the form of "ru_<UUID>"',
)
# local
parser.add_argument("--rule_id",
type=str,
required=True,
help='ID of rule to enable (format: "ru_<UUID>")')

args = parser.parse_args()
auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES
)
print(
json.dumps(
enable_rule(
auth_session,
args.region,
args.project_id,
args.project_instance,
args.rule_id,
),
indent=2,
)
)

auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)
result = enable_rule(auth_session, args.project_id, args.project_instance,
args.region, args.rule_id)
print(json.dumps(result, indent=2))
59 changes: 30 additions & 29 deletions detect/v1alpha/get_alert.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,18 +14,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Executable and reusable sample for getting a Reference List.
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for getting an Alert.
Usage:
python -m alerts.v1alpha.get_alert \
--project_id=<PROJECT_ID> \
python -m detect.v1alpha.get_alert \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--region=<REGION> \
--alert_id=<ALERT_ID>
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.legacy/legacyGetAlert
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.legacy/legacyGetAlert
"""
# pylint: enable=line-too-long

import argparse
import json
@@ -35,7 +37,6 @@
from common import project_id
from common import project_instance
from common import regions

from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
@@ -52,7 +53,7 @@ def get_alert(
alert_id: str,
include_detections: bool = False,
) -> Mapping[str, Any]:
"""Gets an Alert.
"""Gets an Alert using the Legacy Get Alert API.
Args:
http_session: Authorized session for HTTP requests.
@@ -63,54 +64,54 @@ def get_alert(
include_detections: Flag to include detections.
Returns:
Dictionary representation of the Alert
Dictionary representation of the Alert.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.alerts.get
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
proj_region
)
# pylint: disable-next=line-too-long
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/legacy:legacyGetAlert"
# pylint: disable=line-too-long

query_params = {"alertId": alert_id}
if include_detections:
query_params["includeDetections"] = True

url = f"{base_url_with_region}/v1alpha/{parent}/legacy:legacyGetAlert"

response = http_session.request("GET", url, params=query_params)
# Expected server response is described in:
# https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.legacy/legacyGetAlert
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

return response.json()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
parser.add_argument(
"--alert_id", type=str, required=True,
help="identifier for the alert"
)
parser.add_argument(
"-d", "--include-detections", type=bool, default=False, required=False,
help="flag to include detections"
)
# local
parser.add_argument("--alert_id",
type=str,
required=True,
help="Identifier for the alert")
parser.add_argument("--include-detections",
action="store_true",
help="Include detections in the response")

args = parser.parse_args()

auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES,
)
auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)
alert = get_alert(
auth_session,
args.project_id,
123 changes: 123 additions & 0 deletions detect/v1alpha/get_detection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for getting a Detection.
Usage:
python -m detect.v1alpha.get_detection \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--region=<REGION> \
--detection_id=<DETECTION_ID> \
--rule_id=<RULE_ID>
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.legacy/legacyGetDetection
"""
# pylint: enable=line-too-long

import argparse
import json
from typing import Any, Mapping

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def get_detection(
http_session: requests.AuthorizedSession,
proj_id: str,
proj_instance: str,
proj_region: str,
detection_id: str,
rule_id: str,
) -> Mapping[str, Any]:
"""Gets a Detection using the Legacy Get Detection API.
Args:
http_session: Authorized session for HTTP requests.
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the Chronicle instance.
proj_region: region in which the target project is located.
detection_id: Identifier for the detection.
rule_id: Identifier for the rule that created the detection.
Returns:
Dictionary representation of the Detection.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.detections.get
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/legacy:legacyGetDetection"
# pylint: enable=line-too-long

query_params = {"detectionId": detection_id, "ruleId": rule_id}

response = http_session.request("GET", url, params=query_params)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

return response.json()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
# local
parser.add_argument("--detection_id",
type=str,
required=True,
help="Identifier for the detection")
parser.add_argument("--rule_id",
type=str,
required=True,
help="Identifier for the rule that created the detection")

args = parser.parse_args()

auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)
detection = get_detection(
auth_session,
args.project_id,
args.project_instance,
args.region,
args.detection_id,
args.rule_id,
)
print(json.dumps(detection, indent=2))
115 changes: 56 additions & 59 deletions detect/v1alpha/get_retrohunt.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,118 +14,115 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Executable sample for getting a retrohunt.
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for getting a retrohunt.
Sample Commands (run from api_samples_python dir):
python3 -m detect.v1alpha.get_retrohunt -r=<region> \
-p=<project_id> -i=<instance_id> \
-rid=<rule_id> -oid=<op_id>
Usage:
python -m detect.v1alpha.get_retrohunt \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--region=<REGION> \
--rule_id=ru_<UUID> \
--op_id=<OPERATION_ID>
python3 -m detect.v1alpha.get_retrohunt -r=<region> \
-p=<project_id> -i=<instance_id> \
-rid=<rule_id>@v_<seconds>_<nanoseconds> -oid=<op_id>
# You can also specify a specific rule version:
python -m detect.v1alpha.get_retrohunt \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--region=<REGION> \
--rule_id=ru_<UUID>@v_<seconds>_<nanoseconds> \
--op_id=<OPERATION_ID>
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules.retrohunts/get
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules.retrohunts#Retrohunt
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules.retrohunts/get
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules.retrohunts#Retrohunt
"""
# pylint: enable=line-too-long

import argparse
import json
from typing import Any, Mapping

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"

SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def get_retrohunt(
http_session: requests.AuthorizedSession,
proj_region: str,
proj_id: str,
proj_instance: str,
proj_region: str,
rule_id: str,
op_id: str,
) -> Mapping[str, Any]:
"""Get a retrohunt for a given rule.
"""Gets information about a retrohunt for a specific detection rule.
Args:
http_session: Authorized session for HTTP requests.
proj_region: region in which the target project is located
proj_id: GCP project id or number which the target instance belongs to
proj_instance: uuid of the instance (with dashes)
rule_id: Unique ID of the detection rule to retrieve ("ru_<UUID>" or
"ru_<UUID>@v_<seconds>_<nanoseconds>"). If a version suffix isn't
specified we use the rule's latest version.
op_id: the operation ID of the retrohunt
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the Chronicle instance.
proj_region: region in which the target project is located.
rule_id: Unique ID of the detection rule. Can be either "ru_<UUID>" or
"ru_<UUID>@v_<seconds>_<nanoseconds>". If version suffix isn't specified,
uses the rule's latest version.
op_id: Operation ID of the retrohunt to retrieve.
Returns:
a retrohunt object containing relevant retrohunt's information
Dictionary containing information about the retrohunt.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.retrohunts.get
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
args.region
)
# pylint: disable-next=line-too-long
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/rules/{rule_id}/retrohunts/{op_id}"
# pylint: enable=line-too-long

# See API reference links at top of this file, for response format.
response = http_session.request("GET", url)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

return response.json()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
regions.add_argument_region(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
project_instance.add_argument_project_instance(parser)
regions.add_argument_region(parser)
# local
parser.add_argument(
"-rid",
"--rule_id",
type=str,
required=True,
help=(
'rule ID to get retrohunt for. can use both "ru_<UUID>" or'
' "ru_<UUID>@v_<seconds>_<nanoseconds>"'
),
)
parser.add_argument(
"-oid",
"--op_id",
type=str,
required=True,
help="operation ID for the retrohunt",
)
help=('ID of rule to get retrohunt for. Format: "ru_<UUID>" or '
'"ru_<UUID>@v_<seconds>_<nanoseconds>"'))
parser.add_argument("--op_id",
type=str,
required=True,
help="Operation ID of the retrohunt")

args = parser.parse_args()
auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES
)
print(
json.dumps(
get_retrohunt(
auth_session,
args.region,
args.project_id,
args.project_instance,
args.rule_id,
args.op_id,
),
indent=2,
)
)

auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)
result = get_retrohunt(auth_session, args.project_id, args.project_instance,
args.region, args.rule_id, args.op_id)
print(json.dumps(result, indent=2))
87 changes: 39 additions & 48 deletions detect/v1alpha/get_rule.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,107 +14,98 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Executable sample for getting a rule.
Sample Commands (run from api_samples_python dir):
python3 -m detect.v1alpha.get_rule -r=<region> -p=<project_id> \
-i=<instance_id> -rid=<rule_id>
python3 -m detect.v1alpha.get_rule -r=<region> -p=<project_id> \
-i=<instance_id> -rid=<rule_id>@v_<seconds>_<nanoseconds>
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for getting a rule.
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules/get
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules#Rule
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules/get
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules#Rule
"""
# pylint: enable=line-too-long

import argparse
import json

from typing import Any, Mapping

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"

SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def get_rule(
http_session: requests.AuthorizedSession,
proj_region: str,
proj_id: str,
proj_instance: str,
proj_region: str,
rule_id: str,
) -> Mapping[str, Any]:
"""Get a rule.
"""Gets a rule using the Get Rule API.
Args:
http_session: Authorized session for HTTP requests.
proj_region: region in which the target project is located
proj_id: GCP project id or number which the target instance belongs to
proj_instance: uuid of the instance (with dashes)
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid w/ dashes) for the Chronicle instance.
proj_region: Region where the target project is located.
rule_id: Unique ID of the detection rule to retrieve ("ru_<UUID>" or
"ru_<UUID>@v_<seconds>_<nanoseconds>"). If a version suffix isn't
specified we use the rule's latest version.
"ru_<UUID>@v_<seconds>_<nanoseconds>"). If a version suffix isn't
specified we use the rule's latest version.
Returns:
a rule object containing relevant rule's information
Dictionary containing the rule's information.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.rules.get
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
args.region
)
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable-next=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/rules/{rule_id}"

# See API reference links at top of this file, for response format.
response = http_session.request("GET", url)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

return response.json()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
parser.add_argument(
"-rid",
"--rule_id",
type=str,
required=True,
help=(
'rule ID to get rule for. can use both "ru_<UUID>" or'
' "ru_<UUID>@v_<seconds>_<nanoseconds>"'
),
)
# local
parser.add_argument("--rule_id",
type=str,
required=True,
help='Rule ID to retrieve ("ru_<UUID>" '
'or "ru_<UUID>@v_<seconds>_<nanoseconds>")')

args = parser.parse_args()

auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES
SCOPES,
)
print(
json.dumps(
get_rule(
auth_session,
args.region,
args.project_id,
args.project_instance,
args.rule_id
),
indent=2,
)
rule = get_rule(
auth_session,
args.project_id,
args.project_instance,
args.region,
args.rule_id,
)
print(json.dumps(rule, indent=2))
42 changes: 17 additions & 25 deletions detect/v1alpha/list_detections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -44,6 +44,7 @@
import argparse
import json
from typing import Any, Mapping

from common import chronicle_auth
from common import project_id
from common import project_instance
@@ -63,16 +64,14 @@
)


def list_detections(
http_session: requests.AuthorizedSession,
proj_region: str,
proj_id: str,
proj_instance: str,
rule_id: str,
alert_state: str | None = None,
page_size: int | None = None,
page_token: str | None = None
) -> Mapping[str, Any]:
def list_detections(http_session: requests.AuthorizedSession,
proj_region: str,
proj_id: str,
proj_instance: str,
rule_id: str,
alert_state: str | None = None,
page_size: int | None = None,
page_token: str | None = None) -> Mapping[str, Any]:
"""List detections for a rule.
Args:
@@ -95,9 +94,7 @@ def list_detections(
(response.status_code >= 400).
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
args.region
)
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable-next=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/legacy:legacySearchDetections"
@@ -130,11 +127,9 @@ def list_detections(
"--rule_id",
type=str,
required=True,
help=(
"rule id to list detections for. Options are (1) rule_id (2)"
" rule_id@v_<seconds>_<nanoseconds> (3) rule_id@- which matches on"
" all versions."
),
help=("rule id to list detections for. Options are (1) rule_id (2)"
" rule_id@v_<seconds>_<nanoseconds> (3) rule_id@- which matches on"
" all versions."),
)
parser.add_argument(
"--alert_state",
@@ -155,10 +150,8 @@ def list_detections(
default=None,
)
args = parser.parse_args()
auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES
)
auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)
print(
json.dumps(
list_detections(
@@ -172,5 +165,4 @@ def list_detections(
args.page_token,
),
indent=2,
)
)
))
24 changes: 9 additions & 15 deletions detect/v1alpha/list_errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
import argparse
import json
from typing import Any, Mapping

from common import chronicle_auth
from common import project_id
from common import project_instance
@@ -74,9 +75,7 @@ def list_errors(
(response.status_code >= 400).
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
args.region
)
CHRONICLE_API_BASE_URL, args.region)
# pylint: disable-next=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/ruleExecutionErrors"
@@ -107,17 +106,13 @@ def list_errors(
"--rule_id",
type=str,
required=True,
help=(
"rule id to list errors for. Options are (1) rule_id (2)"
" rule_id@v_<seconds>_<nanoseconds> (3) rule_id@- which matches on"
" all versions."
),
help=("rule id to list errors for. Options are (1) rule_id (2)"
" rule_id@v_<seconds>_<nanoseconds> (3) rule_id@- which matches on"
" all versions."),
)
args = parser.parse_args()
auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES
)
auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)
print(
json.dumps(
list_errors(
@@ -128,5 +123,4 @@ def list_errors(
args.rule_id,
),
indent=2,
)
)
))
51 changes: 27 additions & 24 deletions detect/v1alpha/list_rules.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,16 +14,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Executable and reusable sample for retrieving a list of rules.
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for retrieving a list of rules.
Sample Commands (run from api_samples_python dir):
python3 -m detect.v1alpha.list_rules -r=<region> \
-p=<project_id> -i=<instance_id>
Usage:
python -m detect.v1alpha.list_rules \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--region=<REGION>
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules/list
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules#Rule
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules/list
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules#Rule
"""
# pylint: enable=line-too-long

import argparse
import json
@@ -36,7 +40,6 @@
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"

SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]
@@ -47,51 +50,51 @@ def list_rules(
proj_id: str,
proj_instance: str,
proj_region: str,
) -> Mapping[str, Any]:
) -> Mapping[str, Any]:
"""Gets a list of rules.
Args:
http_session: Authorized session for HTTP requests.
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the Chronicle instance.
proj_region: region in which the target project is located.
Returns:
Array containing information about rules.
Dictionary containing an array of rules.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.rules.list
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
args.region
)
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable-next=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/rules"

# See API reference links at top of this file, for response format.
response = http_session.request("GET", url)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

return response.json()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)

args = parser.parse_args()
session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES
)
rules = list_rules(
session,
args.project_id,
args.project_instance,
args.region
)

auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)
rules = list_rules(auth_session, args.project_id, args.project_instance,
args.region)
print(json.dumps(rules, indent=2))
48 changes: 21 additions & 27 deletions detect/v1alpha/update_alert.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -55,7 +55,6 @@
from common import project_id
from common import project_instance
from common import regions

from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
@@ -183,29 +182,26 @@ def get_update_parser():
return parser


def check_args(
parser: argparse.ArgumentParser,
args_to_check: argparse.Namespace):
def check_args(parser: argparse.ArgumentParser,
args_to_check: argparse.Namespace):
"""Checks if at least one of the required arguments is provided.
Args:
parser: instance of argparse.ArgumentParser (to raise error if needed).
args_to_check: instance of argparse.Namespace with the arguments to check.
"""
if not any(
[
args_to_check.comment or args_to_check.comment == "", # pylint: disable=g-explicit-bool-comparison
args_to_check.disregarded,
args_to_check.priority,
args_to_check.reason,
args_to_check.reputation,
args_to_check.risk_score or args_to_check.risk_score == 0,
args_to_check.root_cause or args_to_check.root_cause == "", # pylint: disable=g-explicit-bool-comparison
args_to_check.severity or args_to_check.severity == 0,
args_to_check.status,
args_to_check.verdict,
]
):
if not any([
args_to_check.comment or args_to_check.comment == "", # pylint: disable=g-explicit-bool-comparison
args_to_check.disregarded,
args_to_check.priority,
args_to_check.reason,
args_to_check.reputation,
args_to_check.risk_score or args_to_check.risk_score == 0,
args_to_check.root_cause or args_to_check.root_cause == "", # pylint: disable=g-explicit-bool-comparison
args_to_check.severity or args_to_check.severity == 0,
args_to_check.status,
args_to_check.verdict,
]):
parser.error("At least one of the arguments "
"--comment, "
"--disregarded, "
@@ -237,7 +233,7 @@ def update_alert(
severity: int | None = None,
comment: str | Literal[""] | None = None,
root_cause: str | Literal[""] | None = None,
) -> Mapping[str, Any]:
) -> Mapping[str, Any]:
"""Updates an Alert.
Args:
@@ -266,9 +262,7 @@ def update_alert(
(response.status_code >= 400).
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
proj_region
)
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable-next=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/legacy:legacyUpdateAlert/"
@@ -314,10 +308,10 @@ def update_alert(

if __name__ == "__main__":
main_parser = get_update_parser()
main_parser.add_argument(
"--alert_id", type=str, required=True,
help="identifier for the alert"
)
main_parser.add_argument("--alert_id",
type=str,
required=True,
help="identifier for the alert")
args = main_parser.parse_args()

# Check if at least one of the specific arguments is provided
18 changes: 8 additions & 10 deletions detect/v1alpha/update_rule.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=line-too-long
r"""Executable sample for updating a rule.
Sample Commands (run from api_samples_python dir):
@@ -32,9 +33,11 @@
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules/patch
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules#Rule
"""
# pylint: enable=line-too-long
import argparse
import json
from typing import Any, Mapping

from common import chronicle_auth
from common import project_id
from common import project_instance
@@ -74,9 +77,7 @@ def update_rule(
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
args.region
)
CHRONICLE_API_BASE_URL, args.region)
# pylint: disable-next=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/rules/{rule_id}"
@@ -115,10 +116,8 @@ def update_rule(
help="path of a file with the desired rule's content, or - for STDIN",
)
args = parser.parse_args()
auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES
)
auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)
print(
json.dumps(
update_rule(
@@ -130,5 +129,4 @@ def update_rule(
args.rule_file,
),
indent=2,
)
)
))
14 changes: 14 additions & 0 deletions ingestion/v1alpha/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
26 changes: 13 additions & 13 deletions ingestion/v1alpha/create_udm_events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -32,22 +32,20 @@
import argparse
import json

from google.auth.transport import requests

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def create_udm_events(
http_session: requests.AuthorizedSession, json_events: str
) -> None:
def create_udm_events(http_session: requests.AuthorizedSession,
json_events: str) -> None:
"""Sends a collection of UDM events to the Google SecOps backend for ingestion.
A Unified Data Model (UDM) event is a structured representation of an event
@@ -70,13 +68,17 @@ def create_udm_events(
"""

base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
args.region
)
CHRONICLE_API_BASE_URL, args.region)
# pylint: disable-next=line-too-long
parent = f"projects/{args.project_id}/locations/{args.region}/instances/{args.project_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/events:import"
body = {"inline_source": {"events": [{"udm": json.loads(json_events)[0],}]}}
body = {
"inline_source": {
"events": [{
"udm": json.loads(json_events)[0],
}]
}
}

response = http_session.request("POST", url, json=body)
print(response)
@@ -98,9 +100,7 @@ def create_udm_events(
"--json_events_file",
type=argparse.FileType("r"),
required=True,
help=(
"path to a file containing a list of UDM events in json format"
),
help=("path to a file containing a list of UDM events in json format"),
)
args = parser.parse_args()

104 changes: 104 additions & 0 deletions ingestion/v1alpha/event_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for importing events into Chronicle.
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.events/import
"""
# pylint: enable=line-too-long

import argparse
import json

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def import_events(http_session: requests.AuthorizedSession, proj_id: str,
proj_instance: str, proj_region: str,
json_events: str) -> None:
"""Import events into Chronicle using the Events Import API.
Args:
http_session: Authorized session for HTTP requests.
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid w/ dashes) for the Chronicle instance.
proj_region: region in which the target project is located.
json_events: Events in (serialized) JSON format.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.events.import
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/events:import"
# pylint: enable=line-too-long

body = {
"events": json.loads(json_events),
}

response = http_session.request("POST", url, json=body)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

result = response.json()
if "successCount" in result:
print(f"Successfully imported {result['successCount']} events")
if "failureCount" in result:
print(f"Failed to import {result['failureCount']} events")


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
# local
parser.add_argument(
"--json_events_file",
type=argparse.FileType("r"),
required=True,
help="path to a file (or \"-\" for STDIN) containing events in JSON "
"format"
)

args = parser.parse_args()
auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES,
)
import_events(auth_session, args.project_id, args.project_instance,
args.region, args.json_events_file.read())
108 changes: 108 additions & 0 deletions ingestion/v1alpha/events_batch_get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for batch getting events from Chronicle.
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.events/batchGet
"""
# pylint: enable=line-too-long

import argparse
import base64
import json

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def batch_get_events(
http_session: requests.AuthorizedSession,
proj_id: str,
proj_instance: str,
proj_region: str,
event_ids: str
) -> None:
"""Batch get events from Chronicle using the Events BatchGet API.
Args:
http_session: Authorized session for HTTP requests.
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid w/ dashes) for the Chronicle instance.
proj_region: region in which the target project is located.
event_ids: JSON string containing a list of event IDs to retrieve.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.events.batchGet
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/events:batchGet"
# pylint: enable=line-too-long

# Convert event IDs to URL-encoded Base64 and create query parameters
event_ids_list = json.loads(event_ids)
encoded_ids = [
base64.urlsafe_b64encode(id.encode()).decode() for id in event_ids_list
]
query_params = "&".join([f"names={id}" for id in encoded_ids])

url = f"{url}?{query_params}"

response = http_session.request("GET", url)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

print(json.dumps(response.json(), indent=2))


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
# local
parser.add_argument("--event_ids",
type=str,
required=True,
help='JSON string containing a list of event IDs to '
'retrieve (e.g., \'["id1", "id2"]\')')

args = parser.parse_args()
auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES,
)
batch_get_events(auth_session, args.project_id, args.project_instance,
args.region, args.event_ids)
92 changes: 92 additions & 0 deletions ingestion/v1alpha/events_get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for getting event details from Chronicle.
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.events/get
"""
# pylint: enable=line-too-long

import argparse
import base64

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def get_event(http_session: requests.AuthorizedSession, proj_id: str,
proj_instance: str, proj_region: str, event_id: str) -> None:
"""Get event details from Chronicle using the Events Get API.
Args:
http_session: Authorized session for HTTP requests.
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the Chronicle instance.
proj_region: region in which the target project is located.
event_id: The ID of the event to retrieve.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.events.get
"""
# URL encode the event_id in Base64
encoded_event_id = base64.urlsafe_b64encode(event_id.encode()).decode()
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL, proj_region)
name = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}/events/{encoded_event_id}"
url = f"{base_url_with_region}/v1alpha/{name}"

response = http_session.request("GET", url)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

print(response.json())


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
# local
parser.add_argument("--event_id",
type=str,
required=True,
help="The ID of the event to retrieve")

args = parser.parse_args()
auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES,
)
get_event(auth_session, args.project_id, args.project_instance, args.region,
args.event_id)
13 changes: 13 additions & 0 deletions iocs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
13 changes: 13 additions & 0 deletions iocs/v1alpha/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
81 changes: 81 additions & 0 deletions iocs/v1alpha/batch_get_iocs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Get multiple IoCs from Chronicle."""

from typing import List, Mapping, Any

from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"


def batch_get_iocs(
http_session: requests.AuthorizedSession,
proj_id: str,
proj_instance: str,
proj_region: str,
ioc_values: List[str],
ioc_type: str,
) -> Mapping[str, Any]:
"""Get multiple IoCs by their values from Chronicle.
Args:
http_session: Authorized session for HTTP requests.
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the instance.
proj_region: region in which the target project is located.
ioc_values: List of IoC values to retrieve.
ioc_type: Type of IoCs being requested. One of:
IOC_TYPE_UNSPECIFIED
DOMAIN
IP
FILE_HASH
URL
USER_EMAIL
MUTEX
FILE_HASH_MD5
FILE_HASH_SHA1
FILE_HASH_SHA256
IOC_TYPE_RESOURCE
Returns:
Dict containing the requested IoCs.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable-next=line-too-long
instance = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{instance}/iocs:batchGet"

body = {
"ioc_values": ioc_values,
"ioc_type": ioc_type,
}

response = http_session.request(
"POST",
url,
json=body,
)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()
return response.json()
73 changes: 73 additions & 0 deletions iocs/v1alpha/get_ioc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Get a single IoC from Chronicle."""

from typing import Any, Mapping

from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"


def get_ioc(
http_session: requests.AuthorizedSession,
proj_id: str,
proj_instance: str,
proj_region: str,
ioc_value: str,
ioc_type: str,
) -> Mapping[str, Any]:
"""Get a single IoC by its value from Chronicle.
Args:
http_session: Authorized session for HTTP requests.
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the instance.
proj_region: region in which the target project is located.
ioc_value: Value of the IoC to retrieve.
ioc_type: Type of IoC being requested. One of:
IOC_TYPE_UNSPECIFIED
DOMAIN
IP
FILE_HASH
URL
USER_EMAIL
MUTEX
FILE_HASH_MD5
FILE_HASH_SHA1
FILE_HASH_SHA256
IOC_TYPE_RESOURCE
Returns:
Mapping containing the requested IoC.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable=line-too-long
instance = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{instance}/iocs/{ioc_type}/{ioc_value}"
# pylint: enable=line-too-long

response = http_session.request("GET", url)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()
return response.json()
72 changes: 72 additions & 0 deletions iocs/v1alpha/get_ioc_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Get the state of an IoC from Chronicle."""

from typing import Any, Mapping

from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"


def get_ioc_state(
http_session: requests.AuthorizedSession,
proj_id: str,
proj_instance: str,
proj_region: str,
ioc_value: str,
ioc_type: str,
) -> Mapping[str, Any]:
"""Get the state of an IoC by its value from Chronicle.
Args:
http_session: Authorized session for HTTP requests.
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the instance.
proj_region: region in which the target project is located.
ioc_value: Value of the IoC to get state for.
ioc_type: Type of IoC being requested. One of:
IOC_TYPE_UNSPECIFIED
DOMAIN
IP
FILE_HASH
URL
USER_EMAIL
MUTEX
FILE_HASH_MD5
FILE_HASH_SHA1
FILE_HASH_SHA256
IOC_TYPE_RESOURCE
Returns:
Dict containing the IoC state.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable=line-too-long
instance = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{instance}/iocs/{ioc_type}/{ioc_value}:getIocState"
# pylint: enable=line-too-long
response = http_session.request("GET", url)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()
return response.json()
2 changes: 1 addition & 1 deletion lists/append_to_list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
2 changes: 1 addition & 1 deletion lists/remove_from_list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
13 changes: 13 additions & 0 deletions lists/v1alpha/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
3 changes: 1 addition & 2 deletions lists/v1alpha/get_list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -36,7 +36,6 @@
from common import project_id
from common import project_instance
from common import regions

from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
80 changes: 48 additions & 32 deletions lists/v1alpha/patch_list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -117,15 +117,15 @@ def patch_list(
(response.status_code >= 400).
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL,
proj_region
)
CHRONICLE_API_BASE_URL, proj_region)

# pylint: disable-next=line-too-long
parent = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{parent}/referenceLists/{name}"
body = {
"entries": [{"value": line.strip()} for line in content_lines],
"entries": [{
"value": line.strip()
} for line in content_lines],
}
if scope_name:
body["scope_info"] = {
@@ -161,29 +161,46 @@ def parse_arguments():
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
parser.add_argument("-n", "--name", type=str, required=True,
parser.add_argument("-n",
"--name",
type=str,
required=True,
help="unique name for the list")
parser.add_argument("-f", "--list_file", type=argparse.FileType("r"),
parser.add_argument("-f",
"--list_file",
type=argparse.FileType("r"),
required=True,
help="path of a file containing the list content")
parser.add_argument("-d", "--description", type=str,
parser.add_argument("-d",
"--description",
type=str,
help="description of the list")
parser.add_argument(
"-s", "--scope_name", type=str, help="data RBAC scope name for the list"
)
parser.add_argument("-t", "--syntax_type", type=str, default=None,
parser.add_argument("-s",
"--scope_name",
type=str,
help="data RBAC scope name for the list")
parser.add_argument("-t",
"--syntax_type",
type=str,
default=None,
choices=SYNTAX_TYPE_ENUM,
help="syntax type of the list, used for validation")
add_delete_group = parser.add_mutually_exclusive_group()
add_delete_group.add_argument("--add", action="store_true",
add_delete_group.add_argument("--add",
action="store_true",
help="only append to the existing list")
add_delete_group.add_argument("--remove", action="store_true",
add_delete_group.add_argument("--remove",
action="store_true",
help="only remove from the existing list")
parser.add_argument("--force", action="store_true",
parser.add_argument("--force",
action="store_true",
help="patch regardless of pre-check on changes to list")
parser.add_argument("--max_attempts", type=int, default=6,
parser.add_argument("--max_attempts",
type=int,
default=6,
help="how many times to attempt the patch operation")
parser.add_argument("--quiet", action="store_true",
parser.add_argument("--quiet",
action="store_true",
help="only print the updated list")
return parser.parse_args()

@@ -220,13 +237,16 @@ def get_current_state(auth_session, args):
return curr_list, curr_json["revisionCreateTime"]


def op_update_content_lines(operation_type, curr_list, content_lines,
def op_update_content_lines(operation_type,
curr_list,
content_lines,
force=False):
"""Updates the content lines of a Reference List."""
if operation_type == "add":
seen = set(curr_list)
deduplicated_list = [x for x in content_lines
if not (x in seen or seen.add(x))]
deduplicated_list = [
x for x in content_lines if not (x in seen or seen.add(x))
]
content_lines = curr_list + deduplicated_list
elif operation_type == "remove":
content_lines = [item for item in curr_list if item not in content_lines]
@@ -240,17 +260,15 @@ def main():
args = parse_arguments()
og_content_lines = read_content_lines(args.list_file)

auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES
)
auth_session = chronicle_auth.initialize_http_session(args.credentials_file,
SCOPES)

operation_type = "add" if args.add else "remove" if args.remove else None

curr_list, _ = get_current_state(auth_session, args)

content_lines = op_update_content_lines(
operation_type, curr_list, og_content_lines, args.force)
content_lines = op_update_content_lines(operation_type, curr_list,
og_content_lines, args.force)

attempt, wait_time = 0, 1
while attempt < args.max_attempts:
@@ -276,15 +294,13 @@ def main():
print(f"Patch {operation_type or ''} success.")
print(json.dumps(patched_json, indent=2))
break
wait_time = exponential_backoff(attempt, args.max_attempts,
wait_time, args.quiet)
wait_time = exponential_backoff(attempt, args.max_attempts, wait_time,
args.quiet)
# read and verify again in case other processes updated while waiting
og_content_lines = read_content_lines(args.list_file)
curr_list, _ = get_current_state(auth_session, args)
content_lines = op_update_content_lines(operation_type,
curr_list,
og_content_lines,
args.force)
content_lines = op_update_content_lines(operation_type, curr_list,
og_content_lines, args.force)
attempt += 1


43 changes: 27 additions & 16 deletions lists/v1alpha/patch_list_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -39,9 +39,15 @@ def test_extract_values_with_valid_entries(self, mocked_get_list):

mocked_get_list.return_value = {
"entries": [
{"value": 1},
{"value": 2},
{"value": 3},
{
"value": 1
},
{
"value": 2
},
{
"value": 3
},
],
"revisionCreateTime": None,
}
@@ -68,14 +74,18 @@ def test_get_current_state_with_empty_entries(self, mocked_get_list):
expected_result)

@mock.patch("lists.v1alpha.get_list.get_list")
def test_get_current_state_with_some_entries_missing_value(self,
mocked_get_list):
def test_get_current_state_with_some_entries_missing_value(
self, mocked_get_list):
"""Test when some "entries" are missing the "value" key."""
mocked_get_list.return_value = {
"entries": [
{"value": 1},
{
"value": 1
},
{}, # Missing "value"
{"value": 3},
{
"value": 3
},
],
"revisionCreateTime": None,
}
@@ -106,8 +116,7 @@ def test_function_returns_correct_wait_time_without_jitter(self, mock_sleep):
@mock.patch("random.uniform", return_value=0.5)
def test_quiet_mode_suppresses_output(self, _, mock_sleep):
with unittest.mock.patch(
"sys.stdout",
new_callable=unittest.mock.MagicMock) as mock_stdout:
"sys.stdout", new_callable=unittest.mock.MagicMock) as mock_stdout:
patch_list.exponential_backoff(1, 3, quiet=True)
mock_stdout.write.assert_not_called()
mock_sleep.assert_called_once_with(2.0)
@@ -126,30 +135,32 @@ class PatchListOpUpdateContentLinesTest(unittest.TestCase):
def test_add_with_duplicates(self):
curr_list = ["item1", "item2"]
content_lines = ["item2", "item3"]
result = patch_list.op_update_content_lines("add",
curr_list, content_lines)
result = patch_list.op_update_content_lines("add", curr_list, content_lines)
self.assertEqual(result, ["item1", "item2", "item3"])

def test_remove(self):
curr_list = ["item1", "item2", "item3"]
content_lines = ["item2"] # Items to remove
result = patch_list.op_update_content_lines("remove",
curr_list, content_lines)
result = patch_list.op_update_content_lines("remove", curr_list,
content_lines)
self.assertEqual(result, ["item1", "item3"])

def test_no_change_no_force(self):
curr_list = ["item1"]
content_lines = ["item1"]
with self.assertRaises(SystemExit) as cm: # Expect the exit behavior
patch_list.op_update_content_lines("add",
curr_list, content_lines, force=False)
curr_list,
content_lines,
force=False)
self.assertEqual(cm.exception.code, 0)

def test_no_change_force(self):
curr_list = ["item1"]
content_lines = ["item1"]
result = patch_list.op_update_content_lines("add",
curr_list, content_lines,
curr_list,
content_lines,
force=True)
self.assertEqual(result, ["item1"])

12 changes: 12 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# pyproject.toml
[tool.isort]
profile = "google"
use_parentheses = true
known_first_party = ["google3", "google"]
multi_line_output = 3

[tool.yapf]
based_on_style = "google"
indent_width = 2
column_limit = 80

6 changes: 4 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
google-auth
requests < 2.32
google-auth>=2.0.0
requests>=2.0.0
click>=8.0.0
python-dotenv>=1.0.0
8 changes: 8 additions & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
isort==6.0.1
pre_commit==4.1.0
mypy==1.15.0
mypy-extensions==1.0.0
setuptools>=25.0.1
twine>=6.1.0
wheel>=0.45.1
#yapf>=0.40.0build
13 changes: 13 additions & 0 deletions search/v1alpha/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
170 changes: 170 additions & 0 deletions search/v1alpha/asset_events_find.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for finding asset events in Chronicle.
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.legacy/legacyFindAssetEvents
"""
# pylint: enable=line-too-long

import argparse
import datetime
import json
from typing import Optional

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]

DEFAULT_MAX_RESULTS = 10000
MAX_RESULTS_LIMIT = 250000


def find_asset_events(http_session: requests.AuthorizedSession,
proj_id: str,
proj_instance: str,
proj_region: str,
asset_indicator: str,
start_time: str,
end_time: str,
reference_time: Optional[str] = None,
max_results: Optional[int] = None) -> None:
"""Find asset events in Chronicle using the Legacy Find Asset Events API.
Args:
http_session: Authorized session for HTTP requests.
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the instance.
proj_region: region in which the target project is located.
asset_indicator: JSON str containing the asset indicator to search for.
start_time: Start time in RFC3339 format (e.g., "2024-01-01T00:00:00Z").
end_time: End time in RFC3339 format (e.g., "2024-01-02T00:00:00Z").
reference_time: Optional reference time in RFC3339 format for
asset aliasing.
max_results: Optional maximum number of results to return
(default: 10000, max: 250000).
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
ValueError: If the time format is invalid.
Requires the following IAM permission on the instance resource:
chronicle.legacies.legacyFindAssetEvents
"""
# Validate and parse the times to ensure they're in RFC3339 format
for time_str in [start_time, end_time, reference_time
] if reference_time else [start_time, end_time]:
try:
datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ")
except ValueError as e:
if "does not match format" in str(e):
raise ValueError(f"Time '{time_str}' must be in RFC3339 format "
"(e.g., '2024-01-01T00:00:00Z')") from e
raise

base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable=line-too-long
instance = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{instance}/legacy:legacyFindAssetEvents"
# pylint: enable=line-too-long

# Build query parameters
params = [
f"assetIndicator={asset_indicator}", f"timeRange.startTime={start_time}",
f"timeRange.endTime={end_time}"
]

if reference_time:
params.append(f"referenceTime={reference_time}")

if max_results:
# Ensure max_results is within bounds
max_results = min(max(1, max_results), MAX_RESULTS_LIMIT)
params.append(f"maxResults={max_results}")

url = f"{url}?{'&'.join(params)}"

response = http_session.request("GET", url)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

result = response.json()
print(json.dumps(result, indent=2))

if result.get("more_data_available"):
print("\nWarning: More data is available but was not returned due to "
"maxResults limit.")

if result.get("uri"):
print("\nBackstory UI URLs:")
for uri in result["uri"]:
print(f" {uri}")


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
# local
parser.add_argument("--asset_indicator",
type=str,
required=True,
help="JSON string containing the asset indicator "
"(e.g., '{\"hostname\": \"example.com\"}')")
parser.add_argument(
"--start_time",
type=str,
required=True,
help="Start time in RFC3339 format (e.g., '2024-01-01T00:00:00Z')")
parser.add_argument(
"--end_time",
type=str,
required=True,
help="End time in RFC3339 format (e.g., '2024-01-02T00:00:00Z')")
parser.add_argument(
"--reference_time",
type=str,
help="Optional reference time in RFC3339 format for asset aliasing")
parser.add_argument(
"--max_results",
type=int,
help="Maximum number of results to return "
f"(default: {DEFAULT_MAX_RESULTS}, max: {MAX_RESULTS_LIMIT})")

args = parser.parse_args()

auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES,
)
find_asset_events(auth_session, args.project_id, args.project_instance,
args.region, args.asset_indicator, args.start_time,
args.end_time, args.reference_time, args.max_results)
162 changes: 162 additions & 0 deletions search/v1alpha/raw_logs_find.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for finding raw logs in Chronicle.
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.legacy/legacyFindRawLogs
"""
# pylint: enable=line-too-long

import argparse
import json
from typing import List, Optional

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]

DEFAULT_MAX_RESPONSE_SIZE = 52428800 # 50MiB in bytes


def find_raw_logs(http_session: requests.AuthorizedSession,
proj_id: str,
proj_instance: str,
proj_region: str,
query: str,
batch_tokens: Optional[List[str]] = None,
log_ids: Optional[List[str]] = None,
regex_search: bool = False,
case_sensitive: bool = False,
max_response_size: Optional[int] = None) -> None:
# pylint: disable=line-too-long
"""Find raw logs in Chronicle using the Legacy Find Raw Logs API.
Args:
http_session: Authorized session for HTTP requests.
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the Chronicle instance.
proj_region: region in which the target project is located.
query: Required search parameters that expand or restrict the search.
batch_tokens: Optional list of tokens that should be downloaded.
log_ids: Optional list of raw log ids that should be downloaded.
If both batch_tokens and log_ids are provided, batch_tokens will be discarded.
regex_search: Optional boolean to treat query as regex. Default is False.
case_sensitive: Optional boolean for case-sensitive search. Default is False.
max_response_size: Optional maximum response size in bytes. Default is 50MiB.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the instance resource:
chronicle.legacies.legacyFindRawLogs
"""
# pylint: enable=line-too-long
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable-next=line-too-long
instance = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{instance}/legacy:legacyFindRawLogs"

# Build query parameters
params = [f"query={query}"]
if batch_tokens and not log_ids: # log_ids take precedence over batch_tokens
for token in batch_tokens:
params.append(f"batchToken={token}")
if log_ids:
for log_id in log_ids:
params.append(f"ids={log_id}")
if regex_search:
params.append("regexSearch=true")
if case_sensitive:
params.append("caseSensitive=true")
if max_response_size:
params.append(f"maxResponseByteSize={max_response_size}")

url = f"{url}?{'&'.join(params)}"

response = http_session.request("GET", url)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

result = response.json()
print(json.dumps(result, indent=2))

if result.get("too_many_results"):
print("\nWarning: Some results were omitted due to too many matches.")


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
# local
parser.add_argument(
"--query",
type=str,
required=True,
help="Search parameters that expand or restrict the search")
parser.add_argument(
"--batch_tokens",
type=str,
help=
'JSON string containing a list of batch tokens '
' (e.g., \'["token1", "token2"]\')'
)
parser.add_argument(
"--log_ids",
type=str,
help=
'JSON string containing a list of raw log IDs (e.g., \'["id1", "id2"]\')')
parser.add_argument("--regex_search",
action="store_true",
help="Whether to treat the query as a regex pattern")
parser.add_argument("--case_sensitive",
action="store_true",
help="Whether to perform a case-sensitive search")
parser.add_argument(
"--max_response_size",
type=int,
help=
f"Maximum response size in bytes (default: {DEFAULT_MAX_RESPONSE_SIZE})")

args = parser.parse_args()

# Convert JSON strings to lists if provided
batch_tokens_list = json.loads(
args.batch_tokens) if args.batch_tokens else None
log_ids_list = json.loads(args.log_ids) if args.log_ids else None

auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES,
)
find_raw_logs(auth_session, args.project_id, args.project_instance,
args.region, args.query, batch_tokens_list, log_ids_list,
args.regex_search, args.case_sensitive, args.max_response_size)
97 changes: 97 additions & 0 deletions search/v1alpha/search_query_get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for getting a search query in Chronicle.
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.users.searchQueries/get
"""
# pylint: enable=line-too-long

import argparse

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def get_search_query(http_session: requests.AuthorizedSession, proj_id: str,
proj_instance: str, proj_region: str, user_id: str,
query_id: str) -> None:
"""Get a search query by ID from Chronicle.
Args:
http_session: Authorized session for HTTP requests.
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the instance.
proj_region: region in which the target project is located.
user_id: ID of the user who owns the search query.
query_id: ID of the search query to retrieve.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the instance resource:
chronicle.searchQueries.get
"""
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable=line-too-long
instance = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{instance}/users/{user_id}/searchQueries/{query_id}"
# pylint: enable=line-too-long

response = http_session.request("GET", url)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()
print(response.text)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
# local
parser.add_argument("--user_id",
type=str,
required=True,
help="ID of the user who owns the search query")
parser.add_argument("--query_id",
type=str,
required=True,
help="ID of the search query to retrieve")

args = parser.parse_args()

auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES,
)
get_search_query(auth_session, args.project_id, args.project_instance,
args.region, args.user_id, args.query_id)
158 changes: 158 additions & 0 deletions search/v1alpha/udm_events_find.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=line-too-long
r"""Executable and reusable v1alpha API sample for finding UDM events in Chronicle.
Usage:
python -m search.v1alpha.udm_events_find \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--region=<REGION> \
--tokens='["token1", "token2"]'
# Or using event IDs:
python -m search.v1alpha.udm_events_find \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--region=<REGION> \
--event_ids='["id1", "id2"]'
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.legacy/legacyFindUdmEvents
"""
# pylint: enable=line-too-long

import argparse
import json
from typing import List, Optional

from common import chronicle_auth
from common import project_id
from common import project_instance
from common import regions
from google.auth.transport import requests

CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def find_udm_events(http_session: requests.AuthorizedSession,
proj_id: str,
proj_instance: str,
proj_region: str,
tokens: Optional[List[str]] = None,
event_ids: Optional[List[str]] = None,
return_unenriched_data: bool = False,
return_all_events_for_log: bool = False) -> None:
# pylint: disable=line-too-long
"""Find UDM events in Chronicle using the Legacy Find UDM Events API.
Args:
http_session: Authorized session for HTTP requests.
proj_id: GCP project id or number to which the target instance belongs.
proj_instance: Customer ID (uuid with dashes) for the Chronicle instance.
proj_region: region in which the target project is located.
tokens: Optional list of tokens, with each token referring to a group of UDM/Entity events.
event_ids: Optional list of UDM/Entity event ids that should be returned.
If both tokens and event_ids are provided, tokens will be discarded.
return_unenriched_data: Optional boolean to return unenriched data. Default is False.
return_all_events_for_log: Optional boolean to return all events generated from the ingested log.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.events.batchGet
"""
# pylint: enable=line-too-long
base_url_with_region = regions.url_always_prepend_region(
CHRONICLE_API_BASE_URL, proj_region)
# pylint: disable-next=line-too-long
instance = f"projects/{proj_id}/locations/{proj_region}/instances/{proj_instance}"
url = f"{base_url_with_region}/v1alpha/{instance}/legacy:legacyFindUdmEvents"

# Build query parameters
params = []
if tokens and not event_ids: # event_ids take precedence over tokens
for token in tokens:
params.append(f"tokens={token}")
if event_ids:
for event_id in event_ids:
params.append(f"ids={event_id}")
if return_unenriched_data:
params.append("returnUnenrichedData=true")
if return_all_events_for_log:
params.append("returnAllEventsForLog=true")

if params:
url = f"{url}?{'&'.join(params)}"

response = http_session.request("GET", url)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()

print(json.dumps(response.json(), indent=2))


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
# local
parser.add_argument(
"--tokens",
type=str,
help=
'JSON string containing a list of tokens (e.g., \'["token1", "token2"]\')'
)
parser.add_argument(
"--event_ids",
type=str,
help=
'JSON string containing a list of event IDs (e.g., \'["id1", "id2"]\')')
parser.add_argument("--return_unenriched_data",
action="store_true",
help="Whether to return unenriched data")
parser.add_argument(
"--return_all_events_for_log",
action="store_true",
help="Whether to return all events generated from the ingested log")

args = parser.parse_args()

# Convert JSON strings to lists if provided
tokens_list = json.loads(args.tokens) if args.tokens else None
event_ids_list = json.loads(args.event_ids) if args.event_ids else None

# Validate that at least one of tokens or event_ids is provided
if not tokens_list and not event_ids_list:
parser.error("At least one of --tokens or --event_ids must be provided")

auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES,
)
find_udm_events(auth_session, args.project_id, args.project_instance,
args.region, tokens_list, event_ids_list,
args.return_unenriched_data, args.return_all_events_for_log)
2 changes: 1 addition & 1 deletion service_management/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2021 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
47 changes: 47 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Setup configuration for Chronicle API CLI."""

import setuptools

setuptools.setup(
name="chronicle-api-cli",
version="0.1.4",
description="Chronicle API CLI",
author="Google LLC",
author_email="chronicle-support@google.com",
packages=setuptools.find_packages(include=[
"common",
"cli", "cli.*",
"detect", "detect.v1alpha", "detect.v1alpha.*",
"ingestion", "ingestion.v1alpha", "ingestion.v1alpha.*",
"iocs", "iocs.v1alpha", "iocs.v1alpha.*",
"lists", "lists.v1alpha", "lists.v1alpha.*",
"search", "search.v1alpha", "search.v1alpha.*",
]),
install_requires=[
"click>=8.0.0",
"google-auth>=2.0.0",
"requests>=2.25.0",
"python-dotenv>=1.0.0",
],
entry_points={
"console_scripts": ["chronicle=cli.cli:cli",],
},
exclude_package_data={"": [".gitignore"]},
python_requires=">=3.10",
license="Apache 2.0",
)