Skip to content

Commit 2daf5be

Browse files
authored
KAFKA-16024: SaslPlaintextConsumerTest#testCoordinatorFailover is flaky (#20774)
- `broker.session.timeout.ms` defaults to 9s. When a broker goes offline, the group coordinator may take up to this long to be re-elected. - The commit callback retry timeout is currently 10 seconds, which leaves very little buffer. If the metadata hasn’t refreshed yet, the consumer may still send an OFFSET_COMMIT request to the offline coordinator, leading to transient failures. This patch enable `controlled.shutdown.enable` to allow the broker to notify the controller before shutting down. This speeds up the test by triggering an immediate failover instead of waiting for the broker session timeout (default: 9s) to expire. Reviewers: TaiJuWu <[email protected]>, PoAn Yang <[email protected]>
1 parent 0072bd9 commit 2daf5be

File tree

1 file changed

+21
-0
lines changed

1 file changed

+21
-0
lines changed

core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
2222
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, PartitionInfo}
2323
import org.apache.kafka.common.internals.Topic
2424
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
25+
import org.apache.kafka.server.config.ServerConfigs
2526
import org.junit.jupiter.api.Assertions._
27+
import org.junit.jupiter.api.{BeforeEach, TestInfo}
2628
import org.junit.jupiter.params.ParameterizedTest
2729
import org.junit.jupiter.params.provider.MethodSource
2830

@@ -36,6 +38,25 @@ import scala.collection.Seq
3638
*/
3739
abstract class BaseConsumerTest extends AbstractConsumerTest {
3840

41+
private var currentTestName: String = _
42+
43+
@BeforeEach
44+
override def setUp(testInfo: TestInfo): Unit = {
45+
currentTestName = testInfo.getTestMethod.get().getName
46+
super.setUp(testInfo)
47+
}
48+
49+
override protected def brokerPropertyOverrides(properties: Properties): Unit = {
50+
super.brokerPropertyOverrides(properties)
51+
52+
if (currentTestName != null && currentTestName.equals("testCoordinatorFailover")) {
53+
// Enable controlled shutdown to allow the broker to notify the controller before shutting down.
54+
// This speeds up the test by triggering an immediate failover instead of waiting for the
55+
// broker session timeout (default: 9s) to expire.
56+
properties.setProperty(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, "true")
57+
}
58+
}
59+
3960
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
4061
@MethodSource(Array("getTestGroupProtocolParametersAll"))
4162
def testSimpleConsumption(groupProtocol: String): Unit = {

0 commit comments

Comments
 (0)