Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17574: Allow overriding TestKitNodes baseDirectory #17225

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
<suppress checks="NPathComplexity" files="TestKitNodes.java"/>
<suppress checks="JavaNCSS"
files="(RemoteLogManagerTest|SharePartitionTest).java"/>
<suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.server.common.MetadataVersion;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -51,6 +52,7 @@ public class TestKitNodes {
public static class Builder {
private boolean combined;
private String clusterId;
private Path baseDirectory;
private int numControllerNodes;
private int numBrokerNodes;
private int numDisksPerBroker = 1;
Expand Down Expand Up @@ -104,6 +106,11 @@ public Builder setPerServerProperties(Map<Integer, Map<String, String>> perServe
return this;
}

public Builder setBaseDirectory(Path baseDirectory) {
this.baseDirectory = baseDirectory;
return this;
}

public Builder setBrokerListenerName(ListenerName listenerName) {
this.brokerListenerName = listenerName;
return this;
Expand All @@ -128,8 +135,9 @@ public TestKitNodes build() {
if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol");
}

String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
if (baseDirectory == null) {
this.baseDirectory = TestUtils.tempDirectory().toPath();
}
if (clusterId == null) {
clusterId = Uuid.randomUuid().toString();
}
Expand Down Expand Up @@ -160,7 +168,7 @@ public TestKitNodes build() {
for (int id : controllerNodeIds) {
TestKitNode controllerNode = TestKitNodes.buildControllerNode(
id,
baseDirectory,
baseDirectory.toFile().getAbsolutePath(),
clusterId,
brokerNodeIds.contains(id),
perServerProperties.getOrDefault(id, Collections.emptyMap())
Expand All @@ -172,7 +180,7 @@ public TestKitNodes build() {
for (int id : brokerNodeIds) {
TestKitNode brokerNode = TestKitNodes.buildBrokerNode(
id,
baseDirectory,
baseDirectory.toFile().getAbsolutePath(),
clusterId,
controllerNodeIds.contains(id),
perServerProperties.getOrDefault(id, Collections.emptyMap()),
Expand All @@ -181,7 +189,7 @@ public TestKitNodes build() {
brokerNodes.put(id, brokerNode);
}

return new TestKitNodes(baseDirectory, clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(), clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
brokerListenerName, brokerSecurityProtocol, new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -32,7 +33,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class KafkaClusterTestKitTest {
@ParameterizedTest
Expand Down Expand Up @@ -88,7 +89,7 @@ public void testCreateClusterWithBadPerServerProperties() {

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) {
public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) throws Exception {
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(5).
Expand Down Expand Up @@ -118,8 +119,23 @@ public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) {
String expected = combined ? String.format("combined_%d_0", controllerId) : String.format("controller_%d", controllerId);
assertEquals(expected, Paths.get(node.metadataDirectory()).getFileName().toString());
});
} catch (Exception e) {
fail("failed to init cluster", e);
}
}

@Test
public void testCreateClusterWithSpecificBaseDir() throws Exception {
Path baseDirectory = TestUtils.tempDirectory().toPath();
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBaseDirectory(baseDirectory).
setNumBrokerNodes(1).
setCombined(true).
setNumControllerNodes(1).build()).build()) {
assertEquals(cluster.nodes().baseDirectory(), baseDirectory.toFile().getAbsolutePath());
cluster.nodes().controllerNodes().values().forEach(controller ->
assertTrue(Paths.get(controller.metadataDirectory()).startsWith(baseDirectory)));
cluster.nodes().brokerNodes().values().forEach(broker ->
assertTrue(Paths.get(broker.metadataDirectory()).startsWith(baseDirectory)));
}
}
}