KAFKA-17862: [buffer pool] corruption during buffer reuse from the pool #9

Open · wants to merge 2 commits into base: trunk
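In short: a batch's ByteBuffer could be returned to the BufferPool while its produce request was still in flight (the Sender expired in-flight batches on delivery timeout and deallocated them), so the pool could hand the same memory to another writer and corrupt the bytes still being sent. This PR adds an integration test that provokes exactly that, and removes the Sender-side expiry of in-flight batches (per the review notes below, expiry is handled by the accumulator instead). As a minimal, self-contained sketch of the failure mode (toy code, not Kafka's; every name here is illustrative):

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Toy pool that zeroes buffers on release, mirroring what the test's EvilBufferPool does.
public class BufferReuseRace {
    static final Deque<ByteBuffer> free = new ArrayDeque<>();

    static void deallocate(ByteBuffer buffer) {
        Arrays.fill(buffer.array(), (byte) 0); // pretend the pool handed the memory to another writer
        buffer.clear();
        free.push(buffer);
    }

    public static void main(String[] args) {
        ByteBuffer batch = ByteBuffer.allocate(16);
        batch.put("record-bytes".getBytes()).flip();

        // The network layer still holds a view of the same backing memory...
        ByteBuffer inFlight = batch.duplicate();

        // ...but the expiry path returns the buffer to the pool anyway.
        deallocate(batch);

        // The "request" now carries zeros instead of the record payload.
        byte[] onTheWire = new byte[inFlight.remaining()];
        inFlight.get(onTheWire);
        System.out.println(Arrays.toString(onTheWire)); // all zeros: corruption
    }
}

In the real producer the two references are the accumulator's batch and the client's in-flight request; the test below makes the reuse deterministic by zeroing buffers in EvilBufferPool.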
New file: ProducerIntegrationTest.java
@@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer;

import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.compress.NoCompression;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;


public class ProducerIntegrationTest {

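/**
 * Reproduces KAFKA-17862: an in-flight batch's buffer must not be corrupted.
 * The producer built below sleeps after every send so batches exceed their
 * 2000 ms delivery timeout while their request is still in flight, and its
 * EvilBufferPool zeroes every buffer returned to the pool. If the Sender still
 * expired (and deallocated) in-flight batches, the record bytes would be zeroed
 * on the wire and the consumer below would never observe the record.
 */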
@ClusterTest(serverProperties = {
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
})
public void testInFlightBatchShouldNotBeCorrupted(ClusterInstance cluster) throws InterruptedException,
ExecutionException {
String topic = "test-topic";
cluster.createTopic(topic, 1, (short) 1);
try (var producer = expireProducer(cluster)) {
producer.send(new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes())).get();
}
try (var consumer = cluster.consumer()) {
consumer.subscribe(List.of(topic));
TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1)).count() == 1, 5000, "failed to poll data");
}

}


@SuppressWarnings({"unchecked", "this-escape"})
private Producer<byte[], byte[]> expireProducer(ClusterInstance cluster) {
Map<String, Object> config = Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false,
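// Tight timeouts: together with the 500 ms sleep injected into sendProducerData
// below, these let a batch reach its delivery timeout while still in flight.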
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 2000,
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1500
);
return new EvilKafkaProducerBuilder().build(config);
}

static class EvilKafkaProducerBuilder {

Serializer<byte[]> serializer = new ByteArraySerializer();
ApiVersions apiVersions = new ApiVersions();
LogContext logContext = new LogContext("[expire-producer-test] ");
Metrics metrics = new Metrics(Time.SYSTEM);

String clientId;
String transactionalId;
ProducerConfig config;
ProducerMetadata metadata;
RecordAccumulator accumulator;
Partitioner partitioner;
Sender sender;
ProducerInterceptors<byte[], byte[]> interceptors;

@SuppressWarnings({"unchecked", "this-escape"})
Producer<byte[], byte[]> build(Map<String, Object> configs) {
this.config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, null, null));
transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
return new KafkaProducer<>(
config,
logContext,
metrics,
serializer,
serializer,
buildMetadata(),
buildAccumulator(),
null, // no transaction manager (idempotence is disabled in the test config)
buildSender(),
buildInterceptors(),
buildPartitioner(),
Time.SYSTEM,
ioThread(),
Optional.empty()
);
}


private ProducerInterceptors<byte[], byte[]> buildInterceptors() {
this.interceptors = new ProducerInterceptors<>(List.of(), metrics);
return this.interceptors;
}

private Partitioner buildPartitioner() {
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
return this.partitioner;
}

private Sender buildSender() {
int maxInflightRequests = config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
KafkaClient client = ClientUtils.createNetworkClient(config,
this.metrics,
"producer",
logContext,
apiVersions,
Time.SYSTEM,
maxInflightRequests,
metadata,
throttleTimeSensor,
null);

short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));
this.sender = new Sender(logContext,
client,
metadata,
this.accumulator,
maxInflightRequests == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
config.getInt(ProducerConfig.RETRIES_CONFIG),
metricsRegistry.senderMetrics,
Time.SYSTEM,
requestTimeoutMs,
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
null) {
@Override
protected long sendProducerData(long now) {
long result = super.sendProducerData(now);
try {
// Sleep after every send so in-flight batches exceed the 2000 ms delivery timeout.
Thread.sleep(500);
return result;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
return this.sender;
}

private RecordAccumulator buildAccumulator() {
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
Plugin<Partitioner> partitionerPlugin = Plugin.wrapInstance(
config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),
metrics,
ProducerConfig.PARTITIONER_CLASS_CONFIG);
boolean enableAdaptivePartitioning = partitionerPlugin.get() == null &&
config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
this.accumulator = new RecordAccumulator(logContext,
batchSize,
NoCompression.NONE,
(int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE),
retryBackoffMs,
retryBackoffMaxMs,
config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG),
new RecordAccumulator.PartitionerConfig(
enableAdaptivePartitioning,
config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
),
metrics,
"producer-metrics",
Time.SYSTEM,
null,
new EvilBufferPool(config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG), batchSize, metrics,
Time.SYSTEM, "producer-metrics")); // pool that zeroes buffers on deallocate (see below)
return accumulator;
}

private ProducerMetadata buildMetadata() {
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
List.of(),
reporters,
List.of(
Plugin.wrapInstance(serializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).get(),
Plugin.wrapInstance(serializer, metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).get()));
this.metadata = new ProducerMetadata(retryBackoffMs,
retryBackoffMaxMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
metadata.bootstrap(ClientUtils.parseAndValidateAddresses(config));
return metadata;
}

private KafkaThread ioThread() {
KafkaThread ioThread = new KafkaThread("test_io_thread", sender, true);
ioThread.start();
return ioThread;
}
}

static class EvilBufferPool extends BufferPool {

public EvilBufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
super(memory, poolableSize, metrics, time, metricGrpName);
}

/**
* Overrides deallocate to zero the ByteBuffer as it is returned to the pool.
* If a buffer is deallocated while its batch is still in flight, the request on
* the wire sees the zeroed contents, making buffer-reuse bugs immediately visible.
*/
@Override
public void deallocate(ByteBuffer buffer, int size) {
// Hold the pool lock so the zero-fill and the return to the free list are atomic;
// the lock is reentrant, so super.deallocate can re-acquire it.
lock.lock();
try {
Arrays.fill(buffer.array(), (byte) 0);
super.deallocate(buffer, size);
} finally {
lock.unlock();
}
}
}
}
BufferPool.java
@@ -48,7 +48,7 @@ public class BufferPool {

private final long totalMemory;
private final int poolableSize;
-private final ReentrantLock lock;
+protected final ReentrantLock lock;

medium

Changing the lock to protected seems reasonable to allow subclasses to access it, but consider if there are other ways to achieve the same goal without exposing the lock directly. If this is the best approach, please add a comment explaining why this change was necessary and how subclasses should use the lock safely.

Suggested change
-protected final ReentrantLock lock;
+/**
+ * The lock protecting all mutable state in the pool.
+ * It is protected to allow subclasses in testing to access it, but should be used with caution.
+ */
+protected final ReentrantLock lock;
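For reference, a minimal sketch of the safe-usage pattern this comment asks for (hypothetical test-only subclass; it works because the lock is reentrant, so super.deallocate can re-acquire it):

import java.nio.ByteBuffer;

import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;

// Hypothetical subclass: touch pool-visible state only while holding the lock.
class AuditingBufferPool extends BufferPool {
    AuditingBufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
        super(memory, poolableSize, metrics, time, metricGrpName);
    }

    @Override
    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            // inspect or tag the buffer before other threads can allocate it again
            super.deallocate(buffer, size);
        } finally {
            lock.unlock();
        }
    }
}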

private final Deque<ByteBuffer> free;
private final Deque<Condition> waiters;
/** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */
Sender.java
@@ -172,43 +172,6 @@ private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
this.accumulator.deallocate(batch);
}

-/**
-* Get the in-flight batches that has reached delivery timeout.
-*/
-private List<ProducerBatch> getExpiredInflightBatches(long now) {
-List<ProducerBatch> expiredBatches = new ArrayList<>();
-
-for (Iterator<Map.Entry<TopicPartition, List<ProducerBatch>>> batchIt = inFlightBatches.entrySet().iterator(); batchIt.hasNext();) {
-Map.Entry<TopicPartition, List<ProducerBatch>> entry = batchIt.next();
-List<ProducerBatch> partitionInFlightBatches = entry.getValue();
-if (partitionInFlightBatches != null) {
-Iterator<ProducerBatch> iter = partitionInFlightBatches.iterator();
-while (iter.hasNext()) {
-ProducerBatch batch = iter.next();
-if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
-iter.remove();
-// expireBatches is called in Sender.sendProducerData, before client.poll.
-// The !batch.isDone() invariant should always hold. An IllegalStateException
-// exception will be thrown if the invariant is violated.
-if (!batch.isDone()) {
-expiredBatches.add(batch);
-} else {
-throw new IllegalStateException(batch.topicPartition + " batch created at " +
-batch.createdMs + " gets unexpected final state " + batch.finalState());
-}
-} else {
-accumulator.maybeUpdateNextBatchExpiryTime(batch);
-break;
-}
-}
-if (partitionInFlightBatches.isEmpty()) {
-batchIt.remove();
-}
-}
-}
-return expiredBatches;
-}

private void addToInflightBatches(List<ProducerBatch> batches) {
for (ProducerBatch batch : batches) {
List<ProducerBatch> inflightBatchList = inFlightBatches.computeIfAbsent(batch.topicPartition,
@@ -355,7 +318,8 @@ private boolean shouldHandleAuthorizationError(RuntimeException exception) {
return false;
}

-private long sendProducerData(long now) {
+// Visible for testing
+protected long sendProducerData(long now) {
Comment on lines +321 to +322

high

Making this method protected increases its visibility. It's good that it's visible for testing, but it's important to document why this change was made and how it should be used. This is particularly important since this method is part of the core sending logic.

Suggested change
-// Visible for testing
-protected long sendProducerData(long now) {
+/**
+ * Send data in the accumulator to the brokers.
+ * Visible for testing.
+ */
+@VisibleForTesting
+protected long sendProducerData(long now) {

MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadataSnapshot, now);
@@ -404,9 +368,7 @@ private long sendProducerData(long now) {
}

accumulator.resetNextBatchExpiryTime();
-List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
-expiredBatches.addAll(expiredInflightBatches);

Comment on lines 371 to 372

medium

Removing expiredInflightBatches simplifies the logic, which is good. Can you add a comment explaining why this was removed, and how the expiry is now handled?

Suggested change
-List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
-expiredBatches.addAll(expiredInflightBatches);
+// Remove expiredInflightBatches since expiry is now handled in the accumulator directly.
+List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);

// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why