From a0eab0b66d7067948363536e24c08894e69c718b Mon Sep 17 00:00:00 2001 From: 4sushi Date: Tue, 2 Apr 2024 08:21:31 +0200 Subject: [PATCH] Feat: Adapt BigQuery classes to be able to use default credential (#157) --- airbyte/_processors/sql/bigquery.py | 15 +++++++++------ airbyte/caches/bigquery.py | 20 ++++++++++++++------ 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/airbyte/_processors/sql/bigquery.py b/airbyte/_processors/sql/bigquery.py index b7757352..404c76c1 100644 --- a/airbyte/_processors/sql/bigquery.py +++ b/airbyte/_processors/sql/bigquery.py @@ -6,6 +6,7 @@ from pathlib import Path from typing import TYPE_CHECKING, final +import google.oauth2 import sqlalchemy from google.api_core.exceptions import NotFound from google.cloud import bigquery @@ -59,7 +60,7 @@ class BigQuerySqlProcessor(SqlProcessorBase): cache: BigQueryCache def __init__(self, cache: CacheBase, file_writer: FileWriterBase | None = None) -> None: - self._credentials: service_account.Credentials | None = None + self._credentials: google.auth.credentials.Credentials | None = None self._schema_exists: bool | None = None super().__init__(cache, file_writer) @@ -142,13 +143,15 @@ def _ensure_schema_exists( self._schema_exists = True - def _get_credentials(self) -> service_account.Credentials: + def _get_credentials(self) -> google.auth.credentials.Credentials: """Return the GCP credentials.""" if self._credentials is None: - self._credentials = service_account.Credentials.from_service_account_file( - self.cache.credentials_path - ) - + if self.cache.credentials_path: + self._credentials = service_account.Credentials.from_service_account_file( + self.cache.credentials_path + ) + else: + self._credentials, _ = google.auth.default() return self._credentials def _table_exists( diff --git a/airbyte/caches/bigquery.py b/airbyte/caches/bigquery.py index 993d1ae5..26fb19c9 100644 --- a/airbyte/caches/bigquery.py +++ b/airbyte/caches/bigquery.py @@ -17,11 +17,11 @@ from __future__ import annotations -import urllib -from typing import Any +from typing import TYPE_CHECKING, Any, Optional from overrides import overrides from pydantic import root_validator +from sqlalchemy.engine import make_url from airbyte._processors.sql.bigquery import BigQuerySqlProcessor from airbyte.caches.base import ( @@ -29,6 +29,10 @@ ) +if TYPE_CHECKING: + from sqlalchemy.engine.url import URL + + class BigQueryCache(CacheBase): """The BigQuery cache implementation.""" @@ -38,8 +42,9 @@ class BigQueryCache(CacheBase): dataset_name: str = "airbyte_raw" """The name of the dataset to use. In BigQuery, this is equivalent to the schema name.""" - credentials_path: str - """The path to the credentials file to use.""" + credentials_path: Optional[str] = None + """The path to the credentials file to use. + If not passed, falls back to the default inferred from the environment.""" _sql_processor_class: type[BigQuerySqlProcessor] = BigQuerySqlProcessor @@ -60,5 +65,8 @@ def get_database_name(self) -> str: @overrides def get_sql_alchemy_url(self) -> str: """Return the SQLAlchemy URL to use.""" - credentials_path_encoded = urllib.parse.quote(self.credentials_path) - return f"bigquery://{self.project_name!s}?credentials_path={credentials_path_encoded}" + url: URL = make_url(f"bigquery://{self.project_name!s}") + if self.credentials_path: + url = url.update_query_dict({"credentials_path": self.credentials_path}) + + return str(url)