diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 69012f0c3135a..40b6fad5a3aca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -983,6 +983,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
             metrics,
             clientId,
             processId.toString(),
+            applicationId,
             time
         );
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index a0999a36c60bd..c56f079b0644d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -90,6 +90,7 @@ public int hashCode() {
     private final Map parentSensors;
     private final String clientId;
     private final String processId;
+    private final String applicationId;
     private final Version version;

     private final Deque clientLevelMetrics = new LinkedList<>();
@@ -118,6 +119,7 @@ public int hashCode() {
     public static final String CLIENT_ID_TAG = "client-id";
     public static final String PROCESS_ID_TAG = "process-id";
+    public static final String APPLICATION_ID_TAG = "application-id";
     public static final String THREAD_ID_TAG = "thread-id";
     public static final String TASK_ID_TAG = "task-id";
     public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
@@ -168,11 +170,13 @@ public int hashCode() {
     public StreamsMetricsImpl(final Metrics metrics,
                               final String clientId,
                               final String processId,
+                              final String applicationId,
                               final Time time) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
         this.metrics = metrics;
         this.clientId = clientId;
         this.processId = processId;
+        this.applicationId = applicationId;
         version = Version.LATEST;
         rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);
@@ -284,6 +288,7 @@ public Map clientLevelTagMap() {
         final Map tagMap = new LinkedHashMap<>();
         tagMap.put(CLIENT_ID_TAG, clientId);
         tagMap.put(PROCESS_ID_TAG, processId);
+        tagMap.put(APPLICATION_ID_TAG, applicationId);
         return tagMap;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index dd3960c17ec4f..c2aaf7f7373e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     private final MockTime time = new MockTime();
     private final Metrics metrics = new Metrics();
-    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "processId", time);
+    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time);
     private final String threadId = Thread.currentThread().getName();
     private final Initializer initializer = () -> 0L;
     private final Aggregator aggregator = (aggKey, value, aggregate) -> aggregate + 1;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 362c32592ca8e..2d8375939d6c1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest { private ChangelogReader changeLogReader; private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", new MockTime()); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", "applicationId", new MockTime()); private final Map properties = mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index b6d41966257a5..cd10f2a77039a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -106,7 +106,7 @@ class DefaultStateUpdaterTest { // need an auto-tick timer to work for draining with timeout private final Time time = new MockTime(1L); - private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time); + private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", "", time); private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL)); private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final TopologyMetadata topologyMetadata = unnamedTopology().build(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index ff428828e05b8..6ec219a8cd917 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -132,7 +132,7 @@ public void process(final Record record) { mockConsumer, new StateDirectory(config, time, true, false), 0, - new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time), + new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time), time, "clientId", stateRestoreListener, @@ -169,7 +169,7 @@ public List partitionsFor(final String topic) { mockConsumer, new StateDirectory(config, time, true, false), 0, - new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time), + new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time), time, "clientId", stateRestoreListener, @@ -418,7 +418,7 @@ public List partitionsFor(final String topic) { consumer, new StateDirectory(config, time, true, false), 0, - new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time), + new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time), time, "clientId", 
stateRestoreListener, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java index a2e6820f901df..54238d8bd6b61 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java @@ -23,6 +23,6 @@ public class MockStreamsMetrics extends StreamsMetricsImpl { public MockStreamsMetrics(final Metrics metrics) { - super(metrics, "test", "processId", new MockTime()); + super(metrics, "test", "processId", "applicationId", new MockTime()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 6b7c06580c02e..ccd47e60d9f63 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -279,7 +279,7 @@ public Set partitions() { public void testMetricsWithBuiltInMetricsVersionLatest() { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime()); + new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime()); final InternalMockProcessorContext context = new InternalMockProcessorContext<>(streamsMetrics); final ProcessorNode node = new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet()); @@ -363,7 +363,7 @@ public void process(final Record record) { public void testTopologyLevelClassCastExceptionDirect() { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime()); + new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime()); final InternalMockProcessorContext context = new InternalMockProcessorContext<>(streamsMetrics); final ProcessorNode node = new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet()); @@ -441,7 +441,7 @@ private InternalProcessorContext mockInternalProcessorContext() final InternalProcessorContext internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT)); when(internalProcessorContext.taskId()).thenReturn(TASK_ID); - when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime())); + when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime())); when(internalProcessorContext.topic()).thenReturn(TOPIC); when(internalProcessorContext.partition()).thenReturn(PARTITION); when(internalProcessorContext.offset()).thenReturn(OFFSET); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index 1dd19fb9cf7e0..9cd4e5fbc63b3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -72,7 +72,7 @@ public class RecordQueueTest { private final Metrics metrics = new 
Metrics(); private final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime()); + new StreamsMetricsImpl(metrics, "mock", "processId", "applicationId", new MockTime()); final InternalMockProcessorContext context = new InternalMockProcessorContext<>( StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index a509d14c97437..d10b2800dc9ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -97,7 +97,7 @@ public String deserialize(final String topic, final byte[] data) { public void shouldExposeProcessMetrics() { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime()); + new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime()); final InternalMockProcessorContext context = new InternalMockProcessorContext<>(streamsMetrics); final SourceNode node = new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 6be27187f0fc7..df7f83d05fe62 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -113,7 +113,7 @@ public class StandbyTaskTest { private final MockTime time = new MockTime(); private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, "processId", time); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, "processId", "applicationId", time); private File baseDir; private StreamsConfig config; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index 3b795310b3615..ed0fb0b4f6793 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -954,7 +954,7 @@ private StateStore initializeStartupTasks(final TaskId taskId, final boolean cre Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology); Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig()); - directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", time), new LogContext("test")); + directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", "applicationId", time), new LogContext("test")); return store; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 
385719530b9d5..71fd00c77ce3f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -2604,7 +2604,7 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() { streamsMetrics, null ); - final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", "processId", time); + final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", "processId", "applicationId", time); // The processor topology is missing the topics final ProcessorTopology topology = withSources(emptyList(), mkMap()); @@ -3238,7 +3238,7 @@ private StreamTask createSingleSourceStateless(final StreamsConfig config) { topology, consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), - new StreamsMetricsImpl(metrics, "test", "processId", time), + new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time), stateDirectory, cache, time, @@ -3275,7 +3275,7 @@ private StreamTask createStatelessTask(final StreamsConfig config) { topology, consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), - new StreamsMetricsImpl(metrics, "test", "processId", time), + new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time), stateDirectory, cache, time, @@ -3311,7 +3311,7 @@ private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode tasks) { } final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(props); @@ -1945,6 +1948,7 @@ public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean stateU metrics, APPLICATION_ID, PROCESS_ID.toString(), + APPLICATION_ID, mockTime ); @@ -2705,7 +2709,7 @@ public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(final boolea when(taskManager.handleCorruption(corruptedTasks)).thenReturn(true); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2766,7 +2770,7 @@ public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHan doThrow(new TimeoutException()).when(taskManager).handleCorruption(corruptedTasks); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2835,7 +2839,7 @@ public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(final doNothing().when(taskManager).handleLostAll(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); final 
TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2901,7 +2905,7 @@ public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveT doNothing().when(consumer).enforceRebalance("Active tasks corrupted"); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2964,7 +2968,7 @@ public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInac when(taskManager.handleCorruption(corruptedTasks)).thenReturn(false); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -3199,7 +3203,7 @@ public void shouldConstructAdminMetrics(final boolean stateUpdaterEnabled, final final TaskManager taskManager = mock(TaskManager.class); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -3258,7 +3262,7 @@ public void runAndVerifyFailedStreamThreadRecording(final boolean shouldFail, fi when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); final TaskManager taskManager = mock(TaskManager.class); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -3660,6 +3664,7 @@ public void testNamedTopologyWithStreamsProtocol(final boolean stateUpdaterEnabl metrics, APPLICATION_ID, PROCESS_ID.toString(), + APPLICATION_ID, mockTime ); @@ -3720,6 +3725,7 @@ public void testStreamsRebalanceDataWithExtraCopartition(final boolean stateUpda metrics, APPLICATION_ID, PROCESS_ID.toString(), + APPLICATION_ID, mockTime ); @@ -3787,6 +3793,7 @@ public void testStreamsRebalanceDataWithStreamsProtocol() { metrics, APPLICATION_ID, PROCESS_ID.toString(), + APPLICATION_ID, mockTime ); @@ -3882,7 +3889,7 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreads() { null, mock(TaskManager.class), null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), new TopologyMetadata(internalTopologyBuilder, config), PROCESS_ID, CLIENT_ID, @@ -3942,7 +3949,7 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreadsMissingSourceTopic null, mock(TaskManager.class), null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), new TopologyMetadata(internalTopologyBuilder, config), PROCESS_ID, CLIENT_ID, @@ -4011,7 +4018,7 @@ public void testStreamsProtocolIncorrectlyPartitionedTopics() { null, mock(TaskManager.class), null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), new TopologyMetadata(internalTopologyBuilder, config), PROCESS_ID, CLIENT_ID, @@ -4071,7 +4078,7 @@ public void testStreamsProtocolRunOnceWithProcessingThreads() { null, mock(TaskManager.class), null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), new TopologyMetadata(internalTopologyBuilder, config), PROCESS_ID, CLIENT_ID, @@ -4131,7 +4138,7 @@ public void testStreamsProtocolRunOnceWithProcessingThreadsMissingSourceTopic() null, mock(TaskManager.class), null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), new TopologyMetadata(internalTopologyBuilder, config), PROCESS_ID, CLIENT_ID, @@ -4200,7 +4207,7 @@ public void testStreamsProtocolMissingSourceTopicRecovery() { null, mock(TaskManager.class), null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), new TopologyMetadata(internalTopologyBuilder, config), PROCESS_ID, CLIENT_ID, @@ -4296,7 +4303,7 @@ private StreamThread setUpThread(final Properties streamsConfigProps) { "", taskManager, null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), topologyMetadata, PROCESS_ID, "thread-id", @@ -4348,7 +4355,7 @@ private void setupInternalTopologyWithoutState(final StreamsConfig config) { private Collection createStandbyTask(final StreamsConfig config) { final LogContext logContext = new LogContext("test"); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( new TopologyMetadata(internalTopologyBuilder, config), config, @@ -4407,7 +4414,7 @@ private StreamThread buildStreamThread(final Consumer consumer, final StreamsConfig config, final TopologyMetadata topologyMetadata) { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); return new StreamThread( mockTime, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index bdb9d028c5436..7070bb4df18cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -48,6 +48,7 @@ import static 
org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.APPLICATION_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.AVG_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP; @@ -95,6 +96,7 @@ public class StreamsMetricsImplTest { private static final String INTERNAL_PREFIX = "internal"; private static final String CLIENT_ID = "test-client"; private static final String PROCESS_ID = "test-process"; + private static final String APPLICATION_ID = "test-app"; private static final String THREAD_ID1 = "test-thread-1"; private static final String TASK_ID1 = "test-task-1"; private static final String TASK_ID2 = "test-task-2"; @@ -131,13 +133,14 @@ public class StreamsMetricsImplTest { private final String metricNamePrefix = "metric"; private final String group = "group"; private final Map tags = mkMap(mkEntry("tag", "value")); - private final Map clientLevelTags = mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID), mkEntry(PROCESS_ID_TAG, PROCESS_ID)); + private final Map clientLevelTags = mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID), + mkEntry(PROCESS_ID_TAG, PROCESS_ID), mkEntry(APPLICATION_ID_TAG, APPLICATION_ID)); private final MetricName metricName1 = new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags); private final MetricName metricName2 = new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2, clientLevelTags); private final MockTime time = new MockTime(0); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, time); private static MetricConfig eqMetricConfig(final MetricConfig metricConfig) { final StringBuffer message = new StringBuffer(); @@ -251,7 +254,8 @@ public void shouldGetNewThreadLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel); @@ -263,7 +267,8 @@ public void shouldGetExistingThreadLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel); @@ -275,7 +280,8 @@ public void shouldGetNewTaskLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.taskLevelSensor( THREAD_ID1, @@ -292,7 +298,8 @@ public void shouldGetExistingTaskLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.taskLevelSensor( THREAD_ID1, @@ -309,7 +316,8 @@ public void shouldGetNewTopicLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.topicLevelSensor( THREAD_ID1, @@ -328,7 +336,8 @@ public void shouldGetExistingTopicLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.topicLevelSensor( THREAD_ID1, @@ -347,7 +356,8 @@ public void shouldGetNewStoreLevelSensorIfNoneExists() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; final ArgumentCaptor sensorKeys = setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.storeLevelSensor( TASK_ID1, @@ -365,7 +375,8 @@ public void shouldGetExistingStoreLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.storeLevelSensor( TASK_ID1, @@ -381,7 +392,8 @@ public void shouldGetExistingStoreLevelSensor() { public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, INFO_RECORDING_LEVEL); @@ -393,7 +405,8 @@ public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() { public void 
shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -405,7 +418,8 @@ public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() { public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -417,7 +431,8 @@ public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() { public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws InterruptedException { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); final Thread otherThread = @@ -432,7 +447,8 @@ public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws I public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -456,7 +472,8 @@ public void shouldAddNewStoreLevelMutableMetric() { .thenReturn(metricName); when(metrics.metric(metricName)).thenReturn(null); when(metrics.addMetricIfAbsent(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.addStoreLevelMutableMetric( TASK_ID1, @@ -489,7 +506,8 @@ public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() { when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP)) .thenReturn(metricName); when(metrics.metric(metricName)).thenReturn(null); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final 
StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.addStoreLevelMutableMetric( TASK_ID1, @@ -539,7 +557,8 @@ public void shouldCreateMetricOnceDuringConcurrentMetricCreationRequest() throws @Test public void shouldRemoveStateStoreLevelSensors() { final Metrics metrics = mock(Metrics.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final MetricName metricName1 = new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); final MetricName metricName2 = @@ -562,7 +581,8 @@ public void shouldGetNewNodeLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.nodeLevelSensor( THREAD_ID1, @@ -580,7 +600,8 @@ public void shouldGetExistingNodeLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.nodeLevelSensor( THREAD_ID1, @@ -599,7 +620,8 @@ public void shouldGetNewCacheLevelSensor() { final RecordingLevel recordingLevel = RecordingLevel.INFO; final String processorCacheName = "processorNodeName"; setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.cacheLevelSensor( THREAD_ID1, @@ -618,7 +640,8 @@ public void shouldGetExistingCacheLevelSensor() { final RecordingLevel recordingLevel = RecordingLevel.INFO; final String processorCacheName = "processorNodeName"; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.cacheLevelSensor( THREAD_ID1, TASK_ID1, @@ -635,7 +658,8 @@ public void shouldGetNewClientLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel); @@ -647,7 +671,8 @@ public void shouldGetExistingClientLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - 
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel); @@ -664,7 +689,8 @@ public void shouldAddClientLevelImmutableMetric() { when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) .thenReturn(metricName1); doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(immutableValue)); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, value); } @@ -678,7 +704,8 @@ public void shouldAddClientLevelMutableMetric() { when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) .thenReturn(metricName1); doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(valueProvider)); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, valueProvider); } @@ -699,7 +726,8 @@ private void setupRemoveSensorsTest(final Metrics metrics, @Test public void shouldRemoveClientLevelMetricsAndSensors() { final Metrics metrics = mock(Metrics.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); final ArgumentCaptor sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics); doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0)); @@ -712,7 +740,8 @@ public void shouldRemoveClientLevelMetricsAndSensors() { @Test public void shouldRemoveThreadLevelSensors() { final Metrics metrics = mock(Metrics.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); addSensorsOnAllLevels(metrics, streamsMetrics); setupRemoveSensorsTest(metrics, THREAD_ID1); @@ -721,7 +750,8 @@ public void shouldRemoveThreadLevelSensors() { @Test public void testNullMetrics() { - assertThrows(NullPointerException.class, () -> new StreamsMetricsImpl(null, "", PROCESS_ID, time)); + assertThrows(NullPointerException.class, () -> new StreamsMetricsImpl(null, "", PROCESS_ID, + APPLICATION_ID, time)); } @Test @@ -754,7 +784,8 @@ public void testRemoveSensor() { @Test public void testMultiLevelSensorRemoval() { final Metrics registry = new Metrics(); - final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, PROCESS_ID, time); + final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, PROCESS_ID, APPLICATION_ID, + time); for (final MetricName defaultMetric : registry.metrics().keySet()) { registry.removeMetric(defaultMetric); } @@ -860,7 +891,8 @@ public void testTotalMetricDoesntDecrease() { final MockTime time = new MockTime(1); final 
MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS); final Metrics metrics = new Metrics(config, time); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "", PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "", PROCESS_ID, APPLICATION_ID, + time); final String scope = "scope"; final String entity = "entity"; @@ -894,7 +926,8 @@ public void testTotalMetricDoesntDecrease() { @Test public void shouldAddLatencyRateTotalSensor() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); shouldAddCustomSensor( streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, RecordingLevel.DEBUG), streamsMetrics, @@ -909,7 +942,8 @@ public void shouldAddLatencyRateTotalSensor() { @Test public void shouldAddRateTotalSensor() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, + time); shouldAddCustomSensor( streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, RecordingLevel.DEBUG), streamsMetrics, @@ -1035,9 +1069,10 @@ public void shouldThrowIfRateTotalSensorIsAddedWithOddTags() { public void shouldGetClientLevelTagMap() { final Map tagMap = streamsMetrics.clientLevelTagMap(); - assertThat(tagMap.size(), equalTo(2)); + assertThat(tagMap.size(), equalTo(3)); assertThat(tagMap.get(StreamsMetricsImpl.CLIENT_ID_TAG), equalTo(CLIENT_ID)); assertThat(tagMap.get(StreamsMetricsImpl.PROCESS_ID_TAG), equalTo(PROCESS_ID)); + assertThat(tagMap.get(StreamsMetricsImpl.APPLICATION_ID_TAG), equalTo(APPLICATION_ID)); } @Test @@ -1045,7 +1080,8 @@ public void shouldGetStoreLevelTagMap() { final String taskName = "test-task"; final String storeType = "remote-window"; final String storeName = "window-keeper"; - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, + time); final Map tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName); @@ -1060,7 +1096,8 @@ public void shouldGetStoreLevelTagMap() { @Test public void shouldGetCacheLevelTagMap() { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); + new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, + time); final String taskName = "taskName"; final String storeName = "storeName"; @@ -1077,7 +1114,8 @@ public void shouldGetCacheLevelTagMap() { @Test public void shouldGetThreadLevelTagMap() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, + time); final Map tagMap = streamsMetrics.threadLevelTagMap(THREAD_ID1); @@ -1210,7 +1248,7 @@ public void shouldAddMinAndMaxMetricsToSensor() { @Test public void shouldReturnMetricsVersionCurrent() { assertThat( - new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time).version(), + new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, time).version(), equalTo(Version.LATEST) ); } @@ -1269,7 +1307,8 @@ 
public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() { public void shouldAddThreadLevelMutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.addThreadLevelMutableMetric( "foobar", @@ -1293,7 +1332,8 @@ public void shouldAddThreadLevelMutableMetric() { public void shouldAddThreadLevelMutableMetricWithAdditionalTags() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.addThreadLevelMutableMetric( "foobar", @@ -1319,7 +1359,8 @@ public void shouldAddThreadLevelMutableMetricWithAdditionalTags() { public void shouldCleanupThreadLevelMutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.addThreadLevelMutableMetric( "foobar", "test metric", @@ -1341,7 +1382,8 @@ public void shouldCleanupThreadLevelMutableMetric() { public void shouldAddThreadLevelImmutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.addThreadLevelImmutableMetric( "foobar", @@ -1365,7 +1407,8 @@ public void shouldAddThreadLevelImmutableMetric() { public void shouldCleanupThreadLevelImmutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, + time); streamsMetrics.addThreadLevelImmutableMetric( "foobar", "test metric", diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java index 0f8ce890b1952..902b362e1532d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java @@ -1350,7 +1350,7 @@ public void shouldRestoreRecordsAndConsistencyVectorSingleTopic() { dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -1386,7 +1386,7 @@ public void shouldRestoreRecordsAndConsistencyVectorMultipleTopics() { dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -1425,7 +1425,7 @@ public void 
shouldHandleTombstoneRecords() { dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -1466,7 +1466,7 @@ public void shouldNotThrowWhenRestoringOnMissingHeaders() { dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index 7a78716530a18..77a5d9e87106e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -572,7 +572,7 @@ public void shouldRestoreRecordsAndConsistencyVectorSingleTopic(final SegmentedB dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -612,7 +612,7 @@ public void shouldRestoreRecordsAndConsistencyVectorMultipleTopics(final Segment dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -654,7 +654,7 @@ public void shouldHandleTombstoneRecords(final SegmentedBytesStore.KeySchema sch dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -698,7 +698,7 @@ public void shouldNotThrowWhenRestoringOnMissingHeaders(final SegmentedBytesStor dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 154517d3b94f4..e09a8ebe57bd0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -97,7 +97,7 @@ private InternalMockProcessorContext mockContext() {
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
             streamsConfig,
             () -> collector,
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 4239e3e5000ed..573cc02a438cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -112,7 +112,7 @@ public void before() {
         when(mockContext.applicationId()).thenReturn("appId");
         when(mockContext.metrics())
             .thenReturn(
-                new StreamsMetricsImpl(new Metrics(), "threadName", "processId", new MockTime())
+                new StreamsMetricsImpl(new Metrics(), "threadName", "processId", "applicationId", new MockTime())
             );
         when(mockContext.taskId()).thenReturn(new TaskId(0, 0));
         when(mockContext.appConfigs()).thenReturn(CONFIGS);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index e71704f32af27..27aa52b48c6ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -54,7 +54,7 @@ public class KeyValueSegmentTest {
     @BeforeEach
     public void setUp() {
         metricsRecorder.init(
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime()),
             new TaskId(0, 0)
         );
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 1ba655a75ce79..0ecea105badb9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -122,7 +122,7 @@ private void setUp() {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.metrics()).thenReturn(
-            new StreamsMetricsImpl(metrics, "test", "processId", mockTime)
+            new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", mockTime)
         );
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index f8b08a532d12c..4546e6b7c1ffd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -126,7 +126,7 @@ public void setUp() {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.metrics())
-            .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", mockTime));
+            .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", mockTime));
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
         when(innerStore.name()).thenReturn(STORE_NAME);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 2e3c470387c2b..0b6556ac78b4d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -127,7 +127,7 @@ private void setUp() {
         setUpWithoutContext();
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.metrics())
-            .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", mockTime));
+            .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", mockTime));
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
         when(inner.name()).thenReturn(STORE_NAME);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index a3fe59c6e8ba6..a3ed1453454e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -77,7 +77,7 @@ public class MeteredTimestampedWindowStoreTest {
 
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", "processId", new MockTime());
+            new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", new MockTime());
 
         context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
@@ -105,7 +105,7 @@ public void setUp() {
 
     public void setUpWithoutContextName() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", "processId", new MockTime());
+            new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", new MockTime());
 
         context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 5c4509bc7a35e..1602f1a115bb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -110,7 +110,7 @@ public class MeteredVersionedKeyValueStoreTest {
     @BeforeEach
     public void setUp() {
         when(inner.name()).thenReturn(STORE_NAME);
-        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", mockTime));
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", mockTime));
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.taskId()).thenReturn(TASK_ID);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 2726ce26aa73b..40e4e52eafae4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -116,7 +116,7 @@ public class MeteredWindowStoreTest {
     @BeforeEach
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", "processId", new MockTime());
+            new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", new MockTime());
         context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 70224c8013c97..f56ac75a0c911 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -923,7 +923,7 @@ public void shouldVerifyThatMetricsRecordedFromStatisticsGetMeasurementsFromRock
 
         final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", "processId", time);
+            new StreamsMetricsImpl(metrics, "test-application", "processId", "applicationId", time);
 
         context = mock(InternalMockProcessorContext.class);
         when(context.metrics()).thenReturn(streamsMetrics);
@@ -956,7 +956,7 @@ public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRock
 
         final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", "processId", time);
+            new StreamsMetricsImpl(metrics, "test-application", "processId", "applicationId", time);
 
         context = mock(InternalMockProcessorContext.class);
         when(context.metrics()).thenReturn(streamsMetrics);
@@ -988,7 +988,7 @@ public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() {
 
         final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", "processId", time);
+            new StreamsMetricsImpl(metrics, "test-application", "processId", "applicationId", time);
 
         final Properties props = StreamsTestUtils.getStreamsConfig();
         context = mock(InternalMockProcessorContext.class);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 69e7d31e31b4b..5f5495af4251b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -63,7 +63,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
     public void setUp() {
         final Metrics metrics = new Metrics();
         offset = 0;
-        streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
+        streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
         context = new MockInternalProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 633d14c1e63b6..0acd2a330da22 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -54,7 +54,7 @@ public class TimestampedSegmentTest {
     @BeforeEach
     public void setUp() {
         metricsRecorder.init(
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime()),
             new TaskId(0, 0)
         );
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
index 75c01cef3c953..62a37ff8d0aa0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
@@ -202,7 +202,7 @@ public void shouldGetPinnedUsageOfBlockCacheWithSingleCache() throws Exception {
 
     private void runAndVerifySumOfProperties(final String propertyName) throws Exception {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime());
 
         final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
         recorder.init(streamsMetrics, TASK_ID);
@@ -219,7 +219,7 @@ private void runAndVerifySumOfProperties(final String propertyName) throws Excep
 
     private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String propertyName) throws Exception {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime());
 
         final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
         recorder.init(streamsMetrics, TASK_ID);
@@ -236,7 +236,7 @@ private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String proper
 
     private void runAndVerifyBlockCacheMetricsWithSingleCache(final String propertyName) throws Exception {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime());
 
         final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
         recorder.init(streamsMetrics, TASK_ID);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index a0c068b59ee5d..b0ed10c45fe6e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -194,7 +194,7 @@ public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentStreamsMetric
         assertThrows(
             IllegalStateException.class,
             () -> recorder.init(
-                new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime()),
                 TASK_ID1
             )
         );
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index ed68c86c49020..1361df9a09ed1 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -90,7 +90,7 @@ public InternalMockProcessorContext() {
         this(null,
             null,
             null,
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             null,
             null,
@@ -108,6 +108,7 @@ public InternalMockProcessorContext(final File stateDir,
                 new Metrics(),
                 "mock",
                 "processId",
+                "applicationId",
                 new MockTime()
             ),
             config,
@@ -141,6 +142,7 @@ public InternalMockProcessorContext(final File stateDir,
                 new Metrics(),
                 "mock",
                 "processId",
+                "applicationId",
                 new MockTime()
             ),
             config,
@@ -158,7 +160,7 @@ public InternalMockProcessorContext(final File stateDir,
             stateDir,
             keySerde,
             valueSerde,
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
             config,
             null,
             null,
@@ -178,7 +180,7 @@ public InternalMockProcessorContext(final StateSerdes serdes,
             null,
            serdes.keySerde(),
            serdes.valueSerde(),
-            new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime()),
+            new StreamsMetricsImpl(metrics, "mock", "processId", "applicationId", new MockTime()),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             () -> collector,
             null,
@@ -195,7 +197,7 @@ public InternalMockProcessorContext(final File stateDir,
             stateDir,
             keySerde,
             valueSerde,
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             () -> collector,
             cache,
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 92976875b9321..9b812b389fabc 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -385,6 +385,7 @@ private StreamsMetricsImpl setupMetrics(final StreamsConfig streamsConfig) {
             metrics,
             "test-client",
             "processId",
+            "applicationId",
             mockWallClockTime
         );
         TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(), streamsMetrics);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 0ffea9b49169f..08cf542305cdb 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -238,6 +238,7 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final
             new Metrics(metricConfig),
             threadId,
             "processId",
+            "applicationId",
             Time.SYSTEM
         );
         TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
index 3ffa85a45034c..25f728cd5674a 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -256,6 +256,7 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final
             new Metrics(metricConfig),
             threadId,
             "processId",
+            "applicationId",
             Time.SYSTEM
         );
         TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index ebb38dd773ad6..c760e5dd1d632 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -291,6 +291,7 @@ public void close() { }
             new Metrics(new MetricConfig()),
             Thread.currentThread().getName(),
             "processId",
+            "applicationId",
             Time.SYSTEM
         ));
         when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1, 1));
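
Note (not part of the patch): the hunks above are mechanical call-site updates for the widened StreamsMetricsImpl constructor, which now takes an application id between the process id and the Time argument. The following is a minimal sketch of the new call shape, mirroring the placeholder string values used by the updated tests; the class name and printed output are illustrative assumptions, not taken from the patch.

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

public class StreamsMetricsImplConstructionSketch {
    public static void main(final String[] args) {
        // Argument order after this patch: metrics registry, client id, process id,
        // application id (new), and the clock. "applicationId" here is a placeholder
        // value, just as in the updated tests above.
        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
            new Metrics(),
            "test-client",
            "processId",
            "applicationId",
            Time.SYSTEM
        );

        // The client-level tag map is where the extra constructor argument surfaces;
        // printing it should show the application id alongside the client and process ids.
        System.out.println(streamsMetrics.clientLevelTagMap());
    }
}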