Skip to content

feat: add async generator migration with symmetric bridging and statefulness#378

Merged
andreatgretel merged 11 commits intomainfrom
andreatgretel/feat/async-generators-migration
Mar 11, 2026
Merged

feat: add async generator migration with symmetric bridging and statefulness#378
andreatgretel merged 11 commits intomainfrom
andreatgretel/feat/async-generators-migration

Conversation

@andreatgretel
Copy link
Contributor

@andreatgretel andreatgretel commented Mar 9, 2026

Summary

PR 2 of 4 in the async engine migration (#346).

Full plan: plans/346/async-generators-and-task-queue.md

PR stack:

  1. PR 1 - ExecutionGraph, CompletionTracker, and Task model (feat: add ExecutionGraph, CompletionTracker, and Task model for async scheduler #356, merged)
  2. PR 2 (this) - Async generator migration with symmetric bridging
  3. PR 3 - AsyncTaskScheduler and RowGroupBufferManager
  4. PR 4 - Wire async scheduler into ColumnWiseDatasetBuilder

This PR makes all column generators async-capable without breaking existing sync usage. Generators can implement either generate() or agenerate() (or both), and the base class bridges between them automatically.

Changes

Added

  • Symmetric generate/agenerate bridging in ColumnGenerator - implement one, get the other for free
  • _run_coroutine_sync helper for running coroutines from sync contexts (including notebooks with running event loops)
  • _is_overridden helper for symmetric override detection in both generate() and agenerate()
  • is_order_dependent property on ColumnGenerator (default False); SeedDatasetColumnGenerator declares True
  • Defensive data.copy() in base agenerate to prevent caller mutation across threads
  • FromScratchColumnGenerator.agenerate_from_scratch async wrapper
  • Native async paths for ImageCellGenerator and EmbeddingCellGenerator with extracted _prepare_*_inputs methods
  • CustomColumnGenerator.agenerate with full validation parity (required_columns, output shape, error wrapping)
  • _postprocess_result extracted for shared sync/async output validation
  • 22 tests covering bridging, order-dependence, async custom generators, image/embedding async paths, and error-path parity

Changed

  • Updated plan document with revised scope and status
  • Sync bridge timeout re-raises as builtin TimeoutError for Python 3.10 compat

Fixed

  • Sync bridge timeout now releases the caller immediately via shutdown(wait=False) instead of blocking on pool cleanup

Attention Areas

Reviewers, please pay special attention to:

  • base.py - the symmetric bridging logic, _is_overridden helper, and _run_coroutine_sync are the foundation for the rest of the stack
  • custom.py - async branch now runs the same validation as sync; _postprocess_result is shared between both paths

Closes #381


Description updated with AI

…fulness

- Symmetric generate/agenerate bridging in base ColumnGenerator
- is_stateful property; SeedDatasetColumnGenerator declares True
- Async wrappers for FromScratchColumnGenerator and ColumnGeneratorFullColumn
- Native async paths for ImageCellGenerator and EmbeddingCellGenerator
- CustomColumnGenerator.agenerate with full validation parity
- Extract _postprocess_result for shared sync/async output validation
@andreatgretel andreatgretel requested a review from a team as a code owner March 9, 2026 13:28
@greptile-apps
Copy link
Contributor

greptile-apps bot commented Mar 9, 2026

Greptile Summary

This PR (2 of 4 in the async engine migration) makes all column generators async-capable without breaking existing sync usage by adding symmetric generate/agenerate bridging to ColumnGenerator, a _run_coroutine_sync helper for running coroutines from sync contexts (including notebooks with live event loops), and native async paths for ImageCellGenerator and EmbeddingCellGenerator. Previous review issues around pool lifecycle, TimeoutError aliasing, duplicate agenerate overrides, and duplicated validation logic have all been addressed.

  • base.py: _run_coroutine_sync correctly uses try/finally with a timed_out flag to handle success, timeout, and domain-exception exit paths; _is_overridden correctly checks against ColumnGenerator via identity comparison so both bridging directions work reliably across the class hierarchy.
  • custom.py: agenerate branches on strategy and coroutine-ness; _ainvoke_generator_function correctly awaits the sync wrapper's return value (a coroutine object) for async user functions; _postprocess_result is shared between both paths for validation parity.
  • image.py / embedding.py: Shared _prepare_*_inputs helpers eliminate the previously duplicated validation logic; disk I/O in ImageCellGenerator.agenerate is correctly offloaded to a thread.
  • seed_dataset.py: is_order_dependent declared True to signal row-group ordering constraints to the upcoming scheduler (PR 3).
  • Tests: 19 tests cover bridging symmetry, error-path parity, and async variants of each generator type; stub_resource_provider is correctly resolved from tests/engine/conftest.py.

Confidence Score: 4/5

  • This PR is safe to merge; all critical pool-lifecycle and exception-handling issues from previous rounds are addressed and the bridging logic is sound.
  • The implementation is well-structured, all prior round issues (pool leaks, TimeoutError aliasing, duplicate overrides, duplicated validation) have been resolved, and 19 tests provide solid coverage. The only remaining items are a style note on shallow-copy semantics for dict inputs in agenerate and a missing test for the non-timeout exception propagation path through _run_coroutine_sync — neither of which blocks merging.
  • No files require special attention; base.py carries the most complex new logic but the bridging and pool-lifecycle handling are correct.

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py Adds _run_coroutine_sync helper and symmetric generate/agenerate bridging; try/finally pool lifecycle and builtin TimeoutError wrapping look correct after previous fixes.
packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py Adds agenerate with full-column thread delegation and native async cell-by-cell path; _postprocess_result and _ainvoke_generator_function are clean shared helpers; async full-column path delegates to sync generate (pre-existing limitation acknowledged by authors).
packages/data-designer-engine/tests/engine/column_generators/generators/test_async_generators.py 19 tests covering bridging symmetry, statefulness, async custom generators, and error-path parity; fixture dependencies resolved via parent conftest.py; object.__new__ pattern correctly avoids .fget brittleness.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant ColumnGenerator
    participant _run_coroutine_sync
    participant ThreadPoolExecutor
    participant asyncio

    note over Caller,asyncio: Sync caller → async-only generator
    Caller->>ColumnGenerator: generate(data)
    ColumnGenerator->>ColumnGenerator: _is_overridden("agenerate") → True
    ColumnGenerator->>_run_coroutine_sync: _run_coroutine_sync(self.agenerate(data))
    _run_coroutine_sync->>asyncio: get_running_loop() raises RuntimeError (no loop)
    _run_coroutine_sync->>asyncio: asyncio.run(coro)
    asyncio-->>_run_coroutine_sync: result
    _run_coroutine_sync-->>ColumnGenerator: result
    ColumnGenerator-->>Caller: result

    note over Caller,asyncio: Async caller (e.g. notebook) → sync-only generator
    Caller->>ColumnGenerator: await agenerate(data)
    ColumnGenerator->>ColumnGenerator: _is_overridden("generate") → True
    ColumnGenerator->>asyncio: asyncio.to_thread(self.generate, data.copy())
    asyncio->>ThreadPoolExecutor: run generate(data_copy) in thread
    ThreadPoolExecutor-->>asyncio: result
    asyncio-->>ColumnGenerator: result
    ColumnGenerator-->>Caller: result

    note over Caller,asyncio: Async caller with running loop → async-only generator (bridge path)
    Caller->>ColumnGenerator: generate(data)
    ColumnGenerator->>_run_coroutine_sync: _run_coroutine_sync(self.agenerate(data))
    _run_coroutine_sync->>asyncio: get_running_loop() → loop exists
    _run_coroutine_sync->>ThreadPoolExecutor: pool.submit(asyncio.run, coro)
    ThreadPoolExecutor-->>_run_coroutine_sync: future.result(timeout=300s)
    _run_coroutine_sync->>ThreadPoolExecutor: pool.shutdown(wait=True)
    _run_coroutine_sync-->>ColumnGenerator: result
    ColumnGenerator-->>Caller: result
Loading

Last reviewed commit: 1928b32

andreatgretel and others added 2 commits March 9, 2026 11:03
Use explicit pool lifecycle instead of context manager so that
a TimeoutError releases the caller immediately via
shutdown(wait=False) rather than blocking on pool.__exit__.
Add @overload declarations so the base agenerate accepts both
dict and pd.DataFrame, mirroring the existing generate pattern.
@andreatgretel
Copy link
Contributor Author

Base class agenerate signature (data: dict) -> dict is narrower than overrides in ColumnGeneratorFullColumn and FromScratchColumnGenerator [...] This violates Liskov Substitution

also addressed the agenerate type signature — added @overload declarations to match the existing generate pattern (dict -> dict | pd.DataFrame -> pd.DataFrame). fixed in 8b6e8b8.

The else clause after return was unreachable, leaking the
ThreadPoolExecutor on every successful call. Capture the result
first, shut down the pool, then return.
Ensures ThreadPoolExecutor is shut down on all exit paths,
including non-TimeoutError exceptions from the coroutine.
Move duplicated input validation and prompt rendering into
_prepare_image_inputs, shared by generate and agenerate.
Comment on lines +61 to +67
def is_stateful(self) -> bool:
"""Whether this generator maintains state across calls.

Stateful generators are serialized per-instance by the async scheduler
(row group N must complete before N+1 starts for that generator).
"""
return False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is_stateful could read as vague — lots of things have state. May be requires_sequential_execution?

Copy link
Contributor

@johnnygreco johnnygreco Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still trying to fully understand this property. I think we want the "stateful" part in there because this is for columns like the seed column, which needs to remember where it is at in the generation process – is that right? I think the second part of the docstring is a bit hard to follow (might be just me, though).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nabinchha renamed to is_order_dependent - captures the key semantic (output depends on call order) without being as vague as is_stateful. docstring updated with a concrete example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnnygreco yeah exactly - the seed column needs to remember where it is in the dataset. renamed to is_order_dependent to make the intent clearer at a glance.

Comment on lines +74 to +75
# The @custom_column_generator decorator wraps the user function in a sync
# wrapper, so we must unwrap to detect async functions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol meant fancy stuff here 🙃

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah the decorator wrapping forces our hand here - inspect.unwrap is the cleanest way to peek through to the original async function.

andreatgretel and others added 2 commits March 11, 2026 11:50
- add _is_overridden helper for symmetric generate/agenerate guards
- move defensive .copy() into base agenerate, remove subclass overrides
- re-raise as builtin TimeoutError for Python 3.10 compat
- rename is_stateful to is_order_dependent with improved docstring
- replace brittle .fget test with object.__new__
- add async tests for ImageCellGenerator and EmbeddingCellGenerator
Copy link
Contributor

@nabinchha nabinchha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:shipit:

@andreatgretel andreatgretel merged commit 8fff7c0 into main Mar 11, 2026
49 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: add async generator migration with symmetric bridging and statefulness

3 participants