Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17516: Synonyms for client metrics configs #17264

Merged
merged 14 commits into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions core/src/main/scala/kafka/server/ConfigHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.metadata.ConfigRepository
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.storage.internals.log.LogConfig

import scala.collection.mutable.ListBuffer
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
Expand Down Expand Up @@ -136,21 +136,12 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))

case ConfigResource.Type.CLIENT_METRICS =>
val subscriptionName = resource.resourceName
if (subscriptionName == null || subscriptionName.isEmpty) {
if (resource.resourceName == null || resource.resourceName.isEmpty) {
throw new InvalidRequestException("Client metrics subscription name must not be empty")
} else {
val entityProps = configRepository.config(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName))
val configEntries = new ListBuffer[DescribeConfigsResponseData.DescribeConfigsResourceResult]()
entityProps.forEach((name, value) => {
configEntries += new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name.toString)
.setValue(value.toString).setConfigSource(ConfigSource.CLIENT_METRICS_CONFIG.id())
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava)
})

new DescribeConfigsResponseData.DescribeConfigsResult()
.setErrorCode(Errors.NONE.code)
.setConfigs(configEntries.asJava)
val clientMetricsProps = configRepository.config(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, resource.resourceName))
val clientMetricsConfig = ClientMetricsConfigs.fromProps(ClientMetricsConfigs.defaultConfigsMap(), clientMetricsProps)
createResponseConfig(allConfigs(clientMetricsConfig), createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps, includeSynonyms, includeDocumentation))
}

case ConfigResource.Type.GROUP =>
Expand Down Expand Up @@ -185,8 +176,8 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
}
}

def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
private def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val allNames = brokerSynonyms(name)
val configEntryType = GroupConfig.configType(name).toScala
val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
Expand All @@ -210,6 +201,28 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
.setDocumentation(configDocumentation).setConfigType(dataType.id)
}

private def createClientMetricsConfigEntry(clientMetricsConfig: ClientMetricsConfigs, clientMetricsProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val configEntryType = ClientMetricsConfigs.configType(name).toScala
val valueAsString = ConfigDef.convertToString(value, configEntryType.orNull)
val allSynonyms = {
if (!clientMetricsProps.containsKey(name)) {
List.empty
} else {
List(new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString)
.setSource(ConfigSource.CLIENT_METRICS_CONFIG.id))
}
}
val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else allSynonyms.head.source
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
val dataType = configResponseType(configEntryType)
val configDocumentation = if (includeDocumentation) clientMetricsConfig.documentationOf(name) else null
new DescribeConfigsResponseData.DescribeConfigsResourceResult()
.setName(name).setValue(valueAsString).setConfigSource(source)
.setIsSensitive(false).setReadOnly(false).setSynonyms(synonyms.asJava)
.setDocumentation(configDocumentation).setConfigType(dataType.id)
}

def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val configEntryType = LogConfig.configType(name).toScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -944,10 +944,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val groupResource = new ConfigResource(ConfigResource.Type.GROUP, "none_group")
val groupResult = client.describeConfigs(Seq(groupResource).asJava).all().get().get(groupResource)
assertNotEquals(0, groupResult.entries().size())

val metricResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "none_metric")
val metricResult = client.describeConfigs(Seq(metricResource).asJava).all().get().get(metricResource)
assertEquals(0, metricResult.entries().size())
Comment on lines -948 to -950
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do need to remove this line? Is it because now default configs will always appear for unknown subscription? Will it add confusion when describing? Should we just add a note in describe that if subscription doesn not exist then default values will be displayed, and check for subscriptions list to find which all subscriptions are applied to the cluster.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, yes, that's what happens. Previously, if you displayed a non-existent resource, it didn't say "This doesn't exist", it just printed an empty list. Now, it prints the defaults. What ideally would happen is that it would say "This resource doesn't exist." but I think that's tricky to achieve. If you have ideas here, let me know. I'd prefer if the resource was flagged as an error in this case but the client metrics config manager in the broker is a tricky beast I think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've dug more into this. The behaviour is the same for groups and client metrics. If you describe a non-existent configs resource, it returns an empty Properties() (this is in ConfigurationsImage.configProperties(ConfigResource). Then, the defaults are applied to this empty set which makes it look like the config resource exists.

I think it's worth making this work properly such that describing an existing resource works, and a non-existent resource fails neatly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree shall we do it as part of this PR or shall take it separtely? Whatever works with you.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having taken a look at the code in more detail, I would prefer to do it as a follow-on PR. This PR would grow quite a bit doing both things at once.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expected as well, thanks a lot @AndrewJSchofield for the fix and additional work.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pardon me, is there a patch or ticket?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs a small KIP I think.

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ class ControllerConfigurationValidatorTest {
@Test
def testValidClientMetricsConfig(): Unit = {
val config = new util.TreeMap[String, String]()
config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "2000")
config.put(ClientMetricsConfigs.SUBSCRIPTION_METRICS, "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency")
config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" +
config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "2000")
config.put(ClientMetricsConfigs.METRICS_CONFIG, "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency")
config.put(ClientMetricsConfigs.MATCH_CONFIG, "client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" +
",client_software_name=apache-kafka-java,client_software_version=2.8.0-SNAPSHOT,client_source_address=127.0.0.1," +
"client_source_port=1234")
validator.validate(new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())
Expand All @@ -147,12 +147,12 @@ class ControllerConfigurationValidatorTest {
@Test
def testInvalidIntervalClientMetricsConfig(): Unit = {
val config = new util.TreeMap[String, String]()
config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "10")
config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "10")
assertEquals("Invalid value 10 for interval.ms, interval must be between 100 and 3600000 (1 hour)",
assertThrows(classOf[InvalidRequestException], () => validator.validate(
new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). getMessage)

config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "3600001")
config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "3600001")
assertEquals("Invalid value 3600001 for interval.ms, interval must be between 100 and 3600000 (1 hour)",
assertThrows(classOf[InvalidRequestException], () => validator.validate(
new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). getMessage)
Expand All @@ -170,7 +170,7 @@ class ControllerConfigurationValidatorTest {
@Test
def testInvalidMatchClientMetricsConfig(): Unit = {
val config = new util.TreeMap[String, String]()
config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "10")
config.put(ClientMetricsConfigs.MATCH_CONFIG, "10")
assertEquals("Illegal client matching pattern: 10",
assertThrows(classOf[InvalidConfigurationException], () => validator.validate(
new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). getMessage)
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ class KafkaApisTest extends Logging {
val subscriptionName = "client_metric_subscription_1"
val authorizedResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName)

val props = ClientMetricsTestUtils.defaultProperties
val props = ClientMetricsTestUtils.defaultTestProperties
val configEntries = new util.ArrayList[AlterConfigsRequest.ConfigEntry]()
props.forEach((x, y) =>
configEntries.add(new AlterConfigsRequest.ConfigEntry(x.asInstanceOf[String], y.asInstanceOf[String])))
Expand Down Expand Up @@ -440,7 +440,7 @@ class KafkaApisTest extends Logging {

val resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName)
val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
val cmConfigs = ClientMetricsTestUtils.defaultProperties
val cmConfigs = ClientMetricsTestUtils.defaultTestProperties
when(configRepository.config(resource)).thenReturn(cmConfigs)

metadataCache = mock(classOf[KRaftMetadataCache])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ public void close() throws Exception {
}

private void updateClientSubscription(String subscriptionName, ClientMetricsConfigs configs) {
List<String> metrics = configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS);
int pushInterval = configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS);
List<String> clientMatchPattern = configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN);
List<String> metrics = configs.getList(ClientMetricsConfigs.METRICS_CONFIG);
int pushInterval = configs.getInt(ClientMetricsConfigs.INTERVAL_MS_CONFIG);
List<String> clientMatchPattern = configs.getList(ClientMetricsConfigs.MATCH_CONFIG);

SubscriptionInfo newSubscription =
new SubscriptionInfo(subscriptionName, metrics, pushInterval,
Expand Down Expand Up @@ -329,7 +329,7 @@ private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInst

private ClientMetricsInstance createClientInstance(Uuid clientInstanceId, ClientMetricsInstanceMetadata instanceMetadata) {

int pushIntervalMs = ClientMetricsConfigs.DEFAULT_INTERVAL_MS;
int pushIntervalMs = ClientMetricsConfigs.INTERVAL_MS_DEFAULT;
// Keep a set of metrics to avoid duplicates in case of overlapping subscriptions.
Set<String> subscribedMetrics = new HashSet<>();
boolean allMetricsSubscribed = false;
Expand All @@ -338,7 +338,7 @@ private ClientMetricsInstance createClientInstance(Uuid clientInstanceId, Client
for (SubscriptionInfo info : subscriptionMap.values()) {
if (instanceMetadata.isMatch(info.matchPattern())) {
allMetricsSubscribed = allMetricsSubscribed || info.metrics().contains(
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
subscribedMetrics.addAll(info.metrics());
pushIntervalMs = Math.min(pushIntervalMs, info.intervalMs());
}
Expand All @@ -351,7 +351,7 @@ private ClientMetricsInstance createClientInstance(Uuid clientInstanceId, Client
if (allMetricsSubscribed) {
// Only add an * to indicate that all metrics are subscribed.
subscribedMetrics.clear();
subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
}

int subscriptionId = computeSubscriptionId(subscribedMetrics, pushIntervalMs, clientInstanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
Expand All @@ -52,27 +51,28 @@
* <ul>
* <li> "name" is a unique name for the subscription. This is used to identify the subscription in
* the broker. Ex: "METRICS-SUB"
* <li> "metrics" value should be comma separated metrics list. A prefix match on the requested metrics
*
* <li> "metrics" value should be comma-separated metrics list. A prefix match on the requested metrics
* is performed in clients to determine subscribed metrics. An empty list means no metrics subscribed.
* A list containing just an empty string means all metrics subscribed.
* Ex: "org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
*
* <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is the interval at which the client
* should push the metrics to the broker.
*
* <li> "match" is a comma separated list of client match patterns, in case if there is no matching
* <li> "match" is a comma-separated list of client match patterns, in case if there is no matching
* pattern specified then broker considers that as all match which means the associated metrics
* applies to all the clients. Ex: "client_software_name = Java, client_software_version = 11.1.*"
* which means all Java clients with any sub versions of 11.1 will be matched i.e. 11.1.1, 11.1.2 etc.
* </ul>
* For more information please look at kip-714:
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
* For more information please look at
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration">KIP-714</a>
*/
public class ClientMetricsConfigs extends AbstractConfig {

public static final String SUBSCRIPTION_METRICS = "metrics";
public static final String PUSH_INTERVAL_MS = "interval.ms";
public static final String CLIENT_MATCH_PATTERN = "match";
public static final String METRICS_CONFIG = "metrics";
public static final String INTERVAL_MS_CONFIG = "interval.ms";
public static final String MATCH_CONFIG = "match";

public static final String CLIENT_INSTANCE_ID = "client_instance_id";
public static final String CLIENT_ID = "client_id";
Expand All @@ -82,34 +82,50 @@ public class ClientMetricsConfigs extends AbstractConfig {
public static final String CLIENT_SOURCE_PORT = "client_source_port";

// '*' in client-metrics resource configs indicates that all the metrics are subscribed.
public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "*";
public static final String ALL_SUBSCRIBED_METRICS = "*";

public static final List<String> METRICS_DEFAULT = List.of();

public static final int DEFAULT_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
public static final int INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
private static final int MIN_INTERVAL_MS = 100; // 100ms
private static final int MAX_INTERVAL_MS = 3600000; // 1 hour

private static final Set<String> ALLOWED_MATCH_PARAMS = new HashSet<>(Arrays.asList(
public static final List<String> MATCH_DEFAULT = List.of();

private static final Set<String> ALLOWED_MATCH_PARAMS = Set.of(
CLIENT_INSTANCE_ID,
CLIENT_ID,
CLIENT_SOFTWARE_NAME,
CLIENT_SOFTWARE_VERSION,
CLIENT_SOURCE_ADDRESS,
CLIENT_SOURCE_PORT
));
);

private static final ConfigDef CONFIG = new ConfigDef()
.define(SUBSCRIPTION_METRICS, Type.LIST, List.of(), Importance.MEDIUM, "Subscription metrics list")
.define(PUSH_INTERVAL_MS, Type.INT, DEFAULT_INTERVAL_MS, Importance.MEDIUM, "Push interval in milliseconds")
.define(CLIENT_MATCH_PATTERN, Type.LIST, List.of(), Importance.MEDIUM, "Client match pattern list");
.define(METRICS_CONFIG, Type.LIST, METRICS_DEFAULT, Importance.MEDIUM, "Telemetry metric name prefix list")
.define(INTERVAL_MS_CONFIG, Type.INT, INTERVAL_MS_DEFAULT, Importance.MEDIUM, "Metrics push interval in milliseconds")
.define(MATCH_CONFIG, Type.LIST, MATCH_DEFAULT, Importance.MEDIUM, "Client match criteria");

public ClientMetricsConfigs(Properties props) {
super(CONFIG, props);
super(CONFIG, props, false);
}

public static ConfigDef configDef() {
return CONFIG;
}

public static Optional<Type> configType(String configName) {
return Optional.ofNullable(CONFIG.configKeys().get(configName)).map(c -> c.type);
}

public static Map<String, Object> defaultConfigsMap() {
Map<String, Object> clientMetricsProps = new HashMap<>();
clientMetricsProps.put(METRICS_CONFIG, METRICS_DEFAULT);
clientMetricsProps.put(INTERVAL_MS_CONFIG, INTERVAL_MS_DEFAULT);
clientMetricsProps.put(MATCH_CONFIG, MATCH_DEFAULT);
return clientMetricsProps;
}

public static Set<String> names() {
return CONFIG.names();
}
Expand All @@ -134,18 +150,18 @@ private static void validateProperties(Properties properties) {
Map<String, Object> parsed = CONFIG.parse(properties);

// Make sure that push interval is between 100ms and 1 hour.
if (properties.containsKey(PUSH_INTERVAL_MS)) {
Integer pushIntervalMs = (Integer) parsed.get(PUSH_INTERVAL_MS);
if (properties.containsKey(INTERVAL_MS_CONFIG)) {
int pushIntervalMs = (Integer) parsed.get(INTERVAL_MS_CONFIG);
if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) {
String msg = String.format("Invalid value %s for %s, interval must be between 100 and 3600000 (1 hour)",
pushIntervalMs, PUSH_INTERVAL_MS);
pushIntervalMs, INTERVAL_MS_CONFIG);
throw new InvalidRequestException(msg);
}
}

// Make sure that client match patterns are valid by parsing them.
if (properties.containsKey(CLIENT_MATCH_PATTERN)) {
List<String> patterns = (List<String>) parsed.get(CLIENT_MATCH_PATTERN);
if (properties.containsKey(MATCH_CONFIG)) {
List<String> patterns = (List<String>) parsed.get(MATCH_CONFIG);
// Parse the client matching patterns to validate if the patterns are valid.
parseMatchingPatterns(patterns);
}
Expand Down Expand Up @@ -193,4 +209,14 @@ public static Map<String, Pattern> parseMatchingPatterns(List<String> patterns)
private static boolean isValidParam(String paramName) {
return ALLOWED_MATCH_PARAMS.contains(paramName);
}

/**
* Create a client metrics config instance using the given properties and defaults.
*/
public static ClientMetricsConfigs fromProps(Map<?, ?> defaults, Properties overrides) {
Properties props = new Properties();
props.putAll(defaults);
props.putAll(overrides);
return new ClientMetricsConfigs(props);
}
}
Loading