# KAFKA-19160: Improve performance of fetching stable offsets #1

Changes from 1 commit (base: `trunk`).
```diff
@@ -194,9 +194,16 @@ public OffsetMetadataManager build() {
     /**
      * The open transactions (producer ids) keyed by group.
+     * Tracks whether groups have any open transactions.
      */
     private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;

+    /**
+     * The open transactions (producer ids) keyed by group id, topic name and partition id.
+     * Tracks whether partitions have any pending transactional offsets.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroupTopicAndPartition;
+
     private class Offsets {
         /**
          * The offsets keyed by group id, topic name and partition id.
```
```diff
@@ -281,6 +288,7 @@ private OffsetAndMetadata remove(
         this.offsets = new Offsets();
         this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
         this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.openTransactionsByGroupTopicAndPartition = new TimelineHashMap<>(snapshotRegistry, 0);
     }

     /**
```
```diff
@@ -650,24 +658,18 @@ public int deleteAllOffsets(
         // Delete all the pending transactional offsets too. Here we only write a tombstone
         // if the topic-partition was not in the main storage because we don't need to write
         // two consecutive tombstones.
-        TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-        if (openTransactions != null) {
-            openTransactions.forEach(producerId -> {
-                Offsets pendingOffsets = pendingTransactionalOffsets.get(producerId);
-                if (pendingOffsets != null) {
-                    TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> pendingGroupOffsets =
-                        pendingOffsets.offsetsByGroup.get(groupId);
-                    if (pendingGroupOffsets != null) {
-                        pendingGroupOffsets.forEach((topic, offsetsByPartition) -> {
-                            offsetsByPartition.keySet().forEach(partition -> {
-                                if (!hasCommittedOffset(groupId, topic, partition)) {
-                                    records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
-                                    numDeletedOffsets.getAndIncrement();
-                                }
-                            });
-                        });
-                    }
-                }
-            });
-        }
+        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+            openTransactionsByGroupTopicAndPartition.get(groupId);
+        if (openTransactionsByTopic != null) {
+            openTransactionsByTopic.forEach((topic, openTransactionsByPartition) -> {
+                openTransactionsByPartition.forEach((partition, producerIds) -> {
+                    producerIds.forEach(producerId -> {
+                        if (!hasCommittedOffset(groupId, topic, partition)) {
+                            records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
+                            numDeletedOffsets.getAndIncrement();
+                        }
+                    });
+                });
+            });
+        }
```
```diff
@@ -685,17 +687,15 @@ boolean hasPendingTransactionalOffsets(
         String topic,
         int partition
     ) {
-        final TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-        if (openTransactions == null) return false;
+        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+            openTransactionsByGroupTopicAndPartition.get(groupId);
+        if (openTransactionsByTopic == null) return false;

-        for (Long producerId : openTransactions) {
-            Offsets offsets = pendingTransactionalOffsets.get(producerId);
-            if (offsets != null && offsets.get(groupId, topic, partition) != null) {
-                return true;
-            }
-        }
+        TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+        if (openTransactionsByPartition == null) return false;

-        return false;
+        TimelineHashSet<Long> openTransactions = openTransactionsByPartition.get(partition);
+        return openTransactions != null && !openTransactions.isEmpty();
     }
```
```diff
@@ -1005,21 +1005,41 @@ public void replay(
                 openTransactionsByGroup
                     .computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
                     .add(producerId);
+                openTransactionsByGroupTopicAndPartition
+                    .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                    .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                    .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+                    .add(producerId);
             }
         } else {
             if (offsets.remove(groupId, topic, partition) != null) {
                 metrics.decrementNumOffsets();
             }

             // Remove all the pending offset commits related to the tombstone.
-            TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-            if (openTransactions != null) {
-                openTransactions.forEach(openProducerId -> {
-                    Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
-                    if (pendingOffsets != null) {
-                        pendingOffsets.remove(groupId, topic, partition);
-                    }
-                });
-            }
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroupTopicAndPartition.get(groupId);
+            if (openTransactionsByTopic != null) {
+                TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+                if (openTransactionsByPartition != null) {
+                    TimelineHashSet<Long> openTransactions = openTransactionsByPartition.get(partition);
+                    if (openTransactions != null) {
+                        openTransactions.forEach(openProducerId -> {
+                            Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
+                            if (pendingOffsets != null) {
+                                pendingOffsets.remove(groupId, topic, partition);
+                            }
+                        });
+
+                        openTransactionsByPartition.remove(partition);
+                        if (openTransactionsByPartition.isEmpty()) {
+                            openTransactionsByTopic.remove(topic);
+                        }
+                        if (openTransactionsByTopic.isEmpty()) {
+                            openTransactionsByGroupTopicAndPartition.remove(groupId);
+                        }
+                    }
+                }
+            }
         }
     }
```

**Review comment on lines +1033 to +1040 (Incomplete Resource Cleanup):** Nested resource cleanup doesn't check if `openTransactionsByPartition` exists before removing. If null, a `NullPointerException` could occur during transaction completion, potentially causing transaction processing failures.
**Review comment on lines +1034 to +1041 (Incomplete Transaction Cleanup):** The cleanup logic removes entries from `openTransactionsByGroupTopicAndPartition` but fails to check whether the producer still has other pending offsets before removing it from `openTransactionsByGroup`. This can leave stale entries in `openTransactionsByGroup`, causing memory leaks.
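If that concern applies, one way to address it would be a helper that drops a producer from `openTransactionsByGroup` only once it no longer holds pending offsets for the group. This is a sketch only, not part of the PR; it assumes `TimelineHashMap` exposes the standard `Map` read API and reuses the `pendingTransactionalOffsets` and `openTransactionsByGroup` fields shown in the diff:

```java
// Hypothetical helper, not in the PR: prune openTransactionsByGroup once a
// producer no longer holds any pending transactional offsets for the group.
private void maybeReleaseGroupTransaction(String groupId, long producerId) {
    Offsets pendingOffsets = pendingTransactionalOffsets.get(producerId);
    boolean stillPending = pendingOffsets != null
        && pendingOffsets.offsetsByGroup.containsKey(groupId);
    if (stillPending) return;

    TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
    if (groupTransactions != null) {
        groupTransactions.remove(producerId);
        if (groupTransactions.isEmpty()) {
            openTransactionsByGroup.remove(groupId);
        }
    }
}
```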
**Review comment on lines +1020 to +1042 (Duplicate Code Paths):** Similar nested map traversal and cleanup logic appears multiple times in the code. This duplication increases the maintenance burden, as any change to the cleanup logic must be applied consistently across all occurrences.
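One way to address both the duplication and the null-guard concern above is to extract the bottom-up pruning into a single helper that each replay path calls. Again a sketch, not part of the PR, using only identifiers that appear in the diff:

```java
// Hypothetical helper, not in the PR: remove one producer id from the nested
// group -> topic -> partition index and prune empty levels bottom-up.
private void removeOpenTransaction(String groupId, String topic, int partition, long producerId) {
    TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> byTopic =
        openTransactionsByGroupTopicAndPartition.get(groupId);
    if (byTopic == null) return;

    TimelineHashMap<Integer, TimelineHashSet<Long>> byPartition = byTopic.get(topic);
    if (byPartition == null) return;

    TimelineHashSet<Long> producerIds = byPartition.get(partition);
    if (producerIds == null) return;

    // Remove the producer, then prune any collections left empty.
    producerIds.remove(producerId);
    if (producerIds.isEmpty()) byPartition.remove(partition);
    if (byPartition.isEmpty()) byTopic.remove(topic);
    if (byTopic.isEmpty()) openTransactionsByGroupTopicAndPartition.remove(groupId);
}
```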
```diff
@@ -1031,6 +1051,7 @@ public void replay(
      * @param result The result of the transaction.
      * @throws RuntimeException if the transaction can not be completed.
      */
+    @SuppressWarnings("NPathComplexity")
     public void replayEndTransactionMarker(
         long producerId,
         TransactionResult result
```
```diff
@@ -1043,14 +1064,39 @@ public void replayEndTransactionMarker(
             return;
         }

-        pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> {
-            TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-            if (openTransactions != null) {
-                openTransactions.remove(producerId);
-                if (openTransactions.isEmpty()) {
+        pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
+            TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
+            if (groupTransactions != null) {
+                groupTransactions.remove(producerId);
+                if (groupTransactions.isEmpty()) {
                     openTransactionsByGroup.remove(groupId);
                 }
             }
+
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroupTopicAndPartition.get(groupId);
+            if (openTransactionsByTopic == null) return;
+
+            topicOffsets.forEach((topic, partitionOffsets) -> {
+                TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+                if (openTransactionsByPartition == null) return;
+
+                partitionOffsets.keySet().forEach(partitionId -> {
+                    TimelineHashSet<Long> partitionTransactions = openTransactionsByPartition.get(partitionId);
+                    if (partitionTransactions != null) {
+                        partitionTransactions.remove(producerId);
+                        if (partitionTransactions.isEmpty()) {
+                            openTransactionsByPartition.remove(partitionId);
+                        }
+                        if (openTransactionsByPartition.isEmpty()) {
+                            openTransactionsByTopic.remove(topic);
+                        }
+                        if (openTransactionsByTopic.isEmpty()) {
+                            openTransactionsByGroupTopicAndPartition.remove(groupId);
+                        }
+                    }
+                });
+            });
         });

         if (result == TransactionResult.COMMIT) {
```
**Review comment (Duplicate tombstones may be emitted for the same partition):** `producerIds.forEach(...)` iterates once per producer id, potentially adding the same tombstone several times when multiple producers have pending offsets for the identical `<group, topic, partition>`. This inflates `records` and over-counts `numDeletedOffsets`, causing needless log traffic and skewed metrics. The reviewer's suggested change (not captured here) preserves the original semantics while guaranteeing a single tombstone per partition.