-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-15767: Refactor TransactionManager to avoid use of ThreadLocal #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
474df2e
3ede47b
05c2e59
c8a135f
6604b68
72fd9c7
aa4817f
d1558b4
a635772
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -120,58 +120,6 @@ public class TransactionManager { | |
private final Set<TopicPartition> newPartitionsInTransaction; | ||
private final Set<TopicPartition> pendingPartitionsInTransaction; | ||
private final Set<TopicPartition> partitionsInTransaction; | ||
|
||
/** | ||
* During its normal course of operations, the transaction manager transitions through different internal | ||
* states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions | ||
* result from actions on one of the following classes of threads: | ||
* | ||
* <ul> | ||
* <li><em>Application</em> threads that invokes {@link Producer} API calls</li> | ||
* <li><em>{@link Sender}</em> thread operations</li> | ||
* </ul> | ||
* | ||
* When an invalid state transition is detected during execution on an <em>application</em> thread, the | ||
* {@link #currentState} is <em>not updated</em> and an {@link IllegalStateException} is thrown. This gives the | ||
* application the opportunity to fix the issue without permanently poisoning the state of the | ||
* transaction manager. The {@link Producer} API calls that perform a state transition include: | ||
* | ||
* <ul> | ||
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li> | ||
* <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li> | ||
* <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li> | ||
* <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()} | ||
* </li> | ||
* <li>{@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls | ||
* {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} | ||
* </li> | ||
* <li>{@link Producer#send(ProducerRecord)} (and its variants) calls | ||
* {@link #maybeAddPartition(TopicPartition)} and | ||
* {@link #maybeTransitionToErrorState(RuntimeException)} | ||
* </li> | ||
* </ul> | ||
* | ||
* <p/> | ||
* | ||
* The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the | ||
* <em>{@link Sender}</em> thread. This includes record batching, network I/O, broker response handlers, etc. If an | ||
* invalid state transition is detected in the <em>{@link Sender}</em> thread, in addition to throwing an | ||
* {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its | ||
* {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover. | ||
* | ||
* <p/> | ||
* | ||
* It's important to prevent possible corruption when the transaction manager has determined that it is in a | ||
* fatal state. Subsequent transaction operations attempted via either the <em>application</em> or the | ||
* <em>{@link Sender}</em> thread should fail. This is achieved when these operations invoke the | ||
* {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated | ||
* transactional guarantees are not violated. | ||
* | ||
* <p/> | ||
* | ||
* See KAFKA-14831 for more detail. | ||
*/ | ||
private final ThreadLocal<Boolean> shouldPoisonStateOnInvalidTransition; | ||
private PendingStateTransition pendingTransition; | ||
|
||
// This is used by the TxnRequestHandlers to control how long to back off before a given request is retried. | ||
|
@@ -265,7 +213,6 @@ public TransactionManager(final LogContext logContext, | |
this.newPartitionsInTransaction = new HashSet<>(); | ||
this.pendingPartitionsInTransaction = new HashSet<>(); | ||
this.partitionsInTransaction = new HashSet<>(); | ||
this.shouldPoisonStateOnInvalidTransition = ThreadLocal.withInitial(() -> false); | ||
this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority)); | ||
this.pendingTxnOffsetCommits = new HashMap<>(); | ||
this.partitionsWithUnresolvedSequences = new HashMap<>(); | ||
|
@@ -275,8 +222,61 @@ public TransactionManager(final LogContext logContext, | |
this.apiVersions = apiVersions; | ||
} | ||
|
||
void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) { | ||
shouldPoisonStateOnInvalidTransition.set(shouldPoisonState); | ||
/** | ||
* During its normal course of operations, the transaction manager transitions through different internal | ||
* states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions | ||
* result from actions on one of the following classes of threads: | ||
* | ||
* <ul> | ||
* <li><em>Application</em> threads that invokes {@link Producer} API calls</li> | ||
* <li><em>{@link Sender}</em> thread operations</li> | ||
* </ul> | ||
* | ||
* When an invalid state transition is detected during execution on an <em>application</em> thread, the | ||
* {@link #currentState} is <em>not updated</em> and an {@link IllegalStateException} is thrown. This gives the | ||
* application the opportunity to fix the issue without permanently poisoning the state of the | ||
* transaction manager. The {@link Producer} API calls that perform a state transition include: | ||
* | ||
* <ul> | ||
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li> | ||
* <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li> | ||
* <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li> | ||
* <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()} | ||
* </li> | ||
* <li>{@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls | ||
* {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} | ||
* </li> | ||
* <li>{@link Producer#send(ProducerRecord)} (and its variants) calls | ||
* {@link #maybeAddPartition(TopicPartition)} and | ||
* {@link #maybeTransitionToErrorState(RuntimeException)} | ||
* </li> | ||
* </ul> | ||
* | ||
* <p/> | ||
* | ||
* The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the | ||
* <em>{@link Sender}</em> thread. This includes record batching, network I/O, broker response handlers, etc. If an | ||
* invalid state transition is detected in the <em>{@link Sender}</em> thread, in addition to throwing an | ||
* {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its | ||
* {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover. | ||
* | ||
* <p/> | ||
* | ||
* It's important to prevent possible corruption when the transaction manager has determined that it is in a | ||
* fatal state. Subsequent transaction operations attempted via either the <em>application</em> or the | ||
* <em>{@link Sender}</em> thread should fail. This is achieved when these operations invoke the | ||
* {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated | ||
* transactional guarantees are not violated. | ||
* | ||
* <p/> | ||
* | ||
* See KAFKA-14831 for more detail. | ||
* | ||
* @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception, | ||
* {@code false} to throw an exception without first changing the state | ||
*/ | ||
protected boolean shouldPoisonStateOnInvalidTransition() { | ||
return Thread.currentThread() instanceof Sender.SenderThread; | ||
} | ||
Comment on lines
+279
to
280
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thread Type CastThe implementation depends on thread instance type checking rather than explicit flags. If custom thread implementations are used that don't extend Sender.SenderThread, poisoning behavior will be incorrect, potentially allowing invalid state transitions without proper error handling. Standards
Comment on lines
+278
to
280
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thread Type CheckThe method relies on thread instance check without fallback logic. If thread implementation changes, the poison state logic could fail silently. This creates a potential reliability issue where invalid transitions might not be properly handled. Standards
|
||
|
||
public synchronized TransactionalRequestResult initializeTransactions() { | ||
|
@@ -1063,7 +1063,7 @@ private void transitionTo(State target, RuntimeException error) { | |
String message = idString + "Invalid transition attempted from state " | ||
+ currentState.name() + " to state " + target.name(); | ||
|
||
if (shouldPoisonStateOnInvalidTransition.get()) { | ||
if (shouldPoisonStateOnInvalidTransition()) { | ||
currentState = State.FATAL_ERROR; | ||
lastError = new IllegalStateException(message); | ||
throw lastError; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -149,7 +149,7 @@ public class TransactionManagerTest { | |
|
||
private RecordAccumulator accumulator = null; | ||
private Sender sender = null; | ||
private TransactionManager transactionManager = null; | ||
private TestableTransactionManager transactionManager = null; | ||
private Node brokerNode = null; | ||
private long finalizedFeaturesEpoch = 0; | ||
|
||
|
@@ -188,7 +188,7 @@ private void initializeTransactionManager(Optional<String> transactionalId, bool | |
.setMinVersionLevel(transactionV2Enabled ? (short) 2 : (short) 1)), | ||
finalizedFeaturesEpoch)); | ||
finalizedFeaturesEpoch += 1; | ||
this.transactionManager = new TransactionManager(logContext, transactionalId.orElse(null), | ||
this.transactionManager = new TestableTransactionManager(logContext, transactionalId.orElse(null), | ||
transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions); | ||
|
||
int batchSize = 16 * 1024; | ||
|
@@ -1038,7 +1038,7 @@ public void testTransactionManagerDisablesV2() { | |
.setMaxVersionLevel((short) 1) | ||
.setMinVersionLevel((short) 1)), | ||
0)); | ||
this.transactionManager = new TransactionManager(logContext, transactionalId, | ||
this.transactionManager = new TestableTransactionManager(logContext, transactionalId, | ||
transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions); | ||
|
||
int batchSize = 16 * 1024; | ||
|
@@ -3799,10 +3799,11 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t | |
|
||
@Test | ||
public void testBackgroundInvalidStateTransitionIsFatal() { | ||
initializeTransactionManager(Optional.of(transactionalId), true); | ||
doInitTransactions(); | ||
assertTrue(transactionManager.isTransactional()); | ||
|
||
transactionManager.setPoisonStateOnInvalidTransition(true); | ||
transactionManager.setShouldPoisonStateOnInvalidTransitionOverride(true); | ||
Comment on lines
+3802
to
+3806
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing Thread CheckThe test sets an override without verifying the thread type first. This bypasses the thread-based detection pattern implemented in the main code, potentially causing false test results. If the test runs on a thread that's already a SenderThread, the override would be redundant and mask potential issues. Standards
|
||
|
||
// Intentionally perform an operation that will cause an invalid state transition. The detection of this | ||
// will result in a poisoning of the transaction manager for all subsequent transactional operations since | ||
|
@@ -4373,4 +4374,31 @@ private void runUntil(Supplier<Boolean> condition) { | |
ProducerTestUtils.runUntil(sender, condition); | ||
} | ||
|
||
/** | ||
* This subclass exists only to optionally change the default behavior related to poisoning the state | ||
* on invalid state transition attempts. | ||
*/ | ||
private static class TestableTransactionManager extends TransactionManager { | ||
|
||
private Optional<Boolean> shouldPoisonStateOnInvalidTransitionOverride; | ||
|
||
public TestableTransactionManager(LogContext logContext, | ||
String transactionalId, | ||
int transactionTimeoutMs, | ||
long retryBackoffMs, | ||
ApiVersions apiVersions) { | ||
super(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions); | ||
this.shouldPoisonStateOnInvalidTransitionOverride = Optional.empty(); | ||
} | ||
|
||
private void setShouldPoisonStateOnInvalidTransitionOverride(boolean override) { | ||
shouldPoisonStateOnInvalidTransitionOverride = Optional.of(override); | ||
} | ||
|
||
@Override | ||
protected boolean shouldPoisonStateOnInvalidTransition() { | ||
// If there's an override, use it, otherwise invoke the default (i.e. super class) logic. | ||
return shouldPoisonStateOnInvalidTransitionOverride.orElseGet(super::shouldPoisonStateOnInvalidTransition); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing Override Annotation
The SenderThread constructor doesn't override any parent method but fails to document this is a new constructor. Missing @OverRide annotation could lead to silent failures if parent class constructor signature changes. This reduces reliability during future maintenance.
Standards