@aaronsteers aaronsteers commented Nov 13, 2025

feat: Add get_record() method to fetch individual records by primary key

Summary

This PR adds a new get_record() method to PyAirbyte's Source class that enables fetching individual records by primary key value. This is particularly useful for lookup operations, data validation, and testing scenarios where you need to retrieve a specific record without syncing an entire stream.

Key features:

  • Keyword-only pk_value argument for future extensibility (allows adding other lookup types later)
  • Dual input modes: Accept either a direct PK value (pk_value="123") or an explicit dict (pk_value={"id": "123"})
  • Dict validation: When using dict input, validates that (1) only a single entry exists and (2) the key matches the stream's primary key field - following "explicit is better than implicit"
  • Declarative sources only: Currently only supported for YAML-based declarative sources
  • Primary key override support: Respects user-defined primary key overrides via set_primary_key()
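The dual input modes and dict validation listed above can be sketched as a small standalone function. This is an illustrative sketch only, not the PR's `_normalize_and_validate_pk()` code: the name `normalize_pk` is hypothetical, and `ValueError` stands in for `PyAirbyteInputError`.

```python
from typing import Any


def normalize_pk(pk_value: Any, primary_key: list[str]) -> str:
    """Return the PK value as a string, validating dict input against the stream's PK.

    Hypothetical helper; ValueError is a stand-in for PyAirbyteInputError.
    """
    if len(primary_key) != 1:
        raise ValueError(f"Expected exactly one primary key field, got {primary_key!r}")

    if isinstance(pk_value, dict):
        # Explicit mode: exactly one entry, and its key must be the PK field.
        if len(pk_value) != 1:
            raise ValueError("Dict input must contain exactly one entry")
        ((key, value),) = pk_value.items()
        if key != primary_key[0]:
            raise ValueError(
                f"Key {key!r} does not match primary key field {primary_key[0]!r}"
            )
        return str(value)

    # Direct mode: any scalar is treated as the PK value itself.
    return str(pk_value)
```

Under these assumptions, `normalize_pk("123", ["id"])` and `normalize_pk({"id": 123}, ["id"])` both produce the same string, while a mismatched key is rejected.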

Implementation details:

  • Added LookupValue type alias (Any) for primary key values
  • Added Source.get_record() method with keyword-only pk_value parameter
  • Added helper methods _get_stream_primary_key() and _normalize_and_validate_pk()
  • Added DeclarativeExecutor.fetch_record() to proxy to CDK's ConcurrentDeclarativeSource.fetch_record()
  • Comprehensive unit tests (16 tests covering happy paths, validation, error cases)

Dependencies:

Review & Testing Checklist for Human

  • Test with a real declarative source end-to-end - The unit tests use mocked executors. Verify the full flow works with an actual declarative connector (e.g., source-rest-api-tutorial or source-pokeapi) by fetching a known record
  • Verify dict validation behavior - Test that the dict validation correctly rejects multiple entries and mismatched keys, and that error messages are clear
  • Check primary key extraction edge cases - The code flattens the Airbyte protocol's nested list structure ([["id"]] → ["id"]). Verify this works correctly for streams with nested field paths or unusual PK structures
  • Confirm CDK dependency is acceptable - This PR depends on an unmerged CDK PR (airbytehq/airbyte-python-cdk#846). Decide if this should block merging or if we're okay with the dependency
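For the primary key extraction item in the checklist above, a minimal sketch of the flattening step may help reviewers reason about edge cases. It assumes the catalog exposes the primary key as a list of field paths (each path a list of strings); the function name `flatten_primary_key` is hypothetical, and rejecting nested paths is an assumption of this sketch rather than confirmed PR behavior.

```python
def flatten_primary_key(primary_key: list[list[str]]) -> list[str]:
    """Flatten a list of single-segment field paths, e.g. [["id"]] -> ["id"].

    Hypothetical helper: nested paths such as [["data", "id"]] are rejected
    here, which may differ from what the PR actually does.
    """
    flat: list[str] = []
    for path in primary_key:
        if len(path) != 1:
            raise ValueError(f"Nested primary key path not handled in this sketch: {path!r}")
        flat.append(path[0])
    return flat
```

A composite key such as `[["a"], ["b"]]` flattens to two field names, which is the case the validation step would then need to reject or handle explicitly.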

Suggested test plan:

import airbyte as ab

# Test with a real declarative source
source = ab.get_source("source-pokeapi", config={})

# Test 1: Fetch by direct PK value
record = source.get_record("pokemon", pk_value="1")
assert record["name"] == "bulbasaur"

# Test 2: Fetch with explicit dict validation
record = source.get_record("pokemon", pk_value={"id": "1"})
assert record["name"] == "bulbasaur"

# Test 3: Verify dict validation rejects wrong key
try:
    source.get_record("pokemon", pk_value={"pokemon_id": "1"})
    assert False, "Should have raised PyAirbyteInputError"
except ab.exceptions.PyAirbyteInputError as e:
    assert "does not match" in str(e)

# Test 4: Verify non-declarative sources raise NotImplementedError
docker_source = ab.get_source("source-faker", config={}, install_if_missing=False)
try:
    docker_source.get_record("users", pk_value="1")
    assert False, "Should have raised NotImplementedError"
except NotImplementedError as e:
    assert "only supported for declarative sources" in str(e)

Notes

Summary by CodeRabbit

  • New Features

    • Fetch individual records from a stream by primary key (string, integer, or single-key dict); supports declarative fast-path and optional scanning fallback with timeout.
    • CLI/tool command to fetch a single record from a source connector.
  • Bug Fixes / Validation

    • Improved validation and clear errors for missing/invalid streams, unsupported PK shapes, and not-found records.
  • Tests

    • Comprehensive unit tests covering PK formats, scanning behavior, config propagation, and error cases.

- Add LookupValue type alias for primary key values
- Add get_record() method to Source class (keyword-only pk_value arg)
- Add helper methods _get_stream_primary_key() and _normalize_and_validate_pk()
- Add fetch_record() method to DeclarativeExecutor
- Support both direct PK values and dict input with validation
- Dict validation ensures single entry and key matches stream's primary key
- Only supported for declarative (YAML-based) sources
- Add comprehensive unit tests (16 tests, all passing)

This builds on airbytehq/airbyte-python-cdk#846

Co-Authored-By: AJ Steers <[email protected]>
@devin-ai-integration

Original prompt from AJ Steers
Received message in Slack channel #ask-devin-ai:

@Devin - Let's create a plan of action for the first phase of the project defined here: <https://github.com/airbytehq/airbyte-python-cdk/issues/833>

Specifically, for now we ONLY want to add a "get_one" implementation to the existing SimpleRetriever implementation - ignoring all other requirements listed there and not (yet) implementing a new retriever class.
We don't care yet about CLI-based invocation but we need a "fetch_record(pk_value: Any) -> dict:" API method on the Source or the Stream.
Thread URL: https://airbytehq-team.slack.com/archives/C08BHPUMEPJ/p1762969856048879

@devin-ai-integration

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1762999494-add-get-record-method' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1762999494-add-get-record-method'

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test-pr - Runs tests with the updated PyAirbyte

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.


@coderabbitai

coderabbitai bot commented Nov 13, 2025

📝 Walkthrough

Walkthrough

Adds single-record lookup support: DeclarativeExecutor gains fetch_record(...); Source gains get_record(...) plus helpers to extract/validate stream primary keys; CLI-level fetch function and unit tests are added; pyproject pins airbyte-cdk to a specific git revision.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| Executor: fetch single record | airbyte/_executors/declarative.py | Adds public `fetch_record(stream_name: str, pk_value: str, config: dict |
| Source: PK helpers and get_record | airbyte/sources/base.py | Adds LookupValue type alias, _get_stream_primary_key(stream_name), _normalize_and_validate_pk(stream_name, pk_value, *, strict_pk_field: bool = True), and public get_record(stream_name, *, pk_value, allow_scanning=False, scan_timeout_seconds=5) to validate/normalize PKs, enforce declarative fast-path, and optionally fall back to scanning. |
| MCP CLI: single-record tool | airbyte/mcp/local_ops.py | Adds fetch_source_stream_record(...) tool function to fetch one record by PK (returns dict or error string). Note: duplicated insertion of the same function appears in the diff. |
| Tests: get_record behavior | tests/unit_tests/sources/test_source_get_record.py | New comprehensive unit tests covering declarative vs non-declarative executors, simple/composite/no PKs, string/dict/int PK inputs, config propagation, scanning fallback and timeouts, and error propagation. |
| Dependency pin | pyproject.toml | Replaces airbyte-cdk version range with a git-based pin (git + rev) to a specific commit revision. |

Sequence Diagram(s)

sequenceDiagram
    actor Caller
    participant Source
    participant Executor
    Note over Source,Executor: New single-record lookup flow
    Caller->>Source: get_record(stream_name, pk_value, allow_scanning?)
    Source->>Source: _get_stream_primary_key(stream_name)
    Source->>Source: _normalize_and_validate_pk(stream_name, pk_value)
    alt declarative fast-path
        Source->>Executor: fetch_record(stream_name, pk_value_str, config)
        Executor-->>Source: record (dict) / RecordNotFoundException / NotImplementedError
    else fallback scanning (allow_scanning)
        Source->>Source: stream records iterator (with timeout)
        Source-->>Caller: first matching record or not-found
    end
    Source-->>Caller: record dict or error
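The branching in the sequence diagram can also be read as plain control flow. The sketch below is an interpretation under stated assumptions, not the PR's code: `get_record_sketch` and `RecordNotFound` are hypothetical names, `fetch_record` being `None` stands in for a non-declarative executor, and `scan` stands in for the scanning fallback.

```python
from typing import Callable, Optional


class RecordNotFound(Exception):
    """Hypothetical stand-in for the not-found exception raised by the fast path."""


def get_record_sketch(
    pk_value: str,
    *,
    fetch_record: Optional[Callable[[str], dict]],
    scan: Callable[[str], dict],
    allow_scanning: bool = False,
) -> dict:
    """Try the declarative fast path first, then optionally fall back to scanning."""
    if fetch_record is not None:
        try:
            return fetch_record(pk_value)
        except RecordNotFound:
            # Only swallow the miss when the caller opted in to scanning.
            if not allow_scanning:
                raise
    elif not allow_scanning:
        raise NotImplementedError("only supported for declarative sources")
    return scan(pk_value)
```

Keeping the fallback opt-in means a lookup never silently turns into a full stream read.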

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

  • Areas to focus during review:
    • _normalize_and_validate_pk(): verify handling of scalar vs dict PK inputs, composite/no-PK behavior, and thrown exception types/messages.
    • fetch_record() config-merge semantics: confirm which config (instance vs per-call) takes precedence—should instance config override call-time config or vice versa, wdyt?
    • MCP tool duplication in local_ops.py: confirm duplicate insertion is intentional or should be removed.
    • Tests: ensure fixtures match production catalog shapes and scanning timeout behavior is robust.
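To make the config-merge question above concrete, here is a small sketch of the two possible precedence orders. It does not show which one the PR implements; `merge_config` and its `call_time_wins` flag are hypothetical names used only for illustration.

```python
from typing import Optional


def merge_config(
    instance_config: dict,
    call_config: Optional[dict],
    *,
    call_time_wins: bool = True,
) -> dict:
    """Merge two config dicts; later keys in a dict literal override earlier ones."""
    call_config = call_config or {}
    if call_time_wins:
        return {**instance_config, **call_config}
    return {**call_config, **instance_config}


instance = {"api_key": "from-instance", "region": "us"}
per_call = {"region": "eu"}

call_wins = merge_config(instance, per_call)  # region comes from per_call
instance_wins = merge_config(instance, per_call, call_time_wins=False)  # region comes from instance
```

Whichever order is chosen, a test that sets the same key in both configs would pin the behavior down.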

Possibly related PRs

Suggested reviewers

  • quintonwall

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title directly and clearly summarizes the main feature added in this PR: a new get_record() method for fetching individual records by primary key, which is the central change across multiple files. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 97.62% which is sufficient. The required threshold is 80.00%. |

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7eb746b and 8dabe3d.

📒 Files selected for processing (3)
  • airbyte/_executors/declarative.py (1 hunks)
  • airbyte/sources/base.py (2 hunks)
  • tests/unit_tests/sources/test_source_get_record.py (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-10-08T15:34:31.026Z
Learnt from: Suraj-Vishwakarma70
Repo: airbytehq/PyAirbyte PR: 285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the `Source` class in `airbyte/sources/base.py`, such as renaming `_to_be_selected_stream` to `_to_be_selected_streams`.

Applied to files:

  • airbyte/sources/base.py
🧬 Code graph analysis (2)
airbyte/sources/base.py (2)
airbyte/exceptions.py (1)
  • PyAirbyteInputError (201-210)
airbyte/_executors/declarative.py (1)
  • fetch_record (144-173)
tests/unit_tests/sources/test_source_get_record.py (4)
airbyte/_executors/declarative.py (2)
  • DeclarativeExecutor (41-173)
  • fetch_record (144-173)
airbyte/sources/base.py (4)
  • Source (70-1123)
  • get_record (679-729)
  • _get_stream_primary_key (583-610)
  • _normalize_and_validate_pk (612-677)
airbyte/_executors/base.py (1)
  • Executor (159-248)
airbyte/exceptions.py (1)
  • PyAirbyteInputError (201-210)
🪛 GitHub Actions: Run Linters
airbyte/_executors/declarative.py

[error] 151-151: Ruff: W293 Blank line contains whitespace.


[error] 156-156: Ruff: W293 Blank line contains whitespace.


[error] 159-159: Ruff: W293 Blank line contains whitespace.


[error] 168-168: Ruff: W293 Blank line contains whitespace.

airbyte/sources/base.py

[error] 585-585: Ruff: W293 Blank line contains whitespace.


[error] 589-589: Ruff: W293 Blank line contains whitespace.


[error] 592-592: Ruff: W293 Blank line contains whitespace.


[error] 595-595: Ruff: W293 Blank line contains whitespace.


[error] 600-600: Ruff: W293 Blank line contains whitespace.


[error] 606-606: Ruff: W293 Blank line contains whitespace.


[error] 616-616: Ruff: W293 Blank line contains whitespace.


[error] 618-618: Ruff: W293 Blank line contains whitespace.


[error] 621-621: Ruff: W293 Blank line contains whitespace.


[error] 625-625: Ruff: W293 Blank line contains whitespace.


[error] 628-628: Ruff: W293 Blank line contains whitespace.


[error] 634-634: Ruff: W293 Blank line contains whitespace.


[error] 648-648: Ruff: W293 Blank line contains whitespace.


[error] 715-715: Ruff: PLC0415 'import' should be at the top-level of a file.


[error] 722-722: Ruff: W293 Blank line contains whitespace.


[error] 724-724: Ruff: W293 Blank line contains whitespace.


[error] 726-726: Ruff: W293 Blank line contains whitespace.


[error] 730-730: Ruff: W293 Blank line contains whitespace.


[error] 737-737: Ruff: W293 Blank line contains whitespace.


[error] 741-741: Ruff: W293 Blank line contains whitespace.


[error] 745-745: Ruff: W293 Blank line contains whitespace.


[error] 748-748: Ruff: W293 Blank line contains whitespace.


[error] 757-757: Ruff: W293 Blank line contains whitespace.


[error] 764-764: Ruff: W293 Blank line contains whitespace.


[error] 768-768: Ruff: W293 Blank line contains whitespace.


[error] 772-772: Ruff: W293 Blank line contains whitespace.


[error] 775-775: Ruff: W293 Blank line contains whitespace.


[error] Ruff: 34 issues found. 15 issues fixable with --fix.

tests/unit_tests/sources/test_source_get_record.py

[error] 1-1: ruff format would reformat this file. Run 'ruff format tests/unit_tests/sources/test_source_get_record.py' to fix formatting.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (No Creds)
🔇 Additional comments (1)
airbyte/sources/base.py (1)

67-67: Type alias looks good!

The LookupValue = Any type alias provides a clear semantic meaning for primary key values while maintaining flexibility. This is a reasonable approach given that PKs can be strings, integers, or other types. 👍

@github-actions
github-actions bot commented Nov 13, 2025

PyTest Results (Fast Tests Only, No Creds)

344 tests  +24   344 ✅ +24   5m 45s ⏱️ -9s
  1 suites ± 0     0 💤 ± 0 
  1 files   ± 0     0 ❌ ± 0 

Results for commit f5bc960. ± Comparison against base commit 7eb746b.

♻️ This comment has been updated with latest results.

@github-actions
github-actions bot commented Nov 13, 2025

PyTest Results (Full)

413 tests  +24   396 ✅ +24   24m 30s ⏱️ - 5m 54s
  1 suites ± 0    16 💤 ± 0 
  1 files   ± 0     1 ❌ ± 0 

For more details on these failures, see this check.

Results for commit f5bc960. ± Comparison against base commit 7eb746b.

♻️ This comment has been updated with latest results.

…ream_record MCP tool

- Add allow_scanning and scan_timeout_seconds parameters to get_record()
- When allow_scanning=True, iterate through get_records() to find matching records
- Allow searching by non-primary-key fields when scanning is enabled
- Implement time-based timeout with configurable scan_timeout_seconds
- Update _normalize_and_validate_pk() to support strict_pk_field parameter
- Add fetch_source_stream_record MCP tool for single record fetching
- Add comprehensive unit tests for scanning feature (8 new tests)
- All tests passing (23/23)

Co-Authored-By: AJ Steers <[email protected]>
@airbytehq airbytehq deleted a comment from devin-ai-integration bot Nov 13, 2025
devin-ai-integration bot and others added 2 commits November 13, 2025 23:20
Updated CDK dependency to latest commit that addresses PR feedback:
- Made fetch_one public (removed underscore prefix)
- Fixed extractor/selector logic to handle single-object responses
- Added test for single-object response pattern

This commit includes the fixes from airbytehq/airbyte-python-cdk#846

Co-Authored-By: AJ Steers <[email protected]>
…e collection

When iterating inline over get_records(), the LazyDataset object gets
garbage collected immediately, triggering __del__() which calls close()
and cancels the sync before any records are yielded.

This fix ensures the dataset is stored in a variable before iteration,
preventing premature garbage collection and allowing the sync to complete.
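
The failure mode described in this commit message can be reproduced with a small stand-in class. This is a sketch under assumptions: `LazyDatasetSketch` is hypothetical and is not PyAirbyte's `LazyDataset`. It assumes the iterator keeps no reference to the dataset object, and the early finalization relies on CPython's reference counting.

```python
from collections.abc import Iterator


def _read(values: list[int], state: dict) -> Iterator[int]:
    """Yield values until the shared state says the dataset was closed."""
    for value in values:
        if state["closed"]:
            return
        yield value


class LazyDatasetSketch:
    """Hypothetical stand-in: finalizing the object cancels iteration."""

    def __init__(self, values: list[int]) -> None:
        self._values = values
        self._state = {"closed": False}

    def __iter__(self) -> Iterator[int]:
        # The returned generator deliberately holds no reference to `self`.
        return _read(self._values, self._state)

    def close(self) -> None:
        self._state["closed"] = True

    def __del__(self) -> None:
        self.close()


def read_inline() -> list[int]:
    out = []
    # The temporary dataset has no other reference once iter() has been taken,
    # so on CPython it may be finalized before the first value is yielded.
    for value in LazyDatasetSketch([1, 2, 3]):
        out.append(value)
    return out


def read_held() -> list[int]:
    out = []
    dataset = LazyDatasetSketch([1, 2, 3])  # the local name keeps it alive
    for value in dataset:
        out.append(value)
    return out
```

Holding the dataset in a local variable, as `read_held()` does, keeps it alive for the whole loop regardless of how the interpreter manages temporaries.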

Co-Authored-By: AJ Steers <[email protected]>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte/sources/base.py (1)

68-69: Consider a more specific type for LookupValue.

Currently LookupValue = Any is very broad. Would it be more helpful to constrain this to the actual expected types? For example:

LookupValue = str | int | float | bool

This would provide better type hints for callers while still covering the common PK value types. Wdyt?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8dabe3d and f5bc960.

⛔ Files ignored due to path filters (1)
  • poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • airbyte/_executors/declarative.py (1 hunks)
  • airbyte/mcp/local_ops.py (1 hunks)
  • airbyte/sources/base.py (3 hunks)
  • pyproject.toml (1 hunks)
  • tests/unit_tests/sources/test_source_get_record.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte/_executors/declarative.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2024-10-10T16:17:57.989Z
Learnt from: aaronsteers
Repo: airbytehq/PyAirbyte PR: 417
File: airbyte/cli.py:503-504
Timestamp: 2024-10-10T16:17:57.989Z
Learning: In the PyAirbyte project, support for Python versions earlier than 3.10 is not necessary, as the project requires Python 3.10 or newer.

Applied to files:

  • pyproject.toml
📚 Learning: 2024-10-08T15:34:31.026Z
Learnt from: Suraj-Vishwakarma70
Repo: airbytehq/PyAirbyte PR: 285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the `Source` class in `airbyte/sources/base.py`, such as renaming `_to_be_selected_stream` to `_to_be_selected_streams`.

Applied to files:

  • airbyte/sources/base.py
🧬 Code graph analysis (3)
tests/unit_tests/sources/test_source_get_record.py (2)
airbyte/_executors/declarative.py (2)
  • DeclarativeExecutor (41-185)
  • fetch_record (144-185)
airbyte/sources/base.py (4)
  • Source (71-1207)
  • get_record (702-813)
  • _get_stream_primary_key (584-611)
  • _normalize_and_validate_pk (613-700)
airbyte/mcp/local_ops.py (3)
airbyte/mcp/_tool_utils.py (1)
  • mcp_tool (102-148)
airbyte/sources/base.py (3)
  • config_spec (361-370)
  • set_config (285-305)
  • get_record (702-813)
airbyte/mcp/_util.py (1)
  • resolve_config (129-226)
airbyte/sources/base.py (2)
airbyte/_executors/declarative.py (2)
  • DeclarativeExecutor (41-185)
  • fetch_record (144-185)
airbyte/exceptions.py (1)
  • PyAirbyteInputError (201-210)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (6)
tests/unit_tests/sources/test_source_get_record.py (1)

1-488: Outstanding test coverage! 🎉

The test suite is comprehensive and well-structured:

  • ✅ Covers both success and error paths
  • ✅ Tests PK variations (string, dict, integer coercion)
  • ✅ Validates edge cases (no PK, composite PK, wrong keys)
  • ✅ Tests scanning with timeouts
  • ✅ Verifies config propagation and exception handling
  • ✅ Tests helper methods in isolation

The fixture organization is clean, and test names are descriptive. This gives me confidence the new feature is solid!

airbyte/mcp/local_ops.py (1)

464-580: Clean implementation following established patterns.

The new fetch_source_stream_record function mirrors the existing read_source_stream_records pattern nicely:

  • Consistent parameter structure and naming
  • Appropriate error handling with traceback
  • Clear docstring explaining the functionality
  • Proper delegation to source.get_record()

The implementation is straightforward and maintainable.

airbyte/sources/base.py (3)

584-612: Clean primary key extraction logic.

The _get_stream_primary_key helper correctly:

  • Retrieves the configured catalog (respecting user overrides via set_primary_key())
  • Flattens the Airbyte protocol's nested field path structure
  • Raises appropriate errors for missing streams

The implementation is clear and handles the edge cases well.


613-701: Robust validation with clear error messages.

The _normalize_and_validate_pk helper handles multiple scenarios effectively:

  • Validates dict inputs (single entry, correct key in strict mode)
  • Handles both scalar and dict PK values
  • Provides clear error messages with context
  • Supports both strict (PK-only) and non-strict (any field) modes

The string conversion at the end (str(pk_value), str(dict_value)) ensures consistency with the CDK's expectations. Nice work on the error context dictionaries!


702-814: Well-designed public API with fast-path optimization.

The get_record() method implementation is solid:

  • Clear docstring with usage examples
  • Fast path: Uses fetch_record() for declarative sources with PK lookup
  • Fallback: Scanning with configurable timeout when allow_scanning=True
  • Appropriate exception types for different failure modes
  • Good error messages guiding users to solutions

The try/except around the fast path (lines 775-783) elegantly falls back to scanning if the record isn't found, which is helpful for users.

One minor thought: The scanning logic converts both the record field value and the target value to strings for comparison (line 807: str(record[field_name]) == field_value). This is consistent with how field_value is already stringified in _normalize_and_validate_pk, so the comparison should work correctly. 👍
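
The scanning fallback with string comparison and a time-based limit could look roughly like the sketch below. It is not the PR's implementation: `scan_for_record` is a hypothetical name, and the built-in `TimeoutError` is an assumed choice of exception.

```python
import time
from typing import Any, Iterable, Optional


def scan_for_record(
    records: Iterable[dict[str, Any]],
    field_name: str,
    field_value: str,
    timeout_seconds: float = 5.0,
) -> Optional[dict[str, Any]]:
    """Return the first record whose stringified field equals field_value, or None."""
    deadline = time.monotonic() + timeout_seconds
    for record in records:
        # The deadline is only checked between records, so a source that
        # blocks while producing the next record is not interrupted.
        if time.monotonic() > deadline:
            raise TimeoutError(f"Scan exceeded {timeout_seconds} seconds")
        if field_name in record and str(record[field_name]) == field_value:
            return record
    return None
```

Comparing on `str(...)` means an integer `2` in the record matches the lookup value `"2"`, which mirrors the stringification noted in the comment above.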

pyproject.toml (1)

20-23: I'll search for the current status of that PR for you. The search didn't directly retrieve PR #846. Let me search more specifically for that PR.

I'm unable to find specific information about PR #846 through the web search. Let me try a more targeted approach.

Based on my search, I was unable to find specific information about PR #846's current status in the airbyte-python-cdk repository. The web search didn't return details about this particular PR. However, I can confirm that the airbyte-cdk version 7.4.1 was released on October 24, 2025, showing active development and releases.

Given that I cannot verify the specific PR status or confirm whether it has been merged and released, the concerns in the original review comment remain unresolved. The temporary git pin to commit 4386a351 does create the risk you identified—if repository history changes or the commit becomes unavailable, installation could fail.

To move forward, you'll need to manually verify:

  • The current status of airbyte-python-cdk PR #846 at airbytehq/airbyte-python-cdk#846
  • Whether the PR has been merged and included in a released version
  • If so, update pyproject.toml to reference that released version (e.g., airbyte-cdk = ">=7.4.1,<8.0.0")
  • If not yet merged, confirm the plan and create a tracking issue to ensure this gets updated once the PR is merged and released—wdyt?
