Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17374: add bootstrap.controller to kafka-reassign-partitions.sh #16964

Merged
merged 16 commits into from
Oct 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ public static void main(String[] args) {
Properties props = opts.options.has(opts.commandConfigOpt)
? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
: new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt));
validateBootstrapControllerNotSupportedAction(opts);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please move this to validateAndParseArgs?

if (opts.options.has(opts.bootstrapControllerOpt)) {
props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, opts.options.valueOf(opts.bootstrapControllerOpt));
} else {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt));
}
props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool");
adminClient = Admin.create(props);
handleAction(adminClient, opts);
Expand All @@ -151,6 +156,14 @@ public static void main(String[] args) {
}
}

private static void validateBootstrapControllerNotSupportedAction(ReassignPartitionsCommandOptions opts) {
if (opts.options.has(opts.bootstrapControllerOpt)) {
if (opts.options.has(opts.verifyOpt) || opts.options.has(opts.executeOpt) || opts.options.has(opts.generateOpt)) {
throw new UnsupportedOperationException("The --bootstrap-controller option is not supported with these action.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add the unsupported actions to the comment?

}
}
}

private static void handleAction(Admin adminClient, ReassignPartitionsCommandOptions opts) throws IOException, ExecutionException, InterruptedException, TerseException {
if (opts.options.has(opts.verifyOpt)) {
verifyAssignment(adminClient,
Expand Down Expand Up @@ -1405,9 +1418,15 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) {
}

OptionSpec<?> action = allActions.get(0);

if (!opts.options.has(opts.bootstrapServerOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "Please specify --bootstrap-server");

OptionSpec<String> bootstrapOpt = null;

if (opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "Please don't specify both --bootstrap-server and --bootstrap-controller");
else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "Please specify either --bootstrap-server or --bootstrap-controller");
else
bootstrapOpt = opts.options.has(opts.bootstrapServerOpt) ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt;

// Make sure that we have all the required arguments for our action.
Map<OptionSpec<?>, List<OptionSpec<?>>> requiredArgs = new HashMap<>();
Expand All @@ -1432,32 +1451,32 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) {
Map<OptionSpec<?>, List<OptionSpec<?>>> permittedArgs = new HashMap<>();

permittedArgs.put(opts.verifyOpt, Arrays.asList(
opts.bootstrapServerOpt,
bootstrapOpt,
opts.commandConfigOpt,
opts.preserveThrottlesOpt
));
permittedArgs.put(opts.generateOpt, Arrays.asList(
opts.bootstrapServerOpt,
bootstrapOpt,
opts.brokerListOpt,
opts.commandConfigOpt,
opts.disableRackAware
));
permittedArgs.put(opts.executeOpt, Arrays.asList(
opts.additionalOpt,
opts.bootstrapServerOpt,
bootstrapOpt,
opts.commandConfigOpt,
opts.interBrokerThrottleOpt,
opts.replicaAlterLogDirsThrottleOpt,
opts.timeoutOpt
));
permittedArgs.put(opts.cancelOpt, Arrays.asList(
opts.bootstrapServerOpt,
bootstrapOpt,
opts.commandConfigOpt,
opts.preserveThrottlesOpt,
opts.timeoutOpt
));
permittedArgs.put(opts.listOpt, Arrays.asList(
opts.bootstrapServerOpt,
bootstrapOpt,
opts.commandConfigOpt
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions {
final OptionSpec<String> reassignmentJsonFileOpt;
final OptionSpec<String> topicsToMoveJsonFileOpt;
final OptionSpec<String> brokerListOpt;
final OptionSpec<String> bootstrapControllerOpt;
final OptionSpec<?> disableRackAware;
final OptionSpec<Long> interBrokerThrottleOpt;
final OptionSpec<Long> replicaAlterLogDirsThrottleOpt;
Expand All @@ -54,8 +55,8 @@ public ReassignPartitionsCommandOptions(String[] args) {
listOpt = parser.accepts("list", "List all active partition reassignments.");

// Arguments
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping.")
.withRequiredArg()
bootstrapServerOpt = parser.accepts("bootstrap-server", "the server(s) to use for bootstrapping.")
.withOptionalArg()
.describedAs("Server(s) to use for bootstrapping")
.ofType(String.class);

Expand Down Expand Up @@ -83,6 +84,13 @@ public ReassignPartitionsCommandOptions(String[] args) {
.withRequiredArg()
.describedAs("brokerlist")
.ofType(String.class);

bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The controller to use for reassignment. " +
"By default, the tool will get the quorum controller.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the supported actions to the docs

.withOptionalArg()
.describedAs("bootstrap controller to connect to")
.ofType(String.class);

disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment");
interBrokerThrottleOpt = parser.accepts("throttle", "The movement of partitions between brokers will be throttled to this value (bytes/sec). " +
"This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

@Timeout(60)
public class ReassignPartitionsCommandArgsTest {
public static final String MISSING_BOOTSTRAP_SERVER_MSG = "Please specify --bootstrap-server";
public static final String MISSING_BOOTSTRAP_SERVER_MSG = "Please specify either --bootstrap-server or --bootstrap-controller";

@BeforeEach
public void setUp() {
Expand Down Expand Up @@ -289,4 +289,25 @@ public void shouldNotAllowCancelWithoutReassignmentJsonFile() {
"--preserve-throttles"};
shouldFailWith("Missing required argument \"[reassignment-json-file]\"", args);
}

@Test
public void shouldAllowBootstrapControllerArg() {
String[] args = new String[] {
"--bootstrap-controller", "localhost:1234",
"--generate",
"--broker-list", "101,102",
"--topics-to-move-json-file", "myfile.json"};
ReassignPartitionsCommand.validateAndParseArgs(args);
}

@Test
public void shouldNotAllowBootstrapControllerAndBootstrapServerArg() {
String[] args = new String[] {
"--bootstrap-server", "localhost:1234",
"--bootstrap-controller", "localhost:1234",
"--generate",
"--broker-list", "101,102",
"--topics-to-move-json-file", "myfile.json"};
shouldFailWith("Please don't specify both --bootstrap-server and --bootstrap-controller", args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
Expand Down Expand Up @@ -93,7 +94,9 @@
import static org.apache.kafka.tools.ToolsTestUtils.throttleAllBrokersReplication;
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES;
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.cancelAssignment;
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.curReassignmentsToString;
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.executeAssignment;
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generateAssignment;
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAssignment;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -197,6 +200,24 @@ public void testHighWaterMarkAfterPartitionReassignment() throws Exception {
}, "Timeout for waiting offset");
}
}

@ClusterTest
public void testGenerateAssignmentWithBootstrapServer() throws Exception {
createTopics();
TopicPartition foo0 = new TopicPartition("foo", 0);
produceMessages(foo0.topic(), foo0.partition(), 100);

try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
String assignment = "{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
"]}";
generateAssignment(admin, assignment, "1,2,3", false);
Map<TopicPartition, PartitionReassignmentState> finalAssignment = singletonMap(foo0,
new PartitionReassignmentState(asList(0, 1, 2), asList(3, 1, 2), true));
waitForVerifyAssignment(admin, assignment, false,
new VerifyAssignmentResult(finalAssignment));
}
}

@ClusterTest
public void testAlterReassignmentThrottle() throws Exception {
Expand Down Expand Up @@ -334,47 +355,13 @@ public void testProduceAndConsumeWithReassignmentInProgress() throws Exception {
* Test running a reassignment and then cancelling it.
*/
@ClusterTest
public void testCancellation() throws Exception {
createTopics();
TopicPartition foo0 = new TopicPartition("foo", 0);
TopicPartition baz1 = new TopicPartition("baz", 1);

produceMessages(foo0.topic(), foo0.partition(), 200);
produceMessages(baz1.topic(), baz1.partition(), 200);
String assignment = "{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
"{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
"]}";
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
assertEquals(unthrottledBrokerConfigs,
describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet()));
long interBrokerThrottle = 1L;
runExecuteAssignment(false, assignment, interBrokerThrottle, -1L);
waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle);

Map<TopicPartition, PartitionReassignmentState> partStates = new HashMap<>();

partStates.put(foo0, new PartitionReassignmentState(asList(0, 1, 3, 2), asList(0, 1, 3), false));
partStates.put(baz1, new PartitionReassignmentState(asList(0, 2, 3, 1), asList(0, 2, 3), false));
public void testCancellationWithBootstrapServer() throws Exception {
testCancellationAction(true);
}

// Verify that the reassignment is running. The very low throttle should keep it
// from completing before this runs.
waitForVerifyAssignment(admin, assignment, true,
new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false));
// Cancel the reassignment.
assertEquals(new AbstractMap.SimpleImmutableEntry<>(new HashSet<>(asList(foo0, baz1)), Collections.emptySet()), runCancelAssignment(assignment, true));
// Broker throttles are still active because we passed --preserve-throttles
waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle);
// Cancelling the reassignment again should reveal nothing to cancel.
assertEquals(new AbstractMap.SimpleImmutableEntry<>(Collections.emptySet(), Collections.emptySet()), runCancelAssignment(assignment, false));
// This time, the broker throttles were removed.
waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs);
// Verify that there are no ongoing reassignments.
assertFalse(runVerifyAssignment(admin, assignment, false).partsOngoing);
}
// Verify that the partition is removed from cancelled replicas
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));
verifyReplicaDeleted(new TopicPartitionReplica(baz1.topic(), baz1.partition(), 3));
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
public void testCancellationWithBootstrapController() throws Exception {
testCancellationAction(false);
}

@ClusterTest
Expand Down Expand Up @@ -408,7 +395,7 @@ public void testCancellationWithAddingReplicaInIsr() throws Exception {
}

// Now cancel the assignment and verify that the partition is removed from cancelled replicas
assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true));
assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true, true));
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4));
}
Expand Down Expand Up @@ -725,9 +712,16 @@ private void runExecuteAssignment(Boolean additional,

private Map.Entry<Set<TopicPartition>, Set<TopicPartitionReplica>> runCancelAssignment(
String jsonString,
Boolean preserveThrottles
Boolean preserveThrottles,
Boolean useBootstrapServer
) {
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
Map<String, Object> config;
if (useBootstrapServer) {
config = Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
} else {
config = Collections.singletonMap(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers());
}
try (Admin admin = Admin.create(config)) {
return cancelAssignment(admin, jsonString, preserveThrottles, 10000L, Time.SYSTEM);
} catch (ExecutionException | InterruptedException | JsonProcessingException | TerseException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -759,6 +753,49 @@ public BrokerDirs(DescribeLogDirsResult result, int brokerId) throws ExecutionEx
}
}

private void testCancellationAction(boolean useBootstrapServer) throws InterruptedException {
createTopics();
TopicPartition foo0 = new TopicPartition("foo", 0);
TopicPartition baz1 = new TopicPartition("baz", 1);

produceMessages(foo0.topic(), foo0.partition(), 200);
produceMessages(baz1.topic(), baz1.partition(), 200);
String assignment = "{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
"{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
"]}";
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
assertEquals(unthrottledBrokerConfigs,
describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet()));
long interBrokerThrottle = 1L;
runExecuteAssignment(false, assignment, interBrokerThrottle, -1L);
waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle);

Map<TopicPartition, PartitionReassignmentState> partStates = new HashMap<>();

partStates.put(foo0, new PartitionReassignmentState(asList(0, 1, 3, 2), asList(0, 1, 3), false));
partStates.put(baz1, new PartitionReassignmentState(asList(0, 2, 3, 1), asList(0, 2, 3), false));

// Verify that the reassignment is running. The very low throttle should keep it
// from completing before this runs.
waitForVerifyAssignment(admin, assignment, true,
new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false));
// Cancel the reassignment.
assertEquals(new AbstractMap.SimpleImmutableEntry<>(new HashSet<>(asList(foo0, baz1)), Collections.emptySet()), runCancelAssignment(assignment, true, useBootstrapServer));
// Broker throttles are still active because we passed --preserve-throttles
waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle);
// Cancelling the reassignment again should reveal nothing to cancel.
assertEquals(new AbstractMap.SimpleImmutableEntry<>(Collections.emptySet(), Collections.emptySet()), runCancelAssignment(assignment, false, useBootstrapServer));
// This time, the broker throttles were removed.
waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs);
// Verify that there are no ongoing reassignments.
assertFalse(runVerifyAssignment(admin, assignment, false).partsOngoing);
}
// Verify that the partition is removed from cancelled replicas
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));
verifyReplicaDeleted(new TopicPartitionReplica(baz1.topic(), baz1.partition(), 3));
}

/**
* Remove a set of throttled partitions and reset the overall replication quota.
*/
Expand Down