-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MINOR: Enhance performance of ConsumerRecords by refactoring iterator initialization and iteration logic #16494
Conversation
@Benchmark | ||
public void records() { | ||
// original one | ||
records.recordsWithNestedList("topic2"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you have to consume the result to make sure JVM won't eliminate it for optimization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I will avoid it by using Blackhole
Hi @chia7712, Here are the benchmark results after adding
|
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
Outdated
Show resolved
Hide resolved
Hello @chia7712 , I have write another implementation which is almost same as original logic. The different of new one is that it filter the
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@frankvicky thanks for patch. Please remove some unnecessary scenario to cleanup this PR
return Collections.unmodifiableList(recs); | ||
} | ||
|
||
public Iterable<ConsumerRecord<K, V>> records(String topic) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not moving this new method to origin ConsumerRecords
?
|
||
public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables) { | ||
this.iterables = iterables; | ||
} | ||
|
||
public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables, Predicate<ConsumerRecord<K, V>> predicate) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove this version as it has big performance issue, right?
public Iterable<ConsumerRecord<K, V>> records(String topic) { | ||
if (topic == null) | ||
throw new IllegalArgumentException("Topic must be non-null."); | ||
return new ConcatenatedIterable<>(records.values(), record -> record.topic().equals(topic)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we don't need to use ConcatenatedIterable
. for example:
public Iterable<ConsumerRecord<K, V>> records(String topic) {
if (topic == null)
throw new IllegalArgumentException("Topic must be non-null.");
return () -> new AbstractIterator<ConsumerRecord<K, V>>() {
final Iterator<Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>>> iter = records.entrySet().iterator();
Iterator<ConsumerRecord<K, V>> current = null;
@Override
protected ConsumerRecord<K, V> makeNext() {
if (current == null || !current.hasNext()) {
while (iter.hasNext()) {
Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry = iter.next();
if (entry.getKey().topic().equals(topic) && !entry.getValue().isEmpty()) {
current = entry.getValue().iterator();
break;
}
}
}
if (current == null || !current.hasNext()) return allDone();
return current.next();
}
};
}
Hi @chia7712 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@frankvicky thanks for this patch
|
||
@Benchmark | ||
public void recordsWithFilterIterator(Blackhole blackhole) { | ||
blackhole.consume(records.records("topic2")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please iterate the Iterable
since the cost of iteration is important too.
@Benchmark | ||
public void records(Blackhole blackhole) { | ||
// original one | ||
for (ConsumerRecord<Integer, String> record : records.recordsWithNestedList("topic2")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please have two benchmarks: 1) create iterable 2) iterate all records
I assume your approach will have better score in "create iterable" and similar score in "iterate all records"
Hi @chia7712
|
@frankvicky the jmh result is good to me. Could you please adjust the PR to add a subclass of |
Hi @chia7712 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@frankvicky thanks for this patch
import java.util.List; | ||
import java.util.Map; | ||
|
||
public class LegacyConsumerRecords<K, V> extends ConsumerRecords<K, V> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move this to jmh module
@frankvicky please add jmh result (according to latest commit) to the description |
Hi @chia7712
|
The benchmark of recent commits:
|
I have increased the number of warmup iterations to make the benchmark results more stable.
|
@frankvicky Could you please revise the topic ? |
@frankvicky could you please rebase code to run CI again? |
@dajac Do you have free cycle to take a look at this PR? It brings a bit performance improvement when the |
Hi @chia7712 |
I have make a new implementation of
ConsumerRecords#records(String)
and I want to test this implementation in CI.Committer Checklist (excluded from commit message)