
Conversation

@arvi18 commented Apr 26, 2025

In some test cases, like ListOffsetsIntegrationTest#testListVersion, we need to update the broker config and restart the cluster. We can add a helper function for this, so we don't need to reimplement it in each test case.
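For illustration, a test using such a helper might look like the minimal sketch below. The test name and the chosen config key are assumptions for illustration; the -1 "all servers" override key comes from this PR.

    @ClusterTest(brokers = 1)
    void testListVersionAfterConfigChange(ClusterInstance clusterInstance) throws Exception {
        // Restart every node (-1 = all servers) with an overridden broker config,
        // instead of hand-rolling shutdown/reconfigure/startup in each test.
        clusterInstance.restart(Map.of(-1, Map.of("unclean.leader.election.enable", "true")));
        // ... continue the test against the restarted cluster ...
    }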

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Summary by CodeRabbit

  • New Features
    • Added the ability to restart clusters with or without per-server configuration overrides.
  • Tests
    • Introduced new tests to verify cluster restart functionality and configuration override handling.

@arvi18 (Author) commented Apr 26, 2025

Hi @FrankYang0529,

Please consider the case below:

  1. Build a cluster.
  2. Shut down a broker.
  3. Restart the cluster.

The cluster may hang in this case, thanks.
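A minimal sketch of that sequence against the API in this PR (the per-broker accessor shown here is an assumption for illustration):

    // 1. Build and start a cluster (e.g. injected via @ClusterTest).
    // 2. Shut down a single broker out-of-band.
    clusterInstance.brokers().get(0).shutdown();
    // 3. Restart the whole cluster; restart() first calls shutdown() on every
    //    server, which may block waiting on the already-stopped broker.
    clusterInstance.restart();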

@arvi18 (Author) commented Apr 26, 2025

This PR is being marked as stale since it has not had any activity in 90 days. If you
would like to keep this PR alive, please leave a comment asking for a review. If the PR has
merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

1 similar comment

@arvi18 (Author) commented Apr 26, 2025

@FrankYang0529, could you please fix the conflicts?

coderabbitai bot commented Apr 26, 2025

Walkthrough

The changes introduce new restart and shutdown capabilities to the Kafka cluster test infrastructure. The KafkaClusterTestKit class gains public shutdown() and restart() methods, enabling asynchronous shutdown and configurable restart of clusters. The ClusterInstance interface and its implementation in RaftClusterInstance are extended to support cluster restarts with optional per-server configuration overrides. Two new tests are added to verify that cluster restarts correctly apply or preserve configuration overrides. Supporting fields and logic are introduced to cache listener addresses and manage socket factories during restarts.

Changes

| File(s) | Change Summary |
| --- | --- |
| test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java | Added public shutdown() and restart(Map) methods, a new field for listener caching, and made socketFactoryManager mutable. |
| test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java | Added default and abstract restart() methods to the interface for standardized cluster restart capability. |
| test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java | Implemented the restart(Map) method in RaftClusterInstance, delegating to KafkaClusterTestKit. |
| test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java | Added two tests to verify cluster restart behavior with and without configuration overrides. |
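Based on the summaries above (and the diffs quoted later in this review), the ClusterInstance change amounts to roughly the following sketch; surrounding interface members are omitted:

    public interface ClusterInstance {
        // Convenience overload: restart with no overrides.
        default void restart() throws Exception {
            restart(Map.of());
        }

        // Restart, applying per-server config overrides keyed by node id
        // (-1 applies to all servers).
        void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception;
    }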

Sequence Diagram(s)

sequenceDiagram
    participant Test as Test Method
    participant ClusterInstance
    participant RaftClusterInstance
    participant KafkaClusterTestKit

    Test->>ClusterInstance: restart(Map overrides)
    ClusterInstance->>RaftClusterInstance: restart(Map overrides)
    RaftClusterInstance->>KafkaClusterTestKit: restart(Map overrides)
    KafkaClusterTestKit->>KafkaClusterTestKit: shutdown()
    KafkaClusterTestKit->>KafkaClusterTestKit: Rebuild servers with overrides
    KafkaClusterTestKit->>KafkaClusterTestKit: startup()

Poem

Hopping through clusters, a rabbit at play,
Restart and shutdown, now part of the day!
With configs that change or quietly stay,
Tests keep us nimble, bugs kept at bay.
🐇✨ Cluster reborn, in a magical way!


@gemini-code-assist bot left a comment

Hello @arvi18, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

Summary of Changes

This pull request introduces a new restart method to the KafkaClusterTestKit class, enabling the overriding of server properties and restarting of the cluster within test extensions. This enhancement aims to simplify test cases that require updating broker configurations and restarting the cluster, avoiding redundant reimplementation of the same logic across multiple tests. The changes involve adding a restart method to the ClusterInstance interface and implementing it in RaftClusterInvocationContext and KafkaClusterTestKit. Additionally, the pull request includes new test cases to verify the functionality of the restart method with and without overridden configurations.

Highlights

  • New restart method: A restart method is added to the ClusterInstance interface and implemented in RaftClusterInvocationContext and KafkaClusterTestKit to allow restarting the cluster with overridden configurations.
  • Configuration Overrides: The restart method in KafkaClusterTestKit accepts a map of per-server configuration overrides, allowing specific configurations to be applied to individual brokers and controllers during restart (see the sketch after this list).
  • Listener Management: The restart method captures the listeners before shutdown and reassigns them after restart to ensure proper reconnection.
  • Test Cases: New test cases are added to ClusterTestExtensionsTest to verify the restart method's behavior with and without configuration overrides.
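The override map described above is keyed by node id. A minimal sketch of its shape (the -1 "all servers" key comes from this PR; the config values are illustrative assumptions):

    Map<Integer, Map<String, Object>> overrides = Map.of(
        -1, Map.of("default.replication.factor", 2),  // applied to every broker and controller
        0,  Map.of("num.network.threads", 4)          // applied only to node 0; per-node entries are
    );                                                // applied after the -1 entry, so they win on conflict
    clusterInstance.restart(overrides);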

Changelog

  • test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
    • Added shutdown method to properly shutdown the cluster and capture listeners.
    • Added restart method to allow restarting the cluster with overridden configurations.
    • Modified the class to store listeners per node id.
    • Added logic to apply configuration overrides to brokers and controllers during restart.
    • Added logic to reassign listeners to brokers and controllers during restart.
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
    • Added a restart method with no arguments that calls the restart method with an empty map.
    • Added a restart method that accepts a map of per-server configuration overrides.
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
    • Implemented the restart method to delegate to the clusterTestKit.
  • test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
    • Added a test case testRestartWithOverriddenConfig to verify the restart method with configuration overrides.
    • Added a test case testRestartWithoutOverriddenConfig to verify the restart method without configuration overrides.


A cluster restarts with grace,
Configs changed, finding its place.
Brokers awaken,
No longer forsaken,
A new test run starts the race.


@gemini-code-assist bot left a comment

Code Review

The pull request introduces a new restart method to the KafkaClusterTestKit class and the ClusterInstance interface, allowing for overriding server properties and restarting the cluster. This is a useful addition for integration tests that require dynamic configuration changes. The implementation appears to be well-structured, but there are a few areas that could be improved for clarity and robustness.

Summary of Findings

  • Listener Management in Restart: The restart method captures and reapplies listener configurations. It's crucial to ensure this process is robust and handles various listener configurations correctly, especially in complex setups.
  • Socket Factory Reinitialization: The restart method reinitializes the socketFactoryManager. It's important to verify that this reinitialization doesn't lead to resource leaks or conflicts with existing connections.
  • Error Handling in Server Creation: The restart method includes error handling during the creation of ControllerServer and BrokerServer. It's important to ensure that these error handling mechanisms are comprehensive and prevent the cluster from entering an inconsistent state.

Merge Readiness

The changes introduce a valuable feature for testing Kafka clusters. However, there are a few areas that need to be addressed before merging. Specifically, the listener management and socket factory reinitialization in the restart method should be carefully reviewed to ensure robustness and prevent potential issues. Additionally, the error handling mechanisms during server creation should be comprehensive. I am unable to approve this pull request, and recommend that it not be merged until the high severity issues are addressed and the other comments are considered. Please have others review and approve this code before merging.


    Map<Integer, SharedServer> jointServers = new HashMap<>();

    socketFactoryManager = new PreboundSocketFactoryManager();


high

Reinitializing socketFactoryManager here might lead to resource leaks if the old socket factories are not properly closed. Ensure that the old socket factories are closed before creating new ones.

        if (socketFactoryManager != null) {
            try {
                socketFactoryManager.close();
            } catch (Exception e) {
                log.warn("Exception while closing socketFactoryManager: {}", e.getMessage(), e);
            }
        }
        socketFactoryManager = new PreboundSocketFactoryManager();

Comment on lines +451 to +460
    nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
    Set<String> listeners = nodeIdToListeners.get(brokerId);
    broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
        listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
    });
    if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
        listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
            broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
            broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
    }


medium

Consider extracting this logic for collecting listeners into a separate, well-named method to improve readability and maintainability. This would also reduce code duplication, as the same logic is repeated for brokers and controllers.

private Set<String> collectListeners(SharedServer server, int nodeId) {
    Set<String> listeners = nodeIdToListeners.computeIfAbsent(nodeId, __ -> new HashSet<>());
    server.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
        listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
    });
    if (!server.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
        listeners.add(server.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
            server.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
            server.socketServer().controlPlaneAcceptorOpt().get().localPort());
    }
    return listeners;
}

// Usage in shutdown method:
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
Set<String> listeners = collectListeners(broker.sharedServer(), brokerId);
nodeIdToListeners.put(brokerId, listeners);

        }
    }

    public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {


medium

It's crucial to ensure that the shutdown() method is robust and handles all possible exceptions gracefully. Consider adding more specific exception handling and logging to identify and address any potential issues during shutdown.

    public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
        try {
            shutdown();
        } catch (Exception e) {
            log.error("Exception during shutdown: {}", e.getMessage(), e);
            throw e; // Re-throw the exception to prevent restart
        }

    Map<String, Object> config = controller.config().originals();
    config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
    config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
    config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));


medium

This line uses String.join to create a comma-separated string of listeners. Ensure that the nodeIdToListeners map contains the correct listener information for each node before joining them. Also, consider adding a check to ensure that the listeners are not empty.

            Set<String> listenersForNode = nodeIdToListeners.get(id);
            if (listenersForNode == null || listenersForNode.isEmpty()) {
                log.warn("No listeners found for node {}", id);
                // Handle the case where there are no listeners, possibly by using a default listener
            }
            config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", listenersForNode != null ? listenersForNode : Collections.emptySet()));

Comment on lines +524 to +527
    } catch (Throwable e) {
        log.error("Error creating controller {}", node.id(), e);
        Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController);
        throw e;


medium

The error handling here uses Utils.swallow. While this prevents the exception from propagating, it might mask important information about why the controller creation failed. Consider re-throwing a more specific exception or logging additional details to aid debugging.

            } catch (Throwable e) {
                log.error("Error creating controller {}", node.id(), e);
                try {
                    sharedServer.stopForController();
                } catch (Throwable e2) {
                    log.warn("sharedServer.stopForController error", e2);
                }
                throw new RuntimeException("Error creating controller " + node.id(), e);
            }

Comment on lines +556 to +559
    } catch (Throwable e) {
        log.error("Error creating broker {}", node.id(), e);
        Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", sharedServer::stopForBroker);
        throw e;


medium

Similar to the controller creation, the error handling here uses Utils.swallow. Consider re-throwing a more specific exception or logging additional details to aid debugging.

            } catch (Throwable e) {
                log.error("Error creating broker {}", node.id(), e);
                try {
                    sharedServer.stopForBroker();
                } catch (Throwable e2) {
                    log.warn("sharedServer.stopForBroker error", e2);
                }
                throw new RuntimeException("Error creating broker " + node.id(), e);
            }

@coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (2)
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java (1)

158-163: Add documentation clarifying when restart() may be invoked

The new convenience restart() method is excellent, but its lifecycle expectations are not obvious to users. Without guidance, someone might call restart() before the cluster has ever been started, which will currently fail downstream (see comments on KafkaClusterTestKit#shutdown).

A short Javadoc block like the following would avoid surprises:

+    /**
+     * Restart the cluster instance preserving existing listener ports.
+     *
+     * NOTE: The cluster must have been {@link #start() started} at least once,
+     * otherwise the underlying servers have no bound ports and restart will fail.
+     */
     default void restart() throws Exception {
         restart(Map.of());
     }
test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java (1)

335-346: Replace magic broker-wide override key -1 with a named constant

Using a hard-coded -1 to denote “apply to all servers” obscures intent and invites typos.

-        clusterInstance.restart(Collections.singletonMap(-1, Collections.singletonMap("default.replication.factor", 2)));
+        final int ALL_NODES = -1; // consider moving to ClusterInstance or a util
+        clusterInstance.restart(Collections.singletonMap(
+                ALL_NODES,
+                Collections.singletonMap("default.replication.factor", 2)));
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and b8b3d0f.

📒 Files selected for processing (4)
  • test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java (3 hunks)
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java (1 hunks)
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java (1 hunks)
  • test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java (1 hunks)
🔇 Additional comments (2)
test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java (1)

539-542: Inconsistent KafkaConfig constructor usage

Controllers are rebuilt with new KafkaConfig(config, false) while brokers use new KafkaConfig(config).
Pass the explicit two-arg form everywhere for consistency and to prevent unexpected logging:

-            KafkaConfig nodeConfig = new KafkaConfig(config);
+            KafkaConfig nodeConfig = new KafkaConfig(config, false);
test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java (1)

353-360: getInt may throw ConfigException – confirm property is parsed as int

default.replication.factor is typed as short in Kafka configs.
KafkaConfig#getInt will attempt integer parsing and can throw if the value does not fit in an int.

Please verify this passes on all supported JVMs; otherwise switch to getShort or config().originals().get(...) with explicit cast.

Comment on lines +196 to +199
    @Override
    public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception {
        clusterTestKit.restart(perServerConfigOverrides);
    }

⚠️ Potential issue

Guard against restarting an instance that has never been started

If a test calls clusterInstance.restart() before start(), the underlying KafkaClusterTestKit#shutdown tries to gather listener
information from SocketServer acceptors that are not yet created, resulting in NPEs and an empty LISTENERS config on restart.

Add a fast-fail guard:

@@
     @Override
     public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception {
-        clusterTestKit.restart(perServerConfigOverrides);
+        if (!started.get()) {
+            throw new IllegalStateException("Cannot restart a cluster that has not been started");
+        }
+        clusterTestKit.restart(perServerConfigOverrides);
     }
📝 Committable suggestion


Suggested change

     @Override
     public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception {
+        if (!started.get()) {
+            throw new IllegalStateException("Cannot restart a cluster that has not been started");
+        }
         clusterTestKit.restart(perServerConfigOverrides);
     }

Comment on lines +495 to +506
    public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
        shutdown();

        Map<Integer, SharedServer> jointServers = new HashMap<>();

        socketFactoryManager = new PreboundSocketFactoryManager();
        controllers.forEach((id, controller) -> {
            Map<String, Object> config = controller.config().originals();
            config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
            config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
            config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));

🛠️ Refactor suggestion

Port reservations are lost on restart – risk of “Address already in use”

restart() creates a fresh PreboundSocketFactoryManager, but the new manager is not told to re-use the previously
bound ports captured in nodeIdToListeners.
If another process grabs one of those ports between shutdown and restart, the cluster start-up will fail.

Consider feeding the cached listener URIs back into the new manager before constructing servers, e.g.:

socketFactoryManager = new PreboundSocketFactoryManager();
nodeIdToListeners.forEach((id, listeners) ->
        listeners.forEach(l -> socketFactoryManager.reserve(id, l)));

(or expose a helper in PreboundSocketFactoryManager).

Comment on lines +533 to +542
    brokers.forEach((id, broker) -> {
        Map<String, Object> config = broker.config().originals();
        config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
        config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
        config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));

        TestKitNode node = nodes.brokerNodes().get(id);
        KafkaConfig nodeConfig = new KafkaConfig(config);
        SharedServer sharedServer = jointServers.computeIfAbsent(
            node.id(),

⚠️ Potential issue

Mutating the live config map may have side-effects

controller.config().originals() and broker.config().originals() return the live backing map of the old
KafkaConfig. Mutating it after shutdown is risky (if another thread still holds a reference) and obscures intent.

Use a defensive copy before modifications:

-            Map<String, Object> config = broker.config().originals();
+            Map<String, Object> config = new HashMap<>(broker.config().originals());

Apply the same change to the controller block above.

📝 Committable suggestion


Suggested change

 brokers.forEach((id, broker) -> {
-    Map<String, Object> config = broker.config().originals();
+    Map<String, Object> config = new HashMap<>(broker.config().originals());
     config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
     config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
     config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));
     TestKitNode node = nodes.brokerNodes().get(id);
     KafkaConfig nodeConfig = new KafkaConfig(config);
     SharedServer sharedServer = jointServers.computeIfAbsent(
         node.id(),

Comment on lines +443 to +486
    public void shutdown() throws Exception {
        List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
        try {
            // Note the shutdown order here is chosen to be consistent with
            // `KafkaRaftServer`. See comments in that class for an explanation.
            for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
                int brokerId = entry.getKey();
                BrokerServer broker = entry.getValue();
                nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
                Set<String> listeners = nodeIdToListeners.get(brokerId);
                broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
                    listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
                });
                if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
                    listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
                        broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
                        broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
                }
                nodeIdToListeners.put(brokerId, listeners);
                futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
                    executorService.submit((Runnable) broker::shutdown)));
            }
            waitForAllFutures(futureEntries);
            futureEntries.clear();
            for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
                int controllerId = entry.getKey();
                ControllerServer controller = entry.getValue();
                nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
                Set<String> listeners = nodeIdToListeners.get(controllerId);
                controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
                    listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
                });
                if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
                    listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
                        controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
                        controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
                }
                nodeIdToListeners.put(controllerId, listeners);
                futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
                    executorService.submit(controller::shutdown)));
            }
            waitForAllFutures(futureEntries);
            futureEntries.clear();
            socketFactoryManager.close();

⚠️ Potential issue

shutdown() assumes acceptors are present – restart will break if the cluster was never started

broker.socketServer().dataPlaneAcceptors() and the control-plane accessor are empty until startup() has bound the ports.
When they are empty, nodeIdToListeners.get(id) remains null, and the subsequent String.join(",", null) in restart() throws an NPE.

Either bail out early when the servers were never started, or initialise nodeIdToListeners with the original listener strings from the config:

-                nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
+                nodeIdToListeners
+                        .computeIfAbsent(brokerId, __ ->
+                                new HashSet<>(List.of(broker.config()
+                                    .originals()
+                                    .getOrDefault(SocketServerConfigs.LISTENERS_CONFIG, "")
+                                    .toString().split(","))));

A similar fix is needed for the controller loop below.
Without this, any test that formats the cluster but calls restart() before startup() will consistently fail.

@visz11 (Collaborator) commented Jul 1, 2025

/do-refacto

refacto-test bot commented Jul 1, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.


@visz11 (Collaborator) commented Jul 20, 2025

/refacto-test

refacto-test bot commented Jul 20, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@arvi18 (Author) commented Jul 24, 2025

/refacto-test

1 similar comment
@visz11 (Collaborator) commented Jul 24, 2025

/refacto-test

refacto-test bot commented Jul 24, 2025

PR already reviewed at the latest commit: b8b3d0f.
Please try again with new changes.

@visz11 (Collaborator) commented Aug 21, 2025

/refacto-test

refacto-test bot commented Aug 21, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

refacto-test bot commented Aug 21, 2025

Code Review: Kafka Cluster Restart Implementation

👍 Well Done
Comprehensive Listener Tracking

Socket listeners tracked for proper reconnection during restart, preserving connectivity.

Flexible Configuration Override

Per-server configuration override approach enables targeted testing of different reliability scenarios.

📌 Files Processed
  • test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
  • test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
📝 Additional Comments
test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java (3)
Duplicate Resource Collection

Duplicated code pattern for listener collection between broker and controller processing creates maintenance overhead and redundant processing paths. This impacts test execution efficiency and resource utilization.

private void collectListeners(int nodeId, SocketServer socketServer) {
    Set<String> listeners = nodeIdToListeners.computeIfAbsent(nodeId, __ -> new HashSet<>());
    socketServer.dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
        listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
    });
    if (!socketServer.controlPlaneAcceptorOpt().isEmpty()) {
        listeners.add(socketServer.controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
            socketServer.controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
            socketServer.controlPlaneAcceptorOpt().get().localPort());
    }
}

// In shutdown method:
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
    int brokerId = entry.getKey();
    BrokerServer broker = entry.getValue();
    collectListeners(brokerId, broker.socketServer());
    futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
        executorService.submit((Runnable) broker::shutdown)));
}

// Later for controllers:
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
    int controllerId = entry.getKey();
    ControllerServer controller = entry.getValue();
    collectListeners(controllerId, controller.socketServer());
    futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
        executorService.submit(controller::shutdown)));
}

Standards:

  • ISO-IEC-25010-Performance-Resource-Utilization
  • Algorithm-Opt-Code-Reuse
  • Java-DRY-Principle
Missing Timeout Handling

The startup() call after restart has no timeout mechanism. In failure scenarios, tests could hang indefinitely waiting for cluster restart, reducing test reliability and increasing CI execution time.

try {
    CompletableFuture<Void> startupFuture = CompletableFuture.runAsync(this::startup);
    startupFuture.get(60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    throw new RuntimeException("Cluster restart timed out after 60 seconds", e);
} catch (ExecutionException | InterruptedException e) {
    throw new RuntimeException("Cluster restart failed", e);
}

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Timeout-Handling
Unsafe Socket Handling

Socket listener configuration is reconstructed without validation. Malformed listener strings could potentially lead to unexpected network exposure in test environments.

private void validateAndSetListeners(Map<String, Object> config, int id) {
    Set<String> listeners = nodeIdToListeners.get(id);
    if (listeners == null || listeners.isEmpty()) {
        throw new IllegalStateException("No listeners found for node " + id);
    }
    
    // Validate listener format
    for (String listener : listeners) {
        if (!listener.matches("[A-Za-z0-9_-]+://[^:]+:\\d+")) {
            throw new IllegalArgumentException("Invalid listener format: " + listener);
        }
    }
    
    config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", listeners));
}

Standards:

  • CWE-20
  • OWASP-A05
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java (1)
Inconsistent Default Handling

The default implementation uses Map.of() which creates an empty immutable map. This is inconsistent with the null handling in KafkaClusterTestKit.restart() which should handle null. Using Collections.emptyMap() would be more consistent with other Kafka code patterns.

default void restart() throws Exception {
    restart(Collections.emptyMap());
}

Standards:

  • Logic-Verification-API-Consistency
  • Business-Rule-Coding-Standards
  • Algorithm-Correctness-Pattern-Consistency

        }
        waitForAllFutures(futureEntries);
        futureEntries.clear();
        socketFactoryManager.close();

Resource Leak Risk

socketFactoryManager.close() called without error handling. If close() throws an exception, the shutdown process terminates prematurely, potentially leaving resources unclosed and causing resource leaks.

Suggested change

-        socketFactoryManager.close();
+        try {
+            socketFactoryManager.close();
+        } catch (Exception e) {
+            log.error("Error closing socketFactoryManager", e);
+        }
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Error-Handling

Comment on lines +524 to +558
        } catch (Throwable e) {
            log.error("Error creating controller {}", node.id(), e);
            Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController);
            throw e;
        }
        controllers.put(node.id(), controller);
        jointServers.put(node.id(), sharedServer);
    });

    brokers.forEach((id, broker) -> {
        Map<String, Object> config = broker.config().originals();
        config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
        config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
        config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));

        TestKitNode node = nodes.brokerNodes().get(id);
        KafkaConfig nodeConfig = new KafkaConfig(config);
        SharedServer sharedServer = jointServers.computeIfAbsent(
            node.id(),
            nodeId -> new SharedServer(
                nodeConfig,
                node.initialMetaPropertiesEnsemble(),
                Time.SYSTEM,
                new Metrics(),
                CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(nodeConfig.quorumConfig().voters())),
                Collections.emptyList(),
                faultHandlerFactory,
                socketFactoryManager.getOrCreateSocketFactory(node.id())
            )
        );
        try {
            broker = new BrokerServer(sharedServer);
        } catch (Throwable e) {
            log.error("Error creating broker {}", node.id(), e);
            Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", sharedServer::stopForBroker);

Unchecked Exception Handling

Raw exceptions are rethrown without wrapping in both controller and broker creation. This loses context about the failure source and can lead to confusing error messages during test failures.

Suggested change

 } catch (Throwable e) {
     log.error("Error creating controller {}", node.id(), e);
     Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController);
-    throw e;
+    throw new RuntimeException("Failed to create controller " + node.id(), e);
 }
 ...
 try {
     broker = new BrokerServer(sharedServer);
 } catch (Throwable e) {
     log.error("Error creating broker {}", node.id(), e);
     Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", sharedServer::stopForBroker);
-    throw e;
+    throw new RuntimeException("Failed to create broker " + node.id(), e);
 }
Standards
  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Error-Handling

Comment on lines +495 to +497
    public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
        shutdown();

Resource Cleanup Risk

If shutdown() throws an exception, the restart method will abort without completing initialization of new servers. This creates a state where the system is partially shut down but not restarted, potentially causing test failures.

Suggested change

 public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
-    shutdown();
+    try {
+        shutdown();
+    } catch (Exception e) {
+        log.error("Error during shutdown phase of restart", e);
+        throw new RuntimeException("Failed to shutdown cluster during restart", e);
+    }

     Map<Integer, SharedServer> jointServers = new HashMap<>();
Standards
  • ISO-IEC-25010-Reliability-Recoverability
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Error-Handling

Comment on lines +533 to +535
    brokers.forEach((id, broker) -> {
        Map<String, Object> config = broker.config().originals();
        config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));

Missing Null Check

Missing null check for perServerConfigOverrides parameter. If null is passed to restart(), a NullPointerException will occur when attempting to call getOrDefault(), breaking cluster restart functionality.

Suggested change

 brokers.forEach((id, broker) -> {
     Map<String, Object> config = broker.config().originals();
-    config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
+    if (perServerConfigOverrides != null) {
+        config.putAll(perServerConfigOverrides.getOrDefault(-1, Collections.emptyMap()));
+        config.putAll(perServerConfigOverrides.getOrDefault(id, Collections.emptyMap()));
+    }
Standards
  • Logic-Verification-Null-Safety
  • Business-Rule-Parameter-Validation
  • Algorithm-Correctness-Defensive-Programming

Comment on lines +502 to +504
    Map<String, Object> config = controller.config().originals();
    config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
    config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));

Duplicate Null Check

Missing null check for perServerConfigOverrides parameter in controller configuration. If null is passed to restart(), a NullPointerException will occur when attempting to call getOrDefault(), breaking cluster restart functionality.

Suggested change

 Map<String, Object> config = controller.config().originals();
-config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
-config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
+if (perServerConfigOverrides != null) {
+    config.putAll(perServerConfigOverrides.getOrDefault(-1, Collections.emptyMap()));
+    config.putAll(perServerConfigOverrides.getOrDefault(id, Collections.emptyMap()));
+}
Standards
  • Logic-Verification-Null-Safety
  • Business-Rule-Parameter-Validation
  • Algorithm-Correctness-Defensive-Programming

Comment on lines +496 to +498
        shutdown();

        Map<Integer, SharedServer> jointServers = new HashMap<>();

Resource Leak Risk

The restart() method creates a new socketFactoryManager without properly closing the previous one. If an exception occurs during restart, the old socketFactoryManager resources may leak. The shutdown() method closes it but exception paths don't ensure closure.

Suggested change

 shutdown();

 Map<Integer, SharedServer> jointServers = new HashMap<>();
+PreboundSocketFactoryManager oldSocketFactoryManager = socketFactoryManager;
+socketFactoryManager = new PreboundSocketFactoryManager();
+try {
+    controllers.forEach((id, controller) -> {
+        Map<String, Object> config = controller.config().originals();
+        // rest of the method...
+    });
+    // rest of the method...
+    startup();
+} catch (Exception e) {
+    Utils.closeQuietly(socketFactoryManager, "new socketFactoryManager");
+    socketFactoryManager = oldSocketFactoryManager;
+    throw e;
+} finally {
+    Utils.closeQuietly(oldSocketFactoryManager, "old socketFactoryManager");
+}
Standards
  • Logic-Verification-Resource-Management
  • Algorithm-Correctness-Exception-Safety
  • Business-Rule-Resource-Cleanup

Comment on lines +448 to +455
    for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
        int brokerId = entry.getKey();
        BrokerServer broker = entry.getValue();
        nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
        Set<String> listeners = nodeIdToListeners.get(brokerId);
        broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
            listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
        });

Resource Leak Risk

Multiple HashSet allocations occur when nodeIdToListeners.get() is called immediately after computeIfAbsent. This creates unnecessary object allocation and increases GC pressure during cluster restarts.

Suggested change

 for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
     int brokerId = entry.getKey();
     BrokerServer broker = entry.getValue();
-    nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
-    Set<String> listeners = nodeIdToListeners.get(brokerId);
+    Set<String> listeners = nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
     broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
         listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
     });
Standards
  • ISO-IEC-25010-Performance-Resource-Utilization
  • Algorithm-Opt-Object-Reuse
  • Java-Memory-Efficiency

Comment on lines +452 to +481
        Set<String> listeners = nodeIdToListeners.get(brokerId);
        broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
            listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
        });
        if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
            listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
                broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
                broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
        }
        nodeIdToListeners.put(brokerId, listeners);
        futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
            executorService.submit((Runnable) broker::shutdown)));
    }
    waitForAllFutures(futureEntries);
    futureEntries.clear();
    for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
        int controllerId = entry.getKey();
        ControllerServer controller = entry.getValue();
        nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
        Set<String> listeners = nodeIdToListeners.get(controllerId);
        controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
            listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
        });
        if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
            listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
                controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
                controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
        }
        nodeIdToListeners.put(controllerId, listeners);
        futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,

Duplicated Listener Collection

Identical listener collection logic is duplicated for both brokers and controllers. This violates DRY principle and increases maintenance burden when modifying listener collection logic.

Suggested change

    private void collectListeners(int nodeId, SocketServer socketServer) {
        Set<String> listeners = nodeIdToListeners.computeIfAbsent(nodeId, __ -> new HashSet<>());
        socketServer.dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
            listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
        });
        if (!socketServer.controlPlaneAcceptorOpt().isEmpty()) {
            listeners.add(socketServer.controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
                socketServer.controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
                socketServer.controlPlaneAcceptorOpt().get().localPort());
        }
    }

    // Then replace the duplicated code in shutdown() with:
    // In brokers loop:
    collectListeners(brokerId, broker.socketServer());
    // In controllers loop:
    collectListeners(controllerId, controller.socketServer());
Standards
  • Clean-Code-DRY
  • Refactoring-Extract-Method
  • SOLID-SRP

Comment on lines +496 to +497
    shutdown();

Unvalidated Configuration Override

Configuration overrides are applied without validation, allowing potentially insecure settings. An attacker could inject malicious configurations that compromise security controls or expose sensitive information.

Suggested change
shutdown();
private static final Set<String> SECURITY_SENSITIVE_CONFIGS = Set.of(
"ssl.keystore.password", "ssl.key.password", "ssl.truststore.password",
"sasl.jaas.config", "listeners.security.protocol.map"
);
private void validateAndApplyConfigs(Map<String, Object> config, Map<Integer, Map<String, Object>> overrides, int id) {
Map<String, Object> defaultOverrides = overrides.getOrDefault(-1, Collections.emptyMap());
Map<String, Object> nodeOverrides = overrides.getOrDefault(id, Collections.emptyMap());
// Validate security-sensitive configurations
Stream.concat(defaultOverrides.keySet().stream(), nodeOverrides.keySet().stream())
.filter(SECURITY_SENSITIVE_CONFIGS::contains)
.findAny()
.ifPresent(key -> {
throw new IllegalArgumentException("Cannot override security-sensitive configuration: " + key);
});
config.putAll(defaultOverrides);
config.putAll(nodeOverrides);
}
Standards
  • CWE-15
  • OWASP-A05

Comment on lines +495 to +500
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
shutdown();

Map<Integer, SharedServer> jointServers = new HashMap<>();

socketFactoryManager = new PreboundSocketFactoryManager();

Error Handling Gap

The restart method lacks exception handling around shutdown and socket-factory recreation. A failure during shutdown leaves the system in an inconsistent state with no recovery path.

Suggested change
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
shutdown();
Map<Integer, SharedServer> jointServers = new HashMap<>();
socketFactoryManager = new PreboundSocketFactoryManager();
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
try {
shutdown();
} catch (Exception e) {
log.warn("Error during shutdown phase of restart", e);
// Continue with restart despite shutdown issues
}
Map<Integer, SharedServer> jointServers = new HashMap<>();
socketFactoryManager = new PreboundSocketFactoryManager();
Standards
  • ISO-IEC-25010-Reliability-Recoverability
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Error-Recovery
  • SRE-Graceful-Degradation

Comment on lines +444 to +493
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
try {
// Note the shutdown order here is chosen to be consistent with
// `KafkaRaftServer`. See comments in that class for an explanation.
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
executorService.submit(controller::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
socketFactoryManager.close();
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
throw e;
}
}

Connection Resource Leakage

The shutdown method doesn't cancel futures when socketFactoryManager.close() completes successfully, potentially causing thread and connection resource leaks. These resources remain allocated until garbage collection.

Suggested change
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
try {
// Note the shutdown order here is chosen to be consistent with
// `KafkaRaftServer`. See comments in that class for an explanation.
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
executorService.submit(controller::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
socketFactoryManager.close();
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
throw e;
}
}
public void shutdown() throws Exception {
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
try {
// Note the shutdown order here is chosen to be consistent with
// `KafkaRaftServer`. See comments in that class for an explanation.
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
executorService.submit(controller::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
socketFactoryManager.close();
} catch (Exception e) {
throw e;
} finally {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
}
}
Standards
  • ISO-IEC-25010-Performance-Resource-Utilization
  • Java-Resource-Management
  • Netflix-Connection-Lifecycle

Comment on lines +450 to +481
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,

Duplicate Listener Collection

Identical listener collection logic is duplicated for brokers and controllers. This violates the DRY principle and increases the maintenance burden whenever the listener collection logic needs to change.

Suggested change
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
private void collectNodeListeners(int nodeId, SocketServer socketServer) {
nodeIdToListeners.computeIfAbsent(nodeId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(nodeId);
socketServer.dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!socketServer.controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(socketServer.controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
socketServer.controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
socketServer.controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(nodeId, listeners);
}
// In shutdown() method:
// Replace broker listener collection with:
collectNodeListeners(brokerId, broker.socketServer());
// Replace controller listener collection with:
collectNodeListeners(controllerId, controller.socketServer());
Standards
  • Clean-Code-DRY
  • Refactoring-Extract-Method
  • Clean-Code-Method-Organization

@visz11
Collaborator

visz11 commented Sep 12, 2025

/refacto-test-arvi


refacto-visz bot commented Sep 12, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.


refacto-visz bot commented Sep 12, 2025

Code Review: Kafka Cluster Restart Implementation

👍 Well Done
Comprehensive Configuration Override

Proper configuration management during cluster restart enhances test reliability.

Listener State Preservation

Capturing and restoring listener state prevents connection failures after restart.

Resource Management

Effective tracking of listeners prevents resource leaks during restart.

📌 Files Processed
  • test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
  • test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
📝 Additional Comments
test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java (6)
Exception Propagation Risk

The exception handling logs the error and swallows stopForBroker exceptions, but then rethrows the original exception. Swallowed exceptions could hide cleanup failures, affecting restart reliability.

Standards:

  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • SRE-Error-Handling
  • DbC-Resource-Mgmt
Partial Restart Risk

The restart method calls startup() without handling partial initialization failures. If some nodes start but others fail, the cluster could be left in an inconsistent state, affecting test reliability.

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • SRE-Error-Handling
Parallel Shutdown Optimization

The shutdown process submits tasks sequentially and waits for completion after each group. Consider collecting all listener information first, then submitting all shutdown tasks in parallel with appropriate dependencies. This would reduce total restart time, especially with many nodes.

Standards:

  • ISO-IEC-25010-Performance-Time-Behaviour
  • Netflix-Hot-Path-Optimization
Extract Listener Collection

Listener collection logic is duplicated for brokers and controllers. Extracting this to a helper method would improve maintainability by centralizing the logic and reducing duplication.

Standards:

  • Clean-Code-DRY
  • Clean-Code-Method-Extraction
Redundant Map Computation

computeIfAbsent creates a HashSet, yet the code immediately calls get to retrieve it. It is more efficient to use the Set returned by computeIfAbsent directly, avoiding a redundant map lookup; see the sketch after this list.

Standards:

  • Algorithm-Correctness-Efficiency
  • Logic-Verification-Redundancy
  • Mathematical-Accuracy-Optimization
Unvalidated Configuration Input

Configuration overrides are accepted without validation. Malicious test code could inject unsafe configuration values, affecting the cluster's security properties.

Standards:

  • CWE-20
  • OWASP-A05
test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java (1)
Missing Timeout Protection

The waitForReadyBrokers() call has no visible timeout parameter. Without timeout protection, tests could hang indefinitely if brokers fail to become ready after restart.

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • SRE-Error-Handling
  • ISO-IEC-25010-Reliability-Availability
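
Picking up the map-lookup note above: a minimal sketch, assuming only plain java.util.Map semantics, of using the value returned by computeIfAbsent directly. The set is already stored in the map, so both the follow-up get and the trailing put become unnecessary:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ListenerRegistry {
    private final Map<Integer, Set<String>> nodeIdToListeners = new HashMap<>();

    // Hypothetical helper showing the single-lookup form of the pattern.
    public void addListener(int nodeId, String listener) {
        // computeIfAbsent returns the existing or newly created set, which is
        // already in the map, so no trailing put is needed.
        nodeIdToListeners.computeIfAbsent(nodeId, __ -> new HashSet<>()).add(listener);
    }
}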

Comment on lines +495 to +500
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
shutdown();

Map<Integer, SharedServer> jointServers = new HashMap<>();

socketFactoryManager = new PreboundSocketFactoryManager();

Potential Restart Failure

The restart method creates a new socketFactoryManager without checking whether the previous shutdown completed successfully. If shutdown fails, resource conflicts could prevent a successful restart, causing test failures.

Suggested change
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
shutdown();
Map<Integer, SharedServer> jointServers = new HashMap<>();
socketFactoryManager = new PreboundSocketFactoryManager();
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
try {
shutdown();
} catch (Exception e) {
log.warn("Error during shutdown before restart", e);
// Force cleanup of socket factory manager to prevent resource conflicts
if (socketFactoryManager != null) {
Utils.closeQuietly(socketFactoryManager, "socketFactoryManager");
}
}
Map<Integer, SharedServer> jointServers = new HashMap<>();
socketFactoryManager = new PreboundSocketFactoryManager();
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Reliability-Recoverability
  • SRE-Error-Handling

Comment on lines +444 to +492
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
try {
// Note the shutdown order here is chosen to be consistent with
// `KafkaRaftServer`. See comments in that class for an explanation.
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
executorService.submit(controller::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
socketFactoryManager.close();
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
throw e;
}

Resource Leak Risk

The shutdown method doesn't cancel futures with interruption when handling exceptions. If a shutdown operation hangs, the thread interrupt flag won't be set, potentially causing resource leaks under failure conditions. Consider using cancel(true) more aggressively or implementing timeout handling.

Suggested change
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
try {
// Note the shutdown order here is chosen to be consistent with
// `KafkaRaftServer`. See comments in that class for an explanation.
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
executorService.submit(controller::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
socketFactoryManager.close();
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
throw e;
}
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
try {
// Note the shutdown order here is chosen to be consistent with
// `KafkaRaftServer`. See comments in that class for an explanation.
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries, 30000); // Add timeout of 30 seconds
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
executorService.submit(controller::shutdown)));
}
waitForAllFutures(futureEntries, 30000); // Add timeout of 30 seconds
futureEntries.clear();
socketFactoryManager.close();
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
// Ensure socketFactoryManager is closed even on exception
Utils.closeQuietly(socketFactoryManager, "socketFactoryManager");
throw e;
}
Standards
  • ISO-IEC-25010-Performance-Resource-Utilization
  • Netflix-Fault-Tolerance

Comment on lines +495 to +496
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
shutdown();

Missing Validation Check

The restart method doesn't validate that perServerOverriddenConfig is non-null before using it. This could cause a NullPointerException when the map is accessed in the subsequent broker/controller configuration logic.

Standards
  • Business-Rule-Validation
  • Logic-Verification-Null-Safety
  • Algorithm-Correctness-Input-Validation
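
A minimal guard sketch for this point; the parameter name matches the diff, while the helper itself is hypothetical:

import java.util.Collections;
import java.util.Map;

final class RestartArguments {
    // Hypothetical normalization: treat a null override map as "no overrides",
    // so the getOrDefault(...) calls later in restart() never hit a null receiver.
    static Map<Integer, Map<String, Object>> normalizeOverrides(
            Map<Integer, Map<String, Object>> perServerOverriddenConfig) {
        return perServerOverriddenConfig == null
                ? Collections.emptyMap()
                : perServerOverriddenConfig;
    }
}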

Comment on lines +486 to +492
socketFactoryManager.close();
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
throw e;
}

Incomplete Resource Cleanup

When an exception occurs during shutdown, only the futures are cancelled; socketFactoryManager isn't closed. The resulting resource leak could affect subsequent restart attempts and system stability.

Standards
  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Resource-Mgmt
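
A small sketch of the failure-path cleanup, assuming PreboundSocketFactoryManager is AutoCloseable (its close() call on the success path suggests it is); Utils.closeQuietly is the existing Kafka helper already used elsewhere in this review:

import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.Future;

import org.apache.kafka.common.utils.Utils;

final class ShutdownCleanup {
    // Sketch of the catch block's cleanup: cancel what is still pending, then
    // close the socket factory manager so a later restart() does not inherit
    // sockets left bound by the failed shutdown attempt.
    static void cancelAndClose(List<Entry<String, Future<?>>> futureEntries,
                               AutoCloseable socketFactoryManager) {
        for (Entry<String, Future<?>> entry : futureEntries) {
            entry.getValue().cancel(true);
        }
        Utils.closeQuietly(socketFactoryManager, "socketFactoryManager");
    }
}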

Comment on lines +443 to +493
public void shutdown() throws Exception {
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
try {
// Note the shutdown order here is chosen to be consistent with
// `KafkaRaftServer`. See comments in that class for an explanation.
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
executorService.submit(controller::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
socketFactoryManager.close();
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
throw e;
}
}

Duplicated Restart Logic

The shutdown method contains duplicated logic for collecting listeners from both brokers and controllers. This violates the DRY principle and increases the maintenance burden when changes to listener collection are needed.

Standards
  • Clean-Code-DRY
  • Clean-Code-Method-Organization

private final File baseDirectory;
private final SimpleFaultHandlerFactory faultHandlerFactory;
private final PreboundSocketFactoryManager socketFactoryManager;
private PreboundSocketFactoryManager socketFactoryManager;

Insecure Socket Handling

Changing socketFactoryManager from final to non-final creates a risk of socket-factory replacement. An attacker could substitute a malicious socket factory, enabling MITM attacks during tests.

Standards
  • CWE-829
  • OWASP-A05

@visz11
Collaborator

visz11 commented Sep 12, 2025

/refacto-test-arvi


refacto-visz bot commented Sep 12, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.


refacto-visz bot commented Sep 12, 2025

Code Review: Kafka Cluster Test Kit Restart Implementation

👍 Well Done
Comprehensive Restart Implementation

Effectively preserves listener configurations during restart, ensuring proper server reconnection.

Configuration Override Flexibility

Dynamic server property overrides enable efficient test configuration with proper hierarchy.

📌 Files Processed
  • test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
  • test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
📝 Additional Comments
test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java (6)
Error Handling Improvement

The restart method doesn't handle partial failures. If startup fails after shutdown, the system remains down with no recovery attempt or clear error state.

Standards:

  • ISO-IEC-25010-Reliability-Recoverability
  • SRE-Error-Handling
Parallel Shutdown Optimization

The shutdown process submits tasks sequentially and waits for all broker shutdowns before starting controller shutdowns. Consider submitting all shutdown tasks in parallel for faster test teardown, especially with large clusters.

Standards:

  • ISO-IEC-25010-Performance-Time-Behaviour
  • Algorithm-Opt-Parallelization
Potential Race Condition

The restart method calls shutdown() and then creates new servers without synchronization. This creates a potential race condition if multiple threads access the cluster during a restart, potentially causing test instability; a locking sketch follows this list.

Standards:

  • Logic-Verification-Concurrency
  • Algorithm-Correctness-Thread-Safety
  • Business-Rule-State-Management
Mutable Field Protection

Changing socketFactoryManager from final to non-final and adding the mutable nodeIdToListeners without defensive copying reduces encapsulation and could lead to unexpected state modifications.

Standards:

  • Clean-Code-Immutability
  • SOLID-SRP
Secure Listener Configuration

Listener configuration is collected without validation before reuse. Consider validating the listener format and sanitizing host/port values before restarting the cluster to prevent potential injection attacks.

Standards:

  • CWE-20
  • OWASP-A03
Listener Collection Efficiency

The code calls computeIfAbsent followed by get and later put for the same key. Using the Set returned by computeIfAbsent directly would be more efficient by reducing map operations.

Standards:

  • ISO-IEC-25010-Performance-Resource-Utilization
  • Algorithm-Opt-Map-Operations
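
For the race condition noted above, a minimal serialization sketch: a class fragment assuming a single lock object is acceptable for the test kit, with the rebuild step eliding the PR's existing logic (startup() is the test kit method this review already references):

// Sketch only: guard restart() with one lock so concurrent callers serialize
// and never observe a half-restarted cluster.
private final Object lifecycleLock = new Object();

public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
    synchronized (lifecycleLock) {
        shutdown();
        // ... rebuild socketFactoryManager, controllers, and brokers as in the diff ...
        startup();
    }
}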

Comment on lines +443 to +493
public void shutdown() throws Exception {
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
try {
// Note the shutdown order here is chosen to be consistent with
// `KafkaRaftServer`. See comments in that class for an explanation.
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
executorService.submit(controller::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
socketFactoryManager.close();
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
throw e;
}
}

Resource Leak Risk

The shutdown method doesn't cancel futures with interruption when an exception occurs, potentially leaving threads running. This can cause resource leaks and degraded performance in long-running test suites.

Suggested change
public void shutdown() throws Exception {
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
try {
// Note the shutdown order here is chosen to be consistent with
// `KafkaRaftServer`. See comments in that class for an explanation.
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
executorService.submit(controller::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
socketFactoryManager.close();
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
throw e;
}
}
public void shutdown() throws Exception {
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
boolean shutdownSuccessful = false;
try {
// Note the shutdown order here is chosen to be consistent with
// `KafkaRaftServer`. See comments in that class for an explanation.
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
executorService.submit(controller::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
socketFactoryManager.close();
shutdownSuccessful = true;
} finally {
if (!shutdownSuccessful) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
}
}
Standards
  • ISO-IEC-25010-Performance-Resource-Utilization
  • Netflix-Resource-Management

Map<String, Object> config = controller.config().originals();
config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));

Missing Listener Validation

There is no validation that nodeIdToListeners.get(id) is non-empty before joining. An empty listener list could cause restart failures due to a malformed configuration.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
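
A fail-fast sketch of the guard, reusing the names from the diff:

// Sketch: validate the captured listeners before writing them into the config,
// instead of silently producing an empty listeners entry.
Set<String> listeners = nodeIdToListeners.get(id);
if (listeners == null || listeners.isEmpty()) {
    throw new IllegalStateException("No listeners captured for node " + id +
            "; shutdown() must run before restart() rebuilds configs");
}
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", listeners));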

Comment on lines +341 to +343
clusterInstance.brokers().values().forEach(broker -> {
Assertions.assertEquals(2, broker.config().getInt("default.replication.factor"));
});

Incomplete Broker Validation

The test assumes brokers exist without validating it. In a Type.KRAFT configuration, the brokers collection could be empty, causing the test to pass without validating any brokers and making it ineffective.

Standards
  • Logic-Verification-Completeness
  • Algorithm-Correctness-Test-Coverage
  • Business-Rule-Validation
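
A small sketch of the guard, using the same JUnit Assertions as the quoted test:

// Sketch: make the empty-collection case fail loudly so the per-broker
// assertions are known to have executed at least once.
Assertions.assertFalse(clusterInstance.brokers().isEmpty(),
        "expected at least one broker after restart");
clusterInstance.brokers().values().forEach(broker ->
        Assertions.assertEquals(2, broker.config().getInt("default.replication.factor")));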

}
waitForAllFutures(futureEntries);
futureEntries.clear();
socketFactoryManager.close();

Missing Resource Cleanup

The shutdown method closes socketFactoryManager but doesn't close executorService. This creates a potential resource leak when restarting the cluster, as the old executor service remains active.

Standards
  • Logic-Verification-Resource-Management
  • Algorithm-Correctness-Cleanup
  • Business-Rule-Completeness
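
One possible shape for the teardown, sketched as a fragment of the tail of shutdown()'s try block, on the assumption that the executor is not reused by restart(); if restart() is meant to reuse it, the teardown belongs in the test kit's close() path instead. Requires java.util.concurrent.TimeUnit and java.util.concurrent.TimeoutException:

// Sketch: bounded executor teardown after the socket factory manager closes.
socketFactoryManager.close();
executorService.shutdownNow();
if (!executorService.awaitTermination(5, TimeUnit.MINUTES)) {
    throw new TimeoutException("executorService did not terminate in time");
}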

Comment on lines +452 to +461
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);

Duplicate Listener Collection

Nearly identical listener collection code is duplicated between the broker and controller handling. This violates the DRY principle and increases the maintenance burden when listener handling needs to change.

Standards
  • Clean-Code-DRY
  • Design-Pattern-Extract-Method

@visz11
Collaborator

visz11 commented Sep 12, 2025

/refacto-test-arvi


refacto-visz bot commented Sep 12, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.


refacto-visz bot commented Sep 12, 2025

Code Review: KafkaClusterTestKit Restart Implementation

👍 Well Done
Comprehensive Resource Management

Proper shutdown sequence prevents resource leaks during cluster restart.

Configuration Override Support

Flexible configuration overrides improve test reliability and coverage.

Listener Preservation

Maintaining node listeners during restart prevents connection failures.

📌 Files Processed
  • test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
  • test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
  • test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
📝 Additional Comments
test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java (6)
Uncaught Exception Risk

The KafkaConfig constructor may throw exceptions that aren't caught. Unhandled exceptions during restart could leave the cluster in an inconsistent state, affecting test reliability.

Standards:

  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Error-Handling
Potential Resource Leak

The restart method creates a new PreboundSocketFactoryManager without ensuring the old one is properly closed if shutdown() fails. This could lead to resource leaks if the shutdown operation throws an exception.

Standards:

  • Logic-Verification-Resource-Management
  • Algorithm-Correctness-Error-Handling
Lambda Exception Handling

Using forEach with lambdas that throw exceptions is problematic: a runtime exception terminates the iteration prematurely, leaving some controllers unprocessed. This creates an inconsistent restart state in which only some controllers are recreated; a plain-loop sketch follows this list.

Standards:

  • Logic-Verification-Exception-Handling
  • Algorithm-Correctness-Iteration-Completeness
Parallel Shutdown Optimization

The listener collection logic is executed sequentially before submitting shutdown tasks, creating a bottleneck. Consider separating listener collection from shutdown submission to improve restart performance. This would allow faster test execution, especially with larger clusters.

Standards:

  • ISO-IEC-25010-Performance-Time-Behaviour
  • Algorithm-Opt-Parallelization
Extract Listener Method

Listener string formatting logic is complex and repeated. Extracting this into a helper method would improve readability and maintainability when endpoint formatting needs to change.

Standards:

  • Clean-Code-Function-Size
  • Clean-Code-DRY
Sensitive Data Exposure

Listener information is stored without access control: the test code captures network endpoints in memory, which could expose internal network configuration if the tests are compromised.

Standards:

  • CWE-200
  • OWASP-A02
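
For the forEach concern above, a plain-loop sketch; rebuildController is a hypothetical stand-in for the per-controller body in the diff. Requires java.util.ArrayList, java.util.List, and java.util.Map:

// Sketch: iterate with a for-loop so one node's failure can be recorded and
// the remaining controllers still get processed; failures surface at the end.
List<Exception> failures = new ArrayList<>();
for (Map.Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
    try {
        rebuildController(entry.getKey(), entry.getValue()); // hypothetical helper
    } catch (Exception e) {
        failures.add(e);
    }
}
if (!failures.isEmpty()) {
    Exception first = failures.get(0);
    failures.subList(1, failures.size()).forEach(first::addSuppressed);
    throw first;
}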

}
waitForAllFutures(futureEntries);
futureEntries.clear();
socketFactoryManager.close();

Resource Leak Risk

socketFactoryManager.close() is called before the error handling, so if an exception occurs during shutdown, the manager won't be properly closed. This can cause resource leaks during test execution, especially across multiple restart cycles.

Suggested change
socketFactoryManager.close();
try {
// Note the shutdown order here is chosen to be consistent with
// `KafkaRaftServer`. See comments in that class for an explanation.
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
executorService.submit(controller::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
throw e;
} finally {
socketFactoryManager.close();
}
Standards
  • ISO-IEC-25010-Performance-Resource-Utilization
  • Netflix-Resource-Management

Comment on lines +443 to +493
public void shutdown() throws Exception {
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
try {
// Note the shutdown order here is chosen to be consistent with
// `KafkaRaftServer`. See comments in that class for an explanation.
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
executorService.submit((Runnable) broker::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int controllerId = entry.getKey();
ControllerServer controller = entry.getValue();
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(controllerId);
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(controllerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
executorService.submit(controller::shutdown)));
}
waitForAllFutures(futureEntries);
futureEntries.clear();
socketFactoryManager.close();
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
entry.getValue().cancel(true);
}
throw e;
}
}

Missing Executor Shutdown

The shutdown method doesn't shut down the executorService after completing all tasks. This could lead to thread leaks during restart operations as new executors may be created while old ones remain active.

Standards
  • Logic-Verification-Resource-Cleanup
  • Algorithm-Correctness-Lifecycle-Management

Comment on lines +500 to +562
socketFactoryManager = new PreboundSocketFactoryManager();
controllers.forEach((id, controller) -> {
Map<String, Object> config = controller.config().originals();
config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));

TestKitNode node = nodes.controllerNodes().get(id);
KafkaConfig nodeConfig = new KafkaConfig(config, false);
SharedServer sharedServer = new SharedServer(
nodeConfig,
node.initialMetaPropertiesEnsemble(),
Time.SYSTEM,
new Metrics(),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(nodeConfig.quorumConfig().voters())),
Collections.emptyList(),
faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id())
);
try {
controller = new ControllerServer(
sharedServer,
KafkaRaftServer.configSchema(),
nodes.bootstrapMetadata());
} catch (Throwable e) {
log.error("Error creating controller {}", node.id(), e);
Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController);
throw e;
}
controllers.put(node.id(), controller);
jointServers.put(node.id(), sharedServer);
});

brokers.forEach((id, broker) -> {
Map<String, Object> config = broker.config().originals();
config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));

TestKitNode node = nodes.brokerNodes().get(id);
KafkaConfig nodeConfig = new KafkaConfig(config);
SharedServer sharedServer = jointServers.computeIfAbsent(
node.id(),
nodeId -> new SharedServer(
nodeConfig,
node.initialMetaPropertiesEnsemble(),
Time.SYSTEM,
new Metrics(),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(nodeConfig.quorumConfig().voters())),
Collections.emptyList(),
faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id())
)
);
try {
broker = new BrokerServer(sharedServer);
} catch (Throwable e) {
log.error("Error creating broker {}", node.id(), e);
Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", sharedServer::stopForBroker);
throw e;
}
brokers.put(node.id(), broker);
});

Missing Error Handling

The restart method does not guard the nodeIdToListeners.get(id) call, which returns null when the node ID is missing from the map. The subsequent String.join would then throw a NullPointerException and abort the restart. A null-safe variant is sketched below.

Standards
  • Logic-Verification-Null-Safety
  • Algorithm-Correctness-Error-Handling
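
A null-safe variant, sketched under the assumption that a missing entry means the node was never shut down through this kit:

// Sketch only: fail fast with a clear message instead of an NPE inside String.join.
Set<String> listeners = nodeIdToListeners.get(id);
if (listeners == null) {
    throw new IllegalStateException("No cached listeners for node " + id +
        "; shutdown() must complete before restart()");
}
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", listeners));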

Comment on lines +452 to +453
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {

Missing Listener Validation

The value returned by computeIfAbsent is discarded and the set is re-read with a separate get() call. Reading the map twice is redundant and leaves a window for a null Set to reach the subsequent operations; using the computeIfAbsent return value directly avoids both issues (see the sketch below).

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Preconditions
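
A sketch of the simplification; for a HashMap the return value of computeIfAbsent is never null:

// Sketch only: take the mapped set straight from computeIfAbsent.
Set<String> listeners = nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());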

Comment on lines +448 to +462
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
Set<String> listeners = nodeIdToListeners.get(brokerId);
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
});
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
}
nodeIdToListeners.put(brokerId, listeners);
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,

Duplicate Listener Collection

The listener-collection logic is duplicated between the broker and controller shutdown sections. This violates the DRY principle and doubles the maintenance burden whenever the collection logic changes; a hypothetical extracted helper is sketched below.

Standards
  • Clean-Code-DRY
  • SOLID-SRP
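
One possible extraction, sketched with an assumed helper name; it mirrors the existing collection logic so both loops can call it (SocketServer is the type already used by the surrounding code):

// Hypothetical helper: collect listener strings from a SocketServer's acceptors.
private static Set<String> collectListeners(SocketServer socketServer) {
    Set<String> listeners = new HashSet<>();
    socketServer.dataPlaneAcceptors().forEach((endpoint, acceptor) ->
        listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort()));
    if (!socketServer.controlPlaneAcceptorOpt().isEmpty()) {
        listeners.add(socketServer.controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
            socketServer.controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
            socketServer.controlPlaneAcceptorOpt().get().localPort());
    }
    return listeners;
}

Each loop would then reduce to nodeIdToListeners.put(id, collectListeners(server.socketServer())) before submitting the shutdown task.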

    private final File baseDirectory;
    private final SimpleFaultHandlerFactory faultHandlerFactory;
-   private final PreboundSocketFactoryManager socketFactoryManager;
+   private PreboundSocketFactoryManager socketFactoryManager;

Mutable Class Field

This change turns socketFactoryManager from a final field into a mutable one. That reduces the class's immutability and permits unexpected state changes, making the class harder to reason about; one final-preserving alternative is sketched below.

Standards
  • Clean-Code-Object-State
  • Clean-Code-Immutability
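
If restart must replace the manager, one final-preserving alternative is to hold it in a java.util.concurrent.atomic.AtomicReference. This is a sketch only, not the PR's approach:

// Sketch only: the field stays final; restart swaps the referent and closes the old one.
private final AtomicReference<PreboundSocketFactoryManager> socketFactoryManager =
    new AtomicReference<>(new PreboundSocketFactoryManager());

// In restart(), which already declares throws Exception:
socketFactoryManager.getAndSet(new PreboundSocketFactoryManager()).close();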

default void restart() throws Exception {
    restart(Map.of());
}

void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception;

Unvalidated Configuration Override

The method accepts arbitrary configuration overrides without validation. A test could inject malformed or malicious configuration values and compromise the test environment; a minimal validation sketch follows the standards list.

Standards
  • CWE-20
  • OWASP-A04
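
A minimal validation sketch; allowedKeys (for example, a set built from the cluster's known config names) is an assumption about where the whitelist comes from:

// Sketch only: reject unknown override keys before any server is rebuilt.
static void validateOverrides(Map<Integer, Map<String, Object>> perServerConfigOverrides,
                              Set<String> allowedKeys) {
    perServerConfigOverrides.forEach((nodeId, overrides) ->
        overrides.keySet().forEach(key -> {
            if (!allowedKeys.contains(key)) {
                throw new IllegalArgumentException(
                    "Unknown config override '" + key + "' for node " + nodeId);
            }
        }));
}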
