@maxi297 maxi297 commented Nov 12, 2025

Following the discussion at https://airbytehq-team.slack.com/archives/C09SBQC9Q1X, we need more information to scope the issue. This PR is meant to create a dev version for a source, not a full release.

Summary by CodeRabbit

  • Bug Fixes

    • Prevents indefinite blocking during concurrent partition processing by adding periodic wake-ups and timeout handling.
  • Chores

    • Added detailed logging around partition generation and processing (start, completion, and errors) for better observability.
    • Updated HTTP client logging to include request IDs and clearer request/response lifecycle messages for easier correlation.

@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/enable_more_logging#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/enable_more_logging

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment


coderabbitai bot commented Nov 12, 2025

Walkthrough

Replaces blocking queue consumption with a timeout-based loop in concurrent source; adds logging to partition enqueuer and partition reader (including a new PartitionLogger); and switches HTTP client request/response debug logs to info-level logs with generated request IDs.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Queue timeout handling: airbyte_cdk/sources/concurrent_source/concurrent_source.py | Replaced indefinite blocking queue.get() loop with a stateful loop using a 5-minute queue.get(timeout=...), catches Empty to retry, and polls concurrent_stream_processor.is_done() to decide completion. |
| Partition enqueuer logging: airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py | Added module-level logger, informational logs at start/end of partition generation, and logs exceptions before enqueueing StreamThreadException and completion sentinel. |
| Partition reader & PartitionLogger: airbyte_cdk/sources/streams/concurrent/partition_reader.py | Added module-level logger and new PartitionLogger class; logs partition start/end and errors; emits success or failing PartitionCompleteSentinel as before. |
| HTTP client request tracking: airbyte_cdk/sources/streams/http/http_client.py | Added uuid use; generate request_id per request and log info messages for outbound requests and responses (including exceptions) instead of verbose debug payload logs. |

Sequence Diagram

sequenceDiagram
    participant Queue as Queue
    participant Consumer as _consume_from_queue
    participant Handler as _handle_item
    participant Processor as concurrent_stream_processor

    Note over Consumer: done = False
    loop Outer loop (wake-ups every <=5min)
        Consumer->>Queue: queue.get(timeout=5min)
        alt Item received
            Consumer->>Handler: _handle_item(item)
            Handler-->>Consumer: processed
        else Empty (timeout)
            Note over Consumer: log retry and continue
        end
        Consumer->>Processor: is_done()
        alt Processor reports done
            Processor-->>Consumer: True
            Note over Consumer: set done = True, break
        else Not done
            Processor-->>Consumer: False
        end
    end
    Note over Consumer: exit gracefully
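
The loop in the diagram above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code from concurrent_source.py: `consume_from_queue`, `handle_item`, and the `is_done` callable are hypothetical stand-ins for `_consume_from_queue`, `_handle_item`, and `concurrent_stream_processor.is_done()`, and the logger name is made up.

```python
import logging
from queue import Empty, Queue
from typing import Any, Callable

# Hypothetical logger name, used only for this sketch.
logger = logging.getLogger("example.queue_consumer")


def consume_from_queue(
    q: "Queue[Any]",
    handle_item: Callable[[Any], None],
    is_done: Callable[[], bool],
    timeout_seconds: float = 300.0,  # 5 minutes, as described in the walkthrough
) -> None:
    """Consume items until is_done() reports completion, waking up periodically."""
    done = False
    while not done:
        try:
            # Blocks for at most timeout_seconds instead of indefinitely.
            item = q.get(timeout=timeout_seconds)
        except Empty:
            # Nothing arrived in time: log and fall through to the is_done() poll.
            logger.info("Queue empty after %.0fs; retrying", timeout_seconds)
        else:
            handle_item(item)
        # Polled after every item and after every timeout.
        if is_done():
            done = True
```

With this shape, a stalled producer no longer blocks the consumer forever: the consumer wakes up at least once per `timeout_seconds`, logs, and re-checks completion. Whether `is_done()` can return True while items are still in flight depends on the real processor and is not addressed by this sketch.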

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • Could you double-check that the 5-minute timeout in concurrent_source.py is acceptable for systems that expect faster shutdown or very long idle periods? wdyt?
  • Might you review PartitionLogger usage to ensure it doesn't duplicate existing slice/message logging behavior and that its lifecycle is properly injected? wdyt?
  • Please confirm the info-level HTTP logs with request_id provide sufficient context without over-logging in high-throughput scenarios. wdyt?

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 40.00% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title '[DO NOT MERGE] enable more logging' directly matches the PR's objective to add additional logging for debugging purposes. |

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py (1)

16-16: Remove unnecessary f-string for static logger name, wdyt?

The logger name is a static string, so the f-string formatting is unnecessary overhead.

Apply this diff:

-LOGGER = logging.getLogger(f"airbyte.PartitionEnqueuer")
+LOGGER = logging.getLogger("airbyte.PartitionEnqueuer")
airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)

17-17: Remove unnecessary f-string for static logger name, wdyt?

Same as in partition_enqueuer.py - the f-string is unnecessary for a static string.

Apply this diff:

-LOGGER = logging.getLogger(f"airbyte.PartitionReader")
+LOGGER = logging.getLogger("airbyte.PartitionReader")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f0443aa and 1d4e6a7.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py (2 hunks)
  • airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py (4 hunks)
  • airbyte_cdk/sources/streams/concurrent/partition_reader.py (2 hunks)
  • airbyte_cdk/sources/streams/http/http_client.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py (2)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
  • name (50-53)
airbyte_cdk/sources/concurrent_source/partition_generation_completed_sentinel.py (1)
  • PartitionGenerationCompletedSentinel (9-24)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (3)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (3)
  • stream_name (118-119)
  • to_slice (115-116)
  • read (87-113)
unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_source_builder.py (3)
  • stream_name (120-121)
  • to_slice (136-137)
  • read (129-134)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (3)
  • stream_name (36-41)
  • to_slice (25-33)
  • read (17-22)
airbyte_cdk/sources/concurrent_source/concurrent_source.py (2)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
  • is_done (203-228)
airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (1)
  • is_done (81-82)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/streams/http/http_client.py

[error] 1-1: Ruff format check detected formatting changes needed. 3 files would be reformatted. Run 'poetry run ruff format --diff .' or commit formatted files to fix.

airbyte_cdk/sources/streams/concurrent/partition_reader.py

[error] 1-1: Ruff format check detected formatting changes needed in a workflow. 3 files would be reformatted. Run 'poetry run ruff format --diff .' to apply.

airbyte_cdk/sources/concurrent_source/concurrent_source.py

[error] 1-1: Ruff format check detected formatting changes needed in a workflow. 3 files would be reformatted. Run 'poetry run ruff format --diff .' to apply.


[error] 1-1: Process completed with exit code 1 due to ruff format changes required.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Analyze (python)
  • GitHub Check: Analyze (python)
🔇 Additional comments (7)
airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py (1)

48-72: Nice observability enhancement!

The logging additions provide clear visibility into partition generation lifecycle. The info-level logs at start, completion, and error states will be helpful for debugging the scoped issue.

airbyte_cdk/sources/streams/http/http_client.py (2)

8-8: Good addition for request tracing!

The uuid import enables request ID generation for better observability.


333-336: Request ID tracking will help with debugging!

The request_id provides a way to correlate outbound requests with their responses, which is excellent for troubleshooting API interactions.

airbyte_cdk/sources/concurrent_source/concurrent_source.py (2)

7-7: Empty exception import looks good!

This is needed for the timeout-based queue consumption logic below.


146-162: Timeout-based loop improves resilience!

The shift from blocking indefinitely to a 5-minute timeout with periodic wake-ups is a solid improvement for preventing hangs. The done flag and Empty exception handling ensure the sync can continue even if the queue is temporarily empty.

One question: is 5 minutes the right timeout duration for your use case, wdyt? A shorter timeout (e.g., 1-2 minutes) might help detect issues faster, but it depends on how long partitions typically take to generate.

Could you verify that the is_done() check after each item (line 157) doesn't introduce any race conditions where partitions are still being generated but the queue happens to be empty temporarily?

airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)

24-44: Well-designed PartitionLogger class!

The PartitionLogger provides a clean abstraction for partition-level logging with proper dependency injection. The conditional check via should_log_slice_message is a nice touch to avoid log spam.


78-92: Excellent partition lifecycle logging!

The logging at start (line 78), completion (line 87), and error (line 90) provides comprehensive visibility into partition processing. Including both stream name and partition slice in the logs will make debugging much easier.
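
As a rough illustration of the start/completion/error logging described above, here is a minimal sketch. It does not reproduce partition_reader.py: `ExamplePartition`, `ExampleCompleteSentinel`, and `process_partition` are hypothetical stand-ins (the sentinel stands in for `PartitionCompleteSentinel`, and the raw exception on the queue stands in for `StreamThreadException`). Only the logger name is taken from the diff shown earlier.

```python
import logging
from dataclasses import dataclass
from queue import Queue
from typing import Any, Iterable, List, Mapping

LOGGER = logging.getLogger("airbyte.PartitionReader")


@dataclass
class ExamplePartition:
    """Hypothetical partition exposing stream_name(), to_slice(), and read()."""

    name: str
    slice_: Mapping[str, Any]
    records: List[Any]
    fail: bool = False

    def stream_name(self) -> str:
        return self.name

    def to_slice(self) -> Mapping[str, Any]:
        return self.slice_

    def read(self) -> Iterable[Any]:
        for record in self.records:
            yield record
        if self.fail:
            raise RuntimeError("simulated read failure")


@dataclass
class ExampleCompleteSentinel:
    """Hypothetical completion marker carrying a success flag."""

    partition: ExamplePartition
    is_successful: bool


def process_partition(partition: ExamplePartition, out_queue: "Queue[Any]") -> None:
    # Log the start with both stream name and slice so log lines can be correlated.
    LOGGER.info("Starting partition %s for stream %s", partition.to_slice(), partition.stream_name())
    try:
        for record in partition.read():
            out_queue.put(record)
        LOGGER.info("Finished partition %s for stream %s", partition.to_slice(), partition.stream_name())
        out_queue.put(ExampleCompleteSentinel(partition, True))
    except Exception as exc:
        # Log the failure (with traceback) before signalling it to the consumer.
        LOGGER.exception("Error in partition %s for stream %s", partition.to_slice(), partition.stream_name())
        out_queue.put(exc)
        out_queue.put(ExampleCompleteSentinel(partition, False))
```

In both outcomes the consumer receives a completion marker, so it can tell a finished partition from a stalled one.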

@github-actions

github-actions bot commented Nov 12, 2025

PyTest Results (Fast)

3 010 tests (-801): 2 998 ✅ (-802), 11 💤 (±0), 1 ❌ (+1)
1 suites (±0), 1 files (±0), 6m 54s ⏱️ (-18s)

For more details on these failures, see this check.

Results for commit 4aed10d. ± Comparison against base commit f0443aa.

This pull request removes 801 tests.
unit_tests.sources.streams.concurrent.scenarios.test_concurrent_scenarios ‑ test_concurrent_discover[test_concurrent_cdk_multiple_streams]
unit_tests.sources.streams.concurrent.scenarios.test_concurrent_scenarios ‑ test_concurrent_discover[test_concurrent_cdk_partition_raises_exception]
unit_tests.sources.streams.concurrent.scenarios.test_concurrent_scenarios ‑ test_concurrent_discover[test_concurrent_cdk_single_stream]
unit_tests.sources.streams.concurrent.scenarios.test_concurrent_scenarios ‑ test_concurrent_discover[test_concurrent_cdk_single_stream_multiple_partitions]
unit_tests.sources.streams.concurrent.scenarios.test_concurrent_scenarios ‑ test_concurrent_discover[test_concurrent_cdk_single_stream_multiple_partitions_concurrency_level_2]
unit_tests.sources.streams.concurrent.scenarios.test_concurrent_scenarios ‑ test_concurrent_discover[test_concurrent_cdk_single_stream_with_primary_key]
unit_tests.sources.streams.concurrent.scenarios.test_concurrent_scenarios ‑ test_concurrent_discover[test_incremental_stream_with_many_slices_but_without_slice_boundaries]
unit_tests.sources.streams.concurrent.scenarios.test_concurrent_scenarios ‑ test_concurrent_discover[test_incremental_stream_with_slice_boundaries]
unit_tests.sources.streams.concurrent.scenarios.test_concurrent_scenarios ‑ test_concurrent_discover[test_incremental_stream_with_slice_boundaries_no_input_state]
unit_tests.sources.streams.concurrent.scenarios.test_concurrent_scenarios ‑ test_concurrent_discover[test_incremental_stream_with_slice_boundaries_with_concurrent_state]
…

♻️ This comment has been updated with latest results.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
airbyte_cdk/sources/streams/http/http_client.py (2)

333-336: Nice addition for request tracing!

The request ID generation enables good correlation between request and response logs, which should help with the debugging you're doing. The INFO level is appropriate given this is a diagnostic PR.

One thought: would it be worth using structured logging (e.g., self._logger.info("Making outbound API request", extra={'request_id': request_id, 'url': request.url})) to make parsing and querying logs easier? wdyt?
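
A minimal sketch of the structured-logging idea, assuming only the standard library `logging` and `uuid` modules. The helper names `log_request` and `log_response`, the response message text, and the `status_code` field are hypothetical and not taken from http_client.py.

```python
import logging
import uuid


def log_request(logger: logging.Logger, url: str) -> str:
    """Generate a request ID and log the outbound request with structured fields."""
    request_id = str(uuid.uuid4())
    # Keys passed via `extra` become attributes on the LogRecord, so a
    # formatter or log shipper can read them without parsing the message text.
    logger.info("Making outbound API request", extra={"request_id": request_id, "url": url})
    return request_id


def log_response(logger: logging.Logger, request_id: str, status_code: int) -> None:
    """Log the response under the same request ID for correlation."""
    logger.info(
        "Received response for API request",
        extra={"request_id": request_id, "status_code": status_code},
    )
```

Note that `extra` fields only show up in output if the configured formatter or handler emits them; with a plain `%(message)s` format they are attached to the record but not printed.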


350-352: Good correlation capability, but consider logging the full exception message?

The syntax issue from the previous review has been fixed - nice work! The request ID correlation between request and response logs will definitely help with debugging.

Small suggestion: logging type(exc) gives you the exception class name, but for debugging purposes, wouldn't the actual exception message be more valuable? Something like f' with exception {type(exc).__name__}: {exc}' would give both the type and the message. wdyt?
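
To make the difference concrete, here is a small sketch. `response_log_message` is a hypothetical helper and the message wording is assumed, not copied from the PR.

```python
from typing import Optional


def response_log_message(request_id: str, exc: Optional[BaseException] = None) -> str:
    """Build a response log line; include exception type and message when present."""
    message = f"Receiving response for request {request_id}"
    if exc is not None:
        # type(exc) alone would render as "<class 'ValueError'>" and drop the
        # message; type(exc).__name__ plus str(exc) keeps both pieces.
        message += f" with exception {type(exc).__name__}: {exc}"
    return message
```

For example, `response_log_message("abc", ValueError("boom"))` yields a line ending in `with exception ValueError: boom`, whereas formatting `type(exc)` would only show the class.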

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1d4e6a7 and 4aed10d.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/streams/http/http_client.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Publish SDM to DockerHub
  • GitHub Check: Publish Manifest Server to DockerHub
  • GitHub Check: Analyze (python)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/streams/http/http_client.py (1)

8-8: LGTM!

The UUID import is needed for request ID generation and tracing. Nice addition for debugging!

@maxi297 maxi297 marked this pull request as draft November 12, 2025 15:32
@github-actions

PyTest Results (Full)

3 814 tests (±0): 3 800 ✅ (-2), 12 💤 (±0), 2 ❌ (+2)
1 suites (±0), 1 files (±0), 10m 58s ⏱️ (-6s)

For more details on these failures, see this check.

Results for commit 4aed10d. ± Comparison against base commit f0443aa.
