[FEAT] 2PC implementation#132

Open
gaurav7261 wants to merge 1 commit into
ClickHouse:mainfrom
gaurav7261:feat/2PC_implementation

@gaurav7261 gaurav7261 commented Apr 30, 2026

Summary

Adds an exactly-once ClickHouse sink for Flink, closing the gap called out in the README ("Currently the sink does not support exactly-once semantics"). The sink is built on the Flink 2.0 Sink + SupportsCommitter + CommittingSinkWriter APIs and relies on ClickHouse's insert_deduplication_token for idempotent commits. The same design is also ported to the flink-connector-clickhouse-1.17 module using the Flink 1.17 TwoPhaseCommittingSink API.

The approach is the eBay Block Aggregator / ClickLoad style "deterministic block reconstruction + CH dedup" pattern, adapted to Flink 2PC.


Problem

The existing ClickHouseAsyncSink is built on Flink's AsyncSinkBase. Under failure:

  • Retries re-queue entire batches without any dedup key, so block content can drift across attempts and CH block-hash dedup may miss.
  • After exhausting retries the batch is silently dropped (numOfDroppedRecords increments, resultHandler.completeExceptionally is called). That is data loss, not at-least-once.
  • ASYNC_OPERATIONS=true on the insert means CH acks before data is durable — client ack cannot be taken as "landed".

Fixing these in-place would require reshaping the async framework. A cleaner option is a second sink that uses Flink's native 2PC path.


Design

The sink follows the same deterministic-block-reconstruction principle used by eBay's Block Aggregator and ClickHouse's ClickLoad script, but plugged into Flink's 2PC protocol rather than into Kafka consumer state or a staging table.

Flow

write(element)          → serialize, append to in-memory buffer. No CH I/O.
prepareCommit()         → chunk buffer into committables; compute deterministic token
                          per chunk. Buffer drains; records become Flink checkpoint state.
  ↓ (Flink persists committables to checkpoint storage; checkpoint barrier completes)
commit(requests)        → HTTP POST each committable with insert_deduplication_token set.
                          Retryable failures → retryLater(); permanent → signalFailedWithKnownReason.
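
The chunking step in prepareCommit() can be sketched as follows. This is only an illustration under stated assumptions, not the code in this PR: the class and method names are hypothetical, and the rule "close the chunk when either maxBatchSize or maxBatchSizeInBytes would be exceeded" is an assumed reading of the two constructor parameters shown under Usage. The point it illustrates is that chunking depends only on buffer order and the two limits, so replaying the same records yields the same chunks.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split an ordered buffer of serialized records into chunks.
final class ChunkingSketch {

    /** Hypothetical chunk holder: concatenated record bytes plus the row count. */
    static final class Chunk {
        final byte[] payload;
        final int recordCount;

        Chunk(byte[] payload, int recordCount) {
            this.payload = payload;
            this.recordCount = recordCount;
        }
    }

    static List<Chunk> chunk(List<byte[]> buffer, int maxBatchSize, long maxBatchSizeInBytes) {
        List<Chunk> chunks = new ArrayList<>();
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        int count = 0;
        for (byte[] record : buffer) {
            // Close the current chunk if adding this record would break either limit.
            // A single record larger than the byte limit still gets a chunk of its own.
            boolean full = count > 0
                    && (count >= maxBatchSize
                        || (long) current.size() + record.length > maxBatchSizeInBytes);
            if (full) {
                chunks.add(new Chunk(current.toByteArray(), count));
                current.reset();
                count = 0;
            }
            current.write(record, 0, record.length);
            count++;
        }
        if (count > 0) {
            chunks.add(new Chunk(current.toByteArray(), count));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<byte[]> buffer = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            buffer.add(new byte[] {1, 2, 3, 4});
        }
        for (Chunk c : chunk(buffer, 2, 1024L)) {
            System.out.println(c.recordCount + " records, " + c.payload.length + " bytes");
        }
    }
}
```

Each resulting chunk would then be stamped with a token and emitted as one committable.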

Token formula

token = SHA-256(
    "ch-flink-tpc:" || subtaskId || ":" || checkpointId || ":" || seqInCheckpoint || ":" ||
    payloadBytes
)

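The token combines position binding (subtask / ckpt / seq) with content binding (payload hash). It is deterministic across restarts: the same position and the same bytes always produce the same token. The token is computed once in prepareCommit and stored in the committable, so a committable that Flink restores onto a different subtask after recovery is still committed with its original token, and ClickHouse drops the retry at the ReplicatedMergeTree level. Records that are replayed and re-chunked by a different subtask would hash to a different token, because the subtask ID is part of the input; this is why the source-replay determinism requirement under "Required Flink topology" matters.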
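
A minimal Java sketch of the formula above, for readers who want to see the binding concretely. The class name is hypothetical, and three details are assumptions the formula does not specify: the prefix is encoded as UTF-8, the numeric fields are rendered in decimal, and the digest is hex-encoded.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of the token formula; encoding details are assumptions.
final class DedupTokenSketch {

    static String token(int subtaskId, long checkpointId, int seqInCheckpoint, byte[] payload) {
        // Position binding: subtask, checkpoint and sequence number within the checkpoint.
        String prefix = "ch-flink-tpc:" + subtaskId + ":" + checkpointId + ":" + seqInCheckpoint + ":";
        try {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            sha.update(prefix.getBytes(StandardCharsets.UTF_8));
            // Content binding: the exact bytes that will be sent to ClickHouse.
            sha.update(payload);
            StringBuilder hex = new StringBuilder(64);
            for (byte b : sha.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is mandatory for every Java platform implementation.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "row-1\nrow-2\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(token(0, 7L, 0, payload));
    }
}
```

Changing any one of the four inputs changes the token, which matches the cases listed for ClickHouseTokenDeterminismTest.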

Prior art

  • eBay Block Aggregator — "deterministic block reconstruction + ClickHouse block-hash dedup" in C++ against Kafka; state stored as offsets-and-metadata in __consumer_offsets. We use the same principle but store committables in Flink checkpoint state instead.
  • ClickHouse ClickLoad script — uses a staging table plus ALTER TABLE ... MOVE PARTITION for atomic EO bulk loads. Alternative path; stronger guarantees than dedup tokens but heavier. Noted as a possible future mode in Javadoc.
  • clickhouse-kafka-connect — same deterministic-reconstruction pattern with state stored in a KeeperMap table; Java-side state machine in Processing.java.
  • Flink EO Blog (Nowojski, 2018) — the 2PC framework reference used as the model.

The CH Supercharge Part 3 blog explicitly warns that pure block-hash dedup is fragile under thread-level non-determinism and window overflow. The content-bound token addresses both.


Changes

New package sink/tpc/ in both 2.0 and 1.17 modules

| File | Role |
| --- | --- |
| ClickHouseSink.java | Implements Sink + SupportsCommitter (2.0) / TwoPhaseCommittingSink (1.17). |
| ClickHouseCommittable.java | Committable POJO: {payload, tableName, format, deduplicationToken, recordCount}. |
| ClickHouseCommittableSerializer.java | V1 format with backward-compat shim. Checkpointed committables survive task failure. |
| ClickHouseCommittingWriter.java | Buffers records between checkpoints; prepareCommit chunks the buffer and stamps each chunk with a content-bound token. |
| ClickHouseCommitter.java | Blocking HTTP insert per committable with insert_deduplication_token; retry classification via CH numeric error codes; client recycling after sustained connection failures; commit latency / dedup-anomaly metrics. |

ClickHouseClientConfig (2.0 module)

Adds public synchronized void resetClient() so the committer can recycle the cached HTTP client after N consecutive connection failures, mirroring the fresh-client-per-attempt practice in BlockSupportedBufferFlushTask.
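
The recycling trigger can be pictured as a small consecutive-failure counter. The sketch below is hypothetical and self-contained: the class name and the Runnable hook are illustrative stand-ins, with the Runnable playing the role of ClickHouseClientConfig.resetClient(). The threshold of 2 used in main comes from the Operational features section; everything else is an assumption about how such a counter might look.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: recycle the client after N consecutive connection-level failures.
final class ClientRecyclerSketch {
    private final int threshold;
    private final Runnable resetAction; // stand-in for cfg.resetClient()
    private int consecutiveFailures;

    ClientRecyclerSketch(int threshold, Runnable resetAction) {
        this.threshold = threshold;
        this.resetAction = resetAction;
    }

    /** Any successful commit ends the failure streak. */
    synchronized void onSuccess() {
        consecutiveFailures = 0;
    }

    /** Returns true when this failure triggered a client reset. */
    synchronized boolean onConnectionFailure() {
        consecutiveFailures++;
        if (consecutiveFailures >= threshold) {
            resetAction.run();
            consecutiveFailures = 0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger resets = new AtomicInteger();
        ClientRecyclerSketch recycler = new ClientRecyclerSketch(2, resets::incrementAndGet);
        recycler.onConnectionFailure();
        recycler.onConnectionFailure();
        System.out.println("resets so far: " + resets.get());
    }
}
```

Resetting the streak on success keeps isolated, unrelated failures from recycling a healthy client.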


Operational features

| Feature | Implementation |
| --- | --- |
| Retry classification | CH numeric codes: 76, 209, 210, 241, 246, 252, 319, 999 retryable; 60, 62, 81, 497 permanent. Falls back to message heuristics for unknown codes. |
| Commit timeout | future.get(commitTimeoutMs, MILLISECONDS), default 120 s; prevents a stuck-commit deadlock. |
| Client recycling | ClickHouseClientConfig.resetClient() triggered after 2 consecutive connection-level failures. |
| Metrics | commitsSucceeded, commitsRetried, commitsFailedPermanent, recordsCommitted, bytesCommitted, writtenRowMismatches, commitLatencyMs. writtenRowMismatches > 0 is the ARV-analog alarm. |
| No ASYNC_OPERATIONS | Explicitly sets async_operations=false at insert time so client ack means durable write. |
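
As an illustration of the retry classification, here is a minimal sketch that uses the code lists above. The class, enum, and method names are hypothetical. The message keywords in the fallback, and the choice to treat an unrecognised error as permanent, are assumptions: the PR only states that unknown codes fall back to message heuristics.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch of code-based retry classification.
final class RetryClassifierSketch {

    enum Outcome { RETRY, FAIL }

    // Code lists taken from the PR description.
    private static final Set<Integer> RETRYABLE =
            new HashSet<>(Arrays.asList(76, 209, 210, 241, 246, 252, 319, 999));
    private static final Set<Integer> PERMANENT =
            new HashSet<>(Arrays.asList(60, 62, 81, 497));

    static Outcome classify(int errorCode, String message) {
        if (RETRYABLE.contains(errorCode)) {
            return Outcome.RETRY; // would map to retryLater()
        }
        if (PERMANENT.contains(errorCode)) {
            return Outcome.FAIL; // would map to signalFailedWithKnownReason
        }
        // Unknown code: fall back to a message heuristic (keywords are assumed).
        String m = message == null ? "" : message.toLowerCase(Locale.ROOT);
        boolean looksTransient =
                m.contains("timeout") || m.contains("timed out") || m.contains("connection");
        return looksTransient ? Outcome.RETRY : Outcome.FAIL;
    }

    public static void main(String[] args) {
        System.out.println(classify(209, "socket timeout"));
        System.out.println(classify(62, "syntax error"));
        System.out.println(classify(12345, "Read timed out"));
    }
}
```

Checking the numeric code first keeps the decision stable even if server error messages change wording between versions.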

Usage

ClickHouseClientConfig cfg = new ClickHouseClientConfig(url, user, pass, db, "events");

ClickHouseSink<Event> sink = new ClickHouseSink<>(
    elementConverter,
    cfg,
    ClickHouseFormat.RowBinary,
    /* maxBatchSize */        10_000,
    /* maxBatchSizeInBytes */ 64L * 1024 * 1024
);

env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

stream.sinkTo(sink);

Required ClickHouse config

ALTER TABLE events_replicated ON CLUSTER <cluster>
MODIFY SETTING
    replicated_deduplication_window         = 10000,
    replicated_deduplication_window_seconds = 86400;

The default window of 100 blocks is too small for extended retry scenarios.

Required Flink topology

Source → sink must be a forward connection (same parallelism, no rebalance / keyBy) to preserve source-replay determinism. The Javadoc spells this out; sources like Kafka with fixed parallelism and file sources satisfy it.


Tests

Unit tests (no Docker, run on CI)

  • ClickHouseCommittableSerializerTest — 5 tests: V1 round-trip with normal / empty / 10 MB payloads, version rejection, version stability.
  • ClickHouseTokenDeterminismTest — 6 tests: same inputs → same token; different (subtask | checkpoint | sequence | payload) → different token; 1-bit payload flip produces different token.
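
The round-trip and version-rejection behaviour exercised by the serializer tests can be sketched as below. This is not the V1 wire format from the PR: the field order, the use of DataOutputStream, and the class names are assumptions for illustration, and unchecked exceptions are used to keep the sketch compact (Flink's SimpleVersionedSerializer declares IOException instead).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of a versioned committable serializer.
final class CommittableSerializerSketch {
    static final int VERSION = 1;

    /** Fields mirror the committable POJO described in the PR. */
    static final class Committable {
        final byte[] payload;
        final String tableName;
        final String format;
        final String deduplicationToken;
        final int recordCount;

        Committable(byte[] payload, String tableName, String format,
                    String deduplicationToken, int recordCount) {
            this.payload = payload;
            this.tableName = tableName;
            this.format = format;
            this.deduplicationToken = deduplicationToken;
            this.recordCount = recordCount;
        }
    }

    static byte[] serialize(Committable c) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(c.tableName);
            out.writeUTF(c.format);
            out.writeUTF(c.deduplicationToken);
            out.writeInt(c.recordCount);
            out.writeInt(c.payload.length); // length prefix so empty payloads round-trip
            out.write(c.payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Committable deserialize(int version, byte[] serialized) {
        if (version != VERSION) {
            throw new IllegalArgumentException("Unsupported committable version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String tableName = in.readUTF();
            String format = in.readUTF();
            String token = in.readUTF();
            int recordCount = in.readInt();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return new Committable(payload, tableName, format, token, recordCount);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Committable c = new Committable(new byte[] {1, 2, 3}, "events", "RowBinary", "abc", 3);
        Committable back = deserialize(VERSION, serialize(c));
        System.out.println(back.tableName + " " + back.recordCount);
    }
}
```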

All 11 pass locally:

BUILD SUCCESSFUL
ClickHouseCommittableSerializerTest > roundTrip_handlesEmptyPayload()         PASSED
ClickHouseCommittableSerializerTest > deserialize_unsupportedVersionThrows()  PASSED
ClickHouseCommittableSerializerTest > version_isPositive()                    PASSED
ClickHouseCommittableSerializerTest > roundTrip_handlesLargePayload()         PASSED
ClickHouseCommittableSerializerTest > roundTrip_preservesAllFields()          PASSED
ClickHouseTokenDeterminismTest      > sameInputsProduceSameToken()            PASSED
ClickHouseTokenDeterminismTest      > differentSequenceChangesToken()         PASSED
ClickHouseTokenDeterminismTest      > differentPayloadChangesToken()          PASSED
ClickHouseTokenDeterminismTest      > differentSubtaskChangesToken()          PASSED
ClickHouseTokenDeterminismTest      > differentCheckpointChangesToken()       PASSED
ClickHouseTokenDeterminismTest      > oneBitDiffProducesDifferentToken()      PASSED

Integration tests (requires Docker)

ClickHouseSinkTpcTests:

  • exactlyOnceHappyPath_allRowsCommitted — bounded Flink job with EXACTLY_ONCE checkpointing; verifies row count matches input.
  • clickHouseTokenDedup_sameTokenCollapsesDuplicate — two inserts with same token against CH testcontainer; verifies the second is dropped.
  • committableRoundTrip_isLossless — serializer sanity in integration context.

Deliberate omissions vs. eBay Block Aggregator

Documented in ClickHouseSink Javadoc. Summary:

  • Cross-replica fencing (metadata-compare) — not applicable; Flink guarantees single-writer per committable.
  • LocalLoaderLock / DistributedLoaderLock (ZooKeeper) — not needed; Flink's checkpoint alignment + 2PC give equivalent exclusivity.
  • Reference offset + gap detection — Flink source offsets + checkpoint IDs cover this invariant.
  • Quorum pre-check (system.zookeeper probe when insert_quorum is set) — revisit if quorum errors dominate in practice.
  • ZooKeeper heartbeat before retry — Flink's own health checks make this redundant.
  • TableSchemaUpdateTracker (schema evolution) — current scaffold requires a job restart on schema change. Explicitly out of scope.

Limitations / known gaps

  • Non-deterministic sources (event-time windows with late data, random shuffles) break EO — documented in Javadoc as a hard requirement.
  • Parallelism change via savepoint has not been chaos-tested yet.
  • No StatefulSink for writer state; buffer is reconstructed from source replay on restart. Trade-off: simpler state at the cost of a re-run of uncommitted records (which is safe because committables were never ack'd).
  • Commit-time client recycling covers HTTP socket issues; it does not work around deeper CH server partitions — those produce permanent failures that rightfully fail the job.

Compile verification

Both modules compile against their respective Flink versions:

> Task :flink-connector-clickhouse-1.17:compileJava
> Task :flink-connector-clickhouse-2.0.0:compileJava
BUILD SUCCESSFUL

Notes for reviewers

  • ClickHouseClientConfig#resetClient() is a small public API addition to the 2.0 module. Minimal blast radius (no existing callers); would also make sense in the 1.17 module if you want parity — happy to mirror it.
  • Token formula is duplicated in ClickHouseTokenDeterminismTest on purpose so any drift in the writer produces a test failure.
  • writtenRowMismatches counter is the production alert signal — non-zero means something upstream is double-producing or the dedup window has overflowed. Worth a dashboard.
  • Javadoc on ClickHouseSink explicitly lists the prior art (eBay Block Aggregator, ClickLoad, clickhouse-kafka-connect) and source-determinism requirement so downstream users know what they are signing up for.

@gaurav7261

Author

@mzitnik @kurnoolsaketh please review

@mzitnik

mzitnik commented Apr 30, 2026

Contributor

@gaurav7261
Thanks for your contribution

@chernser chernser requested a review from mzitnik May 13, 2026 20:36