From 84ab266be29054fe36b0b8f22d8354e519e97677 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Tue, 24 Sep 2024 15:04:46 +0100 Subject: [PATCH 1/4] KAFKA-17516: Synonyms for client metrics configs --- .../scala/kafka/server/ConfigHelper.scala | 45 +++++++++----- .../admin/ConfigCommandIntegrationTest.java | 9 ++- .../api/PlaintextAdminIntegrationTest.scala | 4 -- ...ControllerConfigurationValidatorTest.scala | 12 ++-- .../kafka/server/ClientMetricsManager.java | 12 ++-- .../server/metrics/ClientMetricsConfigs.java | 60 +++++++++++++------ .../server/ClientMetricsManagerTest.java | 30 +++++----- .../metrics/ClientMetricsInstanceTest.java | 8 +-- .../metrics/ClientMetricsTestUtils.java | 10 ++-- 9 files changed, 111 insertions(+), 79 deletions(-) diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala index 344279e98c69d..bf523f8aa035c 100644 --- a/core/src/main/scala/kafka/server/ConfigHelper.scala +++ b/core/src/main/scala/kafka/server/ConfigHelper.scala @@ -35,9 +35,9 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC} import org.apache.kafka.coordinator.group.GroupConfig import org.apache.kafka.server.config.ServerTopicConfigSynonyms +import org.apache.kafka.server.metrics.ClientMetricsConfigs import org.apache.kafka.storage.internals.log.LogConfig -import scala.collection.mutable.ListBuffer import scala.collection.{Map, mutable} import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ @@ -136,21 +136,12 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava)) case ConfigResource.Type.CLIENT_METRICS => - val subscriptionName = resource.resourceName - if (subscriptionName == null || subscriptionName.isEmpty) { + if (resource.resourceName == null || resource.resourceName.isEmpty) { throw new InvalidRequestException("Client metrics subscription name must not be empty") } else { - val entityProps = configRepository.config(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName)) - val configEntries = new ListBuffer[DescribeConfigsResponseData.DescribeConfigsResourceResult]() - entityProps.forEach((name, value) => { - configEntries += new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name.toString) - .setValue(value.toString).setConfigSource(ConfigSource.CLIENT_METRICS_CONFIG.id()) - .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava) - }) - - new DescribeConfigsResponseData.DescribeConfigsResult() - .setErrorCode(Errors.NONE.code) - .setConfigs(configEntries.asJava) + val clientMetricsProps = configRepository.config(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, resource.resourceName)) + val clientMetricsConfig = ClientMetricsConfigs.fromProps(ClientMetricsConfigs.defaultConfigsMap(), clientMetricsProps) + createResponseConfig(allConfigs(clientMetricsConfig), createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps, includeSynonyms, includeDocumentation)) } case ConfigResource.Type.GROUP => @@ -185,8 +176,8 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo } } - def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean) - (name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = { + private def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean) + (name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = { val allNames = brokerSynonyms(name) val configEntryType = GroupConfig.configType(name).asScala val isSensitive = KafkaConfig.maybeSensitive(configEntryType) @@ -209,6 +200,28 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo .setDocumentation(configDocumentation).setConfigType(dataType.id) } + private def createClientMetricsConfigEntry(clientMetricsConfig: ClientMetricsConfigs, clientMetricsProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean) + (name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = { + val configEntryType = ClientMetricsConfigs.configType(name).asScala + val valueAsString = ConfigDef.convertToString(value, configEntryType.orNull) + val allSynonyms = { + if (!clientMetricsProps.containsKey(name)) { + Nil + } else { + new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString) + .setSource(ConfigSource.CLIENT_METRICS_CONFIG.id) :: Nil + } + } + val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else allSynonyms.head.source + val synonyms = if (!includeSynonyms) List.empty else allSynonyms + val dataType = configResponseType(configEntryType) + val configDocumentation = if (includeDocumentation) clientMetricsConfig.documentationOf(name) else null + new DescribeConfigsResponseData.DescribeConfigsResourceResult() + .setName(name).setValue(valueAsString).setConfigSource(source) + .setIsSensitive(false).setReadOnly(false).setSynonyms(synonyms.asJava) + .setDocumentation(configDocumentation).setConfigType(dataType.id) + } + def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean) (name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = { val configEntryType = LogConfig.configType(name).asScala diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java index c2941847b274e..cf31178413913 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java @@ -372,7 +372,7 @@ private void verifyClientMetricsConfigUpdate() throws Exception { alterAndVerifyClientMetricsConfig(client, defaultClientMetricsName, configs); // Delete config - deleteAndVerifyClientMetricsConfigValue(client, defaultClientMetricsName, configs.keySet()); + deleteAndVerifyClientMetricsConfigValue(client, defaultClientMetricsName, configs); // Unknown config configured should fail assertThrows(ExecutionException.class, @@ -695,13 +695,12 @@ private void deleteAndVerifyGroupConfigValue(Admin client, private void deleteAndVerifyClientMetricsConfigValue(Admin client, String clientMetricsName, - Set defaultConfigs) throws Exception { + Map defaultConfigs) throws Exception { ConfigCommand.ConfigCommandOptions deleteOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, asList("--entity-name", clientMetricsName), - asList("--delete-config", String.join(",", defaultConfigs)))); + asList("--delete-config", String.join(",", defaultConfigs.keySet())))); ConfigCommand.alterConfig(client, deleteOpts); - // There are no default configs returned for client metrics - verifyClientMetricsConfig(client, clientMetricsName, Collections.emptyMap()); + verifyClientMetricsConfig(client, clientMetricsName, defaultConfigs); } private void verifyPerBrokerConfigValue(Admin client, diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index d439a59931d6c..af22a0e62843e 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -981,10 +981,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val groupResource = new ConfigResource(ConfigResource.Type.GROUP, "none_group") val groupResult = client.describeConfigs(Seq(groupResource).asJava).all().get().get(groupResource) assertNotEquals(0, groupResult.entries().size()) - - val metricResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "none_metric") - val metricResult = client.describeConfigs(Seq(metricResource).asJava).all().get().get(metricResource) - assertEquals(0, metricResult.entries().size()) } @ParameterizedTest diff --git a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala index 9f903336d23a7..3056753f53bf4 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala @@ -128,9 +128,9 @@ class ControllerConfigurationValidatorTest { @Test def testValidClientMetricsConfig(): Unit = { val config = new util.TreeMap[String, String]() - config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "2000") - config.put(ClientMetricsConfigs.SUBSCRIPTION_METRICS, "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency") - config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" + + config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "2000") + config.put(ClientMetricsConfigs.METRICS_CONFIG, "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency") + config.put(ClientMetricsConfigs.MATCH_CONFIG, "client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" + ",client_software_name=apache-kafka-java,client_software_version=2.8.0-SNAPSHOT,client_source_address=127.0.0.1," + "client_source_port=1234") validator.validate(new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap()) @@ -147,12 +147,12 @@ class ControllerConfigurationValidatorTest { @Test def testInvalidIntervalClientMetricsConfig(): Unit = { val config = new util.TreeMap[String, String]() - config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "10") + config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "10") assertEquals("Invalid value 10 for interval.ms, interval must be between 100 and 3600000 (1 hour)", assertThrows(classOf[InvalidRequestException], () => validator.validate( new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). getMessage) - config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "3600001") + config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "3600001") assertEquals("Invalid value 3600001 for interval.ms, interval must be between 100 and 3600000 (1 hour)", assertThrows(classOf[InvalidRequestException], () => validator.validate( new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). getMessage) @@ -170,7 +170,7 @@ class ControllerConfigurationValidatorTest { @Test def testInvalidMatchClientMetricsConfig(): Unit = { val config = new util.TreeMap[String, String]() - config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "10") + config.put(ClientMetricsConfigs.MATCH_CONFIG, "10") assertEquals("Illegal client matching pattern: 10", assertThrows(classOf[InvalidConfigurationException], () => validator.validate( new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). getMessage) diff --git a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java index 655a3d8625a84..79c84a0f9c3a5 100644 --- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java +++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java @@ -235,9 +235,9 @@ public void close() throws Exception { } private void updateClientSubscription(String subscriptionName, ClientMetricsConfigs configs) { - List metrics = configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS); - int pushInterval = configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS); - List clientMatchPattern = configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN); + List metrics = configs.getList(ClientMetricsConfigs.METRICS_CONFIG); + int pushInterval = configs.getInt(ClientMetricsConfigs.INTERVAL_MS_CONFIG); + List clientMatchPattern = configs.getList(ClientMetricsConfigs.MATCH_CONFIG); SubscriptionInfo newSubscription = new SubscriptionInfo(subscriptionName, metrics, pushInterval, @@ -319,7 +319,7 @@ private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInst private ClientMetricsInstance createClientInstance(Uuid clientInstanceId, ClientMetricsInstanceMetadata instanceMetadata) { - int pushIntervalMs = ClientMetricsConfigs.DEFAULT_INTERVAL_MS; + int pushIntervalMs = ClientMetricsConfigs.INTERVAL_MS_DEFAULT; // Keep a set of metrics to avoid duplicates in case of overlapping subscriptions. Set subscribedMetrics = new HashSet<>(); boolean allMetricsSubscribed = false; @@ -328,7 +328,7 @@ private ClientMetricsInstance createClientInstance(Uuid clientInstanceId, Client for (SubscriptionInfo info : subscriptionMap.values()) { if (instanceMetadata.isMatch(info.matchPattern())) { allMetricsSubscribed = allMetricsSubscribed || info.metrics().contains( - ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); + ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS); subscribedMetrics.addAll(info.metrics()); pushIntervalMs = Math.min(pushIntervalMs, info.intervalMs()); } @@ -341,7 +341,7 @@ private ClientMetricsInstance createClientInstance(Uuid clientInstanceId, Client if (allMetricsSubscribed) { // Only add an * to indicate that all metrics are subscribed. subscribedMetrics.clear(); - subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); + subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS); } int subscriptionId = computeSubscriptionId(subscribedMetrics, pushIntervalMs, clientInstanceId); diff --git a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java index 3f19291fc8327..d5cbe8e7820f8 100644 --- a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java +++ b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java @@ -29,13 +29,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; /** - * Client metric configuration related parameters and the supporting methods like validation, etc. are + * Client metric configuration related parameters and the supporting methods like validation are * defined in this class. *

* { @@ -53,7 +54,8 @@ *

    *
  • "name" is a unique name for the subscription. This is used to identify the subscription in * the broker. Ex: "METRICS-SUB" - *
  • "metrics" value should be comma separated metrics list. A prefix match on the requested metrics + * + *
  • "metrics" value should be comma-separated metrics list. A prefix match on the requested metrics * is performed in clients to determine subscribed metrics. An empty list means no metrics subscribed. * A list containing just an empty string means all metrics subscribed. * Ex: "org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency" @@ -61,19 +63,19 @@ *
  • "interval.ms" should be between 100 and 3600000 (1 hour). This is the interval at which the client * should push the metrics to the broker. * - *
  • "match" is a comma separated list of client match patterns, in case if there is no matching + *
  • "match" is a comma-separated list of client match patterns, in case if there is no matching * pattern specified then broker considers that as all match which means the associated metrics * applies to all the clients. Ex: "client_software_name = Java, client_software_version = 11.1.*" * which means all Java clients with any sub versions of 11.1 will be matched i.e. 11.1.1, 11.1.2 etc. *
- * For more information please look at kip-714: - * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration + * For more information please look at + * KIP-714 */ public class ClientMetricsConfigs extends AbstractConfig { - public static final String SUBSCRIPTION_METRICS = "metrics"; - public static final String PUSH_INTERVAL_MS = "interval.ms"; - public static final String CLIENT_MATCH_PATTERN = "match"; + public static final String METRICS_CONFIG = "metrics"; + public static final String INTERVAL_MS_CONFIG = "interval.ms"; + public static final String MATCH_CONFIG = "match"; public static final String CLIENT_INSTANCE_ID = "client_instance_id"; public static final String CLIENT_ID = "client_id"; @@ -83,9 +85,9 @@ public class ClientMetricsConfigs extends AbstractConfig { public static final String CLIENT_SOURCE_PORT = "client_source_port"; // '*' in client-metrics resource configs indicates that all the metrics are subscribed. - public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "*"; + public static final String ALL_SUBSCRIBED_METRICS = "*"; - public static final int DEFAULT_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + public static final int INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes private static final int MIN_INTERVAL_MS = 100; // 100ms private static final int MAX_INTERVAL_MS = 3600000; // 1 hour @@ -99,9 +101,9 @@ public class ClientMetricsConfigs extends AbstractConfig { )); private static final ConfigDef CONFIG = new ConfigDef() - .define(SUBSCRIPTION_METRICS, Type.LIST, Collections.emptyList(), Importance.MEDIUM, "Subscription metrics list") - .define(PUSH_INTERVAL_MS, Type.INT, DEFAULT_INTERVAL_MS, Importance.MEDIUM, "Push interval in milliseconds") - .define(CLIENT_MATCH_PATTERN, Type.LIST, Collections.emptyList(), Importance.MEDIUM, "Client match pattern list"); + .define(METRICS_CONFIG, Type.LIST, Collections.emptyList(), Importance.MEDIUM, "Telemetry metric name prefix list") + .define(INTERVAL_MS_CONFIG, Type.INT, INTERVAL_MS_DEFAULT, Importance.MEDIUM, "Metrics push interval in milliseconds") + .define(MATCH_CONFIG, Type.LIST, Collections.emptyList(), Importance.MEDIUM, "Client match criteria"); public ClientMetricsConfigs(Properties props) { super(CONFIG, props); @@ -111,6 +113,18 @@ public static ConfigDef configDef() { return CONFIG; } + public static Optional configType(String configName) { + return Optional.ofNullable(CONFIG.configKeys().get(configName)).map(c -> c.type); + } + + public static Map defaultConfigsMap() { + Map clientMetricsProps = new HashMap<>(); + clientMetricsProps.put(METRICS_CONFIG, Collections.emptyList()); + clientMetricsProps.put(INTERVAL_MS_CONFIG, INTERVAL_MS_DEFAULT); + clientMetricsProps.put(MATCH_CONFIG, Collections.emptyList()); + return clientMetricsProps; + } + public static Set names() { return CONFIG.names(); } @@ -132,18 +146,18 @@ private static void validateProperties(Properties properties) { }); // Make sure that push interval is between 100ms and 1 hour. - if (properties.containsKey(PUSH_INTERVAL_MS)) { - int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS)); + if (properties.containsKey(INTERVAL_MS_CONFIG)) { + int pushIntervalMs = Integer.parseInt(properties.getProperty(INTERVAL_MS_CONFIG)); if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) { String msg = String.format("Invalid value %s for %s, interval must be between 100 and 3600000 (1 hour)", - pushIntervalMs, PUSH_INTERVAL_MS); + pushIntervalMs, INTERVAL_MS_CONFIG); throw new InvalidRequestException(msg); } } // Make sure that client match patterns are valid by parsing them. - if (properties.containsKey(CLIENT_MATCH_PATTERN)) { - List patterns = Arrays.asList(properties.getProperty(CLIENT_MATCH_PATTERN).split(",")); + if (properties.containsKey(MATCH_CONFIG)) { + List patterns = Arrays.asList(properties.getProperty(MATCH_CONFIG).split(",")); // Parse the client matching patterns to validate if the patterns are valid. parseMatchingPatterns(patterns); } @@ -191,4 +205,14 @@ public static Map parseMatchingPatterns(List patterns) private static boolean isValidParam(String paramName) { return ALLOWED_MATCH_PARAMS.contains(paramName); } + + /** + * Create a client metrics config instance using the given properties and defaults. + */ + public static ClientMetricsConfigs fromProps(Map defaults, Properties overrides) { + Properties props = new Properties(); + props.putAll(defaults); + props.putAll(overrides); + return new ClientMetricsConfigs(props); + } } diff --git a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java index de6aad8f2045b..892ab9663dd36 100644 --- a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java +++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java @@ -121,13 +121,13 @@ public void testUpdateSubscription() throws Exception { Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric -> assertTrue(metrics.contains(metric))); // Validate push interval. - assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS), + assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.INTERVAL_MS_CONFIG), String.valueOf(subscriptionInfo.intervalMs())); // Validate match patterns. - assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(), + assertEquals(ClientMetricsTestUtils.DEFAULT_MATCH.size(), subscriptionInfo.matchPattern().size()); - ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern -> { + ClientMetricsTestUtils.DEFAULT_MATCH.forEach(pattern -> { String[] split = pattern.split("="); assertTrue(subscriptionInfo.matchPattern().containsKey(split[0])); assertEquals(split[1], subscriptionInfo.matchPattern().get(split[0]).pattern()); @@ -212,7 +212,7 @@ public void testGetTelemetry() throws Exception { assertEquals(CompressionType.LZ4.id, response.data().acceptedCompressionTypes().get(1)); assertEquals(CompressionType.GZIP.id, response.data().acceptedCompressionTypes().get(2)); assertEquals(CompressionType.SNAPPY.id, response.data().acceptedCompressionTypes().get(3)); - assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, response.data().pushIntervalMs()); + assertEquals(ClientMetricsTestUtils.DEFAULT_INTERVAL_MS, response.data().pushIntervalMs()); assertTrue(response.data().deltaTemporality()); assertEquals(100, response.data().telemetryMaxBytes()); assertEquals(Errors.NONE, response.error()); @@ -251,7 +251,7 @@ public void testGetTelemetryWithoutSubscription() throws UnknownHostException { assertTrue(response.data().subscriptionId() != 0); assertTrue(response.data().requestedMetrics().isEmpty()); assertEquals(4, response.data().acceptedCompressionTypes().size()); - assertEquals(ClientMetricsConfigs.DEFAULT_INTERVAL_MS, response.data().pushIntervalMs()); + assertEquals(ClientMetricsConfigs.INTERVAL_MS_DEFAULT, response.data().pushIntervalMs()); assertTrue(response.data().deltaTemporality()); assertEquals(100, response.data().telemetryMaxBytes()); assertEquals(Errors.NONE, response.error()); @@ -272,7 +272,7 @@ public void testGetTelemetryAfterPushIntervalTime() throws UnknownHostException assertNotNull(response.data().clientInstanceId()); assertEquals(Errors.NONE, response.error()); - time.sleep(ClientMetricsConfigs.DEFAULT_INTERVAL_MS); + time.sleep(ClientMetricsConfigs.INTERVAL_MS_DEFAULT); request = new GetTelemetrySubscriptionsRequest.Builder( new GetTelemetrySubscriptionsRequestData().setClientInstanceId(response.data().clientInstanceId()), true).build(); @@ -287,7 +287,7 @@ public void testGetTelemetryAfterPushIntervalTime() throws UnknownHostException public void testGetTelemetryAllMetricSubscribedSubscription() throws UnknownHostException { clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); Properties properties = new Properties(); - properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); + properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS); clientMetricsManager.updateSubscription("sub-2", properties); assertEquals(2, clientMetricsManager.subscriptions().size()); @@ -302,10 +302,10 @@ public void testGetTelemetryAllMetricSubscribedSubscription() throws UnknownHost assertTrue(response.data().subscriptionId() != 0); assertEquals(1, response.data().requestedMetrics().size()); - assertTrue(response.data().requestedMetrics().contains(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG)); + assertTrue(response.data().requestedMetrics().contains(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS)); assertEquals(4, response.data().acceptedCompressionTypes().size()); - assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, response.data().pushIntervalMs()); + assertEquals(ClientMetricsTestUtils.DEFAULT_INTERVAL_MS, response.data().pushIntervalMs()); assertTrue(response.data().deltaTemporality()); assertEquals(100, response.data().telemetryMaxBytes()); assertEquals(Errors.NONE, response.error()); @@ -403,7 +403,7 @@ public void testGetTelemetryUpdateSubscription() throws UnknownHostException { // Update subscription Properties properties = new Properties(); - properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); + properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS); clientMetricsManager.updateSubscription("sub-2", properties); assertEquals(2, clientMetricsManager.subscriptions().size()); @@ -647,7 +647,7 @@ public void testPushTelemetryAfterPushIntervalTime() throws UnknownHostException assertEquals(Errors.NONE, response.error()); - time.sleep(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS); + time.sleep(ClientMetricsTestUtils.DEFAULT_INTERVAL_MS); response = clientMetricsManager.processPushTelemetryRequest( request, ClientMetricsTestUtils.requestContext()); @@ -1123,8 +1123,8 @@ public void testPushTelemetryPluginException() throws Exception { @Test public void testCacheEviction() throws Exception { Properties properties = new Properties(); - properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); - properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100"); + properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS); + properties.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "100"); clientMetricsManager.updateSubscription("sub-1", properties); GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( @@ -1160,8 +1160,8 @@ public void testCacheEviction() throws Exception { @Test public void testCacheEvictionWithMultipleClients() throws Exception { Properties properties = new Properties(); - properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); - properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100"); + properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS); + properties.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "100"); clientMetricsManager.updateSubscription("sub-1", properties); GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java index 01826ac164894..d965c40ce3f8b 100644 --- a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java +++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java @@ -35,7 +35,7 @@ public void setUp() throws UnknownHostException { Uuid uuid = Uuid.randomUuid(); ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid, ClientMetricsTestUtils.requestContext()); - clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 0, null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS); + clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 0, null, ClientMetricsConfigs.INTERVAL_MS_DEFAULT); } @Test @@ -47,7 +47,7 @@ public void testMaybeUpdateRequestTimestampValid() { @Test public void testMaybeUpdateGetRequestAfterElapsedTimeValid() { - assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis() - ClientMetricsConfigs.DEFAULT_INTERVAL_MS)); + assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis() - ClientMetricsConfigs.INTERVAL_MS_DEFAULT)); // Second request should be accepted as time since last request is greater than the push interval. assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis())); } @@ -61,7 +61,7 @@ public void testMaybeUpdateGetRequestWithImmediateRetryFail() { @Test public void testMaybeUpdatePushRequestAfterElapsedTimeValid() { - assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis() - ClientMetricsConfigs.DEFAULT_INTERVAL_MS)); + assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis() - ClientMetricsConfigs.INTERVAL_MS_DEFAULT)); // Second request should be accepted as time since last request is greater than the push interval. assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis())); } @@ -82,7 +82,7 @@ public void testMaybeUpdatePushRequestWithImmediateRetryFail() { @Test public void testMaybeUpdatePushRequestWithImmediateRetryAfterGetValid() { - assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis() - ClientMetricsConfigs.DEFAULT_INTERVAL_MS)); + assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis() - ClientMetricsConfigs.INTERVAL_MS_DEFAULT)); assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis())); // Next request after get should be accepted. assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis() + 1)); diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java index e60d7d894fa44..395bf73e4207c 100644 --- a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java +++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java @@ -41,8 +41,8 @@ public class ClientMetricsTestUtils { public static final String DEFAULT_METRICS = "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency"; - public static final int DEFAULT_PUSH_INTERVAL_MS = 30 * 1000; // 30 seconds - public static final List DEFAULT_CLIENT_MATCH_PATTERNS = Collections.unmodifiableList(Arrays.asList( + public static final int DEFAULT_INTERVAL_MS = 30 * 1000; // 30 seconds + public static final List DEFAULT_MATCH = Collections.unmodifiableList(Arrays.asList( ClientMetricsConfigs.CLIENT_SOFTWARE_NAME + "=apache-kafka-java", ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION + "=3.5.*" )); @@ -50,9 +50,9 @@ public class ClientMetricsTestUtils { public static Properties defaultProperties() { Properties props = new Properties(); - props.put(ClientMetricsConfigs.SUBSCRIPTION_METRICS, DEFAULT_METRICS); - props.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, Integer.toString(DEFAULT_PUSH_INTERVAL_MS)); - props.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, String.join(",", DEFAULT_CLIENT_MATCH_PATTERNS)); + props.put(ClientMetricsConfigs.METRICS_CONFIG, DEFAULT_METRICS); + props.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, Integer.toString(DEFAULT_INTERVAL_MS)); + props.put(ClientMetricsConfigs.MATCH_CONFIG, String.join(",", DEFAULT_MATCH)); return props; } From 4df9cdf1b750995a9ca516edcc98df33cc175cfa Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 26 Sep 2024 11:19:31 +0100 Subject: [PATCH 2/4] Test failure --- .../test/java/kafka/admin/ConfigCommandIntegrationTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java index cf31178413913..2ad024b311fe7 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java @@ -367,11 +367,13 @@ private void verifyClientMetricsConfigUpdate() throws Exception { try (Admin client = cluster.createAdminClient()) { // Add config Map configs = new HashMap<>(); - configs.put("metrics", ""); + configs.put("metrics", "org.apache.kafka.producer."); configs.put("interval.ms", "6000"); alterAndVerifyClientMetricsConfig(client, defaultClientMetricsName, configs); // Delete config + configs.put("metrics", ""); + configs.put("interval.ms", "300000"); deleteAndVerifyClientMetricsConfigValue(client, defaultClientMetricsName, configs); // Unknown config configured should fail From ec3870236b6c189e1e91a643b9448fe0440f1bdd Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 3 Oct 2024 09:12:15 +0100 Subject: [PATCH 3/4] Checkstyle fix --- core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java index 7521d02423232..5e73b39d2b950 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java @@ -32,7 +32,6 @@ import java.io.ByteArrayOutputStream; import java.io.PrintStream; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; From f21f162fc191f41884e6d42891adb5eada718f99 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Mon, 20 Jan 2025 09:55:52 +0000 Subject: [PATCH 4/4] Correction after merge --- .../test/java/kafka/admin/ConfigCommandIntegrationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java index b46344f4bea36..53a6cec2ec84d 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java @@ -320,7 +320,7 @@ public void testUpdateInvalidBrokerConfigs() { private void updateAndCheckInvalidBrokerConfig(Optional brokerIdOrDefault) { List alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers()); try (Admin client = cluster.admin()) { - alterConfigWithAdmin(client, brokerIdOrDefault, Collections.singletonMap("invalid", "2"), alterOpts); + alterConfigWithAdmin(client, brokerIdOrDefault, Map.of("invalid", "2"), alterOpts); Stream describeCommand = Stream.concat( Stream.concat( @@ -340,7 +340,7 @@ private void updateAndCheckInvalidBrokerConfig(Optional brokerIdOrDefaul public void testUpdateInvalidTopicConfigs() throws ExecutionException, InterruptedException { List alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), "--entity-type", "topics", "--alter"); try (Admin client = cluster.admin()) { - client.createTopics(Collections.singletonList(new NewTopic("test-config-topic", 1, (short) 1))).all().get(); + client.createTopics(List.of(new NewTopic("test-config-topic", 1, (short) 1))).all().get(); assertInstanceOf( InvalidConfigurationException.class, assertThrows(