chore: async engine follow-up - rename, preview, lifecycle, progress #456

Merged
andreatgretel merged 16 commits into main from andreatgretel/chore/async-engine-followup-v2 on Mar 25, 2026
Conversation

@andreatgretel
Contributor

@andreatgretel andreatgretel commented Mar 24, 2026

Summary

Follow-up to #449 (async engine PR 6) addressing four open tickets, plus progress bar support and a deadlock fix.

#437 - Rename ColumnWiseDatasetBuilder to DatasetBuilder

  • Renamed class and file (column_wise_builder.py -> dataset_builder.py)
  • Updated all imports, references, and test function names across the codebase

#442 - Async preview path

  • Extracted _prepare_async_run() factory method shared by build and preview
  • Added _build_async_preview() to DatasetBuilder
  • Preview uses free_row_group() to release memory without checkpointing

#444 - Unify row-group lifecycle callback

  • Replaced dual callbacks with single on_finalize_row_group
  • Added detection for fully-dropped row groups

#443 - Consolidated async progress reporting

  • New AsyncProgressReporter replaces per-column logging in async path
  • Added RunConfig.progress_interval parameter (default 5s)
  • Added ContextVar-based row group prefix (x/X) on repetitive log messages
  • Fixed duplicate 100% messages and premature first-report timing
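The ContextVar-based prefixing described above can be sketched with the standard library. This is an illustrative sketch only: the names `row_group_tag` and `RowGroupPrefixFilter` are assumptions, not the PR's actual code (which lives in `engine/context.py`).

```python
import contextvars
import logging

# Hypothetical names for illustration; the PR's real implementation differs.
row_group_tag: contextvars.ContextVar[str] = contextvars.ContextVar(
    "row_group_tag", default=""
)

class RowGroupPrefixFilter(logging.Filter):
    """Prepend the current row-group tag, e.g. '(01/10)', to each log record."""
    def filter(self, record: logging.LogRecord) -> bool:
        tag = row_group_tag.get()
        if tag:
            record.msg = f"{tag} {record.msg}"
        return True

logger = logging.getLogger("rg_demo")
logger.addFilter(RowGroupPrefixFilter())
logger.setLevel(logging.INFO)

token = row_group_tag.set("(01/10)")
logger.info("dispatching seeds")  # emitted as "(01/10) dispatching seeds"
row_group_tag.reset(token)
```

Because ContextVars propagate into asyncio tasks automatically, any task spawned while the tag is set inherits the prefix without threading it through call signatures.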

Sticky ANSI progress bars (new)

  • New opt-in RunConfig.progress_bar setting for sticky terminal progress bars
  • Bars stay at the bottom of the terminal while log messages scroll above
  • Pure ANSI escape codes, no new dependencies
  • Falls back to existing log-based output on non-TTY (CI, pipes, notebooks)
  • Progress bar updates on every completion in async path (bypasses time-gated reporting)
  • Key files: sticky_progress_bar.py, run_config.py
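The redraw and non-TTY fallback ideas can be sketched as below. This is not the PR's `StickyProgressBar` (a true bottom-pinned bar also needs scroll-region escapes and log-handler wrapping); it only shows the render/cap/fallback shape, with function names chosen for illustration.

```python
import sys

def render_bar(completed: int, total: int, width: int = 20) -> str:
    """Render a fixed-width text bar as a plain string."""
    completed = min(completed, total)  # cap so retries never display >100%
    filled = width * completed // total if total else width
    pct = 100 * completed // total if total else 100
    return f"[{'#' * filled}{'-' * (width - filled)}] {completed}/{total} ({pct}%)"

def draw(line: str) -> None:
    if sys.stderr.isatty():
        # \r returns to column 0; \x1b[K clears to end of line before redrawing
        sys.stderr.write(f"\r\x1b[K{line}")
        sys.stderr.flush()
    else:
        # Non-TTY (CI, pipes, notebooks): fall back to a plain log line
        print(line, file=sys.stderr)
```

Keeping the render function pure (string in, string out) makes the resize/cap behavior easy to unit-test without a terminal.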

Row-group semaphore deadlock fix (new)

  • Fixed latent deadlock where transient failures on all tasks of admitted row groups block the row-group semaphore indefinitely, preventing new row groups from being admitted
  • Solution: eager inline salvage for stalled row groups - retries deferred tasks immediately after each checkpoint pass, so row groups with 0 in-flight work don't waste semaphore slots while other groups are still active
  • If salvage exhausts all retries, failed rows are dropped so the row group can still checkpoint and free its slot
  • Added regression test test_scheduler_rg_semaphore_deadlock_with_transient_failures
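A toy asyncio sketch of the failure mode and the salvage idea (this is not the scheduler's code; names and structure are invented for illustration): a semaphore bounds admitted row groups, a slot is released only on checkpoint, and a group whose only remaining work is deferred must be retried inline or its slot leaks.

```python
import asyncio

async def run_groups(num_groups: int, max_admitted: int = 2) -> list[int]:
    sem = asyncio.Semaphore(max_admitted)   # bounds admitted row groups
    attempts: dict[int, int] = {}
    checkpointed: list[int] = []

    async def flaky_task(rg: int) -> bool:
        # Fails transiently on the first attempt, succeeds on retry
        attempts[rg] = attempts.get(rg, 0) + 1
        return attempts[rg] >= 2

    async def process(rg: int) -> None:
        async with sem:  # slot is held until the group checkpoints
            while not await flaky_task(rg):
                # Eager inline salvage: retry the deferred task immediately
                # instead of parking the group with zero in-flight work
                await asyncio.sleep(0)
            checkpointed.append(rg)  # checkpointing frees the semaphore slot

    await asyncio.gather(*(process(rg) for rg in range(num_groups)))
    return checkpointed
```

Without the retry loop, every admitted group would sit with zero in-flight tasks and only deferred work, no slot would ever be released, and the remaining groups could never be admitted.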

Attention Areas

Reviewers: Please pay special attention to the following:

  • async_scheduler.py - Deadlock fix: _salvage_stalled_row_groups and updated main dispatch loop

Test plan

  • test_scheduler_on_finalize_row_group_callback_fires - verifies finalize fires for completed row groups
  • test_scheduler_on_finalize_skips_empty_row_group - verifies dropped row groups skip finalize
  • test_free_row_group_releases_memory / test_free_row_group_idempotent - new buffer manager tests
  • test_scheduler_rg_semaphore_deadlock_with_transient_failures - regression test for deadlock fix
  • Updated integration tests (test_build_async_end_to_end, test_checkpoint_produces_correct_parquet_calls)
  • All renamed test_dataset_builder_* tests pass
  • All 30 async scheduler tests pass
  • 2818/2818 tests pass

Closes #437, closes #442, closes #443, closes #444

Demo script (async progress + salvage)
```python
#!/usr/bin/env python3
"""Temp script to eyeball async progress reporter logs.

Run: DATA_DESIGNER_ASYNC_ENGINE=1 .venv/bin/python scripts/demo_async_progress.py
"""
from __future__ import annotations

import os
import tempfile

os.environ["DATA_DESIGNER_ASYNC_ENGINE"] = "1"

import data_designer.config as dd
from data_designer.config.models import ChatCompletionInferenceParams, ModelConfig
from data_designer.config.run_config import RunConfig
from data_designer.interface import DataDesigner

NUM_RECORDS = 100
BUFFER_SIZE = 10
MAX_PARALLEL = 32

with tempfile.TemporaryDirectory() as tmp:
    designer = DataDesigner(artifact_path=tmp)
    designer.set_run_config(RunConfig(buffer_size=BUFFER_SIZE, progress_bar=True))

    config = dd.DataDesignerConfigBuilder()
    config.delete_model_config("nvidia-text")
    config.add_model_config(
        ModelConfig(
            alias="nvidia-text",
            model="nvidia/nemotron-3-nano-30b-a3b",
            provider="nvidia",
            inference_parameters=ChatCompletionInferenceParams(max_parallel_requests=MAX_PARALLEL),
        )
    )
    config.add_column(
        dd.SamplerColumnConfig(
            name="topic",
            sampler_type=dd.SamplerType.CATEGORY,
            params=dd.CategorySamplerParams(values=["science", "history", "art", "music"]),
        )
    )
    config.add_column(
        dd.LLMTextColumnConfig(
            name="question",
            model_alias="nvidia-text",
            prompt="Write a short trivia question about {{ topic }}.",
        )
    )
    config.add_column(
        dd.LLMTextColumnConfig(
            name="answer",
            model_alias="nvidia-text",
            prompt="Answer this trivia question in one sentence: {{ question }}",
        )
    )
    config.add_column(
        dd.LLMTextColumnConfig(
            name="difficulty",
            model_alias="nvidia-text",
            prompt="Rate the difficulty of this question 1-5: {{ question }}",
        )
    )

    result = designer.create(config, num_records=NUM_RECORDS, dataset_name="progress_demo")
```


@andreatgretel andreatgretel requested a review from a team as a code owner March 24, 2026 19:20
…-group lifecycle

- Rename ColumnWiseDatasetBuilder to DatasetBuilder and
  column_wise_builder.py to dataset_builder.py, update all references
- Extract _prepare_async_run() factory shared by build and preview paths
- Add _build_async_preview() for async preview with no disk checkpoints
- Replace on_row_group_complete/on_checkpoint_complete with single
  on_finalize_row_group callback; caller handles checkpointing
- Add free_row_group() on RowGroupBufferManager for discard-without-write
- Free fully-dropped row groups instead of finalizing them
- Add consolidated AsyncProgressReporter for async generation logging

Closes #437, closes #442, closes #444
- Add AsyncProgressReporter: groups per-column progress into a single
  log block emitted at configurable intervals (default 5s)
- Add quiet mode to ProgressTracker to suppress per-tracker logging
  when used with the consolidated reporter
- Add ContextVar-based row group tagging (RG1, RG2, ...) for log
  messages emitted inside async tasks (samplers, expressions, seeds)
- Add progress_interval to RunConfig for user-configurable reporting
- Remove log_start_async from ProgressTracker (superseded by reporter)

Closes #443
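The interval gating, forced final report, and duplicate suppression described in this commit can be sketched as follows. Names like `progress_interval` and `_last_reported_total` mirror the PR description but the class itself is an assumption, not the real `AsyncProgressReporter`.

```python
import time

class IntervalReporter:
    """Emit progress at most once per interval; completion bypasses the gate."""

    def __init__(self, total: int, progress_interval: float = 5.0, now=time.monotonic):
        self._total = total
        self._interval = progress_interval
        self._now = now
        self._last_emit = now()          # start the clock now: no premature first report
        self._last_reported_total = -1   # dedup state
        self.emitted: list[str] = []

    def record(self, completed: int, force: bool = False) -> None:
        due = self._now() - self._last_emit >= self._interval
        if not (due or force or completed >= self._total):
            return                        # time-gated: too soon to report again
        if completed == self._last_reported_total:
            return                        # suppress duplicate (e.g. repeated 100%) reports
        self._last_reported_total = completed
        self._last_emit = self._now()
        self.emitted.append(f"{completed}/{self._total}")
```

Injecting the clock (`now=`) keeps the gating logic testable without sleeping; the same hook is a common pattern for anything time-gated.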
@andreatgretel andreatgretel force-pushed the andreatgretel/chore/async-engine-followup-v2 branch from 436a890 to 709f53d on March 24, 2026 19:24
@greptile-apps
Contributor

greptile-apps bot commented Mar 24, 2026

Greptile Summary

This PR is a well-structured follow-up to the async engine work (#449), delivering four tracked improvements (rename, preview path, lifecycle callback unification, progress reporting) plus two unplanned additions (sticky ANSI progress bars and a row-group semaphore deadlock fix). The implementation is thorough — prior review concerns around the __enter__/try ordering, lock scope in _emit, the has_row_group preview guard, and exclude_columns in salvage are all resolved.

Key changes:

  • ColumnWiseDatasetBuilder -> DatasetBuilder (file + class rename, all references updated)
  • _prepare_async_run() factory method deduplicates async setup between build and preview paths
  • on_finalize_row_group replaces the dual-callback pattern; fully-dropped row groups now skip finalization cleanly
  • AsyncProgressReporter consolidates per-column logging with ContextVar-based row-group prefix and time-gated emission
  • StickyProgressBar provides opt-in sticky ANSI bars via RunConfig.progress_bar; falls back gracefully on non-TTY
  • Deadlock fix in _salvage_stalled_row_groups: stalled row groups (0 in-flight, all tasks deferred) are eagerly salvaged inline so their semaphore slots are freed before new groups are admitted

One remaining gap identified: The already_dropped guard added to _salvage_stalled_row_groups (to prevent double-counting when multiple cell tasks exhaust retries on the same row) does not extend to row_index=None (seed/batch) tasks. A pipeline with two independent seed generators on the same row group, both exhausting salvage retries, would double-count the second column's progress. The fix is a symmetric all(is_dropped(...)) check for the batch path.

Confidence Score: 4/5

  • Safe to merge with one targeted fix: extend the already_dropped guard in _salvage_stalled_row_groups to cover seed/batch tasks.
  • The bulk of prior review concerns are resolved (context manager ordering, lock scope, double-counting for cell tasks, preview KeyError). The remaining gap — already_dropped not guarding row_index=None tasks — is a real but rare scenario (multiple independent seed generators both exhausting salvage retries), and its impact is limited to misleading progress percentages rather than data loss or crashes. The deadlock fix, rename, preview path, and progress bar all look correct and well-tested.
  • async_scheduler.py lines 451–461: the already_dropped guard for exhausted seed/batch tasks.

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py Core async scheduler with deadlock fix and lifecycle callbacks. The _salvage_stalled_row_groups logic correctly guards against double-counting for cell tasks but misses the same guard for seed/batch tasks (row_index=None), which can double-count progress for pipelines with multiple independent seed generators.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/sticky_progress_bar.py New ANSI sticky progress bar. The with ... or contextlib.nullcontext() pattern is correctly adopted. Handler wrapping/unwrapping logic is sound. The exit race window noted in a previous thread is tracked as a follow-up.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_progress_reporter.py New consolidated async progress reporter. Uses atomic snapshots via get_snapshot(), no longer reaches into private tracker state. Dedup via _last_reported_total is clean. Bar update path correctly bypasses time-gating.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py Renamed from ColumnWiseDatasetBuilder with async preview path added. _prepare_async_run factory cleanly shares setup between build and preview. has_row_group(0) guard correctly handles all-dropped preview row group.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py Added has_row_group and free_row_group methods. Logic is correct and well-tested.
packages/data-designer-config/src/data_designer/config/run_config.py Added progress_bar and progress_interval fields with appropriate validators and docstrings.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/progress_tracker.py Added get_snapshot() for atomic multi-field reads under lock. Clean public interface; _random_emoji is no longer accessed externally.
packages/data-designer-engine/src/data_designer/engine/context.py New ContextVar-based row group tag for log prefixes. Implementation is clean and straightforward.
packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py Comprehensive scheduler tests including new deadlock regression test. All 30 scheduler tests pass.

Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[run] --> B[log_start]
    A --> C[_admit_row_groups]
    A --> D[_main_dispatch_loop]

    C --> C1[acquire rg_semaphore]
    C1 --> C2[init_row_group in buffer]
    C2 --> C3[_dispatch_seeds\nfrom_scratch tasks]

    D --> D1{early_shutdown?}
    D1 -- yes --> D2[_salvage_stalled_row_groups]
    D2 --> D3[_checkpoint_completed_row_groups]
    D1 -- no --> D4[_run_seeds_complete_check\npre-batch callback]
    D4 --> D5[get_ready_tasks\ndispatch frontier]
    D5 --> D6[_checkpoint_completed_row_groups\nfinalize + release rg_semaphore]
    D6 --> D7{deferred tasks?}
    D7 -- yes --> D8[_salvage_stalled_row_groups\ndeadlock fix: eager inline retry]
    D8 --> D9{all done?}
    D7 -- no --> D9
    D9 -- no --> D10[wait wake_event]
    D10 --> D1
    D9 -- yes --> D11[log_final\nAsyncProgressReporter]

    D8 --> E[_salvage_rounds\nretry deferred tasks]
    E --> F{exhausted retries?}
    F -- yes --> G[already_dropped check\nrecord_failure / _drop_row_group\nexclude_columns guard]
    G --> H[_checkpoint_completed_row_groups]
```
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 451-461

Comment:
**`already_dropped` guard doesn't cover batch/from-scratch tasks**

The `already_dropped` short-circuit uses `task.row_index is not None` as its first operand, so it always evaluates to `False` for seed/batch tasks (`row_index is None`). This means `record_failure` fires unconditionally for every exhausted seed task, even when a sibling exhausted seed task already called `_drop_row_group` and recorded skips for the same row group.

**Reproducing scenario:** Two independent `from_scratch` generators (different instances, e.g. two `SamplerCategoryGenerator` configs as seed columns) both exhaust salvage retries on row group `rg=0`:

1. Task A (`col=topic`, `row_index=None`): `already_dropped = False``record_failure(topic)`, then `_drop_row_group(0, size, exclude={topic})` marks all rows dropped and calls `record_skipped(language)` for every row.
2. Task B (`col=language`, `row_index=None`): `already_dropped = False` still → `record_failure(language)` fires — but `record_skipped(language)` was already counted in step 1. `language.completed` is now incremented twice per row.

This is the exact same class of bug that was fixed for `row_index is not None` tasks in this PR. The fix should extend the `already_dropped` check to the batch case:

```python
if task.row_index is not None:
    already_dropped = self._tracker.is_dropped(task.row_group, task.row_index)
else:
    rg_size = self._get_rg_size(task.row_group)
    already_dropped = all(self._tracker.is_dropped(task.row_group, ri) for ri in range(rg_size))
if not already_dropped and self._reporter:
    self._reporter.record_failure(task.column)
```



Add RunConfig.progress_bar setting that replaces periodic log-line
progress with sticky terminal bars that stay at the bottom while
logs scroll above. Pure ANSI escape codes, no new dependencies.

Disabled by default - existing log-based output unchanged.
Skip the time gate when the progress bar is active so the bar
redraws on every record instead of every progress_interval seconds.
When all tasks for admitted row groups fail with transient errors,
the row-group semaphore never releases, blocking admission of new
row groups. Fix by salvaging stalled row groups inline - retrying
deferred tasks immediately so row groups can checkpoint and free
their semaphore slots.

Also updates row group log format to (x/X) with leading zeros.
Run inline salvage after every checkpoint pass instead of only when
globally stalled. Row groups with 0 in-flight and only deferred tasks
are salvaged immediately, freeing their semaphore slot for new work.
- Use `with` statement for progress bar context (safe __exit__ on error)
- Check bar.is_active instead of bar is not None (non-TTY fallback)
- Record failures (not skips) for tasks that exhaust salvage retries
- Record skipped tasks when pre-batch filtering drops rows
- Pre-compute fixed stats width at bar creation to prevent bar
  resizing when failed count appears
- Cap displayed completed at total to avoid >100% on retries
- Exclude already-failed columns from skip recording to prevent
  double-counting in progress reporter
nabinchha
nabinchha previously approved these changes Mar 25, 2026
Contributor

@nabinchha nabinchha left a comment


Nice work on this one, Andre. The refactoring is clean and the new code is well-tested. A few things to look at below, but nothing blocking.

Warnings

async_scheduler.py:711-714 — Non-retryable from_scratch/batch failure still missing exclude_columns

In _execute_task_inner_impl, when a non-retryable error occurs on a from_scratch or batch task, record_failure(task.column) fires on line 700, then _drop_row_group is called on line 714 without exclude_columns={task.column}. This means _record_skipped_tasks_for_row adds an extra record_skipped for the same column — same double-count pattern that 4890fa31 fixed in _salvage_stalled_row_groups. Suggest:

```python
self._drop_row_group(task.row_group, rg_size, exclude_columns={task.column})
```

async_scheduler.py:514-517_in_flight_for_rg is dead code

Defined but never called anywhere. Likely superseded by the direct s.in_flight_count == 0 check in the stalled RG detection. Safe to remove.

progress_tracker.py:99get_snapshot return type is an opaque 8-tuple

Callers have to remember positional meaning and there's already unpacking noise with _-prefixed throwaway variables in async_progress_reporter.py. A frozen @dataclass (ProgressSnapshot) would be cleaner — fine as a follow-up.

Suggestions

  • StickyProgressBar is created even when the reporter is None (no CELL_BY_CELL columns) — gate creation on reporter existence
  • _make_wrapper and _wrapped_handlers typed as object instead of Callable[[logging.LogRecord], None]
  • _redraw calls shutil.get_terminal_size() on every update — consider caching under high throughput
  • context.py:13 docstring says "Group x/X" but the actual format is (01/10)
  • _compute_stats_width rate placeholder 9999.9 could be exceeded at very high throughput (low priority)

What Looks Good

  • Deadlock fix is well-designed with a strong regression test
  • _prepare_async_run extraction cleanly eliminates build/preview duplication
  • Rename is thorough — no stale references in source code
  • Latest commit (4890fa31) is thoughtful polish

Ship it (with nits) — only the exclude_columns fix on line 714 is worth addressing before merge.

- Add exclude_columns={task.column} on non-retryable batch/from_scratch
  drop path to prevent double-counting (same pattern as cell path)
- Simplify salvage drop to per-task exclude (is_dropped guard handles
  multi-column case)
- Remove dead _in_flight_for_rg method
- Fix context.py docstring to match actual (x/X) format
Contributor Author

@andreatgretel andreatgretel left a comment


Thanks @nabinchha! Addressed the blocking item and nits:

  • exclude_columns on the batch/from_scratch drop path - fixed in f3a522f9, now matches the cell path pattern
  • _in_flight_for_rg dead code - removed
  • context.py docstring - fixed to match actual (x/X) format

Agree on the follow-up items (ProgressSnapshot dataclass, gating StickyProgressBar on reporter, typing _make_wrapper, terminal size caching). Will track those separately.

andreatgretel and others added 2 commits March 25, 2026 13:21
After salvage discards a cell task from dispatched (making it
available in the frontier), _drain_frontier broke immediately
because nothing was in-flight yet. The task and its downstream
were never re-dispatched, leaving the row group incomplete.

Fix: only break when both ready and in-flight are empty.
- _drain_frontier: only break when both ready and in-flight are empty
- _salvage_rounds: re-mark sibling columns as dispatched after
  from_scratch retry to prevent duplicate dispatch
- _salvage_stalled_row_groups: separate exhausted tasks from new
  drain failures to avoid treating non-stalled tasks as permanent
- _checkpoint_completed_row_groups: clean up deferred tasks for
  checkpointed row groups
- Early shutdown: salvage stalled row groups before exiting
@andreatgretel
Contributor Author

A few last minute fixes for salvage edge cases found during review:

  • _drain_frontier was exiting before dispatching ready tasks when nothing was in-flight (e.g. after salvage retries a cell task, its downstream never ran)
  • _salvage_rounds wasn't re-marking sibling columns as dispatched after a from_scratch retry, risking duplicate dispatch
  • New transient failures during a salvage drain were incorrectly treated as permanently failed instead of being preserved for future retries
  • Deferred tasks for already-checkpointed row groups were accumulating in memory
  • Early shutdown now salvages stalled row groups before exiting
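The first bullet's fix (only stop draining when both the ready set and the in-flight set are empty) can be shown with a toy loop. This is a deliberately simplified synchronous sketch, not `_drain_frontier` itself; `run` stands in for task execution and returns any downstream tasks it unlocks.

```python
def drain(ready, run):
    """Dispatch tasks until BOTH ready and in-flight are empty."""
    in_flight: list[str] = []
    done: list[str] = []
    while ready or in_flight:        # the fix: check both sets, not just in-flight
        while ready:
            in_flight.append(ready.pop())
        task = in_flight.pop()
        done.append(task)
        ready.extend(run(task))      # a finished task may unlock downstream work
    return done
```

Breaking on `not in_flight` alone would exit immediately after salvage returns a task to `ready` (nothing is in flight yet), which is exactly how the downstream column was left undispatched.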

When multiple columns fail for the same row, the first drop records
a skip for the other column. Without this guard, record_failure
fires again for the second column, double-counting it.
@andreatgretel andreatgretel merged commit c146af5 into main Mar 25, 2026
46 checks passed