Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions tap_googleads/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from datetime import datetime
from functools import cached_property
from http import HTTPStatus
from typing import Any, Dict, Optional

import requests
Expand Down Expand Up @@ -58,13 +57,6 @@ def response_error_message(self, response: requests.Response) -> str:
except Exception:
return base_msg

def validate_response(self, response):
if response.status_code == HTTPStatus.FORBIDDEN:
msg = self.response_error_message(response)
raise ResumableAPIError(msg, response)

super().validate_response(response)

@cached_property
def authenticator(self) -> OAuthAuthenticator:
"""Return a new authenticator object."""
Expand Down Expand Up @@ -106,18 +98,21 @@ def authenticator(self) -> OAuthAuthenticator:
auth_headers=auth_headers,
)

def _get_login_customer_id(self, context):
if self.login_customer_id:
return self.login_customer_id
if context:
return context.get("parent_customer_id") or context.get("customer_id")
return None

@property
def http_headers(self) -> dict:
"""Return the http headers needed."""
headers = {}
if "user_agent" in self.config:
headers["User-Agent"] = self.config.get("user_agent")
headers["developer-token"] = self.config["developer_token"]
headers["login-customer-id"] = (
self.login_customer_id
or self.context
and self.context.get("customer_id")
)
headers["login-customer-id"] = self._get_login_customer_id(self.context)
return headers

def get_url_params(
Expand Down
30 changes: 28 additions & 2 deletions tap_googleads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
from __future__ import annotations

import datetime
from http import HTTPStatus
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Iterable

from singer_sdk import typing as th # JSON Schema typing helpers

from tap_googleads.client import GoogleAdsStream
from tap_googleads.client import GoogleAdsStream, ResumableAPIError

if TYPE_CHECKING:
from singer_sdk.helpers.types import Context, Record
Expand Down Expand Up @@ -82,6 +83,7 @@ def gaql(self):
parent_stream_type = AccessibleCustomers
schema = th.PropertiesList(
th.Property("customer_id", th.StringType),
th.Property("parent_customer_id", th.StringType),
th.Property(
"customerClient",
th.ObjectType(
Expand All @@ -100,6 +102,13 @@ def gaql(self):

seen_customer_ids = set()

def validate_response(self, response):
if response.status_code == HTTPStatus.FORBIDDEN:
msg = self.response_error_message(response)
raise ResumableAPIError(msg, response)

super().validate_response(response)

def generate_child_contexts(self, record, context):
customer_ids = self.customer_ids

Expand All @@ -124,7 +133,13 @@ def generate_child_contexts(self, record, context):

# sync only customers we haven't seen
customer_ids = set(customer_ids) - self.seen_customer_ids
yield from ({"customer_id": customer_id} for customer_id in customer_ids)

for customer_id in customer_ids:
customer_context = {"customer_id": customer_id}
# Add parent manager account id if this is a child
if customer_id != context['customer_id']:
customer_context['parent_customer_id'] = context['customer_id']
yield customer_context

self.seen_customer_ids.update(customer_ids)

Expand Down Expand Up @@ -252,6 +267,17 @@ def request_records(self, context):

yield from records

def validate_response(self, response):
if response.status_code == HTTPStatus.FORBIDDEN:
error = response.json()["error"]["details"][0]["errors"][0]
msg = (
"Click view report not accessible to customer "
f"'{self.context['customer_id']}': {error['message']}"
)
raise ResumableAPIError(msg, response)

super().validate_response(response)


class CampaignsStream(ReportsStream):
"""Define custom stream."""
Expand Down