diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index 0dedfc431e8aa..d6f57a49a6d50 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -129,7 +129,11 @@ public static void main(String[] args) { Properties props = opts.options.has(opts.commandConfigOpt) ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) : new Properties(); - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)); + if (opts.options.has(opts.bootstrapControllerOpt)) { + props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, opts.options.valueOf(opts.bootstrapControllerOpt)); + } else { + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)); + } props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool"); adminClient = Admin.create(props); handleAction(adminClient, opts); @@ -1405,9 +1409,13 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) { } OptionSpec action = allActions.get(0); + + if (opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt)) + CommandLineUtils.printUsageAndExit(opts.parser, "Please don't specify both --bootstrap-server and --bootstrap-controller"); + else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt)) + CommandLineUtils.printUsageAndExit(opts.parser, "Please specify either --bootstrap-server or --bootstrap-controller"); - if (!opts.options.has(opts.bootstrapServerOpt)) - CommandLineUtils.printUsageAndExit(opts.parser, "Please specify --bootstrap-server"); + boolean isBootstrapServer = opts.options.has(opts.bootstrapServerOpt); // Make sure that we have all the required arguments for our action. Map, List>> requiredArgs = new HashMap<>(); @@ -1451,13 +1459,13 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) { opts.timeoutOpt )); permittedArgs.put(opts.cancelOpt, Arrays.asList( - opts.bootstrapServerOpt, + isBootstrapServer ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt, opts.commandConfigOpt, opts.preserveThrottlesOpt, opts.timeoutOpt )); permittedArgs.put(opts.listOpt, Arrays.asList( - opts.bootstrapServerOpt, + isBootstrapServer ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt, opts.commandConfigOpt )); @@ -1469,7 +1477,6 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) { String.format("Option \"%s\" can't be used with action \"%s\"", opt, action)); } }); - return opts; } diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java index 058827bfeaf7f..2d31c5a902ab4 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java @@ -35,6 +35,7 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions { final OptionSpec reassignmentJsonFileOpt; final OptionSpec topicsToMoveJsonFileOpt; final OptionSpec brokerListOpt; + final OptionSpec bootstrapControllerOpt; final OptionSpec disableRackAware; final OptionSpec interBrokerThrottleOpt; final OptionSpec replicaAlterLogDirsThrottleOpt; @@ -54,8 +55,8 @@ public ReassignPartitionsCommandOptions(String[] args) { listOpt = parser.accepts("list", "List all active partition reassignments."); // Arguments - bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping.") - .withRequiredArg() + bootstrapServerOpt = parser.accepts("bootstrap-server", "the server(s) to use for bootstrapping.") + .withOptionalArg() .describedAs("Server(s) to use for bootstrapping") .ofType(String.class); @@ -83,6 +84,13 @@ public ReassignPartitionsCommandOptions(String[] args) { .withRequiredArg() .describedAs("brokerlist") .ofType(String.class); + + bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The controller to use for reassignment. " + + "By default, the tool will get the quorum controller. This option supports the actions --cancel and --list.") + .withOptionalArg() + .describedAs("bootstrap controller to connect to") + .ofType(String.class); + disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment"); interBrokerThrottleOpt = parser.accepts("throttle", "The movement of partitions between brokers will be throttled to this value (bytes/sec). " + "This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment " + diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java index 4837cf1b74eff..bd7b04a0d5cda 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandArgsTest.java @@ -29,7 +29,7 @@ @Timeout(60) public class ReassignPartitionsCommandArgsTest { - public static final String MISSING_BOOTSTRAP_SERVER_MSG = "Please specify --bootstrap-server"; + public static final String MISSING_BOOTSTRAP_SERVER_MSG = "Please specify either --bootstrap-server or --bootstrap-controller"; @BeforeEach public void setUp() { @@ -289,4 +289,34 @@ public void shouldNotAllowCancelWithoutReassignmentJsonFile() { "--preserve-throttles"}; shouldFailWith("Missing required argument \"[reassignment-json-file]\"", args); } + + @Test + public void shouldAllowBootstrapControllerArg() { + String[] args = new String[] { + "--bootstrap-controller", "localhost:1234", + "--cancel", + "--reassignment-json-file", "myfile.json"}; + ReassignPartitionsCommand.validateAndParseArgs(args); + } + + @Test + public void shouldNotAllowBootstrapControllerArgWithUnsupportedAction() { + String[] args = new String[] { + "--bootstrap-controller", "localhost:1234", + "--generate", + "--broker-list", "101,102", + "--topics-to-move-json-file", "myfile.json"}; + shouldFailWith("Option \"[bootstrap-controller]\" can't be used with action \"[generate]", args); + } + + @Test + public void shouldNotAllowBootstrapControllerAndBootstrapServerArg() { + String[] args = new String[] { + "--bootstrap-server", "localhost:1234", + "--bootstrap-controller", "localhost:1234", + "--generate", + "--broker-list", "101,102", + "--topics-to-move-json-file", "myfile.json"}; + shouldFailWith("Please don't specify both --bootstrap-server and --bootstrap-controller", args); + } } diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java index 15658b8fb3774..f96c8755270cc 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; @@ -94,6 +95,7 @@ import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.cancelAssignment; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.executeAssignment; +import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generateAssignment; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAssignment; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -197,6 +199,24 @@ public void testHighWaterMarkAfterPartitionReassignment() throws Exception { }, "Timeout for waiting offset"); } } + + @ClusterTest + public void testGenerateAssignmentWithBootstrapServer() throws Exception { + createTopics(); + TopicPartition foo0 = new TopicPartition("foo", 0); + produceMessages(foo0.topic(), foo0.partition(), 100); + + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + generateAssignment(admin, assignment, "1,2,3", false); + Map finalAssignment = singletonMap(foo0, + new PartitionReassignmentState(asList(0, 1, 2), asList(3, 1, 2), true)); + waitForVerifyAssignment(admin, assignment, false, + new VerifyAssignmentResult(finalAssignment)); + } + } @ClusterTest public void testAlterReassignmentThrottle() throws Exception { @@ -334,47 +354,13 @@ public void testProduceAndConsumeWithReassignmentInProgress() throws Exception { * Test running a reassignment and then cancelling it. */ @ClusterTest - public void testCancellation() throws Exception { - createTopics(); - TopicPartition foo0 = new TopicPartition("foo", 0); - TopicPartition baz1 = new TopicPartition("baz", 1); - - produceMessages(foo0.topic(), foo0.partition(), 200); - produceMessages(baz1.topic(), baz1.partition(), 200); - String assignment = "{\"version\":1,\"partitions\":" + - "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," + - "{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + - "]}"; - try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { - assertEquals(unthrottledBrokerConfigs, - describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet())); - long interBrokerThrottle = 1L; - runExecuteAssignment(false, assignment, interBrokerThrottle, -1L); - waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle); - - Map partStates = new HashMap<>(); - - partStates.put(foo0, new PartitionReassignmentState(asList(0, 1, 3, 2), asList(0, 1, 3), false)); - partStates.put(baz1, new PartitionReassignmentState(asList(0, 2, 3, 1), asList(0, 2, 3), false)); + public void testCancellationWithBootstrapServer() throws Exception { + testCancellationAction(true); + } - // Verify that the reassignment is running. The very low throttle should keep it - // from completing before this runs. - waitForVerifyAssignment(admin, assignment, true, - new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false)); - // Cancel the reassignment. - assertEquals(new AbstractMap.SimpleImmutableEntry<>(new HashSet<>(asList(foo0, baz1)), Collections.emptySet()), runCancelAssignment(assignment, true)); - // Broker throttles are still active because we passed --preserve-throttles - waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle); - // Cancelling the reassignment again should reveal nothing to cancel. - assertEquals(new AbstractMap.SimpleImmutableEntry<>(Collections.emptySet(), Collections.emptySet()), runCancelAssignment(assignment, false)); - // This time, the broker throttles were removed. - waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs); - // Verify that there are no ongoing reassignments. - assertFalse(runVerifyAssignment(admin, assignment, false).partsOngoing); - } - // Verify that the partition is removed from cancelled replicas - verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3)); - verifyReplicaDeleted(new TopicPartitionReplica(baz1.topic(), baz1.partition(), 3)); + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}) + public void testCancellationWithBootstrapController() throws Exception { + testCancellationAction(false); } @ClusterTest @@ -408,7 +394,7 @@ public void testCancellationWithAddingReplicaInIsr() throws Exception { } // Now cancel the assignment and verify that the partition is removed from cancelled replicas - assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true)); + assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true, true)); verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3)); verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4)); } @@ -725,9 +711,16 @@ private void runExecuteAssignment(Boolean additional, private Map.Entry, Set> runCancelAssignment( String jsonString, - Boolean preserveThrottles + Boolean preserveThrottles, + Boolean useBootstrapServer ) { - try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + Map config; + if (useBootstrapServer) { + config = Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + } else { + config = Collections.singletonMap(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()); + } + try (Admin admin = Admin.create(config)) { return cancelAssignment(admin, jsonString, preserveThrottles, 10000L, Time.SYSTEM); } catch (ExecutionException | InterruptedException | JsonProcessingException | TerseException e) { throw new RuntimeException(e); @@ -759,6 +752,49 @@ public BrokerDirs(DescribeLogDirsResult result, int brokerId) throws ExecutionEx } } + private void testCancellationAction(boolean useBootstrapServer) throws InterruptedException { + createTopics(); + TopicPartition foo0 = new TopicPartition("foo", 0); + TopicPartition baz1 = new TopicPartition("baz", 1); + + produceMessages(foo0.topic(), foo0.partition(), 200); + produceMessages(baz1.topic(), baz1.partition(), 200); + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," + + "{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + assertEquals(unthrottledBrokerConfigs, + describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet())); + long interBrokerThrottle = 1L; + runExecuteAssignment(false, assignment, interBrokerThrottle, -1L); + waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle); + + Map partStates = new HashMap<>(); + + partStates.put(foo0, new PartitionReassignmentState(asList(0, 1, 3, 2), asList(0, 1, 3), false)); + partStates.put(baz1, new PartitionReassignmentState(asList(0, 2, 3, 1), asList(0, 2, 3), false)); + + // Verify that the reassignment is running. The very low throttle should keep it + // from completing before this runs. + waitForVerifyAssignment(admin, assignment, true, + new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false)); + // Cancel the reassignment. + assertEquals(new AbstractMap.SimpleImmutableEntry<>(new HashSet<>(asList(foo0, baz1)), Collections.emptySet()), runCancelAssignment(assignment, true, useBootstrapServer)); + // Broker throttles are still active because we passed --preserve-throttles + waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle); + // Cancelling the reassignment again should reveal nothing to cancel. + assertEquals(new AbstractMap.SimpleImmutableEntry<>(Collections.emptySet(), Collections.emptySet()), runCancelAssignment(assignment, false, useBootstrapServer)); + // This time, the broker throttles were removed. + waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs); + // Verify that there are no ongoing reassignments. + assertFalse(runVerifyAssignment(admin, assignment, false).partsOngoing); + } + // Verify that the partition is removed from cancelled replicas + verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3)); + verifyReplicaDeleted(new TopicPartitionReplica(baz1.topic(), baz1.partition(), 3)); + } + /** * Remove a set of throttled partitions and reset the overall replication quota. */