Skip to content

Commit

Permalink
KAFKA-17485: POC KIP-1088 KafkaClientInterceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
mjsax committed Oct 1, 2024
1 parent 7cca445 commit f15deda
Show file tree
Hide file tree
Showing 24 changed files with 528 additions and 242 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Configurable;

import java.util.HashMap;
import java.util.Map;

public class KafkaClientInterceptor implements Configurable {
protected Map<String, Object> config;

@Override
public void configure(final Map<String, ?> config) {
this.config = new HashMap<>(config);
}

public Admin wrapAdminClient(final KafkaAdminClient adminClient) {
return adminClient;
}

public Consumer<byte[], byte[]> wrapMainConsumer(final KafkaConsumer<byte[], byte[]> mainConsumer) {
return mainConsumer;
}

public Consumer<byte[], byte[]> wrapRestoreConsumer(final KafkaConsumer<byte[], byte[]> restoreConsumer) {
return restoreConsumer;
}

public Consumer<byte[], byte[]> wrapGlobalConsumer(final KafkaConsumer<byte[], byte[]> globalConsumer) {
return globalConsumer;
}

public Producer<byte[], byte[]> wrapProducer(final KafkaProducer<byte[], byte[]> producer) {
return producer;
}
}
106 changes: 92 additions & 14 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
Expand All @@ -30,6 +31,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
Expand All @@ -38,6 +40,7 @@
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -178,6 +181,7 @@ public class KafkaStreams implements AutoCloseable {
private final DelegatingStateRestoreListener delegatingStateRestoreListener;
private final UUID processId;
private final KafkaClientSupplier clientSupplier;
private final KafkaClientInterceptor interceptorSupplier;
protected final TopologyMetadata topologyMetadata;
private final QueryableStoreProvider queryableStoreProvider;
private final DelegatingStandbyUpdateListener delegatingStandbyUpdateListener;
Expand Down Expand Up @@ -452,8 +456,8 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler un
* might be exceptions thrown by your code, for example a NullPointerException thrown from your processor logic.
* The handler will execute on the thread that produced the exception.
* In order to get the thread that threw the exception, use {@code Thread.currentThread()}.
* <p>
* Note, this handler must be thread safe, since it will be shared among all threads, and invoked from any
*
* <p> Note, this handler must be thread safe, since it will be shared among all threads, and invoked from any
* thread that encounters such an exception.
*
* @param userStreamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
Expand Down Expand Up @@ -837,7 +841,7 @@ void setUserStandbyListener(final StandbyUpdateListener userStandbyListener) {
*/
public KafkaStreams(final Topology topology,
final Properties props) {
this(topology, new StreamsConfig(props));
this(topology, new StreamsConfig(props), (KafkaClientInterceptor) null, Time.SYSTEM);
}

/**
Expand All @@ -852,12 +856,19 @@ public KafkaStreams(final Topology topology,
* for the new {@code KafkaStreams} instance
* @throws StreamsException if any fatal error occurs
*/
@Deprecated
public KafkaStreams(final Topology topology,
final Properties props,
final KafkaClientSupplier clientSupplier) {
this(topology, new StreamsConfig(props), clientSupplier, Time.SYSTEM);
}

public KafkaStreams(final Topology topology,
final Properties props,
final KafkaClientInterceptor interceptorSupplier) {
this(topology, new StreamsConfig(props), interceptorSupplier, Time.SYSTEM);
}

/**
* Create a {@code KafkaStreams} instance.
* <p>
Expand All @@ -872,7 +883,7 @@ public KafkaStreams(final Topology topology,
public KafkaStreams(final Topology topology,
final Properties props,
final Time time) {
this(topology, new StreamsConfig(props), time);
this(topology, new StreamsConfig(props), (KafkaClientInterceptor) null, time);
}

/**
Expand All @@ -888,13 +899,21 @@ public KafkaStreams(final Topology topology,
* @param time {@code Time} implementation; cannot be null
* @throws StreamsException if any fatal error occurs
*/
@Deprecated
public KafkaStreams(final Topology topology,
final Properties props,
final KafkaClientSupplier clientSupplier,
final Time time) {
this(topology, new StreamsConfig(props), clientSupplier, time);
}

public KafkaStreams(final Topology topology,
final Properties props,
final KafkaClientInterceptor interceptorSupplier,
final Time time) {
this(topology, new StreamsConfig(props), interceptorSupplier, time);
}

/**
* Create a {@code KafkaStreams} instance.
* <p>
Expand All @@ -907,7 +926,7 @@ public KafkaStreams(final Topology topology,
*/
public KafkaStreams(final Topology topology,
final StreamsConfig applicationConfigs) {
this(topology, applicationConfigs, applicationConfigs.getKafkaClientSupplier());
this(topology, applicationConfigs, (KafkaClientInterceptor) null, Time.SYSTEM);
}

/**
Expand All @@ -922,10 +941,17 @@ public KafkaStreams(final Topology topology,
* for the new {@code KafkaStreams} instance
* @throws StreamsException if any fatal error occurs
*/
@Deprecated
public KafkaStreams(final Topology topology,
final StreamsConfig applicationConfigs,
final KafkaClientSupplier clientSupplier) {
this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier);
this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier, null, Time.SYSTEM);
}

public KafkaStreams(final Topology topology,
final StreamsConfig applicationConfigs,
final KafkaClientInterceptor interceptorSuppliern) {
this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, null, interceptorSuppliern, Time.SYSTEM);
}

/**
Expand All @@ -942,26 +968,36 @@ public KafkaStreams(final Topology topology,
public KafkaStreams(final Topology topology,
final StreamsConfig applicationConfigs,
final Time time) {
this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, applicationConfigs.getKafkaClientSupplier(), time);
this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, null, null, time);
}

// why was this not public ?
private KafkaStreams(final Topology topology,
final StreamsConfig applicationConfigs,
final KafkaClientSupplier clientSupplier,
final Time time) throws StreamsException {
this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier, time);
this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier, null, time);
}

public KafkaStreams(final Topology topology,
final StreamsConfig applicationConfigs,
final KafkaClientInterceptor interceptorSupplier,
final Time time) throws StreamsException {
this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, null, interceptorSupplier, time);
}

protected KafkaStreams(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfigs,
final KafkaClientSupplier clientSupplier) throws StreamsException {
this(topologyMetadata, applicationConfigs, clientSupplier, Time.SYSTEM);
// NameTopolgies are going to be deprecated -- no need to worry about client-interceptor -- does not need to work with KIP-1071
this(topologyMetadata, applicationConfigs, clientSupplier, null, Time.SYSTEM);
}

@SuppressWarnings("this-escape")
@SuppressWarnings({"this-escape", "deprecation"})
private KafkaStreams(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfigs,
final KafkaClientSupplier clientSupplier,
final KafkaClientInterceptor interceptorSupplier,
final Time time) throws StreamsException {
this.applicationConfigs = applicationConfigs;
this.time = time;
Expand All @@ -972,7 +1008,12 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
final boolean hasGlobalTopology = topologyMetadata.hasGlobalTopology();

try {
stateDirectory = new StateDirectory(applicationConfigs, time, topologyMetadata.hasPersistentStores(), topologyMetadata.hasNamedTopologies());
stateDirectory = new StateDirectory(
applicationConfigs,
time,
topologyMetadata.hasPersistentStores(),
topologyMetadata.hasNamedTopologies()
);
processId = stateDirectory.initializeProcessId();
} catch (final ProcessorStateException fatal) {
Utils.closeQuietly(stateDirectory, "streams state directory");
Expand All @@ -991,9 +1032,41 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
this.log = logContext.logger(getClass());
topologyMetadata.setLog(logContext);

if (clientSupplier != null) {
if (interceptorSupplier != null) {
throw new ConfigException("use only one of both");
}
this.clientSupplier = clientSupplier;
} else if (applicationConfigs.originals().containsKey(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG)) {
if (interceptorSupplier != null) {
throw new ConfigException("use only one of both");
}
this.clientSupplier = applicationConfigs.getKafkaClientSupplier();
} else {
this.clientSupplier = null;
}
// TODO: add config for interceptor supplier ?
this.interceptorSupplier = interceptorSupplier;
if (this.interceptorSupplier != null) {
this.interceptorSupplier.configure(applicationConfigs.originals());
}

// use client id instead of thread client id since this admin client may be shared among threads
this.clientSupplier = clientSupplier;
adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId)));
if (this.clientSupplier != null) {
adminClient = this.clientSupplier.getAdmin(
applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId))
);
} else {
if (this.interceptorSupplier == null) {
adminClient = Admin.create(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId)));
} else {
adminClient = this.interceptorSupplier.wrapAdminClient(
(KafkaAdminClient) Admin.create(
applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId))
)
);
}
}

log.info("Kafka Streams version: {}", ClientMetrics.version());
log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
Expand Down Expand Up @@ -1034,7 +1107,11 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
globalStreamThread = new GlobalStreamThread(
topologyMetadata.globalTaskTopology(),
applicationConfigs,
clientSupplier.getGlobalConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId)),
this.clientSupplier != null ?
this.clientSupplier.getGlobalConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId)) :
this.interceptorSupplier == null ?
new KafkaConsumer<>(applicationConfigs.getGlobalConsumerConfigs(clientId), new ByteArrayDeserializer(), new ByteArrayDeserializer()) :
this.interceptorSupplier.wrapGlobalConsumer(new KafkaConsumer<>(applicationConfigs.getGlobalConsumerConfigs(clientId), new ByteArrayDeserializer(), new ByteArrayDeserializer())),
stateDirectory,
cacheSizePerThread,
streamsMetrics,
Expand Down Expand Up @@ -1068,6 +1145,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
topologyMetadata,
applicationConfigs,
clientSupplier,
interceptorSupplier,
adminClient,
processId,
clientId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1886,7 +1886,14 @@ public static Set<String> verifyTopologyOptimizationConfigs(final String config)
* Return configured KafkaClientSupplier
* @return Configured KafkaClientSupplier
*/
@Deprecated
public KafkaClientSupplier getKafkaClientSupplier() {
if (originals().containsKey(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG)) {
if (originals().containsKey("enable.kip1071")) {
throw new ConfigException("not compatible");
}
log.warn("Deprecated");
}
return getConfiguredInstance(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG,
KafkaClientSupplier.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Metric;
Expand All @@ -25,6 +26,7 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientInterceptor;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
Expand Down Expand Up @@ -58,6 +60,7 @@ class ActiveTaskCreator {
private final ThreadCache cache;
private final Time time;
private final KafkaClientSupplier clientSupplier;
private final KafkaClientInterceptor interceptorSupplier;
private final String threadId;
private final int threadIdx;
private final UUID processId;
Expand All @@ -78,6 +81,7 @@ class ActiveTaskCreator {
final ThreadCache cache,
final Time time,
final KafkaClientSupplier clientSupplier,
final KafkaClientInterceptor interceptorSupplier,
final String threadId,
final int threadIdx,
final UUID processId,
Expand All @@ -92,6 +96,7 @@ class ActiveTaskCreator {
this.cache = cache;
this.time = time;
this.clientSupplier = clientSupplier;
this.interceptorSupplier = interceptorSupplier;
this.threadId = threadId;
this.threadIdx = threadIdx;
this.processId = processId;
Expand Down Expand Up @@ -121,7 +126,11 @@ private Producer<byte[], byte[]> producer() {
applicationConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + processId + "-" + threadIdx
);
}
return clientSupplier.getProducer(producerConfig);
if (clientSupplier != null) {
return clientSupplier.getProducer(producerConfig);
} else {
return interceptorSupplier.wrapProducer(new KafkaProducer<>(producerConfig));
}
}

public void reInitializeThreadProducer() {
Expand Down
Loading

0 comments on commit f15deda

Please sign in to comment.