Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,19 @@ public OffsetMetadataManager build() {
private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;

private class Offsets {
/**
* Whether to preserve empty entries for groups when removing offsets.
* We use this to keep track of the groups associated with pending transactions.
*/
private final boolean preserveGroups;

/**
* The offsets keyed by group id, topic name and partition id.
*/
private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;

private Offsets() {
private Offsets(boolean preserveGroups) {
this.preserveGroups = preserveGroups;
Comment on lines +212 to +213
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boolean Parameter Clarity

The boolean parameter creates a primitive obsession code smell. Consider creating an explicit enum or constant for the preserveGroups parameter to improve code readability and make the intention clearer at instantiation sites.

Standards
  • Clean-Code-Parameters
  • Refactoring-Replace-Primitive
  • Maintainability-Quality-Readability

this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
Comment on lines +212 to 214
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group Preservation Logic

The original Offsets constructor lacks a parameter to control group preservation behavior. This leads to inconsistent handling of groups with pending transactions. The fix adds a preserveGroups parameter to explicitly control whether empty groups should be preserved during offset removal.

        private Offsets(boolean preserveGroups) {
            this.preserveGroups = preserveGroups;
            this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
        }

        /**
         * Default constructor that doesn't preserve empty groups.
         */
        private Offsets() {
            this(false);
        }
Commitable Suggestion
Suggested change
private Offsets(boolean preserveGroups) {
this.preserveGroups = preserveGroups;
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
private Offsets(boolean preserveGroups) {
this.preserveGroups = preserveGroups;
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
}
/**
* Default constructor that doesn't preserve empty groups.
*/
private Offsets() {
this(false);
}
Standards
  • Business-Rule-State-Consistency
  • Logic-Verification-Data-Flow
  • Algorithm-Correctness-Parameter-Logic

}

Expand Down Expand Up @@ -256,7 +263,7 @@ private OffsetAndMetadata remove(
if (partitionOffsets.isEmpty())
topicOffsets.remove(topic);

if (topicOffsets.isEmpty())
if (!preserveGroups && topicOffsets.isEmpty())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group Removal Condition

The original code unconditionally removes groups when their topic offsets are empty, ignoring pending transactions. The fix adds a conditional check (!preserveGroups) to prevent removing groups that have pending transactional offsets, maintaining data consistency during transaction processing.

            if (!preserveGroups && topicOffsets.isEmpty())
Commitable Suggestion
Suggested change
if (!preserveGroups && topicOffsets.isEmpty())
if (!preserveGroups && topicOffsets.isEmpty())
Standards
  • Business-Rule-State-Consistency
  • Logic-Verification-Control-Flow
  • Algorithm-Correctness-Condition-Logic

offsetsByGroup.remove(groupId);

return removedValue;
Expand All @@ -278,7 +285,7 @@ private OffsetAndMetadata remove(
this.groupMetadataManager = groupMetadataManager;
this.config = config;
this.metrics = metrics;
this.offsets = new Offsets();
this.offsets = new Offsets(false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default Offsets Configuration

The original code creates the main offsets store without explicitly setting group preservation behavior. The fix creates Offsets with preserveGroups=false for the main store, making the behavior explicit and consistent with the intended design where empty groups should be removed from the main store.

        this.offsets = new Offsets(false);
Commitable Suggestion
Suggested change
this.offsets = new Offsets(false);
this.offsets = new Offsets(false);
Standards
  • Business-Rule-State-Consistency
  • Logic-Verification-Data-Flow
  • Algorithm-Correctness-Parameter-Logic

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default Parameter Value

Using a literal false value without context makes the code's intent unclear. Consider creating a named constant like DEFAULT_PRESERVE_GROUPS = false to document the default behavior and improve code readability.

Standards
  • Clean-Code-Constants
  • Maintainability-Quality-Self-Documentation
  • Clean-Code-Naming

this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
}
Expand Down Expand Up @@ -851,7 +858,7 @@ public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> rec
TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic =
offsets.offsetsByGroup.get(groupId);
if (offsetsByTopic == null) {
return true;
return !openTransactionsByGroup.containsKey(groupId);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group Deletion Logic

The logic only checks openTransactionsByGroup but doesn't verify if there are pending transactional offsets for the group. This could lead to data loss if a group has pending transactional offsets but no open transactions.

            return !openTransactionsByGroup.containsKey(groupId) && !hasPendingTransactionalOffsets(groupId);
Commitable Suggestion
Suggested change
return !openTransactionsByGroup.containsKey(groupId);
return !openTransactionsByGroup.containsKey(groupId) && !hasPendingTransactionalOffsets(groupId);
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-State-Consistency

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group Deletion Logic

The original code unconditionally returns true when offsetsByTopic is null, indicating the group can be deleted. This is incorrect when the group has pending transactional offsets, as it would lead to premature group deletion. The fix correctly checks if there are open transactions for the group.

            return !openTransactionsByGroup.containsKey(groupId) && !hasPendingTransactionalOffsets(groupId);
Commitable Suggestion
Suggested change
return !openTransactionsByGroup.containsKey(groupId);
return !openTransactionsByGroup.containsKey(groupId) && !hasPendingTransactionalOffsets(groupId);
Standards
  • Business-Rule-State-Consistency
  • Logic-Verification-Control-Flow
  • Algorithm-Correctness-Condition-Logic

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inefficient Group Check

The group existence check has been modified to check openTransactionsByGroup which is more expensive than the previous empty check. This operation now requires a hash lookup for every group check, potentially affecting performance in high-throughput scenarios.

Standards
  • ISO-IEC-25010-Performance-Efficiency-Time-Behavior
  • Optimization-Pattern-Conditional-Efficiency
  • Algorithmic-Complexity-Lookup-Performance

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transaction State Check

The negated condition makes the logic harder to follow. Consider extracting this check into a descriptive method like hasOpenTransactions(groupId) and inverting the logic to make the intent clearer and improve maintainability.

Standards
  • Clean-Code-Functions
  • Maintainability-Quality-Readability
  • Clean-Code-Conditionals

}

// We expect the group to exist.
Expand Down Expand Up @@ -995,7 +1002,7 @@ public void replay(
// offsets store. Pending offsets there are moved to the main store when
// the transaction is committed; or removed when the transaction is aborted.
pendingTransactionalOffsets
.computeIfAbsent(producerId, __ -> new Offsets())
.computeIfAbsent(producerId, __ -> new Offsets(true))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transactional Offsets Handling

The original code creates new Offsets instances without preserving empty groups for pending transactions. The fix creates Offsets with preserveGroups=true for pending transactional offsets, ensuring groups with pending transactions aren't prematurely deleted when offsets are removed.

                    .computeIfAbsent(producerId, __ -> new Offsets(true))
Commitable Suggestion
Suggested change
.computeIfAbsent(producerId, __ -> new Offsets(true))
.computeIfAbsent(producerId, __ -> new Offsets(true))
Standards
  • Business-Rule-State-Consistency
  • Logic-Verification-Data-Flow
  • Algorithm-Correctness-Parameter-Logic

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preserve Flag Documentation

The true literal lacks context at the call site. Consider creating a named constant like PRESERVE_GROUPS_FOR_TRANSACTIONS = true to document the behavior and improve maintainability when reading transactional offset handling code.

Standards
  • Clean-Code-Constants
  • Maintainability-Quality-Self-Documentation
  • Clean-Code-Naming

.put(
groupId,
topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2593,6 +2593,103 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() {
assertEquals(List.of(), records);
}

@Test
public void testCleanupExpiredOffsetsWithDeletedPendingTransactionalOffsets() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
Group group = mock(Group.class);

OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
.withGroupMetadataManager(groupMetadataManager)
.withOffsetsRetentionMinutes(1)
.build();

long commitTimestamp = context.time.milliseconds();

context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500);

when(groupMetadataManager.group("group-id")).thenReturn(group);
when(group.offsetExpirationCondition()).thenReturn(Optional.of(
new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
when(group.isSubscribedToTopic("foo")).thenReturn(false);

// Delete the pending transactional offset.
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(List.of(
new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
.setName("foo")
.setPartitions(List.of(
new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(1)
))
).iterator());
CoordinatorResult<OffsetDeleteResponseData, CoordinatorRecord> result = context.deleteOffsets(
new OffsetDeleteRequestData()
.setGroupId("group-id")
.setTopics(requestTopicCollection)
);
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1)
);
assertEquals(expectedRecords, result.records());

context.time.sleep(Duration.ofMinutes(1).toMillis());

// The group should not be deleted because it has a pending transaction.
expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0)
);
List<CoordinatorRecord> records = new ArrayList<>();
assertFalse(context.cleanupExpiredOffsets("group-id", records));
assertEquals(expectedRecords, records);

// Commit the ongoing transaction.
context.replayEndTransactionMarker(10L, TransactionResult.COMMIT);

// The group should be deletable now.
context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
context.time.sleep(Duration.ofMinutes(1).toMillis());

records = new ArrayList<>();
assertTrue(context.cleanupExpiredOffsets("group-id", records));
assertEquals(expectedRecords, records);
Comment on lines +2648 to +2654

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The second half of this test is a bit confusing. After the transaction is committed on line 2646, the group should be deletable. However, the test then re-commits and re-expires the same offset to verify this. This is redundant and makes the test harder to understand.

A simpler approach would be to directly assert that cleanupExpiredOffsets returns true right after the transaction is committed. Since no more offsets are left to expire, the records list should be empty. This would more clearly test the condition for the group being deletable.

Suggested change
// The group should be deletable now.
context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
context.time.sleep(Duration.ofMinutes(1).toMillis());
records = new ArrayList<>();
assertTrue(context.cleanupExpiredOffsets("group-id", records));
assertEquals(expectedRecords, records);
// The group should be deletable now as there are no more offsets and no pending transactions.
records = new ArrayList<>();
assertTrue(context.cleanupExpiredOffsets("group-id", records));
// No more offsets are expired, so the records list should be empty.
assertEquals(List.of(), records);

}

@Test
public void testCleanupExpiredOffsetsWithPendingTransactionalOffsetsOnly() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
Group group = mock(Group.class);

OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
.withGroupMetadataManager(groupMetadataManager)
.withOffsetsRetentionMinutes(1)
.build();

long commitTimestamp = context.time.milliseconds();

context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500);

context.time.sleep(Duration.ofMinutes(1).toMillis());

when(groupMetadataManager.group("group-id")).thenReturn(group);
when(group.offsetExpirationCondition()).thenReturn(Optional.of(
new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
when(group.isSubscribedToTopic("foo")).thenReturn(false);

// foo-0 is expired, but the group is not deleted beacuse it has pending transactional offset commits.
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0)
);
List<CoordinatorRecord> records = new ArrayList<>();
assertFalse(context.cleanupExpiredOffsets("group-id", records));
assertEquals(expectedRecords, records);

// No offsets are expired, and the group is still not deleted because it has pending transactional offset commits.
records = new ArrayList<>();
assertFalse(context.cleanupExpiredOffsets("group-id", records));
assertEquals(List.of(), records);
}

private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(
int partition,
long offset,
Expand Down
Loading