Skip to content

Commit

Permalink
KAFKA-17574 Allow overriding TestKitNodes baseDirectory (#17225)
Browse files Browse the repository at this point in the history
This allows shutting down a KafkaClusterTestKit from a JVM shutdown hook without risking error logs because the base directory has already been deleted by the shutdown hook TestUtils.tempDirectory sets up.

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
srdo authored Oct 24, 2024
1 parent 24c6e8d commit 98b7e4d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 9 deletions.
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
<suppress checks="NPathComplexity" files="TestKitNodes.java"/>
<suppress checks="JavaNCSS"
files="(RemoteLogManagerTest|SharePartitionTest).java"/>
<suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.server.common.MetadataVersion;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -51,6 +52,7 @@ public class TestKitNodes {
public static class Builder {
private boolean combined;
private String clusterId;
private Path baseDirectory;
private int numControllerNodes;
private int numBrokerNodes;
private int numDisksPerBroker = 1;
Expand Down Expand Up @@ -104,6 +106,11 @@ public Builder setPerServerProperties(Map<Integer, Map<String, String>> perServe
return this;
}

public Builder setBaseDirectory(Path baseDirectory) {
this.baseDirectory = baseDirectory;
return this;
}

public Builder setBrokerListenerName(ListenerName listenerName) {
this.brokerListenerName = listenerName;
return this;
Expand All @@ -128,8 +135,9 @@ public TestKitNodes build() {
if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol");
}

String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
if (baseDirectory == null) {
this.baseDirectory = TestUtils.tempDirectory().toPath();
}
if (clusterId == null) {
clusterId = Uuid.randomUuid().toString();
}
Expand Down Expand Up @@ -160,7 +168,7 @@ public TestKitNodes build() {
for (int id : controllerNodeIds) {
TestKitNode controllerNode = TestKitNodes.buildControllerNode(
id,
baseDirectory,
baseDirectory.toFile().getAbsolutePath(),
clusterId,
brokerNodeIds.contains(id),
perServerProperties.getOrDefault(id, Collections.emptyMap())
Expand All @@ -172,7 +180,7 @@ public TestKitNodes build() {
for (int id : brokerNodeIds) {
TestKitNode brokerNode = TestKitNodes.buildBrokerNode(
id,
baseDirectory,
baseDirectory.toFile().getAbsolutePath(),
clusterId,
controllerNodeIds.contains(id),
perServerProperties.getOrDefault(id, Collections.emptyMap()),
Expand All @@ -181,7 +189,7 @@ public TestKitNodes build() {
brokerNodes.put(id, brokerNode);
}

return new TestKitNodes(baseDirectory, clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(), clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
brokerListenerName, brokerSecurityProtocol, new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -32,7 +33,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class KafkaClusterTestKitTest {
@ParameterizedTest
Expand Down Expand Up @@ -88,7 +89,7 @@ public void testCreateClusterWithBadPerServerProperties() {

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) {
public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) throws Exception {
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(5).
Expand Down Expand Up @@ -118,8 +119,23 @@ public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) {
String expected = combined ? String.format("combined_%d_0", controllerId) : String.format("controller_%d", controllerId);
assertEquals(expected, Paths.get(node.metadataDirectory()).getFileName().toString());
});
} catch (Exception e) {
fail("failed to init cluster", e);
}
}

@Test
public void testCreateClusterWithSpecificBaseDir() throws Exception {
Path baseDirectory = TestUtils.tempDirectory().toPath();
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBaseDirectory(baseDirectory).
setNumBrokerNodes(1).
setCombined(true).
setNumControllerNodes(1).build()).build()) {
assertEquals(cluster.nodes().baseDirectory(), baseDirectory.toFile().getAbsolutePath());
cluster.nodes().controllerNodes().values().forEach(controller ->
assertTrue(Paths.get(controller.metadataDirectory()).startsWith(baseDirectory)));
cluster.nodes().brokerNodes().values().forEach(broker ->
assertTrue(Paths.get(broker.metadataDirectory()).startsWith(baseDirectory)));
}
}
}

0 comments on commit 98b7e4d

Please sign in to comment.