Update Transactional producer to translate retriable into abortable exceptions #3


Open
wants to merge 2 commits into base: trunk

Conversation


@arvi18 arvi18 commented Apr 21, 2025

Problem

  • Currently, when a transactional producer encounters a retriable error (such as COORDINATOR_LOAD_IN_PROGRESS) and exhausts all retries, it ultimately surfaces the retriable error to the application layer.
  • Application-level retries can then cause duplicate records. As a fix, we translate all retriable errors into abortable errors in the transactional producer path.

Solution

Modified the TransactionManager to automatically transition retriable errors to abortable errors after all retries are exhausted. This ensures that applications can abort the transaction when they encounter a TransactionAbortableException.

A RefreshRetriableException such as CoordinatorNotAvailableException is refreshed internally [code] until retries are exhausted; it is then treated as a retriable error and translated to a TransactionAbortableException.
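The wrapping step this solution describes can be sketched as a small, self-contained model. The class names below mirror the PR, but this is a simplification: the real exceptions extend Kafka's ApiException and the real logic lives inside TransactionManager.transitionTo.

```java
// Simplified stand-ins for the real Kafka exception types.
class RetriableException extends RuntimeException {
    RetriableException(String message) { super(message); }
}

class TransactionAbortableException extends RuntimeException {
    TransactionAbortableException(String message, Throwable cause) { super(message, cause); }
}

class TransitionSketch {
    // Mirrors the PR's wrapping step: when moving to an abortable/fatal error
    // state after retries are exhausted, a retriable cause is re-labelled as
    // abortable so the application stops retrying and aborts instead.
    static RuntimeException toAbortable(RuntimeException error) {
        if (error instanceof RetriableException) {
            return new TransactionAbortableException(
                "Transaction Request was aborted after exhausting retries.", error);
        }
        return error;
    }
}
```

Note that the original retriable error is kept as the cause, so diagnostic context (e.g. that the coordinator was loading) is not lost.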

Testing

Added the test testSenderShouldTransitionToAbortableAfterRetriesExhausted to verify, in the sender thread, that:

  • Retriable errors are properly converted to abortable state after retries
  • Transaction state transitions correctly and subsequent operations fail appropriately with TransactionAbortableException

Summary by CodeRabbit

  • Bug Fixes
    • Improved error handling for transactional producers by ensuring that retriable errors which exhaust all retries are now reported as abortable transaction errors.
  • New Features
    • Enhanced error reporting with more informative exceptions when a transaction is aborted after retries are exhausted.
  • Tests
    • Added tests to verify correct transition to abortable error state after retry exhaustion in transactional scenarios.


coderabbitai bot commented Apr 21, 2025

Walkthrough

The changes introduce enhanced error handling for transactional state transitions in Kafka's producer internals. When a retriable exception leads to a transition to an abortable or fatal error state, the exception is now wrapped in a TransactionAbortableException for clarity. A new constructor is added to TransactionAbortableException to support this wrapping. Additionally, a test is introduced to verify that the transaction manager transitions correctly to an abortable state after exhausting retries on retriable errors, and that subsequent operations fail as expected.

Changes

File(s) Change Summary
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Enhanced the transitionTo method to wrap RetriableException in a TransactionAbortableException when transitioning to abortable or fatal error states. No other logic was modified.
clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java Added a constructor accepting both a message and a cause, and introduced a serialVersionUID field for serialization compatibility.
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java Added a new test to verify abortable state transition after retries are exhausted, and a helper method to configure retries in test setup. The new test asserts correct state transitions and exception propagation. The original retry test was repositioned but not changed.
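The TransactionAbortableException change in the table above can be illustrated with a minimal sketch. This is a simplification, not the actual source: the real class lives in org.apache.kafka.common.errors and extends ApiException, and the serialVersionUID value here is illustrative.

```java
// Hedged sketch of the described exception shape (simplified).
class TransactionAbortableException extends RuntimeException {
    // Explicit serial version UID for serialization compatibility; the
    // concrete value used in the PR may differ.
    private static final long serialVersionUID = 1L;

    public TransactionAbortableException(String message) {
        super(message);
    }

    // New constructor from this PR: carries the original retriable cause so
    // callers can inspect why the transaction became abortable.
    public TransactionAbortableException(String message, Throwable cause) {
        super(message, cause);
    }
}
```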

Sequence Diagram(s)

sequenceDiagram
    participant Producer
    participant TransactionManager
    participant Broker

    Producer->>TransactionManager: beginTransaction()
    Producer->>TransactionManager: send(produceRequest)
    TransactionManager->>Broker: sendRequest
    Broker-->>TransactionManager: RetriableException (e.g., COORDINATOR_LOAD_IN_PROGRESS)
    loop Retry until retries exhausted
        TransactionManager->>Broker: resendRequest
        Broker-->>TransactionManager: RetriableException
    end
    TransactionManager->>TransactionManager: transitionTo(ABORTABLE_ERROR, wrap RetriableException)
    Producer->>TransactionManager: send(anotherRequest)
    TransactionManager-->>Producer: TransactionAbortableException
Loading

Poem

In the warren of code, a new path appears,
Where retries are tracked through transactional gears.
If hope is exhausted and errors persist,
The manager now wraps them—no detail is missed.
With tests to ensure the right state is found,
The burrow is safer, with logic more sound.
🐇✨



visz11 commented Apr 21, 2025

@coderabbitai full review


coderabbitai bot commented Apr 21, 2025

✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai bot left a comment
Actionable comments posted: 2

🧹 Nitpick comments (1)
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (1)

3692-3695: Reduce overload proliferation in setupWithTransactionState

A fourth overload was added solely to pass retries, which now yields:

(txnManager, guaranteeOrder, pool)
(txnManager, guaranteeOrder, pool, retries)
(txnManager, guaranteeOrder, pool, updateMetadata)
(txnManager, guaranteeOrder, pool, updateMetadata, retries, lingerMs)

With three boolean/int permutations the call‑site can become ambiguous and maintenance‑heavy.
Consider replacing the overloads with one of:

  1. A builder/parameter object (ProducerTestConfig) that collects all optional knobs; or
  2. Keep a single “kitchen‑sink” method and use named parameters at the call‑site.

Either alternative avoids the exponential growth of overloads and makes future additions safer.
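One way to realize option 1 is a small parameter object with defaults, so call sites set only the knobs they care about. ProducerTestConfig and its field names below are hypothetical illustrations, not part of the Kafka test code:

```java
// Hypothetical parameter object replacing the setupWithTransactionState overloads.
// All fields default to the values the three-argument overload implied.
class ProducerTestConfig {
    boolean guaranteeOrder = false;
    Object pool = null;            // stand-in for the real BufferPool type
    boolean updateMetadata = true;
    int retries = Integer.MAX_VALUE;
    int lingerMs = 0;

    // Fluent setters keep call sites readable and unambiguous.
    ProducerTestConfig guaranteeOrder(boolean v) { this.guaranteeOrder = v; return this; }
    ProducerTestConfig updateMetadata(boolean v) { this.updateMetadata = v; return this; }
    ProducerTestConfig retries(int v) { this.retries = v; return this; }
    ProducerTestConfig lingerMs(int v) { this.lingerMs = v; return this; }
}
```

A call site would then read setupWithTransactionState(txnManager, new ProducerTestConfig().retries(1)), which stays unambiguous as more knobs are added.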

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and b93f62e.

📒 Files selected for processing (3)
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (2 hunks)
  • clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java (1 hunks)
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (3 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (1)
clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java (1)
  • TransactionAbortableException (19-30)
🔇 Additional comments (4)
clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java (2)

20-25: Good enhancement to support exception wrapping

Adding a constructor that accepts both a message and a cause enables proper exception chaining when converting from RetriableException to TransactionAbortableException, maintaining the original error context.


21-21: Good practice adding serialVersionUID

Adding the serialVersionUID field ensures serialization compatibility, which is important for exception classes that might be serialized and deserialized across different JVMs.

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (2)

40-40: Necessary import for the exception

Adding the import for TransactionAbortableException is necessary to support the new error handling logic in the transitionTo method.


1077-1081: Good implementation for converting retriable to abortable exceptions

This change properly implements the conversion of RetriableException to TransactionAbortableException when transactions transition to error states. This addresses the core issue where retriable errors were still returned as retriable after exhausting all retries, potentially causing duplicate records.

The error message clearly indicates the situation: "Transaction Request was aborted after exhausting retries."

Comment on lines +1078 to +1080
if (error instanceof RetriableException) {
    error = new TransactionAbortableException("Transaction Request was aborted after exhausting retries.", error);
}

💡 Verification agent

🧩 Analysis chain

Verify test coverage for exception conversion

The wrapping of RetriableException in TransactionAbortableException is a behavior change that should be thoroughly tested. Ensure test cases cover scenarios where retries are exhausted and confirm the exception type received by the application layer.


🏁 Script executed:

#!/bin/bash
# Searching for tests covering the TransactionAbortableException scenario

echo "Checking for tests that verify exception conversion:"
rg -l "TransactionAbortableException.*RetriableException|RetriableException.*TransactionAbortableException" --type java clients/src/test/

echo -e "\nChecking for tests with 'exhausted' retries:"
rg -l "exhausted.*retries|retries.*exhausted" --type java clients/src/test/

Length of output: 525


Add test for exception conversion in TransactionManager

The new wrapping logic in TransactionManager (clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:1078‑1080) is a behavior change and needs explicit coverage. Although SenderTest.java already checks for “exhausted retries” (clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java), it does not verify that a RetriableException is wrapped as a TransactionAbortableException.

Please add or update a unit test (e.g. in a new TransactionManagerTest) to:

  • Simulate a failed transactional request that throws a RetriableException on the final retry.
  • Assert that the thrown exception is a TransactionAbortableException.
  • Verify that the original RetriableException is preserved as the cause.
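The three requested assertions can be illustrated standalone. The class names and the inlined retry loop below are illustrative stand-ins, not the real TransactionManager; a real test would drive the state machine through Sender/TransactionManager instead.

```java
// Illustrative stand-ins for the retriable and abortable exception types.
class Retriable extends RuntimeException {
    Retriable(String m) { super(m); }
}

class Abortable extends RuntimeException {
    Abortable(String m, Throwable cause) { super(m, cause); }
}

class WrapCheck {
    // Simulates a request that fails with a retriable error on every attempt;
    // once retries are exhausted, the last retriable failure is wrapped.
    static RuntimeException failAfterRetries(int retries) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            last = new Retriable("COORDINATOR_LOAD_IN_PROGRESS"); // every attempt fails
        }
        // Retries exhausted: translate the retriable failure into an abortable one,
        // preserving the original error as the cause.
        return new Abortable("aborted after exhausting retries", last);
    }
}
```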

Comment on lines +3004 to +3055
public void testSenderShouldTransitionToAbortableAfterRetriesExhausted() throws InterruptedException {
    ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
    TransactionManager txnManager = new TransactionManager(
        logContext,
        "testRetriableException",
        60000,
        RETRY_BACKOFF_MS,
        apiVersions
    );

    // Setup with transaction state and initialize transactions with single retry
    setupWithTransactionState(txnManager, false, null, 1);
    doInitTransactions(txnManager, producerIdAndEpoch);

    // Begin transaction and add partition
    txnManager.beginTransaction();
    txnManager.maybeAddPartition(tp0);
    client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
    sender.runOnce();

    // First produce request
    appendToAccumulator(tp0);
    client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1));
    sender.runOnce();

    // Sleep for retry backoff
    time.sleep(RETRY_BACKOFF_MS);

    // Second attempt to process record - PREPARE the response before sending
    client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1));
    sender.runOnce();

    // Now transaction should be in abortable state after retry is exhausted
    assertTrue(txnManager.hasAbortableError());

    // Second produce request - should fail with TransactionAbortableException
    Future<RecordMetadata> future2 = appendToAccumulator(tp0);
    client.prepareResponse(produceResponse(tp0, -1, Errors.NONE, -1));
    // Sender will try to send and fail with TransactionAbortableException instead of COORDINATOR_LOAD_IN_PROGRESS, because we're in abortable state
    sender.runOnce();
    assertFutureFailure(future2, TransactionAbortableException.class);

    // Verify transaction API requests also fail with TransactionAbortableException
    try {
        txnManager.beginCommit();
        fail("Expected beginCommit() to fail with TransactionAbortableException when in abortable error state");
    } catch (KafkaException e) {
        assertEquals(TransactionAbortableException.class, e.getCause().getClass());
    }
}

💡 Verification agent

🧩 Analysis chain

Harden the new abort‑on‑retry‑exhaustion test

The test does the job functionally, but a few tweaks will make it more deterministic and easier to maintain:

@@
-    // No timeout guard – a hang here stalls the whole suite
+    @Timeout(10)   // keep the suite moving even if the sender hangs

@@
-        // Sleep for retry backoff
-        time.sleep(RETRY_BACKOFF_MS);
+        // Advance mock time instead of real sleeping (keeps the test instant)
+        time.sleep(RETRY_BACKOFF_MS);
@@
-        // Second produce request - should fail with TransactionAbortableException
-        Future<RecordMetadata> future2 = appendToAccumulator(tp0);
-        client.prepareResponse(produceResponse(tp0, -1, Errors.NONE, -1));
-        // Sender will try to send and fail with TransactionAbortableException instead of COORDINATOR_LOAD_IN_PROGRESS, because we're in abortable state
-        sender.runOnce();
-        assertFutureFailure(future2, TransactionAbortableException.class);
+        // Second produce request – should be failed immediately
+        Future<RecordMetadata> future2 = appendToAccumulator(tp0);
+        sender.runOnce(); // no response required – batch is aborted locally
+        assertFutureFailure(future2, TransactionAbortableException.class);
@@
-        try {
-            txnManager.beginCommit();
-            fail("Expected beginCommit() to fail with TransactionAbortableException when in abortable error state");
-        } catch (KafkaException e) {
-            assertEquals(TransactionAbortableException.class, e.getCause().getClass());
-        }
+        assertThrows(TransactionAbortableException.class, txnManager::beginCommit,
+            "beginCommit() should fail once the manager is in the abortable state");

Rationale:

  1. @Timeout prevents the build hanging if the state machine changes.
  2. Advancing the same MockTime is already correct; explicitly noting this avoids confusion about real sleeps.
  3. client.prepareResponse(... Errors.NONE ...) is never consumed (no request is sent once the manager is abortable) and may leave a stray response in the mock queue—better to drop it.
  4. Use assertThrows instead of try/catch for readability and to avoid accidental swallowing of the wrong exception.

To confirm JUnit versions and existing imports before adjusting annotations or assertions, let’s inspect the top of SenderTest.java and search for existing @Test, assertThrows, and any usage of @Timeout across the repo.


🏁 Script executed:

#!/bin/bash
# Show the first 200 lines of SenderTest.java to inspect imports and annotations
head -n 200 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

# Find all @Test annotations in this file (to see JUnit4 vs JUnit5)
rg "@Test" -n clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java || true

# Check for any assertThrows usage in this test
rg "assertThrows" -n clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java || true

# Scan the repository for @Timeout (to see if JUnit5’s Timeout is already used elsewhere)
rg "@Timeout" -n .

Length of output: 49834


Harden testSenderShouldTransitionToAbortableAfterRetriesExhausted

Apply a timeout guard, clarify mock‐time usage, drop the unused mock response, and switch to assertThrows for readability:

@@ clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
-    @Test
+    @Timeout(10L)   // prevent suite hang if state machine changes
+    @Test
     public void testSenderShouldTransitionToAbortableAfterRetriesExhausted() throws InterruptedException {
@@
-        // Sleep for retry backoff
-        time.sleep(RETRY_BACKOFF_MS);
+        // Advance mock time for retry backoff (instant)
+        time.sleep(RETRY_BACKOFF_MS);
@@
-        Future<RecordMetadata> future2 = appendToAccumulator(tp0);
-        client.prepareResponse(produceResponse(tp0, -1, Errors.NONE, -1));
-        // Sender will try to send and fail with TransactionAbortableException instead of COORDINATOR_LOAD_IN_PROGRESS, because we're in abortable state
-        sender.runOnce();
-        assertFutureFailure(future2, TransactionAbortableException.class);
+        Future<RecordMetadata> future2 = appendToAccumulator(tp0);
+        // Batch is aborted locally in abortable state; no mock response needed
+        sender.runOnce();
+        assertFutureFailure(future2, TransactionAbortableException.class);
@@
-        try {
-            txnManager.beginCommit();
-            fail("Expected beginCommit() to fail with TransactionAbortableException when in abortable error state");
-        } catch (KafkaException e) {
-            assertEquals(TransactionAbortableException.class, e.getCause().getClass());
-        }
+        assertThrows(TransactionAbortableException.class, txnManager::beginCommit,
+            "beginCommit() should fail once the manager is in the abortable state");

Rationale:

  • @Timeout guards against potential hangs.
  • The comment now reflects that MockTime.sleep is instantaneous.
  • Removing the unused prepareResponse prevents stray responses in the mock queue.
  • assertThrows improves clarity and avoids silencing unexpected exceptions.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@Timeout(10L) // prevent suite hang if state machine changes
@Test
public void testSenderShouldTransitionToAbortableAfterRetriesExhausted() throws InterruptedException {
    ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
    TransactionManager txnManager = new TransactionManager(
        logContext,
        "testRetriableException",
        60000,
        RETRY_BACKOFF_MS,
        apiVersions
    );
    // Setup with transaction state and initialize transactions with single retry
    setupWithTransactionState(txnManager, false, null, 1);
    doInitTransactions(txnManager, producerIdAndEpoch);
    // Begin transaction and add partition
    txnManager.beginTransaction();
    txnManager.maybeAddPartition(tp0);
    client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
    sender.runOnce();
    // First produce request
    appendToAccumulator(tp0);
    client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1));
    sender.runOnce();
    // Advance mock time for retry backoff (instant)
    time.sleep(RETRY_BACKOFF_MS);
    // Second attempt to process record - PREPARE the response before sending
    client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1));
    sender.runOnce();
    // Now transaction should be in abortable state after retry is exhausted
    assertTrue(txnManager.hasAbortableError());
    Future<RecordMetadata> future2 = appendToAccumulator(tp0);
    // Batch is aborted locally in abortable state; no mock response needed
    sender.runOnce();
    assertFutureFailure(future2, TransactionAbortableException.class);
    assertThrows(TransactionAbortableException.class, txnManager::beginCommit,
        "beginCommit() should fail once the manager is in the abortable state");
}
