@@ -21,10 +21,10 @@
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.NotCoordinatorException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
@@ -831,11 +831,11 @@ private void flushCurrentBatch() {
}

/**
- * Flushes the current batch if it is transactional or if it has passed the append linger time.
+ * Flushes the current batch if it is transactional, if it has passed the append linger time, or if it is full.
*/
private void maybeFlushCurrentBatch(long currentTimeMs) {
if (currentBatch != null) {
-if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
flushCurrentBatch();
}
}
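
For clarity, the new flush policy boils down to three triggers. A minimal sketch of the predicate (illustration only, not part of the patch; plain booleans stand in for the calls the runtime makes on its current batch):

final class FlushPolicySketch {
    // isTransactional       ~ currentBatch.builder.isTransactional()
    // lingerElapsed         ~ the appendLingerMs comparison on currentBatch.appendTimeMs
    // hasRoomForMoreRecords ~ currentBatch.builder.hasRoomFor(0)
    static boolean shouldFlush(boolean isTransactional, boolean lingerElapsed, boolean hasRoomForMoreRecords) {
        return isTransactional          // transactional batches are flushed immediately
            || lingerElapsed            // the append linger time has passed
            || !hasRoomForMoreRecords;  // the batch is full
    }
}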
@@ -912,6 +912,24 @@ public void run() {
}
}

/**
* Completes the given event once all pending writes are completed.
*
* @param event The event to complete once all pending
* writes are completed.
*/
private void waitForPendingWrites(DeferredEvent event) {
if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
currentBatch.deferredEvents.add(event);
} else {
if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
deferredEventQueue.add(coordinator.lastWrittenOffset(), event);
} else {
event.complete(null);
}
}
}
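
This helper centralizes the completion rule that previously lived inline in append() (see the hunk below): an event completes immediately only when nothing is buffered and nothing is awaiting commit. A small sketch of the three outcomes (illustration only; plain values stand in for the runtime's state):

final class PendingWriteSketch {
    enum Outcome { ATTACH_TO_CURRENT_BATCH, DEFER_UNTIL_COMMIT, COMPLETE_NOW }

    // batchHasRecords     ~ currentBatch != null && currentBatch.builder.numRecords() > 0
    // lastCommittedOffset ~ coordinator.lastCommittedOffset()
    // lastWrittenOffset   ~ coordinator.lastWrittenOffset()
    static Outcome outcome(boolean batchHasRecords, long lastCommittedOffset, long lastWrittenOffset) {
        if (batchHasRecords) {
            return Outcome.ATTACH_TO_CURRENT_BATCH;  // completed once the batch is flushed and committed
        } else if (lastCommittedOffset < lastWrittenOffset) {
            return Outcome.DEFER_UNTIL_COMMIT;       // completed once the HWM reaches lastWrittenOffset
        } else {
            return Outcome.COMPLETE_NOW;             // nothing pending, complete right away
        }
    }
}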

/**
* Appends records to the log and replay them to the state machine.
*
@@ -941,17 +959,8 @@ private void append(

if (records.isEmpty()) {
// If the records are empty, it was a read operation after all. In this case,
-// the response can be returned directly iff there are no pending write operations;
-// otherwise, the read needs to wait on the last write operation to be completed.
-if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
-currentBatch.deferredEvents.add(event);
-} else {
-if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
-deferredEventQueue.add(coordinator.lastWrittenOffset(), event);
-} else {
-event.complete(null);
-}
-}
+// the response can be returned once any pending write operations complete.
+waitForPendingWrites(event);
} else {
// If the records are not empty, first, they are applied to the state machine,
// second, they are appended to the opened batch.
@@ -986,22 +995,13 @@ private void append(

if (isAtomic) {
// Compute the estimated size of the records.
-int estimatedSize = AbstractRecords.estimateSizeInBytes(
+int estimatedSizeUpperBound = AbstractRecords.estimateSizeInBytes(
currentBatch.builder.magic(),
-compression.type(),
+CompressionType.NONE,
recordsToAppend
);

-// Check if the current batch has enough space. We check this before
-// replaying the records in order to avoid having to revert back
-// changes if the records do not fit within a batch.
-if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
-throw new RecordTooLargeException("Message batch size is " + estimatedSize +
-" bytes in append to partition " + tp + " which exceeds the maximum " +
-"configured size of " + currentBatch.maxBatchSize + ".");
-}
-
-if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+if (!currentBatch.builder.hasRoomFor(estimatedSizeUpperBound)) {
// Otherwise, we write the current batch, allocate a new one and re-verify
// whether the records fit in it.
// If flushing fails, we don't catch the exception in order to let
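
The key behavioral change in the hunk above: the size estimate is now computed with CompressionType.NONE, so it is only an upper bound on what the batch will need once the configured compression is applied. Previously, records whose uncompressed estimate exceeded the max batch size were rejected up front with a RecordTooLargeException; now the current batch is flushed and the records are re-verified against a fresh batch, where compression may still make them fit. The two new tests below exercise both outcomes. A self-contained sketch of the check (illustration only; batchRemainingBytes is a hypothetical stand-in for what builder.hasRoomFor(...) measures):

import java.util.List;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.SimpleRecord;

final class AtomicAppendSketch {
    // Returns true when the runtime should flush the current batch and retry the
    // append against a freshly allocated one instead of failing immediately.
    static boolean mustFlushBeforeAppend(List<SimpleRecord> recordsToAppend, byte magic, int batchRemainingBytes) {
        // Upper bound: the records' size as if they were written uncompressed.
        int estimatedSizeUpperBound = AbstractRecords.estimateSizeInBytes(magic, CompressionType.NONE, recordsToAppend);
        return estimatedSizeUpperBound > batchRemainingBytes;
    }
}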
@@ -69,6 +69,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -544,27 +545,43 @@ public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
}
}

-private static MemoryRecords records(
+public static MemoryRecords records(
long timestamp,
+Compression compression,
String... records
) {
-return records(timestamp, Arrays.stream(records).collect(Collectors.toList()));
+return records(timestamp, compression, Arrays.stream(records).toList());
}

-private static MemoryRecords records(
+public static MemoryRecords records(
long timestamp,
String... records
) {
return records(timestamp, Compression.NONE, Arrays.stream(records).toList());
}

public static MemoryRecords records(
long timestamp,
List<String> records
) {
return records(timestamp, Compression.NONE, records);
}

public static MemoryRecords records(
long timestamp,
Compression compression,
List<String> records
) {
if (records.isEmpty())
return MemoryRecords.EMPTY;

List<SimpleRecord> simpleRecords = records.stream().map(record ->
new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
-).collect(Collectors.toList());
+).toList();

int sizeEstimate = AbstractRecords.estimateSizeInBytes(
RecordVersion.current().value,
-CompressionType.NONE,
+compression.type(),
simpleRecords
);

@@ -573,7 +590,7 @@ private static MemoryRecords records(
MemoryRecordsBuilder builder = MemoryRecords.builder(
buffer,
RecordVersion.current().value,
-Compression.NONE,
+compression,
TimestampType.CREATE_TIME,
0L,
timestamp,
@@ -4945,6 +4962,186 @@ public void testRecordEventPurgatoryTime() throws Exception {
verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1);
}

@Test
public void testLargeCompressibleRecordTriggersFlushAndSucceeds() throws Exception {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
Compression compression = Compression.gzip().build();

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withCompression(compression)
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.withExecutorService(mock(ExecutorService.class))
.build();

// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);

// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertNull(ctx.currentBatch);

// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();

// Create 2 records with a quarter of the max batch size each.
List<String> records = Stream.of('1', '2').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());

// Write #1 with the small records; the batch will be about half full.
long firstBatchTimestamp = timer.time().milliseconds();
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(records, "response1")
);

// A batch has been created.
assertNotNull(ctx.currentBatch);

// Verify the state - batch is not yet flushed
assertEquals(List.of(), writer.entries(TP));

// Create a record of highly compressible data
List<String> mediumRecord = List.of("a".repeat((int) (0.75 * maxBatchSize)));

// Write #2 with the medium record. This record is too large to go into the previous batch
// uncompressed but fits in a new buffer, so we should flush the previous batch and allocate
// a new one.
long secondBatchTimestamp = timer.time().milliseconds();
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(mediumRecord, "response2")
);

// Create a large record of highly compressible data
List<String> largeRecord = List.of("a".repeat(3 * maxBatchSize));

// Write #3 with the large record. This record is too large to go into the previous batch
// uncompressed but will fit in the new buffer once compressed, so we should flush the
// previous batch and successfully allocate a new batch for this record. The new batch
// will also trigger an immediate flush.
long thirdBatchTimestamp = timer.time().milliseconds();
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(largeRecord, "response3")
);

// Verify the state.
assertEquals(4L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, mediumRecord.get(0)),
new MockCoordinatorShard.RecordAndMetadata(3, largeRecord.get(0))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(List.of(
records(firstBatchTimestamp, compression, records),
records(secondBatchTimestamp, compression, mediumRecord),
records(thirdBatchTimestamp, compression, largeRecord)
), writer.entries(TP));

// Commit and verify that writes are completed.
writer.commit(TP);
assertTrue(write1.isDone());
assertTrue(write2.isDone());
assertEquals(4L, ctx.coordinator.lastCommittedOffset());
assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
}
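
As a sanity check on this test's premise (not part of the patch): a run of identical characters compresses by orders of magnitude under gzip, which is why a record three times the max batch size still fits in a single compressed batch, while the random payload in the next test does not. A standalone sketch using plain java.util.zip rather than Kafka's Compression codecs, with 1 MiB as a hypothetical stand-in for maxBatchSize:

import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.zip.GZIPOutputStream;

public final class CompressibilitySanityCheck {
    public static void main(String[] args) throws Exception {
        byte[] payload = new byte[3 * 1024 * 1024];  // stand-in for 3 * maxBatchSize
        Arrays.fill(payload, (byte) 'a');            // highly compressible, like largeRecord above

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(payload);
        }
        // Prints a compressed size of a few kilobytes, far below the 1 MiB stand-in,
        // whereas random bytes would barely shrink at all.
        System.out.printf("raw=%d bytes, gzip=%d bytes%n", payload.length, out.size());
    }
}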

@Test
public void testLargeUncompressibleRecordTriggersFlushAndFails() throws Exception {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
Compression compression = Compression.gzip().build();

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withCompression(compression)
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.withExecutorService(mock(ExecutorService.class))
.build();

// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);

// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertNull(ctx.currentBatch);

// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();

// Create 2 records with a quarter of the max batch size each.
List<String> records = Stream.of('1', '2').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());

// Write #1 with the small records; the batch will be about half full.
long firstBatchTimestamp = timer.time().milliseconds();
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(records, "response1")
);

// A batch has been created.
assertNotNull(ctx.currentBatch);

// Verify the state - batch is not yet flushed
assertEquals(List.of(), writer.entries(TP));

// Create a large record of not very compressible data
char[] payload = new char[3 * maxBatchSize];
Random offset = new Random();
for (int i = 0; i < payload.length; i++) {
payload[i] = (char) ('a' + ((char) offset.nextInt() % 26));
}
List<String> largeRecord = List.of(new String(payload));

// Write #2 with the large record. Its uncompressed size does not fit in the previous batch,
// so the previous batch is flushed. The record does not compress enough to fit in a new
// batch either, so the write should fail with a RecordTooLargeException.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(largeRecord, "response2")
);

// Verify the state. The first batch was flushed and the largeRecord
// write failed.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(List.of(
records(firstBatchTimestamp, compression, records)
), writer.entries(TP));
}

@Test
public void testWriteEventCompletesOnlyOnce() throws Exception {
// Completes once via timeout, then again with HWM update.