From 5e4ebde82a587a24be077b5a54198c0ba2e33c80 Mon Sep 17 00:00:00 2001
From: ShivsundarR
Date: Wed, 22 Oct 2025 22:41:32 +0530
Subject: [PATCH 1/4] Log an error when we get duplicate acquired offsets

---
 .../internals/ShareCompletedFetch.java     |  9 ++-
 .../internals/ShareCompletedFetchTest.java | 58 +++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 2c337782dd415..de4d2ecb136d5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -100,9 +100,16 @@ public class ShareCompletedFetch {
 
     private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
         List<OffsetAndDeliveryCount> acquiredRecordList = new LinkedList<>();
+        // Set to find duplicates in case of overlapping acquired records
+        Set<Long> offsets = new HashSet<>();
         partitionAcquiredRecords.forEach(acquiredRecords -> {
             for (long offset = acquiredRecords.firstOffset(); offset <= acquiredRecords.lastOffset(); offset++) {
-                acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+                if (!offsets.add(offset)) {
+                    log.error("Duplicate acquired record offset {} found in share fetch response for partition {}. " +
+                            "This indicates a broker processing issue.", offset, partition.topicPartition());
+                } else {
+                    acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+                }
             }
         });
         return acquiredRecordList;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index a1814fd935c9c..95f1696629235 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -60,6 +60,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ShareCompletedFetchTest {
     private static final String TOPIC_NAME = "test";
@@ -356,6 +357,63 @@ record = records.get(1);
         assertEquals(0, records.size());
     }
 
+    @Test
+    public void testOverlappingAcquiredRecordsLogsErrorAndRetainsFirstOccurrence() {
+        int startingOffset = 0;
+        int numRecords = 20; // Records for 0-19
+
+        // Create overlapping acquired records: [0-9] and [5-14]
+        // Offsets 5-9 will be duplicates
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new ArrayList<>();
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0L)
+                .setLastOffset(9L)
+                .setDeliveryCount((short) 1));
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(5L)
+                .setLastOffset(14L)
+                .setDeliveryCount((short) 2));
+
+        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setRecords(newRecords(startingOffset, numRecords))
+                .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
+
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // Fetch records and verify that only 15 unique records are returned (0-14)
+        ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 20, true);
+        List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
+
+        // Should get 15 unique records: 0-9 from first range (with deliveryCount=1)
+        // and 10-14 from second range (with deliveryCount=2)
+        assertEquals(15, records.size());
+
+        // Verify first occurrence (offset 5 should have deliveryCount=1 from first range)
+        ConsumerRecord<String, String> record5 = records.stream()
+                .filter(r -> r.offset() == 5L)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(record5);
+        assertEquals(Optional.of((short) 1), record5.deliveryCount());
+
+        // Verify offset 10 has deliveryCount=2 from second range
+        ConsumerRecord<String, String> record10 = records.stream()
+                .filter(r -> r.offset() == 10L)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(record10);
+        assertEquals(Optional.of((short) 2), record10.deliveryCount());
+
+        // Verify all offsets are unique
+        Set<Long> offsetSet = new HashSet<>();
+        for (ConsumerRecord<String, String> record : records) {
+            assertTrue(offsetSet.add(record.offset()),
+                    "Duplicate offset found in results: " + record.offset());
+        }
+    }
+
     private ShareCompletedFetch newShareCompletedFetch(ShareFetchResponseData.PartitionData partitionData) {
         LogContext logContext = new LogContext();
         ShareFetchMetricsRegistry shareFetchMetricsRegistry = new ShareFetchMetricsRegistry();
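The fix above hinges on Set.add() returning false when the offset has already been seen, which both flags the overlap and keeps the first occurrence (and therefore the first delivery count). A minimal standalone sketch of the same idea, independent of the Kafka codebase (class and method names here are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class OverlapSketch {
        // Expands inclusive [first, last] offset ranges, keeping only the
        // first occurrence of each offset, as buildAcquiredRecordList() does.
        static List<Long> expand(long[][] ranges) {
            Set<Long> seen = new HashSet<>();
            List<Long> result = new ArrayList<>();
            for (long[] range : ranges) {
                for (long offset = range[0]; offset <= range[1]; offset++) {
                    if (seen.add(offset)) {            // false means duplicate
                        result.add(offset);
                    } else {
                        System.err.println("Duplicate offset: " + offset);
                    }
                }
            }
            return result;
        }

        public static void main(String[] args) {
            // Overlapping ranges [0-9] and [5-14], as in the test above:
            // offsets 5-9 are duplicates, leaving 15 unique offsets.
            System.out.println(expand(new long[][]{{0, 9}, {5, 14}}).size()); // 15
        }
    }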
From 07f950226592d79ba4d30c8929dd27e74da03640 Mon Sep 17 00:00:00 2001
From: ShivsundarR
Date: Thu, 23 Oct 2025 17:15:14 +0530
Subject: [PATCH 2/4] Use ArrayList

---
 .../kafka/clients/consumer/internals/ShareCompletedFetch.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index de4d2ecb136d5..4625f84f0d1ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -41,9 +41,9 @@
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Optional;
@@ -99,7 +99,7 @@ public class ShareCompletedFetch {
     }
 
     private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
-        List<OffsetAndDeliveryCount> acquiredRecordList = new LinkedList<>();
+        List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>();
         // Set to find duplicates in case of overlapping acquired records
         Set<Long> offsets = new HashSet<>();
         partitionAcquiredRecords.forEach(acquiredRecords -> {

From a729d1f0b1707e4666324390acef0a450e4b7f1f Mon Sep 17 00:00:00 2001
From: ShivsundarR
Date: Fri, 24 Oct 2025 11:46:14 +0530
Subject: [PATCH 3/4] Set initial size for array

---
 .../clients/consumer/internals/ShareCompletedFetch.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 4625f84f0d1ed..4729a9fdfbe3f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -99,7 +99,11 @@ public class ShareCompletedFetch {
     }
 
     private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
-        List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>();
+        // Setting the size of the array to the size of the first batch of acquired records. In case there is only 1 batch acquired, resizing would not happen.
+        int initialListSize = !partitionAcquiredRecords.isEmpty() ? (int) (partitionAcquiredRecords.get(0).lastOffset() -
+                partitionAcquiredRecords.get(0).firstOffset() + 1) : 0;
+        List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>(initialListSize);
+
         // Set to find duplicates in case of overlapping acquired records
         Set<Long> offsets = new HashSet<>();
         partitionAcquiredRecords.forEach(acquiredRecords -> {
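Patches 2 and 3 are performance follow-ups: ArrayList avoids LinkedList's per-element node allocation and has better cache locality for this append-then-iterate pattern, and presizing to the first batch's span means the common single-batch case never grows the backing array. A small self-contained sketch of the sizing arithmetic, using hypothetical offsets not taken from the patch:

    import java.util.ArrayList;
    import java.util.List;

    public class PresizeSketch {
        public static void main(String[] args) {
            // One acquired batch covering [100, 149] expands to 50 offsets
            long firstOffset = 100L;
            long lastOffset = 149L;
            int initialListSize = (int) (lastOffset - firstOffset + 1); // 50
            List<Long> expanded = new ArrayList<>(initialListSize);
            for (long offset = firstOffset; offset <= lastOffset; offset++) {
                expanded.add(offset); // fits in the initial capacity, no resize
            }
            System.out.println(expanded.size()); // 50
        }
    }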
From 8db08e52eb05958f215dcacfff05b20832900b2e Mon Sep 17 00:00:00 2001
From: ShivsundarR
Date: Fri, 24 Oct 2025 15:57:55 +0530
Subject: [PATCH 4/4] Address comment

---
 .../clients/consumer/internals/ShareCompletedFetch.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 4729a9fdfbe3f..95e40a3c826c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -100,8 +100,10 @@ public class ShareCompletedFetch {
 
     private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
         // Setting the size of the array to the size of the first batch of acquired records. In case there is only 1 batch acquired, resizing would not happen.
-        int initialListSize = !partitionAcquiredRecords.isEmpty() ? (int) (partitionAcquiredRecords.get(0).lastOffset() -
-                partitionAcquiredRecords.get(0).firstOffset() + 1) : 0;
+        if (partitionAcquiredRecords.isEmpty()) {
+            return List.of();
+        }
+        int initialListSize = (int) (partitionAcquiredRecords.get(0).lastOffset() - partitionAcquiredRecords.get(0).firstOffset() + 1);
         List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>(initialListSize);
 
         // Set to find duplicates in case of overlapping acquired records
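For reference, buildAcquiredRecordList() after all four patches, reassembled from the hunks above (surrounding class members omitted):

    private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
        // Setting the size of the array to the size of the first batch of acquired records.
        // In case there is only 1 batch acquired, resizing would not happen.
        if (partitionAcquiredRecords.isEmpty()) {
            return List.of();
        }
        int initialListSize = (int) (partitionAcquiredRecords.get(0).lastOffset() - partitionAcquiredRecords.get(0).firstOffset() + 1);
        List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>(initialListSize);

        // Set to find duplicates in case of overlapping acquired records
        Set<Long> offsets = new HashSet<>();
        partitionAcquiredRecords.forEach(acquiredRecords -> {
            for (long offset = acquiredRecords.firstOffset(); offset <= acquiredRecords.lastOffset(); offset++) {
                if (!offsets.add(offset)) {
                    log.error("Duplicate acquired record offset {} found in share fetch response for partition {}. " +
                            "This indicates a broker processing issue.", offset, partition.topicPartition());
                } else {
                    acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
                }
            }
        });
        return acquiredRecordList;
    }

The net behavior: an empty acquired-records list short-circuits to an immutable empty list, a single batch expands into a presized ArrayList with no resizing, and any offset acquired more than once is logged as a broker-side error while the first delivery count wins.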