Commit 35d286b

beliefer authored and HyukjinKwon committed
[SPARK-31228][DSTREAMS] Add version information to the configuration of Kafka
### What changes were proposed in this pull request?

Add version information to the configuration of Kafka. I sorted out the information shown below.

Item name | Since version | JIRA ID | Commit ID | Note
-- | -- | -- | -- | --
spark.streaming.kafka.consumer.cache.enabled | 2.2.1 | SPARK-19185 | 02cf178#diff-c465bbcc83b2ecc7530d1c0128e4432b |
spark.streaming.kafka.consumer.poll.ms | 2.0.1 | SPARK-12177 | 3134f11#diff-4597d93a0e951f7199697dba7dd0dc32 |
spark.streaming.kafka.consumer.cache.initialCapacity | 2.0.1 | SPARK-12177 | 3134f11#diff-4597d93a0e951f7199697dba7dd0dc32 |
spark.streaming.kafka.consumer.cache.maxCapacity | 2.0.1 | SPARK-12177 | 3134f11#diff-4597d93a0e951f7199697dba7dd0dc32 |
spark.streaming.kafka.consumer.cache.loadFactor | 2.0.1 | SPARK-12177 | 3134f11#diff-4597d93a0e951f7199697dba7dd0dc32 |
spark.streaming.kafka.maxRatePerPartition | 1.3.0 | SPARK-4964 | a119cae#diff-26cb4369f86050dc2e75cd16291b2844 |
spark.streaming.kafka.minRatePerPartition | 2.4.0 | SPARK-25233 | 135ff16#diff-815f6ec5caf9e4beb355f5f981171f1f |
spark.streaming.kafka.allowNonConsecutiveOffsets | 2.3.1 | SPARK-24067 | 1d598b7#diff-4597d93a0e951f7199697dba7dd0dc32 |
spark.kafka.producer.cache.timeout | 2.2.1 | SPARK-19968 | f6730a7#diff-ac8844e8d791a75aaee3d0d10bfc1f2a |
spark.kafka.producer.cache.evictorThreadRunInterval | 3.0.0 | SPARK-21869 | 7bff2db#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.consumer.cache.capacity | 3.0.0 | SPARK-27687 | efa3035#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.consumer.cache.jmx.enable | 3.0.0 | SPARK-25151 | 594c9c5#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.consumer.cache.timeout | 3.0.0 | SPARK-25151 | 594c9c5#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.consumer.cache.evictorThreadRunInterval | 3.0.0 | SPARK-25151 | 594c9c5#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.consumer.fetchedData.cache.timeout | 3.0.0 | SPARK-25151 | 594c9c5#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval | 3.0.0 | SPARK-25151 | 594c9c5#diff-ea8349d528fe8d1b0a8ffa2840ff4bcd |
spark.kafka.clusters.${cluster}.auth.bootstrap.servers | 3.0.0 | SPARK-27294 | 2f55809#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex | 3.0.0 | SPARK-27294 | 2f55809#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.security.protocol | 3.0.0 | SPARK-27294 | 2f55809#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.sasl.kerberos.service.name | 3.0.0 | SPARK-27294 | 2f55809#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.ssl.truststore.location | 3.0.0 | SPARK-27294 | 2f55809#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.ssl.truststore.password | 3.0.0 | SPARK-27294 | 2f55809#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.ssl.keystore.location | 3.0.0 | SPARK-27294 | 2f55809#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.ssl.keystore.password | 3.0.0 | SPARK-27294 | 2f55809#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.ssl.key.password | 3.0.0 | SPARK-27294 | 2f55809#diff-7df71bd47f5a3428ebdb05ced3c31f49 |
spark.kafka.clusters.${cluster}.sasl.token.mechanism | 3.0.0 | SPARK-27294 | 2f55809#diff-7df71bd47f5a3428ebdb05ced3c31f49 |

### Why are the changes needed?

To supplement the configurations with the version in which each was introduced.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing UTs.

Closes apache#27989 from beliefer/add-version-to-kafka-config.

Authored-by: beliefer <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
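For readers skimming the diffs below, the change is mechanical: each Spark config entry gains a `.version(...)` call. A condensed sketch of the pattern, mirroring the kafka-0-10-sql diff in this commit (`ConfigBuilder` is Spark-internal API; the import path shown is an assumption based on Spark's internal config package):

```scala
package org.apache.spark.sql

import java.util.concurrent.TimeUnit

import org.apache.spark.internal.config.ConfigBuilder

// Condensed sketch of the pattern this PR applies: every ConfigBuilder
// entry records the release in which the config first shipped.
package object kafka010 {

  private[kafka010] val PRODUCER_CACHE_TIMEOUT =
    ConfigBuilder("spark.kafka.producer.cache.timeout")
      .doc("The expire time to remove the unused producers.")
      .version("2.2.1") // traced back to SPARK-19968 per the table above
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("10m")
}
```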
1 parent: 8b798c1 | commit: 35d286b

File tree

4 files changed: +54 additions, -18 deletions

docs/configuration.md

Lines changed: 2 additions & 0 deletions

@@ -2661,6 +2661,7 @@ Spark subsystems.
     <a href="streaming-kafka-0-10-integration.html">Kafka Integration guide</a>
     for more details.
   </td>
+  <td>1.3.0</td>
 </tr>
 <tr>
   <td><code>spark.streaming.kafka.minRatePerPartition</code></td>
@@ -2669,6 +2670,7 @@ Spark subsystems.
     Minimum rate (number of records per second) at which data will be read from each Kafka
     partition when using the new Kafka direct stream API.
   </td>
+  <td>2.4.0</td>
 </tr>
 <tr>
   <td><code>spark.streaming.ui.retainedBatches</code></td>

docs/structured-streaming-kafka-integration.md

Lines changed: 22 additions & 4 deletions

@@ -525,28 +525,32 @@ The caching key is built up from the following information:
 The following properties are available to configure the consumer pool:
 
 <table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
 <tr>
   <td>spark.kafka.consumer.cache.capacity</td>
   <td>The maximum number of consumers cached. Please note that it's a soft limit.</td>
   <td>64</td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td>spark.kafka.consumer.cache.timeout</td>
   <td>The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
   <td>5m (5 minutes)</td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td>spark.kafka.consumer.cache.evictorThreadRunInterval</td>
   <td>The interval of time between runs of the idle evictor thread for consumer pool. When non-positive, no idle evictor thread will be run.</td>
   <td>1m (1 minute)</td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td>spark.kafka.consumer.cache.jmx.enable</td>
   <td>Enable or disable JMX for pools created with this configuration instance. Statistics of the pool are available via JMX instance.
   The prefix of JMX name is set to "kafka010-cached-simple-kafka-consumer-pool".
   </td>
   <td>false</td>
+  <td>3.0.0</td>
 </tr>
 </table>
 
@@ -571,16 +575,18 @@ Note that it doesn't leverage Apache Commons Pool due to the difference of chara
 The following properties are available to configure the fetched data pool:
 
 <table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
 <tr>
   <td>spark.kafka.consumer.fetchedData.cache.timeout</td>
   <td>The minimum amount of time a fetched data may sit idle in the pool before it is eligible for eviction by the evictor.</td>
   <td>5m (5 minutes)</td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td>spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval</td>
   <td>The interval of time between runs of the idle evictor thread for fetched data pool. When non-positive, no idle evictor thread will be run.</td>
   <td>1m (1 minute)</td>
+  <td>3.0.0</td>
 </tr>
 </table>
 
@@ -816,16 +822,18 @@ It will use different Kafka producer when delegation token is renewed; Kafka pro
 The following properties are available to configure the producer pool:
 
 <table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
 <tr>
   <td>spark.kafka.producer.cache.timeout</td>
   <td>The minimum amount of time a producer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
   <td>10m (10 minutes)</td>
+  <td>2.2.1</td>
 </tr>
 <tr>
   <td>spark.kafka.producer.cache.evictorThreadRunInterval</td>
   <td>The interval of time between runs of the idle evictor thread for producer pool. When non-positive, no idle evictor thread will be run.</td>
   <td>1m (1 minute)</td>
+  <td>3.0.0</td>
 </tr>
 </table>
 
@@ -935,14 +943,15 @@ When none of the above applies then unsecure connection assumed.
 Delegation tokens can be obtained from multiple clusters and <code>${cluster}</code> is an arbitrary unique identifier which helps to group different configurations.
 
 <table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
 <tr>
   <td><code>spark.kafka.clusters.${cluster}.auth.bootstrap.servers</code></td>
   <td>None</td>
   <td>
     A list of coma separated host/port pairs to use for establishing the initial connection
     to the Kafka cluster. For further details please see Kafka documentation. Only used to obtain delegation token.
   </td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td><code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code></td>
@@ -953,6 +962,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
     If multiple clusters match the address, an exception will be thrown and the query won't be started.
     Kafka's secure and unsecure listeners are bound to different ports. When both used the secure listener port has to be part of the regular expression.
   </td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td><code>spark.kafka.clusters.${cluster}.security.protocol</code></td>
@@ -962,6 +972,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
     <code>bootstrap.servers</code> config matches (for further details please see <code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code>),
     and can be overridden by setting <code>kafka.security.protocol</code> on the source or sink.
   </td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td><code>spark.kafka.clusters.${cluster}.sasl.kerberos.service.name</code></td>
@@ -970,13 +981,15 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
     The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config.
     For further details please see Kafka documentation. Only used to obtain delegation token.
   </td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td><code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code></td>
   <td>None</td>
   <td>
     The location of the trust store file. For further details please see Kafka documentation. Only used to obtain delegation token.
   </td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td><code>spark.kafka.clusters.${cluster}.ssl.truststore.password</code></td>
@@ -985,6 +998,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
     The store password for the trust store file. This is optional and only needed if <code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code> is configured.
     For further details please see Kafka documentation. Only used to obtain delegation token.
   </td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code></td>
@@ -993,6 +1007,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
     The location of the key store file. This is optional for client and can be used for two-way authentication for client.
     For further details please see Kafka documentation. Only used to obtain delegation token.
   </td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.password</code></td>
@@ -1001,6 +1016,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
     The store password for the key store file. This is optional and only needed if <code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code> is configured.
     For further details please see Kafka documentation. Only used to obtain delegation token.
   </td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td><code>spark.kafka.clusters.${cluster}.ssl.key.password</code></td>
@@ -1009,6 +1025,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
     The password of the private key in the key store file. This is optional for client.
     For further details please see Kafka documentation. Only used to obtain delegation token.
   </td>
+  <td>3.0.0</td>
 </tr>
 <tr>
   <td><code>spark.kafka.clusters.${cluster}.sasl.token.mechanism</code></td>
@@ -1017,6 +1034,7 @@ Delegation tokens can be obtained from multiple clusters and <code>${cluster}</c
     SASL mechanism used for client connections with delegation token. Because SCRAM login module used for authentication a compatible mechanism has to be set here.
     For further details please see Kafka documentation (<code>sasl.mechanism</code>). Only used to authenticate against Kafka broker with delegation token.
   </td>
+  <td>3.0.0</td>
 </tr>
 </table>

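For orientation (not part of the diff): the pool properties documented above are ordinary Spark configuration entries, so a session-level override could look like the sketch below. The keys come from the tables in this file; the application name and values are invented for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical tuning example: the keys are the pool configs documented
// above, while the app name and values are invented for illustration.
val spark = SparkSession.builder()
  .appName("kafka-pool-tuning")
  .config("spark.kafka.consumer.cache.capacity", "128")    // default: 64
  .config("spark.kafka.consumer.cache.timeout", "10m")     // default: 5m
  .config("spark.kafka.consumer.cache.jmx.enable", "true") // default: false
  .getOrCreate()
```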
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala

Lines changed: 8 additions & 0 deletions

@@ -29,26 +29,30 @@ package object kafka010 { // scalastyle:ignore
   private[kafka010] val PRODUCER_CACHE_TIMEOUT =
     ConfigBuilder("spark.kafka.producer.cache.timeout")
       .doc("The expire time to remove the unused producers.")
+      .version("2.2.1")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("10m")
 
   private[kafka010] val PRODUCER_CACHE_EVICTOR_THREAD_RUN_INTERVAL =
     ConfigBuilder("spark.kafka.producer.cache.evictorThreadRunInterval")
       .doc("The interval of time between runs of the idle evictor thread for producer pool. " +
         "When non-positive, no idle evictor thread will be run.")
+      .version("3.0.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1m")
 
   private[kafka010] val CONSUMER_CACHE_CAPACITY =
     ConfigBuilder("spark.kafka.consumer.cache.capacity")
       .doc("The maximum number of consumers cached. Please note it's a soft limit" +
         " (check Structured Streaming Kafka integration guide for further details).")
+      .version("3.0.0")
       .intConf
       .createWithDefault(64)
 
   private[kafka010] val CONSUMER_CACHE_JMX_ENABLED =
     ConfigBuilder("spark.kafka.consumer.cache.jmx.enable")
       .doc("Enable or disable JMX for pools created with this configuration instance.")
+      .version("3.0.0")
       .booleanConf
       .createWithDefault(false)
 
@@ -57,13 +61,15 @@ package object kafka010 { // scalastyle:ignore
       .doc("The minimum amount of time a consumer may sit idle in the pool before " +
         "it is eligible for eviction by the evictor. " +
         "When non-positive, no consumers will be evicted from the pool due to idle time alone.")
+      .version("3.0.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("5m")
 
   private[kafka010] val CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL =
     ConfigBuilder("spark.kafka.consumer.cache.evictorThreadRunInterval")
       .doc("The interval of time between runs of the idle evictor thread for consumer pool. " +
         "When non-positive, no idle evictor thread will be run.")
+      .version("3.0.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1m")
 
@@ -72,13 +78,15 @@ package object kafka010 { // scalastyle:ignore
       .doc("The minimum amount of time a fetched data may sit idle in the pool before " +
         "it is eligible for eviction by the evictor. " +
         "When non-positive, no fetched data will be evicted from the pool due to idle time alone.")
+      .version("3.0.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("5m")
 
   private[kafka010] val FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL =
     ConfigBuilder("spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval")
       .doc("The interval of time between runs of the idle evictor thread for fetched data pool. " +
         "When non-positive, no idle evictor thread will be run.")
+      .version("3.0.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1m")
 }

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala

Lines changed: 22 additions & 14 deletions

@@ -26,43 +26,51 @@ package object kafka010 { //scalastyle:ignore
 
   private[spark] val CONSUMER_CACHE_ENABLED =
     ConfigBuilder("spark.streaming.kafka.consumer.cache.enabled")
+      .version("2.2.1")
       .booleanConf
       .createWithDefault(true)
 
   private[spark] val CONSUMER_POLL_MS =
     ConfigBuilder("spark.streaming.kafka.consumer.poll.ms")
-    .longConf
-    .createOptional
+      .version("2.0.1")
+      .longConf
+      .createOptional
 
   private[spark] val CONSUMER_CACHE_INITIAL_CAPACITY =
     ConfigBuilder("spark.streaming.kafka.consumer.cache.initialCapacity")
-    .intConf
-    .createWithDefault(16)
+      .version("2.0.1")
+      .intConf
+      .createWithDefault(16)
 
   private[spark] val CONSUMER_CACHE_MAX_CAPACITY =
     ConfigBuilder("spark.streaming.kafka.consumer.cache.maxCapacity")
-    .intConf
-    .createWithDefault(64)
+      .version("2.0.1")
+      .intConf
+      .createWithDefault(64)
 
   private[spark] val CONSUMER_CACHE_LOAD_FACTOR =
     ConfigBuilder("spark.streaming.kafka.consumer.cache.loadFactor")
-    .doubleConf
-    .createWithDefault(0.75)
+      .version("2.0.1")
+      .doubleConf
+      .createWithDefault(0.75)
 
   private[spark] val MAX_RATE_PER_PARTITION =
     ConfigBuilder("spark.streaming.kafka.maxRatePerPartition")
-    .longConf
-    .createWithDefault(0)
+      .version("1.3.0")
+      .longConf
+      .createWithDefault(0)
 
   private[spark] val MIN_RATE_PER_PARTITION =
     ConfigBuilder("spark.streaming.kafka.minRatePerPartition")
-    .longConf
-    .createWithDefault(1)
+      .version("2.4.0")
+      .longConf
+      .createWithDefault(1)
 
   private[spark] val ALLOW_NON_CONSECUTIVE_OFFSETS =
     ConfigBuilder("spark.streaming.kafka.allowNonConsecutiveOffsets")
-    .booleanConf
-    .createWithDefault(false)
+      .version("2.3.1")
+      .booleanConf
+      .createWithDefault(false)
 
 }

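Again for orientation (not part of the diff): the `spark.streaming.kafka.*` entries above are set like any other Spark property. A minimal sketch, with an invented application name and values:

```scala
import org.apache.spark.SparkConf

// Hypothetical example: the spark.streaming.kafka.* configs above are plain
// Spark properties; the values below are invented for illustration.
val conf = new SparkConf()
  .setAppName("dstream-kafka-rate-limits")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // default: 0
  .set("spark.streaming.kafka.minRatePerPartition", "10")   // default: 1
```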