
fix(actions): make async_commit_enabled actually use async offset commits #16776

Open
max-datahub wants to merge 1 commit into master from fix/actions-async-commit-bug

@max-datahub
Collaborator

Summary

The async_commit_enabled configuration flag in KafkaEventSource has been non-functional since datahub-actions was moved into the OSS repo (#13120). When enabled, it was supposed to switch from per-event synchronous Kafka offset commits (~3ms each) to periodic background commits via librdkafka's auto-commit mechanism. Instead, due to a logic error in ack(), synchronous commits were always performed for processed events regardless of the flag.

The Bug

The ack() method had this condition:

if processed or not self.source_config.async_commit_enabled:
    # synchronous commit (with retry)
    ...
else:
    # store offset for periodic autocommit
    self._store_offsets(event)

When processed=True (which is the case for every normally processed event — the pipeline coerces None to True in pipeline.py:run()), the or short-circuits and the synchronous commit path is always taken, regardless of async_commit_enabled.

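| processed | async_commit_enabled | processed or not async | Path |
|-----------|----------------------|------------------------|------|
| True | False | True | sync commit |
| True | True | True | sync commit (bug) |
| False | False | True | sync commit |
| False | True | False | store offsets |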

The async path was only reachable when processed=False AND async_commit_enabled=True — a combination that essentially never occurs in practice.
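The table can be reproduced by evaluating the old condition over all four input combinations. This is a minimal sketch; the function name `old_takes_sync_path` is illustrative and does not appear in the codebase.

```python
from itertools import product


def old_takes_sync_path(processed: bool, async_commit_enabled: bool) -> bool:
    """Condition used by ack() before the fix."""
    return processed or not async_commit_enabled


# Collect every (processed, async_commit_enabled) pair that reaches the
# "store offsets" branch under the old condition.
store_path_inputs = [
    (processed, async_enabled)
    for processed, async_enabled in product([True, False], repeat=2)
    if not old_takes_sync_path(processed, async_enabled)
]

for processed, async_enabled in product([True, False], repeat=2):
    path = "sync commit" if old_takes_sync_path(processed, async_enabled) else "store offsets"
    print(f"processed={processed!s:<5} async_commit_enabled={async_enabled!s:<5} -> {path}")
```

Only one of the four combinations, `(False, True)`, lands in `store_path_inputs`, which matches the last row of the table.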

The Fix

The condition now branches solely on async_commit_enabled:

if not self.source_config.async_commit_enabled:
    # synchronous commit
    ...
else:
    self._store_offsets(event)

The constructor already correctly configured librdkafka for the "manual store + auto commit" pattern (enable.auto.offset.store=False, enable.auto.commit=True). Only the ack() method needed fixing.
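For readers unfamiliar with the "manual store + auto commit" pattern, the sketch below shows roughly which librdkafka properties are involved. The helper `build_commit_config` is hypothetical, and the sync-mode branch (disabling auto-commit entirely) is an assumption about how explicit commits would typically be configured, not a copy of the constructor.

```python
from typing import Any, Dict


def build_commit_config(
    async_commit_enabled: bool, async_commit_interval_ms: int = 10_000
) -> Dict[str, Any]:
    """Return offset-commit properties for the chosen mode (hypothetical helper)."""
    if not async_commit_enabled:
        # Assumption: in sync mode the application commits explicitly,
        # so the background auto-commit is switched off.
        return {"enable.auto.commit": False}
    return {
        # The application decides which offsets are eligible for commit...
        "enable.auto.offset.store": False,
        # ...and librdkafka's background thread commits the stored offsets...
        "enable.auto.commit": True,
        # ...once per interval.
        "auto.commit.interval.ms": async_commit_interval_ms,
    }


print(build_commit_config(async_commit_enabled=True))
```

With `enable.auto.offset.store` set to false, an offset only becomes eligible for the periodic commit after the application stores it, which is the role `_store_offsets()` plays in `ack()`.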

Impact

  • No behavior change for existing users. async_commit_enabled defaults to False, so the synchronous commit path remains the default.
  • Users who opt in to async_commit_enabled: true will now actually get asynchronous commits, with offsets stored locally and committed periodically by librdkafka's background thread.
  • Tradeoff: With async commits, events processed within the last async_commit_interval milliseconds before a consumer crash may be redelivered on restart. For idempotent actions this is a non-issue. The interval is configurable (default 10s).
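To get a feel for the size of the redelivery window, a rough upper bound is throughput multiplied by the commit interval. The sketch below assumes steady throughput and a crash just before the next periodic commit; the helper name `max_redelivered_events` is illustrative, and the 8,200 events/sec input is the noop-action figure from the benchmark in this PR, so real actions would sit well below it.

```python
def max_redelivered_events(events_per_sec: float, commit_interval_ms: int) -> float:
    """Rough worst case: everything processed since the last periodic commit."""
    return events_per_sec * commit_interval_ms / 1000.0


# Noop-action benchmark throughput with the default 10s interval.
worst_case = max_redelivered_events(8200, 10_000)
print(f"up to about {worst_case:,.0f} events redelivered")
```

Lowering the interval shrinks this window proportionally, at the cost of more frequent background commits.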

Benchmark Results

This fix has been tested and verified at scale on an EKS cluster with MSK (kafka.t3.small × 2, 5 partitions):

| Configuration | Throughput | Notes |
|---------------|------------|-------|
| Single pod, sync commits (before fix) | 326 events/sec | ~3ms sync commit per event is the bottleneck |
| 5 pods, sync commits (before fix) | 1,408 events/sec | 86% scaling efficiency |
| Single pod, async commits (after fix) | 8,200 events/sec | 25× improvement, bottleneck shifts to CPU |

Benchmark used a noop action (counter only — no I/O, no payload parsing) to isolate framework overhead from action logic. This measures the upper-bound framework ceiling: Kafka poll → Avro deserialization → event routing → offset commit.
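The derived figures in the table follow directly from the three measured throughputs. A short sketch of the arithmetic (variable names are illustrative):

```python
single_pod_sync = 326.0     # events/sec, before fix
five_pods_sync = 1408.0     # events/sec, before fix
single_pod_async = 8200.0   # events/sec, after fix

# Time budget per event in sync mode; about 3.07 ms, consistent with a
# ~3ms synchronous commit dominating each event.
per_event_ms = 1000.0 / single_pod_sync

# Scaling efficiency: measured 5-pod throughput over ideal linear scaling.
scaling_efficiency = five_pods_sync / (5 * single_pod_sync)

# Single-pod speedup from async commits.
speedup = single_pod_async / single_pod_sync

print(f"{per_event_ms:.2f} ms per event")
print(f"{scaling_efficiency:.0%} scaling efficiency")
print(f"{speedup:.1f}x speedup")
```

Note that a single pod with async commits also outperforms five pods with sync commits by a factor of roughly 5.8 (8,200 / 1,408).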

Test Plan

  • Added unit tests for ack() covering all 4 combinations of async_commit_enabled × processed
  • Verified the new test fails against the old code (the async_mode + processed=True case hits synchronous commit instead of store)
  • Verified all 6 Kafka source tests pass with the fix
  • Verified at scale on EKS with MSK (326 → 8,200 events/sec)
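The shape of the four-combination test can be sketched as follows. This is not the test added in the PR: `SketchEventSource` is a hypothetical stand-in for `KafkaEventSource` that contains only the fixed branch, and `_commit_offsets` is an invented name for the synchronous path, which the PR elides.

```python
from types import SimpleNamespace
from unittest.mock import MagicMock


class SketchEventSource:
    """Hypothetical stand-in for KafkaEventSource with the fixed ack() branch."""

    def __init__(self, async_commit_enabled: bool) -> None:
        self.source_config = SimpleNamespace(async_commit_enabled=async_commit_enabled)
        # Both paths are mocks so the test can observe which one ack() takes.
        self._commit_offsets = MagicMock(name="_commit_offsets")  # invented name
        self._store_offsets = MagicMock(name="_store_offsets")

    def ack(self, event: object, processed: bool = True) -> None:
        # Fixed condition: the branch depends only on the flag.
        if not self.source_config.async_commit_enabled:
            self._commit_offsets(event)
        else:
            self._store_offsets(event)


def check_all_combinations() -> int:
    """Assert the expected path for each flag/processed pair; return the count."""
    checked = 0
    for async_enabled in (False, True):
        for processed in (False, True):
            source = SketchEventSource(async_enabled)
            event = object()
            source.ack(event, processed=processed)
            if async_enabled:
                source._store_offsets.assert_called_once_with(event)
                source._commit_offsets.assert_not_called()
            else:
                source._commit_offsets.assert_called_once_with(event)
                source._store_offsets.assert_not_called()
            checked += 1
    return checked


print(check_all_combinations(), "combinations verified")
```

Against the old condition, the `async_enabled=True, processed=True` iteration would fail at `assert_called_once_with`, which is the regression the second bullet describes.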

Made with Cursor

The `ack()` method in `KafkaEventSource` had a logic error where the
condition `if processed or not self.source_config.async_commit_enabled`
short-circuited to synchronous commits whenever `processed=True` —
which is the case for every normally processed event. This made the
`async_commit_enabled` config flag effectively dead code.

The constructor correctly configured librdkafka for the "manual store +
auto commit" pattern (enable.auto.offset.store=False,
enable.auto.commit=True), but `ack()` never reached the
`_store_offsets()` path for processed events.

The fix changes the condition to branch solely on
`async_commit_enabled`, so sync mode always does synchronous commits
and async mode always stores offsets for periodic background commit.

Benchmarked at 25x throughput improvement (326 -> 8,200 events/sec
on a single pod with a noop action) when async commits work correctly.

Made-with: Cursor
Labels

community-contribution: PR or Issue raised by member(s) of DataHub Community
needs-review: Label for PRs that need review from a maintainer.
