From 4912a6966f98b639ef1b4fbb63a7f68adf3046df Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 21 Aug 2024 02:34:53 +0800 Subject: [PATCH 01/15] wip --- .../tools/reassign/ReassignPartitionsCommandOptions.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java index 058827bfeaf7f..639b456439e13 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java @@ -35,6 +35,7 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions { final OptionSpec reassignmentJsonFileOpt; final OptionSpec topicsToMoveJsonFileOpt; final OptionSpec brokerListOpt; + final OptionSpec bootstrapControllerOpt; final OptionSpec disableRackAware; final OptionSpec interBrokerThrottleOpt; final OptionSpec replicaAlterLogDirsThrottleOpt; @@ -83,6 +84,13 @@ public ReassignPartitionsCommandOptions(String[] args) { .withRequiredArg() .describedAs("brokerlist") .ofType(String.class); + + bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The controller to use for reassignment. " + + "By default, the tool will get the controller from the broker.") + .withRequiredArg() + .describedAs("bootstrap controller") + .ofType(String.class); + disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment"); interBrokerThrottleOpt = parser.accepts("throttle", "The movement of partitions between brokers will be throttled to this value (bytes/sec). " + "This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment " + From be9f5c2ba173946e04bc7506cd4ed66668d73027 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 22 Aug 2024 02:02:55 +0800 Subject: [PATCH 02/15] wip --- .../reassign/ReassignPartitionsCommand.java | 22 ++++++++++++------- .../ReassignPartitionsCommandOptions.java | 13 ++++++----- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index 0dedfc431e8aa..b5192ae7d394e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -1405,9 +1405,15 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) { } OptionSpec action = allActions.get(0); - - if (!opts.options.has(opts.bootstrapServerOpt)) - CommandLineUtils.printUsageAndExit(opts.parser, "Please specify --bootstrap-server"); + + OptionSpec bootstrapOpt = null; + + if(opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt)) + CommandLineUtils.printUsageAndExit(opts.parser, "Please specify either --bootstrap-server or --bootstrap-controller"); + else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt)) + CommandLineUtils.printUsageAndExit(opts.parser, "Please specify --bootstrap-server or --bootstrap-controller"); + else + bootstrapOpt = opts.options.has(opts.bootstrapServerOpt) ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt; // Make sure that we have all the required arguments for our action. Map, List>> requiredArgs = new HashMap<>(); @@ -1432,32 +1438,32 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) { Map, List>> permittedArgs = new HashMap<>(); permittedArgs.put(opts.verifyOpt, Arrays.asList( - opts.bootstrapServerOpt, + bootstrapOpt, opts.commandConfigOpt, opts.preserveThrottlesOpt )); permittedArgs.put(opts.generateOpt, Arrays.asList( - opts.bootstrapServerOpt, + bootstrapOpt, opts.brokerListOpt, opts.commandConfigOpt, opts.disableRackAware )); permittedArgs.put(opts.executeOpt, Arrays.asList( opts.additionalOpt, - opts.bootstrapServerOpt, + bootstrapOpt, opts.commandConfigOpt, opts.interBrokerThrottleOpt, opts.replicaAlterLogDirsThrottleOpt, opts.timeoutOpt )); permittedArgs.put(opts.cancelOpt, Arrays.asList( - opts.bootstrapServerOpt, + bootstrapOpt, opts.commandConfigOpt, opts.preserveThrottlesOpt, opts.timeoutOpt )); permittedArgs.put(opts.listOpt, Arrays.asList( - opts.bootstrapServerOpt, + bootstrapOpt, opts.commandConfigOpt )); diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java index 639b456439e13..d419f7c06df1c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java @@ -56,7 +56,7 @@ public ReassignPartitionsCommandOptions(String[] args) { // Arguments bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping.") - .withRequiredArg() + .withOptionalArg() .describedAs("Server(s) to use for bootstrapping") .ofType(String.class); @@ -84,11 +84,12 @@ public ReassignPartitionsCommandOptions(String[] args) { .withRequiredArg() .describedAs("brokerlist") .ofType(String.class); - - bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The controller to use for reassignment. " + - "By default, the tool will get the controller from the broker.") - .withRequiredArg() - .describedAs("bootstrap controller") + + bootstrapControllerOpt = parser.accepts("bootstrap-controller", "A comma-separated list of bootstrap.controllers " + + "that can be supplied instead of bootstrap-servers.\n" + + "This is useful for administrators who wish to bypass the brokers.") + .withOptionalArg() + .describedAs("controller to connect to") .ofType(String.class); disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment"); From 40b3001f683cd5b77e6658cbed64d3ef1eb1c0b1 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 22 Aug 2024 20:42:36 +0800 Subject: [PATCH 03/15] add new test for ReassignPartitionsCommandArgsTest --- .../reassign/ReassignPartitionsCommand.java | 13 +++++++---- .../ReassignPartitionsCommandArgsTest.java | 23 ++++++++++++++++++- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index b5192ae7d394e..22286fb3e320f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -129,7 +129,12 @@ public static void main(String[] args) { Properties props = opts.options.has(opts.commandConfigOpt) ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) : new Properties(); - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)); + + if (opts.options.has(opts.bootstrapControllerOpt)) { + props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, opts.options.valueOf(opts.bootstrapControllerOpt)); + } else { + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)); + } props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool"); adminClient = Admin.create(props); handleAction(adminClient, opts); @@ -1408,10 +1413,10 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) { OptionSpec bootstrapOpt = null; - if(opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt)) - CommandLineUtils.printUsageAndExit(opts.parser, "Please specify either --bootstrap-server or --bootstrap-controller"); + if (opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt)) + CommandLineUtils.printUsageAndExit(opts.parser, "Please don't specify both --bootstrap-server and --bootstrap-controller"); else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt)) - CommandLineUtils.printUsageAndExit(opts.parser, "Please specify --bootstrap-server or --bootstrap-controller"); + CommandLineUtils.printUsageAndExit(opts.parser, "Please specify either --bootstrap-server or --bootstrap-controller"); else bootstrapOpt = opts.options.has(opts.bootstrapServerOpt) ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt; diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java index 4837cf1b74eff..5f93f53d77e97 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java @@ -29,7 +29,7 @@ @Timeout(60) public class ReassignPartitionsCommandArgsTest { - public static final String MISSING_BOOTSTRAP_SERVER_MSG = "Please specify --bootstrap-server"; + public static final String MISSING_BOOTSTRAP_SERVER_MSG = "Please specify either --bootstrap-server"; @BeforeEach public void setUp() { @@ -289,4 +289,25 @@ public void shouldNotAllowCancelWithoutReassignmentJsonFile() { "--preserve-throttles"}; shouldFailWith("Missing required argument \"[reassignment-json-file]\"", args); } + + @Test + public void shouldAllowBootstrapControllerArg() { + String[] args = new String[] { + "--bootstrap-controller", "localhost:1234", + "--generate", + "--broker-list", "101,102", + "--topics-to-move-json-file", "myfile.json"}; + ReassignPartitionsCommand.validateAndParseArgs(args); + } + + @Test + public void shouldNotAllowBootstrapControllerAndBootstrapServerArg() { + String[] args = new String[] { + "--bootstrap-server", "localhost:1234", + "--bootstrap-controller", "localhost:1234", + "--generate", + "--broker-list", "101,102", + "--topics-to-move-json-file", "myfile.json"}; + shouldFailWith("Please don't specify both --bootstrap-server and --bootstrap-controller", args); + } } From 6411ba43de485e7c672efbe2834f1cc04265905e Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 22 Aug 2024 20:44:18 +0800 Subject: [PATCH 04/15] rewrite bootstrapControllerOpt description --- .../tools/reassign/ReassignPartitionsCommandOptions.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java index d419f7c06df1c..1577498e38dd7 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java @@ -85,11 +85,10 @@ public ReassignPartitionsCommandOptions(String[] args) { .describedAs("brokerlist") .ofType(String.class); - bootstrapControllerOpt = parser.accepts("bootstrap-controller", "A comma-separated list of bootstrap.controllers " + - "that can be supplied instead of bootstrap-servers.\n" + - "This is useful for administrators who wish to bypass the brokers.") + bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The controller to use for reassignment. " + + "By default, the tool will get the controller from the broker.") .withOptionalArg() - .describedAs("controller to connect to") + .describedAs("bootstrap controller to connect to") .ofType(String.class); disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment"); From 3f34b6d1c1dcda967fc549fbf49692b2464e0427 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 1 Sep 2024 20:08:54 +0800 Subject: [PATCH 05/15] fix the bootstrap controller description --- .../kafka/tools/reassign/ReassignPartitionsCommandOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java index 1577498e38dd7..81f53bb4be9a5 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java @@ -86,7 +86,7 @@ public ReassignPartitionsCommandOptions(String[] args) { .ofType(String.class); bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The controller to use for reassignment. " + - "By default, the tool will get the controller from the broker.") + "By default, the tool will get the quorum controller.") .withOptionalArg() .describedAs("bootstrap controller to connect to") .ofType(String.class); From 3db69e772a698b886cda1d2b0ed2c7a2ac83d5a4 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 1 Sep 2024 20:12:23 +0800 Subject: [PATCH 06/15] fix the bootstrap server description --- .../kafka/tools/reassign/ReassignPartitionsCommandOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java index 81f53bb4be9a5..611b9eb7a996d 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java @@ -55,7 +55,7 @@ public ReassignPartitionsCommandOptions(String[] args) { listOpt = parser.accepts("list", "List all active partition reassignments."); // Arguments - bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping.") + bootstrapServerOpt = parser.accepts("bootstrap-server", "the server(s) to use for bootstrapping.") .withOptionalArg() .describedAs("Server(s) to use for bootstrapping") .ofType(String.class); From 9fced4213177c6f4e1ac928d483b12f25e51a816 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 1 Sep 2024 20:37:04 +0800 Subject: [PATCH 07/15] fix test and add validate --- .../tools/reassign/ReassignPartitionsCommand.java | 10 +++++++++- .../reassign/ReassignPartitionsCommandArgsTest.java | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index 22286fb3e320f..bf09d01056412 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -129,7 +129,7 @@ public static void main(String[] args) { Properties props = opts.options.has(opts.commandConfigOpt) ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) : new Properties(); - + validateBootstrapControllerNotSupportedAction(opts); if (opts.options.has(opts.bootstrapControllerOpt)) { props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, opts.options.valueOf(opts.bootstrapControllerOpt)); } else { @@ -156,6 +156,14 @@ public static void main(String[] args) { } } + private static void validateBootstrapControllerNotSupportedAction(ReassignPartitionsCommandOptions opts) { + if (opts.options.has(opts.bootstrapControllerOpt)) { + if (opts.options.has(opts.verifyOpt) || opts.options.has(opts.listOpt)) { + throw new UnsupportedOperationException("The --bootstrap-controller option is not supported with these action."); + } + } + } + private static void handleAction(Admin adminClient, ReassignPartitionsCommandOptions opts) throws IOException, ExecutionException, InterruptedException, TerseException { if (opts.options.has(opts.verifyOpt)) { verifyAssignment(adminClient, diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java index 5f93f53d77e97..9036fe31171aa 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java @@ -29,7 +29,7 @@ @Timeout(60) public class ReassignPartitionsCommandArgsTest { - public static final String MISSING_BOOTSTRAP_SERVER_MSG = "Please specify either --bootstrap-server"; + public static final String MISSING_BOOTSTRAP_SERVER_MSG = "Please specify either --bootstrap-server or --bootstrap-controller"; @BeforeEach public void setUp() { From 4570cf990fd2bf38f8b4ab5797be83632042dfc6 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 1 Sep 2024 20:50:40 +0800 Subject: [PATCH 08/15] cancellation is work --- .../ReassignPartitionsCommandTest.java | 103 ++++++++++-------- 1 file changed, 60 insertions(+), 43 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java index 15658b8fb3774..af1f499e1b84c 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; @@ -334,47 +335,13 @@ public void testProduceAndConsumeWithReassignmentInProgress() throws Exception { * Test running a reassignment and then cancelling it. */ @ClusterTest - public void testCancellation() throws Exception { - createTopics(); - TopicPartition foo0 = new TopicPartition("foo", 0); - TopicPartition baz1 = new TopicPartition("baz", 1); - - produceMessages(foo0.topic(), foo0.partition(), 200); - produceMessages(baz1.topic(), baz1.partition(), 200); - String assignment = "{\"version\":1,\"partitions\":" + - "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," + - "{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + - "]}"; - try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { - assertEquals(unthrottledBrokerConfigs, - describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet())); - long interBrokerThrottle = 1L; - runExecuteAssignment(false, assignment, interBrokerThrottle, -1L); - waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle); - - Map partStates = new HashMap<>(); - - partStates.put(foo0, new PartitionReassignmentState(asList(0, 1, 3, 2), asList(0, 1, 3), false)); - partStates.put(baz1, new PartitionReassignmentState(asList(0, 2, 3, 1), asList(0, 2, 3), false)); + public void testCancellationWithBootstrapServer() throws Exception { + testCancellationAction(true); + } - // Verify that the reassignment is running. The very low throttle should keep it - // from completing before this runs. - waitForVerifyAssignment(admin, assignment, true, - new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false)); - // Cancel the reassignment. - assertEquals(new AbstractMap.SimpleImmutableEntry<>(new HashSet<>(asList(foo0, baz1)), Collections.emptySet()), runCancelAssignment(assignment, true)); - // Broker throttles are still active because we passed --preserve-throttles - waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle); - // Cancelling the reassignment again should reveal nothing to cancel. - assertEquals(new AbstractMap.SimpleImmutableEntry<>(Collections.emptySet(), Collections.emptySet()), runCancelAssignment(assignment, false)); - // This time, the broker throttles were removed. - waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs); - // Verify that there are no ongoing reassignments. - assertFalse(runVerifyAssignment(admin, assignment, false).partsOngoing); - } - // Verify that the partition is removed from cancelled replicas - verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3)); - verifyReplicaDeleted(new TopicPartitionReplica(baz1.topic(), baz1.partition(), 3)); + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}) + public void testCancellationWithBootstrapController() throws Exception { + testCancellationAction(false); } @ClusterTest @@ -408,7 +375,7 @@ public void testCancellationWithAddingReplicaInIsr() throws Exception { } // Now cancel the assignment and verify that the partition is removed from cancelled replicas - assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true)); + assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true, true)); verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3)); verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4)); } @@ -725,9 +692,16 @@ private void runExecuteAssignment(Boolean additional, private Map.Entry, Set> runCancelAssignment( String jsonString, - Boolean preserveThrottles + Boolean preserveThrottles, + Boolean useBootstrapServer ) { - try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + Map config; + if (useBootstrapServer) { + config = Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + } else { + config = Collections.singletonMap(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()); + } + try (Admin admin = Admin.create(config)) { return cancelAssignment(admin, jsonString, preserveThrottles, 10000L, Time.SYSTEM); } catch (ExecutionException | InterruptedException | JsonProcessingException | TerseException e) { throw new RuntimeException(e); @@ -759,6 +733,49 @@ public BrokerDirs(DescribeLogDirsResult result, int brokerId) throws ExecutionEx } } + private void testCancellationAction(boolean useBootstrapServer) throws InterruptedException { + createTopics(); + TopicPartition foo0 = new TopicPartition("foo", 0); + TopicPartition baz1 = new TopicPartition("baz", 1); + + produceMessages(foo0.topic(), foo0.partition(), 200); + produceMessages(baz1.topic(), baz1.partition(), 200); + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," + + "{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + assertEquals(unthrottledBrokerConfigs, + describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet())); + long interBrokerThrottle = 1L; + runExecuteAssignment(false, assignment, interBrokerThrottle, -1L); + waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle); + + Map partStates = new HashMap<>(); + + partStates.put(foo0, new PartitionReassignmentState(asList(0, 1, 3, 2), asList(0, 1, 3), false)); + partStates.put(baz1, new PartitionReassignmentState(asList(0, 2, 3, 1), asList(0, 2, 3), false)); + + // Verify that the reassignment is running. The very low throttle should keep it + // from completing before this runs. + waitForVerifyAssignment(admin, assignment, true, + new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false)); + // Cancel the reassignment. + assertEquals(new AbstractMap.SimpleImmutableEntry<>(new HashSet<>(asList(foo0, baz1)), Collections.emptySet()), runCancelAssignment(assignment, true, useBootstrapServer)); + // Broker throttles are still active because we passed --preserve-throttles + waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle); + // Cancelling the reassignment again should reveal nothing to cancel. + assertEquals(new AbstractMap.SimpleImmutableEntry<>(Collections.emptySet(), Collections.emptySet()), runCancelAssignment(assignment, false, useBootstrapServer)); + // This time, the broker throttles were removed. + waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs); + // Verify that there are no ongoing reassignments. + assertFalse(runVerifyAssignment(admin, assignment, false).partsOngoing); + } + // Verify that the partition is removed from cancelled replicas + verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3)); + verifyReplicaDeleted(new TopicPartitionReplica(baz1.topic(), baz1.partition(), 3)); + } + /** * Remove a set of throttled partitions and reset the overall replication quota. */ From 00ff09690792824bc4104bac4cb6b1185d6998c0 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 1 Sep 2024 21:20:35 +0800 Subject: [PATCH 09/15] add new test for ReassignPartitionsCommand --- .../reassign/ReassignPartitionsCommand.java | 2 +- .../ReassignPartitionsCommandTest.java | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index bf09d01056412..f34d930256e00 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -158,7 +158,7 @@ public static void main(String[] args) { private static void validateBootstrapControllerNotSupportedAction(ReassignPartitionsCommandOptions opts) { if (opts.options.has(opts.bootstrapControllerOpt)) { - if (opts.options.has(opts.verifyOpt) || opts.options.has(opts.listOpt)) { + if (opts.options.has(opts.verifyOpt) || opts.options.has(opts.listOpt) || opts.options.has(opts.executeOpt) || opts.options.has(opts.generateOpt)) { throw new UnsupportedOperationException("The --bootstrap-controller option is not supported with these action."); } } diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java index af1f499e1b84c..f96c8755270cc 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java @@ -95,6 +95,7 @@ import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.cancelAssignment; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.executeAssignment; +import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generateAssignment; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAssignment; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -198,6 +199,24 @@ public void testHighWaterMarkAfterPartitionReassignment() throws Exception { }, "Timeout for waiting offset"); } } + + @ClusterTest + public void testGenerateAssignmentWithBootstrapServer() throws Exception { + createTopics(); + TopicPartition foo0 = new TopicPartition("foo", 0); + produceMessages(foo0.topic(), foo0.partition(), 100); + + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + generateAssignment(admin, assignment, "1,2,3", false); + Map finalAssignment = singletonMap(foo0, + new PartitionReassignmentState(asList(0, 1, 2), asList(3, 1, 2), true)); + waitForVerifyAssignment(admin, assignment, false, + new VerifyAssignmentResult(finalAssignment)); + } + } @ClusterTest public void testAlterReassignmentThrottle() throws Exception { From 097a526fa8a80053d06fe6540242525045d28c14 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 1 Sep 2024 21:23:37 +0800 Subject: [PATCH 10/15] add new test fot curReassignmentsToString --- .../ReassignPartitionsCommandTest.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java index f96c8755270cc..1cf35666a13a2 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java @@ -94,6 +94,7 @@ import static org.apache.kafka.tools.ToolsTestUtils.throttleAllBrokersReplication; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.cancelAssignment; +import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.curReassignmentsToString; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.executeAssignment; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generateAssignment; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAssignment; @@ -481,6 +482,33 @@ topicPartition, new PartitionReassignmentState(asList(0, 1, 2), asList(0, 1, 2), } } + @ClusterTest + public void testCurrentReassignmentsToStringAroundAssigment() throws Exception { + createTopics(); + TopicPartition foo0 = new TopicPartition("foo", 0); + produceMessages(foo0.topic(), foo0.partition(), 200); + + // Execute the assignment + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + + runExecuteAssignment(false, assignment, -1L, -1L); + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + Map finalAssignment = singletonMap(foo0, + new PartitionReassignmentState(asList(3, 1, 2), asList(3, 1, 2), true)); + + assertEquals("Current partition reassignments:\n" + + "foo-0: replicas: 3,1,2,0. adding: 3. removing: 0.", + curReassignmentsToString(admin)); + + waitForVerifyAssignment(admin, assignment, false, + new VerifyAssignmentResult(finalAssignment)); + + assertEquals("No partition reassignments found.", curReassignmentsToString(admin)); + } + } + private void createTopics() { try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { Map> fooReplicasAssignments = new HashMap<>(); From fe0d82387afa08905800a0e43d3533492a645e7e Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 2 Sep 2024 21:39:09 +0800 Subject: [PATCH 11/15] fix unsupported action --- .../apache/kafka/tools/reassign/ReassignPartitionsCommand.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index f34d930256e00..10669df9ff1b9 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -158,7 +158,7 @@ public static void main(String[] args) { private static void validateBootstrapControllerNotSupportedAction(ReassignPartitionsCommandOptions opts) { if (opts.options.has(opts.bootstrapControllerOpt)) { - if (opts.options.has(opts.verifyOpt) || opts.options.has(opts.listOpt) || opts.options.has(opts.executeOpt) || opts.options.has(opts.generateOpt)) { + if (opts.options.has(opts.verifyOpt) || opts.options.has(opts.executeOpt) || opts.options.has(opts.generateOpt)) { throw new UnsupportedOperationException("The --bootstrap-controller option is not supported with these action."); } } From 4a6c4986bf15527cb3e91eaf12d186304fff63fa Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 2 Sep 2024 23:05:21 +0800 Subject: [PATCH 12/15] remove test --- .../ReassignPartitionsCommandTest.java | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java index 1cf35666a13a2..ec09fa37686fe 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java @@ -482,33 +482,6 @@ topicPartition, new PartitionReassignmentState(asList(0, 1, 2), asList(0, 1, 2), } } - @ClusterTest - public void testCurrentReassignmentsToStringAroundAssigment() throws Exception { - createTopics(); - TopicPartition foo0 = new TopicPartition("foo", 0); - produceMessages(foo0.topic(), foo0.partition(), 200); - - // Execute the assignment - String assignment = "{\"version\":1,\"partitions\":" + - "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + - "]}"; - - runExecuteAssignment(false, assignment, -1L, -1L); - try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { - Map finalAssignment = singletonMap(foo0, - new PartitionReassignmentState(asList(3, 1, 2), asList(3, 1, 2), true)); - - assertEquals("Current partition reassignments:\n" + - "foo-0: replicas: 3,1,2,0. adding: 3. removing: 0.", - curReassignmentsToString(admin)); - - waitForVerifyAssignment(admin, assignment, false, - new VerifyAssignmentResult(finalAssignment)); - - assertEquals("No partition reassignments found.", curReassignmentsToString(admin)); - } - } - private void createTopics() { try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { Map> fooReplicasAssignments = new HashMap<>(); From 1fe7c524cb1092390b10105f1cccfe955a7e718d Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 16 Sep 2024 16:38:16 +0800 Subject: [PATCH 13/15] addressed by comments --- .../reassign/ReassignPartitionsCommand.java | 35 +++++++++---------- .../ReassignPartitionsCommandOptions.java | 2 +- .../ReassignPartitionsCommandArgsTest.java | 11 +++++- .../ReassignPartitionsCommandTest.java | 1 - 4 files changed, 28 insertions(+), 21 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index 10669df9ff1b9..c671dbd23335c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -129,7 +129,6 @@ public static void main(String[] args) { Properties props = opts.options.has(opts.commandConfigOpt) ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) : new Properties(); - validateBootstrapControllerNotSupportedAction(opts); if (opts.options.has(opts.bootstrapControllerOpt)) { props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, opts.options.valueOf(opts.bootstrapControllerOpt)); } else { @@ -156,14 +155,6 @@ public static void main(String[] args) { } } - private static void validateBootstrapControllerNotSupportedAction(ReassignPartitionsCommandOptions opts) { - if (opts.options.has(opts.bootstrapControllerOpt)) { - if (opts.options.has(opts.verifyOpt) || opts.options.has(opts.executeOpt) || opts.options.has(opts.generateOpt)) { - throw new UnsupportedOperationException("The --bootstrap-controller option is not supported with these action."); - } - } - } - private static void handleAction(Admin adminClient, ReassignPartitionsCommandOptions opts) throws IOException, ExecutionException, InterruptedException, TerseException { if (opts.options.has(opts.verifyOpt)) { verifyAssignment(adminClient, @@ -1419,14 +1410,14 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) { OptionSpec action = allActions.get(0); - OptionSpec bootstrapOpt = null; + boolean isBootstrapServer = true; if (opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt)) CommandLineUtils.printUsageAndExit(opts.parser, "Please don't specify both --bootstrap-server and --bootstrap-controller"); else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt)) CommandLineUtils.printUsageAndExit(opts.parser, "Please specify either --bootstrap-server or --bootstrap-controller"); - else - bootstrapOpt = opts.options.has(opts.bootstrapServerOpt) ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt; + else + isBootstrapServer = opts.options.has(opts.bootstrapServerOpt); // Make sure that we have all the required arguments for our action. Map, List>> requiredArgs = new HashMap<>(); @@ -1451,32 +1442,32 @@ else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bo Map, List>> permittedArgs = new HashMap<>(); permittedArgs.put(opts.verifyOpt, Arrays.asList( - bootstrapOpt, + opts.bootstrapServerOpt, opts.commandConfigOpt, opts.preserveThrottlesOpt )); permittedArgs.put(opts.generateOpt, Arrays.asList( - bootstrapOpt, + opts.bootstrapServerOpt, opts.brokerListOpt, opts.commandConfigOpt, opts.disableRackAware )); permittedArgs.put(opts.executeOpt, Arrays.asList( opts.additionalOpt, - bootstrapOpt, + opts.bootstrapServerOpt, opts.commandConfigOpt, opts.interBrokerThrottleOpt, opts.replicaAlterLogDirsThrottleOpt, opts.timeoutOpt )); permittedArgs.put(opts.cancelOpt, Arrays.asList( - bootstrapOpt, + isBootstrapServer ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt, opts.commandConfigOpt, opts.preserveThrottlesOpt, opts.timeoutOpt )); permittedArgs.put(opts.listOpt, Arrays.asList( - bootstrapOpt, + isBootstrapServer ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt, opts.commandConfigOpt )); @@ -1488,10 +1479,18 @@ else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bo String.format("Option \"%s\" can't be used with action \"%s\"", opt, action)); } }); - + validateBootstrapControllerNotSupportedAction(opts); return opts; } + private static void validateBootstrapControllerNotSupportedAction(ReassignPartitionsCommandOptions opts) { + if (opts.options.has(opts.bootstrapControllerOpt)) { + if (opts.options.has(opts.verifyOpt) || opts.options.has(opts.executeOpt) || opts.options.has(opts.generateOpt)) { + throw new UnsupportedOperationException("The --bootstrap-controller option is not supported with these action."); + } + } + } + static Set alterReplicaLogDirs(Admin adminClient, Map assignment) throws InterruptedException { Set results = new HashSet<>(); diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java index 611b9eb7a996d..2d31c5a902ab4 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java @@ -86,7 +86,7 @@ public ReassignPartitionsCommandOptions(String[] args) { .ofType(String.class); bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The controller to use for reassignment. " + - "By default, the tool will get the quorum controller.") + "By default, the tool will get the quorum controller. This option supports the actions --cancel and --list.") .withOptionalArg() .describedAs("bootstrap controller to connect to") .ofType(String.class); diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java index 9036fe31171aa..bd7b04a0d5cda 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java @@ -292,12 +292,21 @@ public void shouldNotAllowCancelWithoutReassignmentJsonFile() { @Test public void shouldAllowBootstrapControllerArg() { + String[] args = new String[] { + "--bootstrap-controller", "localhost:1234", + "--cancel", + "--reassignment-json-file", "myfile.json"}; + ReassignPartitionsCommand.validateAndParseArgs(args); + } + + @Test + public void shouldNotAllowBootstrapControllerArgWithUnsupportedAction() { String[] args = new String[] { "--bootstrap-controller", "localhost:1234", "--generate", "--broker-list", "101,102", "--topics-to-move-json-file", "myfile.json"}; - ReassignPartitionsCommand.validateAndParseArgs(args); + shouldFailWith("Option \"[bootstrap-controller]\" can't be used with action \"[generate]", args); } @Test diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java index ec09fa37686fe..f96c8755270cc 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java @@ -94,7 +94,6 @@ import static org.apache.kafka.tools.ToolsTestUtils.throttleAllBrokersReplication; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.cancelAssignment; -import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.curReassignmentsToString; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.executeAssignment; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generateAssignment; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAssignment; From 6bec152bc1e5bb568f5137e4e8640a8971facb7c Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 16 Sep 2024 16:44:26 +0800 Subject: [PATCH 14/15] addressed by comments --- .../kafka/tools/reassign/ReassignPartitionsCommand.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index c671dbd23335c..6c4da23354db5 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -1410,14 +1410,12 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) { OptionSpec action = allActions.get(0); - boolean isBootstrapServer = true; - if (opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt)) CommandLineUtils.printUsageAndExit(opts.parser, "Please don't specify both --bootstrap-server and --bootstrap-controller"); else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt)) CommandLineUtils.printUsageAndExit(opts.parser, "Please specify either --bootstrap-server or --bootstrap-controller"); - else - isBootstrapServer = opts.options.has(opts.bootstrapServerOpt); + + boolean isBootstrapServer = opts.options.has(opts.bootstrapServerOpt); // Make sure that we have all the required arguments for our action. Map, List>> requiredArgs = new HashMap<>(); From 4dc4e625edbee46494c99d5e3dad2e9a600fe1ac Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 16 Sep 2024 16:46:43 +0800 Subject: [PATCH 15/15] remove unused method --- .../kafka/tools/reassign/ReassignPartitionsCommand.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index 6c4da23354db5..d6f57a49a6d50 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -1477,18 +1477,9 @@ else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bo String.format("Option \"%s\" can't be used with action \"%s\"", opt, action)); } }); - validateBootstrapControllerNotSupportedAction(opts); return opts; } - private static void validateBootstrapControllerNotSupportedAction(ReassignPartitionsCommandOptions opts) { - if (opts.options.has(opts.bootstrapControllerOpt)) { - if (opts.options.has(opts.verifyOpt) || opts.options.has(opts.executeOpt) || opts.options.has(opts.generateOpt)) { - throw new UnsupportedOperationException("The --bootstrap-controller option is not supported with these action."); - } - } - } - static Set alterReplicaLogDirs(Admin adminClient, Map assignment) throws InterruptedException { Set results = new HashSet<>();