diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 03423e3e88c95..492422cd5640f 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -43,6 +43,7 @@
+
diff --git a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 783dcb1564adf..099ec05c14b93 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -27,6 +27,7 @@
import org.apache.kafka.server.common.MetadataVersion;
import java.io.File;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
@@ -51,6 +52,7 @@ public class TestKitNodes {
public static class Builder {
private boolean combined;
private String clusterId;
+ private Path baseDirectory;
private int numControllerNodes;
private int numBrokerNodes;
private int numDisksPerBroker = 1;
@@ -104,6 +106,11 @@ public Builder setPerServerProperties(Map> perServe
return this;
}
+ public Builder setBaseDirectory(Path baseDirectory) {
+ this.baseDirectory = baseDirectory;
+ return this;
+ }
+
public Builder setBrokerListenerName(ListenerName listenerName) {
this.brokerListenerName = listenerName;
return this;
@@ -128,8 +135,9 @@ public TestKitNodes build() {
if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol");
}
-
- String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
+ if (baseDirectory == null) {
+ this.baseDirectory = TestUtils.tempDirectory().toPath();
+ }
if (clusterId == null) {
clusterId = Uuid.randomUuid().toString();
}
@@ -160,7 +168,7 @@ public TestKitNodes build() {
for (int id : controllerNodeIds) {
TestKitNode controllerNode = TestKitNodes.buildControllerNode(
id,
- baseDirectory,
+ baseDirectory.toFile().getAbsolutePath(),
clusterId,
brokerNodeIds.contains(id),
perServerProperties.getOrDefault(id, Collections.emptyMap())
@@ -172,7 +180,7 @@ public TestKitNodes build() {
for (int id : brokerNodeIds) {
TestKitNode brokerNode = TestKitNodes.buildBrokerNode(
id,
- baseDirectory,
+ baseDirectory.toFile().getAbsolutePath(),
clusterId,
controllerNodeIds.contains(id),
perServerProperties.getOrDefault(id, Collections.emptyMap()),
@@ -181,7 +189,7 @@ public TestKitNodes build() {
brokerNodes.put(id, brokerNode);
}
- return new TestKitNodes(baseDirectory, clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
+ return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(), clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
brokerListenerName, brokerSecurityProtocol, new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT);
}
}
diff --git a/test-common/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java b/test-common/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
index 97798e68fe24f..1ee8bc2628f3b 100644
--- a/test-common/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
+++ b/test-common/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
@@ -21,6 +21,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
@@ -32,7 +33,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class KafkaClusterTestKitTest {
@ParameterizedTest
@@ -88,7 +89,7 @@ public void testCreateClusterWithBadPerServerProperties() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) {
+ public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) throws Exception {
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(5).
@@ -118,8 +119,23 @@ public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) {
String expected = combined ? String.format("combined_%d_0", controllerId) : String.format("controller_%d", controllerId);
assertEquals(expected, Paths.get(node.metadataDirectory()).getFileName().toString());
});
- } catch (Exception e) {
- fail("failed to init cluster", e);
+ }
+ }
+
+ @Test
+ public void testCreateClusterWithSpecificBaseDir() throws Exception {
+ Path baseDirectory = TestUtils.tempDirectory().toPath();
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setBaseDirectory(baseDirectory).
+ setNumBrokerNodes(1).
+ setCombined(true).
+ setNumControllerNodes(1).build()).build()) {
+ assertEquals(cluster.nodes().baseDirectory(), baseDirectory.toFile().getAbsolutePath());
+ cluster.nodes().controllerNodes().values().forEach(controller ->
+ assertTrue(Paths.get(controller.metadataDirectory()).startsWith(baseDirectory)));
+ cluster.nodes().brokerNodes().values().forEach(broker ->
+ assertTrue(Paths.get(broker.metadataDirectory()).startsWith(baseDirectory)));
}
}
}