From dbd3e2ebd77b201881c6b5b6266da3e69c040680 Mon Sep 17 00:00:00 2001 From: frankvicky Date: Sun, 30 Jun 2024 21:29:02 +0800 Subject: [PATCH 01/14] Beanchmark: Stream, forEach --- .../clients/consumer/ConsumerRecords.java | 23 ++++++++ .../jmh/record/ConsumerRecordsBenchmark.java | 57 +++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index 757c7f2cabbb7..3a36d695b8457 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * A container that holds the list {@link ConsumerRecord} per partition for a @@ -67,6 +68,28 @@ public Iterable> records(String topic) { return new ConcatenatedIterable<>(recs); } + public Iterable> recordsForEach(String topic) { + if (topic == null) + throw new IllegalArgumentException("Topic must be non-null."); + List>> recs = new ArrayList<>(); + records.forEach((key, value) -> { + if (key.topic().equals(topic)) + recs.add(value); + }); + return new ConcatenatedIterable<>(recs); + } + + public Iterable> recordsWithEntrySetAndStream(String topic) { + if (topic == null) + throw new IllegalArgumentException("Topic must be non-null."); + List>> recs = records.entrySet() + .stream() + .filter(entry -> entry.getKey().topic().equals(topic)) + .map(Map.Entry::getValue) + .collect(Collectors.toList()); + return new ConcatenatedIterable<>(recs); + } + /** * Get the partitions which have records contained in this record set. * @return the set of partitions with data in this record set (may be empty if no data was returned) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java new file mode 100644 index 0000000000000..e06a97594c7dd --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java @@ -0,0 +1,57 @@ +package org.apache.kafka.jmh.record; + + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; +import org.openjdk.jmh.annotations.*; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 10) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ConsumerRecordsBenchmark { + private ConsumerRecords records; + + @Setup(Level.Trial) + public void setUp() { + List topics = Arrays.asList("topic1", "topic2", "topic3"); + int recordSize = 10000; + Map>> partitionToRecords = new LinkedHashMap<>(); + for (String topic : topics) { + for (int partition = 0; partition < 10; partition++) { + List> records = new ArrayList<>(recordSize); + for (int offset = 0; offset < recordSize; offset++) { + records.add( + new ConsumerRecord<>(topic, partition, offset, 0L, TimestampType.CREATE_TIME, + 0, 0, offset, String.valueOf(offset), new RecordHeaders(), Optional.empty()) + ); + } + partitionToRecords.put(new TopicPartition(topic, partition), records); + } + } + + records = new ConsumerRecords<>(partitionToRecords); + } + + @Benchmark + public void records() { + records.records("topic2"); + } + + @Benchmark + public void recordsForEach() { + records.recordsForEach("topic2"); + } + + @Benchmark + public void recordsWithEntrySetAndStream() { + records.recordsWithEntrySetAndStream("topic2"); + } +} From 5247a1c7b2dd63d01add37bae916bffa370ef9ee Mon Sep 17 00:00:00 2001 From: frankvicky Date: Sun, 30 Jun 2024 22:26:53 +0800 Subject: [PATCH 02/14] Beanchmark: Custom filterIterator and benchmark --- .../clients/consumer/ConsumerRecords.java | 39 ++++++++++++++++--- .../jmh/record/ConsumerRecordsBenchmark.java | 8 +++- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index 3a36d695b8457..419d4af4be199 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -90,6 +91,12 @@ public Iterable> recordsWithEntrySetAndStream(String topic) return new ConcatenatedIterable<>(recs); } + public Iterable> recordsWithFilterIterator(String topic) { + if (topic == null) + throw new IllegalArgumentException("Topic must be non-null."); + return new ConcatenatedIterable<>(records.values(), record -> record.topic().equals(topic)); + } + /** * Get the partitions which have records contained in this record set. * @return the set of partitions with data in this record set (may be empty if no data was returned) @@ -116,11 +123,17 @@ public int count() { private static class ConcatenatedIterable implements Iterable> { private final Iterable>> iterables; + private Predicate> predicate = null; public ConcatenatedIterable(Iterable>> iterables) { this.iterables = iterables; } + public ConcatenatedIterable(Iterable>> iterables, Predicate> predicate) { + this.iterables = iterables; + this.predicate = predicate; + } + @Override public Iterator> iterator() { return new AbstractIterator>() { @@ -128,13 +141,27 @@ public Iterator> iterator() { Iterator> current; protected ConsumerRecord makeNext() { - while (current == null || !current.hasNext()) { - if (iters.hasNext()) - current = iters.next().iterator(); - else - return allDone(); + while (true) { + while (current == null || !current.hasNext()) { + if (iters.hasNext()) { + current = iters.next().iterator(); + } else { + return allDone(); + } + } + + ConsumerRecord next = current.next(); + + if (predicate != null) { + if (predicate.test(next)) { + return next; + } else { + continue; + } + } else { + return next; + } } - return current.next(); } }; } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java index e06a97594c7dd..668665f154b1d 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java @@ -15,7 +15,8 @@ @Fork(value = 1) @Warmup(iterations = 5) @Measurement(iterations = 10) -@OutputTimeUnit(TimeUnit.MILLISECONDS) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@BenchmarkMode(Mode.AverageTime) public class ConsumerRecordsBenchmark { private ConsumerRecords records; @@ -54,4 +55,9 @@ public void recordsForEach() { public void recordsWithEntrySetAndStream() { records.recordsWithEntrySetAndStream("topic2"); } + + @Benchmark + public void recordsWithFilterIterator() { + records.recordsWithFilterIterator("topic2"); + } } From 4ab4fd3c69655c26e58ce66575345fc6a57912b3 Mon Sep 17 00:00:00 2001 From: frankvicky Date: Sun, 30 Jun 2024 22:55:17 +0800 Subject: [PATCH 03/14] Beanchmark: Refactor to avoid double-while loop --- .../clients/consumer/ConsumerRecords.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index 419d4af4be199..ab5c86e0cac20 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -142,27 +142,29 @@ public Iterator> iterator() { protected ConsumerRecord makeNext() { while (true) { - while (current == null || !current.hasNext()) { - if (iters.hasNext()) { - current = iters.next().iterator(); - } else { + if (current == null || !current.hasNext()) { + if (!advanceToNextIterator()) { return allDone(); } } ConsumerRecord next = current.next(); - if (predicate != null) { - if (predicate.test(next)) { - return next; - } else { - continue; - } - } else { + if (predicate == null || predicate.test(next)) { return next; } } } + + private boolean advanceToNextIterator() { + while (iters.hasNext()) { + current = iters.next().iterator(); + if (current.hasNext()) { + return true; + } + } + return false; + } }; } } From 2597de30f8d50c627b905e33c5151e062fa969ac Mon Sep 17 00:00:00 2001 From: frankvicky Date: Sun, 30 Jun 2024 23:30:13 +0800 Subject: [PATCH 04/14] Beanchmark: Remove unnecessary method. --- .../clients/consumer/ConsumerRecords.java | 27 +--------- .../jmh/record/ConsumerRecordsBenchmark.java | 51 ++++++++++++++----- 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index ab5c86e0cac20..a82a57b4e6775 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Set; import java.util.function.Predicate; -import java.util.stream.Collectors; /** * A container that holds the list {@link ConsumerRecord} per partition for a @@ -58,7 +57,7 @@ public List> records(TopicPartition partition) { /** * Get just the records for the given topic */ - public Iterable> records(String topic) { + public Iterable> recordsWithNestedList(String topic) { if (topic == null) throw new IllegalArgumentException("Topic must be non-null."); List>> recs = new ArrayList<>(); @@ -69,29 +68,7 @@ public Iterable> records(String topic) { return new ConcatenatedIterable<>(recs); } - public Iterable> recordsForEach(String topic) { - if (topic == null) - throw new IllegalArgumentException("Topic must be non-null."); - List>> recs = new ArrayList<>(); - records.forEach((key, value) -> { - if (key.topic().equals(topic)) - recs.add(value); - }); - return new ConcatenatedIterable<>(recs); - } - - public Iterable> recordsWithEntrySetAndStream(String topic) { - if (topic == null) - throw new IllegalArgumentException("Topic must be non-null."); - List>> recs = records.entrySet() - .stream() - .filter(entry -> entry.getKey().topic().equals(topic)) - .map(Map.Entry::getValue) - .collect(Collectors.toList()); - return new ConcatenatedIterable<>(recs); - } - - public Iterable> recordsWithFilterIterator(String topic) { + public Iterable> records(String topic) { if (topic == null) throw new IllegalArgumentException("Topic must be non-null."); return new ConcatenatedIterable<>(records.values(), record -> record.topic().equals(topic)); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java index 668665f154b1d..1af00cf3373df 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.jmh.record; @@ -6,9 +22,25 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; -import org.openjdk.jmh.annotations.*; -import java.util.*; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) @@ -43,21 +75,12 @@ public void setUp() { @Benchmark public void records() { - records.records("topic2"); - } - - @Benchmark - public void recordsForEach() { - records.recordsForEach("topic2"); - } - - @Benchmark - public void recordsWithEntrySetAndStream() { - records.recordsWithEntrySetAndStream("topic2"); + // original one + records.recordsWithNestedList("topic2"); } @Benchmark public void recordsWithFilterIterator() { - records.recordsWithFilterIterator("topic2"); + records.records("topic2"); } } From 9eb4e0fb22cf80f4ccfcd34f96377d0bb99cb0ac Mon Sep 17 00:00:00 2001 From: frankvicky Date: Mon, 1 Jul 2024 23:38:09 +0800 Subject: [PATCH 05/14] Beanchmark: Use Blackhole to avoid JVM optimizing --- .../kafka/jmh/record/ConsumerRecordsBenchmark.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java index 1af00cf3373df..a7c214f91ce16 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java @@ -34,6 +34,7 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; import java.util.ArrayList; import java.util.Arrays; @@ -74,13 +75,13 @@ public void setUp() { } @Benchmark - public void records() { + public void records(Blackhole blackhole) { // original one - records.recordsWithNestedList("topic2"); + blackhole.consume(records.recordsWithNestedList("topic2")); } @Benchmark - public void recordsWithFilterIterator() { - records.records("topic2"); + public void recordsWithFilterIterator(Blackhole blackhole) { + blackhole.consume(records.records("topic2")); } } From b64a62e1659703e2d27c56212a2f0fbbba1d35aa Mon Sep 17 00:00:00 2001 From: frankvicky Date: Tue, 2 Jul 2024 19:59:42 +0800 Subject: [PATCH 06/14] Beanchmark: Filter Map in the ConcatenatedIterable --- .../clients/consumer/ConsumerRecordsNew.java | 139 +++++++++++++ .../consumer/ConsumerRecordsNewTest.java | 182 ++++++++++++++++++ .../jmh/record/ConsumerRecordsBenchmark.java | 8 + 3 files changed, 329 insertions(+) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecordsNew.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsNewTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecordsNew.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecordsNew.java new file mode 100644 index 0000000000000..c96c2357af064 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecordsNew.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.AbstractIterator; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; + +/** + * A container that holds the list {@link ConsumerRecord} per partition for a + * particular topic. There is one {@link ConsumerRecord} list for every topic + * partition returned by a {@link Consumer#poll(java.time.Duration)} operation. + */ +public class ConsumerRecordsNew implements Iterable> { + public static final ConsumerRecordsNew EMPTY = new ConsumerRecordsNew<>(Collections.emptyMap()); + + private final Map>> records; + + public ConsumerRecordsNew(Map>> records) { + this.records = records; + } + + /** + * Get just the records for the given partition + * + * @param partition The partition to get records for + */ + public List> records(TopicPartition partition) { + List> recs = this.records.get(partition); + if (recs == null) + return Collections.emptyList(); + else + return Collections.unmodifiableList(recs); + } + + public Iterable> records(String topic) { + if (topic == null) + throw new IllegalArgumentException("Topic must be non-null."); + return new ConcatenatedIterable<>(records, record -> record.topic().equals(topic)); + } + + /** + * Get the partitions which have records contained in this record set. + * + * @return the set of partitions with data in this record set (may be empty if no data was returned) + */ + public Set partitions() { + return Collections.unmodifiableSet(records.keySet()); + } + + @Override + public Iterator> iterator() { + return new ConcatenatedIterable<>(records).iterator(); + } + + /** + * The number of records for all topics + */ + public int count() { + int count = 0; + for (List> recs : this.records.values()) + count += recs.size(); + return count; + } + + private static class ConcatenatedIterable implements Iterable> { + + private final Map>> topicPartitionToRecords; + private final Predicate predicate; + + public ConcatenatedIterable(Map>> topicPartitionToRecords) { + this(topicPartitionToRecords, null); + } + + public ConcatenatedIterable(Map>> topicPartitionToRecords, Predicate predicate) { + this.topicPartitionToRecords = topicPartitionToRecords; + this.predicate = predicate; + } + + @Override + public Iterator> iterator() { + return new AbstractIterator>() { + final Iterator>>> iterator + = topicPartitionToRecords.entrySet().iterator(); + Iterator> current; + + protected ConsumerRecord makeNext() { + while (current == null || !current.hasNext()) { + if (!advanceToNextIterator()) { + return allDone(); + } + } + return current.next(); + } + + private boolean advanceToNextIterator() { + while (iterator.hasNext()) { + Map.Entry>> next = iterator.next(); + if (predicate == null || predicate.test(next.getKey())) { + current = next.getValue().iterator(); + return true; + } + } + return false; + } + }; + } + } + + public boolean isEmpty() { + return records.isEmpty(); + } + + @SuppressWarnings("unchecked") + public static ConsumerRecordsNew empty() { + return (ConsumerRecordsNew) EMPTY; + } + +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsNewTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsNewTest.java new file mode 100644 index 0000000000000..b1c48b64b96b5 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsNewTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConsumerRecordsNewTest { + + @Test + public void testIterator() { + String topic = "topic"; + int recordSize = 10; + int partitionSize = 15; + int emptyPartitionIndex = 3; + ConsumerRecordsNew records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); + Iterator> iterator = records.iterator(); + + int recordCount = 0; + int partitionCount = 0; + int currentPartition = -1; + + while (iterator.hasNext()) { + ConsumerRecord record = iterator.next(); + validateEmptyPartition(record, emptyPartitionIndex); + + // Check if we have moved to a new partition + if (currentPartition != record.partition()) { + // Increment the partition count as we have encountered a new partition + partitionCount++; + // Update the current partition to the new partition + currentPartition = record.partition(); + } + + validateRecordPayload(topic, record, currentPartition, recordCount, recordSize); + recordCount++; + } + + // Including empty partition + assertEquals(partitionSize, partitionCount + 1); + } + + @Test + public void testRecordsByPartition() { + List topics = Arrays.asList("topic1", "topic2"); + int recordSize = 3; + int partitionSize = 5; + int emptyPartitionIndex = 2; + + ConsumerRecordsNew consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); + + for (String topic : topics) { + for (int partition = 0; partition < partitionSize; partition++) { + TopicPartition topicPartition = new TopicPartition(topic, partition); + List> records = consumerRecords.records(topicPartition); + + if (partition == emptyPartitionIndex) { + assertTrue(records.isEmpty()); + } else { + assertEquals(recordSize, records.size()); + for (int i = 0; i < records.size(); i++) { + ConsumerRecord record = records.get(i); + validateRecordPayload(topic, record, partition, i, recordSize); + } + } + } + } + } + + @Test + public void testRecordsByNullTopic() { + String nullTopic = null; + ConsumerRecordsNew consumerRecords = ConsumerRecordsNew.empty(); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); + assertEquals("Topic must be non-null.", exception.getMessage()); + } + + + @Test + public void testRecordsByTopic() { + List topics = Arrays.asList("topic1", "topic2", "topic3", "topic4"); + int recordSize = 3; + int partitionSize = 10; + int emptyPartitionIndex = 6; + int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 1); + + ConsumerRecordsNew consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); + + for (String topic : topics) { + Iterable> records = consumerRecords.records(topic); + int recordCount = 0; + int partitionCount = 0; + int currentPartition = -1; + + for (ConsumerRecord record : records) { + validateEmptyPartition(record, emptyPartitionIndex); + + // Check if we have moved to a new partition + if (currentPartition != record.partition()) { + // Increment the partition count as we have encountered a new partition + partitionCount++; + // Update the current partition to the new partition + currentPartition = record.partition(); + } + + validateRecordPayload(topic, record, currentPartition, recordCount, recordSize); + recordCount++; + } + + // Including empty partition + assertEquals(partitionSize, partitionCount + 1); + assertEquals(expectedTotalRecordSizeOfEachTopic, recordCount); + } + } + + private ConsumerRecordsNew buildTopicTestRecords(int recordSize, + int partitionSize, + int emptyPartitionIndex, + Collection topics) { + Map>> partitionToRecords = new LinkedHashMap<>(); + for (String topic : topics) { + for (int i = 0; i < partitionSize; i++) { + List> records = new ArrayList<>(recordSize); + if (i != emptyPartitionIndex) { + for (int j = 0; j < recordSize; j++) { + records.add( + new ConsumerRecord<>(topic, i, j, 0L, TimestampType.CREATE_TIME, + 0, 0, j, String.valueOf(j), new RecordHeaders(), Optional.empty()) + ); + } + } + partitionToRecords.put(new TopicPartition(topic, i), records); + } + } + + return new ConsumerRecordsNew<>(partitionToRecords); + } + + private void validateEmptyPartition(ConsumerRecord record, int emptyPartitionIndex) { + assertNotEquals(emptyPartitionIndex, record.partition(), "Partition " + record.partition() + " is not empty"); + } + + private void validateRecordPayload(String topic, ConsumerRecord record, int currentPartition, int recordCount, int recordSize) { + assertEquals(topic, record.topic()); + assertEquals(currentPartition, record.partition()); + assertEquals(recordCount % recordSize, record.offset()); + assertEquals(recordCount % recordSize, record.key()); + assertEquals(String.valueOf(recordCount % recordSize), record.value()); + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java index a7c214f91ce16..4a389da37f852 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.ConsumerRecordsNew; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; @@ -52,6 +53,7 @@ @BenchmarkMode(Mode.AverageTime) public class ConsumerRecordsBenchmark { private ConsumerRecords records; + private ConsumerRecordsNew recordsNew; @Setup(Level.Trial) public void setUp() { @@ -72,6 +74,7 @@ public void setUp() { } records = new ConsumerRecords<>(partitionToRecords); + recordsNew = new ConsumerRecordsNew<>(partitionToRecords); } @Benchmark @@ -84,4 +87,9 @@ public void records(Blackhole blackhole) { public void recordsWithFilterIterator(Blackhole blackhole) { blackhole.consume(records.records("topic2")); } + + @Benchmark + public void records2(Blackhole blackhole) { + blackhole.consume(recordsNew.records("topic2")); + } } From d607e3389bcc3b12c435e798073f91ce01d046c0 Mon Sep 17 00:00:00 2001 From: frankvicky Date: Fri, 5 Jul 2024 17:15:47 +0800 Subject: [PATCH 07/14] Refacotr new implementation --- .../clients/consumer/ConsumerRecords.java | 57 +++--- .../clients/consumer/ConsumerRecordsNew.java | 139 ------------- .../consumer/ConsumerRecordsNewTest.java | 182 ------------------ .../jmh/record/ConsumerRecordsBenchmark.java | 8 - 4 files changed, 27 insertions(+), 359 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecordsNew.java delete mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsNewTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index a82a57b4e6775..f18fb85379ee7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Predicate; /** * A container that holds the list {@link ConsumerRecord} per partition for a @@ -71,7 +70,27 @@ public Iterable> recordsWithNestedList(String topic) { public Iterable> records(String topic) { if (topic == null) throw new IllegalArgumentException("Topic must be non-null."); - return new ConcatenatedIterable<>(records.values(), record -> record.topic().equals(topic)); + return () -> new AbstractIterator>() { + private final Iterator>>> partitionIterator + = records.entrySet().iterator(); + private Iterator> currentRecordIterator = null; + + @Override + protected ConsumerRecord makeNext() { + if (currentRecordIterator == null || !currentRecordIterator.hasNext()) { + while (partitionIterator.hasNext()) { + Map.Entry>> nextPartitionIterator = partitionIterator.next(); + List> records = nextPartitionIterator.getValue(); + if (topic.equals(nextPartitionIterator.getKey().topic()) && !records.isEmpty()) { + currentRecordIterator = records.iterator(); + return currentRecordIterator.next(); + } + } + return allDone(); + } + return currentRecordIterator.next(); + } + }; } /** @@ -100,17 +119,11 @@ public int count() { private static class ConcatenatedIterable implements Iterable> { private final Iterable>> iterables; - private Predicate> predicate = null; public ConcatenatedIterable(Iterable>> iterables) { this.iterables = iterables; } - public ConcatenatedIterable(Iterable>> iterables, Predicate> predicate) { - this.iterables = iterables; - this.predicate = predicate; - } - @Override public Iterator> iterator() { return new AbstractIterator>() { @@ -118,29 +131,13 @@ public Iterator> iterator() { Iterator> current; protected ConsumerRecord makeNext() { - while (true) { - if (current == null || !current.hasNext()) { - if (!advanceToNextIterator()) { - return allDone(); - } - } - - ConsumerRecord next = current.next(); - - if (predicate == null || predicate.test(next)) { - return next; - } - } - } - - private boolean advanceToNextIterator() { - while (iters.hasNext()) { - current = iters.next().iterator(); - if (current.hasNext()) { - return true; - } + while (current == null || !current.hasNext()) { + if (iters.hasNext()) + current = iters.next().iterator(); + else + return allDone(); } - return false; + return current.next(); } }; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecordsNew.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecordsNew.java deleted file mode 100644 index c96c2357af064..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecordsNew.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer; - -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.AbstractIterator; - -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Predicate; - -/** - * A container that holds the list {@link ConsumerRecord} per partition for a - * particular topic. There is one {@link ConsumerRecord} list for every topic - * partition returned by a {@link Consumer#poll(java.time.Duration)} operation. - */ -public class ConsumerRecordsNew implements Iterable> { - public static final ConsumerRecordsNew EMPTY = new ConsumerRecordsNew<>(Collections.emptyMap()); - - private final Map>> records; - - public ConsumerRecordsNew(Map>> records) { - this.records = records; - } - - /** - * Get just the records for the given partition - * - * @param partition The partition to get records for - */ - public List> records(TopicPartition partition) { - List> recs = this.records.get(partition); - if (recs == null) - return Collections.emptyList(); - else - return Collections.unmodifiableList(recs); - } - - public Iterable> records(String topic) { - if (topic == null) - throw new IllegalArgumentException("Topic must be non-null."); - return new ConcatenatedIterable<>(records, record -> record.topic().equals(topic)); - } - - /** - * Get the partitions which have records contained in this record set. - * - * @return the set of partitions with data in this record set (may be empty if no data was returned) - */ - public Set partitions() { - return Collections.unmodifiableSet(records.keySet()); - } - - @Override - public Iterator> iterator() { - return new ConcatenatedIterable<>(records).iterator(); - } - - /** - * The number of records for all topics - */ - public int count() { - int count = 0; - for (List> recs : this.records.values()) - count += recs.size(); - return count; - } - - private static class ConcatenatedIterable implements Iterable> { - - private final Map>> topicPartitionToRecords; - private final Predicate predicate; - - public ConcatenatedIterable(Map>> topicPartitionToRecords) { - this(topicPartitionToRecords, null); - } - - public ConcatenatedIterable(Map>> topicPartitionToRecords, Predicate predicate) { - this.topicPartitionToRecords = topicPartitionToRecords; - this.predicate = predicate; - } - - @Override - public Iterator> iterator() { - return new AbstractIterator>() { - final Iterator>>> iterator - = topicPartitionToRecords.entrySet().iterator(); - Iterator> current; - - protected ConsumerRecord makeNext() { - while (current == null || !current.hasNext()) { - if (!advanceToNextIterator()) { - return allDone(); - } - } - return current.next(); - } - - private boolean advanceToNextIterator() { - while (iterator.hasNext()) { - Map.Entry>> next = iterator.next(); - if (predicate == null || predicate.test(next.getKey())) { - current = next.getValue().iterator(); - return true; - } - } - return false; - } - }; - } - } - - public boolean isEmpty() { - return records.isEmpty(); - } - - @SuppressWarnings("unchecked") - public static ConsumerRecordsNew empty() { - return (ConsumerRecordsNew) EMPTY; - } - -} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsNewTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsNewTest.java deleted file mode 100644 index b1c48b64b96b5..0000000000000 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsNewTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer; - -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.header.internals.RecordHeaders; -import org.apache.kafka.common.record.TimestampType; - -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class ConsumerRecordsNewTest { - - @Test - public void testIterator() { - String topic = "topic"; - int recordSize = 10; - int partitionSize = 15; - int emptyPartitionIndex = 3; - ConsumerRecordsNew records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); - Iterator> iterator = records.iterator(); - - int recordCount = 0; - int partitionCount = 0; - int currentPartition = -1; - - while (iterator.hasNext()) { - ConsumerRecord record = iterator.next(); - validateEmptyPartition(record, emptyPartitionIndex); - - // Check if we have moved to a new partition - if (currentPartition != record.partition()) { - // Increment the partition count as we have encountered a new partition - partitionCount++; - // Update the current partition to the new partition - currentPartition = record.partition(); - } - - validateRecordPayload(topic, record, currentPartition, recordCount, recordSize); - recordCount++; - } - - // Including empty partition - assertEquals(partitionSize, partitionCount + 1); - } - - @Test - public void testRecordsByPartition() { - List topics = Arrays.asList("topic1", "topic2"); - int recordSize = 3; - int partitionSize = 5; - int emptyPartitionIndex = 2; - - ConsumerRecordsNew consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); - - for (String topic : topics) { - for (int partition = 0; partition < partitionSize; partition++) { - TopicPartition topicPartition = new TopicPartition(topic, partition); - List> records = consumerRecords.records(topicPartition); - - if (partition == emptyPartitionIndex) { - assertTrue(records.isEmpty()); - } else { - assertEquals(recordSize, records.size()); - for (int i = 0; i < records.size(); i++) { - ConsumerRecord record = records.get(i); - validateRecordPayload(topic, record, partition, i, recordSize); - } - } - } - } - } - - @Test - public void testRecordsByNullTopic() { - String nullTopic = null; - ConsumerRecordsNew consumerRecords = ConsumerRecordsNew.empty(); - IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); - assertEquals("Topic must be non-null.", exception.getMessage()); - } - - - @Test - public void testRecordsByTopic() { - List topics = Arrays.asList("topic1", "topic2", "topic3", "topic4"); - int recordSize = 3; - int partitionSize = 10; - int emptyPartitionIndex = 6; - int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 1); - - ConsumerRecordsNew consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); - - for (String topic : topics) { - Iterable> records = consumerRecords.records(topic); - int recordCount = 0; - int partitionCount = 0; - int currentPartition = -1; - - for (ConsumerRecord record : records) { - validateEmptyPartition(record, emptyPartitionIndex); - - // Check if we have moved to a new partition - if (currentPartition != record.partition()) { - // Increment the partition count as we have encountered a new partition - partitionCount++; - // Update the current partition to the new partition - currentPartition = record.partition(); - } - - validateRecordPayload(topic, record, currentPartition, recordCount, recordSize); - recordCount++; - } - - // Including empty partition - assertEquals(partitionSize, partitionCount + 1); - assertEquals(expectedTotalRecordSizeOfEachTopic, recordCount); - } - } - - private ConsumerRecordsNew buildTopicTestRecords(int recordSize, - int partitionSize, - int emptyPartitionIndex, - Collection topics) { - Map>> partitionToRecords = new LinkedHashMap<>(); - for (String topic : topics) { - for (int i = 0; i < partitionSize; i++) { - List> records = new ArrayList<>(recordSize); - if (i != emptyPartitionIndex) { - for (int j = 0; j < recordSize; j++) { - records.add( - new ConsumerRecord<>(topic, i, j, 0L, TimestampType.CREATE_TIME, - 0, 0, j, String.valueOf(j), new RecordHeaders(), Optional.empty()) - ); - } - } - partitionToRecords.put(new TopicPartition(topic, i), records); - } - } - - return new ConsumerRecordsNew<>(partitionToRecords); - } - - private void validateEmptyPartition(ConsumerRecord record, int emptyPartitionIndex) { - assertNotEquals(emptyPartitionIndex, record.partition(), "Partition " + record.partition() + " is not empty"); - } - - private void validateRecordPayload(String topic, ConsumerRecord record, int currentPartition, int recordCount, int recordSize) { - assertEquals(topic, record.topic()); - assertEquals(currentPartition, record.partition()); - assertEquals(recordCount % recordSize, record.offset()); - assertEquals(recordCount % recordSize, record.key()); - assertEquals(String.valueOf(recordCount % recordSize), record.value()); - } -} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java index 4a389da37f852..a7c214f91ce16 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java @@ -19,7 +19,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.ConsumerRecordsNew; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; @@ -53,7 +52,6 @@ @BenchmarkMode(Mode.AverageTime) public class ConsumerRecordsBenchmark { private ConsumerRecords records; - private ConsumerRecordsNew recordsNew; @Setup(Level.Trial) public void setUp() { @@ -74,7 +72,6 @@ public void setUp() { } records = new ConsumerRecords<>(partitionToRecords); - recordsNew = new ConsumerRecordsNew<>(partitionToRecords); } @Benchmark @@ -87,9 +84,4 @@ public void records(Blackhole blackhole) { public void recordsWithFilterIterator(Blackhole blackhole) { blackhole.consume(records.records("topic2")); } - - @Benchmark - public void records2(Blackhole blackhole) { - blackhole.consume(recordsNew.records("topic2")); - } } From d84af714e217c727c1a3621c32ac896f8ef711c8 Mon Sep 17 00:00:00 2001 From: frankvicky Date: Fri, 5 Jul 2024 18:02:59 +0800 Subject: [PATCH 08/14] Iterate all records in benchmark --- .../jmh/record/ConsumerRecordsBenchmark.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java index a7c214f91ce16..6b2ec255a2491 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java @@ -36,12 +36,7 @@ import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) @@ -77,11 +72,15 @@ public void setUp() { @Benchmark public void records(Blackhole blackhole) { // original one - blackhole.consume(records.recordsWithNestedList("topic2")); + for (ConsumerRecord record : records.recordsWithNestedList("topic2")) { + blackhole.consume(record); + } } @Benchmark public void recordsWithFilterIterator(Blackhole blackhole) { - blackhole.consume(records.records("topic2")); + for (ConsumerRecord record : records.records("topic2")) { + blackhole.consume(record); + } } } From 2ef1e6e61b80cfd3f877be72d10c623865b86205 Mon Sep 17 00:00:00 2001 From: frankvicky Date: Fri, 5 Jul 2024 19:12:29 +0800 Subject: [PATCH 09/14] Add Init benchmark and rename --- .../jmh/record/ConsumerRecordsBenchmark.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java index 6b2ec255a2491..93b90c303d644 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java @@ -36,7 +36,12 @@ import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) @@ -71,15 +76,24 @@ public void setUp() { @Benchmark public void records(Blackhole blackhole) { - // original one + blackhole.consume(records.recordsWithNestedList("topic2")); + } + + @Benchmark + public void iteratorRecords(Blackhole blackhole) { for (ConsumerRecord record : records.recordsWithNestedList("topic2")) { blackhole.consume(record); } } @Benchmark - public void recordsWithFilterIterator(Blackhole blackhole) { - for (ConsumerRecord record : records.records("topic2")) { + public void recordsByNewImplementation(Blackhole blackhole) { + blackhole.consume(records.records("topic2")); + } + + @Benchmark + public void iteratorRecordsByNewImplementation(Blackhole blackhole) { + for (ConsumerRecord record : records.recordsWithNestedList("topic2")) { blackhole.consume(record); } } From 81e3439b149d3136275924c08a854e9b54e91cca Mon Sep 17 00:00:00 2001 From: frankvicky Date: Mon, 8 Jul 2024 21:57:44 +0800 Subject: [PATCH 10/14] Add new subclass for legacy implementation --- .../clients/consumer/ConsumerRecords.java | 16 +------- .../consumer/LegacyConsumerRecords.java | 41 +++++++++++++++++++ .../jmh/record/ConsumerRecordsBenchmark.java | 15 ++++--- 3 files changed, 52 insertions(+), 20 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/LegacyConsumerRecords.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index f18fb85379ee7..38670936a563b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -19,7 +19,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.AbstractIterator; -import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -34,7 +33,7 @@ public class ConsumerRecords implements Iterable> { public static final ConsumerRecords EMPTY = new ConsumerRecords<>(Collections.emptyMap()); - private final Map>> records; + protected final Map>> records; public ConsumerRecords(Map>> records) { this.records = records; @@ -56,17 +55,6 @@ public List> records(TopicPartition partition) { /** * Get just the records for the given topic */ - public Iterable> recordsWithNestedList(String topic) { - if (topic == null) - throw new IllegalArgumentException("Topic must be non-null."); - List>> recs = new ArrayList<>(); - for (Map.Entry>> entry : records.entrySet()) { - if (entry.getKey().topic().equals(topic)) - recs.add(entry.getValue()); - } - return new ConcatenatedIterable<>(recs); - } - public Iterable> records(String topic) { if (topic == null) throw new IllegalArgumentException("Topic must be non-null."); @@ -116,7 +104,7 @@ public int count() { return count; } - private static class ConcatenatedIterable implements Iterable> { + protected static class ConcatenatedIterable implements Iterable> { private final Iterable>> iterables; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/LegacyConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/LegacyConsumerRecords.java new file mode 100644 index 0000000000000..146a016efccb7 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/LegacyConsumerRecords.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.TopicPartition; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class LegacyConsumerRecords extends ConsumerRecords { + public LegacyConsumerRecords(Map>> records) { + super(records); + } + + @Override + public Iterable> records(String topic) { + if (topic == null) + throw new IllegalArgumentException("Topic must be non-null."); + List>> recs = new ArrayList<>(); + for (Map.Entry>> entry : records.entrySet()) { + if (entry.getKey().topic().equals(topic)) + recs.add(entry.getValue()); + } + return new ConcatenatedIterable<>(recs); + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java index 93b90c303d644..8f5deb4ba3ed9 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.LegacyConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; @@ -52,6 +53,7 @@ @BenchmarkMode(Mode.AverageTime) public class ConsumerRecordsBenchmark { private ConsumerRecords records; + private LegacyConsumerRecords legacyRecords; @Setup(Level.Trial) public void setUp() { @@ -72,28 +74,29 @@ public void setUp() { } records = new ConsumerRecords<>(partitionToRecords); + legacyRecords = new LegacyConsumerRecords<>(partitionToRecords); } @Benchmark public void records(Blackhole blackhole) { - blackhole.consume(records.recordsWithNestedList("topic2")); + blackhole.consume(records.records("topic2")); } @Benchmark public void iteratorRecords(Blackhole blackhole) { - for (ConsumerRecord record : records.recordsWithNestedList("topic2")) { + for (ConsumerRecord record : records.records("topic2")) { blackhole.consume(record); } } @Benchmark - public void recordsByNewImplementation(Blackhole blackhole) { - blackhole.consume(records.records("topic2")); + public void recordsWithLegacyImplementation(Blackhole blackhole) { + blackhole.consume(legacyRecords.records("topic2")); } @Benchmark - public void iteratorRecordsByNewImplementation(Blackhole blackhole) { - for (ConsumerRecord record : records.recordsWithNestedList("topic2")) { + public void iteratorRecordsByLegacyImplementation(Blackhole blackhole) { + for (ConsumerRecord record : legacyRecords.records("topic2")) { blackhole.consume(record); } } From b974f2895e7f47c505c09bfe3ca7e884f6b8cacb Mon Sep 17 00:00:00 2001 From: frankvicky Date: Tue, 9 Jul 2024 00:06:16 +0800 Subject: [PATCH 11/14] Move legacy implementation to jmh module. --- .../apache/kafka/jmh/record/ConsumerRecordsBenchmark.java | 1 - .../org/apache/kafka/jmh/record}/LegacyConsumerRecords.java | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) rename {clients/src/main/java/org/apache/kafka/clients/consumer => jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record}/LegacyConsumerRecords.java (87%) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java index 8f5deb4ba3ed9..6e831a340dc0e 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java @@ -19,7 +19,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.LegacyConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/LegacyConsumerRecords.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/LegacyConsumerRecords.java similarity index 87% rename from clients/src/main/java/org/apache/kafka/clients/consumer/LegacyConsumerRecords.java rename to jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/LegacyConsumerRecords.java index 146a016efccb7..abd5db53b094a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/LegacyConsumerRecords.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/LegacyConsumerRecords.java @@ -14,15 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.clients.consumer; +package org.apache.kafka.jmh.record; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import java.util.ArrayList; import java.util.List; import java.util.Map; -public class LegacyConsumerRecords extends ConsumerRecords { +public final class LegacyConsumerRecords extends ConsumerRecords { public LegacyConsumerRecords(Map>> records) { super(records); } From 4f834db4c66e48baf72d33207828a9dc99fc53bd Mon Sep 17 00:00:00 2001 From: frankvicky Date: Tue, 9 Jul 2024 11:04:59 +0800 Subject: [PATCH 12/14] Refactor new implementation and improve iterate performance --- .../clients/consumer/ConsumerRecords.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index 38670936a563b..3e7fb40b91874 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -58,25 +58,22 @@ public List> records(TopicPartition partition) { public Iterable> records(String topic) { if (topic == null) throw new IllegalArgumentException("Topic must be non-null."); + return () -> new AbstractIterator>() { private final Iterator>>> partitionIterator = records.entrySet().iterator(); - private Iterator> currentRecordIterator = null; + private Iterator> currentRecordIterator = Collections.emptyIterator(); @Override protected ConsumerRecord makeNext() { - if (currentRecordIterator == null || !currentRecordIterator.hasNext()) { - while (partitionIterator.hasNext()) { - Map.Entry>> nextPartitionIterator = partitionIterator.next(); - List> records = nextPartitionIterator.getValue(); - if (topic.equals(nextPartitionIterator.getKey().topic()) && !records.isEmpty()) { - currentRecordIterator = records.iterator(); - return currentRecordIterator.next(); - } + while (!currentRecordIterator.hasNext() && partitionIterator.hasNext()) { + Map.Entry>> nextPartition = partitionIterator.next(); + List> records = nextPartition.getValue(); + if (topic.equals(nextPartition.getKey().topic()) && !records.isEmpty()) { + currentRecordIterator = records.iterator(); } - return allDone(); } - return currentRecordIterator.next(); + return currentRecordIterator.hasNext() ? currentRecordIterator.next() : allDone(); } }; } From 7ce7a38991d5bc844e4c41d79994d5eaab3118d0 Mon Sep 17 00:00:00 2001 From: frankvicky Date: Wed, 10 Jul 2024 12:46:44 +0800 Subject: [PATCH 13/14] Do early return if curretnIterator hasNext is true --- .../apache/kafka/clients/consumer/ConsumerRecords.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index 3e7fb40b91874..1256cb6ca04b1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -66,14 +66,19 @@ public Iterable> records(String topic) { @Override protected ConsumerRecord makeNext() { - while (!currentRecordIterator.hasNext() && partitionIterator.hasNext()) { + if (currentRecordIterator.hasNext()) { + return currentRecordIterator.next(); + } + + while (partitionIterator.hasNext()) { Map.Entry>> nextPartition = partitionIterator.next(); List> records = nextPartition.getValue(); if (topic.equals(nextPartition.getKey().topic()) && !records.isEmpty()) { currentRecordIterator = records.iterator(); + return currentRecordIterator.next(); } } - return currentRecordIterator.hasNext() ? currentRecordIterator.next() : allDone(); + return allDone(); } }; } From 1c141bad4f2d1517de3b0b70ee8b9a6c29e1984f Mon Sep 17 00:00:00 2001 From: frankvicky Date: Wed, 10 Jul 2024 20:31:56 +0800 Subject: [PATCH 14/14] Increase warmup iteration --- .../org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java index 6e831a340dc0e..db186d40e891a 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/ConsumerRecordsBenchmark.java @@ -46,7 +46,7 @@ @State(Scope.Benchmark) @Fork(value = 1) -@Warmup(iterations = 5) +@Warmup(iterations = 10) @Measurement(iterations = 10) @OutputTimeUnit(TimeUnit.NANOSECONDS) @BenchmarkMode(Mode.AverageTime)