Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17525: Convert the UnknownServerException to InvalidRequestException when altering client-metrics config at runtime #17168

Merged
merged 11 commits into from
Oct 4, 2024
18 changes: 18 additions & 0 deletions core/src/test/java/kafka/admin/ClientTelemetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand All @@ -58,10 +59,13 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(value = ClusterTestExtensions.class)
Expand Down Expand Up @@ -120,6 +124,20 @@ public void testClientInstanceId(ClusterInstance clusterInstance) throws Interru
}
}

private static String[] toArray(List<String>... lists) {
return Stream.of(lists).flatMap(List::stream).toArray(String[]::new);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a line break

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would be better to place the private method after public methods and before static inner class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I will notice in future.

@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
public void testIntervalMsParser(ClusterInstance clusterInstance) {
List<String> alterOpts = asList("--bootstrap-server", clusterInstance.bootstrapServers(),
"--alter", "--entity-type", "client-metrics", "--entity-name", "test", "--add-config", "interval.ms=bbb");
try (Admin client = clusterInstance.createAdminClient()) {
ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts));

Throwable e = assertThrows(ExecutionException.class, () -> ConfigCommand.alterConfig(client, addOpts));
assertTrue(e.getMessage().contains(InvalidConfigurationException.class.getSimpleName()));
}
}

@ClusterTest(types = Type.KRAFT)
public void testMetrics(ClusterInstance clusterInstance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public static void validate(String subscriptionName, Properties properties) {
validateProperties(properties);
}

@SuppressWarnings("unchecked")
private static void validateProperties(Properties properties) {
// Make sure that all the properties are valid
properties.forEach((key, value) -> {
Expand All @@ -133,8 +134,8 @@ private static void validateProperties(Properties properties) {

// Make sure that push interval is between 100ms and 1 hour.
if (properties.containsKey(PUSH_INTERVAL_MS)) {
int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS));
if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) {
Integer pushIntervalMs = (Integer) ConfigDef.parseType(PUSH_INTERVAL_MS, properties.getProperty(PUSH_INTERVAL_MS), Type.INT);
Copy link
Collaborator

@apoorvmittal10 apoorvmittal10 Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I get it right, the issue we are trying to solve is regarding the correct exception when config is not as INT or expected? If that's the case then either of the 3 approach I think would be better:

  1. Make this method non-static, instance method. And remove properties as input. Then instantiate ClientMetricsConfig where they are used and call validate. The downside of this is that validation also happens in ControllerConfigurationValidator where unnecessary ClientMetricsConfig will be created.
  2. Enclose a try/catch in validate method and return what received in exception but can return config exception where exception translates to UnknownServer Exception. This is not very clean but a good way to handle.
  3. Use CONFIG.parse(properties) in validate method to get Map<?,?> valueMaps = CONFIG.parse(properties); and then iterate on map as like we iterate on properties. That shall solve your problem.

I would have chosen option 3. Also if you detail on the prolem satetment in PR description or jira then I can help better. But as far as I think, option 3 might solve your issue.

Copy link
Collaborator

@apoorvmittal10 apoorvmittal10 Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw the updated jira, thank you. Let me know if option 3 works for you, ideally should.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @apoorvmittal10 , sorry for late response.
I have updated this PR according your great suggestion.
Please take a look.

if (pushIntervalMs == null || pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) {
Copy link
Collaborator

@apoorvmittal10 apoorvmittal10 Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, what happens when we use kafka-client-metrics.sh tool to just update pattern or metrics i.e. pushIntervalMs is not updated? Then isn't pushIntervalMs not null?

Copy link
Contributor Author

@TaiJuWu TaiJuWu Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @apoorvmittal10, thanks for the review and sorry for late response.

If kafka-client-metrics.sh doesn't have pushIntervalMs, it should not enter this block (check in 136).
I added a null check just to ensure that null values are handled properly and suppress compiler warning.
Ideally, the check should always pass.

String msg = String.format("Invalid value %s for %s, interval must be between 100 and 3600000 (1 hour)",
pushIntervalMs, PUSH_INTERVAL_MS);
throw new InvalidRequestException(msg);
Expand All @@ -143,7 +144,7 @@ private static void validateProperties(Properties properties) {

// Make sure that client match patterns are valid by parsing them.
if (properties.containsKey(CLIENT_MATCH_PATTERN)) {
List<String> patterns = Arrays.asList(properties.getProperty(CLIENT_MATCH_PATTERN).split(","));
List<String> patterns = (List<String>) ConfigDef.parseType(CLIENT_MATCH_PATTERN, properties.getProperty(CLIENT_MATCH_PATTERN), Type.LIST);
// Parse the client matching patterns to validate if the patterns are valid.
parseMatchingPatterns(patterns);
}
Expand Down