Skip to content

Conversation

arvi18
Copy link

@arvi18 arvi18 commented Apr 21, 2025

This is part of the client side changes required to enable 2PC for
KIP-939

Producer Config:
transaction.two.phase.commit.enable The default would be ‘false’. If
set to ‘true’, the broker is informed that the client is participating
in two phase commit protocol and transactions that this client starts
never expire.

Overloaded InitProducerId method
If the value is 'true' then the corresponding field is set in the
InitProducerIdRequest

Copy link

coderabbitai bot commented Apr 21, 2025

Warning

Rate limit exceeded

@visz11 has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 27 minutes and 26 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 4a08713.

📒 Files selected for processing (9)
  • clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (2 hunks)
  • clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java (1 hunks)
  • clients/src/main/java/org/apache/kafka/clients/producer/Producer.java (1 hunks)
  • clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (3 hunks)
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (5 hunks)
  • clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java (4 hunks)
  • clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java (1 hunks)
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (18 hunks)
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (5 hunks)

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@visz11
Copy link
Collaborator

visz11 commented Apr 21, 2025

@coderabbitai full review

Copy link

coderabbitai bot commented Apr 21, 2025

✅ Actions performed

Full review triggered.

@visz11
Copy link
Collaborator

visz11 commented Jul 31, 2025

/refacto-test

Copy link

refacto-test bot commented Jul 31, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

Copy link

refacto-test bot commented Jul 31, 2025

Solid Implementation - Important Configuration Validation Needed

Review Summary

This PR introduces a well-designed enhancement to Kafka's transaction management capabilities by adding support for two-phase commit (2PC) transactions. The implementation demonstrates careful attention to backward compatibility and configuration validation. Our analysis identified 3 actionable items that should be addressed before merging, primarily focusing on configuration validation and transaction initialization. Once these are addressed, this will be a robust addition to Kafka's transaction management capabilities.

Well Done!!!

  • Excellent implementation of a new configuration option for two-phase commit transactions with proper defaults
  • Thoughtful extension of transaction initialization API with backward compatibility
  • Comprehensive test coverage for the new transaction initialization method
  • Careful validation to prevent incompatible configuration settings

Files Processed

clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 355-639
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 644-690
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java 255-276

📌 Additional Comments (3)

Performance

Transaction Manager Constructor Complexity

Explanation: Adding a new boolean parameter to the TransactionManager constructor increases method signature complexity and potential memory overhead. As more transaction features are added in the future, this constructor could become unwieldy.

TransactionManager constructor with enable2PC parameter

Fix Suggestion: Consider using a configuration object or builder pattern to manage growing constructor complexity

+ public static class TransactionConfig {
+  private final boolean enable2PC;
+  private final int transactionTimeoutMs;
+  
+  public TransactionConfig(boolean enable2PC, int transactionTimeoutMs) {
+  this.enable2PC = enable2PC;
+  this.transactionTimeoutMs = transactionTimeoutMs;
+  }
+ }
+ 
+ public TransactionManager(final LogContext logContext,
+  final String transactionalId,
+  final TransactionConfig config,
+  final long retryBackoffMs,
+  final ApiVersions apiVersions) {
Rationale
  • Reduces constructor parameter count and improves future extensibility
  • Provides a more flexible configuration management approach
  • Minimal performance overhead with potential long-term maintainability benefits
  • Makes it easier to add new transaction configuration options in the future
References
  • Standard: Object-Oriented Design Principles
---

Security

Two-Phase Commit Transaction Initialization Extension

Explanation: The code extends transaction initialization to support an optional two-phase commit flag, allowing more flexible transaction management without introducing significant security risks.

initializeTransactionState method

Fix Suggestion: Added optional keepPreparedTxn parameter to transaction initialization

+ // Consider adding logging for two-phase commit transaction initializations
+ if (keepPreparedTxn) {
+  log.info("Initializing transactions with prepared transaction retention enabled");
+ }
Rationale
  • Provides controlled mechanism for retaining prepared transactions
  • Allows external transaction managers more flexibility in transaction lifecycle management
  • Maintains existing security checks during transaction initialization
  • Adding logging improves auditability of two-phase commit operations
References
  • CWE: CWE-285
  • OWASP: A04:2021-Insecure Design
---

Reliability

Overloaded Transaction Initialization Method

Explanation: The new overloaded initTransactions method provides additional flexibility for managing transactions, specifically supporting two-phase commit scenarios by allowing retention of prepared transactions.

public void initTransactions(boolean keepPreparedTxn) {
 throwIfNoTransactionManager();
 throwIfProducerClosed();
 long now = time.nanoseconds();
 TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
 sender.wakeup();
 result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
 producerMetrics.recordInit(time.nanoseconds() - now);
}

Fix Suggestion: Implement robust transaction initialization with optional prepared transaction retention

+ // Consider adding validation for the keepPreparedTxn parameter
+ if (keepPreparedTxn && !transactionManager.isTransactionTwoPhaseCommitEnabled()) {
+  throw new IllegalStateException(
+  "Cannot retain prepared transactions when two-phase commit is not enabled. " +
+  "Please set " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG + " to true."
+  );
+ }
Rationale
  • Provides more granular control over transaction initialization
  • Supports complex distributed transaction scenarios
  • Maintains backward compatibility with existing transaction model
  • Added validation ensures consistent configuration across the API
References
  • Standard: ISO/IEC 25010 - Functional Suitability, Flexibility
---

@visz11
Copy link
Collaborator

visz11 commented Aug 7, 2025

/refacto-test

Copy link

refacto-test bot commented Aug 7, 2025

PR already reviewed at the latest commit: 4a08713.
Please try again with new changes.

@visz11
Copy link
Collaborator

visz11 commented Sep 10, 2025

/refacto-test

Copy link

refacto-test bot commented Sep 10, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

Copy link

refacto-test bot commented Sep 10, 2025

Code Review: Transaction Management Implementation

👍 Well Done
Transaction Recovery Support

Two-phase commit support enables reliable transaction recovery across system failures.

Clean Configuration Validation

Validation prevents incompatible transaction timeout with two-phase commit.

Backward Compatibility Maintenance

Preserves existing functionality while adding new transaction capabilities.

📌 Files Processed
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  • clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  • clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
📝 Additional Comments
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (2)
Transaction Recovery Security

The keepPreparedTxn parameter enables transaction recovery without validation checks. Malicious clients could potentially recover and commit transactions they don't own if authorization isn't properly enforced.

Standards:

  • CWE-285
  • OWASP-A01
Transaction Documentation Needed

Documentation lacks explicit mention that keepPreparedTxn=true requires transaction.two.phase.commit.enable=true. Clear dependency documentation would prevent misconfigurations.

Standards:

  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Documentation
  • SRE-Documentation
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java (1)
Test Coverage Enhancement

Test cases don't verify behavior when keepPreparedTxn=true but enable2PC=false in actual transaction processing. Additional test case needed to verify proper error handling.

Standards:

  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • SRE-Testing
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (1)
Transaction Recovery Optimization

The synchronized methods could cause contention under high transaction initialization load. Consider using finer-grained locking or a non-blocking approach for the initialization path, especially important for recovery scenarios where multiple producers might initialize simultaneously after a failure.

Standards:

  • ISO-IEC-25010-Performance-Time-Behaviour
  • Netflix-Hot-Path-Optimization
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (2)
Conflicting Documentation Logic

Documentation states transactions "never expire" but doesn't explicitly mention this overrides transaction.timeout.ms. This logical disconnect is partially addressed in validation code but not clearly documented here.

Standards:

  • Business-Rule-Documentation
  • Logic-Verification-Consistency
Configuration Validation Duplication

Configuration validation logic is embedded in postProcessAndValidateIdempotenceConfigs method. Consider extracting validation into a dedicated method to improve readability and maintainability.

Standards:

  • Clean-Code-Function-Size
  • Clean-Code-Function-Responsibility

Comment on lines +682 to +686
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Validation Check

The new initTransactions method lacks validation for 2PC compatibility when keepPreparedTxn=true. Without validation, using keepPreparedTxn without enable2PC could cause transaction recovery failures.

Suggested change
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
if (keepPreparedTxn && !transactionManager.isEnable2PC()) {
throw new IllegalStateException("Cannot use keepPreparedTxn=true without enabling two-phase commit");
}
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Preconditions

Comment on lines 290 to +297
synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
return initializeTransactions(producerIdAndEpoch, false);
}

public synchronized TransactionalRequestResult initializeTransactions(boolean keepPreparedTxn) {
return initializeTransactions(ProducerIdAndEpoch.NONE, keepPreparedTxn);
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boolean Parameter Clarity

Boolean parameters reduce readability and create implicit dependencies. Consider using enums or constants for the keepPreparedTxn parameter to improve code clarity and maintainability.

Standards
  • Clean-Code-Boolean-Parameters
  • Clean-Code-Meaningful-Names

Comment on lines 316 to +322
InitProducerIdRequestData requestData = new InitProducerIdRequestData()
.setTransactionalId(transactionalId)
.setTransactionTimeoutMs(transactionTimeoutMs)
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch);
.setProducerEpoch(producerIdAndEpoch.epoch)
.setEnable2Pc(enable2PC)
.setKeepPreparedTxn(keepPreparedTxn);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InitProducerId Request Optimization

The InitProducerId request now includes two additional flags that increase request size. For high-throughput systems with frequent producer initialization, consider batching producer initializations or implementing a connection pooling strategy to amortize the cost of these larger requests.

Standards
  • ISO-IEC-25010-Performance-Resource-Utilization
  • Netflix-Hot-Path-Optimization

@visz11
Copy link
Collaborator

visz11 commented Sep 16, 2025

/refacto-test-arvi

Copy link

refacto-visz bot commented Sep 16, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

Copy link

refacto-visz bot commented Sep 16, 2025

Code Review: Transaction Management Enhancements

👍 Well Done
Comprehensive Transaction Support

Added two-phase commit capability enhances transaction reliability and recovery.

Backward Compatibility

Maintained API compatibility while extending transaction functionality.

📌 Files Processed
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  • clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  • clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
📝 Additional Comments
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (1)
Parameter Validation Missing

The new initializeTransactions method lacks parameter validation for keepPreparedTxn. When 2PC is not enabled but keepPreparedTxn is true, this could lead to inconsistent transaction state and potential recovery issues.

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (1)
Inconsistent Parameter Ordering

The method overload uses inconsistent parameter ordering compared to the expanded method. This creates maintenance challenges and increases the risk of parameter confusion in future modifications, potentially causing transaction reliability issues.

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness

Comment on lines +627 to +635
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
if (enable2PC && userConfiguredTransactionTimeout) {
throw new ConfigException(
"Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
" when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
" is set to true. Transactions will not expire with two-phase commit enabled."
);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Configuration Validation Gap

The validation prevents setting transaction.timeout.ms with 2PC enabled, but doesn't handle the default transaction timeout. The default timeout will still be used internally even when 2PC is enabled, potentially causing inconsistent behavior.

Suggested change
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
if (enable2PC && userConfiguredTransactionTimeout) {
throw new ConfigException(
"Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
" when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
" is set to true. Transactions will not expire with two-phase commit enabled."
);
}
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
if (enable2PC && userConfiguredTransactionTimeout) {
throw new ConfigException(
"Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
" when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
" is set to true. Transactions will not expire with two-phase commit enabled."
);
}
// Override default transaction timeout when 2PC is enabled to ensure consistency
if (enable2PC) {
this.values().put(TRANSACTION_TIMEOUT_CONFIG, Integer.MAX_VALUE);
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition

Comment on lines 649 to +651
public void initTransactions() {
initTransactions(false);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Default Handling

The default implementation of initTransactions() hardcodes 'false' for keepPreparedTxn. This prevents configuration-based defaults and forces explicit method calls for 2PC functionality, potentially causing inconsistent transaction behavior.

Standards
  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Interface-Contract

Comment on lines +1400 to +1402
if (request instanceof InitProducerIdRequest) {
InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
requestFlags[0] = initRequest.data().keepPreparedTxn();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Null Safety Check

The test code accesses initRequest.data() without null checking. If data() returns null, this would cause a NullPointerException, making tests unreliable and potentially masking real issues.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Error-Handling

@visz11
Copy link
Collaborator

visz11 commented Sep 16, 2025

/refacto-test-arvi

Copy link

refacto-visz bot commented Sep 16, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

Copy link

refacto-visz bot commented Sep 16, 2025

Code Review: Transaction Management Two-Phase Commit Support

👍 Well Done
Two-Phase Commit Support

Clean implementation of 2PC transaction support with proper configuration validation.

Backward Compatibility Maintained

Default method implementation preserves backward compatibility for existing transaction code.

📌 Files Processed
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  • clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  • clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
📝 Additional Comments
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (2)
Inconsistent Parameter Validation

The new initializeTransactions(boolean) method doesn't validate the keepPreparedTxn parameter against the enable2PC field. This inconsistent validation pattern could allow incompatible parameter combinations, potentially causing transaction state inconsistencies when keepPreparedTxn=true but enable2PC=false.

Standards:

  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition-Validation
  • SRE-Defensive-Programming
Logging Enhancement Needed

Log message doesn't include the enable2PC state, which is critical for debugging transaction initialization issues. Without logging both flags together, it would be difficult to diagnose configuration mismatches where keepPreparedTxn=true but enable2PC=false, potentially causing transaction reliability problems.

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Observability
  • DbC-Runtime-Verification
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java (1)
Test Coverage Gap

Test doesn't verify behavior when keepPreparedTxn=true but enable2PC=false, which is a potentially problematic configuration. Missing this test case could allow reliability issues where transaction state becomes inconsistent when these incompatible settings are used together.

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Test-Coverage
  • DbC-Contract-Verification

Comment on lines +682 to +686
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Transaction Validation

The new initTransactions(boolean) method doesn't validate that keepPreparedTxn=true is only used with enable2PC=true. Using keepPreparedTxn without 2PC enabled could cause transaction state inconsistencies since the broker won't be configured to handle prepared transactions properly.

Suggested change
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
// Validate that keepPreparedTxn is only used with enable2PC
if (keepPreparedTxn && !transactionManager.isEnable2PC()) {
throw new IllegalStateException("Cannot use keepPreparedTxn=true without enabling two-phase commit. " +
"Set " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG + "=true in the producer config.");
}
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition-Validation
  • SRE-Error-Prevention

Comment on lines +628 to +630
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
if (enable2PC && userConfiguredTransactionTimeout) {
throw new ConfigException(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Configuration Validation Gap

Configuration validation only checks if transaction timeout is explicitly set, but doesn't validate when enable2PC=true with default transaction timeout. This creates inconsistent validation as the default timeout would still apply but isn't being rejected, potentially causing reliability issues in 2PC transactions.

Suggested change
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
if (enable2PC && userConfiguredTransactionTimeout) {
throw new ConfigException(
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
if (enable2PC) {
// When 2PC is enabled, transaction timeout should not be used at all
// as transactions are managed by an external coordinator
if (originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG)) {
throw new ConfigException(
"Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
" when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
" is set to true. Transactions will not expire with two-phase commit enabled."
);
}
// Force transaction timeout to a very large value when 2PC is enabled
// to effectively disable timeout-based transaction expiration
this.values().put(TRANSACTION_TIMEOUT_CONFIG, Integer.MAX_VALUE);
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition-Validation
  • SRE-Configuration-Consistency

Comment on lines +653 to +660
/**
* Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
* with additional capabilities to keep a previously prepared transaction.
* Must be called before any send operations that require a {@code transactionalId}.
* <p>
* Unlike the standard {@link #initTransactions()}, when {@code keepPreparedTxn} is set to
* {@code true}, the producer does <em>not</em> automatically abort existing transactions.
* Instead, it enters a recovery mode allowing only finalization of those previously prepared transactions.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incomplete Documentation

Documentation doesn't clearly state that keepPreparedTxn=true requires enable2PC=true configuration. Missing this critical dependency information could lead to misuse of the API and transaction reliability issues when developers use keepPreparedTxn without enabling 2PC.

Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Maturity
  • DbC-Interface-Documentation
  • SRE-Error-Prevention

@visz11
Copy link
Collaborator

visz11 commented Sep 16, 2025

/refacto-test-arvi

Copy link

refacto-visz bot commented Sep 16, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

Copy link

refacto-visz bot commented Sep 16, 2025

Code Review: Transaction Management Implementation

👍 Well Done
Transaction Recovery Support

Added 2PC transaction support improves reliability for distributed transaction recovery.

Comprehensive Parameter Validation

Configuration validation prevents incompatible transaction settings that could cause reliability issues.

📌 Files Processed
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  • clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  • clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
📝 Additional Comments
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (1)
Consistent Parameter Naming

Inconsistent naming convention between enable2PC (camelCase) and TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG (uppercase with underscores). This inconsistency could lead to confusion and potential errors in future code modifications.

Standards:

  • ISO-IEC-25010-Maintainability
  • DbC-Naming-Consistency
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (1)
Parameter Documentation Improvement

The enable2PC parameter lacks JavaDoc documentation explaining its purpose and impact on transaction behavior. This could lead to incorrect usage by developers maintaining this code in the future.

Standards:

  • ISO-IEC-25010-Maintainability
  • DbC-Interface-Documentation

Comment on lines +260 to +261
final ApiVersions apiVersions,
final boolean enable2PC) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transaction Timeout Validation

The validation prevents setting transaction.timeout.ms when 2PC is enabled, but doesn't handle the case where the default transaction.timeout.ms is still used internally. The TransactionManager constructor still receives the transactionTimeoutMs parameter without checking if 2PC is enabled.

Suggested change
final ApiVersions apiVersions,
final boolean enable2PC) {
final ApiVersions apiVersions,
final boolean enable2PC) {
if (enable2PC && transactionTimeoutMs != Integer.MAX_VALUE) {
log.info("Using MAX_VALUE for transaction timeout with 2PC enabled instead of {}", transactionTimeoutMs);
}
Standards
  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition

Comment on lines +1412 to +1418
producer.initTransactions(keepPreparedTxn);

// Verify request flags match expected values
assertEquals(keepPreparedTxn, requestFlags[0],
"keepPreparedTxn flag should match input parameter");
assertEquals(enable2PC, requestFlags[1],
"enable2Pc flag should match producer configuration");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Null Safety Check

The test accesses requestFlags array without checking if the InitProducerIdRequest was actually processed. If the request handler wasn't called, requestFlags would contain default values (false), potentially causing false test failures.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Test-Reliability

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants