Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, PartitionInfo}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource

Expand All @@ -36,6 +38,25 @@ import scala.collection.Seq
*/
abstract class BaseConsumerTest extends AbstractConsumerTest {

private var currentTestName: String = _

@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
currentTestName = testInfo.getTestMethod.get().getName
super.setUp(testInfo)
}

override protected def brokerPropertyOverrides(properties: Properties): Unit = {
super.brokerPropertyOverrides(properties)

if (currentTestName != null && currentTestName.equals("testCoordinatorFailover")) {
// Enable controlled shutdown to allow the broker to notify the controller before shutting down.
// This speeds up the test by triggering an immediate failover instead of waiting for the
// broker session timeout (default: 9s) to expire.
properties.setProperty(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, "true")
}
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testSimpleConsumption(groupProtocol: String): Unit = {
Expand Down
Loading