
Persistence in iterate operator #214

@zxqfd555

Description


Is your feature request related to a problem? Please describe.

The iterate operator currently has no persistence support. On restart, pipelines that use iterate must recompute the fixed point from scratch, even if the converged output is identical to that of the previous run. This is expensive for pipelines where fixed-point convergence is slow or where the input data is large. All other stateful operators (reduce, groupby, deduplicate, etc.) support persistence; iterate is the only exception.

The root cause is architectural: iterate uses a nested timely scope with the compound timestamp Product<Timestamp, u32> (outer time × iteration counter). The existing persistence machinery is hardcoded to the plain Timestamp type and cannot be made generic over MaybeTotalTimestamp without a significant refactor, so persistence cannot be wired inside the iterative subscope.

Describe the solution you'd like

The key insight is that nothing needs to be persisted inside the loop. At a fixed point, logic(F) = F by definition, so the converged output F is a complete description of the operator's state. If we save F and inject it back as the loop's starting point on restart, the loop converges immediately (zero additional iterations) for unchanged input and propagates only incremental deltas for new input.
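The fixed-point argument can be checked on a toy loop. The sketch below is plain Rust, not Differential Dataflow: it models the loop body as reachability (seeds plus one-hop neighbours of the current set), an inflationary `logic` chosen purely for illustration, and counts how many rounds change the state when starting from an empty set versus from a previously converged F. It only covers inserted input, and the names `logic` and `fixed_point` are hypothetical.

```rust
use std::collections::BTreeSet;

type Edge = (u32, u32);

/// One application of the (hypothetical) loop body: the seeds, the current
/// set, and every node reachable in one hop from the current set.
/// Because it keeps `current`, this logic is inflationary.
fn logic(seeds: &BTreeSet<u32>, edges: &[Edge], current: &BTreeSet<u32>) -> BTreeSet<u32> {
    let mut next = seeds.clone();
    next.extend(current.iter().copied());
    for &(src, dst) in edges {
        if current.contains(&src) {
            next.insert(dst);
        }
    }
    next
}

/// Applies `logic` starting from `start` until logic(F) == F.
/// Returns the fixed point and the number of rounds that changed the state.
fn fixed_point(
    seeds: &BTreeSet<u32>,
    edges: &[Edge],
    start: BTreeSet<u32>,
) -> (BTreeSet<u32>, usize) {
    let mut current = start;
    let mut changing_rounds = 0;
    loop {
        let next = logic(seeds, edges, &current);
        if next == current {
            // Fixed point reached: another round would not change anything.
            return (current, changing_rounds);
        }
        current = next;
        changing_rounds += 1;
    }
}
```

On the graph 1→2→3→4 with seed 1, a cold start needs four state-changing rounds, a warm start from the saved F needs none, and adding the edge 4→5 costs a single round while reaching the same result as a cold start.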

The converged output lives in the outer scope with plain Timestamp after .leave(). We can persist it with the existing persist_state() / read_persisted_state() functions, unchanged. The inner scope is never touched.
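To make the save-and-reload round trip concrete, here is a hypothetical in-memory stand-in for snapshot storage keyed by persistent ID. It is not the OperatorSnapshotWriter/OperatorSnapshotReader API; it only assumes that a snapshot consolidates (row, diff) updates and that reading an ID that was never written yields an empty collection, which matches the first-run behaviour of read_persisted_state.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory snapshot storage, keyed by persistent ID.
/// Each snapshot maps a row to its accumulated multiplicity.
#[derive(Default)]
struct SnapshotStore {
    snapshots: HashMap<String, HashMap<String, i64>>,
}

impl SnapshotStore {
    /// Folds a batch of (row, diff) updates into the stored snapshot,
    /// dropping rows whose accumulated multiplicity reaches zero.
    fn write(&mut self, persistent_id: &str, updates: &[(String, i64)]) {
        let snapshot = self.snapshots.entry(persistent_id.to_string()).or_default();
        for (row, diff) in updates {
            let count = snapshot.entry(row.clone()).or_insert(0);
            *count += diff;
            if *count == 0 {
                snapshot.remove(row);
            }
        }
    }

    /// Returns the consolidated snapshot as sorted (row, multiplicity) pairs.
    /// An ID that was never written (the first run) yields an empty vector.
    fn read(&self, persistent_id: &str) -> Vec<(String, i64)> {
        let mut rows: Vec<(String, i64)> = self
            .snapshots
            .get(persistent_id)
            .map(|s| s.iter().map(|(r, c)| (r.clone(), *c)).collect())
            .unwrap_or_default();
        rows.sort();
        rows
    }
}
```

Keying each snapshot by its own ID is what lets several iterated tables share one store without interfering.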

Concretely, the data flow changes from:

outer_collection
  → [enter subscope] → SafeVariable(initial = outer.enter())
  → user logic → [feedback] → (converges)
  → [.leave()] → outer result

to:

outer_collection
  → read_persisted_state(reader) → persisted_collection  (outer scope, T=0)
  → [enter subscope] → SafeVariable(initial = outer.enter() + persisted.enter())
  → user logic → [feedback] → (converges, 0 iterations for unchanged data)
  → [.leave()] → outer result
  → persist_state(writer) → [storage]

The persisted data enters the outer scope at Timestamp(0), becomes Product(Timestamp(0), 0) inside the subscope, and serves as the warm starting point. Differential Dataflow's (DD's) incremental computation handles the rest correctly.
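The timestamp mapping can be sketched with simplified, non-generic stand-ins for Timestamp and Product<Timestamp, u32> (the field names `outer` and `inner` are assumed here). Entering pairs the outer time with iteration 0, leaving drops the counter, and the product order shows why the inner time is only partially ordered: the persisted baseline at Product(Timestamp(0), 0) precedes every other time, but two arbitrary inner times may be incomparable.

```rust
/// Simplified stand-in for the outer (totally ordered) timestamp.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Timestamp(u64);

/// Simplified stand-in for Product<Timestamp, u32>: outer time x iteration counter.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Product {
    outer: Timestamp,
    inner: u32,
}

/// Entering the subscope pairs the outer time with iteration 0.
fn enter(t: Timestamp) -> Product {
    Product { outer: t, inner: 0 }
}

/// Leaving the subscope discards the iteration counter.
fn leave(t: Product) -> Timestamp {
    t.outer
}

/// Product order: a <= b only if both coordinates are <=.
/// This is a partial order, so some pairs are incomparable.
fn less_equal(a: &Product, b: &Product) -> bool {
    a.outer <= b.outer && a.inner <= b.inner
}
```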

The implementation requires changes in five places — all in the outer scope, none inside the loop:

  1. DataflowGraphInner::iterate() — assign a stable effective_persistent_id() per iterated table, load state via create_operator_snapshot_reader, create writers via create_operator_snapshot_writer.
  2. BeforeIterate — carry a persisted_collections vec alongside existing handles, threaded through from iterate().
  3. IteratedUniverse::create / IteratedColumn::create — concat the persisted collection into SafeVariable::new_from(outer.enter().concat(persisted.enter()), step).
  4. IteratedColumn::finish / IteratedUniverse::finish — wrap .leave() result with persist_state(writer) before arranging.
  5. TimestampBasedPersistenceWrapper — minor extension to expose a path for creating writer/reader pairs for iterate tables.
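Item 1 depends on each per-table ID being identical on every restart. As a hypothetical sketch (the real effective_persistent_id() may work differently), a per-table ID can be derived from the operator's ID and the table's position in the iterate() call using FNV-1a, which is fully specified. Rust's std DefaultHasher is avoided here because its algorithm is not guaranteed to stay the same across Rust releases.

```rust
/// FNV-1a (64-bit): deterministic across runs, processes and compiler versions.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = OFFSET_BASIS;
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

/// Hypothetical helper: one persistent ID per iterated table, derived from the
/// iterate operator's own ID and the table's index in the iterate() call.
fn iterated_table_persistent_id(operator_id: &str, table_index: usize) -> String {
    let key = format!("{operator_id}/iterate/{table_index}");
    format!("{:016x}", fnv1a_64(key.as_bytes()))
}
```

Keying on position assumes the order of iterated tables does not change between runs; keying on a user-visible table name would survive reordering but needs that name to be stable instead.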

Everything else (read_persisted_state, persist_state, PersistenceTime for Timestamp, ConcreteSnapshotMerger, OperatorSnapshotWriter/Reader, EmptyPersistenceWrapper inside the loop) is reused unchanged.

Edge cases:

  • First run (no persisted state): read_persisted_state returns an empty collection, so the pipeline behaves exactly as it does today, with no special casing needed.
  • Multiple iterated tables: each gets its own stable persistent ID via effective_persistent_id().
  • Iteration limit: the limit: Option<u32> filter in AfterIterate::apply_limit() is unaffected — the persisted output reflects a valid prior fixed point regardless.
  • filter_out_persisted: persist_state already skips rows at T == persistence_time() (persist.rs:769), so injected baseline rows are not redundantly re-written to storage.
  • iterated_with_universe tables: IteratedUniverse follows the same injection and output-persist pattern.
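The filter_out_persisted case can be modeled as follows, assuming the baseline time is the outer Timestamp(0) at which the snapshot is re-injected: updates carrying the baseline time are already in storage, so only later updates reach the writer. Updates are modeled here as hypothetical (row, time, diff) triples; this is not the actual signature of persist_state.

```rust
/// An update leaving the iterate operator: (row, outer time, diff).
type Update = (String, u64, i64);

/// Hypothetical model of filter_out_persisted: drop updates stamped with the
/// baseline time (the re-injected snapshot) and keep everything later.
fn updates_to_persist(output: &[Update], baseline_time: u64) -> Vec<Update> {
    output
        .iter()
        .filter(|(_, time, _)| *time != baseline_time)
        .cloned()
        .collect()
}
```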

Describe alternatives you've considered

Implementing PersistenceTime for Product<Timestamp, u32> to enable persistence inside the loop. This is explicitly ruled out in the codebase (persist.rs:182–188) because the snapshot writer cannot be made generic over MaybeTotalTimestamp without a significant architectural refactor. Persisting the outer output instead avoids this entirely.

Additional context

This change requires no new persistence semantics, no new traits, no changes to storage backends or the ConcreteSnapshotMerger, and no changes to connector offset persistence. The entire solution stays in the outer scope, treating iterate as a black box that produces a persistent collection — exactly like any other stateful operator does today.

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
