From 6ac4fdcbd9fcd449696fdab3767e2762287de664 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 14 Jan 2025 08:40:39 +0100 Subject: [PATCH 1/5] Adds resource field reference syntax to template strings --- .../rest_api_pipeline.py | 16 +- dlt/sources/rest_api/__init__.py | 51 ++- dlt/sources/rest_api/config_setup.py | 247 ++++++++++--- dlt/sources/rest_api/typing.py | 2 +- .../helpers/rest_client/test_client.py | 4 +- .../rest_api/configurations/source_configs.py | 37 +- .../configurations/test_configuration.py | 103 ++++-- .../configurations/test_resolve_config.py | 343 +++++++++++++----- .../rest_api/integration/test_offline.py | 151 ++++++-- .../integration/test_processing_steps.py | 124 ++----- 10 files changed, 729 insertions(+), 349 deletions(-) diff --git a/dlt/sources/_core_source_templates/rest_api_pipeline.py b/dlt/sources/_core_source_templates/rest_api_pipeline.py index 01a8828fcd..b320bb7152 100644 --- a/dlt/sources/_core_source_templates/rest_api_pipeline.py +++ b/dlt/sources/_core_source_templates/rest_api_pipeline.py @@ -75,18 +75,10 @@ def github_source(access_token: Optional[str] = dlt.secrets.value) -> Any: { "name": "issue_comments", "endpoint": { - # The placeholder {issue_number} will be resolved - # from the parent resource - "path": "issues/{issue_number}/comments", - "params": { - # The value of `issue_number` will be taken - # from the `number` field in the `issues` resource - "issue_number": { - "type": "resolve", - "resource": "issues", - "field": "number", - } - }, + # The placeholder `{resources.issues.number}` + # will be replaced with the value of `number` field + # in the `issues` resource data + "path": "issues/{resources.issues.number}/comments", }, # Include data from `id` field of the parent resource # in the child data. The field name in the child data diff --git a/dlt/sources/rest_api/__init__.py b/dlt/sources/rest_api/__init__.py index ba2b212d46..e124e4fa03 100644 --- a/dlt/sources/rest_api/__init__.py +++ b/dlt/sources/rest_api/__init__.py @@ -39,6 +39,7 @@ ) from .config_setup import ( IncrementalParam, + InterceptingProxy, create_auth, create_paginator, build_resource_dependency_graph, @@ -119,16 +120,18 @@ def rest_api_source( "base_url": "https://pokeapi.co/api/v2/", "paginator": "json_link", }, - "endpoints": { - "pokemon": { - "params": { - "limit": 100, # Default page size is 20 + "resources": [ + { + "name": "pokemon", + "endpoint": { + "path": "pokemon", + "params": { + "limit": 100, + }, }, - "resource": { - "primary_key": "id", - } - }, - }, + "primary_key": "id", + } + ] }) """ # TODO: this must be removed when TypedDicts are supported by resolve_configuration @@ -316,6 +319,25 @@ def paginate_resource( incremental_cursor_transform, ) + # Interpolate incremental object value into path and params + _incremental: Union[Incremental[Any], InterceptingProxy] + if incremental_cursor_transform: + _incremental = InterceptingProxy( + incremental_object, + incremental_cursor_transform, + {"last_value", "end_value"}, + ) + else: + _incremental = incremental_object + + format_kwargs = {"incremental": _incremental} + + path = path.format(**format_kwargs) + params = { + k: v.format(**format_kwargs) if isinstance(v, str) else v + for k, v in params.items() + } + yield from client.paginate( method=method, path=path, @@ -374,21 +396,23 @@ def paginate_dependent_resource( ) for item in items: - formatted_path, parent_record, updated_params, updated_json = ( + formatted_path, expanded_params, updated_json, parent_record = ( process_parent_data_item( path=path, item=item, - # params=params, + params=params, request_json=request_json, resolved_params=resolved_params, include_from_parent=include_from_parent, + incremental=incremental_object, + incremental_value_convert=incremental_cursor_transform, ) ) for child_page in client.paginate( method=method, path=formatted_path, - params=updated_params, + params=expanded_params, json=updated_json, paginator=paginator, data_selector=data_selector, @@ -490,7 +514,8 @@ def identity_func(x: Any) -> Any: if transform is None: transform = identity_func - params[incremental_param.start] = transform(incremental_object.last_value) + if incremental_param.start: + params[incremental_param.start] = transform(incremental_object.last_value) if incremental_param.end: params[incremental_param.end] = transform(incremental_object.end_value) return params diff --git a/dlt/sources/rest_api/config_setup.py b/dlt/sources/rest_api/config_setup.py index 13a1cc52f7..2e9e9429a3 100644 --- a/dlt/sources/rest_api/config_setup.py +++ b/dlt/sources/rest_api/config_setup.py @@ -1,3 +1,4 @@ +import re import warnings from copy import copy from typing import ( @@ -6,6 +7,7 @@ Dict, Tuple, List, + Iterable, Optional, Union, Callable, @@ -93,10 +95,67 @@ class IncrementalParam(NamedTuple): - start: str + start: Optional[str] end: Optional[str] +class AttributeAccessibleDict(Dict[str, Any]): + def __getattr__(self, key: str) -> Any: + try: + return self[key] + except KeyError: + raise AttributeError(key) + + +class ResourcesContext: + def __init__(self) -> None: + self._resources: Dict[str, AttributeAccessibleDict] = {} + + def __getitem__(self, key: str) -> Any: + if key not in self._resources: + self._resources[key] = AttributeAccessibleDict() + return self._resources[key] + + def __getattr__(self, key: str) -> Any: + try: + return self[key] + except KeyError: + raise AttributeError(key) + + +# TODO: Remove once Incremental can do values conversion internally +class InterceptingProxy: + """A proxy class that intercepts access to selected attributes and + calls a function with the attribute value before returning it. + Attributes that are not in the intercept set are returned as is. + """ + + def __init__( + self, + instance: Any, + intercept_function: Callable[..., Any], + intercept_attributes: Optional[Iterable[str]] = None, + ) -> None: + self._instance = instance + self._intercept_function = intercept_function + self._intercept_attributes = set(intercept_attributes) if intercept_attributes else set() + + def __getattribute__(self, name: str) -> Any: + if name.startswith("_"): + return super().__getattribute__(name) + + intercept_attributes = super().__getattribute__("_intercept_attributes") + instance = super().__getattribute__("_instance") + + if name in intercept_attributes: + attribute_value = getattr(instance, name) + intercept_function = super().__getattribute__("_intercept_function") + + return intercept_function(attribute_value) + else: + return getattr(instance, name) + + def register_paginator( paginator_name: str, paginator_class: Type[BasePaginator], @@ -218,7 +277,7 @@ def setup_incremental_object( f"Only initial_value is allowed in the configuration of param: {param_name}. To" " set end_value too use the incremental configuration at the resource level." " See" - " https://dlthub.com/docs/dlt-ecosystem/verified-sources/rest_api#incremental-loading/" + " https://dlthub.com/docs/dlt-ecosystem/verified-sources/rest_api/basic#incremental-loading" ) return param_config, IncrementalParam(start=param_name, end=None), None if isinstance(param_config, dict) and param_config.get("type") == "incremental": @@ -227,7 +286,7 @@ def setup_incremental_object( "Only start_param and initial_value are allowed in the configuration of param:" f" {param_name}. To set end_value too use the incremental configuration at the" " resource level. See" - " https://dlthub.com/docs/dlt-ecosystem/verified-sources/rest_api#incremental-loading" + " https://dlthub.com/docs/dlt-ecosystem/verified-sources/rest_api/basic#incremental-loading" ) convert = parse_convert_or_deprecated_transform(param_config) @@ -246,7 +305,7 @@ def setup_incremental_object( return ( Incremental(**config), IncrementalParam( - start=incremental_config["start_param"], + start=incremental_config.get("start_param"), end=incremental_config.get("end_param"), ), convert, @@ -286,7 +345,6 @@ def build_resource_dependency_graph( dependency_graph: graphlib.TopologicalSorter = graphlib.TopologicalSorter() # type: ignore[type-arg] resolved_param_map: Dict[str, Optional[List[ResolvedParam]]] = {} endpoint_resource_map = expand_and_index_resources(resource_list, resource_defaults) - # create dependency graph for resource_name, endpoint_resource in endpoint_resource_map.items(): if isinstance(endpoint_resource, DltResource): @@ -318,7 +376,7 @@ def build_resource_dependency_graph( predecessor = first_param.resolve_config["resource"] if predecessor not in endpoint_resource_map: raise ValueError( - f"A transformer resource {resource_name} refers to non existing parent resource" + f"A dependent resource {resource_name} refers to non existing parent resource" f" {predecessor} on {first_param}" ) @@ -403,9 +461,39 @@ def _replace_expression(template: str, params: Dict[str, Any]) -> str: return template +def _encode_template_placeholders( + text: str, prefix: str, delimiter_char: str = "||" +) -> Tuple[str, Dict[str, str]]: + """Encodes substrings starting with prefix in text using delimiter_char.""" + + # Store original values for restoration + replacements = {} + + def replace_match(match: re.Match[str]) -> Any: + content = match.group(1) + if content.startswith(prefix): + # Generate a unique key for this replacement + key = f"{delimiter_char}{content}{delimiter_char}" + replacements[key] = match.group(0) + return key + # Return unchanged for further processing + return match.group(0) + + # Find all {...} patterns and selectively replace them + pattern = r"\{\s*([^}]+)\}" + transformed = re.sub(pattern, replace_match, text) + return transformed, replacements + + +def _decode_special_objects(text: str, replacements: Dict[str, str]) -> str: + for key, value in replacements.items(): + text = text.replace(key, value) + return text + + def _bind_path_params(resource: EndpointResource) -> None: """Binds params declared in path to params available in `params`. Pops the - bound params but. Params of type `resolve` and `incremental` are skipped + bound params. Params of type `resolve` and `incremental` are skipped and bound later. """ path_params: Dict[str, Any] = {} @@ -619,7 +707,6 @@ def _extract_expressions( >>> _extract_expressions("blog/{resources.blog.id}/comments", "resources.") ["resources.blog.id"] """ - expressions = set() def recursive_search(value: Union[str, List[Any], Dict[str, Any]]) -> None: @@ -666,78 +753,126 @@ def _expressions_to_resolved_params(expressions: List[str]) -> List[ResolvedPara return resolved_params -def _bound_path_parameters( +def process_parent_data_item( path: str, - param_values: Dict[str, Any], -) -> Tuple[str, List[str]]: - path_params = _extract_expressions(path) - bound_path = _replace_expression(path, param_values) - - return bound_path, path_params + item: Dict[str, Any], + resolved_params: List[ResolvedParam], + params: Optional[Dict[str, Any]] = None, + request_json: Optional[Dict[str, Any]] = None, + include_from_parent: Optional[List[str]] = None, + incremental: Optional[Incremental[Any]] = None, + incremental_value_convert: Optional[Callable[..., Any]] = None, +) -> Tuple[str, Dict[str, Any], Dict[str, Any], Dict[str, Any]]: + params_values = collect_resolved_values( + item, resolved_params, incremental, incremental_value_convert + ) + expanded_path = expand_placeholders(path, params_values) + expanded_params = expand_placeholders(params or {}, params_values) + expanded_json = expand_placeholders(request_json or {}, params_values) -def _bound_json_parameters( - request_json: Dict[str, Any], - param_values: Dict[str, Any], -) -> Tuple[Dict[str, Any], List[str]]: - json_params = _extract_expressions(request_json) - bound_json = _replace_expression(json.dumps(request_json), param_values) + parent_resource_name = resolved_params[0].resolve_config["resource"] + parent_record = build_parent_record(item, parent_resource_name, include_from_parent) - return json.loads(bound_json), json_params + return expanded_path, expanded_params, expanded_json, parent_record -def process_parent_data_item( - path: str, +def collect_resolved_values( item: Dict[str, Any], - # params: Dict[str, Any],, resolved_params: List[ResolvedParam], - include_from_parent: List[str], - request_json: Optional[Dict[str, Any]] = None, -) -> Tuple[str, Dict[str, Any], Dict[str, Any], Dict[str, Any]]: + incremental: Optional[Incremental[Any]], + incremental_value_convert: Optional[Callable[..., Any]], +) -> Dict[str, Any]: + """ + Collects field values from the parent item based on resolved_params + and sets up incremental if present. Returns the resulting placeholders + (params_values) and a ResourcesContext that may store `resources..`. + """ + if not resolved_params: + raise ValueError("Resolved params are required to process parent data item") + parent_resource_name = resolved_params[0].resolve_config["resource"] + resources_context = ResourcesContext() + params_values: Dict[str, Any] = {} - params_values = {} for resolved_param in resolved_params: field_values = jsonpath.find_values(resolved_param.field_path, item) - if not field_values: field_path = resolved_param.resolve_config["field"] raise ValueError( - f"Transformer expects a field '{field_path}' to be present in the incoming data" - f" from resource {parent_resource_name} in order to bind it to param" + f"Resource expects a field '{field_path}' to be present in the incoming data" + f" from resource {parent_resource_name} in order to bind it to path param" f" {resolved_param.param_name}. Available parent fields are" f" {', '.join(item.keys())}" ) - params_values[resolved_param.param_name] = field_values[0] + param_name = resolved_param.param_name + value = field_values[0] + + # If resolved param was defined as `resources..` + # we update the resources context + if param_name.startswith("resources."): + resource_name, field_name = param_name.split(".")[1:] + resources_context[resource_name][field_name] = value + params_values["resources"] = resources_context + else: + params_values[param_name] = value + + if incremental: + # Only wrap in InterceptingProxy if we have a converter + _incremental = ( + InterceptingProxy(incremental, incremental_value_convert, {"last_value", "end_value"}) + if incremental_value_convert + else incremental + ) + params_values["incremental"] = _incremental + + return params_values - bound_path, path_params = _bound_path_parameters(path, params_values) - json_params: List[str] = [] - if request_json: - request_json, json_params = _bound_json_parameters(request_json, params_values) +def expand_placeholders(obj: Any, placeholders: Dict[str, Any]) -> Any: + """ + Recursively expand str.format placeholders in `obj` using `placeholders`. + """ + if obj is None: + return None + + if isinstance(obj, str): + return obj.format(**placeholders) + + if isinstance(obj, dict): + return { + expand_placeholders(k, placeholders): expand_placeholders(v, placeholders) + for k, v in obj.items() + } + + if isinstance(obj, list): + return [expand_placeholders(item, placeholders) for item in obj] + + return obj # For other data types, do nothing + +def build_parent_record( + item: Dict[str, Any], parent_resource_name: str, include_from_parent: Optional[List[str]] +) -> Dict[str, Any]: + """ + Builds a dictionary of the `include_from_parent` fields from the parent, + renaming them using `make_parent_key_name`. + """ parent_record: Dict[str, Any] = {} - if include_from_parent: - for parent_key in include_from_parent: - child_key = make_parent_key_name(parent_resource_name, parent_key) - if parent_key not in item: - raise ValueError( - f"Transformer expects a field '{parent_key}' to be present in the incoming data" - f" from resource {parent_resource_name} in order to include it in child records" - f" under {child_key}. Available parent fields are {', '.join(item.keys())}" - ) - parent_record[child_key] = item[parent_key] - - # the params not present in the params already bound, - # will be returned and used as query params - params_values = { - param_name: param_value - for param_name, param_value in params_values.items() - if param_name not in path_params and param_name not in json_params - } + if not include_from_parent: + return parent_record - return bound_path, parent_record, params_values, request_json + for parent_key in include_from_parent: + child_key = make_parent_key_name(parent_resource_name, parent_key) + if parent_key not in item: + raise ValueError( + f"Resource expects a field '{parent_key}' to be present in the incoming data " + f"from resource {parent_resource_name} in order to include it in child records" + f" under {child_key}. Available parent fields are {', '.join(item.keys())}" + ) + parent_record[child_key] = item[parent_key] + return parent_record def _merge_resource_endpoints( diff --git a/dlt/sources/rest_api/typing.py b/dlt/sources/rest_api/typing.py index 21de95f2ed..c478bb4249 100644 --- a/dlt/sources/rest_api/typing.py +++ b/dlt/sources/rest_api/typing.py @@ -212,7 +212,7 @@ class IncrementalRESTArgs(IncrementalArgs, total=False): class IncrementalConfig(IncrementalRESTArgs, total=False): - start_param: str + start_param: Optional[str] end_param: Optional[str] diff --git a/tests/sources/helpers/rest_client/test_client.py b/tests/sources/helpers/rest_client/test_client.py index e67ff9c70a..d3ad07ce61 100644 --- a/tests/sources/helpers/rest_client/test_client.py +++ b/tests/sources/helpers/rest_client/test_client.py @@ -1,6 +1,6 @@ import os from base64 import b64encode -from typing import Any, Dict, cast +from typing import Any, Dict, cast, List, Optional from unittest.mock import patch, ANY import pytest @@ -401,7 +401,7 @@ def test_paginate_json_body_without_params(self, rest_client) -> None: posts_skip = (DEFAULT_TOTAL_PAGES - 3) * DEFAULT_PAGE_SIZE class JSONBodyPageCursorPaginator(BaseReferencePaginator): - def update_state(self, response, data): # type: ignore[override] + def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None: self._next_reference = response.json().get("next_page") def update_request(self, request): diff --git a/tests/sources/rest_api/configurations/source_configs.py b/tests/sources/rest_api/configurations/source_configs.py index ff58fee0fb..d3c5f9c9e7 100644 --- a/tests/sources/rest_api/configurations/source_configs.py +++ b/tests/sources/rest_api/configurations/source_configs.py @@ -1,7 +1,6 @@ from collections import namedtuple from typing import cast, List -import requests import dlt import dlt.common from dlt.common.configuration.exceptions import ConfigFieldMissingException @@ -162,6 +161,7 @@ def repositories(): VALID_CONFIGS: List[RESTAPIConfig] = [ + # Using the resolve field syntax { "client": {"base_url": "https://api.example.com"}, "resources": [ @@ -181,6 +181,30 @@ def repositories(): }, ], }, + # Using the resource field reference syntax + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_comments", + "endpoint": { + "path": "posts/{resources.posts.id}/comments", + }, + }, + ], + }, + # Using short, endpoint-only resource definition + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_comments", + "endpoint": "posts/{resources.posts.id}/comments", + }, + ], + }, { "client": {"base_url": "https://api.example.com"}, "resources": [ @@ -357,6 +381,7 @@ def repositories(): }, ], }, + # Using the resolve field syntax { "client": {"base_url": "https://github.com/api/v2"}, "resources": [ @@ -376,20 +401,14 @@ def repositories(): repositories(), ], }, + # Using the resource field reference syntax { "client": {"base_url": "https://github.com/api/v2"}, "resources": [ { "name": "issues", "endpoint": { - "path": "dlt-hub/{repository}/issues/", - "params": { - "repository": { - "type": "resolve", - "resource": "repositories", - "field": "name", - }, - }, + "path": "dlt-hub/{resources.repositories.name}/issues/", }, }, repositories(), diff --git a/tests/sources/rest_api/configurations/test_configuration.py b/tests/sources/rest_api/configurations/test_configuration.py index ca84479a0d..7834b4a687 100644 --- a/tests/sources/rest_api/configurations/test_configuration.py +++ b/tests/sources/rest_api/configurations/test_configuration.py @@ -1,5 +1,5 @@ from copy import copy -from typing import cast +from typing import cast, Dict, Any from unittest.mock import patch import pytest @@ -238,30 +238,51 @@ def test_setup_for_single_item_endpoint() -> None: assert isinstance(endpoint["paginator"], SinglePagePaginator) -def test_resource_schema() -> None: - config: RESTAPIConfig = { - "client": { - "base_url": "https://api.example.com", - }, - "resources": [ - "users", - { - "name": "user", - "endpoint": { - "path": "user/{id}", - "paginator": None, - "data_selector": None, - "params": { - "id": { - "type": "resolve", - "field": "id", - "resource": "users", +@pytest.mark.parametrize( + "config", + [ + { + "client": { + "base_url": "https://api.example.com", + }, + "resources": [ + "users", + { + "name": "user", + "endpoint": { + "path": "user/{id}", + "paginator": None, + "data_selector": None, + "params": { + "id": { + "type": "resolve", + "field": "id", + "resource": "users", + }, }, }, }, + ], + }, + { + "client": { + "base_url": "https://api.example.com", }, - ], - } + "resources": [ + "users", + { + "name": "user", + "endpoint": { + "path": "user/{resources.users.id}", + "paginator": None, + "data_selector": None, + }, + }, + ], + }, + ], +) +def test_resource_schema(config: RESTAPIConfig) -> None: resources = rest_api_resources(config) assert len(resources) == 2 resource = resources[0] @@ -403,7 +424,31 @@ def test_resource_defaults_no_params() -> None: } -def test_accepts_DltResource_in_resources() -> None: +@pytest.mark.parametrize( + "issues_resource_config", + [ + { + "name": "issues", + "endpoint": { + "path": "dlt-hub/{repository}/issues/", + "params": { + "repository": { + "type": "resolve", + "resource": "repositories", + "field": "name", + }, + }, + }, + }, + { + "name": "issues", + "endpoint": { + "path": "dlt-hub/{resources.repositories.name}/issues/", + }, + }, + ], +) +def test_accepts_DltResource_in_resources(issues_resource_config: Dict[str, Any]) -> None: @dlt.resource(selected=False) def repositories(): """A seed list of repositories to fetch""" @@ -412,19 +457,7 @@ def repositories(): config: RESTAPIConfig = { "client": {"base_url": "https://github.com/api/v2"}, "resources": [ - { - "name": "issues", - "endpoint": { - "path": "dlt-hub/{repository}/issues/", - "params": { - "repository": { - "type": "resolve", - "resource": "repositories", - "field": "name", - }, - }, - }, - }, + issues_resource_config, # type: ignore[list-item] repositories(), ], } diff --git a/tests/sources/rest_api/configurations/test_resolve_config.py b/tests/sources/rest_api/configurations/test_resolve_config.py index e31470c4b7..37d794c0f3 100644 --- a/tests/sources/rest_api/configurations/test_resolve_config.py +++ b/tests/sources/rest_api/configurations/test_resolve_config.py @@ -82,71 +82,100 @@ def test_bind_path_param() -> None: def test_process_parent_data_item() -> None: - resolve_params = [ + resolved_params = [ ResolvedParam("id", {"field": "obj_id", "resource": "issues", "type": "resolve"}) ] - bound_path, parent_record, params_values, request_json = process_parent_data_item( + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( path="dlt-hub/dlt/issues/{id}/comments", item={"obj_id": 12345}, - resolved_params=resolve_params, + resolved_params=resolved_params, include_from_parent=None, ) assert bound_path == "dlt-hub/dlt/issues/12345/comments" + assert expanded_params == {} assert parent_record == {} - bound_path, parent_record, params_values, request_json = process_parent_data_item( + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( path="dlt-hub/dlt/issues/{id}/comments", item={"obj_id": 12345}, - resolved_params=resolve_params, + resolved_params=resolved_params, include_from_parent=["obj_id"], ) assert parent_record == {"_issues_obj_id": 12345} - bound_path, parent_record, params_values, request_json = process_parent_data_item( + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( path="dlt-hub/dlt/issues/{id}/comments", item={"obj_id": 12345, "obj_node": "node_1"}, - resolved_params=resolve_params, + resolved_params=resolved_params, include_from_parent=["obj_id", "obj_node"], ) assert parent_record == {"_issues_obj_id": 12345, "_issues_obj_node": "node_1"} - # test nested data - resolve_param_nested = [ + # Test resource field reference in path + resolved_params_reference = [ + ResolvedParam( + "resources.issues.obj_id", {"field": "obj_id", "resource": "issues", "type": "resolve"} + ) + ] + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( + path="dlt-hub/dlt/issues/{resources.issues.obj_id}/comments", + item={"obj_id": 12345, "obj_node": "node_1"}, + resolved_params=resolved_params_reference, + include_from_parent=["obj_id", "obj_node"], + ) + assert bound_path == "dlt-hub/dlt/issues/12345/comments" + + # Test resource field reference in params + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( + path="dlt-hub/dlt/issues/comments", + item={"obj_id": 12345, "obj_node": "node_1"}, + params={"id": "{resources.issues.obj_id}"}, + resolved_params=resolved_params_reference, + include_from_parent=["obj_id", "obj_node"], + ) + assert bound_path == "dlt-hub/dlt/issues/comments" + assert expanded_params == {"id": "12345"} + + # Test nested data + resolved_param_nested = [ ResolvedParam( "id", {"field": "some_results.obj_id", "resource": "issues", "type": "resolve"}, ) ] item = {"some_results": {"obj_id": 12345}} - bound_path, parent_record, params_values, request_json = process_parent_data_item( + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( path="dlt-hub/dlt/issues/{id}/comments", item=item, - resolved_params=resolve_param_nested, + params={}, + resolved_params=resolved_param_nested, include_from_parent=None, ) assert bound_path == "dlt-hub/dlt/issues/12345/comments" - # param path not found + # Param path not found with pytest.raises(ValueError) as val_ex: - bound_path, parent_record, params_values, request_json = process_parent_data_item( + process_parent_data_item( path="dlt-hub/dlt/issues/{id}/comments", item={"_id": 12345}, - resolved_params=resolve_params, + params={}, + resolved_params=resolved_params, include_from_parent=None, ) - assert "Transformer expects a field 'obj_id'" in str(val_ex.value) + assert "Resource expects a field 'obj_id'" in str(val_ex.value) - # included path not found + # Included path not found with pytest.raises(ValueError) as val_ex: - bound_path, parent_record, params_values, request_json = process_parent_data_item( + process_parent_data_item( path="dlt-hub/dlt/issues/{id}/comments", item={"_id": 12345, "obj_node": "node_1"}, - resolved_params=resolve_params, + params={}, + resolved_params=resolved_params, include_from_parent=["obj_id", "node"], ) assert ( - "Transformer expects a field 'obj_id' to be present in the incoming data from resource" + "Resource expects a field 'obj_id' to be present in the incoming data from resource" " issues in order to bind it to" in str(val_ex.value) ) @@ -157,27 +186,30 @@ def test_process_parent_data_item() -> None: ResolvedParam("id", {"field": "id", "resource": "comments", "type": "resolve"}), ] - bound_path, parent_record, params_values, request_json = process_parent_data_item( + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( path="dlt-hub/dlt/issues/{issue_id}/comments/{id}", item={"issue": 12345, "id": 56789}, + params={}, resolved_params=multi_resolve_params, include_from_parent=None, ) assert bound_path == "dlt-hub/dlt/issues/12345/comments/56789" assert parent_record == {} - # param path not found with multiple parameters + # Param path not found with multiple parameters with pytest.raises(ValueError) as val_ex: - bound_path, parent_record, params_values, request_json = process_parent_data_item( + process_parent_data_item( path="dlt-hub/dlt/issues/{issue_id}/comments/{id}", item={"_issue": 12345, "id": 56789}, + params={}, resolved_params=multi_resolve_params, include_from_parent=None, ) - assert "Transformer expects a field 'issue'" in str(val_ex.value) + assert "Resource expects a field 'issue'" in str(val_ex.value) def test_two_resources_can_depend_on_one_parent_resource() -> None: + # Using resolve syntax user_id = { "user_id": { "type": "resolve", @@ -211,9 +243,8 @@ def test_two_resources_can_depend_on_one_parent_resource() -> None: assert resources["meetings"]._pipe.parent.name == "users" assert resources["user_details"]._pipe.parent.name == "users" - -def test_dependent_resource_can_bind_multiple_parameters() -> None: - config: RESTAPIConfig = { + # Using resource field reference syntax + config_with_ref: RESTAPIConfig = { "client": { "base_url": "https://api.example.com", }, @@ -222,104 +253,218 @@ def test_dependent_resource_can_bind_multiple_parameters() -> None: { "name": "user_details", "endpoint": { - "path": "user/{user_id}/{group_id}", - "params": { - "user_id": { - "type": "resolve", - "field": "id", - "resource": "users", - }, - "group_id": { - "type": "resolve", - "field": "group", - "resource": "users", - }, - }, + "path": "user/{resources.users.id}/", + }, + }, + { + "name": "meetings", + "endpoint": { + "path": "meetings/{resources.users.id}/", }, }, ], } + resources = rest_api_source(config_with_ref).resources + assert resources["meetings"]._pipe.parent.name == "users" + assert resources["user_details"]._pipe.parent.name == "users" + +@pytest.mark.parametrize( + "config", + [ + { + "client": { + "base_url": "https://api.example.com", + }, + "resources": [ + "users", + { + "name": "user_details", + "endpoint": { + "path": "user/{user_id}/{group_id}", + "params": { + "user_id": { + "type": "resolve", + "field": "id", + "resource": "users", + }, + "group_id": { + "type": "resolve", + "field": "group", + "resource": "users", + }, + }, + }, + }, + ], + }, + { + "client": { + "base_url": "https://api.example.com", + }, + "resources": [ + "users", + { + "name": "user_details", + "endpoint": { + "path": "user/{resources.users.id}/{resources.users.group}", + }, + }, + ], + }, + ], +) +def test_dependent_resource_can_bind_multiple_parameters(config: RESTAPIConfig) -> None: resources = rest_api_source(config).resources assert resources["user_details"]._pipe.parent.name == "users" -def test_one_resource_cannot_bind_two_parents() -> None: - config: RESTAPIConfig = { - "client": { - "base_url": "https://api.example.com", - }, - "resources": [ - "users", - "groups", +@pytest.mark.parametrize( + "config,resolved_param1,resolved_param2", + [ + ( { - "name": "user_details", - "endpoint": { - "path": "user/{user_id}/{group_id}", - "params": { - "user_id": { - "type": "resolve", - "field": "id", - "resource": "users", - }, - "group_id": { - "type": "resolve", - "field": "id", - "resource": "groups", + "client": { + "base_url": "https://api.example.com", + }, + "resources": [ + "users", + "groups", + { + "name": "user_details", + "endpoint": { + "path": "user/{user_id}/{group_id}", + "params": { + "user_id": { + "type": "resolve", + "field": "id", + "resource": "users", + }, + "group_id": { + "type": "resolve", + "field": "id", + "resource": "groups", + }, + }, }, }, + ], + }, + "ResolvedParam(param_name='user_id'", + "ResolvedParam(param_name='group_id'", + ), + ( + { + "client": { + "base_url": "https://api.example.com", }, + "resources": [ + "users", + "groups", + { + "name": "user_details", + "endpoint": { + "path": "user/{resources.users.id}/{resources.groups.id}", + }, + }, + ], }, - ], - } - - with pytest.raises(ValueError) as e: + "ResolvedParam(param_name='resources.users.id'", + "ResolvedParam(param_name='resources.groups.id'", + ), + ], +) +def test_one_resource_cannot_bind_two_parents( + config: RESTAPIConfig, resolved_param1: str, resolved_param2: str +) -> None: + with pytest.raises(ValueError) as exc_info: rest_api_resources(config) - error_part_1 = re.escape( - "Multiple parent resources for user_details: [ResolvedParam(param_name='user_id'" - ) - error_part_2 = re.escape("ResolvedParam(param_name='group_id'") - assert e.match(error_part_1) - assert e.match(error_part_2) + error_msg = str(exc_info.value) + assert "Multiple parent resources for user_details:" in error_msg + assert resolved_param1 in error_msg, f"{resolved_param1} not found in {error_msg}" + assert resolved_param2 in error_msg, f"{resolved_param2} not found in {error_msg}" -def test_resource_dependent_dependent() -> None: - config: RESTAPIConfig = { - "client": { - "base_url": "https://api.example.com", - }, - "resources": [ - "locations", - { - "name": "location_details", - "endpoint": { - "path": "location/{location_id}", - "params": { - "location_id": { - "type": "resolve", - "field": "id", - "resource": "locations", +@pytest.mark.parametrize( + "config", + [ + # Using resolve syntax + { + "client": { + "base_url": "https://api.example.com", + }, + "resources": [ + "locations", + { + "name": "location_details", + "endpoint": { + "path": "location/{location_id}", + "params": { + "location_id": { + "type": "resolve", + "field": "id", + "resource": "locations", + }, }, }, }, - }, - { - "name": "meetings", - "endpoint": { - "path": "/meetings/{room_id}", - "params": { - "room_id": { - "type": "resolve", - "field": "room_id", - "resource": "location_details", + { + "name": "meetings", + "endpoint": { + "path": "/meetings/{room_id}", + "params": { + "room_id": { + "type": "resolve", + "field": "room_id", + "resource": "location_details", + }, }, }, }, + ], + }, + # Using resource field reference syntax + { + "client": { + "base_url": "https://api.example.com", }, - ], - } - + "resources": [ + "locations", + { + "name": "location_details", + "endpoint": { + "path": "location/{resources.locations.id}", + }, + }, + { + "name": "meetings", + "endpoint": { + "path": "/meetings/{resources.location_details.room_id}", + }, + }, + ], + }, + # Using shorter syntax with string endpoints + { + "client": { + "base_url": "https://api.example.com", + }, + "resources": [ + "locations", + { + "name": "location_details", + "endpoint": "location/{resources.locations.id}", + }, + { + "name": "meetings", + "endpoint": "/meetings/{resources.location_details.room_id}", + }, + ], + }, + ], +) +def test_resource_dependent_dependent(config: RESTAPIConfig) -> None: resources = rest_api_source(config).resources assert resources["meetings"]._pipe.parent.name == "location_details" assert resources["location_details"]._pipe.parent.name == "locations" diff --git a/tests/sources/rest_api/integration/test_offline.py b/tests/sources/rest_api/integration/test_offline.py index 4d843fa472..6244297b4b 100644 --- a/tests/sources/rest_api/integration/test_offline.py +++ b/tests/sources/rest_api/integration/test_offline.py @@ -20,15 +20,9 @@ from tests.utils import assert_load_info, assert_query_data, load_table_counts -def test_load_mock_api(mock_api_server): - pipeline = dlt.pipeline( - pipeline_name="rest_api_mock", - destination="duckdb", - dataset_name="rest_api_mock", - full_refresh=True, - ) - - mock_source = rest_api_source( +@pytest.mark.parametrize( + "config", + [ { "client": {"base_url": "https://api.example.com"}, "resources": [ @@ -60,9 +54,33 @@ def test_load_mock_api(mock_api_server): }, }, ], - } + }, + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_comments", + "endpoint": "posts/{resources.posts.id}/comments", + }, + { + "name": "post_details", + "endpoint": "posts/{resources.posts.id}", + }, + ], + }, + ], +) +def test_load_mock_api(mock_api_server, config): + pipeline = dlt.pipeline( + pipeline_name="rest_api_mock", + destination="duckdb", + dataset_name="rest_api_mock", + full_refresh=True, ) + mock_source = rest_api_source(config) + load_info = pipeline.run(mock_source) print(load_info) assert_load_info(load_info) @@ -101,6 +119,7 @@ def test_load_mock_api(mock_api_server): ) +@pytest.mark.skip(reason="TODO: this should raise an error") def test_load_mock_api_with_query_params(mock_api_server): pipeline = dlt.pipeline( pipeline_name="rest_api_mock", @@ -430,15 +449,9 @@ def test_posts_without_key(mock_api_server): ] -def test_load_mock_api_typeddict_config(mock_api_server): - pipeline = dlt.pipeline( - pipeline_name="rest_api_mock", - destination="duckdb", - dataset_name="rest_api_mock", - full_refresh=True, - ) - - mock_source = rest_api_source( +@pytest.mark.parametrize( + "config", + [ RESTAPIConfig( client=ClientConfig(base_url="https://api.example.com"), resources=[ @@ -457,9 +470,31 @@ def test_load_mock_api_typeddict_config(mock_api_server): ), ), ], - ) + ), + RESTAPIConfig( + client=ClientConfig(base_url="https://api.example.com"), + resources=[ + "posts", + EndpointResource( + name="post_comments", + endpoint=Endpoint( + path="posts/{resources.posts.id}/comments", + ), + ), + ], + ), + ], +) +def test_load_mock_api_typeddict_config(mock_api_server, config): + pipeline = dlt.pipeline( + pipeline_name="rest_api_mock", + destination="duckdb", + dataset_name="rest_api_mock", + full_refresh=True, ) + mock_source = rest_api_source(config) + load_info = pipeline.run(mock_source) print(load_info) assert_load_info(load_info) @@ -506,6 +541,44 @@ def test_posts_with_inremental_date_conversion(mock_api_server) -> None: assert called_kwargs["path"] == "posts" +def test_posts_with_inremental_in_param_template(mock_api_server) -> None: + start_time = pendulum.from_timestamp(1) + one_day_later = start_time.add(days=1) + config: RESTAPIConfig = { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + { + "name": "posts", + "endpoint": { + "path": "posts", + "params": { + "since": "{incremental.last_value}", + "until": "{incremental.end_value}", + }, + "incremental": { + # "start_param": "since", + # "end_param": "until", + "cursor_path": "updated_at", + "initial_value": str(start_time.int_timestamp), + "end_value": str(one_day_later.int_timestamp), + "convert": lambda epoch: pendulum.from_timestamp( + int(epoch) + ).to_date_string(), + }, + }, + }, + ], + } + RESTClient = dlt.sources.helpers.rest_client.RESTClient + with mock.patch.object(RESTClient, "paginate") as mock_paginate: + source = rest_api_source(config).add_limit(1) + _ = list(source.with_resources("posts")) + assert mock_paginate.call_count == 1 + _, called_kwargs = mock_paginate.call_args_list[0] + assert called_kwargs["params"] == {"since": "1970-01-01", "until": "1970-01-02"} + assert called_kwargs["path"] == "posts" + + def test_custom_session_is_used(mock_api_server, mocker): class CustomSession(Session): pass @@ -533,7 +606,29 @@ class CustomSession(Session): assert mocked_send.call_args[0][0].url == "https://api.example.com/posts" -def test_DltResource_gets_called(mock_api_server, mocker) -> None: +@pytest.mark.parametrize( + "posts_resource_config", + [ + { + "name": "posts", + "endpoint": { + "path": "posts/{post_id}/comments", + "params": { + "post_id": { + "type": "resolve", + "resource": "post_list", + "field": "id", + }, + }, + }, + }, + { + "name": "posts", + "endpoint": "posts/{resources.post_list.id}/comments", + }, + ], +) +def test_DltResource_gets_called(mock_api_server, mocker, posts_resource_config) -> None: @dlt.resource() def post_list(): yield [{"id": "0"}, {"id": "1"}, {"id": "2"}] @@ -544,19 +639,7 @@ def post_list(): "write_disposition": "replace", }, "resources": [ - { - "name": "posts", - "endpoint": { - "path": "posts/{post_id}/comments", - "params": { - "post_id": { - "type": "resolve", - "resource": "post_list", - "field": "id", - }, - }, - }, - }, + posts_resource_config, post_list(), ], } diff --git a/tests/sources/rest_api/integration/test_processing_steps.py b/tests/sources/rest_api/integration/test_processing_steps.py index df18f98292..20ddd23d7e 100644 --- a/tests/sources/rest_api/integration/test_processing_steps.py +++ b/tests/sources/rest_api/integration/test_processing_steps.py @@ -1,5 +1,7 @@ from typing import Any, Callable, Dict, List +import pytest + import dlt from dlt.sources.rest_api import RESTAPIConfig, rest_api_source @@ -165,44 +167,23 @@ def id_by_10(row): assert data[0]["title"] == "Post 10" -def test_rest_api_source_filtered_child(mock_api_server) -> None: - config: RESTAPIConfig = { - "client": { - "base_url": "https://api.example.com", - }, - "resources": [ - { - "name": "posts", - "endpoint": "posts", - "processing_steps": [ - {"filter": lambda x: x["id"] in (1, 2)}, # type: ignore[typeddict-item] - ], +@pytest.mark.parametrize( + "comments_endpoint", + [ + { + "path": "/posts/{post_id}/comments", + "params": { + "post_id": { + "type": "resolve", + "resource": "posts", + "field": "id", + } }, - { - "name": "comments", - "endpoint": { - "path": "/posts/{post_id}/comments", - "params": { - "post_id": { - "type": "resolve", - "resource": "posts", - "field": "id", - } - }, - }, - "processing_steps": [ - {"filter": lambda x: x["id"] == 1}, # type: ignore[typeddict-item] - ], - }, - ], - } - mock_source = rest_api_source(config) - - data = list(mock_source.with_resources("comments")) - assert len(data) == 2 - - -def test_rest_api_source_filtered_child_with_implicit_param(mock_api_server) -> None: + }, + "posts/{resources.posts.id}/comments", + ], +) +def test_rest_api_source_filtered_child(mock_api_server, comments_endpoint) -> None: config: RESTAPIConfig = { "client": { "base_url": "https://api.example.com", @@ -217,9 +198,7 @@ def test_rest_api_source_filtered_child_with_implicit_param(mock_api_server) -> }, { "name": "comments", - "endpoint": { - "path": "/posts/{resources.posts.id}/comments", - }, + "endpoint": comments_endpoint, "processing_steps": [ {"filter": lambda x: x["id"] == 1}, # type: ignore[typeddict-item] ], @@ -232,52 +211,23 @@ def test_rest_api_source_filtered_child_with_implicit_param(mock_api_server) -> assert len(data) == 2 -def test_rest_api_source_filtered_and_map_child(mock_api_server) -> None: - def extend_body(row): - row["body"] = f"{row['_posts_title']} - {row['body']}" - return row - - config: RESTAPIConfig = { - "client": { - "base_url": "https://api.example.com", - }, - "resources": [ - { - "name": "posts", - "endpoint": "posts", - "processing_steps": [ - {"filter": lambda x: x["id"] in (1, 2)}, # type: ignore[typeddict-item] - ], - }, - { - "name": "comments", - "endpoint": { - "path": "/posts/{post_id}/comments", - "params": { - "post_id": { - "type": "resolve", - "resource": "posts", - "field": "id", - } - }, - }, - "include_from_parent": ["title"], - "processing_steps": [ - {"map": extend_body}, # type: ignore[typeddict-item] - {"filter": lambda x: x["body"].startswith("Post 2")}, # type: ignore[typeddict-item] - ], +@pytest.mark.parametrize( + "comments_endpoint", + [ + { + "path": "/posts/{post_id}/comments", + "params": { + "post_id": { + "type": "resolve", + "resource": "posts", + "field": "id", + } }, - ], - } - mock_source = rest_api_source(config) - - data = list(mock_source.with_resources("comments")) - assert data[0]["body"] == "Post 2 - Comment 0 for post 2" - - -def test_rest_api_source_filtered_and_map_child_with_implicit_param( - mock_api_server, -) -> None: + }, + "posts/{resources.posts.id}/comments", + ], +) +def test_rest_api_source_filtered_and_map_child(mock_api_server, comments_endpoint) -> None: def extend_body(row): row["body"] = f"{row['_posts_title']} - {row['body']}" return row @@ -296,9 +246,7 @@ def extend_body(row): }, { "name": "comments", - "endpoint": { - "path": "/posts/{resources.posts.id}/comments", - }, + "endpoint": comments_endpoint, "include_from_parent": ["title"], "processing_steps": [ {"map": extend_body}, # type: ignore[typeddict-item] From 7c538fc85b314fe9206e8bae3f8b586fad3a3922 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 11 Feb 2025 09:15:06 +0100 Subject: [PATCH 2/5] - Improve context validation - Add support for interpolating in query and json parameters - Stricter validation for resource params - Refactor path and parameter binding - Error handling for invalid resource params --- dlt/sources/rest_api/__init__.py | 19 +- dlt/sources/rest_api/config_setup.py | 220 +++++++---- .../configurations/test_resolve_config.py | 81 ++-- tests/sources/rest_api/conftest.py | 24 ++ .../rest_api/integration/test_offline.py | 357 +++++++++++++++--- 5 files changed, 543 insertions(+), 158 deletions(-) diff --git a/dlt/sources/rest_api/__init__.py b/dlt/sources/rest_api/__init__.py index abde01c321..d0843b2519 100644 --- a/dlt/sources/rest_api/__init__.py +++ b/dlt/sources/rest_api/__init__.py @@ -180,25 +180,18 @@ def rest_api_resources(config: RESTAPIConfig) -> List[DltResource]: "sort": "updated", "direction": "desc", "state": "open", - "since": { - "type": "incremental", - "cursor_path": "updated_at", - "initial_value": "2024-01-25T11:21:28Z", - }, + "since": "{incremental.last_value}", + }, + "incremental": { + "cursor_path": "updated_at", + "initial_value": "2024-01-25T11:21:28Z", }, }, }, { "name": "issue_comments", "endpoint": { - "path": "issues/{issue_number}/comments", - "params": { - "issue_number": { - "type": "resolve", - "resource": "issues", - "field": "number", - } - }, + "path": "issues/{resources.issues.number}/comments", }, }, ], diff --git a/dlt/sources/rest_api/config_setup.py b/dlt/sources/rest_api/config_setup.py index 33b8a954aa..d7f7c24a81 100644 --- a/dlt/sources/rest_api/config_setup.py +++ b/dlt/sources/rest_api/config_setup.py @@ -1,10 +1,10 @@ -import re import warnings from copy import copy from typing import ( Type, Any, Dict, + Set, Tuple, List, Iterable, @@ -14,6 +14,7 @@ cast, NamedTuple, ) +import functools import graphlib import string from requests import Response @@ -354,6 +355,30 @@ def build_resource_dependency_graph( # find resolved parameters to connect dependent resources resolved_params = _find_resolved_params(endpoint_resource["endpoint"]) + available_contexts = _get_available_contexts(endpoint_resource["endpoint"]) + + # Find more resolved params in path + # Ignore params that are not in available_contexts for backward compatibility + # with resolved params in path: these are validated in _bind_path_params + path_expressions = _find_expressions( + endpoint_resource["endpoint"]["path"], available_contexts + ) + + # Find all expressions in params and json, but error if any of them is not in available_contexts + params_expressions = _find_expressions(endpoint_resource["endpoint"].get("params", {})) + _raise_if_any_not_in(params_expressions, available_contexts, message="params") + + json_expressions = _find_expressions(endpoint_resource["endpoint"].get("json", {})) + _raise_if_any_not_in(json_expressions, available_contexts, message="json") + + resolved_params += _expressions_to_resolved_params( + { + x + for x in (path_expressions | params_expressions | json_expressions) + if x.startswith("resources.") + } + ) + # set of resources in resolved params named_resources = {rp.resolve_config["resource"] for rp in resolved_params} @@ -441,78 +466,82 @@ def _make_endpoint_resource( return _merge_resource_endpoints(default_config, resource) -def _encode_template_placeholders( - text: str, prefix: str, delimiter_char: str = "||" -) -> Tuple[str, Dict[str, str]]: - """Encodes substrings starting with prefix in text using delimiter_char.""" - - # Store original values for restoration - replacements = {} - - def replace_match(match: re.Match[str]) -> Any: - content = match.group(1) - if content.startswith(prefix): - # Generate a unique key for this replacement - key = f"{delimiter_char}{content}{delimiter_char}" - replacements[key] = match.group(0) - return key - # Return unchanged for further processing - return match.group(0) - - # Find all {...} patterns and selectively replace them - pattern = r"\{\s*([^}]+)\}" - transformed = re.sub(pattern, replace_match, text) - return transformed, replacements - - -def _decode_special_objects(text: str, replacements: Dict[str, str]) -> str: - for key, value in replacements.items(): - text = text.replace(key, value) - return text - - def _bind_path_params(resource: EndpointResource) -> None: """Binds params declared in path to params available in `params`. Pops the bound params. Params of type `resolve` and `incremental` are skipped and bound later. """ - path_params: Dict[str, Any] = {} + # TODO: Deprecate static params usage in path + # TODO: and remove this function assert isinstance(resource["endpoint"], dict) # type guard + + params = resource["endpoint"].get("params", {}) + resolve_params = [r.param_name for r in _find_resolved_params(resource["endpoint"])] path = resource["endpoint"]["path"] - for format_ in string.Formatter().parse(path): - name = format_[1] - if name: - params = resource["endpoint"].get("params", {}) - if name not in params and name not in path_params: + + new_path_segments = [] + + for literal_text, field_name, _, _ in string.Formatter().parse(path): + # Always add literal text + new_path_segments.append(literal_text) + + if not field_name: + # There's no placeholder here + continue + + # If the placeholder starts with 'resources.' or 'incremental.', leave it intact + # TODO: Generalize this to regex + if field_name.startswith("resources.") or field_name.startswith("incremental."): + new_path_segments.append(f"{{{field_name}}}") + continue + + # If it's a "resolve" param, skip binding here so it remains in the path + # and can be processed later + if field_name in resolve_params: + # We insert a literal placeholder instead of substituting a value + new_path_segments.append(f"{{{field_name}}}") + # Remove from the list of resolve params so we don't complain about it later + resolve_params.remove(field_name) + continue + + # Otherwise, we attempt to bind a normal param from endpoint['params'] + if field_name not in params: + # Does not have a dot in the field name: most likely should be a resolve param + if "." not in field_name: raise ValueError( - f"The path {path} defined in resource {resource['name']} requires param with" - f" name {name} but it is not found in {params}" + f"The path '{path}' defined in resource '{resource['name']}' requires a param " + f"named '{field_name}', but it was not found in 'endpoint.params': {params}" + ) + else: + # Most likely mistyped placeholder context name + raise ValueError( + f"The path '{path}' defined in resource '{resource['name']}' contains a" + f" placeholder '{field_name}'. This placeholder is not a valid name." + " Valid names are: 'resources', 'incremental'." + ) + + if not isinstance(params[field_name], dict): + # bind resolved param and pop it from endpoint + value = params.pop(field_name) + new_path_segments.append(str(value)) + else: + param_type = params[field_name].get("type") + if param_type != "resolve": + raise ValueError( + f"The path {path} defined in resource {resource['name']} tries to bind" + f" param {field_name} with type {param_type}. Paths can only bind 'resolve'" + " type params." ) - if name in resolve_params: - resolve_params.remove(name) - if name in params: - if not isinstance(params[name], dict): - # bind resolved param and pop it from endpoint - path_params[name] = params.pop(name) - else: - param_type = params[name].get("type") - if param_type != "resolve": - raise ValueError( - f"The path {path} defined in resource {resource['name']} tries to bind" - f" param {name} with type {param_type}. Paths can only bind 'resolve'" - " type params." - ) - # resolved params are bound later - path_params[name] = "{" + name + "}" if len(resolve_params) > 0: - raise NotImplementedError( + raise ValueError( f"Resource {resource['name']} defines resolve params {resolve_params} that are not" - f" bound in path {path}. Resolve query params not supported yet." + f" bound in path {path}. To reference parent resource in query params use" + " resources.. syntax." ) - resource["endpoint"]["path"] = path.format(**path_params) + resource["endpoint"]["path"] = "".join(new_path_segments) def _setup_single_entity_endpoint(endpoint: Endpoint) -> Endpoint: @@ -665,17 +694,30 @@ def remove_field(response: Response, *args, **kwargs) -> Response: return None -def _extract_expressions( - template: Union[str, Dict[str, Any]], - prefix: str = "", -) -> List[str]: - """Takes a template string and extracts expressions that start with a prefix. +def _find_expressions( + content: Union[str, Dict[str, Any]], + prefixes: Optional[Iterable[str]] = None, +) -> Set[str]: + """Takes a string, dictionary, or nested structure and extracts expressions + that start with any of the given prefixes. If prefixes is None, extracts all expressions. + Recursively searches through dictionaries and lists to find expressions in string values. + Args: - template (str): A string with expressions to extract - prefix (str): A string that marks the beginning of an expression + content (Union[str, Dict[str, Any]]): A string, dictionary, or nested structure + to search for expressions + prefixes (Optional[Iterable[str]]): An iterable of strings that mark the beginning + of expressions. If None, all expressions are included. + + Returns: + Set[str]: Set of found expressions that match the prefix criteria (or all if no prefixes) + Example: - >>> _extract_expressions("blog/{resources.blog.id}/comments", "resources.") - ["resources.blog.id"] + >>> _find_expressions("blog/{resources.blog.id}/comments", ["resources."]) + {"resources.blog.id"} + >>> _find_expressions("blog/{resources.blog.id}/comments", None) + {"resources.blog.id"} + >>> _find_expressions("blog/{id}/comments", None) + {"id"} """ expressions = set() @@ -691,15 +733,16 @@ def recursive_search(value: Union[str, List[Any], Dict[str, Any]]) -> None: e = [ field_name for _, field_name, _, _ in string.Formatter().parse(value) - if field_name and field_name.startswith(prefix) + if field_name + and (prefixes is None or any(field_name.startswith(prefix) for prefix in prefixes)) ] expressions.update(e) - recursive_search(template) - return list(expressions) + recursive_search(content) + return expressions -def _expressions_to_resolved_params(expressions: List[str]) -> List[ResolvedParam]: +def _expressions_to_resolved_params(expressions: Set[str]) -> List[ResolvedParam]: resolved_params = [] # We assume that the expressions are in the format 'resources..' # and not more complex expressions @@ -889,3 +932,38 @@ def _merge_resource_endpoints( "endpoint": merged_endpoint, } return merged_resource + + +def _get_available_contexts(endpoint: Endpoint) -> Set[str]: + """Returns a list of available contexts for the endpoint. + Args: + endpoint (Endpoint): The endpoint configuration to check + + Returns: + List[str]: List of available context names + """ + contexts = {"resources"} # resources context is always available + + if "incremental" in endpoint: + contexts.add("incremental") + + return contexts + + +def _raise_if_any_not_in(expressions: Set[str], available_contexts: Set[str], message: str) -> None: + """Validates that all expressions start with one of the available contexts. + + Args: + expressions: Set of expressions to validate + available_contexts: Set of valid context prefixes (e.g. 'resources', 'incremental') + message: Location where invalid expression was found (for error message) + + Raises: + ValueError: If any expression doesn't start with an available context prefix + """ + for expression in expressions: + if not any(expression.startswith(prefix + ".") for prefix in available_contexts): + raise ValueError( + f"Expression '{expression}' defined in {message} is not valid. Valid expressions" + f" must start with one of: {', '.join(available_contexts)}" + ) diff --git a/tests/sources/rest_api/configurations/test_resolve_config.py b/tests/sources/rest_api/configurations/test_resolve_config.py index 2c626b859e..036ce9e2b8 100644 --- a/tests/sources/rest_api/configurations/test_resolve_config.py +++ b/tests/sources/rest_api/configurations/test_resolve_config.py @@ -83,7 +83,7 @@ def test_bind_path_param() -> None: # resolved param will remain unbounded and tp_6 = deepcopy(three_params) tp_6["endpoint"]["path"] = "{org}/{repo}/issues/1234/comments" # type: ignore[index] - with pytest.raises(NotImplementedError): + with pytest.raises(ValueError): _bind_path_params(tp_6) @@ -475,41 +475,62 @@ def test_resource_dependent_dependent(config: RESTAPIConfig) -> None: assert resources["location_details"]._pipe.parent.name == "locations" -def test_circular_resource_bindingis_invalid() -> None: - config: RESTAPIConfig = { - "client": { - "base_url": "https://api.example.com", - }, - "resources": [ - { - "name": "chicken", - "endpoint": { - "path": "chicken/{egg_id}/", - "params": { - "egg_id": { - "type": "resolve", - "field": "id", - "resource": "egg", +@pytest.mark.parametrize( + "config", + [ + # Using resolve syntax + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + { + "name": "chicken", + "endpoint": { + "path": "chicken/{egg_id}/", + "params": { + "egg_id": { + "type": "resolve", + "field": "id", + "resource": "egg", + }, }, }, }, - }, - { - "name": "egg", - "endpoint": { - "path": "egg/{chicken_id}/", - "params": { - "chicken_id": { - "type": "resolve", - "field": "id", - "resource": "chicken", + { + "name": "egg", + "endpoint": { + "path": "egg/{chicken_id}/", + "params": { + "chicken_id": { + "type": "resolve", + "field": "id", + "resource": "chicken", + }, }, }, }, - }, - ], - } - + ], + }, + # Using resource field reference syntax + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + { + "name": "chicken", + "endpoint": { + "path": "chicken/{resources.egg.id}/", + }, + }, + { + "name": "egg", + "endpoint": { + "path": "egg/{resources.chicken.id}/", + }, + }, + ], + }, + ], +) +def test_circular_resource_bindingis_invalid(config: RESTAPIConfig) -> None: with pytest.raises(CycleError) as e: rest_api_resources(config) assert e.match(re.escape("'nodes are in a cycle', ['chicken', 'egg', 'chicken']")) diff --git a/tests/sources/rest_api/conftest.py b/tests/sources/rest_api/conftest.py index a416c1d1d6..6e3ead6211 100644 --- a/tests/sources/rest_api/conftest.py +++ b/tests/sources/rest_api/conftest.py @@ -126,11 +126,21 @@ def post_comments(request, context): post_id = int(request.url.split("/")[-2]) return paginate_by_page_number(request, generate_comments(post_id)) + @router.get(r"/post_comments(\?.*)?$") + def post_comments_via_query_param(request, context): + post_id = int(request.qs.get("post_id", [0])[0]) + return paginate_by_page_number(request, generate_comments(post_id)) + @router.get(r"/posts/\d+$") def post_detail(request, context): post_id = request.url.split("/")[-1] return {"id": int(post_id), "body": f"Post body {post_id}"} + @router.get(r"/post_detail(\?.*)?$") + def post_detail_via_query_param(request, context): + post_id = int(request.qs.get("post_id", [0])[0]) + return {"id": int(post_id), "body": f"Post body {post_id}"} + @router.get(r"/posts/\d+/some_details_404") def post_detail_404(request, context): """Return 404 for post with id > 0. Used to test ignoring 404 errors.""" @@ -192,6 +202,20 @@ def search_posts(request, context): "next_page": page_number + 1 if page_number < total_pages else None, } + @router.post(r"/posts/search_by_id/\d+$") + def search_posts_by_id(request, context): + body = request.json() + post_id = body.get("post_id", 0) + title = body.get("more", {}).get("title", 0) + + more_array = body.get("more_array", [])[0] + return { + "id": int(post_id), + "title": title, + "body": f"Post body {post_id}", + "more": f"More is equale to id: {more_array}", + } + @router.get("/protected/posts/basic-auth") def protected_basic_auth(request, context): auth = request.headers.get("Authorization") diff --git a/tests/sources/rest_api/integration/test_offline.py b/tests/sources/rest_api/integration/test_offline.py index 9d4e27a47c..e7dca6baa2 100644 --- a/tests/sources/rest_api/integration/test_offline.py +++ b/tests/sources/rest_api/integration/test_offline.py @@ -23,52 +23,88 @@ @pytest.mark.parametrize( "config", [ - { - "client": {"base_url": "https://api.example.com"}, - "resources": [ - "posts", - { - "name": "post_comments", - "endpoint": { - "path": "posts/{post_id}/comments", - "params": { - "post_id": { - "type": "resolve", - "resource": "posts", - "field": "id", - } + # Using resolve params in path + pytest.param( + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_comments", + "endpoint": { + "path": "posts/{post_id}/comments", + "params": { + "post_id": { + "type": "resolve", + "resource": "posts", + "field": "id", + } + }, }, }, - }, - { - "name": "post_details", - "endpoint": { - "path": "posts/{post_id}", - "params": { - "post_id": { - "type": "resolve", - "resource": "posts", - "field": "id", - } + { + "name": "post_details", + "endpoint": { + "path": "posts/{post_id}", + "params": { + "post_id": { + "type": "resolve", + "resource": "posts", + "field": "id", + } + }, }, }, - }, - ], - }, - { - "client": {"base_url": "https://api.example.com"}, - "resources": [ - "posts", - { - "name": "post_comments", - "endpoint": "posts/{resources.posts.id}/comments", - }, - { - "name": "post_details", - "endpoint": "posts/{resources.posts.id}", - }, - ], - }, + ], + }, + id="resolve_params_in_path", + ), + # Using interpolated params in path + pytest.param( + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_comments", + "endpoint": "posts/{resources.posts.id}/comments", + }, + { + "name": "post_details", + "endpoint": "posts/{resources.posts.id}", + }, + ], + }, + id="interpolated_params_in_path", + ), + # Using interpolated params in query string + pytest.param( + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_comments", + "endpoint": { + "path": "post_comments", + "params": { + "post_id": "{resources.posts.id}", + }, + }, + }, + { + "name": "post_details", + "endpoint": { + "path": "post_detail", + "params": { + "post_id": "{resources.posts.id}", + }, + }, + }, + ], + }, + id="interpolated_params_in_query_string", + ), ], ) def test_load_mock_api(mock_api_server, config): @@ -410,6 +446,173 @@ def test_dependent_resource_query_string_params( assert 1 <= int(qs["page"][0]) <= 10 +@pytest.mark.parametrize( + "endpoint_params,expected_static_params", + [ + # No static params + pytest.param( + { + "path": "post_detail", + "params": {"post_id": "{resources.posts.id}"}, + }, + {}, + id="interpolate_param_only", + ), + # With static params + pytest.param( + { + "path": "post_detail", + "params": {"post_id": "{resources.posts.id}", "sort": "desc"}, + }, + {"sort": ["desc"]}, + id="interpolate_param_with_static", + ), + # One static param is empty + pytest.param( + { + "path": "post_detail", + "params": {"post_id": "{resources.posts.id}", "sort": "desc", "locale": ""}, + }, + {"sort": ["desc"], "locale": [""]}, + id="one_static_param_is_empty", + ), + ], +) +def test_interpolate_params_in_query_string( + mock_api_server, endpoint_params, expected_static_params +): + mock_source = rest_api_source( + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_details", + "endpoint": { + **endpoint_params, + }, + }, + ], + } + ) + list(mock_source.with_resources("posts", "post_details").add_limit(1)) + + history = mock_api_server.request_history + post_details_calls = [h for h in history if "/post_detail" in h.url] + assert len(post_details_calls) == 5 + + for index, call in enumerate(post_details_calls): + qs = parse_qs(urlsplit(call.url).query, keep_blank_values=True) + assert set(qs.keys()) == set(expected_static_params.keys()) | {"post_id"} + assert qs["post_id"] == [str(index)] + + +def test_raises_error_for_unused_resolve_params(mock_api_server): + with pytest.raises(ValueError) as exc_info: + rest_api_source( + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_details", + "endpoint": { + "path": "posts", + "params": { + "post_id": { + "type": "resolve", + "resource": "posts", + "field": "id", + } + }, + }, + }, + ], + } + ) + + assert ( + "Resource post_details defines resolve params ['post_id'] that are not bound in path posts." + " To reference parent resource in query params use resources.." + " syntax." + in str(exc_info.value) + ) + + +def test_raises_error_for_incorrect_interpolation_in_path(): + with pytest.raises(ValueError) as exc_info: + rest_api_source( + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_detail", + "endpoint": {"path": "posts/{unknown.posts.id}"}, + }, + ], + } + ) + + assert ( + "The path 'posts/{unknown.posts.id}' defined in resource 'post_detail' contains a" + " placeholder 'unknown.posts.id'. This placeholder is not a valid name. Valid names are:" + " 'resources', 'incremental'." + in str(exc_info.value) + ) + + +def test_raises_error_for_incorrect_interpolation_in_query_string(mock_api_server): + with pytest.raises(ValueError) as exc_info: + rest_api_source( + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_detail", + "endpoint": { + "path": "post_detail", + "params": {"post_id": "{unknown.posts.id}"}, + }, + }, + ], + } + ) + + assert ( + "Expression 'unknown.posts.id' defined in params is not valid. Valid expressions must start" + " with one of: resources" + in str(exc_info.value) + ) + + +def test_raises_error_for_incorrect_interpolation_in_json(mock_api_server): + with pytest.raises(ValueError) as exc_info: + rest_api_source( + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "search_by_id", + "endpoint": { + "path": "posts/search_by_id/{resources.posts.id}", + "method": "POST", + "json": {"post_id": "{unknown.posts.id}"}, + }, + }, + ], + } + ) + + assert ( + "Expression 'unknown.posts.id' defined in json is not valid. Valid expressions must start" + " with one of: resources" + in str(exc_info.value) + ) + + def test_source_with_post_request(mock_api_server): class JSONBodyPageCursorPaginator(BaseReferencePaginator): def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None: @@ -444,6 +647,72 @@ def update_request(self, request: Request) -> None: assert res[i] == {"id": 51 + i, "title": f"Post {51 + i}"} +def test_interpolate_parent_values_in_path_and_json_body(mock_api_server): + pipeline = dlt.pipeline( + pipeline_name="rest_api_mock", + destination="duckdb", + dataset_name="rest_api_mock", + full_refresh=True, + ) + mock_source = rest_api_source( + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_details", + "endpoint": { + "path": "posts/search_by_id/{resources.posts.id}", + "method": "POST", + "json": { + "post_id": "{resources.posts.id}", + "limit": 5, + "more": { + "title": "{resources.posts.title}", + }, + "more_array": [ + "{resources.posts.id}", + ], + }, + }, + }, + ], + } + ) + load_info = pipeline.run(mock_source) + print(load_info) + assert_load_info(load_info) + table_names = [t["name"] for t in pipeline.default_schema.data_tables()] + table_counts = load_table_counts(pipeline, *table_names) + assert table_counts.keys() == {"posts", "post_details"} + assert table_counts["posts"] == DEFAULT_PAGE_SIZE * DEFAULT_TOTAL_PAGES + assert table_counts["post_details"] == DEFAULT_PAGE_SIZE * DEFAULT_TOTAL_PAGES + with pipeline.sql_client() as client: + posts_table = client.make_qualified_table_name("posts") + posts_details_table = client.make_qualified_table_name("post_details") + print(pipeline.default_schema.to_pretty_yaml()) + assert_query_data( + pipeline, + f"SELECT title FROM {posts_table} ORDER BY id limit 25", + [f"Post {i}" for i in range(25)], + ) + assert_query_data( + pipeline, + f"SELECT body FROM {posts_details_table} ORDER BY id limit 25", + [f"Post body {i}" for i in range(25)], + ) + assert_query_data( + pipeline, + f"SELECT title FROM {posts_details_table} ORDER BY id limit 25", + [f"Post {i}" for i in range(25)], + ) + assert_query_data( + pipeline, + f"SELECT more FROM {posts_details_table} ORDER BY id limit 25", + [f"More is equale to id: {i}" for i in range(25)], + ) + + def test_unauthorized_access_to_protected_endpoint(mock_api_server): pipeline = dlt.pipeline( pipeline_name="rest_api_mock", @@ -580,7 +849,7 @@ def test_load_mock_api_typeddict_config(mock_api_server, config): assert table_counts["post_comments"] == DEFAULT_PAGE_SIZE * DEFAULT_TOTAL_PAGES * 50 -def test_posts_with_inremental_date_conversion(mock_api_server) -> None: +def test_posts_with_incremental_date_conversion(mock_api_server) -> None: start_time = pendulum.from_timestamp(1) one_day_later = start_time.add(days=1) config: RESTAPIConfig = { @@ -614,7 +883,7 @@ def test_posts_with_inremental_date_conversion(mock_api_server) -> None: assert called_kwargs["path"] == "posts" -def test_posts_with_inremental_in_param_template(mock_api_server) -> None: +def test_posts_with_incremental_in_param_template(mock_api_server) -> None: start_time = pendulum.from_timestamp(1) one_day_later = start_time.add(days=1) config: RESTAPIConfig = { From 7963df08e79234e1d1d6e19ae776d07e2531add0 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 11 Feb 2025 14:33:44 +0100 Subject: [PATCH 3/5] - Update GitHub source template with incremental configuration - Generalize context validation for path and parameter interpolation - Add parameterized test for incorrect interpolation scenarios --- .../rest_api_pipeline.py | 13 ++-- dlt/sources/rest_api/config_setup.py | 30 ++++----- .../rest_api/integration/test_offline.py | 61 ++++++------------- .../test_rest_api_pipeline_template.py | 1 - 4 files changed, 43 insertions(+), 62 deletions(-) diff --git a/dlt/sources/_core_source_templates/rest_api_pipeline.py b/dlt/sources/_core_source_templates/rest_api_pipeline.py index b320bb7152..359cc53e2c 100644 --- a/dlt/sources/_core_source_templates/rest_api_pipeline.py +++ b/dlt/sources/_core_source_templates/rest_api_pipeline.py @@ -61,11 +61,14 @@ def github_source(access_token: Optional[str] = dlt.secrets.value) -> Any: # This works by getting the updated_at value # from the previous response data and using this value # for the `since` query parameter in the next request. - "since": { - "type": "incremental", - "cursor_path": "updated_at", - "initial_value": pendulum.today().subtract(days=30).to_iso8601_string(), - }, + "since": "{incremental.last_value}", + }, + # For incremental to work, we need to define the cursor_path + # (the field that will be used to get the incremental value) + # and the initial value + "incremental": { + "cursor_path": "updated_at", + "initial_value": pendulum.today().subtract(days=30).to_iso8601_string(), }, }, }, diff --git a/dlt/sources/rest_api/config_setup.py b/dlt/sources/rest_api/config_setup.py index d7f7c24a81..054b15c846 100644 --- a/dlt/sources/rest_api/config_setup.py +++ b/dlt/sources/rest_api/config_setup.py @@ -14,7 +14,6 @@ cast, NamedTuple, ) -import functools import graphlib import string from requests import Response @@ -334,6 +333,10 @@ def make_parent_key_name(resource_name: str, field_name: str) -> str: return f"_{resource_name}_{field_name}" +def _filter_resource_expressions(expressions: Set[str]) -> Set[str]: + return {x for x in expressions if x.startswith("resources.")} + + def build_resource_dependency_graph( resource_defaults: EndpointResourceBase, resource_list: List[Union[str, EndpointResource, DltResource]], @@ -358,8 +361,8 @@ def build_resource_dependency_graph( available_contexts = _get_available_contexts(endpoint_resource["endpoint"]) # Find more resolved params in path - # Ignore params that are not in available_contexts for backward compatibility - # with resolved params in path: these are validated in _bind_path_params + # Ignore params that are not in available_contexts for backward compatibility with + # resolved params in path: these are validated in _bind_path_params path_expressions = _find_expressions( endpoint_resource["endpoint"]["path"], available_contexts ) @@ -372,11 +375,7 @@ def build_resource_dependency_graph( _raise_if_any_not_in(json_expressions, available_contexts, message="json") resolved_params += _expressions_to_resolved_params( - { - x - for x in (path_expressions | params_expressions | json_expressions) - if x.startswith("resources.") - } + _filter_resource_expressions(path_expressions | params_expressions | json_expressions) ) # set of resources in resolved params @@ -475,10 +474,13 @@ def _bind_path_params(resource: EndpointResource) -> None: # TODO: and remove this function assert isinstance(resource["endpoint"], dict) # type guard - params = resource["endpoint"].get("params", {}) + endpoint = resource["endpoint"] + params = endpoint.get("params", {}) + path = endpoint["path"] + + resolve_params = [r.param_name for r in _find_resolved_params(endpoint)] - resolve_params = [r.param_name for r in _find_resolved_params(resource["endpoint"])] - path = resource["endpoint"]["path"] + available_contexts = _get_available_contexts(endpoint) new_path_segments = [] @@ -490,9 +492,9 @@ def _bind_path_params(resource: EndpointResource) -> None: # There's no placeholder here continue - # If the placeholder starts with 'resources.' or 'incremental.', leave it intact - # TODO: Generalize this to regex - if field_name.startswith("resources.") or field_name.startswith("incremental."): + # If placeholder starts with a recognized context, leave it intact + # e.g. "resources." or "incremental." or any future contexts + if any(field_name.startswith(prefix + ".") for prefix in available_contexts): new_path_segments.append(f"{{{field_name}}}") continue diff --git a/tests/sources/rest_api/integration/test_offline.py b/tests/sources/rest_api/integration/test_offline.py index e7dca6baa2..4bf6e2a956 100644 --- a/tests/sources/rest_api/integration/test_offline.py +++ b/tests/sources/rest_api/integration/test_offline.py @@ -539,32 +539,10 @@ def test_raises_error_for_unused_resolve_params(mock_api_server): ) -def test_raises_error_for_incorrect_interpolation_in_path(): - with pytest.raises(ValueError) as exc_info: - rest_api_source( - { - "client": {"base_url": "https://api.example.com"}, - "resources": [ - "posts", - { - "name": "post_detail", - "endpoint": {"path": "posts/{unknown.posts.id}"}, - }, - ], - } - ) - - assert ( - "The path 'posts/{unknown.posts.id}' defined in resource 'post_detail' contains a" - " placeholder 'unknown.posts.id'. This placeholder is not a valid name. Valid names are:" - " 'resources', 'incremental'." - in str(exc_info.value) - ) - - -def test_raises_error_for_incorrect_interpolation_in_query_string(mock_api_server): - with pytest.raises(ValueError) as exc_info: - rest_api_source( +@pytest.mark.parametrize( + "config,location", + [ + pytest.param( { "client": {"base_url": "https://api.example.com"}, "resources": [ @@ -577,19 +555,11 @@ def test_raises_error_for_incorrect_interpolation_in_query_string(mock_api_serve }, }, ], - } - ) - - assert ( - "Expression 'unknown.posts.id' defined in params is not valid. Valid expressions must start" - " with one of: resources" - in str(exc_info.value) - ) - - -def test_raises_error_for_incorrect_interpolation_in_json(mock_api_server): - with pytest.raises(ValueError) as exc_info: - rest_api_source( + }, + "params", + id="query_params" + ), + pytest.param( { "client": {"base_url": "https://api.example.com"}, "resources": [ @@ -603,11 +573,18 @@ def test_raises_error_for_incorrect_interpolation_in_json(mock_api_server): }, }, ], - } - ) + }, + "json", + id="json_body" + ), + ], +) +def test_raises_error_for_incorrect_interpolation(mock_api_server, config, location): + with pytest.raises(ValueError) as exc_info: + rest_api_source(config) assert ( - "Expression 'unknown.posts.id' defined in json is not valid. Valid expressions must start" + f"Expression 'unknown.posts.id' defined in {location} is not valid. Valid expressions must start" " with one of: resources" in str(exc_info.value) ) diff --git a/tests/sources/rest_api/test_rest_api_pipeline_template.py b/tests/sources/rest_api/test_rest_api_pipeline_template.py index 786dd10931..7f830fa99d 100644 --- a/tests/sources/rest_api/test_rest_api_pipeline_template.py +++ b/tests/sources/rest_api/test_rest_api_pipeline_template.py @@ -3,7 +3,6 @@ import pytest from dlt.common.typing import TSecretStrValue - @pytest.mark.parametrize( "example_name", ( From 833f548ee6b84a60f0254314eba47692901b702d Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 11 Feb 2025 16:04:18 +0100 Subject: [PATCH 4/5] Fix spacing --- tests/sources/rest_api/test_rest_api_pipeline_template.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/sources/rest_api/test_rest_api_pipeline_template.py b/tests/sources/rest_api/test_rest_api_pipeline_template.py index 7f830fa99d..786dd10931 100644 --- a/tests/sources/rest_api/test_rest_api_pipeline_template.py +++ b/tests/sources/rest_api/test_rest_api_pipeline_template.py @@ -3,6 +3,7 @@ import pytest from dlt.common.typing import TSecretStrValue + @pytest.mark.parametrize( "example_name", ( From a49b3d1c818c1df6673e83a695f93b9fe34aa035 Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Tue, 11 Feb 2025 16:59:01 +0100 Subject: [PATCH 5/5] Add documentation for new resources.* interpolation method --- .../verified-sources/rest_api/basic.md | 147 ++++++++++++++---- 1 file changed, 117 insertions(+), 30 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md index ea3c9c768b..9bcafeecae 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md @@ -35,17 +35,12 @@ source = rest_api_source({ "posts", # The explicit configuration allows you to link resources - # and define parameters. + # and define query string parameters. { "name": "comments", "endpoint": { - "path": "posts/{post_id}/comments", + "path": "posts/{resources.posts.id}/comments", "params": { - "post_id": { - "type": "resolve", - "resource": "posts", - "field": "id", - }, "sort": "created_at", }, }, @@ -132,8 +127,6 @@ github_token = "your_github_token" ## Source configuration - - ### Quick example Let's take a look at the GitHub example in the `rest_api_pipeline.py` file: @@ -179,14 +172,7 @@ def github_source(github_token=dlt.secrets.value): { "name": "issue_comments", "endpoint": { - "path": "issues/{issue_number}/comments", - "params": { - "issue_number": { - "type": "resolve", - "resource": "issues", - "field": "number", - } - }, + "path": "issues/{resources.issues.number}/comments", }, "include_from_parent": ["id"], }, @@ -364,7 +350,7 @@ The fields in the endpoint configuration are: - `path`: The path to the API endpoint. By default this path is appended to the given `base_url`. If this is a fully qualified URL starting with `http:` or `https:` it will be used as-is and `base_url` will be ignored. - `method`: The HTTP method to be used. The default is `GET`. -- `params`: Query parameters to be sent with each request. For example, `sort` to order the results or `since` to specify [incremental loading](#incremental-loading). This is also used to define [resource relationships](#define-resource-relationships). +- `params`: Query parameters to be sent with each request. For example, `sort` to order the results or `since` to specify [incremental loading](#incremental-loading). This is also may be used to define [resource relationships](#define-resource-relationships). - `json`: The JSON payload to be sent with the request (for POST and PUT requests). - `paginator`: Pagination configuration for the endpoint. See the [pagination](#pagination) section for more details. - `data_selector`: A JSONPath to select the data from the response. See the [data selection](#data-selection) section for more details. @@ -613,9 +599,12 @@ register_auth("custom_auth", CustomAuth) ### Define resource relationships -When you have a resource that depends on another resource, you can define the relationship using the `resolve` configuration. This allows you to link one or more path parameters in the child resource to fields in the parent resource's data. +When you have a resource that depends on another resource (for example, you must fetch a parent resource to get an ID needed to fetch the child), you can reference fields in the parent resource using special placeholders. +This allows you to link one or more [path](#via-request-path), [query string](#via-query-string-parameters) or [JSON body](#via-json-body) parameters in the child resource to fields in the parent resource's data. + +#### Via request path -In the GitHub example, the `issue_comments` resource depends on the `issues` resource. The `issue_number` parameter in the `issue_comments` endpoint configuration is resolved from the `number` field of the `issues` resource: +In the GitHub example, the `issue_comments` resource depends on the `issues` resource. The `resources.issues.number` placeholder links the `number` field in the `issues` resource data to the current request's path parameter. ```py { @@ -630,14 +619,7 @@ In the GitHub example, the `issue_comments` resource depends on the `issues` res { "name": "issue_comments", "endpoint": { - "path": "issues/{issue_number}/comments", - "params": { - "issue_number": { - "type": "resolve", - "resource": "issues", - "field": "number", - } - }, + "path": "issues/{resources.issues.number}/comments", }, "include_from_parent": ["id"], }, @@ -645,7 +627,8 @@ In the GitHub example, the `issue_comments` resource depends on the `issues` res } ``` -This configuration tells the source to get issue numbers from the `issues` resource and use them to fetch comments for each issue. So if the `issues` resource yields the following data: +This configuration tells the source to get issue numbers from the `issues` resource data and use them to fetch comments for each issue number. So for each issue item, `"{resources.issues.number}"` is replaced by the issue number in the request path. +For example, if the `issues` resource yields the following data: ```json [ @@ -661,6 +644,111 @@ The `issue_comments` resource will make requests to the following endpoints: - `issues/124/comments` - `issues/125/comments` +The syntax for the placeholder is `resources..`. + +#### Via query string parameters + +The placeholder syntax can also be used in the query string parameters. For example, in an API which lets you fetch a blog posts (via `/posts`) and their comments (via `/comments?post_id=`), you can define a resource `posts` and a resource `post_comments` which depends on the `posts` resource. You can then reference the `id` field from the `posts` resource in the `post_comments` resource: + +```py +{ + "resources": [ + "posts", + { + "name": "post_comments", + "endpoint": { + "path": "comments", + "params": { + "post_id": "{resources.posts.id}", + }, + }, + }, + ], +} +``` + +Similar to the GitHub example above, if the `posts` resource yields the following data: + +```json +[ + {"id": 1, "title": "Post 1"}, + {"id": 2, "title": "Post 2"}, + {"id": 3, "title": "Post 3"} +] +``` + +The `post_comments` resource will make requests to the following endpoints: + +- `comments?post_id=1` +- `comments?post_id=2` +- `comments?post_id=3` + +#### Via JSON body + +In many APIs, you can send a complex query or configuration through a POST request’s JSON body rather than in the request path or query parameters. For example, consider an imaginary `/search` endpoint that supports multiple filters and settings. You might have a parent resource `posts` with each post’s `id` and a second resource, `post_details`, that uses `id` to perform a custom search. + +In the example below we reference the `posts` resource’s `id` field in the JSON body via placeholders: + +```py +{ + "resources": [ + "posts", + { + "name": "post_details", + "endpoint": { + "path": "search", + "method": "POST", + "json": { + "filters": { + "id": "{resources.posts.id}", + }, + "order": "desc", + "limit": 5, + } + }, + }, + ], +} +``` + + +#### Legacy syntax: `resolve` field in parameter configuration + +:::warning +`resolve` works only for path parameters. The new placeholder syntax is more flexible and recommended for new configurations. +::: + +An alternative, legacy way to define resource relationships is to use the `resolve` field in the parameter configuration. +Here's the same example as above that uses the `resolve` field: + +```py +{ + "resources": [ + { + "name": "issues", + "endpoint": { + "path": "issues", + # ... + }, + }, + { + "name": "issue_comments", + "endpoint": { + "path": "issues/{issue_number}/comments", + "params": { + "issue_number": { + "type": "resolve", + "resource": "issues", + "field": "number", + } + }, + }, + "include_from_parent": ["id"], + }, + ], +} +``` + The syntax for the `resolve` field in parameter configuration is: ```py @@ -675,7 +763,6 @@ The syntax for the `resolve` field in parameter configuration is: The `field` value can be specified as a [JSONPath](https://github.com/h2non/jsonpath-ng?tab=readme-ov-file#jsonpath-syntax) to select a nested field in the parent resource data. For example: `"field": "items[0].id"`. -Under the hood, dlt handles this by using a [transformer resource](../../../general-usage/resource.md#process-resources-with-dlttransformer). #### Resolving multiple path parameters from a parent resource