
Commit 702b416

HIVE-29238: Upgrade Kafka version to fix CVE-2024-31141 and CVE-2021-38153
1 parent e44cf34

8 files changed (+55, -26 lines)

itests/qtest-druid/pom.xml

Lines changed: 6 additions & 1 deletion
@@ -36,7 +36,7 @@
     <druid.derby.version>10.11.1.1</druid.derby.version>
     <druid.guava.version>16.0.1</druid.guava.version>
     <druid.guice.version>4.1.0</druid.guice.version>
-    <kafka.test.version>2.5.0</kafka.test.version>
+    <kafka.test.version>3.9.1</kafka.test.version>
     <druid.guice.version>4.1.0</druid.guice.version>
     <slf4j.version>1.7.30</slf4j.version>
   </properties>
@@ -226,6 +226,11 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.test.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-server</artifactId>
+      <version>${kafka.test.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>

itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java

Lines changed: 7 additions & 5 deletions
@@ -19,7 +19,7 @@
 package org.apache.hive.kafka;
 
 import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.service.AbstractService;
@@ -29,6 +29,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
 
 import com.google.common.base.Throwables;
 import com.google.common.io.Files;
@@ -43,6 +44,7 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.stream.IntStream;
+import scala.Option;
 
 /**
  * This class has the hooks to start and stop single node kafka cluster.
@@ -54,7 +56,7 @@ public class SingleNodeKafkaCluster extends AbstractService {
   private static final String LOCALHOST = "localhost";
 
 
-  private final KafkaServerStartable serverStartable;
+  private final KafkaServer server;
   private final int brokerPort;
   private final String kafkaServer;
 
@@ -94,21 +96,21 @@ public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort, Intege
     properties.setProperty("transaction.state.log.min.isr", String.valueOf(1));
     properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577");
 
-    this.serverStartable = new KafkaServerStartable(KafkaConfig.fromProps(properties));
+    this.server = new KafkaServer(KafkaConfig.fromProps(properties),Time.SYSTEM, Option.empty(), false);
   }
 
 
   @Override
   protected void serviceStart() throws Exception {
-    serverStartable.startup();
+    server.startup();
     log.info("Kafka Server Started on port {}", brokerPort);
 
   }
 
   @Override
   protected void serviceStop() throws Exception {
     log.info("Stopping Kafka Server");
-    serverStartable.shutdown();
+    server.shutdown();
     log.info("Kafka Server Stopped");
   }
 
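Context for the change above: Kafka 3.x removed kafka.server.KafkaServerStartable, so the embedded broker is now driven through kafka.server.KafkaServer directly. A minimal sketch of that lifecycle, assuming the constructor shape used in the diff; the class name and broker properties below are illustrative, not part of the commit.

import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.kafka.common.utils.Time;
import scala.Option;

public class EmbeddedKafkaSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("zookeeper.connect", "localhost:2181");      // illustrative ZK address
    props.setProperty("listeners", "PLAINTEXT://localhost:9092");  // illustrative listener
    props.setProperty("log.dir", "/tmp/kafka-test-logs");
    props.setProperty("offsets.topic.replication.factor", "1");

    // KafkaServer(config, time, threadNamePrefix, enableForwarding) replaces
    // new KafkaServerStartable(KafkaConfig.fromProps(props)).
    KafkaServer server = new KafkaServer(KafkaConfig.fromProps(props), Time.SYSTEM, Option.empty(), false);
    server.startup();
    try {
      // run tests against localhost:9092 here
    } finally {
      server.shutdown();
      server.awaitShutdown();
    }
  }
}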

kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java

Lines changed: 19 additions & 4 deletions
@@ -27,12 +27,15 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.Uuid;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +70,11 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
     kafkaProducer = new KafkaProducer<>(properties);
   }
 
+  @Override
+  public Uuid clientInstanceId(Duration timeout) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override public void initTransactions() {
     kafkaProducer.initTransactions();
   }
@@ -138,11 +146,11 @@ synchronized void resumeTransaction(long producerId, short epoch) {
 
     Object transactionManager = getValue(kafkaProducer, "transactionManager");
 
-    Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
+    Object txnPartitionMap = getValue(transactionManager, "txnPartitionMap");
     invoke(transactionManager,
         "transitionTo",
         getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-    invoke(topicPartitionBookkeeper, "reset");
+    invoke(txnPartitionMap, "reset");
     Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
     setValue(producerIdAndEpoch, "producerId", producerId);
     setValue(producerIdAndEpoch, "epoch", epoch);
@@ -183,8 +191,14 @@ private void flushNewPartitions() {
     LOG.info("Flushing new partitions");
     TransactionalRequestResult result = enqueueNewPartitions();
     Object sender = getValue(kafkaProducer, "sender");
+    RecordAccumulator accumulator = (RecordAccumulator) getValue(kafkaProducer, "accumulator");
+    accumulator.beginFlush();
     invoke(sender, "wakeup");
-    result.await();
+    try {
+      accumulator.awaitFlushCompletion();
+    } catch (InterruptedException e) {
+      throw new InterruptException("Flush interrupted.", e);
+    }
   }
 
   private synchronized TransactionalRequestResult enqueueNewPartitions() {
@@ -194,9 +208,10 @@ private synchronized TransactionalRequestResult enqueueNewPartitions() {
         "enqueueRequest",
         new Class[] {txnRequestHandler.getClass().getSuperclass()},
         new Object[] {txnRequestHandler});
-    return (TransactionalRequestResult) getValue(txnRequestHandler,
+    TransactionalRequestResult result= (TransactionalRequestResult) getValue(txnRequestHandler,
         txnRequestHandler.getClass().getSuperclass(),
         "result");
+    return result;
   }
 
   @SuppressWarnings("unchecked") private static Enum<?> getEnum(String enumFullName) {
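Context: two Kafka 3.x API changes drive this file. The Producer interface now declares clientInstanceId(Duration) (client telemetry), so the wrapper must implement it, and the private TransactionManager state that resumeTransaction reads reflectively was renamed from topicPartitionBookkeeper to txnPartitionMap. A hedged sketch of the new interface method and of the kind of reflective field access the diff relies on; the class and helper below are illustrative, not Hive code.

import java.lang.reflect.Field;
import java.time.Duration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Uuid;

class ProducerWrapperSketch<K, V> {
  private final KafkaProducer<K, V> kafkaProducer;

  ProducerWrapperSketch(KafkaProducer<K, V> kafkaProducer) {
    this.kafkaProducer = kafkaProducer;
  }

  // Kafka 3.x added this method to the Producer interface; HiveKafkaProducer
  // rejects it because client telemetry is not used by the handler.
  public Uuid clientInstanceId(Duration timeout) {
    throw new UnsupportedOperationException();
  }

  // Same idea as the getValue helper the diff calls: read a private field
  // reflectively, e.g. getValue(transactionManager, "txnPartitionMap").
  static Object getValue(Object target, String fieldName) {
    try {
      Field field = target.getClass().getDeclaredField(fieldName);
      field.setAccessible(true);
      return field.get(target);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }
}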

kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java

Lines changed: 1 addition & 1 deletion
@@ -140,7 +140,7 @@ class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
       }
     } else {
       // case seek to beginning of stream
-      consumer.seekToBeginning(Collections.singleton(topicPartition));
+      consumer.seekToBeginning(topicPartitionList);
       // seekToBeginning is lazy thus need to call position() or poll(0)
       this.startOffset = consumer.position(topicPartition);
       LOG.info("Consumer at beginning of topic partition [{}], current start offset [{}]",
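As the retained comment notes, seekToBeginning only registers the offset reset; the offset is resolved on the next position() or poll() call, which is why position() follows immediately. A hedged standalone sketch of that pattern; the topic, partition, and bootstrap address are illustrative.

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SeekToBeginningSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative
    props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
    props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition topicPartition = new TopicPartition("test_topic", 0); // illustrative
      List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
      consumer.assign(topicPartitionList);
      // Lazy: only marks the assigned partitions for an offset reset.
      consumer.seekToBeginning(topicPartitionList);
      // position() forces the reset and returns the actual start offset.
      long startOffset = consumer.position(topicPartition);
      System.out.println("start offset = " + startOffset);
    }
  }
}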

kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java

Lines changed: 10 additions & 7 deletions
@@ -19,10 +19,7 @@
 package org.apache.hadoop.hive.kafka;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
@@ -158,7 +155,9 @@
   @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpochAndId() {
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(3434L, (short) 12);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+        new TopicPartition("dummy_topic", 0),
+        new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 
@@ -169,7 +168,9 @@
     producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(pid, (short) 12);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+        new TopicPartition("dummy_topic", 0),
+        new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 
@@ -180,7 +181,9 @@
     producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(45L, epoch);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+        new TopicPartition("dummy_topic", 0),
+        new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 }
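These tests expect a KafkaException from a resumed transaction with a bad producer id/epoch. With the 3.x client, an empty offset map appears to short-circuit sendOffsetsToTransaction before any broker request is issued, so each test now passes one dummy offset to force the transactional request that surfaces the error. A hedged sketch of that call shape, reusing the same dummy values as the tests.

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class OffsetsSketch {
  static Map<TopicPartition, OffsetAndMetadata> dummyOffsets() {
    // One real entry is enough to make the producer send the offset-commit
    // request, which is what exposes the invalid producer id/epoch.
    return Collections.singletonMap(new TopicPartition("dummy_topic", 0), new OffsetAndMetadata(0L));
  }
}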

kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java

Lines changed: 5 additions & 4 deletions
@@ -26,7 +26,7 @@
 import kafka.zk.EmbeddedZookeeper;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.common.IPStackUtils;
-import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.network.ConnectionMode;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestSslUtils;
 import org.junit.rules.ExternalResource;
@@ -41,6 +41,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
+import scala.Option;
 
 /**
  * Test Helper Class to start and stop a kafka broker.
@@ -106,7 +107,7 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
       brokerProps.setProperty("listener.name.l2.gssapi.sasl.jaas.config", jaasConfig);
       brokerProps.setProperty("listener.name.l3.gssapi.sasl.jaas.config", jaasConfig);
       truststoreFile = File.createTempFile("kafka_truststore", "jks");
-      brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(Mode.SERVER).createNewTrustStore(truststoreFile).build());
+      brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(ConnectionMode.SERVER).createNewTrustStore(truststoreFile).build());
       brokerProps.setProperty("delegation.token.master.key", "AnyValueShouldDoHereItDoesntMatter");
     }
     brokerProps.setProperty("offsets.topic.replication.factor", "1");
@@ -116,9 +117,9 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
     kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
     kafkaServer.startup();
     kafkaServer.zkClient();
-    adminZkClient = new AdminZkClient(kafkaServer.zkClient());
+    adminZkClient = new AdminZkClient(kafkaServer.zkClient(),Option.empty());
     LOG.info("Creating kafka TOPIC [{}]", TOPIC);
-    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$,false);
   }
 
   /**
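The ZooKeeper admin APIs grew extra parameters in Kafka 3.x: the AdminZkClient constructor now takes an additional optional argument (passed as Option.empty() above to keep the old behaviour) and createTopic takes a trailing boolean. A hedged sketch of creating the test topic from Java against an already-running broker; the method name and topic are illustrative, and the extra parameters are simply left at their pre-3.x defaults.

import java.util.Properties;
import kafka.admin.RackAwareMode;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import scala.Option;

class TopicSetupSketch {
  static void createTestTopic(KafkaZkClient zkClient) {
    // Second constructor argument is the optional parameter added in Kafka 3.x.
    AdminZkClient adminZkClient = new AdminZkClient(zkClient, Option.empty());
    // Trailing boolean is the new flag appended to createTopic in Kafka 3.x.
    adminZkClient.createTopic("test-topic", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$, false);
  }
}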

kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java

Lines changed: 6 additions & 3 deletions
@@ -44,6 +44,7 @@
 
 import javax.annotation.Nullable;
 import java.nio.charset.Charset;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -251,8 +252,10 @@ private void compareIterator(List<ConsumerRecord<byte[], byte[]>> expected,
 
   private void setupConsumer() {
     Properties consumerProps = new Properties();
+    consumerProps.setProperty("group.protocol", "classic");
+    consumerProps.setProperty("group.id", "test-group-" + UUID.randomUUID());
+    consumerProps.setProperty("auto.offset.reset","earliest");
     consumerProps.setProperty("enable.auto.commit", "false");
-    consumerProps.setProperty("auto.offset.reset", "none");
     consumerProps.setProperty("bootstrap.servers", KafkaBrokerResource.BROKER_IP_PORT);
     conf.set("kafka.bootstrap.servers", KafkaBrokerResource.BROKER_IP_PORT);
     conf.set(KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName(),
@@ -301,10 +304,10 @@ private static void sendData(List<ConsumerRecord<byte[], byte[]>> recordList, @N
     producer.close();
   }
 
-  @After public void tearDown() {
+  @After public void tearDown() throws InterruptedException {
     this.kafkaRecordIterator = null;
     if (this.consumer != null) {
-      this.consumer.close();
+      this.consumer.close(Duration.ZERO);
     }
   }
 
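The consumer setup changes presumably keep the test on the classic consumer path and soften the offset-reset behaviour: with the 3.9 client, auto.offset.reset=none throws when no committed offset exists, so the test consumer now resets to earliest, pins group.protocol=classic, and uses a throwaway group id. A hedged sketch of equivalent properties; the helper name and bootstrap parameter are illustrative.

import java.util.Properties;
import java.util.UUID;

class ConsumerPropsSketch {
  static Properties consumerProps(String bootstrapServers) {
    Properties props = new Properties();
    props.setProperty("group.protocol", "classic");                    // stay on the classic consumer
    props.setProperty("group.id", "test-group-" + UUID.randomUUID());  // unique group per test run
    props.setProperty("auto.offset.reset", "earliest");                // "none" throws without committed offsets
    props.setProperty("enable.auto.commit", "false");
    props.setProperty("bootstrap.servers", bootstrapServers);
    return props;
  }
}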

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@
172172
<junit.version>4.13.2</junit.version>
173173
<junit.jupiter.version>5.13.3</junit.jupiter.version>
174174
<junit.vintage.version>5.13.3</junit.vintage.version>
175-
<kafka.version>2.5.0</kafka.version>
175+
<kafka.version>3.9.1</kafka.version>
176176
<kryo.version>5.5.0</kryo.version>
177177
<reflectasm.version>1.11.9</reflectasm.version>
178178
<kudu.version>1.17.0</kudu.version>
