diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
new file mode 100644
index 0000000000000..80d89db73e8cd
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Allows only client configurations specified via connector.client.config.override.allowlist to be
+ * overridden by connectors. By default, connector.client.config.override.allowlist is empty so connectors
+ * can't override any client configurations.
+ */
+public class AllowlistConnectorClientConfigOverridePolicy extends AbstractConnectorClientConfigOverridePolicy {
+
+ public static final String ALLOWLIST_CONFIG = "connector.client.config.override.allowlist";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AllowlistConnectorClientConfigOverridePolicy.class);
+ private static final List ALLOWLIST_CONFIG_DEFAULT = List.of();
+ private static final String ALLOWLIST_CONFIG_DOC = "List of client configurations that can be overridden by " +
+ "connectors. If empty, connectors can't override any client configurations.";
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, ALLOWLIST_CONFIG_DOC);
+
+ private List allowlist = ALLOWLIST_CONFIG_DEFAULT;
+
+ @Override
+ protected String policyName() {
+ return "Allowlist";
+ }
+
+ @Override
+ protected boolean isAllowed(ConfigValue configValue) {
+ return allowlist.contains(configValue.name());
+ }
+
+ @Override
+ public void configure(Map configs) {
+ AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);
+ allowlist = config.getList(ALLOWLIST_CONFIG);
+ LOGGER.info("Setting up Allowlist policy for ConnectorClientConfigOverride. This will allow the following client configurations"
+ + " to be overridden. {}", allowlist);
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
index 8ec04f97261e4..0c7bdad265979 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
@@ -32,7 +32,9 @@
/**
* Allows all {@code sasl} configurations to be overridden via the connector configs by setting {@code connector.client.config.override.policy} to
* {@code Principal}. This allows to set a principal per connector.
+ * @deprecated Use {@link AllowlistConnectorClientConfigOverridePolicy} instead.
*/
+@Deprecated(since = " 4.2", forRemoval = true)
public class PrincipalConnectorClientConfigOverridePolicy extends AbstractConnectorClientConfigOverridePolicy {
private static final Logger log = LoggerFactory.getLogger(PrincipalConnectorClientConfigOverridePolicy.class);
@@ -52,6 +54,9 @@ protected boolean isAllowed(ConfigValue configValue) {
@Override
public void configure(Map configs) {
+ log.warn("The Principal ConnectorClientConfigOverridePolicy is deprecated, use the Allowlist policy instead. "
+ + "To replicate the Principal policy behavior, set the connector.client.config.override.allowlist configuration to \"{}\"",
+ String.join(",", ALLOWED_CONFIG));
log.info("Setting up Principal policy for ConnectorClientConfigOverride. This will allow `sasl` client configuration to be "
+ "overridden.");
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 8d953d7ded35b..35326c03ae3db 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -27,6 +27,7 @@
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
@@ -158,9 +159,10 @@ public class WorkerConfig extends AbstractConfig {
public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = "connector.client.config.override.policy";
public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC =
"Class name or alias of implementation of ConnectorClientConfigOverridePolicy. Defines what client configurations can be "
- + "overridden by the connector. The default implementation is All, meaning connector configurations can override all client properties. "
- + "The other possible policies in the framework include None to disallow connectors from overriding client properties, "
- + "and Principal to allow connectors to override only client principals.";
+ + "overridden by the connector. The default policy is All, meaning connector configurations can override all client properties. "
+ + "The other possible policies in the framework include Allowlist to specify allowed configurations via "
+ + "" + AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG + ", None to disallow connectors from overriding "
+ + "client properties, and Principal (now deprecated) to allow connectors to override only client principals.";
public static final String CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT = "All";
diff --git a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
index 8b76ce452b659..beb6f23be60d1 100644
--- a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
+++ b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
@@ -15,4 +15,5 @@
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy
-org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy
\ No newline at end of file
+org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy
+org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java
new file mode 100644
index 0000000000000..3326a5744c0b0
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+public class AllowlistConnectorClientConfigOverridePolicyTest extends BaseConnectorClientConfigOverridePolicyTest {
+
+ private static final List ALL_CONFIGS = Stream.of(
+ ProducerConfig.configNames(),
+ ConsumerConfig.configNames(),
+ AdminClientConfig.configNames())
+ .flatMap(Collection::stream)
+ .toList();
+
+ private AllowlistConnectorClientConfigOverridePolicy policy;
+
+ @BeforeEach
+ public void setUp() {
+ policy = new AllowlistConnectorClientConfigOverridePolicy();
+ }
+
+ @Override
+ protected ConnectorClientConfigOverridePolicy policyToTest() {
+ return policy;
+ }
+
+ @Test
+ public void testDenyAllByDefault() {
+ for (String config : ALL_CONFIGS) {
+ testInvalidOverride(Map.of(config, new Object()));
+ }
+ }
+
+ @Test
+ public void testAllowConfigs() {
+ Set allowedConfigs = Set.of(
+ ProducerConfig.ACKS_CONFIG,
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG
+ );
+ policy.configure(Map.of(AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG, String.join(",", allowedConfigs)));
+ for (String config : ALL_CONFIGS) {
+ if (!allowedConfigs.contains(config)) {
+ testInvalidOverride(Map.of(config, new Object()));
+ } else {
+ testValidOverride(Map.of(config, new Object()));
+ }
+ }
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
index c42402eea2ef1..36a14f7544042 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
@@ -19,12 +19,14 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -48,6 +50,13 @@ public class ConnectorClientPolicyIntegrationTest {
private static final int NUM_WORKERS = 1;
private static final String CONNECTOR_NAME = "simple-conn";
+ private Map workerConfigs;
+
+ @BeforeEach
+ public void setup() {
+ workerConfigs = Map.of();
+ }
+
@Test
public void testCreateWithOverridesForNonePolicy() {
Map props = basicConnectorConfig();
@@ -93,9 +102,38 @@ public void testCreateWithAllowedOverridesForDefaultPolicy() throws Exception {
assertPassCreateConnector(null, props);
}
+ @Test
+ public void testCreateWithoutOverridesForAllowlistPolicy() throws Exception {
+ // setup up props for the sink connector
+ Map props = basicConnectorConfig();
+ assertPassCreateConnector("Allowlist", props);
+ }
+
+ @Test
+ public void testCreateWithNotAllowedOverridesForAllowlistPolicy() {
+ workerConfigs = Map.of(
+ AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG, CommonClientConfigs.CLIENT_RACK_CONFIG
+ );
+ // setup up props for the sink connector
+ Map props = basicConnectorConfig();
+ props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.CLIENT_ID_CONFIG, "test");
+ assertFailCreateConnector("Allowlist", props);
+ }
+
+ @Test
+ public void testCreateWithAllowedOverridesForAllowlistPolicy() throws Exception {
+ workerConfigs = Map.of(
+ AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG, CommonClientConfigs.CLIENT_ID_CONFIG
+ );
+ // setup up props for the sink connector
+ Map props = basicConnectorConfig();
+ props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.CLIENT_ID_CONFIG, "test");
+ assertPassCreateConnector("Allowlist", props);
+ }
+
private EmbeddedConnectCluster connectClusterWithPolicy(String policy) {
// setup Connect worker properties
- Map workerProps = new HashMap<>();
+ Map workerProps = new HashMap<>(workerConfigs);
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
if (policy != null) {
workerProps.put(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, policy);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 956153db5c68c..02caa21812f6b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -32,6 +32,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy;
@@ -696,6 +697,7 @@ private PluginDesc> transformationPluginDesc() {
return new PluginDesc(SampleTransformation.class, "1.0", PluginType.TRANSFORMATION, classLoader);
}
+ @SuppressWarnings("removal")
@Test
public void testConfigValidationPrincipalOnlyOverride() {
final Class extends Connector> connectorClass = SampleSourceConnector.class;
@@ -788,6 +790,50 @@ public void testConfigValidationAllOverride() {
verifyValidationIsolation();
}
+ @Test
+ public void testConfigValidationAllowlistOverride() {
+ final Class extends Connector> connectorClass = SampleSourceConnector.class;
+ AllowlistConnectorClientConfigOverridePolicy policy = new AllowlistConnectorClientConfigOverridePolicy();
+ policy.configure(Map.of(AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG, "acks"));
+ AbstractHerder herder = createConfigValidationHerder(connectorClass, policy);
+
+ Map config = new HashMap<>();
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
+ config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
+ config.put("required", "value"); // connector required config
+ String ackConfigKey = producerOverrideKey(ProducerConfig.ACKS_CONFIG);
+ String saslConfigKey = producerOverrideKey(SaslConfigs.SASL_JAAS_CONFIG);
+ config.put(ackConfigKey, "none");
+ config.put(saslConfigKey, "jaas_config");
+
+ ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
+ assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
+
+ // We expect there to be errors due to sasl.jaas.config not being allowed. Note that these assertions depend heavily on
+ // the config fields for SourceConnectorConfig, but we expect these to change rarely.
+ assertEquals(SampleSourceConnector.class.getName(), result.name());
+ // Each transform also gets its own group
+ List expectedGroups = List.of(
+ ConnectorConfig.COMMON_GROUP,
+ ConnectorConfig.TRANSFORMS_GROUP,
+ ConnectorConfig.PREDICATES_GROUP,
+ ConnectorConfig.ERROR_GROUP,
+ SourceConnectorConfig.TOPIC_CREATION_GROUP,
+ SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+ SourceConnectorConfig.OFFSETS_TOPIC_GROUP
+ );
+ assertEquals(expectedGroups, result.groups());
+ assertEquals(1, result.errorCount());
+ // Base connector config has 19 fields, connector's configs add 7, and 2 producer overrides
+ assertEquals(28, result.configs().size());
+ assertTrue(result.configs().stream().anyMatch(
+ configInfo -> ackConfigKey.equals(configInfo.configValue().name()) && configInfo.configValue().errors().isEmpty()));
+ assertTrue(result.configs().stream().anyMatch(
+ configInfo -> saslConfigKey.equals(configInfo.configValue().name()) && !configInfo.configValue().errors().isEmpty()));
+
+ verifyValidationIsolation();
+ }
+
static final class TestClientConfigOverridePolicy extends AllConnectorClientConfigOverridePolicy implements Monitorable {
private static MetricName metricName = null;
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 80343474d7051..c0d22cd0fbfb7 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -200,6 +200,13 @@
+ A new implementation of ConnectorClientConfigOverridePolicy, AllowlistConnectorClientConfigOverridePolicy,
+ has been added. This enables specifying the configurations that connectors can override via connector.client.config.override.allowlist.
+ From Kafka 5.0.0, this will be the default connector.client.config.override.policy
+ policy. The PrincipalConnectorClientConfigOverridePolicy policy is now deprecated and will be removed in Kafka 5.0.0.
+ For further details, please refer to KIP-1188.
+