From d6ed60a84e371e3ee379083456313a969f3cd17f Mon Sep 17 00:00:00 2001
From: Izzy Harker
Date: Thu, 16 Oct 2025 10:18:10 -0500
Subject: [PATCH] 4.0 updates

---
 .../common/runtime/CoordinatorRuntime.java    |  52 ++---
 .../runtime/CoordinatorRuntimeTest.java       | 209 +++++++++++++++++-
 2 files changed, 229 insertions(+), 32 deletions(-)

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index e1e80476cf810..e61983658fa6c 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -21,10 +21,10 @@
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -831,11 +831,11 @@ private void flushCurrentBatch() {
         }
 
         /**
-         * Flushes the current batch if it is transactional or if it has passed the append linger time.
+         * Flushes the current batch if it is transactional, if it has passed the append linger time, or if it is full.
          */
         private void maybeFlushCurrentBatch(long currentTimeMs) {
             if (currentBatch != null) {
-                if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+                if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
                     flushCurrentBatch();
                 }
             }
@@ -912,6 +912,24 @@ public void run() {
             }
         }
 
+        /**
+         * Completes the given event once all pending writes are completed.
+         *
+         * @param event The event to complete once all pending
+         *              writes are completed.
+         */
+        private void waitForPendingWrites(DeferredEvent event) {
+            if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
+                currentBatch.deferredEvents.add(event);
+            } else {
+                if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
+                    deferredEventQueue.add(coordinator.lastWrittenOffset(), event);
+                } else {
+                    event.complete(null);
+                }
+            }
+        }
+
         /**
          * Appends records to the log and replay them to the state machine.
          *
@@ -941,17 +959,8 @@ private void append(
 
             if (records.isEmpty()) {
                 // If the records are empty, it was a read operation after all. In this case,
-                // the response can be returned directly iff there are no pending write operations;
-                // otherwise, the read needs to wait on the last write operation to be completed.
-                if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
-                    currentBatch.deferredEvents.add(event);
-                } else {
-                    if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
-                        deferredEventQueue.add(coordinator.lastWrittenOffset(), event);
-                    } else {
-                        event.complete(null);
-                    }
-                }
+                // the response can be returned once any pending write operations complete.
+                waitForPendingWrites(event);
             } else {
                 // If the records are not empty, first, they are applied to the state machine,
                 // second, they are appended to the opened batch.
@@ -986,22 +995,13 @@ private void append(
 
                 if (isAtomic) {
                     // Compute the estimated size of the records.
-                    int estimatedSize = AbstractRecords.estimateSizeInBytes(
+                    int estimatedSizeUpperBound = AbstractRecords.estimateSizeInBytes(
                         currentBatch.builder.magic(),
-                        compression.type(),
+                        CompressionType.NONE,
                         recordsToAppend
                     );
 
-                    // Check if the current batch has enough space. We check this before
-                    // replaying the records in order to avoid having to revert back
-                    // changes if the records do not fit within a batch.
-                    if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
-                        throw new RecordTooLargeException("Message batch size is " + estimatedSize +
-                            " bytes in append to partition " + tp + " which exceeds the maximum " +
-                            "configured size of " + currentBatch.maxBatchSize + ".");
-                    }
-
-                    if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+                    if (!currentBatch.builder.hasRoomFor(estimatedSizeUpperBound)) {
                         // Otherwise, we write the current batch, allocate a new one and re-verify
                         // whether the records fit in it.
                         // If flushing fails, we don't catch the exception in order to let
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 9198e207e4b10..26e7e7b5f287b 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -69,6 +69,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalInt;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -544,15 +545,31 @@ public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
         }
     }
 
-    private static MemoryRecords records(
+    public static MemoryRecords records(
         long timestamp,
+        Compression compression,
         String... records
     ) {
-        return records(timestamp, Arrays.stream(records).collect(Collectors.toList()));
+        return records(timestamp, compression, Arrays.stream(records).toList());
     }
 
-    private static MemoryRecords records(
+    public static MemoryRecords records(
         long timestamp,
+        String... records
+    ) {
+        return records(timestamp, Compression.NONE, Arrays.stream(records).toList());
+    }
+
+    public static MemoryRecords records(
+        long timestamp,
+        List<String> records
+    ) {
+        return records(timestamp, Compression.NONE, records);
+    }
+
+    public static MemoryRecords records(
+        long timestamp,
+        Compression compression,
         List<String> records
     ) {
         if (records.isEmpty())
@@ -560,11 +577,11 @@ private static MemoryRecords records(
 
         List<SimpleRecord> simpleRecords = records.stream().map(record ->
             new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
-        ).collect(Collectors.toList());
+        ).toList();
 
         int sizeEstimate = AbstractRecords.estimateSizeInBytes(
             RecordVersion.current().value,
-            CompressionType.NONE,
+            compression.type(),
             simpleRecords
         );
 
@@ -573,7 +590,7 @@ private static MemoryRecords records(
         MemoryRecordsBuilder builder = MemoryRecords.builder(
             buffer,
             RecordVersion.current().value,
-            Compression.NONE,
+            compression,
             TimestampType.CREATE_TIME,
             0L,
             timestamp,
@@ -4945,6 +4962,186 @@ public void testRecordEventPurgatoryTime() throws Exception {
         verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1);
     }
 
+    @Test
+    public void testLargeCompressibleRecordTriggersFlushAndSucceeds() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        Compression compression = Compression.gzip().build();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withCompression(compression)
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create 2 records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with the small records. The batch will be about half full.
+        long firstBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state - the batch is not yet flushed.
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Create a record of highly compressible data.
+        List<String> mediumRecord = List.of("a".repeat((int) (0.75 * maxBatchSize)));
+
+        // Write #2 with the medium record. This record is too large to go into the previous batch
+        // uncompressed but fits in a new buffer, so we should flush the previous batch and allocate
+        // a new one.
+        long secondBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(mediumRecord, "response2")
+        );
+
+        // Create a large record of highly compressible data.
+        List<String> largeRecord = List.of("a".repeat(3 * maxBatchSize));
+
+        // Write #3 with the large record. This record is too large to go into the previous batch
+        // uncompressed but will fit in the new buffer once compressed, so we should flush the
+        // previous batch and successfully allocate a new batch for this record. The new batch
+        // will also trigger an immediate flush.
+        long thirdBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(largeRecord, "response3")
+        );
+
+        // Verify the state.
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, mediumRecord.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(3, largeRecord.get(0))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            records(firstBatchTimestamp, compression, records),
+            records(secondBatchTimestamp, compression, mediumRecord),
+            records(thirdBatchTimestamp, compression, largeRecord)
+        ), writer.entries(TP));
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertEquals(4L, ctx.coordinator.lastCommittedOffset());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+        assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testLargeUncompressibleRecordTriggersFlushAndFails() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        Compression compression = Compression.gzip().build();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withCompression(compression)
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create 2 records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with the small records. The batch will be about half full.
+        long firstBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state - the batch is not yet flushed.
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Create a large record of not very compressible data.
+        char[] payload = new char[3 * maxBatchSize];
+        Random offset = new Random();
+        for (int i = 0; i < payload.length; i++) {
+            payload[i] = (char) ('a' + ((char) offset.nextInt() % 26));
+        }
+        List<String> largeRecord = List.of(new String(payload));
+
+        // Write #2 with the large record. This record is too large to go into the previous batch
+        // and does not compress well, so the previous batch should be flushed. It is also too large
+        // to fit in a new batch, so the write should fail with RecordTooLargeException.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(largeRecord, "response2")
+        );
+
+        // Verify the state. The first batch was flushed and the largeRecord
+        // write failed.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            records(firstBatchTimestamp, compression, records)
+        ), writer.entries(TP));
+    }
+
     @Test
     public void testWriteEventCompletesOnlyOnce() throws Exception {
         // Completes once via timeout, then again with HWM update.