From 6c26595ce3d1608ae98ad4958b2ff8776a025fc3 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Mon, 21 Apr 2025 07:32:10 +0530 Subject: [PATCH 1/2] Update Transactional producer to translate retriable into abortable exceptions --- .../internals/TransactionManager.java | 6 +++ .../errors/TransactionAbortableException.java | 7 +++ .../producer/internals/SenderTest.java | 52 ++++++++++++++++++- 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index c78134c72ecf2..2490432221654 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.TransactionAbortableException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -1073,6 +1074,11 @@ private void transitionTo(State target, RuntimeException error) { } else if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) { if (error == null) throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception"); + + if (error instanceof RetriableException) { + error = new TransactionAbortableException("Transaction Request was aborted after exhausting retries.", error); + } + lastError = error; } else { lastError = null; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java index aa592d552bf00..544a5c122b2f8 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java @@ -17,6 +17,13 @@ package org.apache.kafka.common.errors; public class TransactionAbortableException extends ApiException { + + private static final long serialVersionUID = 1L; + + public TransactionAbortableException(String message, Throwable cause) { + super(message, cause); + } + public TransactionAbortableException(String message) { super(message); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index cd1cd2df8828c..8a9c034dac471 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -186,7 +186,7 @@ private static Map partitionRecords(ProduceReques })); return Collections.unmodifiableMap(partitionRecords); } - + @Test public void testSimple() throws Exception { long offset = 0; @@ -3001,7 +3001,51 @@ public void testCustomErrorMessage() throws Exception { } @Test - public void testSenderShouldRetryWithBackoffOnRetriableError() { + public void testSenderShouldTransitionToAbortableAfterRetriesExhausted() throws InterruptedException { + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); + TransactionManager txnManager = new TransactionManager( + logContext, + "testRetriableException", + 60000, + RETRY_BACKOFF_MS, + apiVersions + ); + + // Setup with transaction state and initialize transactions with single retry + setupWithTransactionState(txnManager, false, null, 1); + doInitTransactions(txnManager, producerIdAndEpoch); + + // Begin transaction and add partition + txnManager.beginTransaction(); + txnManager.maybeAddPartition(tp0); + client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE))); + sender.runOnce(); + + // First produce request + appendToAccumulator(tp0); + client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1)); + sender.runOnce(); + + // Sleep for retry backoff + time.sleep(RETRY_BACKOFF_MS); + + // Second attempt to process record - PREPARE the response before sending + client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1)); + sender.runOnce(); + + // Now transaction should be in abortable state after retry is exhausted + assertTrue(txnManager.hasAbortableError()); + + // Second produce request - should fail with TransactionAbortableException + Future future2 = appendToAccumulator(tp0); + client.prepareResponse(produceResponse(tp0, -1, Errors.NONE, -1)); + // Sender will try to send and fail with TransactionAbortableException instead of COORDINATOR_LOAD_IN_PROGRESS, because we're in abortable state + sender.runOnce(); + assertFutureFailure(future2, TransactionAbortableException.class); + } + + @Test + public void testSenderShouldRetryWithBackoffOnRetriableError() throws InterruptedException { final long producerId = 343434L; TransactionManager transactionManager = createTransactionManager(); setupWithTransactionState(transactionManager); @@ -3635,6 +3679,10 @@ private void setupWithTransactionState(TransactionManager transactionManager, bo setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE, 0); } + private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool, int retries) { + setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, retries, 0); + } + private void setupWithTransactionState( TransactionManager transactionManager, boolean guaranteeOrder, From b93f62e5c199bafbc823d48babced1709f2319df Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Mon, 21 Apr 2025 07:53:26 +0530 Subject: [PATCH 2/2] Update test to include check for transaction APIs --- .../kafka/clients/producer/internals/SenderTest.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 8a9c034dac471..03f3f7ab2859b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -3042,6 +3042,16 @@ public void testSenderShouldTransitionToAbortableAfterRetriesExhausted() throws // Sender will try to send and fail with TransactionAbortableException instead of COORDINATOR_LOAD_IN_PROGRESS, because we're in abortable state sender.runOnce(); assertFutureFailure(future2, TransactionAbortableException.class); + + + // Verify transaction API requests also fail with TransactionAbortableException + try { + txnManager.beginCommit(); + fail("Expected beginCommit() to fail with TransactionAbortableException when in abortable error state"); + } catch (KafkaException e) { + assertEquals(TransactionAbortableException.class, e.getCause().getClass()); + } + } @Test