diff --git a/core/src/test/java/kafka/admin/ClientTelemetryTest.java b/core/src/test/java/kafka/admin/ClientTelemetryTest.java index ff044a8d2e4a3..9e2a9f0057b86 100644 --- a/core/src/test/java/kafka/admin/ClientTelemetryTest.java +++ b/core/src/test/java/kafka/admin/ClientTelemetryTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.serialization.StringDeserializer; @@ -57,11 +58,14 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import java.util.stream.Stream; +import static java.util.Arrays.asList; import static org.apache.kafka.clients.admin.AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) @@ -123,6 +127,17 @@ public void testClientInstanceId(ClusterInstance clusterInstance) throws Interru } } + @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}) + public void testIntervalMsParser(ClusterInstance clusterInstance) { + List alterOpts = asList("--bootstrap-server", clusterInstance.bootstrapServers(), + "--alter", "--entity-type", "client-metrics", "--entity-name", "test", "--add-config", "interval.ms=bbb"); + try (Admin client = clusterInstance.createAdminClient()) { + ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts)); + + Throwable e = assertThrows(ExecutionException.class, () -> ConfigCommand.alterConfig(client, addOpts)); + assertTrue(e.getMessage().contains(InvalidConfigurationException.class.getSimpleName())); + } + } @ClusterTest(types = Type.KRAFT) public void testMetrics(ClusterInstance clusterInstance) { @@ -141,6 +156,10 @@ public void testMetrics(ClusterInstance clusterInstance) { } } + private static String[] toArray(List... lists) { + return Stream.of(lists).flatMap(List::stream).toArray(String[]::new); + } + /** * We should add a ClientTelemetry into plugins to test the clientInstanceId method Otherwise the * {@link org.apache.kafka.common.protocol.ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS } command will not be supported diff --git a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java index 3f19291fc8327..830bd29a243cc 100644 --- a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java +++ b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java @@ -123,6 +123,7 @@ public static void validate(String subscriptionName, Properties properties) { validateProperties(properties); } + @SuppressWarnings("unchecked") private static void validateProperties(Properties properties) { // Make sure that all the properties are valid properties.forEach((key, value) -> { @@ -131,9 +132,11 @@ private static void validateProperties(Properties properties) { } }); + Map parsed = CONFIG.parse(properties); + // Make sure that push interval is between 100ms and 1 hour. if (properties.containsKey(PUSH_INTERVAL_MS)) { - int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS)); + Integer pushIntervalMs = (Integer) parsed.get(PUSH_INTERVAL_MS); if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) { String msg = String.format("Invalid value %s for %s, interval must be between 100 and 3600000 (1 hour)", pushIntervalMs, PUSH_INTERVAL_MS); @@ -143,7 +146,7 @@ private static void validateProperties(Properties properties) { // Make sure that client match patterns are valid by parsing them. if (properties.containsKey(CLIENT_MATCH_PATTERN)) { - List patterns = Arrays.asList(properties.getProperty(CLIENT_MATCH_PATTERN).split(",")); + List patterns = (List) parsed.get(CLIENT_MATCH_PATTERN); // Parse the client matching patterns to validate if the patterns are valid. parseMatchingPatterns(patterns); }