Commit 5441f5e

KAFKA-19616 Add compression type and level support to LogCompactionTester (#20396)
issue: [KAFKA-19616](https://issues.apache.org/jira/browse/KAFKA-19616)

**why**: validate that log compaction works correctly with compressed data.

**what**: adds compression config options to the `LogCompactionTester` tool and extends test coverage to validate log compaction with different compression types and levels.

Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent e389484 commit 5441f5e

3 files changed: +52 -16 lines changed

tests/kafkatest/services/log_compaction_tester.py

Lines changed: 8 additions & 2 deletions
```diff
@@ -33,14 +33,15 @@ class LogCompactionTester(KafkaPathResolverMixin, BackgroundThreadService):
             "collect_default": True}
     }
 
-    def __init__(self, context, kafka, security_protocol="PLAINTEXT", stop_timeout_sec=30, tls_version=None):
+    def __init__(self, context, kafka, security_protocol="PLAINTEXT", stop_timeout_sec=30, tls_version=None, compression_config={}):
         super(LogCompactionTester, self).__init__(context, 1)
 
         self.kafka = kafka
         self.security_protocol = security_protocol
         self.tls_version = tls_version
         self.security_config = SecurityConfig(self.context, security_protocol, tls_version=tls_version)
         self.stop_timeout_sec = stop_timeout_sec
+        self.compression_config = compression_config
         self.log_compaction_completed = False
 
     def _worker(self, idx, node):
@@ -63,7 +64,12 @@ def start_cmd(self, node):
         cmd += " export CLASSPATH;"
         cmd += self.path.script("kafka-run-class.sh", node)
         cmd += " %s" % self.java_class_name()
-        cmd += " --bootstrap-server %s --messages 1000000 --sleep 20 --duplicates 10 --percent-deletes 10" % (self.kafka.bootstrap_servers(self.security_protocol))
+        cmd += " --bootstrap-server %s --messages 1000000 --sleep 20 --duplicates 10 --percent-deletes 10" % self.kafka.bootstrap_servers(self.security_protocol)
+
+        if 'type' in self.compression_config:
+            cmd += " --compression-type %s" % self.compression_config['type']
+        if 'level' in self.compression_config:
+            cmd += " --compression-level %s" % self.compression_config['level']
 
         cmd += " 2>> %s | tee -a %s &" % (self.logs["tool_logs"]["path"], self.logs["tool_logs"]["path"])
         return cmd
```
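For context, a minimal sketch (not part of the commit) of what the appended flags look like for one of the new test variations; `build_flags` and the bootstrap address are hypothetical stand-ins for the service plumbing above:

```python
# Minimal, self-contained sketch mirroring the flag-building logic above.
# `build_flags` is a hypothetical helper for illustration; the bootstrap
# address is a placeholder rather than a real cluster endpoint.
def build_flags(compression_config, bootstrap="localhost:9092"):
    cmd = (" --bootstrap-server %s --messages 1000000 --sleep 20"
           " --duplicates 10 --percent-deletes 10") % bootstrap
    # The flags are only appended when the variation asks for them, so an
    # empty config {} still runs the tool with its uncompressed default.
    if 'type' in compression_config:
        cmd += " --compression-type %s" % compression_config['type']
    if 'level' in compression_config:
        cmd += " --compression-level %s" % compression_config['level']
    return cmd

print(build_flags({'type': 'gzip', 'level': 9}))
# ... --duplicates 10 --percent-deletes 10 --compression-type gzip --compression-level 9
```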

tests/kafkatest/tests/tools/log_compaction_test.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,25 @@ def start_kafka(self, security_protocol, interbroker_security_protocol):
4949
controller_num_nodes_override=self.num_zk)
5050
self.kafka.start()
5151

52-
def start_test_log_compaction_tool(self, security_protocol):
53-
self.compaction_verifier = LogCompactionTester(self.test_context, self.kafka, security_protocol=security_protocol)
52+
def start_test_log_compaction_tool(self, security_protocol, compression_config={}):
53+
self.compaction_verifier = LogCompactionTester(self.test_context, self.kafka, security_protocol=security_protocol, compression_config=compression_config)
5454
self.compaction_verifier.start()
5555

5656
@cluster(num_nodes=4)
57-
@matrix(metadata_quorum=quorum.all_non_upgrade)
58-
def test_log_compaction(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
57+
@matrix(metadata_quorum=quorum.all_non_upgrade, compression_config=[
58+
{},
59+
{'type': 'gzip', 'level': 1},
60+
{'type': 'gzip', 'level': 9},
61+
{'type': 'snappy'},
62+
{'type': 'lz4', 'level': 1},
63+
{'type': 'lz4', 'level': 10},
64+
{'type': 'zstd', 'level': 1},
65+
{'type': 'zstd', 'level': 10}
66+
])
67+
def test_log_compaction(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, compression_config={}):
5968

6069
self.start_kafka(security_protocol, security_protocol)
61-
self.start_test_log_compaction_tool(security_protocol)
70+
self.start_test_log_compaction_tool(security_protocol, compression_config)
6271

6372
# Verify that compacted data verification completed in LogCompactionTester
64-
wait_until(lambda: self.compaction_verifier.is_done, timeout_sec=180, err_msg="Timed out waiting to complete compaction")
73+
wait_until(lambda: self.compaction_verifier.is_done, timeout_sec=240, err_msg="Timed out waiting to complete compaction")
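Each dict in the `compression_config` matrix above becomes its own test variation. As a hedged usage sketch, driving the service directly with one variation would look roughly like this, assuming a ducktape `test_context` and a running `KafkaService` instance `kafka` (both provided by the surrounding test framework, not defined here):

```python
from kafkatest.services.log_compaction_tester import LogCompactionTester

# Sketch only: test_context and kafka come from the enclosing ducktape
# test; the zstd/level-10 dict matches one entry of the @matrix above.
verifier = LogCompactionTester(test_context, kafka,
                               security_protocol="PLAINTEXT",
                               compression_config={'type': 'zstd', 'level': 10})
verifier.start()  # worker appends --compression-type zstd --compression-level 10
```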

tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.kafka.clients.producer.ProducerConfig;
2828
import org.apache.kafka.clients.producer.ProducerRecord;
2929
import org.apache.kafka.common.config.TopicConfig;
30+
import org.apache.kafka.common.record.CompressionType;
3031
import org.apache.kafka.common.serialization.ByteArraySerializer;
3132
import org.apache.kafka.common.serialization.StringDeserializer;
3233
import org.apache.kafka.common.utils.Exit;
@@ -43,6 +44,7 @@
4344
import java.nio.file.Path;
4445
import java.time.Duration;
4546
import java.util.ArrayList;
47+
import java.util.HashMap;
4648
import java.util.Iterator;
4749
import java.util.LinkedHashSet;
4850
import java.util.List;
@@ -86,6 +88,7 @@ public class LogCompactionTester {
8688
public static class Options {
8789
public final OptionSpec<Long> numMessagesOpt;
8890
public final OptionSpec<String> messageCompressionOpt;
91+
public final OptionSpec<Integer> compressionLevelOpt;
8992
public final OptionSpec<Integer> numDupsOpt;
9093
public final OptionSpec<String> brokerOpt;
9194
public final OptionSpec<Integer> topicsOpt;
@@ -108,6 +111,12 @@ public Options(OptionParser parser) {
108111
.ofType(String.class)
109112
.defaultsTo("none");
110113

114+
compressionLevelOpt = parser
115+
.accepts("compression-level", "The compression level to use with the specified compression type.")
116+
.withOptionalArg()
117+
.describedAs("level")
118+
.ofType(Integer.class);
119+
111120
numDupsOpt = parser
112121
.accepts("duplicates", "The number of duplicates for each key.")
113122
.withRequiredArg()
@@ -240,7 +249,8 @@ public static void main(String[] args) throws Exception {
240249
CommandLineUtils.checkRequiredArgs(parser, optionSet, options.brokerOpt, options.numMessagesOpt);
241250

242251
long messages = optionSet.valueOf(options.numMessagesOpt);
243-
String compressionType = optionSet.valueOf(options.messageCompressionOpt);
252+
CompressionType compressionType = CompressionType.forName(optionSet.valueOf(options.messageCompressionOpt));
253+
Integer compressionLevel = optionSet.valueOf(options.compressionLevelOpt);
244254
int percentDeletes = optionSet.valueOf(options.percentDeletesOpt);
245255
int dups = optionSet.valueOf(options.numDupsOpt);
246256
String brokerUrl = optionSet.valueOf(options.brokerOpt);
@@ -256,7 +266,8 @@ public static void main(String[] args) throws Exception {
256266
System.out.println("Producing " + messages + " messages..to topics " + String.join(",", topics));
257267
Path producedDataFilePath = produceMessages(
258268
brokerUrl, topics, messages,
259-
compressionType, dups, percentDeletes);
269+
compressionType, compressionLevel,
270+
dups, percentDeletes);
260271
System.out.println("Sleeping for " + sleepSecs + "seconds...");
261272
TimeUnit.MILLISECONDS.sleep(sleepSecs * 1000L);
262273
System.out.println("Consuming messages...");
@@ -395,12 +406,22 @@ private static void require(boolean requirement, String message) {
395406
}
396407

397408
private static Path produceMessages(String brokerUrl, Set<String> topics, long messages,
398-
String compressionType, int dups, int percentDeletes) throws IOException {
399-
Map<String, Object> producerProps = Map.of(
400-
ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE),
401-
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl,
402-
ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType
403-
);
409+
CompressionType compressionType, Integer compressionLevel,
410+
int dups, int percentDeletes) throws IOException {
411+
Map<String, Object> producerProps = new HashMap<>();
412+
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
413+
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
414+
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType.name);
415+
416+
if (compressionLevel != null) {
417+
switch (compressionType) {
418+
case GZIP -> producerProps.put(ProducerConfig.COMPRESSION_GZIP_LEVEL_CONFIG, compressionLevel);
419+
case LZ4 -> producerProps.put(ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG, compressionLevel);
420+
case ZSTD -> producerProps.put(ProducerConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, compressionLevel);
421+
default -> System.out.println("Warning: Compression level " + compressionLevel + " is ignored for compression type "
422+
+ compressionType.name + ". Only gzip, lz4, and zstd support compression levels.");
423+
}
424+
}
404425

405426
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
406427
producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
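The switch above maps the single `--compression-level` flag onto the codec-specific producer setting. Assuming the standard config names these `ProducerConfig` constants resolve to (`compression.type` plus the per-codec `compression.{gzip,lz4,zstd}.level` keys from KIP-390; an assumption, since the diff only shows the constants), the effective properties for `--compression-type zstd --compression-level 10` would be roughly:

```python
# Illustration only: literal keys are assumed expansions of the
# ProducerConfig constants used in the Java code above.
producer_props = {
    "max.block.ms": str(2**63 - 1),         # MAX_BLOCK_MS_CONFIG = Long.MAX_VALUE
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "compression.type": "zstd",
    "compression.zstd.level": 10,           # set only because zstd supports levels
}
print(producer_props)
```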
