Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds resource field reference syntax to template strings #2210

Open
wants to merge 1 commit into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions dlt/sources/_core_source_templates/rest_api_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,10 @@ def github_source(access_token: Optional[str] = dlt.secrets.value) -> Any:
{
"name": "issue_comments",
"endpoint": {
# The placeholder {issue_number} will be resolved
# from the parent resource
"path": "issues/{issue_number}/comments",
"params": {
# The value of `issue_number` will be taken
# from the `number` field in the `issues` resource
"issue_number": {
"type": "resolve",
"resource": "issues",
"field": "number",
}
},
# The placeholder `{resources.issues.number}`
# will be replaced with the value of `number` field
# in the `issues` resource data
"path": "issues/{resources.issues.number}/comments",
},
# Include data from `id` field of the parent resource
# in the child data. The field name in the child data
Expand Down
51 changes: 38 additions & 13 deletions dlt/sources/rest_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)
from .config_setup import (
IncrementalParam,
InterceptingProxy,
create_auth,
create_paginator,
build_resource_dependency_graph,
Expand Down Expand Up @@ -119,16 +120,18 @@ def rest_api_source(
"base_url": "https://pokeapi.co/api/v2/",
"paginator": "json_link",
},
"endpoints": {
"pokemon": {
"params": {
"limit": 100, # Default page size is 20
"resources": [
{
"name": "pokemon",
"endpoint": {
"path": "pokemon",
"params": {
"limit": 100,
},
},
"resource": {
"primary_key": "id",
}
},
},
"primary_key": "id",
}
]
})
"""
# TODO: this must be removed when TypedDicts are supported by resolve_configuration
Expand Down Expand Up @@ -316,6 +319,25 @@ def paginate_resource(
incremental_cursor_transform,
)

# Interpolate incremental object value into path and params
_incremental: Union[Incremental[Any], InterceptingProxy]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need intercepting proxy? we are not doing late binding here. we are formatting values of path and params right away when calling paginate. so you could do same thing you do below:

if incremental_param.start:
        params[incremental_param.start] = transform(incremental_object.last_value)

also please use start_value, not last_value. in this very case there's no difference, but last_value is updated after every page yielded. here we need a value with which Incremental was started.

if incremental_cursor_transform:
_incremental = InterceptingProxy(
incremental_object,
incremental_cursor_transform,
{"last_value", "end_value"},
)
else:
_incremental = incremental_object

format_kwargs = {"incremental": _incremental}

path = path.format(**format_kwargs)
params = {
k: v.format(**format_kwargs) if isinstance(v, str) else v
for k, v in params.items()
}

yield from client.paginate(
method=method,
path=path,
Expand Down Expand Up @@ -374,21 +396,23 @@ def paginate_dependent_resource(
)

for item in items:
formatted_path, parent_record, updated_params, updated_json = (
formatted_path, expanded_params, updated_json, parent_record = (
process_parent_data_item(
path=path,
item=item,
# params=params,
params=params,
request_json=request_json,
resolved_params=resolved_params,
include_from_parent=include_from_parent,
incremental=incremental_object,
incremental_value_convert=incremental_cursor_transform,
)
)

for child_page in client.paginate(
method=method,
path=formatted_path,
params=updated_params,
params=expanded_params,
json=updated_json,
paginator=paginator,
data_selector=data_selector,
Expand Down Expand Up @@ -490,7 +514,8 @@ def identity_func(x: Any) -> Any:

if transform is None:
transform = identity_func
params[incremental_param.start] = transform(incremental_object.last_value)
if incremental_param.start:
params[incremental_param.start] = transform(incremental_object.last_value)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls use start_value

if incremental_param.end:
params[incremental_param.end] = transform(incremental_object.end_value)
return params
Expand Down
Loading
Loading