diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientInterceptor.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientInterceptor.java
new file mode 100644
index 0000000000000..6171025aed5d0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientInterceptor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.Configurable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Allows to wrap the Kafka clients which Kafka Streams creates internally.
+ * The default implementation returns every client unmodified.
+ */
+public class KafkaClientInterceptor implements Configurable {
+    protected Map<String, ?> config;
+
+    @Override
+    public void configure(final Map<String, ?> config) {
+        this.config = new HashMap<>(config);
+    }
+
+    public Admin wrapAdminClient(final KafkaAdminClient adminClient) {
+        return adminClient;
+    }
+
+    public Consumer<byte[], byte[]> wrapMainConsumer(final KafkaConsumer<byte[], byte[]> mainConsumer) {
+        return mainConsumer;
+    }
+
+    public Consumer<byte[], byte[]> wrapRestoreConsumer(final KafkaConsumer<byte[], byte[]> restoreConsumer) {
+        return restoreConsumer;
+    }
+
+    public Consumer<byte[], byte[]> wrapGlobalConsumer(final KafkaConsumer<byte[], byte[]> globalConsumer) {
+        return globalConsumer;
+    }
+
+    public Producer<byte[], byte[]> wrapProducer(final KafkaProducer<byte[], byte[]> producer) {
+        return producer;
+    }
+}
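Note for reviewers: since the default implementation is pass-through, a user subclass only needs to override the hooks it cares about. A minimal sketch of what that could look like; the LoggingInterceptor name and its log line are hypothetical, not part of this patch:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.streams.KafkaClientInterceptor;

    public class LoggingInterceptor extends KafkaClientInterceptor {
        @Override
        public Producer<byte[], byte[]> wrapProducer(final KafkaProducer<byte[], byte[]> producer) {
            // 'config' is populated by configure() before any wrap method is called
            System.out.println("wrapping producer, config keys: " + config.keySet());
            return producer; // a real implementation would return a decorating Producer
        }
    }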
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 089d0b28206ae..6ebfc9bac52d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -18,6 +18,7 @@
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.MemberToRemove;
 import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
@@ -30,6 +31,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -38,6 +40,7 @@
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -178,6 +181,7 @@ public class KafkaStreams implements AutoCloseable {
     private final DelegatingStateRestoreListener delegatingStateRestoreListener;
     private final UUID processId;
     private final KafkaClientSupplier clientSupplier;
+    private final KafkaClientInterceptor interceptorSupplier;
     protected final TopologyMetadata topologyMetadata;
     private final QueryableStoreProvider queryableStoreProvider;
     private final DelegatingStandbyUpdateListener delegatingStandbyUpdateListener;
@@ -452,8 +456,8 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler un
      * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor logic.
      * The handler will execute on the thread that produced the exception.
      * In order to get the thread that threw the exception, use {@code Thread.currentThread()}.
-     * <p>

-     * Note, this handler must be thread safe, since it will be shared among all threads, and invoked from any
+     *
+     * <p>

Note, this handler must be thread safe, since it will be shared among all threads, and invoked from any * thread that encounters such an exception. * * @param userStreamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads @@ -837,7 +841,7 @@ void setUserStandbyListener(final StandbyUpdateListener userStandbyListener) { */ public KafkaStreams(final Topology topology, final Properties props) { - this(topology, new StreamsConfig(props)); + this(topology, new StreamsConfig(props), (KafkaClientInterceptor) null, Time.SYSTEM); } /** @@ -852,12 +856,19 @@ public KafkaStreams(final Topology topology, * for the new {@code KafkaStreams} instance * @throws StreamsException if any fatal error occurs */ + @Deprecated public KafkaStreams(final Topology topology, final Properties props, final KafkaClientSupplier clientSupplier) { this(topology, new StreamsConfig(props), clientSupplier, Time.SYSTEM); } + public KafkaStreams(final Topology topology, + final Properties props, + final KafkaClientInterceptor interceptorSupplier) { + this(topology, new StreamsConfig(props), interceptorSupplier, Time.SYSTEM); + } + /** * Create a {@code KafkaStreams} instance. *

@@ -872,7 +883,7 @@ public KafkaStreams(final Topology topology, public KafkaStreams(final Topology topology, final Properties props, final Time time) { - this(topology, new StreamsConfig(props), time); + this(topology, new StreamsConfig(props), (KafkaClientInterceptor) null, time); } /** @@ -888,6 +899,7 @@ public KafkaStreams(final Topology topology, * @param time {@code Time} implementation; cannot be null * @throws StreamsException if any fatal error occurs */ + @Deprecated public KafkaStreams(final Topology topology, final Properties props, final KafkaClientSupplier clientSupplier, @@ -895,6 +907,13 @@ public KafkaStreams(final Topology topology, this(topology, new StreamsConfig(props), clientSupplier, time); } + public KafkaStreams(final Topology topology, + final Properties props, + final KafkaClientInterceptor interceptorSupplier, + final Time time) { + this(topology, new StreamsConfig(props), interceptorSupplier, time); + } + /** * Create a {@code KafkaStreams} instance. *

@@ -907,7 +926,7 @@ public KafkaStreams(final Topology topology, */ public KafkaStreams(final Topology topology, final StreamsConfig applicationConfigs) { - this(topology, applicationConfigs, applicationConfigs.getKafkaClientSupplier()); + this(topology, applicationConfigs, (KafkaClientInterceptor) null, Time.SYSTEM); } /** @@ -922,10 +941,17 @@ public KafkaStreams(final Topology topology, * for the new {@code KafkaStreams} instance * @throws StreamsException if any fatal error occurs */ + @Deprecated public KafkaStreams(final Topology topology, final StreamsConfig applicationConfigs, final KafkaClientSupplier clientSupplier) { - this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier); + this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier, null, Time.SYSTEM); + } + + public KafkaStreams(final Topology topology, + final StreamsConfig applicationConfigs, + final KafkaClientInterceptor interceptorSuppliern) { + this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, null, interceptorSuppliern, Time.SYSTEM); } /** @@ -942,26 +968,36 @@ public KafkaStreams(final Topology topology, public KafkaStreams(final Topology topology, final StreamsConfig applicationConfigs, final Time time) { - this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, applicationConfigs.getKafkaClientSupplier(), time); + this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, null, null, time); } + // why was this not public ? private KafkaStreams(final Topology topology, final StreamsConfig applicationConfigs, final KafkaClientSupplier clientSupplier, final Time time) throws StreamsException { - this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier, time); + this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier, null, time); + } + + public KafkaStreams(final Topology topology, + final StreamsConfig applicationConfigs, + final KafkaClientInterceptor interceptorSupplier, + final Time time) throws StreamsException { + this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, null, interceptorSupplier, time); } protected KafkaStreams(final TopologyMetadata topologyMetadata, final StreamsConfig applicationConfigs, final KafkaClientSupplier clientSupplier) throws StreamsException { - this(topologyMetadata, applicationConfigs, clientSupplier, Time.SYSTEM); + // NameTopolgies are going to be deprecated -- no need to worry about client-interceptor -- does not need to work with KIP-1071 + this(topologyMetadata, applicationConfigs, clientSupplier, null, Time.SYSTEM); } - @SuppressWarnings("this-escape") + @SuppressWarnings({"this-escape", "deprecation"}) private KafkaStreams(final TopologyMetadata topologyMetadata, final StreamsConfig applicationConfigs, final KafkaClientSupplier clientSupplier, + final KafkaClientInterceptor interceptorSupplier, final Time time) throws StreamsException { this.applicationConfigs = applicationConfigs; this.time = time; @@ -972,7 +1008,12 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, final boolean hasGlobalTopology = topologyMetadata.hasGlobalTopology(); try { - stateDirectory = new StateDirectory(applicationConfigs, time, topologyMetadata.hasPersistentStores(), 
-    @SuppressWarnings("this-escape")
+    @SuppressWarnings({"this-escape", "deprecation"})
     private KafkaStreams(final TopologyMetadata topologyMetadata,
                          final StreamsConfig applicationConfigs,
                          final KafkaClientSupplier clientSupplier,
+                         final KafkaClientInterceptor interceptorSupplier,
                          final Time time) throws StreamsException {
         this.applicationConfigs = applicationConfigs;
         this.time = time;
@@ -972,7 +1008,12 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
         final boolean hasGlobalTopology = topologyMetadata.hasGlobalTopology();

         try {
-            stateDirectory = new StateDirectory(applicationConfigs, time, topologyMetadata.hasPersistentStores(), topologyMetadata.hasNamedTopologies());
+            stateDirectory = new StateDirectory(
+                applicationConfigs,
+                time,
+                topologyMetadata.hasPersistentStores(),
+                topologyMetadata.hasNamedTopologies()
+            );
             processId = stateDirectory.initializeProcessId();
         } catch (final ProcessorStateException fatal) {
             Utils.closeQuietly(stateDirectory, "streams state directory");
@@ -991,9 +1032,41 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
         this.log = logContext.logger(getClass());
         topologyMetadata.setLog(logContext);

+        if (clientSupplier != null) {
+            if (interceptorSupplier != null) {
+                throw new ConfigException("Cannot use both a KafkaClientSupplier and a KafkaClientInterceptor; provide only one of them.");
+            }
+            this.clientSupplier = clientSupplier;
+        } else if (applicationConfigs.originals().containsKey(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG)) {
+            if (interceptorSupplier != null) {
+                throw new ConfigException("Cannot use both " + StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG + " and a KafkaClientInterceptor; provide only one of them.");
+            }
+            this.clientSupplier = applicationConfigs.getKafkaClientSupplier();
+        } else {
+            this.clientSupplier = null;
+        }
+        // TODO: add a config for the interceptor, analogous to DEFAULT_CLIENT_SUPPLIER_CONFIG?
+        this.interceptorSupplier = interceptorSupplier;
+        if (this.interceptorSupplier != null) {
+            this.interceptorSupplier.configure(applicationConfigs.originals());
+        }
+
         // use client id instead of thread client id since this admin client may be shared among threads
-        this.clientSupplier = clientSupplier;
-        adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId)));
+        if (this.clientSupplier != null) {
+            adminClient = this.clientSupplier.getAdmin(
+                applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId))
+            );
+        } else {
+            if (this.interceptorSupplier == null) {
+                adminClient = Admin.create(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId)));
+            } else {
+                adminClient = this.interceptorSupplier.wrapAdminClient(
+                    (KafkaAdminClient) Admin.create(
+                        applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId))
+                    )
+                );
+            }
+        }

         log.info("Kafka Streams version: {}", ClientMetrics.version());
         log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
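The validation above makes the two mechanisms mutually exclusive. Concretely, the following now fails fast, again using the hypothetical classes from the sketches above:

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, MyClientSupplier.class.getName());

    // supplier via config plus interceptor via constructor: throws ConfigException
    new KafkaStreams(topology, props, new LoggingInterceptor());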
@@ -1034,7 +1107,11 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
             globalStreamThread = new GlobalStreamThread(
                 topologyMetadata.globalTaskTopology(),
                 applicationConfigs,
-                clientSupplier.getGlobalConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId)),
+                this.clientSupplier != null ?
+                    this.clientSupplier.getGlobalConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId)) :
+                    this.interceptorSupplier == null ?
+                        new KafkaConsumer<>(applicationConfigs.getGlobalConsumerConfigs(clientId), new ByteArrayDeserializer(), new ByteArrayDeserializer()) :
+                        this.interceptorSupplier.wrapGlobalConsumer(new KafkaConsumer<>(applicationConfigs.getGlobalConsumerConfigs(clientId), new ByteArrayDeserializer(), new ByteArrayDeserializer())),
                 stateDirectory,
                 cacheSizePerThread,
                 streamsMetrics,
@@ -1068,6 +1145,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final int threadIdx) {
             topologyMetadata,
             applicationConfigs,
             clientSupplier,
+            interceptorSupplier,
             adminClient,
             processId,
             clientId,
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 79ebf661573ab..19b6fb57d3625 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1886,7 +1886,14 @@ public static Set<String> verifyTopologyOptimizationConfigs(final String config)
      * Return configured KafkaClientSupplier
      * @return Configured KafkaClientSupplier
      */
+    @Deprecated
     public KafkaClientSupplier getKafkaClientSupplier() {
+        if (originals().containsKey(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG)) {
+            if (originals().containsKey("enable.kip1071")) {
+                throw new ConfigException("Config '" + StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG + "' is not compatible with the new streams group protocol (KIP-1071).");
+            }
+            log.warn("Config '{}' is deprecated and will be removed in a future release.", StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG);
+        }
         return getConfiguredInstance(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, KafkaClientSupplier.class);
     }
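A sketch of what the new guard in getKafkaClientSupplier() rejects; "enable.kip1071" is the placeholder flag string used in this draft, and MyClientSupplier is again hypothetical:

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, MyClientSupplier.class.getName());
    props.put("enable.kip1071", "true"); // placeholder flag from this draft

    // deprecated supplier config combined with the new protocol flag: throws ConfigException
    new StreamsConfig(props).getKafkaClientSupplier();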
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 1bbb4b921baea..daac1b0c22450 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;

 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Metric;
@@ -25,6 +26,7 @@
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaClientInterceptor;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -58,6 +60,7 @@ class ActiveTaskCreator {
     private final ThreadCache cache;
     private final Time time;
     private final KafkaClientSupplier clientSupplier;
+    private final KafkaClientInterceptor interceptorSupplier;
     private final String threadId;
     private final int threadIdx;
     private final UUID processId;
@@ -78,6 +81,7 @@ class ActiveTaskCreator {
                       final ThreadCache cache,
                       final Time time,
                       final KafkaClientSupplier clientSupplier,
+                      final KafkaClientInterceptor interceptorSupplier,
                       final String threadId,
                       final int threadIdx,
                       final UUID processId,
@@ -92,6 +96,7 @@ class ActiveTaskCreator {
         this.cache = cache;
         this.time = time;
         this.clientSupplier = clientSupplier;
+        this.interceptorSupplier = interceptorSupplier;
         this.threadId = threadId;
         this.threadIdx = threadIdx;
         this.processId = processId;
@@ -121,7 +126,13 @@ private Producer<byte[], byte[]> producer() {
                 applicationConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + processId + "-" + threadIdx
             );
         }
-        return clientSupplier.getProducer(producerConfig);
+        if (clientSupplier != null) {
+            return clientSupplier.getProducer(producerConfig);
+        } else if (interceptorSupplier == null) {
+            return new KafkaProducer<>(producerConfig);
+        } else {
+            return interceptorSupplier.wrapProducer(new KafkaProducer<>(producerConfig));
+        }
     }

     public void reInitializeThreadProducer() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 5c58fce1e99c7..1a595116d6a4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -38,6 +39,7 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaClientInterceptor;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
@@ -355,6 +357,7 @@ public boolean isStartingRunningOrPartitionAssigned() {
     public static StreamThread create(final TopologyMetadata topologyMetadata,
                                       final StreamsConfig config,
                                       final KafkaClientSupplier clientSupplier,
+                                      final KafkaClientInterceptor interceptorSupplier,
                                       final Admin adminClient,
                                       final UUID processId,
                                       final String clientId,
@@ -382,7 +385,14 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
         log.info("Creating restore consumer client");
         final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(threadId));
-        final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
+        final Consumer<byte[], byte[]> restoreConsumer;
+        if (clientSupplier != null) {
+            restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
+        } else if (interceptorSupplier == null) {
+            restoreConsumer = new KafkaConsumer<>(restoreConsumerConfigs, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        } else {
+            restoreConsumer = interceptorSupplier.wrapRestoreConsumer(new KafkaConsumer<>(restoreConsumerConfigs, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
+        }

         final StoreChangelogReader changelogReader = new StoreChangelogReader(
             time,
@@ -407,6 +417,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
             cache,
             time,
             clientSupplier,
+            interceptorSupplier,
             threadId,
             threadIdx,
             processId,
@@ -470,7 +481,14 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
             consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
         }
-        final Consumer<byte[], byte[]> mainConsumer = clientSupplier.getConsumer(consumerConfigs);
+        final Consumer<byte[], byte[]> mainConsumer;
+        if (clientSupplier != null) {
+            mainConsumer = clientSupplier.getConsumer(consumerConfigs);
+        } else if (interceptorSupplier == null) {
+            mainConsumer = new KafkaConsumer<>(consumerConfigs, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        } else {
+            mainConsumer = interceptorSupplier.wrapMainConsumer(new KafkaConsumer<>(consumerConfigs, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
+        }
         taskManager.setMainConsumer(mainConsumer);
         referenceContainer.mainConsumer = mainConsumer;
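The supplier/plain/interceptor three-way choice now appears for the main, restore, and global consumers as well as the producer. A possible follow-up, purely a sketch and not part of this patch, would be a small helper; the InterceptedClients name is hypothetical:

    import java.util.Map;
    import java.util.function.Function;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    final class InterceptedClients {
        static Consumer<byte[], byte[]> consumer(
                final Function<Map<String, Object>, Consumer<byte[], byte[]>> fromSupplier,   // null if no supplier
                final Function<KafkaConsumer<byte[], byte[]>, Consumer<byte[], byte[]>> wrap, // null if no interceptor
                final Map<String, Object> configs) {
            if (fromSupplier != null) {
                return fromSupplier.apply(configs);
            }
            final KafkaConsumer<byte[], byte[]> consumer =
                new KafkaConsumer<>(configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());
            return wrap == null ? consumer : wrap.apply(consumer);
        }
    }

    // usage (lambdas rather than method references, so a null clientSupplier is never dereferenced):
    // restoreConsumer = InterceptedClients.consumer(
    //     clientSupplier == null ? null : cfg -> clientSupplier.getRestoreConsumer(cfg),
    //     interceptorSupplier == null ? null : c -> interceptorSupplier.wrapRestoreConsumer(c),
    //     restoreConsumerConfigs);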
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index a37c56443c289..1f2f63e2236e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -49,6 +49,7 @@
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -59,7 +60,7 @@
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
-import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockClientInterceptor;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
@@ -143,7 +144,7 @@ public class KafkaStreamsTest {
     private static final String CLIENT_ID = "test-client";
     private static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);

-    private MockClientSupplier supplier;
+    private MockClientInterceptor interceptor;
     private MockTime time;
     private Properties props;
     private MockAdminClient adminClient;
@@ -183,9 +184,9 @@ public void onChange(final KafkaStreams.State newState,
     @BeforeEach
     public void before(final TestInfo testInfo) throws Exception {
         time = new MockTime();
-        supplier = new MockClientSupplier();
-        supplier.setCluster(Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999))));
-        adminClient = (MockAdminClient) supplier.getAdmin(null);
+        interceptor = new MockClientInterceptor();
+        interceptor.setCluster(Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999))));
+        adminClient = (MockAdminClient) interceptor.wrapAdminClient(null);
         streamsStateListener = new StateListenerStub();
         props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID + safeUniqueTestName(testInfo));
@@ -247,6 +248,7 @@ private void prepareStreams() {
             any(TopologyMetadata.class),
             any(StreamsConfig.class),
             any(KafkaClientSupplier.class),
+            any(KafkaClientInterceptor.class),
             any(Admin.class),
             any(UUID.class),
             any(String.class),
@@ -282,9 +284,9 @@ private void prepareStreams() {
             return null;
         }).when(mock).start();
         doAnswer(invocation -> {
-            supplier.restoreConsumer.close();
+            interceptor.restoreConsumer.close();

-            for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
+            for (final MockProducer<byte[], byte[]> producer : interceptor.producers) {
                 producer.close();
             }
             globalThreadState.set(GlobalStreamThread.State.DEAD);
@@ -311,9 +313,9 @@ private AtomicReference<StreamThread.State> prepareStreamThread(final StreamThread
     private void prepareConsumer(final StreamThread thread, final AtomicReference<StreamThread.State> state) {
         doAnswer(invocation -> {
-            supplier.consumer.close();
-            supplier.restoreConsumer.close();
-            for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
+            interceptor.consumer.close();
+            interceptor.restoreConsumer.close();
+            for (final
MockProducer producer : interceptor.producers) { producer.close(); } state.set(StreamThread.State.DEAD); @@ -359,7 +361,7 @@ public void testShouldTransitToNotRunningIfCloseRightAfterCreated() { prepareStreams(); prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.close(); assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); @@ -373,7 +375,7 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws Ex final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.setStateListener(streamsStateListener); assertEquals(0, streamsStateListener.numChanges); @@ -455,7 +457,7 @@ public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception builder.globalTable("anyTopic"); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class); - final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { + final KafkaStreams streams = new KafkaStreams(builder.build(), props, interceptor, time)) { streams.close(); waitForCondition( @@ -465,9 +467,9 @@ public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception assertThat(appender.getMessages(), not(hasItem(containsString("ERROR")))); } - assertTrue(supplier.consumer.closed()); - assertTrue(supplier.restoreConsumer.closed()); - for (final MockProducer p : supplier.producers) { + assertTrue(interceptor.consumer.closed()); + assertTrue(interceptor.restoreConsumer.closed()); + for (final MockProducer p : interceptor.producers) { assertTrue(p.closed()); } } @@ -487,7 +489,7 @@ public void testStateThreadClose() throws Exception { final StreamsBuilder builder = getBuilderWithSource(); builder.globalTable("anyTopic"); - try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, interceptor, time)) { assertEquals(NUM_THREADS, streams.threads.size()); assertEquals(streams.state(), KafkaStreams.State.CREATED); @@ -529,7 +531,7 @@ public void testStateGlobalThreadClose() throws Exception { builder.globalTable("anyTopic"); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class); - final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { + final KafkaStreams streams = new KafkaStreams(builder.build(), props, interceptor, time)) { streams.start(); waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, @@ -567,7 +569,7 @@ public void testInitializesAndDestroysMetricsReporters() { prepareThreadState(streamThreadTwo, state2); final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { final int newInitCount = 
MockMetricsReporter.INIT_COUNT.get(); final int initDiff = newInitCount - oldInitCount; assertEquals(1, initDiff, "some reporters including MockMetricsReporter should be initialized by calling on construction"); @@ -585,7 +587,7 @@ public void testCloseIsIdempotent() { prepareStreams(); prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.close(); final int closeCount = MockMetricsReporter.CLOSE_COUNT.get(); @@ -601,7 +603,7 @@ public void testPauseResume() { final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); streams.pause(); assertTrue(streams.isPaused()); @@ -618,7 +620,7 @@ public void testStartingPaused() { prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); // This test shows that a KafkaStreams instance can be started "paused" - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.pause(); streams.start(); assertTrue(streams.isPaused()); @@ -635,7 +637,7 @@ public void testShowPauseResumeAreIdempotent() { prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); // This test shows that a KafkaStreams instance can be started "paused" - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); streams.pause(); assertTrue(streams.isPaused()); @@ -656,7 +658,7 @@ public void shouldAddThreadWhenRunning() throws Exception { prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); final int oldSize = streams.threads.size(); waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running"); @@ -670,7 +672,7 @@ public void shouldNotAddThreadWhenCreated() { prepareStreams(); prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { final int oldSize = streams.threads.size(); assertThat(streams.addStreamThread(), equalTo(Optional.empty())); assertThat(streams.threads.size(), equalTo(oldSize)); @@ -682,7 +684,7 @@ public void shouldNotAddThreadWhenClosed() { prepareStreams(); prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final 
KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { final int oldSize = streams.threads.size(); streams.close(); assertThat(streams.addStreamThread(), equalTo(Optional.empty())); @@ -700,7 +702,7 @@ public void shouldNotAddThreadWhenError() { // make sure we have the global state thread running too final StreamsBuilder builder = getBuilderWithSource(); builder.globalTable("anyTopic"); - try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, interceptor, time)) { final int oldSize = streams.threads.size(); streams.start(); streams.globalStreamThread.shutdown(); @@ -718,7 +720,7 @@ public void shouldNotReturnDeadThreads() { prepareThreadState(streamThreadTwo, state2); prepareThreadLock(streamThreadOne); prepareThreadLock(streamThreadTwo); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); streamThreadOne.shutdown(); final Set threads = streams.metadataForLocalThreads(); @@ -738,7 +740,7 @@ public void shouldRemoveThread() throws Exception { when(streamThreadOne.waitOnThreadState(isA(StreamThread.State.class), anyLong())).thenReturn(true); when(streamThreadOne.isThreadAlive()).thenReturn(true); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); final int oldSize = streams.threads.size(); waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, @@ -754,7 +756,7 @@ public void shouldNotRemoveThreadWhenNotRunning() { prepareStreamThread(streamThreadOne, 1); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); try (final KafkaStreams streams = - new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { assertThat(streams.removeStreamThread(), equalTo(Optional.empty())); assertThat(streams.threads.size(), equalTo(1)); } @@ -767,7 +769,7 @@ public void testCannotStartOnceClosed() { final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); streams.close(); try { @@ -786,7 +788,7 @@ public void shouldNotSetGlobalRestoreListenerAfterStarting() { final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); try { streams.setGlobalStateRestoreListener(null); @@ -804,7 +806,7 @@ public void 
shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState( final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)); } @@ -815,7 +817,7 @@ public void shouldThrowExceptionSettingStreamsUncaughtExceptionHandlerNotInCreat prepareStreams(); prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)); } @@ -826,7 +828,7 @@ public void shouldThrowNullPointerExceptionSettingStreamsUncaughtExceptionHandle prepareStreams(); prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { assertThrows(NullPointerException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)); } } @@ -836,7 +838,7 @@ public void shouldThrowExceptionSettingStateListenerNotInCreateState() { prepareStreams(); prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); try { streams.setStateListener(null); @@ -852,7 +854,7 @@ public void shouldAllowCleanupBeforeStartAndAfterClose() { prepareStreams(); prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { try { streams.cleanUp(); streams.start(); @@ -870,7 +872,7 @@ public void shouldThrowOnCleanupWhileRunning() throws Exception { final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, @@ -892,7 +894,7 @@ public void shouldThrowOnCleanupWhilePaused() throws Exception { final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, 
supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, @@ -915,7 +917,7 @@ public void shouldThrowOnCleanupWhileShuttingDown() throws Exception { prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); prepareTerminableThread(streamThreadOne); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, @@ -936,11 +938,11 @@ public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeav prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); prepareTerminableThread(streamThreadOne); - final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class); + final MockClientInterceptor mockClientInterceptor = spy(MockClientInterceptor.class); - when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient); + when(mockClientInterceptor.wrapAdminClient(any())).thenReturn(adminClient); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientInterceptor, time)) { streams.start(); waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, @@ -965,7 +967,7 @@ public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeav prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); prepareTerminableThread(streamThreadOne); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, @@ -988,7 +990,7 @@ public void shouldNotGetAllTasksWhenNotRunning() throws Exception { final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { assertThrows(StreamsNotStartedException.class, streams::metadataForAllStreamsClients); streams.start(); waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); @@ -1005,7 +1007,7 @@ public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception { final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { assertThrows(StreamsNotStartedException.class, () -> streams.streamsMetadataForStore("store")); streams.start(); waitForApplicationState(Collections.singletonList(streams), 
KafkaStreams.State.RUNNING, DEFAULT_DURATION); @@ -1022,7 +1024,7 @@ public void shouldNotGetQueryMetadataWithSerializerWhenNotRunningOrRebalancing() final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", new StringSerializer())); streams.start(); waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); @@ -1038,7 +1040,7 @@ public void shouldGetQueryMetadataWithSerializerWhenRunningOrRebalancing() { prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); assertEquals(KeyQueryMetadata.NOT_AVAILABLE, streams.queryMetadataForKey("store", "key", new StringSerializer())); } @@ -1051,7 +1053,7 @@ public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing( final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(0)))); streams.start(); waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); @@ -1068,7 +1070,7 @@ public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() throws Exce final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); assertThrows(UnknownStateStoreException.class, () -> streams.store(StoreQueryParameters.fromNameAndType("unknown-store", keyValueStore()))); @@ -1082,7 +1084,7 @@ public void shouldNotGetStoreWhenWhenNotRunningOrRebalancing() throws Exception final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { assertThrows(StreamsNotStartedException.class, () -> streams.store(StoreQueryParameters.fromNameAndType("store", keyValueStore()))); streams.start(); 
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); @@ -1105,10 +1107,10 @@ public void shouldReturnEmptyLocalStorePartitionLags() { allFuture.complete(Collections.emptyMap()); final MockAdminClient mockAdminClient = spy(MockAdminClient.class); - final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class); - when(mockClientSupplier.getAdmin(any())).thenReturn(mockAdminClient); + final MockClientInterceptor mockClientInterceptor = spy(MockClientInterceptor.class); + when(mockClientInterceptor.wrapAdminClient(any())).thenReturn(mockAdminClient); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientInterceptor, time)) { streams.start(); assertEquals(0, streams.allLocalStorePartitionLags().size()); } @@ -1122,7 +1124,7 @@ public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Excepti prepareTerminableThread(streamThreadOne); // do not use mock time so that it can really elapse - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor)) { assertFalse(streams.close(Duration.ofMillis(10L))); } } @@ -1134,7 +1136,7 @@ public void shouldThrowOnNegativeTimeoutForClose() throws Exception { prepareStreamThread(streamThreadTwo, 2); prepareTerminableThread(streamThreadOne); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { assertThrows(IllegalArgumentException.class, () -> streams.close(Duration.ofMillis(-1L))); } } @@ -1146,7 +1148,7 @@ public void shouldNotBlockInCloseForZeroDuration() throws Exception { prepareStreamThread(streamThreadTwo, 2); prepareTerminableThread(streamThreadOne); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { // with mock time that does not elapse, close would not return if it ever waits on the state transition assertFalse(streams.close(Duration.ZERO)); } @@ -1161,7 +1163,7 @@ public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupFalseWhenThread final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); closeOptions.timeout(Duration.ofMillis(10L)); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor)) { assertFalse(streams.close(closeOptions)); } } @@ -1175,7 +1177,7 @@ public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupFalse() final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); closeOptions.timeout(Duration.ofMillis(-1L)); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions)); } } @@ -1189,7 +1191,7 @@ public void 
shouldNotBlockInCloseWithCloseOptionLeaveGroupFalseForZeroDuration() final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); closeOptions.timeout(Duration.ZERO); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor)) { assertFalse(streams.close(closeOptions)); } } @@ -1201,14 +1203,14 @@ public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupTrueWhenThreads prepareStreamThread(streamThreadTwo, 2); prepareTerminableThread(streamThreadOne); - final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class); + final MockClientInterceptor mockClientInterceptor = spy(MockClientInterceptor.class); - when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient); + when(mockClientInterceptor.wrapAdminClient(any())).thenReturn(adminClient); final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); closeOptions.timeout(Duration.ofMillis(10L)); closeOptions.leaveGroup(true); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientInterceptor)) { assertFalse(streams.close(closeOptions)); } } @@ -1220,13 +1222,13 @@ public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupTrue() prepareStreamThread(streamThreadTwo, 2); prepareTerminableThread(streamThreadOne); - final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class); - when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient); + final MockClientInterceptor mockClientInterceptor = spy(MockClientInterceptor.class); + when(mockClientInterceptor.wrapAdminClient(any())).thenReturn(adminClient); final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); closeOptions.timeout(Duration.ofMillis(-1L)); closeOptions.leaveGroup(true); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientInterceptor, time)) { assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions)); } } @@ -1238,14 +1240,14 @@ public void shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() prepareStreamThread(streamThreadTwo, 2); prepareTerminableThread(streamThreadOne); - final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class); + final MockClientInterceptor mockClientInterceptor = spy(MockClientInterceptor.class); - when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient); + when(mockClientInterceptor.wrapAdminClient(any())).thenReturn(adminClient); final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); closeOptions.timeout(Duration.ZERO); closeOptions.leaveGroup(true); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientInterceptor)) { assertFalse(streams.close(closeOptions)); } } @@ -1268,7 +1270,7 @@ public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() thro builder.table("topic", Materialized.as("store")); props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, RecordingLevel.DEBUG.name()); - try 
(final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); } @@ -1280,6 +1282,7 @@ public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() thro } } + @SuppressWarnings("deprecation") @Test public void shouldGetClientSupplierFromConfigForConstructor() throws Exception { prepareStreams(); @@ -1290,7 +1293,7 @@ public void shouldGetClientSupplierFromConfigForConstructor() throws Exception { final StreamsConfig config = new StreamsConfig(props); final StreamsConfig mockConfig = spy(config); - when(mockConfig.getKafkaClientSupplier()).thenReturn(supplier); + when(mockConfig.getKafkaClientSupplier()).thenReturn(new DefaultKafkaClientSupplier()); try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), mockConfig)) { // no-op @@ -1299,6 +1302,7 @@ public void shouldGetClientSupplierFromConfigForConstructor() throws Exception { verify(mockConfig, times(2)).getKafkaClientSupplier(); } + @SuppressWarnings("deprecation") @Test public void shouldGetClientSupplierFromConfigForConstructorWithTime() throws Exception { prepareStreams(); @@ -1309,7 +1313,7 @@ public void shouldGetClientSupplierFromConfigForConstructorWithTime() throws Exc final StreamsConfig config = new StreamsConfig(props); final StreamsConfig mockConfig = spy(config); - when(mockConfig.getKafkaClientSupplier()).thenReturn(supplier); + when(mockConfig.getKafkaClientSupplier()).thenReturn(new DefaultKafkaClientSupplier()); try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), mockConfig, time)) { // no-op @@ -1318,6 +1322,7 @@ public void shouldGetClientSupplierFromConfigForConstructorWithTime() throws Exc verify(mockConfig, times(2)).getKafkaClientSupplier(); } + @SuppressWarnings("deprecation") @Test public void shouldUseProvidedClientSupplier() throws Exception { prepareStreams(); @@ -1329,7 +1334,7 @@ public void shouldUseProvidedClientSupplier() throws Exception { final StreamsConfig config = new StreamsConfig(props); final StreamsConfig mockConfig = spy(config); - try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), mockConfig, supplier)) { + try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), mockConfig, interceptor)) { // no-op } // It's called once in above when mock @@ -1351,7 +1356,7 @@ public void shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() { builder.table("topic", Materialized.as("store")); props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, RecordingLevel.INFO.name()); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); } executorsMockedStatic.verify(() -> Executors.newSingleThreadScheduledExecutor(any(ThreadFactory.class))); @@ -1376,7 +1381,7 @@ public void shouldCleanupOldStateDirs() { final StreamsBuilder builder = new StreamsBuilder(); builder.table("topic", Materialized.as("store")); - try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, interceptor, time)) { streams.start(); } } @@ -1451,7 +1456,7 @@ public void statefulTopologyShouldCreateStateDirectory(final TestInfo 
testInfo) @Test public void shouldThrowTopologyExceptionOnEmptyTopology() { prepareStreams(); - try (final KafkaStreams ignored = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) { + try (final KafkaStreams ignored = new KafkaStreams(new StreamsBuilder().build(), props, interceptor, time)) { fail("Should have thrown TopologyException"); } catch (final TopologyException e) { assertThat( @@ -1466,7 +1471,7 @@ public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() { prepareStreams(); final StreamsBuilder builder = new StreamsBuilder(); builder.globalTable("anyTopic"); - try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, interceptor, time)) { assertThat(streams.threads.size(), equalTo(0)); } } @@ -1476,7 +1481,7 @@ public void shouldTransitToRunningWithGlobalOnlyTopology() throws Exception { prepareStreams(); final StreamsBuilder builder = new StreamsBuilder(); builder.globalTable("anyTopic"); - try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, interceptor, time)) { assertThat(streams.threads.size(), equalTo(0)); assertEquals(streams.state(), KafkaStreams.State.CREATED); @@ -1500,7 +1505,7 @@ public void shouldThrowOnClientInstanceIdsWithNegativeTimeout() { prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { final IllegalArgumentException error = assertThrows( IllegalArgumentException.class, () -> streams.clientInstanceIds(Duration.ofMillis(-1L)) @@ -1518,7 +1523,7 @@ public void shouldThrowOnClientInstanceIdsWhenNotStarted() { prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { final IllegalStateException error = assertThrows( IllegalStateException.class, () -> streams.clientInstanceIds(Duration.ZERO) @@ -1536,7 +1541,7 @@ public void shouldThrowOnClientInstanceIdsWhenClosed() { prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.close(); final IllegalStateException error = assertThrows( @@ -1556,7 +1561,7 @@ public void shouldThrowStreamsExceptionWhenAdminNotInitialized() { prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); final StreamsException error = assertThrows( @@ -1584,7 +1589,7 @@ public void shouldNotCrashButThrowLaterIfAdminTelemetryDisabled() { // set threads to zero to simplify set setup props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); - try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); final ClientInstanceIds clientInstanceIds = streams.clientInstanceIds(Duration.ZERO); @@ -1609,7 +1614,7 @@ public void shouldThrowTimeExceptionWhenAdminTimesOut() { adminClient.setClientInstanceId(Uuid.randomUuid()); adminClient.injectTimeoutException(1); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); assertThrows( @@ -1627,7 +1632,7 @@ public void shouldReturnAdminInstanceID() { // set threads to zero to simplify set setup props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); assertThat( @@ -1647,7 +1652,7 @@ public void shouldThrowTimeoutExceptionWhenMainConsumerFutureDoesNotComplete() { .thenReturn(Collections.singletonMap("consumer", new KafkaFutureImpl<>())); adminClient.setClientInstanceId(Uuid.randomUuid()); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); final TimeoutException timeoutException = assertThrows( TimeoutException.class, @@ -1673,7 +1678,7 @@ public void shouldThrowTimeoutExceptionWhenGlobalConsumerFutureDoesNotComplete() final StreamsBuilder builder = getBuilderWithSource(); builder.globalTable("anyTopic"); - try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, interceptor, time)) { streams.start(); when(globalStreamThreadMockedConstruction.constructed().get(0).globalConsumerInstanceId(any())) @@ -1697,7 +1702,7 @@ public void shouldThrowTimeoutExceptionWhenThreadProducerFutureDoesNotComplete() when(streamThreadOne.producersClientInstanceIds(any())).thenReturn(new KafkaFutureImpl<>()); adminClient.setClientInstanceId(Uuid.randomUuid()); - try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, interceptor, time)) { streams.start(); final TimeoutException timeoutException = assertThrows( @@ -1797,7 +1802,7 @@ public Uuid get(final long timeout, final TimeUnit timeUnit) { final StreamsBuilder builder = getBuilderWithSource(); builder.globalTable("anyTopic"); - try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { + try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, interceptor, time)) { streams.start(); when(globalStreamThreadMockedConstruction.constructed().get(0).globalConsumerInstanceId(any())) @@ -1886,7 +1891,7 @@ private void startStreamsAndCheckDirExists(final Topology topology, final boolea assertEquals(shouldFilesExist, context.arguments().get(2)); })) { - try (final KafkaStreams ignored = new KafkaStreams(topology, props, supplier, time)) { + try (final KafkaStreams ignored = new KafkaStreams(topology, props, interceptor, 
time)) { // verify that stateDirectory constructor was called assertFalse(stateDirectoryMockedConstruction.constructed().isEmpty()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index ffb1e4496226a..08a49e94f57c1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -1201,6 +1201,7 @@ public void shouldReturnTaskAssignorClass() { assertEquals("LegacyStickyTaskAssignor", new StreamsConfig(props).getString(TASK_ASSIGNOR_CLASS_CONFIG)); } + @SuppressWarnings("deprecation") @Test public void shouldReturnDefaultClientSupplier() { final KafkaClientSupplier supplier = streamsConfig.getKafkaClientSupplier(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 7fff8099a16df..3d05d047587aa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.streams.KafkaClientInterceptor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -119,11 +120,11 @@ public static void closeCluster() { private static final String PARTITIONED_TOPIC_1 = "partitioned-1"; private static final String PARTITIONED_TOPIC_2 = "partitioned-2"; - private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName(); + private static final String STRING_SERDE_CLASSNAME = Serdes.StringSerde.class.getName(); private Properties streamsConfiguration; private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated"; private KafkaStreams streams; - private static volatile AtomicInteger topicSuffixGenerator = new AtomicInteger(0); + private static final AtomicInteger topicSuffixGenerator = new AtomicInteger(0); private String outputTopic; @BeforeEach @@ -174,9 +175,9 @@ public void testRegexMatchesTopicsAWhenCreated() throws Exception { pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); final List assignedTopics = new CopyOnWriteArrayList<>(); - streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { + streams = new KafkaStreams(builder.build(), streamsConfiguration, new KafkaClientInterceptor() { @Override - public Consumer getConsumer(final Map config) { + public Consumer wrapMainConsumer(final KafkaConsumer consumer) { return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { @Override public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { @@ -210,7 +211,7 @@ public void testRegexRecordsAreProcessedAfterNewTopicCreatedWithMultipleSubtopol final StreamsBuilder builder = new StreamsBuilder(); final KStream
pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); - final KStream otherStream = builder.stream(Pattern.compile("not-a-match")); + builder.stream(Pattern.compile("not-a-match")); pattern1Stream .selectKey((k, v) -> k) @@ -274,9 +275,9 @@ public void testRegexMatchesTopicsAWhenDeleted() throws Exception { pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); - streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { + streams = new KafkaStreams(builder.build(), streamsConfiguration, new KafkaClientInterceptor() { @Override - public Consumer getConsumer(final Map config) { + public Consumer wrapMainConsumer(final KafkaConsumer consumer) { return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { @Override public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { @@ -387,9 +388,9 @@ public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception final List leaderAssignment = new CopyOnWriteArrayList<>(); final List followerAssignment = new CopyOnWriteArrayList<>(); - partitionedStreamsLeader = new KafkaStreams(builderLeader.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { + partitionedStreamsLeader = new KafkaStreams(builderLeader.build(), streamsConfiguration, new KafkaClientInterceptor() { @Override - public Consumer getConsumer(final Map config) { + public Consumer wrapMainConsumer(final KafkaConsumer consumer) { return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { @Override public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { @@ -399,9 +400,9 @@ public void subscribe(final Pattern topics, final ConsumerRebalanceListener list } }); - partitionedStreamsFollower = new KafkaStreams(builderFollower.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { + partitionedStreamsFollower = new KafkaStreams(builderFollower.build(), streamsConfiguration, new KafkaClientInterceptor() { @Override - public Consumer getConsumer(final Map config) { + public Consumer wrapMainConsumer(final KafkaConsumer consumer) { return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { @Override public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index e2d5bf746871f..f81f166b2ea4f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -309,6 +309,7 @@ private void createTasks() { new ThreadCache(new LogContext(), 0L, streamsMetrics), new MockTime(), mockClientSupplier, + null, "clientId-StreamThread-0", 0, uuid, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java index b46b92d9de76d..03e85639d1b39 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java @@
-36,7 +36,7 @@ import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; import org.apache.kafka.test.MockApiProcessorSupplier; -import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockClientInterceptor; import org.apache.kafka.test.MockInternalTopicManager; import org.apache.kafka.test.MockKeyValueStoreBuilder; @@ -102,7 +102,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest { emptySet()); private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor(); - private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); + private final MockClientInterceptor mockClientInterceptor = new MockClientInterceptor(); private static final String USER_END_POINT = "localhost:8080"; private static final String APPLICATION_ID = "stream-partition-assignor-test"; @@ -158,7 +158,7 @@ private void overwriteInternalTopicManagerWithMock() { final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( time, streamsConfig, - mockClientSupplier.restoreConsumer, + mockClientInterceptor.restoreConsumer, false ); partitionAssignor.setInternalTopicManager(mockInternalTopicManager); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RackAwarenessStreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RackAwarenessStreamsPartitionAssignorTest.java index 65786cf79b810..5ed3fe2faec19 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RackAwarenessStreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RackAwarenessStreamsPartitionAssignorTest.java @@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; import org.apache.kafka.test.MockApiProcessorSupplier; -import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockClientInterceptor; import org.apache.kafka.test.MockInternalTopicManager; import org.apache.kafka.test.MockKeyValueStoreBuilder; @@ -118,7 +118,7 @@ public class RackAwarenessStreamsPartitionAssignorTest { } private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor(); - private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); + private final MockClientInterceptor mockClientInterceptor = new MockClientInterceptor(); private static final String USER_END_POINT = "localhost:8080"; private static final String APPLICATION_ID = "stream-partition-assignor-test"; @@ -171,7 +171,7 @@ private void overwriteInternalTopicManagerWithMock() { final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( time, streamsConfig, - mockClientSupplier.restoreConsumer, + mockClientInterceptor.restoreConsumer, false ); partitionAssignor.setInternalTopicManager(mockInternalTopicManager); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index ffd5a2f6bc7cd..e5b60d84531ad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -23,6 +23,7 @@ import 
org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.InvalidOffsetException; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.MockRebalanceListener; @@ -84,7 +85,7 @@ import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockApiProcessor; -import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockClientInterceptor; import org.apache.kafka.test.MockKeyValueStoreBuilder; import org.apache.kafka.test.MockStandbyUpdateListener; import org.apache.kafka.test.MockStateRestoreListener; @@ -186,7 +187,7 @@ public class StreamThreadTest { private final Metrics metrics = new Metrics(); private final MockTime mockTime = new MockTime(); private final String stateDir = TestUtils.tempDirectory().getPath(); - private final MockClientSupplier clientSupplier = new MockClientSupplier(); + private final MockClientInterceptor clientInterceptor = new MockClientInterceptor(); private final ConsumedInternal consumed = new ConsumedInternal<>(); private final ChangelogReader changelogReader = new MockChangelogReader(); private StateDirectory stateDirectory = null; @@ -298,11 +299,7 @@ private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId, final StreamsConfig config, final Time time) { - if (!StreamsConfig.AT_LEAST_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) { - clientSupplier.setApplicationIdForProducer(APPLICATION_ID); - } - - clientSupplier.setCluster(createCluster()); + clientInterceptor.setCluster(createCluster()); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, @@ -322,8 +319,9 @@ private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") return StreamThread.create( topologyMetadata, config, - clientSupplier, - clientSupplier.getAdmin(config.getAdminConfigs(clientId)), + null, + clientInterceptor, + clientInterceptor.wrapAdminClient(null), PROCESS_ID, clientId, streamsMetrics, @@ -721,8 +719,9 @@ public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing(final lenient().when(consumer.poll(any())).thenReturn(ConsumerRecords.empty()); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); - final MockConsumerClientSupplier mockClientSupplier = new MockConsumerClientSupplier(consumer); - mockClientSupplier.setCluster(createCluster()); + final MockConsumerClientInterceptor mockClientInterceptor = new MockConsumerClientInterceptor(consumer); + mockClientInterceptor.configure(config.originals()); + mockClientInterceptor.setCluster(createCluster()); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); @@ -737,8 +736,9 @@ public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing(final thread = StreamThread.create( topologyMetadata, config, - mockClientSupplier, - mockClientSupplier.getAdmin(config.getAdminConfigs(CLIENT_ID)), + null, + mockClientInterceptor, + mockClientInterceptor.wrapAdminClient(null), PROCESS_ID, CLIENT_ID, streamsMetrics,
@@ -753,7 +753,7 @@ public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing(final mockExceptionHandler ); - mockClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1L); + mockClientInterceptor.nextRebalanceMs().set(mockTime.milliseconds() - 1L); thread.start(); TestUtils.waitForCondition( @@ -785,8 +785,9 @@ public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing(final boolean stat final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); - final MockConsumerClientSupplier mockClientSupplier = new MockConsumerClientSupplier(consumer); - mockClientSupplier.setCluster(createCluster()); + final MockConsumerClientInterceptor mockClientInterceptor = new MockConsumerClientInterceptor(consumer); + mockClientInterceptor.configure(config.originals()); + mockClientInterceptor.setCluster(createCluster()); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); @@ -799,8 +800,9 @@ public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing(final boolean stat thread = StreamThread.create( topologyMetadata, config, - mockClientSupplier, - mockClientSupplier.getAdmin(config.getAdminConfigs(CLIENT_ID)), + null, + mockClientInterceptor, + mockClientInterceptor.wrapAdminClient(null), PROCESS_ID, CLIENT_ID, streamsMetrics, @@ -815,7 +817,7 @@ public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing(final boolean stat null ); - mockClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1L); + mockClientInterceptor.nextRebalanceMs().set(mockTime.milliseconds() - 1L); thread.taskManager().handleRebalanceStart(emptySet()); thread.start(); @@ -833,7 +835,7 @@ public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing(final boolean stat // Validate that the scheduled rebalance wasn't reset then set to MAX_VALUE so we // don't trigger one before we can shut down, since the rebalance must be ended // for the thread to fully shut down - assertThat(mockClientSupplier.nextRebalanceMs().get(), not(0L)); + assertThat(mockClientInterceptor.nextRebalanceMs().get(), not(0L)); thread.taskManager().handleRebalanceComplete(); @@ -844,22 +846,20 @@ } - private static class MockConsumerClientSupplier extends MockClientSupplier { + private static class MockConsumerClientInterceptor extends MockClientInterceptor { final Consumer mockConsumer; - final Map consumerConfigs = new HashMap<>(); - MockConsumerClientSupplier(final Consumer mockConsumer) { + MockConsumerClientInterceptor(final Consumer mockConsumer) { this.mockConsumer = mockConsumer; } @Override - public Consumer getConsumer(final Map config) { - consumerConfigs.putAll(config); + public Consumer wrapMainConsumer(final KafkaConsumer consumer) { return mockConsumer; } AtomicLong nextRebalanceMs() { - return ((ReferenceContainer) consumerConfigs.get( + return ((ReferenceContainer) config.get( StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR) ).nextScheduledRebalanceMs; } @@ -1255,13 +1255,13 @@ public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEo mockConsumer.updateBeginningOffsets(beginOffsets); thread.rebalanceListener().onPartitionsAssigned(new HashSet<>(assignedPartitions)); - assertEquals(1, clientSupplier.producers.size()); - final Producer
globalProducer = clientSupplier.producers.get(0); + assertEquals(1, clientInterceptor.producers.size()); + final Producer globalProducer = clientInterceptor.producers.get(0); for (final Task task : thread.readOnlyActiveTasks()) { assertSame(globalProducer, ((RecordCollectorImpl) ((StreamTask) task).recordCollector()).producer()); } - assertSame(clientSupplier.consumer, thread.mainConsumer()); - assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer()); + assertSame(clientInterceptor.consumer, thread.mainConsumer()); + assertSame(clientInterceptor.restoreConsumer, thread.restoreConsumer()); } @ParameterizedTest @@ -1296,9 +1296,9 @@ public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl runOnce(processingThreadsEnabled); - assertThat(clientSupplier.producers.size(), is(1)); - assertSame(clientSupplier.consumer, thread.mainConsumer()); - assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer()); + assertThat(clientInterceptor.producers.size(), is(1)); + assertSame(clientInterceptor.consumer, thread.mainConsumer()); + assertSame(clientInterceptor.restoreConsumer, thread.restoreConsumer()); } @ParameterizedTest @@ -1528,7 +1528,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhilePr final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); - final MockConsumer consumer = clientSupplier.consumer; + final MockConsumer consumer = clientInterceptor.consumer; consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null))); @@ -1551,7 +1551,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhilePr runOnce(processingThreadsEnabled); assertThat(thread.readOnlyActiveTasks().size(), equalTo(1)); - final MockProducer producer = clientSupplier.producers.get(0); + final MockProducer producer = clientInterceptor.producers.get(0); // change consumer subscription from "pattern" to "manual" to be able to call .addRecords() consumer.updateBeginningOffsets(Collections.singletonMap(assignedPartitions.iterator().next(), 0L)); @@ -1620,7 +1620,7 @@ private void testThrowingDurringCommitTransactionException(final RuntimeExceptio // need to process a record to enable committing addRecord(mockConsumer, 0L); - final MockProducer producer = clientSupplier.producers.get(0); + final MockProducer producer = clientInterceptor.producers.get(0); runOnce(processingThreadsEnabled); if (processingThreadsEnabled) { TestUtils.waitForCondition(() -> !producer.uncommittedRecords().isEmpty(), "Processing threads to process record"); @@ -1769,7 +1769,7 @@ private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenComm final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); - final MockConsumer consumer = clientSupplier.consumer; + final MockConsumer consumer = clientInterceptor.consumer; consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null))); @@ -1792,7 +1792,7 @@ private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenComm runOnce(processingThreadsEnabled); assertThat(thread.readOnlyActiveTasks().size(), equalTo(1)); - final MockProducer producer = clientSupplier.producers.get(0); + final MockProducer producer = clientInterceptor.producers.get(0); producer.commitTransactionException = e; 
mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); @@ -1810,9 +1810,9 @@ private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenComm assertThat(producer.commitCount(), equalTo(0L)); - assertTrue(clientSupplier.producers.get(0).transactionInFlight()); - assertFalse(clientSupplier.producers.get(0).transactionCommitted()); - assertFalse(clientSupplier.producers.get(0).closed()); + assertTrue(clientInterceptor.producers.get(0).transactionInFlight()); + assertFalse(clientInterceptor.producers.get(0).transactionCommitted()); + assertFalse(clientInterceptor.producers.get(0).closed()); assertEquals(1, thread.readOnlyActiveTasks().size()); } @@ -1860,7 +1860,7 @@ public void shouldNotCloseTaskProducerWhenSuspending(final boolean stateUpdaterE // need to process a record to enable committing addRecord(mockConsumer, 0L); - final MockProducer producer = clientSupplier.producers.get(0); + final MockProducer producer = clientInterceptor.producers.get(0); if (processingThreadsEnabled) { assertTrue(runUntilTimeoutOrCondition(() -> runOnce(processingThreadsEnabled), () -> !producer.history().isEmpty())); @@ -1881,7 +1881,7 @@ public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean stateU final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); - clientSupplier.setCluster(createCluster()); + clientInterceptor.setCluster(createCluster()); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, @@ -1901,8 +1901,9 @@ public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean stateU thread = StreamThread.create( topologyMetadata, config, - clientSupplier, - clientSupplier.getAdmin(config.getAdminConfigs(CLIENT_ID)), + null, + clientInterceptor, + clientInterceptor.wrapAdminClient(null), PROCESS_ID, CLIENT_ID, streamsMetrics, @@ -1960,7 +1961,7 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState(final boolean state internalStreamsBuilder.buildAndOptimizeTopology(); thread = createStreamThread(CLIENT_ID, config, new MockTime(1)); - final MockConsumer restoreConsumer = clientSupplier.restoreConsumer; + final MockConsumer restoreConsumer = clientInterceptor.restoreConsumer; restoreConsumer.updatePartitions( STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, Collections.singletonList( @@ -2011,7 +2012,7 @@ public void shouldUpdateStandbyTask(final boolean stateUpdaterEnabled, final boo final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(props); thread = createStreamThread(CLIENT_ID, config); - final MockConsumer restoreConsumer = clientSupplier.restoreConsumer; + final MockConsumer restoreConsumer = clientInterceptor.restoreConsumer; setupThread(storeName1, storeName2, changelogName1, changelogName2, restoreConsumer, false); @@ -2134,7 +2135,7 @@ public void shouldNotUpdateStandbyTaskWhenPaused(final boolean stateUpdaterEnabl final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog"; final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog"; thread = createStreamThread(CLIENT_ID, config); - final MockConsumer restoreConsumer = clientSupplier.restoreConsumer; + final MockConsumer restoreConsumer = clientInterceptor.restoreConsumer; setupThread(storeName1, storeName2, changelogName1, changelogName2, restoreConsumer, true); @@ -2252,8 +2253,8 @@ public void 
process(final Record record) {} thread.taskManager().handleAssignment(activeTasks, emptyMap()); - clientSupplier.consumer.assign(assignedPartitions); - clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); + clientInterceptor.consumer.assign(assignedPartitions); + clientInterceptor.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); runOnce(processingThreadsEnabled); @@ -2262,7 +2263,7 @@ public void process(final Record record) {} assertEquals(0, punctuatedWallClockTime.size()); mockTime.sleep(100L); - clientSupplier.consumer.addRecord(new ConsumerRecord<>( + clientInterceptor.consumer.addRecord(new ConsumerRecord<>( topic1, 1, 100L, @@ -2336,8 +2337,8 @@ public void close() {} thread.taskManager().handleAssignment(activeTasks, emptyMap()); - clientSupplier.consumer.assign(assignedPartitions); - clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); + clientInterceptor.consumer.assign(assignedPartitions); + clientInterceptor.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); runOnce(processingThreadsEnabled); @@ -2349,7 +2350,7 @@ public void close() {} assertEquals(1, peekedContextTime.size()); assertEquals(currTime + 100L, peekedContextTime.get(0).longValue()); - clientSupplier.consumer.addRecord(new ConsumerRecord<>( + clientInterceptor.consumer.addRecord(new ConsumerRecord<>( topic1, 1, 110L, @@ -3323,14 +3324,14 @@ public void shouldGetMainAndRestoreConsumerInstanceIdWithInternalTimeout(final b private void getMainAndRestoreConsumerInstanceId(final boolean injectTimeException, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { final Uuid consumerInstanceId = Uuid.randomUuid(); - clientSupplier.consumer.setClientInstanceId(consumerInstanceId); + clientInterceptor.consumer.setClientInstanceId(consumerInstanceId); if (injectTimeException) { - clientSupplier.consumer.injectTimeoutException(1); + clientInterceptor.consumer.injectTimeoutException(1); } final Uuid restoreInstanceId = Uuid.randomUuid(); - clientSupplier.restoreConsumer.setClientInstanceId(restoreInstanceId); + clientInterceptor.restoreConsumer.setClientInstanceId(restoreInstanceId); if (injectTimeException) { - clientSupplier.restoreConsumer.injectTimeoutException(1); + clientInterceptor.restoreConsumer.injectTimeoutException(1); } thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); @@ -3369,7 +3370,7 @@ private void getProducerInstanceId(final boolean injectTimeException, final bool if (injectTimeException) { producer.injectTimeoutException(1); } - clientSupplier.prepareProducer(producer); + clientInterceptor.prepareProducer(producer); thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); @@ -3436,7 +3437,7 @@ public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean st @ParameterizedTest @MethodSource("data") public void shouldReturnNullIfMainConsumerTelemetryDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - clientSupplier.consumer.disableTelemetry(); + clientInterceptor.consumer.disableTelemetry(); thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); @@ -3452,7 +3453,7 @@ public void 
shouldReturnNullIfMainConsumerTelemetryDisabled(final boolean stateU @ParameterizedTest @MethodSource("data") public void shouldReturnNullIfRestoreConsumerTelemetryDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - clientSupplier.restoreConsumer.disableTelemetry(); + clientInterceptor.restoreConsumer.disableTelemetry(); thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); @@ -3471,7 +3472,7 @@ public void shouldReturnNullIfRestoreConsumerTelemetryDisabled(final boolean sta public void shouldReturnNullIfProducerTelemetryDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { final MockProducer producer = new MockProducer<>(); producer.disableTelemetry(); - clientSupplier.prepareProducer(producer); + clientInterceptor.prepareProducer(producer); thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); @@ -3488,8 +3489,8 @@ public void shouldReturnNullIfProducerTelemetryDisabled(final boolean stateUpdat @ParameterizedTest @MethodSource("data") public void shouldTimeOutOnMainConsumerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - clientSupplier.consumer.setClientInstanceId(Uuid.randomUuid()); - clientSupplier.consumer.injectTimeoutException(-1); + clientInterceptor.consumer.setClientInstanceId(Uuid.randomUuid()); + clientInterceptor.consumer.injectTimeoutException(-1); thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); @@ -3512,8 +3513,8 @@ public void shouldTimeOutOnMainConsumerInstanceId(final boolean stateUpdaterEnab @ParameterizedTest @MethodSource("data") public void shouldTimeOutOnRestoreConsumerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - clientSupplier.restoreConsumer.setClientInstanceId(Uuid.randomUuid()); - clientSupplier.restoreConsumer.injectTimeoutException(-1); + clientInterceptor.restoreConsumer.setClientInstanceId(Uuid.randomUuid()); + clientInterceptor.restoreConsumer.injectTimeoutException(-1); thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); @@ -3539,7 +3540,7 @@ public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled, final MockProducer producer = new MockProducer<>(); producer.setClientInstanceId(Uuid.randomUuid()); producer.injectTimeoutException(-1); - clientSupplier.prepareProducer(producer); + clientInterceptor.prepareProducer(producer); thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java index b195600c3925e..216f8cd31b727 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java @@ -36,7 +36,7 @@ import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor; import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.test.MockApiProcessorSupplier; -import 
org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockClientInterceptor; import org.apache.kafka.test.MockInternalTopicManager; import org.apache.kafka.test.MockKeyValueStoreBuilder; @@ -219,7 +219,7 @@ private void completeLargeAssignment(final int numPartitions, final MockInternalTopicManager mockInternalTopicManager = spy(new MockInternalTopicManager( new MockTime(), new StreamsConfig(configMap), - new MockClientSupplier().restoreConsumer, + new MockClientInterceptor().restoreConsumer, false )); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 4702681a650af..71126af5f6e8b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -72,7 +72,7 @@ import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.test.MockApiProcessorSupplier; -import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockClientInterceptor; import org.apache.kafka.test.MockInternalTopicManager; import org.apache.kafka.test.MockKeyValueStoreBuilder; @@ -222,7 +222,7 @@ public class StreamsPartitionAssignorTest { ); private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor(); - private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); + private final MockClientInterceptor mockClientInterceptor = new MockClientInterceptor(); private static final String USER_END_POINT = "localhost:8080"; private static final String OTHER_END_POINT = "other:9090"; private static final String APPLICATION_ID = "stream-partition-assignor-test"; @@ -313,7 +313,7 @@ private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boo final MockInternalTopicManager mockInternalTopicManager = spy(new MockInternalTopicManager( time, new StreamsConfig(configProps(parameterizedConfig)), - mockClientSupplier.restoreConsumer, + mockClientInterceptor.restoreConsumer, mockCreateInternalTopics )); @@ -1697,7 +1697,7 @@ public void shouldThrowTimeoutExceptionWhenCreatingRepartitionTopicsTimesOut(fin final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( time, new StreamsConfig(configProps(parameterizedConfig)), - mockClientSupplier.restoreConsumer, + mockClientInterceptor.restoreConsumer, false ) { @Override @@ -1736,7 +1736,7 @@ public void shouldThrowTimeoutExceptionWhenCreatingChangelogTopicsTimesOut(final final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( time, config, - mockClientSupplier.restoreConsumer, + mockClientInterceptor.restoreConsumer, false ) { @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java index acd790a4a44c2..a08442ac26cda 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java @@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.MockProducer; import 
org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java index a43dd4a527812..620e2d6d42b9f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java @@ -34,7 +34,7 @@ import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; -import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockClientInterceptor; import org.apache.kafka.test.MockInternalTopicManager; import org.hamcrest.BaseMatcher; @@ -713,11 +713,11 @@ static Map configProps(final String rackAwareConfig, final int r static InternalTopicManager mockInternalTopicManagerForRandomChangelog(final int nodeSize, final int tpSize, final int partitionSize) { final MockTime time = new MockTime(); final StreamsConfig streamsConfig = new StreamsConfig(configProps(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)); - final MockClientSupplier mockClientSupplier = new MockClientSupplier(); + final MockClientInterceptor mockClientInterceptor = new MockClientInterceptor(); final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( time, streamsConfig, - mockClientSupplier.restoreConsumer, + mockClientInterceptor.restoreConsumer, false ); @@ -897,11 +897,11 @@ static Map> getTaskChangelogMapForAllTasks() { static InternalTopicManager mockInternalTopicManagerForChangelog() { final MockTime time = new MockTime(); final StreamsConfig streamsConfig = new StreamsConfig(configProps(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)); - final MockClientSupplier mockClientSupplier = new MockClientSupplier(); + final MockClientInterceptor mockClientInterceptor = new MockClientInterceptor(); final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( time, streamsConfig, - mockClientSupplier.restoreConsumer, + mockClientInterceptor.restoreConsumer, false ); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java index 30b0ed2104c67..d4796503171e7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java @@ -28,7 +28,7 @@ import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.assignment.ProcessId; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; -import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockClientInterceptor; import org.apache.kafka.test.MockInternalTopicManager; import org.junit.jupiter.api.Assertions; @@ -124,7 +124,7 @@ public class RackAwareTaskAssignorTest { 
private final MockTime time = new MockTime(); private final StreamsConfig streamsConfig = new StreamsConfig(configProps(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)); - private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); + private final MockClientInterceptor mockClientInterceptor = new MockClientInterceptor(); private int trafficCost; private int nonOverlapCost; @@ -132,7 +132,7 @@ public class RackAwareTaskAssignorTest { private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( time, streamsConfig, - mockClientSupplier.restoreConsumer, + mockClientInterceptor.restoreConsumer, false ); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java index dd703893e9abb..052ad78ab8371 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java @@ -29,7 +29,7 @@ import org.apache.kafka.streams.processor.assignment.ProcessId; import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; -import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockClientInterceptor; import org.apache.kafka.test.MockInternalTopicManager; import org.junit.jupiter.api.BeforeAll; @@ -160,11 +160,11 @@ private static Harness initializeCluster(final int numStatelessTasks, final MockTime time = new MockTime(); final StreamsConfig streamsConfig = new StreamsConfig(configProps(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)); - final MockClientSupplier mockClientSupplier = new MockClientSupplier(); + final MockClientInterceptor mockClientInterceptor = new MockClientInterceptor(); final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( time, streamsConfig, - mockClientSupplier.restoreConsumer, + mockClientInterceptor.restoreConsumer, false ); final InternalTopicManager spyTopicManager = spy(mockInternalTopicManager); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 7603cd63fc892..db2d57e556363 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -210,7 +210,7 @@ public void cleanUp() throws IOException { @Test public void shouldFindKeyValueStores() { - mockThread(true); + mockThread(); final List<ReadOnlyKeyValueStore<String, String>> kvStores = provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore())); assertEquals(2, kvStores.size()); @@ -222,7 +222,7 @@ public void shouldFindKeyValueStores() { @Test public void shouldFindTimestampedKeyValueStores() { - mockThread(true); + mockThread(); final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores = provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore())); assertEquals(2, tkvStores.size()); @@ -234,7 +234,7 @@ public void
shouldNotFindKeyValueStoresAsTimestampedStore() { - mockThread(true); + mockThread(); final InvalidStateStoreException exception = assertThrows( InvalidStateStoreException.class, () -> provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.timestampedKeyValueStore())) @@ -252,7 +252,7 @@ public void shouldNotFindKeyValueStoresAsTimestampedStore() { @Test public void shouldFindTimestampedKeyValueStoresAsKeyValueStores() { - mockThread(true); + mockThread(); final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores = provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store", QueryableStoreTypes.keyValueStore())); assertEquals(2, tkvStores.size()); @@ -264,7 +264,7 @@ public void shouldFindTimestampedKeyValueStoresAsKeyValueStores() { @Test public void shouldFindWindowStores() { - mockThread(true); + mockThread(); final List<ReadOnlyWindowStore<String, String>> windowStores = provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.windowStore())); assertEquals(2, windowStores.size()); @@ -276,7 +276,7 @@ public void shouldFindWindowStores() { @Test public void shouldFindTimestampedWindowStores() { - mockThread(true); + mockThread(); final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores = provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore())); assertEquals(2, windowStores.size()); @@ -288,7 +288,7 @@ public void shouldFindTimestampedWindowStores() { @Test public void shouldNotFindWindowStoresAsTimestampedStore() { - mockThread(true); + mockThread(); final InvalidStateStoreException exception = assertThrows( InvalidStateStoreException.class, () -> provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.timestampedWindowStore())) @@ -306,7 +306,7 @@ public void shouldNotFindWindowStoresAsTimestampedStore() { @Test public void shouldFindTimestampedWindowStoresAsWindowStore() { - mockThread(true); + mockThread(); final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores = provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.windowStore())); assertEquals(2, windowStores.size()); @@ -318,7 +318,7 @@ public void shouldFindTimestampedWindowStoresAsWindowStore() { @Test public void shouldFindSessionStores() { - mockThread(true); + mockThread(); final List<ReadOnlySessionStore<String, String>> sessionStores = provider.stores(StoreQueryParameters.fromNameAndType("session-store", QueryableStoreTypes.sessionStore())); assertEquals(2, sessionStores.size()); @@ -329,7 +329,7 @@ public void shouldFindSessionStores() { @Test public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() { - mockThread(true); + mockThread(); taskOne.store("kv-store").close(); assertThrows(InvalidStateStoreException.class, () -> provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()))); @@ -337,7 +337,7 @@ public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() { @Test public void shouldThrowInvalidStoreExceptionIfTsKVStoreClosed() { - mockThread(true); + mockThread(); taskOne.store("timestamped-kv-store").close(); assertThrows(InvalidStateStoreException.class, () -> provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore()))); @@ -345,7 +345,7 @@ public void shouldThrowInvalidStoreExceptionIfTsKVStoreClosed() { @Test public void shouldThrowInvalidStoreExceptionIfWindowStoreClosed() { - mockThread(true); + mockThread(); taskOne.store("window-store").close(); assertThrows(InvalidStateStoreException.class, () ->
provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.windowStore()))); @@ -353,7 +353,7 @@ public void shouldThrowInvalidStoreExceptionIfWindowStoreClosed() { @Test public void shouldThrowInvalidStoreExceptionIfTsWindowStoreClosed() { - mockThread(true); + mockThread(); taskOne.store("timestamped-window-store").close(); assertThrows(InvalidStateStoreException.class, () -> provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore()))); @@ -361,7 +361,7 @@ public void shouldThrowInvalidStoreExceptionIfTsWindowStoreClosed() { @Test public void shouldThrowInvalidStoreExceptionIfSessionStoreClosed() { - mockThread(true); + mockThread(); taskOne.store("session-store").close(); assertThrows(InvalidStateStoreException.class, () -> provider.stores(StoreQueryParameters.fromNameAndType("session-store", QueryableStoreTypes.sessionStore()))); @@ -369,7 +369,7 @@ public void shouldThrowInvalidStoreExceptionIfSessionStoreClosed() { @Test public void shouldReturnEmptyListIfNoStoresFoundWithName() { - mockThread(true); + mockThread(); assertEquals( Collections.emptyList(), provider.stores(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore()))); @@ -377,7 +377,7 @@ public void shouldReturnEmptyListIfNoStoresFoundWithName() { @Test public void shouldReturnSingleStoreForPartition() { - mockThread(true); + mockThread(); { final List<ReadOnlyKeyValueStore<String, String>> kvStores = provider.stores( @@ -406,7 +406,7 @@ public void shouldReturnSingleStoreForPartition() { @Test public void shouldReturnEmptyListForInvalidPartitions() { - mockThread(true); + mockThread(); assertEquals( Collections.emptyList(), provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()).withPartition(2)) @@ -486,11 +486,9 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, ); } - private void mockThread(final boolean initialized) { + private void mockThread() { when(threadMock.readOnlyActiveTasks()).thenReturn(new HashSet<>(tasks.values())); - when(threadMock.state()).thenReturn( - initialized ?
StreamThread.State.RUNNING : StreamThread.State.PARTITIONS_ASSIGNED - ); + when(threadMock.state()).thenReturn(StreamThread.State.RUNNING); } private void configureClients(final MockConsumer restoreConsumer, diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 9c359faca81ad..a4e37a4e8cbbf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -28,7 +28,7 @@ import org.apache.kafka.common.utils.ByteBufferInputStream; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.KafkaClientSupplier; +import org.apache.kafka.streams.KafkaClientInterceptor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -38,7 +38,6 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.apache.kafka.streams.processor.internals.TaskManager; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; @@ -124,15 +123,15 @@ public static KafkaStreams buildStreams(final Properties streamsProperties) { config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()); - final KafkaClientSupplier kafkaClientSupplier; + final KafkaClientInterceptor clientInterceptor; if (streamsProperties.containsKey("test.future.metadata")) { - kafkaClientSupplier = new FutureKafkaClientSupplier(); + clientInterceptor = new FutureKafkaClientInterceptor(); } else { - kafkaClientSupplier = new DefaultKafkaClientSupplier(); + clientInterceptor = new KafkaClientInterceptor(); } config.putAll(streamsProperties); - return new KafkaStreams(builder.build(), config, kafkaClientSupplier); + return new KafkaStreams(builder.build(), config, clientInterceptor); } private static void buildFKTable(final KStream primaryTable, @@ -144,9 +143,10 @@ private static void buildFKTable(final KStream primaryTable, kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); } - private static class FutureKafkaClientSupplier extends DefaultKafkaClientSupplier { + private static class FutureKafkaClientInterceptor extends KafkaClientInterceptor { @Override - public Consumer getConsumer(final Map config) { + public Consumer wrapMainConsumer(final KafkaConsumer consumer) { + // TODO: verify if this test setup will work with KIP-1071 config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, FutureStreamsPartitionAssignor.class.getName()); return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); } diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientInterceptor.java b/streams/src/test/java/org/apache/kafka/test/MockClientInterceptor.java new file mode 100644 index 0000000000000..7858138430d0a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockClientInterceptor.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.streams.KafkaClientInterceptor; +import org.apache.kafka.streams.StreamsConfig; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** Test-only {@link KafkaClientInterceptor} that ignores the real clients and hands out mock admin, consumer, and producer instances instead. */ +public class MockClientInterceptor extends KafkaClientInterceptor { + private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer(); + + private Cluster cluster; + private String applicationId; + + public MockAdminClient adminClient = new MockAdminClient(); + private final List<MockProducer<byte[], byte[]>> preparedProducers = new LinkedList<>(); + public final List<MockProducer<byte[], byte[]>> producers = new LinkedList<>(); + public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + public final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + public void setCluster(final Cluster cluster) { + this.cluster = cluster; + this.adminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(-1)); + } + + @Override + public void configure(final Map config) { + super.configure(config); + this.applicationId = (String) config.get(StreamsConfig.APPLICATION_ID_CONFIG); + } + + @Override + public Admin wrapAdminClient(final KafkaAdminClient admin) { + return adminClient; + } + + // queue a producer to be returned by the next wrapProducer() call + public void prepareProducer(final MockProducer<byte[], byte[]> producer) { + preparedProducers.add(producer); + } + + @Override + public Producer wrapProducer(final KafkaProducer kafkaProducer) { + final MockProducer<byte[], byte[]> producer; + if (preparedProducers.isEmpty()) { + producer = new MockProducer<>(cluster, true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER); + } else { + producer = preparedProducers.remove(0); + } + + producers.add(producer); + return producer; + } + + @Override + public Consumer wrapMainConsumer(final KafkaConsumer kafkaConsumer) { + return consumer; + } + + @Override + public Consumer wrapRestoreConsumer(final KafkaConsumer kafkaConsumer) { + return restoreConsumer; + } + + @Override + public Consumer
wrapGlobalConsumer(final KafkaConsumer kafkaConsumer) { + return restoreConsumer; + } +} diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index 88f24e8d9bde4..b530e26149b6a 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -32,8 +32,8 @@ import java.util.List; import java.util.Map; -import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.startsWith; import static org.junit.jupiter.api.Assertions.assertFalse; public class MockClientSupplier implements KafkaClientSupplier { @@ -43,7 +43,7 @@ public class MockClientSupplier implements KafkaClientSupplier { private String applicationId; public MockAdminClient adminClient = new MockAdminClient(); - private List<MockProducer<byte[], byte[]>> preparedProducers = new LinkedList<>(); + private final List<MockProducer<byte[], byte[]>> preparedProducers = new LinkedList<>(); public final List<MockProducer<byte[], byte[]>> producers = new LinkedList<>(); public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); public final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java index 8b049a01ee494..ded6bdccfbb14 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.utils.Time; @@ -40,12 +41,18 @@ public MockInternalTopicManager(final Time time, final StreamsConfig streamsConfig, final MockConsumer restoreConsumer, final boolean mockCreateInternalTopics) { - super(time, new MockClientSupplier().getAdmin(streamsConfig.originals()), streamsConfig); + super(time, admin(streamsConfig.originals()), streamsConfig); this.restoreConsumer = restoreConsumer; this.mockCreateInternalTopics = mockCreateInternalTopics; } + private static Admin admin(final Map config) { + final MockClientInterceptor clientInterceptor = new MockClientInterceptor(); + clientInterceptor.configure(config); + return clientInterceptor.wrapAdminClient(null); + } + @Override public Set makeReady(final Map topics) { for (final InternalTopicConfig topic : topics.values()) { diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 6c797a0a280b6..3592cb20c5ef2 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord;