Skip to content

Commit

Permalink
KAFKA-17525 Convert the UnknownServerException to InvalidRequestExcep…
Browse files Browse the repository at this point in the history
…tion when altering client-metrics config at runtime (apache#17168)

Reviewers: Apoorv Mittal <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
TaiJuWu authored and tedyu committed Jan 6, 2025
1 parent 3f5cea0 commit 8569a79
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
19 changes: 19 additions & 0 deletions core/src/test/java/kafka/admin/ClientTelemetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand All @@ -57,11 +58,14 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Arrays.asList;
import static org.apache.kafka.clients.admin.AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(value = ClusterTestExtensions.class)
Expand Down Expand Up @@ -123,6 +127,17 @@ public void testClientInstanceId(ClusterInstance clusterInstance) throws Interru
}
}

@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
public void testIntervalMsParser(ClusterInstance clusterInstance) {
List<String> alterOpts = asList("--bootstrap-server", clusterInstance.bootstrapServers(),
"--alter", "--entity-type", "client-metrics", "--entity-name", "test", "--add-config", "interval.ms=bbb");
try (Admin client = clusterInstance.createAdminClient()) {
ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts));

Throwable e = assertThrows(ExecutionException.class, () -> ConfigCommand.alterConfig(client, addOpts));
assertTrue(e.getMessage().contains(InvalidConfigurationException.class.getSimpleName()));
}
}

@ClusterTest(types = Type.KRAFT)
public void testMetrics(ClusterInstance clusterInstance) {
Expand All @@ -141,6 +156,10 @@ public void testMetrics(ClusterInstance clusterInstance) {
}
}

private static String[] toArray(List<String>... lists) {
return Stream.of(lists).flatMap(List::stream).toArray(String[]::new);
}

/**
* We should add a ClientTelemetry into plugins to test the clientInstanceId method Otherwise the
* {@link org.apache.kafka.common.protocol.ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS } command will not be supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public static void validate(String subscriptionName, Properties properties) {
validateProperties(properties);
}

@SuppressWarnings("unchecked")
private static void validateProperties(Properties properties) {
// Make sure that all the properties are valid
properties.forEach((key, value) -> {
Expand All @@ -131,9 +132,11 @@ private static void validateProperties(Properties properties) {
}
});

Map<String, Object> parsed = CONFIG.parse(properties);

// Make sure that push interval is between 100ms and 1 hour.
if (properties.containsKey(PUSH_INTERVAL_MS)) {
int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS));
Integer pushIntervalMs = (Integer) parsed.get(PUSH_INTERVAL_MS);
if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) {
String msg = String.format("Invalid value %s for %s, interval must be between 100 and 3600000 (1 hour)",
pushIntervalMs, PUSH_INTERVAL_MS);
Expand All @@ -143,7 +146,7 @@ private static void validateProperties(Properties properties) {

// Make sure that client match patterns are valid by parsing them.
if (properties.containsKey(CLIENT_MATCH_PATTERN)) {
List<String> patterns = Arrays.asList(properties.getProperty(CLIENT_MATCH_PATTERN).split(","));
List<String> patterns = (List<String>) parsed.get(CLIENT_MATCH_PATTERN);
// Parse the client matching patterns to validate if the patterns are valid.
parseMatchingPatterns(patterns);
}
Expand Down

0 comments on commit 8569a79

Please sign in to comment.