From b6cd2080cbbf4d9eb6aebedae51e8ebe91b369eb Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Wed, 5 Nov 2025 14:12:30 -0500 Subject: [PATCH 1/4] share API budget with parent streams --- .../parsers/model_to_component_factory.py | 4 ++- .../test_model_to_component_factory.py | 33 +++++++++++++++++-- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index fdaf26bba..51c38d5c4 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -681,6 +681,7 @@ def __init__( connector_state_manager: Optional[ConnectorStateManager] = None, max_concurrent_async_job_count: Optional[int] = None, configured_catalog: Optional[ConfiguredAirbyteCatalog] = None, + api_budget: Optional[APIBudget] = None, ): self._init_mappings() self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice @@ -695,7 +696,7 @@ def __init__( configured_catalog ) self._connector_state_manager = connector_state_manager or ConnectorStateManager() - self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None + self._api_budget: Optional[Union[APIBudget]] = api_budget self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1) # placeholder for deprecation warnings self._collected_deprecation_logs: List[ConnectorBuilderLogMessage] = [] @@ -3887,6 +3888,7 @@ def create_parent_stream_config_with_substream_wrapper( self._evaluate_log_level(self._emit_connector_builder_messages), ), ), + api_budget=self._api_budget, ) return substream_factory.create_parent_stream_config( diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 591d47ae6..acba15faa 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -8,6 +8,7 @@ from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Iterable, Mapping, Optional, Union +from unittest.mock import Mock import freezegun import pytest @@ -170,7 +171,7 @@ ) from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from airbyte_cdk.sources.message.repository import StateFilteringMessageRepository -from airbyte_cdk.sources.streams.call_rate import MovingWindowCallRatePolicy +from airbyte_cdk.sources.streams.call_rate import APIBudget, MovingWindowCallRatePolicy from airbyte_cdk.sources.streams.concurrent.clamping import ( ClampingEndProvider, DayClampingStrategy, @@ -744,7 +745,26 @@ def test_create_substream_partition_router(): "", resolved_manifest["partition_router"], {} ) - partition_router = factory.create_component( + model_to_component_factory = ModelToComponentFactory() + model_to_component_factory.set_api_budget( + { + "type": "HTTPAPIBudget", + "policies": [ + { + "type": "MovingWindowCallRatePolicy", + "rates": [ + { + "limit": 1, + "interval": "PT60S", + } + ], + "matchers": [], + } + ], + }, + input_config, + ) + partition_router = model_to_component_factory.create_component( model_type=SubstreamPartitionRouterModel, component_definition=partition_router_manifest, config=input_config, @@ -757,7 +777,14 @@ def test_create_substream_partition_router(): assert isinstance(parent_stream_configs[0].stream, DefaultStream) assert isinstance(parent_stream_configs[1].stream, DefaultStream) - assert partition_router.parent_stream_configs[0].parent_key.eval({}) == "id" + # ensure api budget + assert get_retriever( + parent_stream_configs[0].stream + ).requester._http_client._api_budget._policies + assert get_retriever( + parent_stream_configs[1].stream + ).requester._http_client._api_budget._policies + assert partition_router.parent_stream_configs[0].partition_field.eval({}) == "repository_id" assert ( partition_router.parent_stream_configs[0].request_option.inject_into From 8b1541a0c76e01a3af8bec9cbaf649bdd2b42c58 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Wed, 5 Nov 2025 14:17:13 -0500 Subject: [PATCH 2/4] format/check --- .../declarative/parsers/test_model_to_component_factory.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index acba15faa..aa03d819f 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -8,7 +8,6 @@ from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Iterable, Mapping, Optional, Union -from unittest.mock import Mock import freezegun import pytest @@ -171,7 +170,7 @@ ) from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from airbyte_cdk.sources.message.repository import StateFilteringMessageRepository -from airbyte_cdk.sources.streams.call_rate import APIBudget, MovingWindowCallRatePolicy +from airbyte_cdk.sources.streams.call_rate import MovingWindowCallRatePolicy from airbyte_cdk.sources.streams.concurrent.clamping import ( ClampingEndProvider, DayClampingStrategy, From d0ca8c62bda858b56bfd0570a6a47ea6dfd05e77 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Wed, 5 Nov 2025 14:21:24 -0500 Subject: [PATCH 3/4] re-add mistakenly removed assertion --- .../declarative/parsers/test_model_to_component_factory.py | 1 + 1 file changed, 1 insertion(+) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index aa03d819f..4c2e935dd 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -784,6 +784,7 @@ def test_create_substream_partition_router(): parent_stream_configs[1].stream ).requester._http_client._api_budget._policies + assert partition_router.parent_stream_configs[0].parent_key.eval({}) == "id" assert partition_router.parent_stream_configs[0].partition_field.eval({}) == "repository_id" assert ( partition_router.parent_stream_configs[0].request_option.inject_into From ec0d3f1a112f624343e407effe484e78b3d7f3d3 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Mon, 10 Nov 2025 13:20:23 -0500 Subject: [PATCH 4/4] remove non thread safe test --- .../test_manifest_declarative_source.py | 553 ------------------ 1 file changed, 553 deletions(-) diff --git a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py index 87e8574bb..ab4335fe6 100644 --- a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py @@ -1491,559 +1491,6 @@ def _create_page(response_body): return response -@pytest.mark.parametrize( - "test_name, manifest, pages, expected_records, expected_calls", - [ - ( - "test_read_manifest_no_pagination_no_partitions", - { - "version": "0.34.2", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["Rates"]}, - "streams": [ - { - "type": "DeclarativeStream", - "name": "Rates", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "ABC": {"type": "number"}, - "AED": {"type": "number"}, - }, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.apilayer.com", - "path": "/exchangerates_data/latest", - "http_method": "GET", - "request_parameters": {}, - "request_headers": {}, - "request_body_json": {}, - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, - }, - "paginator": {"type": "NoPagination"}, - }, - } - ], - "spec": { - "connection_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - "documentation_url": "https://example.org", - "type": "Spec", - }, - }, - ( - _create_page({"rates": [{"ABC": 0}, {"AED": 1}], "_metadata": {"next": "next"}}), - _create_page({"rates": [{"USD": 2}], "_metadata": {"next": "next"}}), - ) - * 10, - [{"ABC": 0}, {"AED": 1}], - [call({}, {}, None)], - ), - ( - "test_read_manifest_with_added_fields", - { - "version": "0.34.2", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["Rates"]}, - "streams": [ - { - "type": "DeclarativeStream", - "name": "Rates", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "ABC": {"type": "number"}, - "AED": {"type": "number"}, - }, - "type": "object", - }, - }, - "transformations": [ - { - "type": "AddFields", - "fields": [ - { - "type": "AddedFieldDefinition", - "path": ["added_field_key"], - "value": "added_field_value", - } - ], - } - ], - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.apilayer.com", - "path": "/exchangerates_data/latest", - "http_method": "GET", - "request_parameters": {}, - "request_headers": {}, - "request_body_json": {}, - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, - }, - "paginator": {"type": "NoPagination"}, - }, - } - ], - "spec": { - "connection_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - "documentation_url": "https://example.org", - "type": "Spec", - }, - }, - ( - _create_page({"rates": [{"ABC": 0}, {"AED": 1}], "_metadata": {"next": "next"}}), - _create_page({"rates": [{"USD": 2}], "_metadata": {"next": "next"}}), - ) - * 10, - [ - {"ABC": 0, "added_field_key": "added_field_value"}, - {"AED": 1, "added_field_key": "added_field_value"}, - ], - [call({}, {}, None)], - ), - ( - "test_read_manifest_with_flatten_fields", - { - "version": "0.34.2", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["Rates"]}, - "streams": [ - { - "type": "DeclarativeStream", - "name": "Rates", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "ABC": {"type": "number"}, - "AED": {"type": "number"}, - }, - "type": "object", - }, - }, - "transformations": [{"type": "FlattenFields"}], - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.apilayer.com", - "path": "/exchangerates_data/latest", - "http_method": "GET", - "request_parameters": {}, - "request_headers": {}, - "request_body_json": {}, - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, - }, - "paginator": {"type": "NoPagination"}, - }, - } - ], - "spec": { - "connection_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - "documentation_url": "https://example.org", - "type": "Spec", - }, - }, - ( - _create_page( - { - "rates": [ - {"nested_fields": {"ABC": 0}, "id": 1}, - {"nested_fields": {"AED": 1}, "id": 2}, - ], - "_metadata": {"next": "next"}, - } - ), - _create_page({"rates": [{"USD": 2}], "_metadata": {"next": "next"}}), - ) - * 10, - [ - {"ABC": 0, "id": 1}, - {"AED": 1, "id": 2}, - ], - [call({}, {}, None)], - ), - ( - "test_read_with_pagination_no_partitions", - { - "version": "0.34.2", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["Rates"]}, - "streams": [ - { - "type": "DeclarativeStream", - "name": "Rates", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "ABC": {"type": "number"}, - "AED": {"type": "number"}, - "USD": {"type": "number"}, - }, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.apilayer.com", - "path": "/exchangerates_data/latest", - "http_method": "GET", - "request_parameters": {}, - "request_headers": {}, - "request_body_json": {}, - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, - }, - "paginator": { - "type": "DefaultPaginator", - "page_size": 2, - "page_size_option": { - "inject_into": "request_parameter", - "field_name": "page_size", - }, - "page_token_option": {"inject_into": "path", "type": "RequestPath"}, - "pagination_strategy": { - "type": "CursorPagination", - "cursor_value": "{{ response._metadata.next }}", - "page_size": 2, - }, - }, - }, - } - ], - "spec": { - "connection_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - "documentation_url": "https://example.org", - "type": "Spec", - }, - }, - ( - _create_page({"rates": [{"ABC": 0}, {"AED": 1}], "_metadata": {"next": "next"}}), - _create_page({"rates": [{"USD": 2}], "_metadata": {}}), - ) - * 10, - [{"ABC": 0}, {"AED": 1}, {"USD": 2}], - [ - call({}, {}, None), - call( - {}, - {}, - {"next_page_token": "next"}, - ), - ], - ), - ( - "test_no_pagination_with_partition_router", - { - "version": "0.34.2", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["Rates"]}, - "streams": [ - { - "type": "DeclarativeStream", - "name": "Rates", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "ABC": {"type": "number"}, - "AED": {"type": "number"}, - "partition": {"type": "number"}, - }, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.apilayer.com", - "path": "/exchangerates_data/latest", - "http_method": "GET", - "request_parameters": {}, - "request_headers": {}, - "request_body_json": {}, - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "partition_router": { - "type": "ListPartitionRouter", - "values": ["0", "1"], - "cursor_field": "partition", - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, - }, - "paginator": {"type": "NoPagination"}, - }, - } - ], - "spec": { - "connection_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - "documentation_url": "https://example.org", - "type": "Spec", - }, - }, - ( - _create_page( - { - "rates": [{"ABC": 0, "partition": 0}, {"AED": 1, "partition": 0}], - "_metadata": {"next": "next"}, - } - ), - _create_page( - {"rates": [{"ABC": 2, "partition": 1}], "_metadata": {"next": "next"}} - ), - ), - [{"ABC": 0, "partition": 0}, {"AED": 1, "partition": 0}, {"ABC": 2, "partition": 1}], - [ - call({}, {"partition": "0"}, None), - call( - {}, - {"partition": "1"}, - None, - ), - ], - ), - ( - "test_with_pagination_and_partition_router", - { - "version": "0.34.2", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["Rates"]}, - "streams": [ - { - "type": "DeclarativeStream", - "name": "Rates", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "ABC": {"type": "number"}, - "AED": {"type": "number"}, - "partition": {"type": "number"}, - }, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.apilayer.com", - "path": "/exchangerates_data/latest", - "http_method": "GET", - "request_parameters": {}, - "request_headers": {}, - "request_body_json": {}, - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "partition_router": { - "type": "ListPartitionRouter", - "values": ["0", "1"], - "cursor_field": "partition", - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, - }, - "paginator": { - "type": "DefaultPaginator", - "page_size": 2, - "page_size_option": { - "inject_into": "request_parameter", - "field_name": "page_size", - }, - "page_token_option": {"inject_into": "path", "type": "RequestPath"}, - "pagination_strategy": { - "type": "CursorPagination", - "cursor_value": "{{ response._metadata.next }}", - "page_size": 2, - }, - }, - }, - } - ], - "spec": { - "connection_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - "documentation_url": "https://example.org", - "type": "Spec", - }, - }, - ( - _create_page( - { - "rates": [{"ABC": 0, "partition": 0}, {"AED": 1, "partition": 0}], - "_metadata": {"next": "next"}, - } - ), - _create_page({"rates": [{"USD": 3, "partition": 0}], "_metadata": {}}), - _create_page({"rates": [{"ABC": 2, "partition": 1}], "_metadata": {}}), - ), - [ - {"ABC": 0, "partition": 0}, - {"AED": 1, "partition": 0}, - {"USD": 3, "partition": 0}, - {"ABC": 2, "partition": 1}, - ], - [ - call({}, {"partition": "0"}, None), - call({}, {"partition": "0"}, {"next_page_token": "next"}), - call( - {}, - {"partition": "1"}, - None, - ), - ], - ), - ], -) -def test_read_manifest_declarative_source( - test_name, manifest, pages, expected_records, expected_calls -): - _stream_name = "Rates" - with patch.object(SimpleRetriever, "_fetch_next_page", side_effect=pages) as mock_retriever: - output_data = [ - message.record.data for message in _run_read(manifest, _stream_name) if message.record - ] - assert output_data == expected_records - mock_retriever.assert_has_calls(expected_calls) - - def test_only_parent_streams_use_cache(): applications_stream = { "type": "DeclarativeStream",