Commit 388739f

KAFKA-19760: RecordTooLargeExceptions in group coordinator when offsets.topic.compression.codec is used (#20653)
The group coordinator has been having issues with unknown errors. The theory is that these are caused by optimistic compression estimates, which lead to unchecked batch overflows when trying to write. This PR adds a check on the uncompressed record size to flush batches more eagerly and avoid overfilling partially-full batches. This should make the group coordinator errors less frequent. Tests were also added to ensure this change does not affect the desired behavior for large compressible records.

Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
1 parent 087028c commit 388739f
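
For intuition, here is a minimal, self-contained sketch of the suspected failure mode (all names and numbers are hypothetical; this is not Kafka code): a size estimate that assumes an optimistic compression ratio passes the batch room check, but the payload compresses worse than assumed and the actual write overflows a partially-full batch.

// Hypothetical illustration of an optimistic-estimate overflow.
public class OptimisticEstimateOverflow {
    public static void main(String[] args) {
        int maxBatchSize = 16_384;   // assumed max batch size in bytes
        int bytesInBatch = 12_000;   // batch is already partially full
        int recordSize = 10_000;     // uncompressed size of the new record

        // Optimistic estimate: assume the codec achieves ~4:1 compression.
        int estimatedSize = recordSize / 4;                                 // 2,500
        boolean roomCheckPasses = bytesInBatch + estimatedSize <= maxBatchSize;

        // Reality: this payload only compresses ~2:1.
        int actualCompressedSize = recordSize / 2;                          // 5,000
        boolean actuallyFits = bytesInBatch + actualCompressedSize <= maxBatchSize;

        // Prints "room check passes: true, actually fits: false" -- the
        // batch overflows at write time, surfacing as a RecordTooLargeException.
        System.out.printf("room check passes: %b, actually fits: %b%n",
            roomCheckPasses, actuallyFits);
    }
}

The fix in the diff below sidesteps the guesswork by checking room against the uncompressed size, which the compressed write should never meaningfully exceed.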

File tree

3 files changed: +317 −36 lines

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java

Lines changed: 33 additions & 33 deletions
@@ -21,10 +21,10 @@
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -829,11 +829,11 @@ private void flushCurrentBatch() {
     }
 
     /**
-     * Flushes the current batch if it is transactional or if it has passed the append linger time.
+     * Flushes the current batch if it is transactional, if it has passed the append linger time, or if it is full.
      */
     private void maybeFlushCurrentBatch(long currentTimeMs) {
         if (currentBatch != null) {
-            if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+            if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
                 flushCurrentBatch();
             }
         }
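
Restated as a standalone predicate (a paraphrase for readability, not the actual Kafka method; the time comparison is copied as written in the diff, and hasRoomForZeroBytes stands in for builder.hasRoomFor(0)):

// Sketch of the flush decision after this change. A builder with no room
// for even zero additional bytes is full, so waiting out the linger timer
// would be pointless.
final class FlushPolicySketch {
    static boolean shouldFlush(boolean transactional,
                               long appendTimeMs,
                               long currentTimeMs,
                               long appendLingerMs,
                               boolean hasRoomForZeroBytes) {
        return transactional
            || (appendTimeMs - currentTimeMs) >= appendLingerMs  // linger elapsed
            || !hasRoomForZeroBytes;                             // new: batch is full
    }

    public static void main(String[] args) {
        // A full, non-transactional batch inside the linger window now flushes.
        System.out.println(shouldFlush(false, 100L, 100L, 10L, false)); // true
    }
}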
@@ -911,6 +911,24 @@ public void run() {
         }
     }
 
+    /**
+     * Completes the given event once all pending writes are completed.
+     *
+     * @param event The event to complete once all pending
+     *              writes are completed.
+     */
+    private void waitForPendingWrites(DeferredEvent event) {
+        if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
+            currentBatch.deferredEvents.add(event);
+        } else {
+            if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
+                deferredEventQueue.add(coordinator.lastWrittenOffset(), DeferredEventCollection.of(log, event));
+            } else {
+                event.complete(null);
+            }
+        }
+    }
+
     /**
      * Appends records to the log and replay them to the state machine.
      *
@@ -940,17 +958,8 @@ private void append(
 
         if (records.isEmpty()) {
             // If the records are empty, it was a read operation after all. In this case,
-            // the response can be returned directly iff there are no pending write operations;
-            // otherwise, the read needs to wait on the last write operation to be completed.
-            if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
-                currentBatch.deferredEvents.add(event);
-            } else {
-                if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
-                    deferredEventQueue.add(coordinator.lastWrittenOffset(), DeferredEventCollection.of(log, event));
-                } else {
-                    event.complete(null);
-                }
-            }
+            // the response can be returned once any pending write operations complete.
+            waitForPendingWrites(event);
         } else {
             // If the records are not empty, first, they are applied to the state machine,
             // second, they are appended to the opened batch.
@@ -984,27 +993,18 @@ private void append(
         }
 
         if (isAtomic) {
-            // Compute the estimated size of the records.
-            int estimatedSize = AbstractRecords.estimateSizeInBytes(
+            // Compute the size of the records.
+            int estimatedSizeUpperBound = AbstractRecords.estimateSizeInBytes(
                 currentBatch.builder.magic(),
-                compression.type(),
+                CompressionType.NONE,
                 recordsToAppend
             );
 
-            // Check if the current batch has enough space. We check this before
-            // replaying the records in order to avoid having to revert back
-            // changes if the records do not fit within a batch.
-            if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
-                throw new RecordTooLargeException("Message batch size is " + estimatedSize +
-                    " bytes in append to partition " + tp + " which exceeds the maximum " +
-                    "configured size of " + currentBatch.maxBatchSize + ".");
-            }
-
-            if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
-                // Otherwise, we write the current batch, allocate a new one and re-verify
-                // whether the records fit in it.
-                // If flushing fails, we don't catch the exception in order to let
-                // the caller fail the current operation.
+            if (!currentBatch.builder.hasRoomFor(estimatedSizeUpperBound)) {
+                // Start a new batch when the total uncompressed data size would exceed
+                // the max batch size. We still allow atomic writes with an uncompressed size
+                // larger than the max batch size as long as they compress down to under the max
+                // batch size. These large writes go into a batch by themselves.
                 flushCurrentBatch();
                 maybeAllocateNewBatch(
                     producerId,
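
The essence of the hunk above, as a self-contained sketch (hypothetical names and numbers, not the Kafka API): room is checked against the uncompressed payload size, an upper bound on what the write can occupy, so a partially-full batch is sealed before a compressed write can overflow it, while an oversized-but-compressible write still proceeds in a fresh batch of its own.

import java.util.List;

// Hypothetical sketch of the new gating rule.
public class UpperBoundRoomCheck {
    static final int MAX_BATCH_SIZE = 16_384;
    int bytesInBatch = 12_000;  // current fill level, tracked pessimistically

    void append(List<byte[]> records) {
        // Upper bound: compression can only shrink this (modulo tiny framing
        // overhead), so checking against it may flush too early but never
        // lets the batch overflow.
        int upperBound = records.stream().mapToInt(r -> r.length).sum();
        if (bytesInBatch + upperBound > MAX_BATCH_SIZE) {
            System.out.println("flush current batch, allocate a new one");
            bytesInBatch = 0;
            // Even when upperBound alone exceeds MAX_BATCH_SIZE, the write
            // goes ahead in its own batch: it may still compress down to
            // under the max batch size.
        }
        bytesInBatch += upperBound;
    }

    public static void main(String[] args) {
        UpperBoundRoomCheck sketch = new UpperBoundRoomCheck();
        sketch.append(List.of(new byte[10_000]));  // 12,000 + 10,000 > 16,384 -> flush first
    }
}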
@@ -1075,8 +1075,8 @@ private void append(
             // Add the event to the list of pending events associated with the batch.
             currentBatch.deferredEvents.add(event);
 
-            // Write the current batch if it is transactional or if the linger timeout
-            // has expired.
+            // Write the current batch if it is transactional, if the linger timeout
+            // has expired, or if it is full.
             // If flushing fails, we don't catch the exception in order to let
             // the caller fail the current operation.
             maybeFlushCurrentBatch(currentTimeMs);
