From e6100627b6ab044eb5d9975bc7e9890e68685644 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Wed, 22 Oct 2025 18:12:09 +0200 Subject: [PATCH 1/3] KAFKA-19824: New AllowlistConnectorClientConfigOverridePolicy (KIP-1188) --- ...stConnectorClientConfigOverridePolicy.java | 64 ++++++++++++++++ ...alConnectorClientConfigOverridePolicy.java | 2 + .../kafka/connect/runtime/WorkerConfig.java | 6 +- ...policy.ConnectorClientConfigOverridePolicy | 3 +- ...nnectorClientConfigOverridePolicyTest.java | 76 +++++++++++++++++++ .../ConnectorClientPolicyIntegrationTest.java | 40 +++++++++- .../connect/runtime/AbstractHerderTest.java | 46 +++++++++++ docs/upgrade.html | 7 ++ 8 files changed, 240 insertions(+), 4 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java new file mode 100644 index 0000000000000..80d89db73e8cd --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.connector.policy; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigValue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +/** + * Allows only client configurations specified via connector.client.config.override.allowlist to be + * overridden by connectors. By default, connector.client.config.override.allowlist is empty so connectors + * can't override any client configurations. + */ +public class AllowlistConnectorClientConfigOverridePolicy extends AbstractConnectorClientConfigOverridePolicy { + + public static final String ALLOWLIST_CONFIG = "connector.client.config.override.allowlist"; + + private static final Logger LOGGER = LoggerFactory.getLogger(AllowlistConnectorClientConfigOverridePolicy.class); + private static final List ALLOWLIST_CONFIG_DEFAULT = List.of(); + private static final String ALLOWLIST_CONFIG_DOC = "List of client configurations that can be overridden by " + + "connectors. If empty, connectors can't override any client configurations."; + private static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, ALLOWLIST_CONFIG_DOC); + + private List allowlist = ALLOWLIST_CONFIG_DEFAULT; + + @Override + protected String policyName() { + return "Allowlist"; + } + + @Override + protected boolean isAllowed(ConfigValue configValue) { + return allowlist.contains(configValue.name()); + } + + @Override + public void configure(Map configs) { + AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs); + allowlist = config.getList(ALLOWLIST_CONFIG); + LOGGER.info("Setting up Allowlist policy for ConnectorClientConfigOverride. This will allow the following client configurations" + + " to be overridden. {}", allowlist); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java index 8ec04f97261e4..cd3ec037fa007 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java @@ -32,7 +32,9 @@ /** * Allows all {@code sasl} configurations to be overridden via the connector configs by setting {@code connector.client.config.override.policy} to * {@code Principal}. This allows to set a principal per connector. + * @deprecated Use {@link AllowlistConnectorClientConfigOverridePolicy} instead. */ +@Deprecated(since = " 4.2", forRemoval = true) public class PrincipalConnectorClientConfigOverridePolicy extends AbstractConnectorClientConfigOverridePolicy { private static final Logger log = LoggerFactory.getLogger(PrincipalConnectorClientConfigOverridePolicy.class); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 8d953d7ded35b..102548cba72f0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode; import org.apache.kafka.connect.runtime.rest.RestServerConfig; @@ -159,8 +160,9 @@ public class WorkerConfig extends AbstractConfig { public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC = "Class name or alias of implementation of ConnectorClientConfigOverridePolicy. Defines what client configurations can be " + "overridden by the connector. The default implementation is All, meaning connector configurations can override all client properties. " - + "The other possible policies in the framework include None to disallow connectors from overriding client properties, " - + "and Principal to allow connectors to override only client principals."; + + "The other possible policies in the framework include Allowlist to specify allowed configurations via " + + "" + AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG + ", None to disallow connectors from overriding " + + "client properties, and Principal (now deprecated) to allow connectors to override only client principals."; public static final String CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT = "All"; diff --git a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy index 8b76ce452b659..beb6f23be60d1 100644 --- a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy +++ b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy @@ -15,4 +15,5 @@ org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy -org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy \ No newline at end of file +org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy +org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java new file mode 100644 index 0000000000000..3326a5744c0b0 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.connector.policy; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +public class AllowlistConnectorClientConfigOverridePolicyTest extends BaseConnectorClientConfigOverridePolicyTest { + + private static final List ALL_CONFIGS = Stream.of( + ProducerConfig.configNames(), + ConsumerConfig.configNames(), + AdminClientConfig.configNames()) + .flatMap(Collection::stream) + .toList(); + + private AllowlistConnectorClientConfigOverridePolicy policy; + + @BeforeEach + public void setUp() { + policy = new AllowlistConnectorClientConfigOverridePolicy(); + } + + @Override + protected ConnectorClientConfigOverridePolicy policyToTest() { + return policy; + } + + @Test + public void testDenyAllByDefault() { + for (String config : ALL_CONFIGS) { + testInvalidOverride(Map.of(config, new Object())); + } + } + + @Test + public void testAllowConfigs() { + Set allowedConfigs = Set.of( + ProducerConfig.ACKS_CONFIG, + ConsumerConfig.CLIENT_ID_CONFIG, + AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG + ); + policy.configure(Map.of(AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG, String.join(",", allowedConfigs))); + for (String config : ALL_CONFIGS) { + if (!allowedConfigs.contains(config)) { + testInvalidOverride(Map.of(config, new Object())); + } else { + testValidOverride(Map.of(config, new Object())); + } + } + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java index c42402eea2ef1..36a14f7544042 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java @@ -19,12 +19,14 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -48,6 +50,13 @@ public class ConnectorClientPolicyIntegrationTest { private static final int NUM_WORKERS = 1; private static final String CONNECTOR_NAME = "simple-conn"; + private Map workerConfigs; + + @BeforeEach + public void setup() { + workerConfigs = Map.of(); + } + @Test public void testCreateWithOverridesForNonePolicy() { Map props = basicConnectorConfig(); @@ -93,9 +102,38 @@ public void testCreateWithAllowedOverridesForDefaultPolicy() throws Exception { assertPassCreateConnector(null, props); } + @Test + public void testCreateWithoutOverridesForAllowlistPolicy() throws Exception { + // setup up props for the sink connector + Map props = basicConnectorConfig(); + assertPassCreateConnector("Allowlist", props); + } + + @Test + public void testCreateWithNotAllowedOverridesForAllowlistPolicy() { + workerConfigs = Map.of( + AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG, CommonClientConfigs.CLIENT_RACK_CONFIG + ); + // setup up props for the sink connector + Map props = basicConnectorConfig(); + props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.CLIENT_ID_CONFIG, "test"); + assertFailCreateConnector("Allowlist", props); + } + + @Test + public void testCreateWithAllowedOverridesForAllowlistPolicy() throws Exception { + workerConfigs = Map.of( + AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG, CommonClientConfigs.CLIENT_ID_CONFIG + ); + // setup up props for the sink connector + Map props = basicConnectorConfig(); + props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.CLIENT_ID_CONFIG, "test"); + assertPassCreateConnector("Allowlist", props); + } + private EmbeddedConnectCluster connectClusterWithPolicy(String policy) { // setup Connect worker properties - Map workerProps = new HashMap<>(); + Map workerProps = new HashMap<>(workerConfigs); workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000)); if (policy != null) { workerProps.put(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, policy); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 956153db5c68c..2c7d28cd132c4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy; @@ -696,6 +697,7 @@ private PluginDesc> transformationPluginDesc() { return new PluginDesc(SampleTransformation.class, "1.0", PluginType.TRANSFORMATION, classLoader); } + @SuppressWarnings("removal") @Test public void testConfigValidationPrincipalOnlyOverride() { final Class connectorClass = SampleSourceConnector.class; @@ -788,6 +790,50 @@ public void testConfigValidationAllOverride() { verifyValidationIsolation(); } + @Test + public void testConfigValidationAllowlistOverride() { + final Class connectorClass = SampleSourceConnector.class; + AllowlistConnectorClientConfigOverridePolicy policy = new AllowlistConnectorClientConfigOverridePolicy(); + policy.configure(Map.of(AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG, "acks")); + AbstractHerder herder = createConfigValidationHerder(connectorClass, policy); + + Map config = new HashMap<>(); + config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName()); + config.put(ConnectorConfig.NAME_CONFIG, "connector-name"); + config.put("required", "value"); // connector required config + String ackConfigKey = producerOverrideKey(ProducerConfig.ACKS_CONFIG); + String saslConfigKey = producerOverrideKey(SaslConfigs.SASL_JAAS_CONFIG); + config.put(ackConfigKey, "none"); + config.put(saslConfigKey, "jaas_config"); + + ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false); + assertEquals(ConnectorType.SOURCE, herder.connectorType(config)); + + // We expect there to be errors due to sasl.jaas.config not being allowed Note that these assertions depend heavily on + // the config fields for SourceConnectorConfig, but we expect these to change rarely. + assertEquals(SampleSourceConnector.class.getName(), result.name()); + // Each transform also gets its own group + List expectedGroups = List.of( + ConnectorConfig.COMMON_GROUP, + ConnectorConfig.TRANSFORMS_GROUP, + ConnectorConfig.PREDICATES_GROUP, + ConnectorConfig.ERROR_GROUP, + SourceConnectorConfig.TOPIC_CREATION_GROUP, + SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP, + SourceConnectorConfig.OFFSETS_TOPIC_GROUP + ); + assertEquals(expectedGroups, result.groups()); + assertEquals(1, result.errorCount()); + // Base connector config has 19 fields, connector's configs add 7, and 2 producer overrides + assertEquals(28, result.configs().size()); + assertTrue(result.configs().stream().anyMatch( + configInfo -> ackConfigKey.equals(configInfo.configValue().name()) && configInfo.configValue().errors().isEmpty())); + assertTrue(result.configs().stream().anyMatch( + configInfo -> saslConfigKey.equals(configInfo.configValue().name()) && !configInfo.configValue().errors().isEmpty())); + + verifyValidationIsolation(); + } + static final class TestClientConfigOverridePolicy extends AllConnectorClientConfigOverridePolicy implements Monitorable { private static MetricName metricName = null; diff --git a/docs/upgrade.html b/docs/upgrade.html index 80343474d7051..c0d22cd0fbfb7 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -200,6 +200,13 @@
Notable changes in 4 KafkaStreams#close(org.apache.kafka.streams.CloseOptions). For further details, please refer to KIP-1153. +
  • + A new implementation of ConnectorClientConfigOverridePolicy, AllowlistConnectorClientConfigOverridePolicy, + has been added. This enables specifying the configurations that connectors can override via connector.client.config.override.allowlist. + From Kafka 5.0.0, this will be the default connector.client.config.override.policy + policy. The PrincipalConnectorClientConfigOverridePolicy policy is now deprecated and will be removed in Kafka 5.0.0. + For further details, please refer to KIP-1188. +
  • Upgrading to 4.1.0

    From 59bfb19319b6b39eb1b93f79586c9da898fede20 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 27 Oct 2025 12:30:59 +0100 Subject: [PATCH 2/3] Add deprecation warning log to Principal policy --- .../policy/PrincipalConnectorClientConfigOverridePolicy.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java index cd3ec037fa007..0c7bdad265979 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java @@ -54,6 +54,9 @@ protected boolean isAllowed(ConfigValue configValue) { @Override public void configure(Map configs) { + log.warn("The Principal ConnectorClientConfigOverridePolicy is deprecated, use the Allowlist policy instead. " + + "To replicate the Principal policy behavior, set the connector.client.config.override.allowlist configuration to \"{}\"", + String.join(",", ALLOWED_CONFIG)); log.info("Setting up Principal policy for ConnectorClientConfigOverride. This will allow `sasl` client configuration to be " + "overridden."); } From 95ed3db04e7edff4567b545899be8c64a5b3ecba Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 27 Oct 2025 13:02:07 +0100 Subject: [PATCH 3/3] Address Andrew's comments --- .../java/org/apache/kafka/connect/runtime/WorkerConfig.java | 2 +- .../org/apache/kafka/connect/runtime/AbstractHerderTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 102548cba72f0..35326c03ae3db 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -159,7 +159,7 @@ public class WorkerConfig extends AbstractConfig { public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = "connector.client.config.override.policy"; public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC = "Class name or alias of implementation of ConnectorClientConfigOverridePolicy. Defines what client configurations can be " - + "overridden by the connector. The default implementation is All, meaning connector configurations can override all client properties. " + + "overridden by the connector. The default policy is All, meaning connector configurations can override all client properties. " + "The other possible policies in the framework include Allowlist to specify allowed configurations via " + "" + AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG + ", None to disallow connectors from overriding " + "client properties, and Principal (now deprecated) to allow connectors to override only client principals."; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 2c7d28cd132c4..02caa21812f6b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -809,7 +809,7 @@ public void testConfigValidationAllowlistOverride() { ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false); assertEquals(ConnectorType.SOURCE, herder.connectorType(config)); - // We expect there to be errors due to sasl.jaas.config not being allowed Note that these assertions depend heavily on + // We expect there to be errors due to sasl.jaas.config not being allowed. Note that these assertions depend heavily on // the config fields for SourceConnectorConfig, but we expect these to change rarely. assertEquals(SampleSourceConnector.class.getName(), result.name()); // Each transform also gets its own group