Skip to content

Commit b065c60

Browse files
KAFKA-19734 Adding application tag to clientstate metric (#20766)
## Summary As a follow-on to the improvements introduced in [KIP-1091](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1091%3A+Improved+Kafka+Streams+operator+metrics) it would be useful to add the application-id as a tag to the `client-state` metric. This allows Kafka Streams developers and operators to connect metrics containing a `thread-id` (which embeds the `application-id`) across separate deployments of Kafka Streams instances, which are members of the same logical application. Reviewers: Bill Bejeck<[email protected]> Matthias Sax<[email protected]>
1 parent 6b187e9 commit b065c60

36 files changed

+172
-110
lines changed

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
983983
metrics,
984984
clientId,
985985
processId.toString(),
986+
applicationId,
986987
time
987988
);
988989

streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public int hashCode() {
9090
private final Map<Sensor, Sensor> parentSensors;
9191
private final String clientId;
9292
private final String processId;
93+
private final String applicationId;
9394

9495
private final Version version;
9596
private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
@@ -118,6 +119,7 @@ public int hashCode() {
118119

119120
public static final String CLIENT_ID_TAG = "client-id";
120121
public static final String PROCESS_ID_TAG = "process-id";
122+
public static final String APPLICATION_ID_TAG = "application-id";
121123
public static final String THREAD_ID_TAG = "thread-id";
122124
public static final String TASK_ID_TAG = "task-id";
123125
public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
@@ -168,11 +170,13 @@ public int hashCode() {
168170
public StreamsMetricsImpl(final Metrics metrics,
169171
final String clientId,
170172
final String processId,
173+
final String applicationId,
171174
final Time time) {
172175
Objects.requireNonNull(metrics, "Metrics cannot be null");
173176
this.metrics = metrics;
174177
this.clientId = clientId;
175178
this.processId = processId;
179+
this.applicationId = applicationId;
176180
version = Version.LATEST;
177181
rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);
178182

@@ -284,6 +288,7 @@ public Map<String, String> clientLevelTagMap() {
284288
final Map<String, String> tagMap = new LinkedHashMap<>();
285289
tagMap.put(CLIENT_ID_TAG, clientId);
286290
tagMap.put(PROCESS_ID_TAG, processId);
291+
tagMap.put(APPLICATION_ID_TAG, applicationId);
287292
return tagMap;
288293
}
289294

streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
8080

8181
private final MockTime time = new MockTime();
8282
private final Metrics metrics = new Metrics();
83-
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "processId", time);
83+
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time);
8484
private final String threadId = Thread.currentThread().getName();
8585
private final Initializer<Long> initializer = () -> 0L;
8686
private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1;

streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
7272
private ChangelogReader changeLogReader;
7373

7474
private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
75-
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", new MockTime());
75+
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", "applicationId", new MockTime());
7676
private final Map<String, Object> properties = mkMap(
7777
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
7878
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")

streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class DefaultStateUpdaterTest {
106106

107107
// need an auto-tick timer to work for draining with timeout
108108
private final Time time = new MockTime(1L);
109-
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time);
109+
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", "", time);
110110
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
111111
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
112112
private final TopologyMetadata topologyMetadata = unnamedTopology().build();

streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public void process(final Record<Object, Object> record) {
132132
mockConsumer,
133133
new StateDirectory(config, time, true, false),
134134
0,
135-
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time),
135+
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time),
136136
time,
137137
"clientId",
138138
stateRestoreListener,
@@ -169,7 +169,7 @@ public List<PartitionInfo> partitionsFor(final String topic) {
169169
mockConsumer,
170170
new StateDirectory(config, time, true, false),
171171
0,
172-
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time),
172+
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time),
173173
time,
174174
"clientId",
175175
stateRestoreListener,
@@ -418,7 +418,7 @@ public List<PartitionInfo> partitionsFor(final String topic) {
418418
consumer,
419419
new StateDirectory(config, time, true, false),
420420
0,
421-
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time),
421+
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time),
422422
time,
423423
"clientId",
424424
stateRestoreListener,

streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@
2323
public class MockStreamsMetrics extends StreamsMetricsImpl {
2424

2525
public MockStreamsMetrics(final Metrics metrics) {
26-
super(metrics, "test", "processId", new MockTime());
26+
super(metrics, "test", "processId", "applicationId", new MockTime());
2727
}
2828
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ public Set<TopicPartition> partitions() {
279279
public void testMetricsWithBuiltInMetricsVersionLatest() {
280280
final Metrics metrics = new Metrics();
281281
final StreamsMetricsImpl streamsMetrics =
282-
new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
282+
new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
283283
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
284284
final ProcessorNode<Object, Object, Object, Object> node =
285285
new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet());
@@ -363,7 +363,7 @@ public void process(final Record<Object, Object> record) {
363363
public void testTopologyLevelClassCastExceptionDirect() {
364364
final Metrics metrics = new Metrics();
365365
final StreamsMetricsImpl streamsMetrics =
366-
new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
366+
new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
367367
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
368368
final ProcessorNode<Object, Object, Object, Object> node =
369369
new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
@@ -441,7 +441,7 @@ private InternalProcessorContext<Object, Object> mockInternalProcessorContext()
441441
final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT));
442442

443443
when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
444-
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()));
444+
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime()));
445445
when(internalProcessorContext.topic()).thenReturn(TOPIC);
446446
when(internalProcessorContext.partition()).thenReturn(PARTITION);
447447
when(internalProcessorContext.offset()).thenReturn(OFFSET);

streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class RecordQueueTest {
7272

7373
private final Metrics metrics = new Metrics();
7474
private final StreamsMetricsImpl streamsMetrics =
75-
new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime());
75+
new StreamsMetricsImpl(metrics, "mock", "processId", "applicationId", new MockTime());
7676

7777
final InternalMockProcessorContext<Integer, Integer> context = new InternalMockProcessorContext<>(
7878
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),

streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public String deserialize(final String topic, final byte[] data) {
9797
public void shouldExposeProcessMetrics() {
9898
final Metrics metrics = new Metrics();
9999
final StreamsMetricsImpl streamsMetrics =
100-
new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
100+
new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
101101
final InternalMockProcessorContext<String, String> context = new InternalMockProcessorContext<>(streamsMetrics);
102102
final SourceNode<String, String> node =
103103
new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());

0 commit comments

Comments
 (0)