-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-19164: Keep track of groups when deleting transactional offsets #8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
When deleting pending transactional offsets, we must preserve the list of groups associated with the producer ID, otherwise we cannot clean up the list of pending transactions for the group once the transaction is committed or aborted.
WalkthroughAdds a boolean Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant OffsetMetadataManager
participant OffsetsMain as Offsets(false)
participant OffsetsPending as Offsets(true)
participant Group
Client->>OffsetMetadataManager: Commit offset (no producerId)
OffsetMetadataManager->>OffsetsMain: add/update offset
Note right of OffsetsMain#gray: preserveGroups = false
Client->>OffsetMetadataManager: Commit offset (with producerId)
OffsetMetadataManager->>OffsetsPending: add/update pending transactional offset
Note right of OffsetsPending#lightblue: preserveGroups = true
Client->>OffsetMetadataManager: Delete pending transactional offset (tombstone)
OffsetMetadataManager->>OffsetsPending: remove offset
OffsetsPending-->>OffsetMetadataManager: retain empty group (preserveGroups=true)
OffsetMetadataManager->>OffsetMetadataManager: cleanupExpiredOffsets()
OffsetsMain-->>OffsetMetadataManager: remove group if empty (preserveGroups=false)
OffsetsPending-->>OffsetMetadataManager: keep group until transaction marker cleared
OffsetMetadataManager->>Group: decide group deletion / emit tombstones
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
@coderabbitai full review |
✅ Actions performedFull review triggered. |
@coderabbitai full review |
✅ Actions performedFull review triggered. |
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
The pull request introduces a mechanism to preserve group information when deleting transactional offsets, which is crucial for cleaning up pending transactions. The changes seem well-structured and include a new test case to validate the cleanup behavior. Overall, the changes look good, but I have a few suggestions for improvement.
Summary of Findings
- Clarity of 'preserveGroups' usage: The purpose of the
preserveGroups
flag could be made more explicit in theOffsets
class constructor to improve readability. - Test case description: The test case
testCleanupExpiredOffsetsWithDeletedPendingTransactionalOffsets
could benefit from a more descriptive name or additional comments to clarify its purpose.
Merge Readiness
The changes appear to be well-implemented and include a new test case. However, addressing the comments regarding the clarity of preserveGroups
and the test case description would further improve the quality of the code. I am unable to directly approve the pull request, and recommend that others review and approve this code before merging. I recommend that the pull request not be merged until the high severity issues are addressed (at a minimum).
/do-refacto |
Refacto is reviewing this PR. Please wait for the review comments to be posted. |
/refacto-test |
Refacto is reviewing this PR. Please wait for the review comments to be posted. |
Code Review: Transaction Management in Offset Handling👍 Well Done
📌 Files Processed
📝 Additional Comments
|
this.config = config; | ||
this.metrics = metrics; | ||
this.offsets = new Offsets(); | ||
this.offsets = new Offsets(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing Constructor Parameter
The Offsets constructor is modified to take a boolean parameter, but the main offsets instance is initialized with the parameterless constructor which no longer exists. This would cause compilation failure since the constructor signature has changed.
Standards
- Algorithm-Correctness-Parameter-Validation
- Logic-Verification-Constructor-Consistency
private Offsets(boolean preserveGroups) { | ||
this.preserveGroups = preserveGroups; | ||
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uninitialized Flag Parameter
The Offsets constructor was modified to accept a preserveGroups parameter but the original constructor call at line 288 passes false without explanation. This could lead to inconsistent group preservation behavior.
Standards
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- ISO-IEC-25010-Reliability-Fault-Tolerance
/** | ||
* Whether to preserve empty entries for groups when removing offsets. | ||
* We use this to keep track of the groups associated with pending transactions. | ||
*/ | ||
private final boolean preserveGroups; | ||
|
||
/** | ||
* The offsets keyed by group id, topic name and partition id. | ||
*/ | ||
private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup; | ||
|
||
private Offsets() { | ||
private Offsets(boolean preserveGroups) { | ||
this.preserveGroups = preserveGroups; | ||
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Insufficient Constructor Documentation
The constructor parameter is documented but lacks explanation of when to use true/false values and their implications. This makes future modifications risky as developers may not understand the full impact of this parameter.
Standards
- Clean-Code-Documentation
- Clean-Code-Naming
@coderabbitai full review |
✅ Actions performedFull review triggered. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java (1)
859-862
: Bug: cleanupExpiredOffsets may delete a group with open transactions when it has no committed offsetsEarly return ignores openTransactionsByGroup. This contradicts the later guard and can incorrectly mark the group deletable while a transaction is still open.
Apply:
- if (offsetsByTopic == null) { - return true; - } + if (offsetsByTopic == null) { + // No committed offsets left; only deletable if there are no open transactions. + return !openTransactionsByGroup.containsKey(groupId); + }Consider adding a unit test (see suggested test in the test file review).
🧹 Nitpick comments (2)
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java (1)
2596-2656
: Add a test where the group has only pending offsets (no committed ones)This would catch an edge case where a group with zero committed offsets but open transactions should not be considered deletable by cleanup. Suggest adding a test that: creates only a transactional commit, asserts cleanupExpiredOffsets(...) returns false, then commits/aborts the txn and asserts it returns true.
Apply this test diff near the existing tests:
+ @Test + public void testCleanupExpiredOffsetsWithOnlyPendingTransactionalOffsets() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + Group group = mock(Group.class); + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .withGroupMetadataManager(groupMetadataManager) + .withOffsetsRetentionMinutes(1) + .build(); + + long t0 = context.time.milliseconds(); + // Only a pending transactional offset (no committed offsets). + context.commitOffset(10L, "group-id", "foo", 0, 101L, 0, t0 + 10); + + when(groupMetadataManager.group("group-id")).thenReturn(group); + when(group.offsetExpirationCondition()).thenReturn(Optional.of( + new OffsetExpirationConditionImpl(o -> o.commitTimestampMs))); + when(group.isSubscribedToTopic("foo")).thenReturn(false); + + context.time.sleep(Duration.ofMinutes(1).toMillis()); + + // Should not be deletable while the transaction is open. + List<CoordinatorRecord> records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(List.of(), records); + + // Once the transaction ends, the group becomes deletable. + context.replayEndTransactionMarker(10L, TransactionResult.ABORT); + assertTrue(context.cleanupExpiredOffsets("group-id", new ArrayList<>())); + }group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java (1)
201-206
: Constructor flag is clear; add brief usage doc for true/falseThe preserveGroups intent is good. A short note on where each mode is used (main store=false, pending txns=true) will help future maintainers.
private class Offsets { - /** - * Whether to preserve empty entries for groups when removing offsets. - * We use this to keep track of the groups associated with pending transactions. - */ + /** + * Whether to preserve empty entries for groups when removing offsets. + * Usage: + * - preserveGroups=false for the main committed offsets store (prune empty groups). + * - preserveGroups=true for pending transactional offsets (retain empty groups so we + * can still track open transactions per group until the txn end marker arrives). + */ private final boolean preserveGroups; @@ - private Offsets(boolean preserveGroups) { + private Offsets(boolean preserveGroups) { this.preserveGroups = preserveGroups; this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); }Also applies to: 212-215
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
(4 hunks)group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
(1 hunks)
🔇 Additional comments (4)
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java (1)
2596-2656
: Good regression coverage for deleted pending transactional offsetsThe scenario is well modeled and ensures the group isn't deletable while the transaction is still open, even after the pending offset tombstone is written.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java (3)
266-267
: Correct: avoid removing empty group entries for pending transactional offsetsThis is the key to keep group->producer linkage after tombstoning pending offsets.
288-288
: Main store initialization with preserveGroups=false is appropriateMaintains prior pruning behavior for committed offsets.
1005-1006
: Pending store initialization with preserveGroups=true is appropriateEnsures group association survives offset deletions until txn end.
/refacto-test |
Refacto is reviewing this PR. Please wait for the review comments to be posted. |
Code Review: Transactional Offset Management👍 Well Done
📌 Files Processed
📝 Additional Comments
|
this.config = config; | ||
this.metrics = metrics; | ||
this.offsets = new Offsets(); | ||
this.offsets = new Offsets(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing Group Preservation
Main offsets store initialized with preserveGroups=false while pending transactional offsets use true. Inconsistent behavior could lead to premature group deletion.
this.offsets = new Offsets(false); | |
this.offsets = new Offsets(true); |
Standards
- ISO-IEC-25010-Reliability-Fault-Tolerance
- ISO-IEC-25010-Functional-Correctness-Appropriateness
if (!preserveGroups && topicOffsets.isEmpty()) | ||
offsetsByGroup.remove(groupId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Group Preservation Overhead
The preserveGroups
flag controls whether empty group entries are retained in memory. For transactional offsets, this causes unnecessary memory retention as groups with deleted offsets remain in the map until transaction completion.
if (!preserveGroups && topicOffsets.isEmpty()) | |
offsetsByGroup.remove(groupId); | |
if (!preserveGroups && topicOffsets.isEmpty()) | |
offsetsByGroup.remove(groupId); | |
else if (preserveGroups && topicOffsets.isEmpty() && !openTransactionsByGroup.containsKey(groupId)) | |
offsetsByGroup.remove(groupId); |
Standards
- ISO-IEC-25010-Performance-Resource-Utilization
- Algorithm-Opt-Memory-Management
if (!preserveGroups && topicOffsets.isEmpty()) | ||
offsetsByGroup.remove(groupId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Group Preservation Logic
The original code incorrectly removed groups with empty topic offsets even when they had pending transactions. This could cause premature group deletion and data inconsistency when transactions complete later.
if (!preserveGroups && topicOffsets.isEmpty()) | |
offsetsByGroup.remove(groupId); | |
if (!preserveGroups && topicOffsets.isEmpty()) | |
offsetsByGroup.remove(groupId); |
Standards
- Business-Rule-Validation
- Logic-Verification-Completeness
- Algorithm-Correctness-State-Preservation
private Offsets(boolean preserveGroups) { | ||
this.preserveGroups = preserveGroups; | ||
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boolean Parameter Clarity
Boolean parameter lacks semantic clarity, making intention hard to understand from call sites. Consider using a more descriptive enum or builder pattern to make the parameter's purpose self-documenting.
Standards
- Clean-Code-Meaningful-Names
- Clean-Code-Functions
private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup; | ||
|
||
private Offsets() { | ||
private Offsets(boolean preserveGroups) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential Resource Exhaustion
Preserving empty group entries indefinitely could lead to memory exhaustion. Attackers could create numerous groups with pending transactions, depleting coordinator resources.
private Offsets(boolean preserveGroups) { | |
this.preserveGroups = preserveGroups; | |
// TODO: Add monitoring and limits for preserved empty groups to prevent resource exhaustion |
Standards
- CWE-400
- OWASP-A04
When deleting pending transactional offsets, we must preserve the list
of groups associated with the producer ID, otherwise we cannot clean up
the list of pending transactions for the group once the transaction is
committed or aborted.
Summary by CodeRabbit
Bug Fixes
Tests