Skip to content

Commit

Permalink
Feat: Adapt BigQuery classes to be able to use default credential (#157)
Browse files: browse the repository at this point in the history
  • Loading branch information
4sushi authored Apr 2, 2024
1 parent eca28c0 commit a0eab0b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
15 changes: 9 additions & 6 deletions airbyte/_processors/sql/bigquery.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -6,6 +6,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, final

import google.oauth2
import sqlalchemy
from google.api_core.exceptions import NotFound
from google.cloud import bigquery
Expand Down Expand Up @@ -59,7 +60,7 @@ class BigQuerySqlProcessor(SqlProcessorBase):
cache: BigQueryCache

def __init__(self, cache: CacheBase, file_writer: FileWriterBase | None = None) -> None:
self._credentials: service_account.Credentials | None = None
self._credentials: google.auth.credentials.Credentials | None = None
self._schema_exists: bool | None = None
super().__init__(cache, file_writer)

Expand Down Expand Up @@ -142,13 +143,15 @@ def _ensure_schema_exists(

self._schema_exists = True

def _get_credentials(self) -> service_account.Credentials:
def _get_credentials(self) -> google.auth.credentials.Credentials:
"""Return the GCP credentials."""
if self._credentials is None:
self._credentials = service_account.Credentials.from_service_account_file(
self.cache.credentials_path
)

if self.cache.credentials_path:
self._credentials = service_account.Credentials.from_service_account_file(
self.cache.credentials_path
)
else:
self._credentials, _ = google.auth.default()
return self._credentials

def _table_exists(
Expand Down
20 changes: 14 additions & 6 deletions airbyte/caches/bigquery.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -17,18 +17,22 @@

from __future__ import annotations

import urllib
from typing import Any
from typing import TYPE_CHECKING, Any, Optional

from overrides import overrides
from pydantic import root_validator
from sqlalchemy.engine import make_url

from airbyte._processors.sql.bigquery import BigQuerySqlProcessor
from airbyte.caches.base import (
CacheBase,
)


if TYPE_CHECKING:
from sqlalchemy.engine.url import URL


class BigQueryCache(CacheBase):
"""The BigQuery cache implementation."""

Expand All @@ -38,8 +42,9 @@ class BigQueryCache(CacheBase):
dataset_name: str = "airbyte_raw"
"""The name of the dataset to use. In BigQuery, this is equivalent to the schema name."""

credentials_path: str
"""The path to the credentials file to use."""
credentials_path: Optional[str] = None
"""The path to the credentials file to use.
If not passed, falls back to the default inferred from the environment."""

_sql_processor_class: type[BigQuerySqlProcessor] = BigQuerySqlProcessor

Expand All @@ -60,5 +65,8 @@ def get_database_name(self) -> str:
@overrides
def get_sql_alchemy_url(self) -> str:
"""Return the SQLAlchemy URL to use."""
credentials_path_encoded = urllib.parse.quote(self.credentials_path)
return f"bigquery://{self.project_name!s}?credentials_path={credentials_path_encoded}"
url: URL = make_url(f"bigquery://{self.project_name!s}")
if self.credentials_path:
url = url.update_query_dict({"credentials_path": self.credentials_path})

return str(url)

0 comments on commit a0eab0b

Please sign in to comment.