From c5e4c206560e50835499eab22a3c17623cab34c3 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 22 Aug 2024 12:35:01 -0700 Subject: [PATCH] Feat: Add ability to inspect GSM secret labels and save secret contents directly to files (#346) --- airbyte/secrets/base.py | 27 ++++++++ airbyte/secrets/google_gsm.py | 78 ++++++++++++++++++---- examples/run_gsm_connector_secret_fetch.py | 54 +++++++++++++++ 3 files changed, 145 insertions(+), 14 deletions(-) create mode 100644 examples/run_gsm_connector_secret_fetch.py diff --git a/airbyte/secrets/base.py b/airbyte/secrets/base.py index a37fc8ea..f2b80fc2 100644 --- a/airbyte/secrets/base.py +++ b/airbyte/secrets/base.py @@ -14,6 +14,8 @@ if TYPE_CHECKING: + from pathlib import Path + from pydantic import GetCoreSchemaHandler, GetJsonSchemaHandler, ValidationInfo from pydantic.json_schema import JsonSchemaValue @@ -234,3 +236,28 @@ def parse_json(self) -> dict: string, a `PyAirbyteInputError` will be raised. """ return self.get_value().parse_json() + + def write_to_file( + self, + file_path: Path, + /, + *, + silent: bool = False, + ) -> None: + """Write the secret to a file. + + If `silent` is True, the method will not print any output to the console. Otherwise, + the method will print a message to the console indicating the file path to which the secret + is being written. + + This method is a convenience method that writes the secret to a file as text. + """ + if not silent: + print( + f"Writing secret '{self.secret_name.split('/')[-1]}' to '{file_path.absolute()!s}'" + ) + + file_path.write_text( + str(self.get_value()), + encoding="utf-8", + ) diff --git a/airbyte/secrets/google_gsm.py b/airbyte/secrets/google_gsm.py index 9a709119..0398bf9a 100644 --- a/airbyte/secrets/google_gsm.py +++ b/airbyte/secrets/google_gsm.py @@ -44,13 +44,34 @@ if TYPE_CHECKING: - from collections.abc import Iterable + from collections.abc import Iterable, MutableMapping from google.cloud.secretmanager_v1.services.secret_manager_service.pagers import ( ListSecretsPager, ) +class GSMSecretHandle(SecretHandle): + """A handle for a secret stored in Google Secrets Manager (GSM). + + This class inherits from `SecretHandle` and adds a `labels` attribute for inspecting GSM + labels. + """ + + parent: GoogleGSMSecretManager + + def _get_gsm_secret_object(self) -> secretmanager.Secret: + """Get the `Secret` object from GSM.""" + return self.parent.secret_client.get_secret( + name=self.secret_name, + ) + + @property + def labels(self) -> MutableMapping[str, str]: + """Get the labels of the secret.""" + return self._get_gsm_secret_object().labels + + class GoogleGSMSecretManager(CustomSecretManager): """Secret manager that retrieves secrets from Google Secrets Manager (GSM). @@ -128,8 +149,8 @@ def __init__( super().__init__() # Handles the registration if needed - def get_secret(self, secret_name: str) -> SecretString | None: - """Get a named secret from Google Colab user secrets.""" + def _fully_qualified_secret_name(self, secret_name: str) -> str: + """Get the fully qualified secret name.""" full_name = secret_name if "projects/" not in full_name: # This is not yet fully qualified @@ -138,15 +159,41 @@ def get_secret(self, secret_name: str) -> SecretString | None: if "/versions/" not in full_name: full_name += "/versions/latest" + return full_name + + def get_secret(self, secret_name: str) -> SecretString | None: + """Get a named secret from Google Colab user secrets.""" return SecretString( - self.secret_client.access_secret_version(name=full_name).payload.data.decode("UTF-8") + self.secret_client.access_secret_version( + name=self._fully_qualified_secret_name(secret_name) + ).payload.data.decode("UTF-8") + ) + + def get_secret_handle( + self, + secret_name: str, + ) -> GSMSecretHandle: + """Fetch secret in the secret manager, using the secret name. + + Unlike `get_secret`, this method returns a `GSMSecretHandle` object, which can be used to + inspect the secret's labels and other metadata. + + Args: + secret_name (str): The name of the connector to filter by. + + Returns: + GSMSecretHandle: A handle for the matching secret. + """ + return GSMSecretHandle( + parent=self, + secret_name=self._fully_qualified_secret_name(secret_name), ) def fetch_secrets( self, *, filter_string: str, - ) -> Iterable[SecretHandle]: + ) -> Iterable[GSMSecretHandle]: """List all available secrets in the secret manager. Example filter strings: @@ -158,7 +205,8 @@ def fetch_secrets( https://cloud.google.com/secret-manager/docs/filtering Returns: - Iterable[SecretHandle]: An iterable of `SecretHandle` objects for the matching secrets. + Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching + secrets. """ gsm_secrets: ListSecretsPager = self.secret_client.list_secrets( request=secretmanager.ListSecretsRequest( @@ -168,7 +216,7 @@ def fetch_secrets( ) return [ - SecretHandle( + GSMSecretHandle( parent=self, secret_name=secret.name, ) @@ -179,7 +227,7 @@ def fetch_secrets_by_label( self, label_key: str, label_value: str, - ) -> Iterable[SecretHandle]: + ) -> Iterable[GSMSecretHandle]: """List all available secrets in the secret manager. Args: @@ -187,14 +235,15 @@ def fetch_secrets_by_label( label_value (str): The value of the label to filter by. Returns: - Iterable[SecretHandle]: An iterable of `SecretHandle` objects for the matching secrets. + Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching + secrets. """ return self.fetch_secrets(filter_string=f"labels.{label_key}={label_value}") def fetch_connector_secrets( self, connector_name: str, - ) -> Iterable[SecretHandle]: + ) -> Iterable[GSMSecretHandle]: """Fetch secrets in the secret manager, using the connector name as a filter for the label. The label key used to filter the secrets is defined by the `CONNECTOR_LABEL` attribute, @@ -204,7 +253,8 @@ def fetch_connector_secrets( connector_name (str): The name of the connector to filter by. Returns: - Iterable[SecretHandle]: An iterable of `SecretHandle` objects for the matching secrets. + Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching + secrets. """ return self.fetch_secrets_by_label( label_key=self.CONNECTOR_LABEL, @@ -214,7 +264,7 @@ def fetch_connector_secrets( def fetch_connector_secret( self, connector_name: str, - ) -> SecretHandle: + ) -> GSMSecretHandle: """Fetch secret in the secret manager, using the connector name as a filter for the label. This method is a convenience method that returns the first secret found for the connector. @@ -226,9 +276,9 @@ def fetch_connector_secret( connector_name (str): The name of the connector to filter by. Returns: - SecretHandle: The matching secret. + GSMSecretHandle: A handle for the matching secret. """ - results: Iterable[SecretHandle] = self.fetch_connector_secrets(connector_name) + results: Iterable[GSMSecretHandle] = self.fetch_connector_secrets(connector_name) try: result = next(iter(results)) except StopIteration: diff --git a/examples/run_gsm_connector_secret_fetch.py b/examples/run_gsm_connector_secret_fetch.py new file mode 100644 index 00000000..fa03becb --- /dev/null +++ b/examples/run_gsm_connector_secret_fetch.py @@ -0,0 +1,54 @@ +"""Simple script to download secrets from GCS. + +Secrets will be located based on the `connector` label in the GSM secret metadata, and they +will be written to the connector's secrets directory based upon the `filename` label. + +Filename is appended with `.json` and the secret is written to that file. + +As a safety measure, we will only write to the connector's secrets directory if it exists. +If it doesn't exist, the script will fail. Users should ensure the directory +exists and is excluded from git before running the script. + +Usage: + poetry run python examples/run_gsm_secret_fetch.py +""" + +from __future__ import annotations + +from pathlib import Path + +import airbyte as ab +from airbyte.secrets import GoogleGSMSecretManager, SecretHandle + +AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing" +CONNECTOR_NAME = "source-s3" + +AIRBYTE_REPO_ROOT = Path(__file__).parent.parent.parent / "airbyte" + + +CONNECTOR_SECRETS_DIR = ( + AIRBYTE_REPO_ROOT + / "airbyte-integrations" + / "connectors" + / CONNECTOR_NAME + / "secrets" +) + + +def main() -> None: + secret_mgr = GoogleGSMSecretManager( + project=AIRBYTE_INTERNAL_GCP_PROJECT, + credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"), + ) + + secret: SecretHandle + for secret in secret_mgr.fetch_connector_secrets("source-s3"): + if "filename" in secret.labels: + secret_file_path = ( + CONNECTOR_SECRETS_DIR / f"{secret.labels['filename']}.json" + ) + secret.write_to_file(secret_file_path) + + +if __name__ == "__main__": + main()