diff --git a/tap_googleads/client.py b/tap_googleads/client.py index cb30f46..e934eb4 100644 --- a/tap_googleads/client.py +++ b/tap_googleads/client.py @@ -2,7 +2,6 @@ from datetime import datetime from functools import cached_property -from http import HTTPStatus from typing import Any, Dict, Optional import requests @@ -58,13 +57,6 @@ def response_error_message(self, response: requests.Response) -> str: except Exception: return base_msg - def validate_response(self, response): - if response.status_code == HTTPStatus.FORBIDDEN: - msg = self.response_error_message(response) - raise ResumableAPIError(msg, response) - - super().validate_response(response) - @cached_property def authenticator(self) -> OAuthAuthenticator: """Return a new authenticator object.""" @@ -106,6 +98,13 @@ def authenticator(self) -> OAuthAuthenticator: auth_headers=auth_headers, ) + def _get_login_customer_id(self, context): + if self.login_customer_id: + return self.login_customer_id + if context: + return context.get("parent_customer_id") or context.get("customer_id") + return None + @property def http_headers(self) -> dict: """Return the http headers needed.""" @@ -113,11 +112,7 @@ def http_headers(self) -> dict: if "user_agent" in self.config: headers["User-Agent"] = self.config.get("user_agent") headers["developer-token"] = self.config["developer_token"] - headers["login-customer-id"] = ( - self.login_customer_id - or self.context - and self.context.get("customer_id") - ) + headers["login-customer-id"] = self._get_login_customer_id(self.context) return headers def get_url_params( diff --git a/tap_googleads/streams.py b/tap_googleads/streams.py index 5676de3..eb34566 100644 --- a/tap_googleads/streams.py +++ b/tap_googleads/streams.py @@ -3,12 +3,13 @@ from __future__ import annotations import datetime +from http import HTTPStatus from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, Iterable from singer_sdk import typing as th # JSON Schema typing helpers -from tap_googleads.client import GoogleAdsStream +from tap_googleads.client import GoogleAdsStream, ResumableAPIError if TYPE_CHECKING: from singer_sdk.helpers.types import Context, Record @@ -82,6 +83,7 @@ def gaql(self): parent_stream_type = AccessibleCustomers schema = th.PropertiesList( th.Property("customer_id", th.StringType), + th.Property("parent_customer_id", th.StringType), th.Property( "customerClient", th.ObjectType( @@ -100,6 +102,13 @@ def gaql(self): seen_customer_ids = set() + def validate_response(self, response): + if response.status_code == HTTPStatus.FORBIDDEN: + msg = self.response_error_message(response) + raise ResumableAPIError(msg, response) + + super().validate_response(response) + def generate_child_contexts(self, record, context): customer_ids = self.customer_ids @@ -124,7 +133,13 @@ def generate_child_contexts(self, record, context): # sync only customers we haven't seen customer_ids = set(customer_ids) - self.seen_customer_ids - yield from ({"customer_id": customer_id} for customer_id in customer_ids) + + for customer_id in customer_ids: + customer_context = {"customer_id": customer_id} + # Add parent manager account id if this is a child + if customer_id != context['customer_id']: + customer_context['parent_customer_id'] = context['customer_id'] + yield customer_context self.seen_customer_ids.update(customer_ids) @@ -252,6 +267,17 @@ def request_records(self, context): yield from records + def validate_response(self, response): + if response.status_code == HTTPStatus.FORBIDDEN: + error = response.json()["error"]["details"][0]["errors"][0] + msg = ( + "Click view report not accessible to customer " + f"'{self.context['customer_id']}': {error['message']}" + ) + raise ResumableAPIError(msg, response) + + super().validate_response(response) + class CampaignsStream(ReportsStream): """Define custom stream."""