diff --git a/dlt/sources/_core_source_templates/rest_api_pipeline.py b/dlt/sources/_core_source_templates/rest_api_pipeline.py index 01a8828fcd..b320bb7152 100644 --- a/dlt/sources/_core_source_templates/rest_api_pipeline.py +++ b/dlt/sources/_core_source_templates/rest_api_pipeline.py @@ -75,18 +75,10 @@ def github_source(access_token: Optional[str] = dlt.secrets.value) -> Any: { "name": "issue_comments", "endpoint": { - # The placeholder {issue_number} will be resolved - # from the parent resource - "path": "issues/{issue_number}/comments", - "params": { - # The value of `issue_number` will be taken - # from the `number` field in the `issues` resource - "issue_number": { - "type": "resolve", - "resource": "issues", - "field": "number", - } - }, + # The placeholder `{resources.issues.number}` + # will be replaced with the value of `number` field + # in the `issues` resource data + "path": "issues/{resources.issues.number}/comments", }, # Include data from `id` field of the parent resource # in the child data. The field name in the child data diff --git a/dlt/sources/rest_api/__init__.py b/dlt/sources/rest_api/__init__.py index ba2b212d46..e124e4fa03 100644 --- a/dlt/sources/rest_api/__init__.py +++ b/dlt/sources/rest_api/__init__.py @@ -39,6 +39,7 @@ ) from .config_setup import ( IncrementalParam, + InterceptingProxy, create_auth, create_paginator, build_resource_dependency_graph, @@ -119,16 +120,18 @@ def rest_api_source( "base_url": "https://pokeapi.co/api/v2/", "paginator": "json_link", }, - "endpoints": { - "pokemon": { - "params": { - "limit": 100, # Default page size is 20 + "resources": [ + { + "name": "pokemon", + "endpoint": { + "path": "pokemon", + "params": { + "limit": 100, + }, }, - "resource": { - "primary_key": "id", - } - }, - }, + "primary_key": "id", + } + ] }) """ # TODO: this must be removed when TypedDicts are supported by resolve_configuration @@ -316,6 +319,25 @@ def paginate_resource( incremental_cursor_transform, ) + # Interpolate incremental object value into path and params + _incremental: Union[Incremental[Any], InterceptingProxy] + if incremental_cursor_transform: + _incremental = InterceptingProxy( + incremental_object, + incremental_cursor_transform, + {"last_value", "end_value"}, + ) + else: + _incremental = incremental_object + + format_kwargs = {"incremental": _incremental} + + path = path.format(**format_kwargs) + params = { + k: v.format(**format_kwargs) if isinstance(v, str) else v + for k, v in params.items() + } + yield from client.paginate( method=method, path=path, @@ -374,21 +396,23 @@ def paginate_dependent_resource( ) for item in items: - formatted_path, parent_record, updated_params, updated_json = ( + formatted_path, expanded_params, updated_json, parent_record = ( process_parent_data_item( path=path, item=item, - # params=params, + params=params, request_json=request_json, resolved_params=resolved_params, include_from_parent=include_from_parent, + incremental=incremental_object, + incremental_value_convert=incremental_cursor_transform, ) ) for child_page in client.paginate( method=method, path=formatted_path, - params=updated_params, + params=expanded_params, json=updated_json, paginator=paginator, data_selector=data_selector, @@ -490,7 +514,8 @@ def identity_func(x: Any) -> Any: if transform is None: transform = identity_func - params[incremental_param.start] = transform(incremental_object.last_value) + if incremental_param.start: + params[incremental_param.start] = transform(incremental_object.last_value) if incremental_param.end: params[incremental_param.end] = transform(incremental_object.end_value) return params diff --git a/dlt/sources/rest_api/config_setup.py b/dlt/sources/rest_api/config_setup.py index 13a1cc52f7..2e9e9429a3 100644 --- a/dlt/sources/rest_api/config_setup.py +++ b/dlt/sources/rest_api/config_setup.py @@ -1,3 +1,4 @@ +import re import warnings from copy import copy from typing import ( @@ -6,6 +7,7 @@ Dict, Tuple, List, + Iterable, Optional, Union, Callable, @@ -93,10 +95,67 @@ class IncrementalParam(NamedTuple): - start: str + start: Optional[str] end: Optional[str] +class AttributeAccessibleDict(Dict[str, Any]): + def __getattr__(self, key: str) -> Any: + try: + return self[key] + except KeyError: + raise AttributeError(key) + + +class ResourcesContext: + def __init__(self) -> None: + self._resources: Dict[str, AttributeAccessibleDict] = {} + + def __getitem__(self, key: str) -> Any: + if key not in self._resources: + self._resources[key] = AttributeAccessibleDict() + return self._resources[key] + + def __getattr__(self, key: str) -> Any: + try: + return self[key] + except KeyError: + raise AttributeError(key) + + +# TODO: Remove once Incremental can do values conversion internally +class InterceptingProxy: + """A proxy class that intercepts access to selected attributes and + calls a function with the attribute value before returning it. + Attributes that are not in the intercept set are returned as is. + """ + + def __init__( + self, + instance: Any, + intercept_function: Callable[..., Any], + intercept_attributes: Optional[Iterable[str]] = None, + ) -> None: + self._instance = instance + self._intercept_function = intercept_function + self._intercept_attributes = set(intercept_attributes) if intercept_attributes else set() + + def __getattribute__(self, name: str) -> Any: + if name.startswith("_"): + return super().__getattribute__(name) + + intercept_attributes = super().__getattribute__("_intercept_attributes") + instance = super().__getattribute__("_instance") + + if name in intercept_attributes: + attribute_value = getattr(instance, name) + intercept_function = super().__getattribute__("_intercept_function") + + return intercept_function(attribute_value) + else: + return getattr(instance, name) + + def register_paginator( paginator_name: str, paginator_class: Type[BasePaginator], @@ -218,7 +277,7 @@ def setup_incremental_object( f"Only initial_value is allowed in the configuration of param: {param_name}. To" " set end_value too use the incremental configuration at the resource level." " See" - " https://dlthub.com/docs/dlt-ecosystem/verified-sources/rest_api#incremental-loading/" + " https://dlthub.com/docs/dlt-ecosystem/verified-sources/rest_api/basic#incremental-loading" ) return param_config, IncrementalParam(start=param_name, end=None), None if isinstance(param_config, dict) and param_config.get("type") == "incremental": @@ -227,7 +286,7 @@ def setup_incremental_object( "Only start_param and initial_value are allowed in the configuration of param:" f" {param_name}. To set end_value too use the incremental configuration at the" " resource level. See" - " https://dlthub.com/docs/dlt-ecosystem/verified-sources/rest_api#incremental-loading" + " https://dlthub.com/docs/dlt-ecosystem/verified-sources/rest_api/basic#incremental-loading" ) convert = parse_convert_or_deprecated_transform(param_config) @@ -246,7 +305,7 @@ def setup_incremental_object( return ( Incremental(**config), IncrementalParam( - start=incremental_config["start_param"], + start=incremental_config.get("start_param"), end=incremental_config.get("end_param"), ), convert, @@ -286,7 +345,6 @@ def build_resource_dependency_graph( dependency_graph: graphlib.TopologicalSorter = graphlib.TopologicalSorter() # type: ignore[type-arg] resolved_param_map: Dict[str, Optional[List[ResolvedParam]]] = {} endpoint_resource_map = expand_and_index_resources(resource_list, resource_defaults) - # create dependency graph for resource_name, endpoint_resource in endpoint_resource_map.items(): if isinstance(endpoint_resource, DltResource): @@ -318,7 +376,7 @@ def build_resource_dependency_graph( predecessor = first_param.resolve_config["resource"] if predecessor not in endpoint_resource_map: raise ValueError( - f"A transformer resource {resource_name} refers to non existing parent resource" + f"A dependent resource {resource_name} refers to non existing parent resource" f" {predecessor} on {first_param}" ) @@ -403,9 +461,39 @@ def _replace_expression(template: str, params: Dict[str, Any]) -> str: return template +def _encode_template_placeholders( + text: str, prefix: str, delimiter_char: str = "||" +) -> Tuple[str, Dict[str, str]]: + """Encodes substrings starting with prefix in text using delimiter_char.""" + + # Store original values for restoration + replacements = {} + + def replace_match(match: re.Match[str]) -> Any: + content = match.group(1) + if content.startswith(prefix): + # Generate a unique key for this replacement + key = f"{delimiter_char}{content}{delimiter_char}" + replacements[key] = match.group(0) + return key + # Return unchanged for further processing + return match.group(0) + + # Find all {...} patterns and selectively replace them + pattern = r"\{\s*([^}]+)\}" + transformed = re.sub(pattern, replace_match, text) + return transformed, replacements + + +def _decode_special_objects(text: str, replacements: Dict[str, str]) -> str: + for key, value in replacements.items(): + text = text.replace(key, value) + return text + + def _bind_path_params(resource: EndpointResource) -> None: """Binds params declared in path to params available in `params`. Pops the - bound params but. Params of type `resolve` and `incremental` are skipped + bound params. Params of type `resolve` and `incremental` are skipped and bound later. """ path_params: Dict[str, Any] = {} @@ -619,7 +707,6 @@ def _extract_expressions( >>> _extract_expressions("blog/{resources.blog.id}/comments", "resources.") ["resources.blog.id"] """ - expressions = set() def recursive_search(value: Union[str, List[Any], Dict[str, Any]]) -> None: @@ -666,78 +753,126 @@ def _expressions_to_resolved_params(expressions: List[str]) -> List[ResolvedPara return resolved_params -def _bound_path_parameters( +def process_parent_data_item( path: str, - param_values: Dict[str, Any], -) -> Tuple[str, List[str]]: - path_params = _extract_expressions(path) - bound_path = _replace_expression(path, param_values) - - return bound_path, path_params + item: Dict[str, Any], + resolved_params: List[ResolvedParam], + params: Optional[Dict[str, Any]] = None, + request_json: Optional[Dict[str, Any]] = None, + include_from_parent: Optional[List[str]] = None, + incremental: Optional[Incremental[Any]] = None, + incremental_value_convert: Optional[Callable[..., Any]] = None, +) -> Tuple[str, Dict[str, Any], Dict[str, Any], Dict[str, Any]]: + params_values = collect_resolved_values( + item, resolved_params, incremental, incremental_value_convert + ) + expanded_path = expand_placeholders(path, params_values) + expanded_params = expand_placeholders(params or {}, params_values) + expanded_json = expand_placeholders(request_json or {}, params_values) -def _bound_json_parameters( - request_json: Dict[str, Any], - param_values: Dict[str, Any], -) -> Tuple[Dict[str, Any], List[str]]: - json_params = _extract_expressions(request_json) - bound_json = _replace_expression(json.dumps(request_json), param_values) + parent_resource_name = resolved_params[0].resolve_config["resource"] + parent_record = build_parent_record(item, parent_resource_name, include_from_parent) - return json.loads(bound_json), json_params + return expanded_path, expanded_params, expanded_json, parent_record -def process_parent_data_item( - path: str, +def collect_resolved_values( item: Dict[str, Any], - # params: Dict[str, Any],, resolved_params: List[ResolvedParam], - include_from_parent: List[str], - request_json: Optional[Dict[str, Any]] = None, -) -> Tuple[str, Dict[str, Any], Dict[str, Any], Dict[str, Any]]: + incremental: Optional[Incremental[Any]], + incremental_value_convert: Optional[Callable[..., Any]], +) -> Dict[str, Any]: + """ + Collects field values from the parent item based on resolved_params + and sets up incremental if present. Returns the resulting placeholders + (params_values) and a ResourcesContext that may store `resources..`. + """ + if not resolved_params: + raise ValueError("Resolved params are required to process parent data item") + parent_resource_name = resolved_params[0].resolve_config["resource"] + resources_context = ResourcesContext() + params_values: Dict[str, Any] = {} - params_values = {} for resolved_param in resolved_params: field_values = jsonpath.find_values(resolved_param.field_path, item) - if not field_values: field_path = resolved_param.resolve_config["field"] raise ValueError( - f"Transformer expects a field '{field_path}' to be present in the incoming data" - f" from resource {parent_resource_name} in order to bind it to param" + f"Resource expects a field '{field_path}' to be present in the incoming data" + f" from resource {parent_resource_name} in order to bind it to path param" f" {resolved_param.param_name}. Available parent fields are" f" {', '.join(item.keys())}" ) - params_values[resolved_param.param_name] = field_values[0] + param_name = resolved_param.param_name + value = field_values[0] + + # If resolved param was defined as `resources..` + # we update the resources context + if param_name.startswith("resources."): + resource_name, field_name = param_name.split(".")[1:] + resources_context[resource_name][field_name] = value + params_values["resources"] = resources_context + else: + params_values[param_name] = value + + if incremental: + # Only wrap in InterceptingProxy if we have a converter + _incremental = ( + InterceptingProxy(incremental, incremental_value_convert, {"last_value", "end_value"}) + if incremental_value_convert + else incremental + ) + params_values["incremental"] = _incremental + + return params_values - bound_path, path_params = _bound_path_parameters(path, params_values) - json_params: List[str] = [] - if request_json: - request_json, json_params = _bound_json_parameters(request_json, params_values) +def expand_placeholders(obj: Any, placeholders: Dict[str, Any]) -> Any: + """ + Recursively expand str.format placeholders in `obj` using `placeholders`. + """ + if obj is None: + return None + + if isinstance(obj, str): + return obj.format(**placeholders) + + if isinstance(obj, dict): + return { + expand_placeholders(k, placeholders): expand_placeholders(v, placeholders) + for k, v in obj.items() + } + + if isinstance(obj, list): + return [expand_placeholders(item, placeholders) for item in obj] + + return obj # For other data types, do nothing + +def build_parent_record( + item: Dict[str, Any], parent_resource_name: str, include_from_parent: Optional[List[str]] +) -> Dict[str, Any]: + """ + Builds a dictionary of the `include_from_parent` fields from the parent, + renaming them using `make_parent_key_name`. + """ parent_record: Dict[str, Any] = {} - if include_from_parent: - for parent_key in include_from_parent: - child_key = make_parent_key_name(parent_resource_name, parent_key) - if parent_key not in item: - raise ValueError( - f"Transformer expects a field '{parent_key}' to be present in the incoming data" - f" from resource {parent_resource_name} in order to include it in child records" - f" under {child_key}. Available parent fields are {', '.join(item.keys())}" - ) - parent_record[child_key] = item[parent_key] - - # the params not present in the params already bound, - # will be returned and used as query params - params_values = { - param_name: param_value - for param_name, param_value in params_values.items() - if param_name not in path_params and param_name not in json_params - } + if not include_from_parent: + return parent_record - return bound_path, parent_record, params_values, request_json + for parent_key in include_from_parent: + child_key = make_parent_key_name(parent_resource_name, parent_key) + if parent_key not in item: + raise ValueError( + f"Resource expects a field '{parent_key}' to be present in the incoming data " + f"from resource {parent_resource_name} in order to include it in child records" + f" under {child_key}. Available parent fields are {', '.join(item.keys())}" + ) + parent_record[child_key] = item[parent_key] + return parent_record def _merge_resource_endpoints( diff --git a/dlt/sources/rest_api/typing.py b/dlt/sources/rest_api/typing.py index 21de95f2ed..c478bb4249 100644 --- a/dlt/sources/rest_api/typing.py +++ b/dlt/sources/rest_api/typing.py @@ -212,7 +212,7 @@ class IncrementalRESTArgs(IncrementalArgs, total=False): class IncrementalConfig(IncrementalRESTArgs, total=False): - start_param: str + start_param: Optional[str] end_param: Optional[str] diff --git a/tests/sources/helpers/rest_client/test_client.py b/tests/sources/helpers/rest_client/test_client.py index e67ff9c70a..d3ad07ce61 100644 --- a/tests/sources/helpers/rest_client/test_client.py +++ b/tests/sources/helpers/rest_client/test_client.py @@ -1,6 +1,6 @@ import os from base64 import b64encode -from typing import Any, Dict, cast +from typing import Any, Dict, cast, List, Optional from unittest.mock import patch, ANY import pytest @@ -401,7 +401,7 @@ def test_paginate_json_body_without_params(self, rest_client) -> None: posts_skip = (DEFAULT_TOTAL_PAGES - 3) * DEFAULT_PAGE_SIZE class JSONBodyPageCursorPaginator(BaseReferencePaginator): - def update_state(self, response, data): # type: ignore[override] + def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None: self._next_reference = response.json().get("next_page") def update_request(self, request): diff --git a/tests/sources/rest_api/configurations/source_configs.py b/tests/sources/rest_api/configurations/source_configs.py index ff58fee0fb..d3c5f9c9e7 100644 --- a/tests/sources/rest_api/configurations/source_configs.py +++ b/tests/sources/rest_api/configurations/source_configs.py @@ -1,7 +1,6 @@ from collections import namedtuple from typing import cast, List -import requests import dlt import dlt.common from dlt.common.configuration.exceptions import ConfigFieldMissingException @@ -162,6 +161,7 @@ def repositories(): VALID_CONFIGS: List[RESTAPIConfig] = [ + # Using the resolve field syntax { "client": {"base_url": "https://api.example.com"}, "resources": [ @@ -181,6 +181,30 @@ def repositories(): }, ], }, + # Using the resource field reference syntax + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_comments", + "endpoint": { + "path": "posts/{resources.posts.id}/comments", + }, + }, + ], + }, + # Using short, endpoint-only resource definition + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_comments", + "endpoint": "posts/{resources.posts.id}/comments", + }, + ], + }, { "client": {"base_url": "https://api.example.com"}, "resources": [ @@ -357,6 +381,7 @@ def repositories(): }, ], }, + # Using the resolve field syntax { "client": {"base_url": "https://github.com/api/v2"}, "resources": [ @@ -376,20 +401,14 @@ def repositories(): repositories(), ], }, + # Using the resource field reference syntax { "client": {"base_url": "https://github.com/api/v2"}, "resources": [ { "name": "issues", "endpoint": { - "path": "dlt-hub/{repository}/issues/", - "params": { - "repository": { - "type": "resolve", - "resource": "repositories", - "field": "name", - }, - }, + "path": "dlt-hub/{resources.repositories.name}/issues/", }, }, repositories(), diff --git a/tests/sources/rest_api/configurations/test_configuration.py b/tests/sources/rest_api/configurations/test_configuration.py index ca84479a0d..7834b4a687 100644 --- a/tests/sources/rest_api/configurations/test_configuration.py +++ b/tests/sources/rest_api/configurations/test_configuration.py @@ -1,5 +1,5 @@ from copy import copy -from typing import cast +from typing import cast, Dict, Any from unittest.mock import patch import pytest @@ -238,30 +238,51 @@ def test_setup_for_single_item_endpoint() -> None: assert isinstance(endpoint["paginator"], SinglePagePaginator) -def test_resource_schema() -> None: - config: RESTAPIConfig = { - "client": { - "base_url": "https://api.example.com", - }, - "resources": [ - "users", - { - "name": "user", - "endpoint": { - "path": "user/{id}", - "paginator": None, - "data_selector": None, - "params": { - "id": { - "type": "resolve", - "field": "id", - "resource": "users", +@pytest.mark.parametrize( + "config", + [ + { + "client": { + "base_url": "https://api.example.com", + }, + "resources": [ + "users", + { + "name": "user", + "endpoint": { + "path": "user/{id}", + "paginator": None, + "data_selector": None, + "params": { + "id": { + "type": "resolve", + "field": "id", + "resource": "users", + }, }, }, }, + ], + }, + { + "client": { + "base_url": "https://api.example.com", }, - ], - } + "resources": [ + "users", + { + "name": "user", + "endpoint": { + "path": "user/{resources.users.id}", + "paginator": None, + "data_selector": None, + }, + }, + ], + }, + ], +) +def test_resource_schema(config: RESTAPIConfig) -> None: resources = rest_api_resources(config) assert len(resources) == 2 resource = resources[0] @@ -403,7 +424,31 @@ def test_resource_defaults_no_params() -> None: } -def test_accepts_DltResource_in_resources() -> None: +@pytest.mark.parametrize( + "issues_resource_config", + [ + { + "name": "issues", + "endpoint": { + "path": "dlt-hub/{repository}/issues/", + "params": { + "repository": { + "type": "resolve", + "resource": "repositories", + "field": "name", + }, + }, + }, + }, + { + "name": "issues", + "endpoint": { + "path": "dlt-hub/{resources.repositories.name}/issues/", + }, + }, + ], +) +def test_accepts_DltResource_in_resources(issues_resource_config: Dict[str, Any]) -> None: @dlt.resource(selected=False) def repositories(): """A seed list of repositories to fetch""" @@ -412,19 +457,7 @@ def repositories(): config: RESTAPIConfig = { "client": {"base_url": "https://github.com/api/v2"}, "resources": [ - { - "name": "issues", - "endpoint": { - "path": "dlt-hub/{repository}/issues/", - "params": { - "repository": { - "type": "resolve", - "resource": "repositories", - "field": "name", - }, - }, - }, - }, + issues_resource_config, # type: ignore[list-item] repositories(), ], } diff --git a/tests/sources/rest_api/configurations/test_resolve_config.py b/tests/sources/rest_api/configurations/test_resolve_config.py index e31470c4b7..37d794c0f3 100644 --- a/tests/sources/rest_api/configurations/test_resolve_config.py +++ b/tests/sources/rest_api/configurations/test_resolve_config.py @@ -82,71 +82,100 @@ def test_bind_path_param() -> None: def test_process_parent_data_item() -> None: - resolve_params = [ + resolved_params = [ ResolvedParam("id", {"field": "obj_id", "resource": "issues", "type": "resolve"}) ] - bound_path, parent_record, params_values, request_json = process_parent_data_item( + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( path="dlt-hub/dlt/issues/{id}/comments", item={"obj_id": 12345}, - resolved_params=resolve_params, + resolved_params=resolved_params, include_from_parent=None, ) assert bound_path == "dlt-hub/dlt/issues/12345/comments" + assert expanded_params == {} assert parent_record == {} - bound_path, parent_record, params_values, request_json = process_parent_data_item( + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( path="dlt-hub/dlt/issues/{id}/comments", item={"obj_id": 12345}, - resolved_params=resolve_params, + resolved_params=resolved_params, include_from_parent=["obj_id"], ) assert parent_record == {"_issues_obj_id": 12345} - bound_path, parent_record, params_values, request_json = process_parent_data_item( + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( path="dlt-hub/dlt/issues/{id}/comments", item={"obj_id": 12345, "obj_node": "node_1"}, - resolved_params=resolve_params, + resolved_params=resolved_params, include_from_parent=["obj_id", "obj_node"], ) assert parent_record == {"_issues_obj_id": 12345, "_issues_obj_node": "node_1"} - # test nested data - resolve_param_nested = [ + # Test resource field reference in path + resolved_params_reference = [ + ResolvedParam( + "resources.issues.obj_id", {"field": "obj_id", "resource": "issues", "type": "resolve"} + ) + ] + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( + path="dlt-hub/dlt/issues/{resources.issues.obj_id}/comments", + item={"obj_id": 12345, "obj_node": "node_1"}, + resolved_params=resolved_params_reference, + include_from_parent=["obj_id", "obj_node"], + ) + assert bound_path == "dlt-hub/dlt/issues/12345/comments" + + # Test resource field reference in params + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( + path="dlt-hub/dlt/issues/comments", + item={"obj_id": 12345, "obj_node": "node_1"}, + params={"id": "{resources.issues.obj_id}"}, + resolved_params=resolved_params_reference, + include_from_parent=["obj_id", "obj_node"], + ) + assert bound_path == "dlt-hub/dlt/issues/comments" + assert expanded_params == {"id": "12345"} + + # Test nested data + resolved_param_nested = [ ResolvedParam( "id", {"field": "some_results.obj_id", "resource": "issues", "type": "resolve"}, ) ] item = {"some_results": {"obj_id": 12345}} - bound_path, parent_record, params_values, request_json = process_parent_data_item( + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( path="dlt-hub/dlt/issues/{id}/comments", item=item, - resolved_params=resolve_param_nested, + params={}, + resolved_params=resolved_param_nested, include_from_parent=None, ) assert bound_path == "dlt-hub/dlt/issues/12345/comments" - # param path not found + # Param path not found with pytest.raises(ValueError) as val_ex: - bound_path, parent_record, params_values, request_json = process_parent_data_item( + process_parent_data_item( path="dlt-hub/dlt/issues/{id}/comments", item={"_id": 12345}, - resolved_params=resolve_params, + params={}, + resolved_params=resolved_params, include_from_parent=None, ) - assert "Transformer expects a field 'obj_id'" in str(val_ex.value) + assert "Resource expects a field 'obj_id'" in str(val_ex.value) - # included path not found + # Included path not found with pytest.raises(ValueError) as val_ex: - bound_path, parent_record, params_values, request_json = process_parent_data_item( + process_parent_data_item( path="dlt-hub/dlt/issues/{id}/comments", item={"_id": 12345, "obj_node": "node_1"}, - resolved_params=resolve_params, + params={}, + resolved_params=resolved_params, include_from_parent=["obj_id", "node"], ) assert ( - "Transformer expects a field 'obj_id' to be present in the incoming data from resource" + "Resource expects a field 'obj_id' to be present in the incoming data from resource" " issues in order to bind it to" in str(val_ex.value) ) @@ -157,27 +186,30 @@ def test_process_parent_data_item() -> None: ResolvedParam("id", {"field": "id", "resource": "comments", "type": "resolve"}), ] - bound_path, parent_record, params_values, request_json = process_parent_data_item( + bound_path, expanded_params, request_json, parent_record = process_parent_data_item( path="dlt-hub/dlt/issues/{issue_id}/comments/{id}", item={"issue": 12345, "id": 56789}, + params={}, resolved_params=multi_resolve_params, include_from_parent=None, ) assert bound_path == "dlt-hub/dlt/issues/12345/comments/56789" assert parent_record == {} - # param path not found with multiple parameters + # Param path not found with multiple parameters with pytest.raises(ValueError) as val_ex: - bound_path, parent_record, params_values, request_json = process_parent_data_item( + process_parent_data_item( path="dlt-hub/dlt/issues/{issue_id}/comments/{id}", item={"_issue": 12345, "id": 56789}, + params={}, resolved_params=multi_resolve_params, include_from_parent=None, ) - assert "Transformer expects a field 'issue'" in str(val_ex.value) + assert "Resource expects a field 'issue'" in str(val_ex.value) def test_two_resources_can_depend_on_one_parent_resource() -> None: + # Using resolve syntax user_id = { "user_id": { "type": "resolve", @@ -211,9 +243,8 @@ def test_two_resources_can_depend_on_one_parent_resource() -> None: assert resources["meetings"]._pipe.parent.name == "users" assert resources["user_details"]._pipe.parent.name == "users" - -def test_dependent_resource_can_bind_multiple_parameters() -> None: - config: RESTAPIConfig = { + # Using resource field reference syntax + config_with_ref: RESTAPIConfig = { "client": { "base_url": "https://api.example.com", }, @@ -222,104 +253,218 @@ def test_dependent_resource_can_bind_multiple_parameters() -> None: { "name": "user_details", "endpoint": { - "path": "user/{user_id}/{group_id}", - "params": { - "user_id": { - "type": "resolve", - "field": "id", - "resource": "users", - }, - "group_id": { - "type": "resolve", - "field": "group", - "resource": "users", - }, - }, + "path": "user/{resources.users.id}/", + }, + }, + { + "name": "meetings", + "endpoint": { + "path": "meetings/{resources.users.id}/", }, }, ], } + resources = rest_api_source(config_with_ref).resources + assert resources["meetings"]._pipe.parent.name == "users" + assert resources["user_details"]._pipe.parent.name == "users" + +@pytest.mark.parametrize( + "config", + [ + { + "client": { + "base_url": "https://api.example.com", + }, + "resources": [ + "users", + { + "name": "user_details", + "endpoint": { + "path": "user/{user_id}/{group_id}", + "params": { + "user_id": { + "type": "resolve", + "field": "id", + "resource": "users", + }, + "group_id": { + "type": "resolve", + "field": "group", + "resource": "users", + }, + }, + }, + }, + ], + }, + { + "client": { + "base_url": "https://api.example.com", + }, + "resources": [ + "users", + { + "name": "user_details", + "endpoint": { + "path": "user/{resources.users.id}/{resources.users.group}", + }, + }, + ], + }, + ], +) +def test_dependent_resource_can_bind_multiple_parameters(config: RESTAPIConfig) -> None: resources = rest_api_source(config).resources assert resources["user_details"]._pipe.parent.name == "users" -def test_one_resource_cannot_bind_two_parents() -> None: - config: RESTAPIConfig = { - "client": { - "base_url": "https://api.example.com", - }, - "resources": [ - "users", - "groups", +@pytest.mark.parametrize( + "config,resolved_param1,resolved_param2", + [ + ( { - "name": "user_details", - "endpoint": { - "path": "user/{user_id}/{group_id}", - "params": { - "user_id": { - "type": "resolve", - "field": "id", - "resource": "users", - }, - "group_id": { - "type": "resolve", - "field": "id", - "resource": "groups", + "client": { + "base_url": "https://api.example.com", + }, + "resources": [ + "users", + "groups", + { + "name": "user_details", + "endpoint": { + "path": "user/{user_id}/{group_id}", + "params": { + "user_id": { + "type": "resolve", + "field": "id", + "resource": "users", + }, + "group_id": { + "type": "resolve", + "field": "id", + "resource": "groups", + }, + }, }, }, + ], + }, + "ResolvedParam(param_name='user_id'", + "ResolvedParam(param_name='group_id'", + ), + ( + { + "client": { + "base_url": "https://api.example.com", }, + "resources": [ + "users", + "groups", + { + "name": "user_details", + "endpoint": { + "path": "user/{resources.users.id}/{resources.groups.id}", + }, + }, + ], }, - ], - } - - with pytest.raises(ValueError) as e: + "ResolvedParam(param_name='resources.users.id'", + "ResolvedParam(param_name='resources.groups.id'", + ), + ], +) +def test_one_resource_cannot_bind_two_parents( + config: RESTAPIConfig, resolved_param1: str, resolved_param2: str +) -> None: + with pytest.raises(ValueError) as exc_info: rest_api_resources(config) - error_part_1 = re.escape( - "Multiple parent resources for user_details: [ResolvedParam(param_name='user_id'" - ) - error_part_2 = re.escape("ResolvedParam(param_name='group_id'") - assert e.match(error_part_1) - assert e.match(error_part_2) + error_msg = str(exc_info.value) + assert "Multiple parent resources for user_details:" in error_msg + assert resolved_param1 in error_msg, f"{resolved_param1} not found in {error_msg}" + assert resolved_param2 in error_msg, f"{resolved_param2} not found in {error_msg}" -def test_resource_dependent_dependent() -> None: - config: RESTAPIConfig = { - "client": { - "base_url": "https://api.example.com", - }, - "resources": [ - "locations", - { - "name": "location_details", - "endpoint": { - "path": "location/{location_id}", - "params": { - "location_id": { - "type": "resolve", - "field": "id", - "resource": "locations", +@pytest.mark.parametrize( + "config", + [ + # Using resolve syntax + { + "client": { + "base_url": "https://api.example.com", + }, + "resources": [ + "locations", + { + "name": "location_details", + "endpoint": { + "path": "location/{location_id}", + "params": { + "location_id": { + "type": "resolve", + "field": "id", + "resource": "locations", + }, }, }, }, - }, - { - "name": "meetings", - "endpoint": { - "path": "/meetings/{room_id}", - "params": { - "room_id": { - "type": "resolve", - "field": "room_id", - "resource": "location_details", + { + "name": "meetings", + "endpoint": { + "path": "/meetings/{room_id}", + "params": { + "room_id": { + "type": "resolve", + "field": "room_id", + "resource": "location_details", + }, }, }, }, + ], + }, + # Using resource field reference syntax + { + "client": { + "base_url": "https://api.example.com", }, - ], - } - + "resources": [ + "locations", + { + "name": "location_details", + "endpoint": { + "path": "location/{resources.locations.id}", + }, + }, + { + "name": "meetings", + "endpoint": { + "path": "/meetings/{resources.location_details.room_id}", + }, + }, + ], + }, + # Using shorter syntax with string endpoints + { + "client": { + "base_url": "https://api.example.com", + }, + "resources": [ + "locations", + { + "name": "location_details", + "endpoint": "location/{resources.locations.id}", + }, + { + "name": "meetings", + "endpoint": "/meetings/{resources.location_details.room_id}", + }, + ], + }, + ], +) +def test_resource_dependent_dependent(config: RESTAPIConfig) -> None: resources = rest_api_source(config).resources assert resources["meetings"]._pipe.parent.name == "location_details" assert resources["location_details"]._pipe.parent.name == "locations" diff --git a/tests/sources/rest_api/integration/test_offline.py b/tests/sources/rest_api/integration/test_offline.py index 4d843fa472..6244297b4b 100644 --- a/tests/sources/rest_api/integration/test_offline.py +++ b/tests/sources/rest_api/integration/test_offline.py @@ -20,15 +20,9 @@ from tests.utils import assert_load_info, assert_query_data, load_table_counts -def test_load_mock_api(mock_api_server): - pipeline = dlt.pipeline( - pipeline_name="rest_api_mock", - destination="duckdb", - dataset_name="rest_api_mock", - full_refresh=True, - ) - - mock_source = rest_api_source( +@pytest.mark.parametrize( + "config", + [ { "client": {"base_url": "https://api.example.com"}, "resources": [ @@ -60,9 +54,33 @@ def test_load_mock_api(mock_api_server): }, }, ], - } + }, + { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + "posts", + { + "name": "post_comments", + "endpoint": "posts/{resources.posts.id}/comments", + }, + { + "name": "post_details", + "endpoint": "posts/{resources.posts.id}", + }, + ], + }, + ], +) +def test_load_mock_api(mock_api_server, config): + pipeline = dlt.pipeline( + pipeline_name="rest_api_mock", + destination="duckdb", + dataset_name="rest_api_mock", + full_refresh=True, ) + mock_source = rest_api_source(config) + load_info = pipeline.run(mock_source) print(load_info) assert_load_info(load_info) @@ -101,6 +119,7 @@ def test_load_mock_api(mock_api_server): ) +@pytest.mark.skip(reason="TODO: this should raise an error") def test_load_mock_api_with_query_params(mock_api_server): pipeline = dlt.pipeline( pipeline_name="rest_api_mock", @@ -430,15 +449,9 @@ def test_posts_without_key(mock_api_server): ] -def test_load_mock_api_typeddict_config(mock_api_server): - pipeline = dlt.pipeline( - pipeline_name="rest_api_mock", - destination="duckdb", - dataset_name="rest_api_mock", - full_refresh=True, - ) - - mock_source = rest_api_source( +@pytest.mark.parametrize( + "config", + [ RESTAPIConfig( client=ClientConfig(base_url="https://api.example.com"), resources=[ @@ -457,9 +470,31 @@ def test_load_mock_api_typeddict_config(mock_api_server): ), ), ], - ) + ), + RESTAPIConfig( + client=ClientConfig(base_url="https://api.example.com"), + resources=[ + "posts", + EndpointResource( + name="post_comments", + endpoint=Endpoint( + path="posts/{resources.posts.id}/comments", + ), + ), + ], + ), + ], +) +def test_load_mock_api_typeddict_config(mock_api_server, config): + pipeline = dlt.pipeline( + pipeline_name="rest_api_mock", + destination="duckdb", + dataset_name="rest_api_mock", + full_refresh=True, ) + mock_source = rest_api_source(config) + load_info = pipeline.run(mock_source) print(load_info) assert_load_info(load_info) @@ -506,6 +541,44 @@ def test_posts_with_inremental_date_conversion(mock_api_server) -> None: assert called_kwargs["path"] == "posts" +def test_posts_with_inremental_in_param_template(mock_api_server) -> None: + start_time = pendulum.from_timestamp(1) + one_day_later = start_time.add(days=1) + config: RESTAPIConfig = { + "client": {"base_url": "https://api.example.com"}, + "resources": [ + { + "name": "posts", + "endpoint": { + "path": "posts", + "params": { + "since": "{incremental.last_value}", + "until": "{incremental.end_value}", + }, + "incremental": { + # "start_param": "since", + # "end_param": "until", + "cursor_path": "updated_at", + "initial_value": str(start_time.int_timestamp), + "end_value": str(one_day_later.int_timestamp), + "convert": lambda epoch: pendulum.from_timestamp( + int(epoch) + ).to_date_string(), + }, + }, + }, + ], + } + RESTClient = dlt.sources.helpers.rest_client.RESTClient + with mock.patch.object(RESTClient, "paginate") as mock_paginate: + source = rest_api_source(config).add_limit(1) + _ = list(source.with_resources("posts")) + assert mock_paginate.call_count == 1 + _, called_kwargs = mock_paginate.call_args_list[0] + assert called_kwargs["params"] == {"since": "1970-01-01", "until": "1970-01-02"} + assert called_kwargs["path"] == "posts" + + def test_custom_session_is_used(mock_api_server, mocker): class CustomSession(Session): pass @@ -533,7 +606,29 @@ class CustomSession(Session): assert mocked_send.call_args[0][0].url == "https://api.example.com/posts" -def test_DltResource_gets_called(mock_api_server, mocker) -> None: +@pytest.mark.parametrize( + "posts_resource_config", + [ + { + "name": "posts", + "endpoint": { + "path": "posts/{post_id}/comments", + "params": { + "post_id": { + "type": "resolve", + "resource": "post_list", + "field": "id", + }, + }, + }, + }, + { + "name": "posts", + "endpoint": "posts/{resources.post_list.id}/comments", + }, + ], +) +def test_DltResource_gets_called(mock_api_server, mocker, posts_resource_config) -> None: @dlt.resource() def post_list(): yield [{"id": "0"}, {"id": "1"}, {"id": "2"}] @@ -544,19 +639,7 @@ def post_list(): "write_disposition": "replace", }, "resources": [ - { - "name": "posts", - "endpoint": { - "path": "posts/{post_id}/comments", - "params": { - "post_id": { - "type": "resolve", - "resource": "post_list", - "field": "id", - }, - }, - }, - }, + posts_resource_config, post_list(), ], } diff --git a/tests/sources/rest_api/integration/test_processing_steps.py b/tests/sources/rest_api/integration/test_processing_steps.py index df18f98292..20ddd23d7e 100644 --- a/tests/sources/rest_api/integration/test_processing_steps.py +++ b/tests/sources/rest_api/integration/test_processing_steps.py @@ -1,5 +1,7 @@ from typing import Any, Callable, Dict, List +import pytest + import dlt from dlt.sources.rest_api import RESTAPIConfig, rest_api_source @@ -165,44 +167,23 @@ def id_by_10(row): assert data[0]["title"] == "Post 10" -def test_rest_api_source_filtered_child(mock_api_server) -> None: - config: RESTAPIConfig = { - "client": { - "base_url": "https://api.example.com", - }, - "resources": [ - { - "name": "posts", - "endpoint": "posts", - "processing_steps": [ - {"filter": lambda x: x["id"] in (1, 2)}, # type: ignore[typeddict-item] - ], +@pytest.mark.parametrize( + "comments_endpoint", + [ + { + "path": "/posts/{post_id}/comments", + "params": { + "post_id": { + "type": "resolve", + "resource": "posts", + "field": "id", + } }, - { - "name": "comments", - "endpoint": { - "path": "/posts/{post_id}/comments", - "params": { - "post_id": { - "type": "resolve", - "resource": "posts", - "field": "id", - } - }, - }, - "processing_steps": [ - {"filter": lambda x: x["id"] == 1}, # type: ignore[typeddict-item] - ], - }, - ], - } - mock_source = rest_api_source(config) - - data = list(mock_source.with_resources("comments")) - assert len(data) == 2 - - -def test_rest_api_source_filtered_child_with_implicit_param(mock_api_server) -> None: + }, + "posts/{resources.posts.id}/comments", + ], +) +def test_rest_api_source_filtered_child(mock_api_server, comments_endpoint) -> None: config: RESTAPIConfig = { "client": { "base_url": "https://api.example.com", @@ -217,9 +198,7 @@ def test_rest_api_source_filtered_child_with_implicit_param(mock_api_server) -> }, { "name": "comments", - "endpoint": { - "path": "/posts/{resources.posts.id}/comments", - }, + "endpoint": comments_endpoint, "processing_steps": [ {"filter": lambda x: x["id"] == 1}, # type: ignore[typeddict-item] ], @@ -232,52 +211,23 @@ def test_rest_api_source_filtered_child_with_implicit_param(mock_api_server) -> assert len(data) == 2 -def test_rest_api_source_filtered_and_map_child(mock_api_server) -> None: - def extend_body(row): - row["body"] = f"{row['_posts_title']} - {row['body']}" - return row - - config: RESTAPIConfig = { - "client": { - "base_url": "https://api.example.com", - }, - "resources": [ - { - "name": "posts", - "endpoint": "posts", - "processing_steps": [ - {"filter": lambda x: x["id"] in (1, 2)}, # type: ignore[typeddict-item] - ], - }, - { - "name": "comments", - "endpoint": { - "path": "/posts/{post_id}/comments", - "params": { - "post_id": { - "type": "resolve", - "resource": "posts", - "field": "id", - } - }, - }, - "include_from_parent": ["title"], - "processing_steps": [ - {"map": extend_body}, # type: ignore[typeddict-item] - {"filter": lambda x: x["body"].startswith("Post 2")}, # type: ignore[typeddict-item] - ], +@pytest.mark.parametrize( + "comments_endpoint", + [ + { + "path": "/posts/{post_id}/comments", + "params": { + "post_id": { + "type": "resolve", + "resource": "posts", + "field": "id", + } }, - ], - } - mock_source = rest_api_source(config) - - data = list(mock_source.with_resources("comments")) - assert data[0]["body"] == "Post 2 - Comment 0 for post 2" - - -def test_rest_api_source_filtered_and_map_child_with_implicit_param( - mock_api_server, -) -> None: + }, + "posts/{resources.posts.id}/comments", + ], +) +def test_rest_api_source_filtered_and_map_child(mock_api_server, comments_endpoint) -> None: def extend_body(row): row["body"] = f"{row['_posts_title']} - {row['body']}" return row @@ -296,9 +246,7 @@ def extend_body(row): }, { "name": "comments", - "endpoint": { - "path": "/posts/{resources.posts.id}/comments", - }, + "endpoint": comments_endpoint, "include_from_parent": ["title"], "processing_steps": [ {"map": extend_body}, # type: ignore[typeddict-item]