feat: add async generator migration with symmetric bridging and statefulness #378
…fulness
- Symmetric generate/agenerate bridging in base ColumnGenerator
- is_stateful property; SeedDatasetColumnGenerator declares True
- Async wrappers for FromScratchColumnGenerator and ColumnGeneratorFullColumn
- Native async paths for ImageCellGenerator and EmbeddingCellGenerator
- CustomColumnGenerator.agenerate with full validation parity
- Extract _postprocess_result for shared sync/async output validation
Greptile Summary
This PR (2 of 4 in the async engine migration) makes all column generators async-capable without breaking existing sync usage by adding symmetric

| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py | Adds _run_coroutine_sync helper and symmetric generate/agenerate bridging; try/finally pool lifecycle and builtin TimeoutError wrapping look correct after previous fixes. |
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py | Adds agenerate with full-column thread delegation and native async cell-by-cell path; _postprocess_result and _ainvoke_generator_function are clean shared helpers; async full-column path delegates to sync generate (pre-existing limitation acknowledged by authors). |
| packages/data-designer-engine/tests/engine/column_generators/generators/test_async_generators.py | 19 tests covering bridging symmetry, statefulness, async custom generators, and error-path parity; fixture dependencies resolved via parent conftest.py; object.__new__ pattern correctly avoids .fget brittleness. |
Sequence Diagram
sequenceDiagram
participant Caller
participant ColumnGenerator
participant _run_coroutine_sync
participant ThreadPoolExecutor
participant asyncio
note over Caller,asyncio: Sync caller → async-only generator
Caller->>ColumnGenerator: generate(data)
ColumnGenerator->>ColumnGenerator: _is_overridden("agenerate") → True
ColumnGenerator->>_run_coroutine_sync: _run_coroutine_sync(self.agenerate(data))
_run_coroutine_sync->>asyncio: get_running_loop() raises RuntimeError (no loop)
_run_coroutine_sync->>asyncio: asyncio.run(coro)
asyncio-->>_run_coroutine_sync: result
_run_coroutine_sync-->>ColumnGenerator: result
ColumnGenerator-->>Caller: result
note over Caller,asyncio: Async caller (e.g. notebook) → sync-only generator
Caller->>ColumnGenerator: await agenerate(data)
ColumnGenerator->>ColumnGenerator: _is_overridden("generate") → True
ColumnGenerator->>asyncio: asyncio.to_thread(self.generate, data.copy())
asyncio->>ThreadPoolExecutor: run generate(data_copy) in thread
ThreadPoolExecutor-->>asyncio: result
asyncio-->>ColumnGenerator: result
ColumnGenerator-->>Caller: result
note over Caller,asyncio: Async caller with running loop → async-only generator (bridge path)
Caller->>ColumnGenerator: generate(data)
ColumnGenerator->>_run_coroutine_sync: _run_coroutine_sync(self.agenerate(data))
_run_coroutine_sync->>asyncio: get_running_loop() → loop exists
_run_coroutine_sync->>ThreadPoolExecutor: pool.submit(asyncio.run, coro)
ThreadPoolExecutor-->>_run_coroutine_sync: future.result(timeout=300s)
_run_coroutine_sync->>ThreadPoolExecutor: pool.shutdown(wait=True)
_run_coroutine_sync-->>ColumnGenerator: result
ColumnGenerator-->>Caller: result
Last reviewed commit: 1928b32
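The bridge paths in the diagram above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's actual code: the function name `run_coroutine_sync`, its `timeout` parameter (defaulting to the 300 s shown in the diagram), and the error message are all hypothetical.

```python
import asyncio
import concurrent.futures
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


def run_coroutine_sync(coro: Coroutine[Any, Any, T], timeout: float = 300.0) -> T:
    """Run a coroutine to completion from synchronous code (hypothetical sketch)."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run is safe.
        return asyncio.run(coro)

    # A loop is already running (e.g. in a notebook). asyncio.run would fail
    # here, so run the coroutine on a fresh loop in a worker thread instead.
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    wait_for_worker = True
    try:
        future = pool.submit(asyncio.run, coro)
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError as exc:
        # Do not block the caller on a worker that is still running.
        wait_for_worker = False
        # Before Python 3.11 this is a distinct class, so re-raise the builtin.
        raise TimeoutError(f"coroutine did not finish within {timeout}s") from exc
    finally:
        # Explicit lifecycle: the pool is shut down on every exit path.
        pool.shutdown(wait=wait_for_worker)
```

Managing the pool by hand rather than with a `with` block is what lets the timeout path return immediately; a context manager would always wait for the worker on exit.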
Use explicit pool lifecycle instead of context manager so that a TimeoutError releases the caller immediately via shutdown(wait=False) rather than blocking on pool.__exit__.
Add @overload declarations so the base agenerate accepts both dict and pd.DataFrame, mirroring the existing generate pattern.
also addressed the |
The else clause after return was unreachable, leaking the ThreadPoolExecutor on every successful call. Capture the result first, shut down the pool, then return.
Ensures ThreadPoolExecutor is shut down on all exit paths, including non-TimeoutError exceptions from the coroutine.
Move duplicated input validation and prompt rendering into _prepare_image_inputs, shared by generate and agenerate.
def is_stateful(self) -> bool:
    """Whether this generator maintains state across calls.

    Stateful generators are serialized per-instance by the async scheduler
    (row group N must complete before N+1 starts for that generator).
    """
    return False
nit: is_stateful could read as vague; lots of things have state. Maybe requires_sequential_execution?
I'm still trying to fully understand this property. I think we want the "stateful" part in there because this is for columns like the seed column, which needs to remember where it is at in the generation process – is that right? I think the second part of the docstring is a bit hard to follow (might be just me, though).
@nabinchha renamed to is_order_dependent - captures the key semantic (output depends on call order) without being as vague as is_stateful. docstring updated with a concrete example.
@johnnygreco yeah exactly - the seed column needs to remember where it is in the dataset. renamed to is_order_dependent to make the intent clearer at a glance.
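To make the semantics discussed in this thread concrete, here is a minimal sketch of how a scheduler might honor the property. Only the name `is_order_dependent` comes from the PR; the classes `Generator` and `SeedLikeGenerator`, the `row_group` argument, and the `run_row_groups` helper are hypothetical stand-ins, and the real scheduler may work quite differently.

```python
import asyncio


class Generator:
    """Hypothetical stand-in for a column generator."""

    @property
    def is_order_dependent(self) -> bool:
        # Default: output does not depend on the order of calls.
        return False

    async def agenerate(self, row_group: int) -> str:
        await asyncio.sleep(0)
        return f"group-{row_group}"


class SeedLikeGenerator(Generator):
    """Remembers its position in a dataset, so call order matters."""

    def __init__(self, rows: list[str]) -> None:
        self._rows = rows
        self._cursor = 0

    @property
    def is_order_dependent(self) -> bool:
        return True

    async def agenerate(self, row_group: int) -> str:
        await asyncio.sleep(0)
        row = self._rows[self._cursor % len(self._rows)]
        self._cursor += 1
        return row


async def run_row_groups(gen: Generator, n_groups: int) -> list[str]:
    if gen.is_order_dependent:
        # Row group N must finish before N+1 starts for this generator.
        results = []
        for i in range(n_groups):
            results.append(await gen.agenerate(i))
        return results
    # Order-independent generators can run all row groups concurrently.
    return list(await asyncio.gather(*(gen.agenerate(i) for i in range(n_groups))))
```

Usage: `asyncio.run(run_row_groups(SeedLikeGenerator(["a", "b", "c"]), 3))` walks the rows in order, while the base `Generator` is free to interleave its row groups.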
# The @custom_column_generator decorator wraps the user function in a sync
# wrapper, so we must unwrap to detect async functions.
lol meant fancy stuff here 🙃
yeah the decorator wrapping forces our hand here - inspect.unwrap is the cleanest way to peek through to the original async function.
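The unwrapping trick mentioned here can be shown with a small, self-contained sketch. The decorator `sync_wrapping_decorator` and the helper `is_async_under_wrapper` are hypothetical stand-ins for illustration; the real `@custom_column_generator` decorator is assumed, per the comment under review, to wrap the user function in a sync wrapper in a similar way.

```python
import functools
import inspect


def sync_wrapping_decorator(fn):
    """Hypothetical decorator that hides the original function behind a sync wrapper."""

    @functools.wraps(fn)  # records the original function as wrapper.__wrapped__
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return wrapper


def is_async_under_wrapper(fn) -> bool:
    # inspect.unwrap follows the __wrapped__ chain back to the original function.
    return inspect.iscoroutinefunction(inspect.unwrap(fn))


@sync_wrapping_decorator
async def async_user_fn(row):
    return row


@sync_wrapping_decorator
def sync_user_fn(row):
    return row
```

Checking `inspect.iscoroutinefunction(async_user_fn)` directly reports `False` because the wrapper itself is a plain function, which is why the check has to look through it.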
- add _is_overridden helper for symmetric generate/agenerate guards
- move defensive .copy() into base agenerate, remove subclass overrides
- re-raise as builtin TimeoutError for Python 3.10 compat
- rename is_stateful to is_order_dependent with improved docstring
- replace brittle .fget test with object.__new__
- add async tests for ImageCellGenerator and EmbeddingCellGenerator
* fix: address review feedback on async engine dev note
  - Fix wall-clock claim: 41% -> 22% to match benchmark table
  - Fix dual-model speedup rounding: 1.7x -> 1.6x (10.0/6.1 = 1.64)
  - Fix run_config API: use dd.set_run_config() instead of passing to create()
* docs: add async engine dev note
  Add "Async All the Way Down" dev note covering the async task-queue scheduler built across PRs #356, #378, #404, #429, #456. Includes benchmark results, architecture diagrams, and DAG shape illustrations.
* feat: add docs preview workflow for PRs
  Build MkDocs site on PRs that touch docs and deploy to Cloudflare Pages. Each PR gets a browseable preview URL posted as a comment. Notebook tutorials use placeholder stubs since they require API keys to execute. Requires CLOUDFLARE_API_TOKEN and CLOUDFLARE_ACCOUNT_ID repo secrets.
* fix: update speedup chart alt text from 1.7x to 1.6x
* docs: improve timeline figure context and labeling
  Add DAG subtitle to sync-vs-async timeline figure and bridge the surrounding text to explain which workload shape is being shown.
* edits+additions to async-all-the-way-down dev notes
* clarify two semaphore dance
* remove dead link
* replace hero image
* docs: update scale figures with nginx-accurate data and adjust sizing
  Regenerate scale-model-timeline and scale-boxplot from nginx access logs (column_progress.csv, sync/summary.json) instead of buffered execution logs. Optimize both PNGs to palette mode. Adjust figure widths and update model timeline commentary.
* add link from owning-the-model-stack to async-dev-node
* docs: address review feedback on async blog post
  - Tighten intro to a concise abstract, move pipeline narrative into "The Bottleneck Was Structural" section
  - Remove multi-column generators / seed readers paragraph (TMI)
  - Clarify sync engine ran columns sequentially within each batch

---------

Co-authored-by: Nabin Mulepati <nmulepati@nvidia.com>
Summary
PR 2 of 4 in the async engine migration (#346).

Full plan: plans/346/async-generators-and-task-queue.md

PR stack:

This PR makes all column generators async-capable without breaking existing sync usage. Generators can implement either generate() or agenerate() (or both), and the base class bridges between them automatically.

Changes

Added
- Symmetric generate/agenerate bridging in ColumnGenerator - implement one, get the other for free
- _run_coroutine_sync helper for running coroutines from sync contexts (including notebooks with running event loops)
- _is_overridden helper for symmetric override detection in both generate() and agenerate()
- is_order_dependent property on ColumnGenerator (default False); SeedDatasetColumnGenerator declares True
- Defensive data.copy() in base agenerate to prevent caller mutation across threads
- FromScratchColumnGenerator.agenerate_from_scratch async wrapper
- Native async paths for ImageCellGenerator and EmbeddingCellGenerator with extracted _prepare_*_inputs methods
- CustomColumnGenerator.agenerate with full validation parity (required_columns, output shape, error wrapping)
- _postprocess_result extracted for shared sync/async output validation

Changed
- Re-raise as builtin TimeoutError for Python 3.10 compat

Fixed
- A timeout now releases the caller via shutdown(wait=False) instead of blocking on pool cleanup

Attention Areas
- base.py - the symmetric bridging logic, _is_overridden helper, and _run_coroutine_sync are the foundation for the rest of the stack
- custom.py - async branch now runs the same validation as sync; _postprocess_result is shared between both paths

Closes #381
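The "implement one, get the other for free" behavior described above could look roughly like the sketch below. It is an assumption-laden illustration rather than the code in base.py: the class names `BridgedGenerator`, `SyncOnly`, and `AsyncOnly` are hypothetical, data is a plain dict instead of a DataFrame, and the sync-to-async direction uses a bare `asyncio.run`, which only works when no event loop is already running.

```python
import asyncio


class BridgedGenerator:
    """Hypothetical base class with symmetric generate/agenerate bridging."""

    def _is_overridden(self, name: str) -> bool:
        # True when the subclass supplies its own implementation of `name`.
        return getattr(type(self), name) is not getattr(BridgedGenerator, name)

    def generate(self, data: dict) -> dict:
        if self._is_overridden("agenerate"):
            # Sync caller, async-only generator (assumes no running loop).
            return asyncio.run(self.agenerate(data))
        raise NotImplementedError("implement generate() or agenerate()")

    async def agenerate(self, data: dict) -> dict:
        if self._is_overridden("generate"):
            # Async caller, sync-only generator: run in a worker thread on a
            # defensive copy so the caller's data is not mutated across threads.
            return await asyncio.to_thread(self.generate, data.copy())
        raise NotImplementedError("implement generate() or agenerate()")


class SyncOnly(BridgedGenerator):
    def generate(self, data: dict) -> dict:
        data["out"] = data["x"] + 1  # mutates its input on purpose
        return data


class AsyncOnly(BridgedGenerator):
    async def agenerate(self, data: dict) -> dict:
        await asyncio.sleep(0)
        return {**data, "out": data["x"] * 2}
```

The override check on both sides is what prevents the two default implementations from calling each other forever when a subclass implements neither method.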
Description updated with AI