Skip to content

Commit

Permalink
Feat: Add ability to inspect GSM secret labels and save secret conten…
Browse files Browse the repository at this point in the history
…ts directly to files (#346)
  • Loading branch information
aaronsteers authored Aug 22, 2024
1 parent b43e68d commit c5e4c20
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 14 deletions.
27 changes: 27 additions & 0 deletions airbyte/secrets/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@


if TYPE_CHECKING:
from pathlib import Path

from pydantic import GetCoreSchemaHandler, GetJsonSchemaHandler, ValidationInfo
from pydantic.json_schema import JsonSchemaValue

Expand Down Expand Up @@ -234,3 +236,28 @@ def parse_json(self) -> dict:
string, a `PyAirbyteInputError` will be raised.
"""
return self.get_value().parse_json()

def write_to_file(
self,
file_path: Path,
/,
*,
silent: bool = False,
) -> None:
"""Write the secret to a file.
If `silent` is True, the method will not print any output to the console. Otherwise,
the method will print a message to the console indicating the file path to which the secret
is being written.
This method is a convenience method that writes the secret to a file as text.
"""
if not silent:
print(
f"Writing secret '{self.secret_name.split('/')[-1]}' to '{file_path.absolute()!s}'"
)

file_path.write_text(
str(self.get_value()),
encoding="utf-8",
)
78 changes: 64 additions & 14 deletions airbyte/secrets/google_gsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,34 @@


if TYPE_CHECKING:
from collections.abc import Iterable
from collections.abc import Iterable, MutableMapping

from google.cloud.secretmanager_v1.services.secret_manager_service.pagers import (
ListSecretsPager,
)


class GSMSecretHandle(SecretHandle):
"""A handle for a secret stored in Google Secrets Manager (GSM).
This class inherits from `SecretHandle` and adds a `labels` attribute for inspecting GSM
labels.
"""

parent: GoogleGSMSecretManager

def _get_gsm_secret_object(self) -> secretmanager.Secret:
"""Get the `Secret` object from GSM."""
return self.parent.secret_client.get_secret(
name=self.secret_name,
)

@property
def labels(self) -> MutableMapping[str, str]:
"""Get the labels of the secret."""
return self._get_gsm_secret_object().labels


class GoogleGSMSecretManager(CustomSecretManager):
"""Secret manager that retrieves secrets from Google Secrets Manager (GSM).
Expand Down Expand Up @@ -128,8 +149,8 @@ def __init__(

super().__init__() # Handles the registration if needed

def get_secret(self, secret_name: str) -> SecretString | None:
"""Get a named secret from Google Colab user secrets."""
def _fully_qualified_secret_name(self, secret_name: str) -> str:
"""Get the fully qualified secret name."""
full_name = secret_name
if "projects/" not in full_name:
# This is not yet fully qualified
Expand All @@ -138,15 +159,41 @@ def get_secret(self, secret_name: str) -> SecretString | None:
if "/versions/" not in full_name:
full_name += "/versions/latest"

return full_name

def get_secret(self, secret_name: str) -> SecretString | None:
"""Get a named secret from Google Colab user secrets."""
return SecretString(
self.secret_client.access_secret_version(name=full_name).payload.data.decode("UTF-8")
self.secret_client.access_secret_version(
name=self._fully_qualified_secret_name(secret_name)
).payload.data.decode("UTF-8")
)

def get_secret_handle(
self,
secret_name: str,
) -> GSMSecretHandle:
"""Fetch secret in the secret manager, using the secret name.
Unlike `get_secret`, this method returns a `GSMSecretHandle` object, which can be used to
inspect the secret's labels and other metadata.
Args:
secret_name (str): The name of the connector to filter by.
Returns:
GSMSecretHandle: A handle for the matching secret.
"""
return GSMSecretHandle(
parent=self,
secret_name=self._fully_qualified_secret_name(secret_name),
)

def fetch_secrets(
self,
*,
filter_string: str,
) -> Iterable[SecretHandle]:
) -> Iterable[GSMSecretHandle]:
"""List all available secrets in the secret manager.
Example filter strings:
Expand All @@ -158,7 +205,8 @@ def fetch_secrets(
https://cloud.google.com/secret-manager/docs/filtering
Returns:
Iterable[SecretHandle]: An iterable of `SecretHandle` objects for the matching secrets.
Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
secrets.
"""
gsm_secrets: ListSecretsPager = self.secret_client.list_secrets(
request=secretmanager.ListSecretsRequest(
Expand All @@ -168,7 +216,7 @@ def fetch_secrets(
)

return [
SecretHandle(
GSMSecretHandle(
parent=self,
secret_name=secret.name,
)
Expand All @@ -179,22 +227,23 @@ def fetch_secrets_by_label(
self,
label_key: str,
label_value: str,
) -> Iterable[SecretHandle]:
) -> Iterable[GSMSecretHandle]:
"""List all available secrets in the secret manager.
Args:
label_key (str): The key of the label to filter by.
label_value (str): The value of the label to filter by.
Returns:
Iterable[SecretHandle]: An iterable of `SecretHandle` objects for the matching secrets.
Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
secrets.
"""
return self.fetch_secrets(filter_string=f"labels.{label_key}={label_value}")

def fetch_connector_secrets(
self,
connector_name: str,
) -> Iterable[SecretHandle]:
) -> Iterable[GSMSecretHandle]:
"""Fetch secrets in the secret manager, using the connector name as a filter for the label.
The label key used to filter the secrets is defined by the `CONNECTOR_LABEL` attribute,
Expand All @@ -204,7 +253,8 @@ def fetch_connector_secrets(
connector_name (str): The name of the connector to filter by.
Returns:
Iterable[SecretHandle]: An iterable of `SecretHandle` objects for the matching secrets.
Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
secrets.
"""
return self.fetch_secrets_by_label(
label_key=self.CONNECTOR_LABEL,
Expand All @@ -214,7 +264,7 @@ def fetch_connector_secrets(
def fetch_connector_secret(
self,
connector_name: str,
) -> SecretHandle:
) -> GSMSecretHandle:
"""Fetch secret in the secret manager, using the connector name as a filter for the label.
This method is a convenience method that returns the first secret found for the connector.
Expand All @@ -226,9 +276,9 @@ def fetch_connector_secret(
connector_name (str): The name of the connector to filter by.
Returns:
SecretHandle: The matching secret.
GSMSecretHandle: A handle for the matching secret.
"""
results: Iterable[SecretHandle] = self.fetch_connector_secrets(connector_name)
results: Iterable[GSMSecretHandle] = self.fetch_connector_secrets(connector_name)
try:
result = next(iter(results))
except StopIteration:
Expand Down
54 changes: 54 additions & 0 deletions examples/run_gsm_connector_secret_fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Simple script to download secrets from GCS.
Secrets will be located based on the `connector` label in the GSM secret metadata, and they
will be written to the connector's secrets directory based upon the `filename` label.
Filename is appended with `.json` and the secret is written to that file.
As a safety measure, we will only write to the connector's secrets directory if it exists.
If it doesn't exist, the script will fail. Users should ensure the directory
exists and is excluded from git before running the script.
Usage:
poetry run python examples/run_gsm_secret_fetch.py
"""

from __future__ import annotations

from pathlib import Path

import airbyte as ab
from airbyte.secrets import GoogleGSMSecretManager, SecretHandle

AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing"
CONNECTOR_NAME = "source-s3"

AIRBYTE_REPO_ROOT = Path(__file__).parent.parent.parent / "airbyte"


CONNECTOR_SECRETS_DIR = (
AIRBYTE_REPO_ROOT
/ "airbyte-integrations"
/ "connectors"
/ CONNECTOR_NAME
/ "secrets"
)


def main() -> None:
secret_mgr = GoogleGSMSecretManager(
project=AIRBYTE_INTERNAL_GCP_PROJECT,
credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"),
)

secret: SecretHandle
for secret in secret_mgr.fetch_connector_secrets("source-s3"):
if "filename" in secret.labels:
secret_file_path = (
CONNECTOR_SECRETS_DIR / f"{secret.labels['filename']}.json"
)
secret.write_to_file(secret_file_path)


if __name__ == "__main__":
main()

0 comments on commit c5e4c20

Please sign in to comment.