From da0c8779a566437a40b3ef4e4c66937718fe51e0 Mon Sep 17 00:00:00 2001 From: Francis Godinho Date: Mon, 15 Dec 2025 19:05:36 -0800 Subject: [PATCH 1/2] KAFKA-19925: Fix transaction timeout handling during broker upgrades --- .../tests/core/transactions_upgrade_test.py | 2 +- .../tools/TransactionalMessageCopier.java | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/tests/kafkatest/tests/core/transactions_upgrade_test.py b/tests/kafkatest/tests/core/transactions_upgrade_test.py index 724605c6b24e6..6c952f0a626aa 100644 --- a/tests/kafkatest/tests/core/transactions_upgrade_test.py +++ b/tests/kafkatest/tests/core/transactions_upgrade_test.py @@ -179,7 +179,7 @@ def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic self.perform_upgrade(from_kafka_version) - copier_timeout_sec = 180 + copier_timeout_sec = 360 for copier in copiers: wait_until(lambda: copier.is_done, timeout_sec=copier_timeout_sec, diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index f99252d37add8..17994b71e962e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -308,9 +308,11 @@ public static void runEventLoop(Namespace parsedArgs) { String consumerGroup = parsedArgs.getString("consumerGroup"); - final KafkaProducer producer = createProducer(parsedArgs); + KafkaProducer producer = createProducer(parsedArgs); final KafkaConsumer consumer = createConsumer(parsedArgs); + int producerNumber = 0; + final AtomicLong remainingMessages = new AtomicLong( parsedArgs.getInt("maxMessages") == -1 ? Long.MAX_VALUE : parsedArgs.getInt("maxMessages")); @@ -387,7 +389,17 @@ public void onPartitionsAssigned(Collection partitions) { long messagesSentWithinCurrentTxn = records.count(); ConsumerGroupMetadata groupMetadata = useGroupMetadata ? consumer.groupMetadata() : new ConsumerGroupMetadata(consumerGroup); - producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata); + try { + producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata); + } catch (KafkaException e) { + // in case the producer gets stuck here, create a new one and continue the loop + try { producer.close(Duration.ofSeconds(0)); } catch (Exception ignore) {} + parsedArgs.getAttrs().put("transactionalId", parsedArgs.getString("transactionalId") + producerNumber++); + producer = createProducer(parsedArgs); + producer.initTransactions(); + resetToLastCommittedPositions(consumer); + continue; + } if (enableRandomAborts && random.nextInt() % 3 == 0) { abortTransactionAndResetPosition(producer, consumer); @@ -402,7 +414,7 @@ public void onPartitionsAssigned(Collection partitions) { } catch (KafkaException e) { log.debug("Aborting transaction after catching exception", e); abortTransactionAndResetPosition(producer, consumer); - } + } } } } catch (WakeupException e) { From d4330056e5e4a2956ac146c24dfd3427ca0fd327 Mon Sep 17 00:00:00 2001 From: Francis Godinho <55031793+FrancisGodinho@users.noreply.github.com> Date: Wed, 17 Dec 2025 23:13:20 -0800 Subject: [PATCH 2/2] Update tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Vincent Potuček <8830888+Pankraz76@users.noreply.github.com> --- .../java/org/apache/kafka/tools/TransactionalMessageCopier.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index 17994b71e962e..b9345d76cb42d 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -414,7 +414,7 @@ public void onPartitionsAssigned(Collection partitions) { } catch (KafkaException e) { log.debug("Aborting transaction after catching exception", e); abortTransactionAndResetPosition(producer, consumer); - } + } } } } catch (WakeupException e) {