diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index c78134c72ecf2..2490432221654 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -37,6 +37,7 @@
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.TransactionAbortableException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -1073,6 +1074,11 @@ private void transitionTo(State target, RuntimeException error) {
         } else if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
             if (error == null)
                 throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
+
+            if (error instanceof RetriableException) {
+                error = new TransactionAbortableException("Transaction Request was aborted after exhausting retries.", error);
+            }
+
             lastError = error;
         } else {
             lastError = null;
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java
index aa592d552bf00..544a5c122b2f8 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java
@@ -17,6 +17,13 @@
 package org.apache.kafka.common.errors;
 
 public class TransactionAbortableException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TransactionAbortableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
     public TransactionAbortableException(String message) {
         super(message);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index cd1cd2df8828c..03f3f7ab2859b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -186,7 +186,7 @@ private static Map partitionRecords(ProduceReques
         }));
         return Collections.unmodifiableMap(partitionRecords);
     }
-    
+
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
@@ -3001,7 +3001,61 @@ public void testCustomErrorMessage() throws Exception {
     }
 
     @Test
-    public void testSenderShouldRetryWithBackoffOnRetriableError() {
+    public void testSenderShouldTransitionToAbortableAfterRetriesExhausted() throws InterruptedException {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        TransactionManager txnManager = new TransactionManager(
+            logContext,
+            "testRetriableException",
+            60000,
+            RETRY_BACKOFF_MS,
+            apiVersions
+        );
+
+        // Setup with transaction state and initialize transactions with single retry
+        setupWithTransactionState(txnManager, false, null, 1);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        // Begin transaction and add partition
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartition(tp0);
+        client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
+        sender.runOnce();
+
+        // First produce request
+        appendToAccumulator(tp0);
+        client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1));
+        sender.runOnce();
+
+        // Sleep for retry backoff
+        time.sleep(RETRY_BACKOFF_MS);
+
+        // Second attempt to process the record; prepare the response before sending
+        client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1));
+        sender.runOnce();
+
+        // The transaction should now be in the abortable error state after retries are exhausted
+        assertTrue(txnManager.hasAbortableError());
+
+        // Second produce request - should fail with TransactionAbortableException
+        Future<RecordMetadata> future2 = appendToAccumulator(tp0);
+        client.prepareResponse(produceResponse(tp0, -1, Errors.NONE, -1));
+        // The sender fails the batch with TransactionAbortableException instead of COORDINATOR_LOAD_IN_PROGRESS because the transaction is in the abortable error state
+        sender.runOnce();
+        assertFutureFailure(future2, TransactionAbortableException.class);
+
+
+        // Verify that transaction API requests also fail with TransactionAbortableException
+        try {
+            txnManager.beginCommit();
+            fail("Expected beginCommit() to fail with TransactionAbortableException when in abortable error state");
+        } catch (KafkaException e) {
+            assertEquals(TransactionAbortableException.class, e.getCause().getClass());
+        }
+
+    }
+
+    @Test
+    public void testSenderShouldRetryWithBackoffOnRetriableError() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = createTransactionManager();
         setupWithTransactionState(transactionManager);
@@ -3635,6 +3689,10 @@ private void setupWithTransactionState(TransactionManager transactionManager, bo
         setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE, 0);
     }
 
+    private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool, int retries) {
+        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, retries, 0);
+    }
+
     private void setupWithTransactionState(
         TransactionManager transactionManager,
         boolean guaranteeOrder,
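
Illustrative note (not part of the patch): with this change, a retriable error that exhausts its retries while a transaction is in flight is wrapped in TransactionAbortableException when the transaction manager enters an error state, so the application sees a clear signal that aborting and retrying the transaction is the recovery path. The sketch below shows one way an application might handle this, following the transactional-producer pattern from the KafkaProducer javadoc; the bootstrap servers, transactional id, and topic name are placeholders.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionalSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // placeholder
        props.put("transactional.id", "example-transactional-id"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: this producer instance cannot continue.
            producer.close();
        } catch (KafkaException e) {
            // Abortable errors. After this patch, a retriable error that exhausts its retries
            // surfaces here as a TransactionAbortableException (possibly as the cause of a
            // wrapping KafkaException) rather than as the raw retriable exception.
            producer.abortTransaction();
        }
        producer.close();
    }
}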