Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17525: Convert the UnknownServerException to InvalidRequestException when altering client-metrics config at runtime #17168

Merged
merged 11 commits into from
Oct 4, 2024
19 changes: 19 additions & 0 deletions core/src/test/java/kafka/admin/ClientTelemetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand All @@ -58,11 +59,14 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Arrays.asList;
import static org.apache.kafka.clients.admin.AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(value = ClusterTestExtensions.class)
Expand Down Expand Up @@ -124,6 +128,17 @@ public void testClientInstanceId(ClusterInstance clusterInstance) throws Interru
}
}

@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
public void testIntervalMsParser(ClusterInstance clusterInstance) {
List<String> alterOpts = asList("--bootstrap-server", clusterInstance.bootstrapServers(),
"--alter", "--entity-type", "client-metrics", "--entity-name", "test", "--add-config", "interval.ms=bbb");
try (Admin client = clusterInstance.createAdminClient()) {
ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts));

Throwable e = assertThrows(ExecutionException.class, () -> ConfigCommand.alterConfig(client, addOpts));
assertTrue(e.getMessage().contains(InvalidConfigurationException.class.getSimpleName()));
}
}

@ClusterTest(types = Type.KRAFT)
public void testMetrics(ClusterInstance clusterInstance) {
Expand All @@ -142,6 +157,10 @@ public void testMetrics(ClusterInstance clusterInstance) {
}
}

private static String[] toArray(List<String>... lists) {
return Stream.of(lists).flatMap(List::stream).toArray(String[]::new);
}

/**
* We should add a ClientTelemetry into plugins to test the clientInstanceId method Otherwise the
* {@link org.apache.kafka.common.protocol.ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS } command will not be supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public static void validate(String subscriptionName, Properties properties) {
validateProperties(properties);
}

@SuppressWarnings("unchecked")
private static void validateProperties(Properties properties) {
// Make sure that all the properties are valid
properties.forEach((key, value) -> {
Expand All @@ -131,9 +132,11 @@ private static void validateProperties(Properties properties) {
}
});

Map<String, Object> parsed = CONFIG.parse(properties);

// Make sure that push interval is between 100ms and 1 hour.
if (properties.containsKey(PUSH_INTERVAL_MS)) {
int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS));
Integer pushIntervalMs = (Integer) parsed.get(PUSH_INTERVAL_MS);
if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) {
String msg = String.format("Invalid value %s for %s, interval must be between 100 and 3600000 (1 hour)",
pushIntervalMs, PUSH_INTERVAL_MS);
Expand All @@ -143,7 +146,7 @@ private static void validateProperties(Properties properties) {

// Make sure that client match patterns are valid by parsing them.
if (properties.containsKey(CLIENT_MATCH_PATTERN)) {
List<String> patterns = Arrays.asList(properties.getProperty(CLIENT_MATCH_PATTERN).split(","));
List<String> patterns = (List<String>) parsed.get(CLIENT_MATCH_PATTERN);
// Parse the client matching patterns to validate if the patterns are valid.
parseMatchingPatterns(patterns);
}
Expand Down