Skip to content

Commit 860aabd

Browse files
committed
major update - at most once semantics
1 parent e8f8a96 commit 860aabd

27 files changed

+3188
-2435
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaShareGroupSource.java

Lines changed: 162 additions & 141 deletions
Large diffs are not rendered by default.

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaShareGroupSourceBuilder.java

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,34 @@
1818

1919
package org.apache.flink.connector.kafka.source;
2020

21-
import java.util.Arrays;
22-
import java.util.List;
23-
import java.util.Properties;
24-
import java.util.Random;
25-
import java.util.Set;
26-
import java.util.regex.Pattern;
27-
2821
import org.apache.flink.annotation.PublicEvolving;
2922
import org.apache.flink.api.common.serialization.DeserializationSchema;
3023
import org.apache.flink.api.connector.source.Boundedness;
3124
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
3225
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
3326
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
3427
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
35-
import static org.apache.flink.util.Preconditions.checkNotNull;
36-
import static org.apache.flink.util.Preconditions.checkState;
3728
import org.apache.flink.util.function.SerializableSupplier;
29+
3830
import org.apache.kafka.clients.consumer.ConsumerConfig;
3931
import org.apache.kafka.common.TopicPartition;
4032
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
4133
import org.slf4j.Logger;
4234
import org.slf4j.LoggerFactory;
4335

36+
import java.util.Arrays;
37+
import java.util.List;
38+
import java.util.Properties;
39+
import java.util.Random;
40+
import java.util.Set;
41+
import java.util.regex.Pattern;
42+
43+
import static org.apache.flink.util.Preconditions.checkNotNull;
44+
import static org.apache.flink.util.Preconditions.checkState;
45+
4446
/**
45-
* The builder class for {@link KafkaShareGroupSource} to make it easier for users to construct
46-
* a share group-based Kafka source.
47+
* The builder class for {@link KafkaShareGroupSource} to make it easier for users to construct a
48+
* share group-based Kafka source.
4749
*
4850
* <p>The following example shows the minimum setup to create a KafkaShareGroupSource that reads
4951
* String values from Kafka topics using share group semantics:
@@ -58,8 +60,8 @@
5860
* .build();
5961
* }</pre>
6062
*
61-
* <p>The bootstrap servers, topics, share group ID, and deserializer are required fields.
62-
* This source requires Kafka 4.1.0+ with share group support enabled.
63+
* <p>The bootstrap servers, topics, share group ID, and deserializer are required fields. This
64+
* source requires Kafka 4.1.0+ with share group support enabled.
6365
*
6466
* @param <OUT> the output type of the source
6567
*/
@@ -104,8 +106,9 @@ public KafkaShareGroupSourceBuilder<OUT> setBootstrapServers(String bootstrapSer
104106
}
105107

106108
/**
107-
* Sets the share group ID for share group semantics. This is required for share group-based consumption.
108-
* The share group ID is used to coordinate message distribution across multiple consumers.
109+
* Sets the share group ID for share group semantics. This is required for share group-based
110+
* consumption. The share group ID is used to coordinate message distribution across multiple
111+
* consumers.
109112
*
110113
* @param shareGroupId the share group ID
111114
* @return this KafkaShareGroupSourceBuilder
@@ -179,7 +182,8 @@ public KafkaShareGroupSourceBuilder<OUT> setStartingOffsets(
179182
* @param stoppingOffsetsInitializer the {@link OffsetsInitializer} to specify stopping offsets
180183
* @return this KafkaShareGroupSourceBuilder
181184
*/
182-
public KafkaShareGroupSourceBuilder<OUT> setBounded(OffsetsInitializer stoppingOffsetsInitializer) {
185+
public KafkaShareGroupSourceBuilder<OUT> setBounded(
186+
OffsetsInitializer stoppingOffsetsInitializer) {
183187
this.boundedness = Boundedness.BOUNDED;
184188
this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
185189
return this;
@@ -191,7 +195,8 @@ public KafkaShareGroupSourceBuilder<OUT> setBounded(OffsetsInitializer stoppingO
191195
* @param stoppingOffsetsInitializer the {@link OffsetsInitializer} to specify stopping offsets
192196
* @return this KafkaShareGroupSourceBuilder
193197
*/
194-
public KafkaShareGroupSourceBuilder<OUT> setUnbounded(OffsetsInitializer stoppingOffsetsInitializer) {
198+
public KafkaShareGroupSourceBuilder<OUT> setUnbounded(
199+
OffsetsInitializer stoppingOffsetsInitializer) {
195200
this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
196201
this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
197202
return this;
@@ -238,7 +243,7 @@ public KafkaShareGroupSourceBuilder<OUT> setClientIdPrefix(String prefix) {
238243
* @param enabled whether to enable share group metrics
239244
* @return this KafkaShareGroupSourceBuilder
240245
*/
241-
public KafkaShareGroupSourceBuilder<OUT> enableShareGroupMetrics(boolean enabled) {
246+
public KafkaShareGroupSourceBuilder<OUT> enableShareGroupMetrics(boolean enabled) {
242247
this.shareGroupMetricsEnabled = enabled;
243248
return this;
244249
}
@@ -276,7 +281,7 @@ public KafkaShareGroupSourceBuilder<OUT> setProperties(Properties props) {
276281
public KafkaShareGroupSource<OUT> build() {
277282
sanityCheck();
278283
parseAndSetRequiredProperties();
279-
284+
280285
return new KafkaShareGroupSource<>(
281286
subscriber,
282287
startingOffsetsInitializer,
@@ -286,8 +291,7 @@ public KafkaShareGroupSource<OUT> build() {
286291
props,
287292
rackIdSupplier,
288293
shareGroupId,
289-
shareGroupMetricsEnabled
290-
);
294+
shareGroupMetricsEnabled);
291295
}
292296

293297
// Private helper methods
@@ -316,7 +320,7 @@ private void parseAndSetRequiredProperties() {
316320
maybeOverride("group.type", "share", true); // Force share group type
317321
maybeOverride("group.id", shareGroupId, true); // Use share group ID as group ID
318322
maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", true);
319-
323+
320324
// Set auto offset reset strategy
321325
maybeOverride(
322326
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
@@ -337,7 +341,9 @@ private boolean maybeOverride(String key, String value, boolean override) {
337341
if (override) {
338342
LOG.warn(
339343
"Property {} is provided but will be overridden from {} to {} for share group semantics",
340-
key, userValue, value);
344+
key,
345+
userValue,
346+
value);
341347
props.setProperty(key, value);
342348
overridden = true;
343349
}
@@ -359,11 +365,9 @@ private void sanityCheck() {
359365
checkState(
360366
subscriber != null,
361367
"No topics specified. Use setTopics(), setTopicPattern(), or setPartitions().");
362-
363-
checkNotNull(
364-
deserializationSchema,
365-
"Deserialization schema is required but not provided.");
366-
368+
369+
checkNotNull(deserializationSchema, "Deserialization schema is required but not provided.");
370+
367371
checkState(
368372
shareGroupId != null && !shareGroupId.trim().isEmpty(),
369373
"Share group ID is required for share group semantics");
@@ -377,4 +381,4 @@ private void validateShareGroupProperties(Properties props) {
377381
"group.type must be 'share' for share group semantics, but was: " + groupType);
378382
}
379383
}
380-
}
384+
}

0 commit comments

Comments
 (0)