Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IO metrics to topic and broker, adapting active-controller detection for KRaft mode #4369

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,18 @@ private Comparator<InternalTopic> getComparatorForTopic(
return Comparator.comparing(InternalTopic::getReplicationFactor);
case SIZE:
return Comparator.comparing(InternalTopic::getSegmentSize);
case BYTESIN_PERSEC:
return Comparator.comparing(InternalTopic::getBytesInPerSec);
case BYTESOUT_PERSEC:
return Comparator.comparing(InternalTopic::getBytesOutPerSec);
case MSG_RATE:
return Comparator.comparing(InternalTopic::getMessageInMeanRate);
case MSG_5_RATE:
return Comparator.comparing(InternalTopic::getMessageInFiveMinuteRate);
case FETCH_RATE:
return Comparator.comparing(InternalTopic::getFetchRequestsMeanRate);
case FETCH_5_RATE:
return Comparator.comparing(InternalTopic::getFetchRequestsFiveMinuteRate);
case NAME:
default:
return defaultComparator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
.orElse(null);
topicCount = statistics.getTopicDescriptions().size();
brokerCount = statistics.getClusterDescription().getNodes().size();
activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
.map(Node::id)
.orElse(null);
activeControllers = statistics.getMetrics().getController();
if (activeControllers == null || activeControllers < 0) {
activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
.map(Node::id)
.orElse(null);
}
version = statistics.getVersion();

if (statistics.getLogDirInfo() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.provectus.kafka.ui.config.ClustersProperties;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -35,6 +36,18 @@ public class InternalTopic {
// rates from metrics
private final BigDecimal bytesInPerSec;
private final BigDecimal bytesOutPerSec;
private final BigDecimal messageInMeanRate;
private final BigDecimal messageInOneMinuteRate;
private final BigDecimal messageInFiveMinuteRate;
private final BigDecimal messageInFifteenMinuteRate;
private final BigDecimal fetchRequestsMeanRate;
private final BigDecimal fetchRequestsOneMinuteRate;
private final BigDecimal fetchRequestsFiveMinuteRate;
private final BigDecimal fetchRequestsFifteenMinuteRate;
private final BigDecimal produceRequestsMeanRate;
private final BigDecimal produceRequestsOneMinuteRate;
private final BigDecimal produceRequestsFiveMinuteRate;
private final BigDecimal produceRequestsFifteenMinuteRate;

// from log dir data
private final long segmentSize;
Expand Down Expand Up @@ -114,8 +127,54 @@ public static InternalTopic from(TopicDescription topicDescription,
topic.segmentSize(segmentStats.getSegmentSize());
}

topic.bytesInPerSec(metrics.getTopicBytesInPerSec().get(topicDescription.name()));
topic.bytesOutPerSec(metrics.getTopicBytesOutPerSec().get(topicDescription.name()));
topic.bytesInPerSec(metrics.getTopicBytesInPerSec().get(topicDescription.name()) == null
? BigDecimal.ZERO : metrics.getTopicBytesInPerSec().get(topicDescription.name()));
topic.bytesOutPerSec(metrics.getTopicBytesOutPerSec().get(topicDescription.name()) == null
? BigDecimal.ZERO : metrics.getTopicBytesOutPerSec().get(topicDescription.name()));

topic.messageInMeanRate(metrics.getMessageInMeanRate().get(topicDescription.name()) == null
? BigDecimal.ZERO
: metrics.getMessageInMeanRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
topic.messageInOneMinuteRate(metrics.getMessageInOneMinuteRate().get(topicDescription.name()) == null
? BigDecimal.ZERO
: metrics.getMessageInOneMinuteRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
topic.messageInFiveMinuteRate(metrics.getMessageInFiveMinuteRate().get(topicDescription.name()) == null
? BigDecimal.ZERO
: metrics.getMessageInFiveMinuteRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
topic.messageInFifteenMinuteRate(metrics.getMessageInFifteenMinuteRate().get(topicDescription.name()) == null
? BigDecimal.ZERO
: metrics.getMessageInFifteenMinuteRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));

topic.fetchRequestsMeanRate(metrics.getFetchRequestsMeanRate().get(topicDescription.name()) == null
? BigDecimal.ZERO
: metrics.getFetchRequestsMeanRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
topic.fetchRequestsOneMinuteRate(metrics.getFetchRequestsOneMinuteRate().get(topicDescription.name()) == null
? BigDecimal.ZERO
: metrics.getFetchRequestsOneMinuteRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
topic.fetchRequestsFiveMinuteRate(metrics.getFetchRequestsFiveMinuteRate().get(topicDescription.name()) == null
? BigDecimal.ZERO
: metrics.getFetchRequestsFiveMinuteRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
topic.fetchRequestsFifteenMinuteRate(metrics.getFetchRequestsFifteenMinuteRate().get(
topicDescription.name()) == null
? BigDecimal.ZERO
: metrics.getFetchRequestsFifteenMinuteRate().get(topicDescription.name())
.setScale(2, RoundingMode.HALF_UP));

topic.produceRequestsMeanRate(metrics.getProduceRequestsMeanRate().get(topicDescription.name()) == null
? BigDecimal.ZERO
: metrics.getProduceRequestsMeanRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
topic.produceRequestsOneMinuteRate(metrics.getProduceRequestsOneMinuteRate().get(topicDescription.name()) == null
? BigDecimal.ZERO
: metrics.getProduceRequestsOneMinuteRate().get(topicDescription.name()).setScale(2, RoundingMode.HALF_UP));
topic.produceRequestsFiveMinuteRate(metrics.getProduceRequestsFiveMinuteRate().get(topicDescription.name()) == null
? BigDecimal.ZERO
: metrics.getProduceRequestsFiveMinuteRate().get(topicDescription.name())
.setScale(2, RoundingMode.HALF_UP));
topic.produceRequestsFifteenMinuteRate(metrics.getProduceRequestsFifteenMinuteRate().get(
topicDescription.name()) == null
? BigDecimal.ZERO
: metrics.getProduceRequestsFifteenMinuteRate().get(topicDescription.name())
.setScale(2, RoundingMode.HALF_UP));

topic.topicConfigs(
configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ public class Metrics {
Map<String, BigDecimal> topicBytesInPerSec;
Map<String, BigDecimal> topicBytesOutPerSec;
Map<Integer, List<RawMetric>> perBrokerMetrics;
Map<String, BigDecimal> messageInMeanRate;
Map<String, BigDecimal> messageInOneMinuteRate;
Map<String, BigDecimal> messageInFiveMinuteRate;
Map<String, BigDecimal> messageInFifteenMinuteRate;
Map<String, BigDecimal> fetchRequestsMeanRate;
Map<String, BigDecimal> fetchRequestsOneMinuteRate;
Map<String, BigDecimal> fetchRequestsFiveMinuteRate;
Map<String, BigDecimal> fetchRequestsFifteenMinuteRate;
Map<String, BigDecimal> produceRequestsMeanRate;
Map<String, BigDecimal> produceRequestsOneMinuteRate;
Map<String, BigDecimal> produceRequestsFiveMinuteRate;
Map<String, BigDecimal> produceRequestsFifteenMinuteRate;
Integer controller;

public static Metrics empty() {
return Metrics.builder()
Expand All @@ -29,6 +42,18 @@ public static Metrics empty() {
.topicBytesInPerSec(Map.of())
.topicBytesOutPerSec(Map.of())
.perBrokerMetrics(Map.of())
.messageInMeanRate(Map.of())
.messageInOneMinuteRate(Map.of())
.messageInFiveMinuteRate(Map.of())
.messageInFifteenMinuteRate(Map.of())
.fetchRequestsMeanRate(Map.of())
.fetchRequestsOneMinuteRate(Map.of())
.fetchRequestsFiveMinuteRate(Map.of())
.fetchRequestsFifteenMinuteRate(Map.of())
.produceRequestsMeanRate(Map.of())
.produceRequestsOneMinuteRate(Map.of())
.produceRequestsFiveMinuteRate(Map.of())
.produceRequestsFifteenMinuteRate(Map.of())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class JmxMetricsRetriever implements MetricsRetriever, Closeable {
private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
private static final String JMX_SERVICE_TYPE = "jmxrmi";
private static final String CANONICAL_NAME_PATTERN = "kafka.server*:*";
private static final String CONTROLLER_NAME_PATTERN = "kafka.controller*:*";

@Override
public void close() {
Expand Down Expand Up @@ -118,6 +119,10 @@ private void getMetricsFromJmx(JMXConnector jmxConnector, List<RawMetric> sink)
for (ObjectName jmxMetric : jmxMetrics) {
sink.addAll(extractObjectMetrics(jmxMetric, msc));
}
var controllerMetrics = msc.queryNames(new ObjectName(CONTROLLER_NAME_PATTERN), null);
for (ObjectName jmxMetric : controllerMetrics) {
sink.addAll(extractObjectMetrics(jmxMetric, msc));
}
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
class WellKnownMetrics {

// Substring identifying throughput MBeans; per-topic samples carry a "topic" label.
private static final String BROKER_TOPIC_METRICS = "BrokerTopicMetrics";
// Metric-name suffixes distinguishing the averaging window of a JMX Meter rate sample.
private static final String MEAN_RATE = "MeanRate";
private static final String ONE_MINUTE_RATE = "OneMinuteRate";
private static final String FIVE_MINUTE_RATE = "FiveMinuteRate";
private static final String FIFTEEN_MINUTE_RATE = "FifteenMinuteRate";

// per broker
Expand All @@ -21,9 +24,23 @@ class WellKnownMetrics {
// per topic
final Map<String, BigDecimal> bytesInFifteenMinuteRate = new HashMap<>();
final Map<String, BigDecimal> bytesOutFifteenMinuteRate = new HashMap<>();

// Per-topic MessagesInPerSec rates, keyed by topic name; one map per averaging window.
final Map<String, BigDecimal> messageInMeanRate = new HashMap<>();
final Map<String, BigDecimal> messageInOneMinuteRate = new HashMap<>();
final Map<String, BigDecimal> messageInFiveMinuteRate = new HashMap<>();
final Map<String, BigDecimal> messageInFifteenMinuteRate = new HashMap<>();
// Per-topic TotalFetchRequestsPerSec rates, keyed by topic name.
final Map<String, BigDecimal> fetchRequestsMeanRate = new HashMap<>();
final Map<String, BigDecimal> fetchRequestsOneMinuteRate = new HashMap<>();
final Map<String, BigDecimal> fetchRequestsFiveMinuteRate = new HashMap<>();
final Map<String, BigDecimal> fetchRequestsFifteenMinuteRate = new HashMap<>();
// Per-topic TotalProduceRequestsPerSec rates, keyed by topic name.
final Map<String, BigDecimal> produceRequestsMeanRate = new HashMap<>();
final Map<String, BigDecimal> produceRequestsOneMinuteRate = new HashMap<>();
final Map<String, BigDecimal> produceRequestsFiveMinuteRate = new HashMap<>();
final Map<String, BigDecimal> produceRequestsFifteenMinuteRate = new HashMap<>();
// Broker id of the active controller; -1 until a node reporting ActiveControllerCount == 1 is seen.
int controller = -1;

/**
 * Feeds a single raw JMX metric sample through every accumulator:
 * per-broker byte rates, active-controller detection, and per-topic IO rates.
 */
void populate(Node node, RawMetric metric) {
  updateBrokerIOrates(node, metric);
  updateController(node, metric);
  updateTopicsIOrates(metric);
}

Expand All @@ -32,6 +49,19 @@ void apply(Metrics.MetricsBuilder metricsBuilder) {
metricsBuilder.topicBytesOutPerSec(bytesOutFifteenMinuteRate);
metricsBuilder.brokerBytesInPerSec(brokerBytesInFifteenMinuteRate);
metricsBuilder.brokerBytesOutPerSec(brokerBytesOutFifteenMinuteRate);
metricsBuilder.messageInMeanRate(messageInMeanRate);
metricsBuilder.messageInOneMinuteRate(messageInOneMinuteRate);
metricsBuilder.messageInFiveMinuteRate(messageInFiveMinuteRate);
metricsBuilder.messageInFifteenMinuteRate(messageInFifteenMinuteRate);
metricsBuilder.fetchRequestsMeanRate(fetchRequestsMeanRate);
metricsBuilder.fetchRequestsOneMinuteRate(fetchRequestsOneMinuteRate);
metricsBuilder.fetchRequestsFiveMinuteRate(fetchRequestsFiveMinuteRate);
metricsBuilder.fetchRequestsFifteenMinuteRate(fetchRequestsFifteenMinuteRate);
metricsBuilder.produceRequestsMeanRate(produceRequestsMeanRate);
metricsBuilder.produceRequestsOneMinuteRate(produceRequestsOneMinuteRate);
metricsBuilder.produceRequestsFiveMinuteRate(produceRequestsFiveMinuteRate);
metricsBuilder.produceRequestsFifteenMinuteRate(produceRequestsFifteenMinuteRate);
metricsBuilder.controller(controller);
}

private void updateBrokerIOrates(Node node, RawMetric rawMetric) {
Expand All @@ -53,18 +83,76 @@ && endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
}

/**
 * Accumulates per-topic rate metrics from a single raw metric sample.
 *
 * <p>Only samples whose name contains {@code BrokerTopicMetrics} and that carry
 * a {@code topic} label are considered. The suffix of the metric name selects
 * the averaging window (mean / 1 min / 5 min / 15 min) and the {@code name}
 * label selects which counter is updated. Byte-rate maps are fed from the
 * fifteen-minute window only, matching the previous behavior. Repeated samples
 * for the same topic (presumably one per broker — confirm against the
 * retriever) are summed.
 *
 * <p>Fixes over the previous revision: the stale pre-refactor statements that
 * redeclared {@code name}/{@code topic} were removed (the pasted body did not
 * compile), the {@code }else if} spacing violations were corrected, and the
 * twelve copy-pasted {@code Map.compute} lambdas were collapsed into the
 * {@link #accumulate} helper.
 */
private void updateTopicsIOrates(RawMetric rawMetric) {
  String name = rawMetric.name();
  String topic = rawMetric.labels().get("topic");
  if (topic == null || !containsIgnoreCase(name, BROKER_TOPIC_METRICS)) {
    return; // not a per-topic BrokerTopicMetrics sample
  }
  String metricName = rawMetric.labels().get("name");
  BigDecimal value = rawMetric.value();
  // The window suffixes are mutually exclusive, so at most one branch fires.
  if (endsWithIgnoreCase(name, MEAN_RATE)) {
    if ("MessagesInPerSec".equalsIgnoreCase(metricName)) {
      accumulate(messageInMeanRate, topic, value);
    } else if ("TotalFetchRequestsPerSec".equalsIgnoreCase(metricName)) {
      accumulate(fetchRequestsMeanRate, topic, value);
    } else if ("TotalProduceRequestsPerSec".equalsIgnoreCase(metricName)) {
      accumulate(produceRequestsMeanRate, topic, value);
    }
  } else if (endsWithIgnoreCase(name, ONE_MINUTE_RATE)) {
    if ("MessagesInPerSec".equalsIgnoreCase(metricName)) {
      accumulate(messageInOneMinuteRate, topic, value);
    } else if ("TotalFetchRequestsPerSec".equalsIgnoreCase(metricName)) {
      accumulate(fetchRequestsOneMinuteRate, topic, value);
    } else if ("TotalProduceRequestsPerSec".equalsIgnoreCase(metricName)) {
      accumulate(produceRequestsOneMinuteRate, topic, value);
    }
  } else if (endsWithIgnoreCase(name, FIVE_MINUTE_RATE)) {
    if ("MessagesInPerSec".equalsIgnoreCase(metricName)) {
      accumulate(messageInFiveMinuteRate, topic, value);
    } else if ("TotalFetchRequestsPerSec".equalsIgnoreCase(metricName)) {
      accumulate(fetchRequestsFiveMinuteRate, topic, value);
    } else if ("TotalProduceRequestsPerSec".equalsIgnoreCase(metricName)) {
      accumulate(produceRequestsFiveMinuteRate, topic, value);
    }
  } else if (endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
    // The fifteen-minute window additionally feeds the topic byte-rate maps.
    if ("BytesInPerSec".equalsIgnoreCase(metricName)) {
      accumulate(bytesInFifteenMinuteRate, topic, value);
    } else if ("BytesOutPerSec".equalsIgnoreCase(metricName)) {
      accumulate(bytesOutFifteenMinuteRate, topic, value);
    } else if ("MessagesInPerSec".equalsIgnoreCase(metricName)) {
      accumulate(messageInFifteenMinuteRate, topic, value);
    } else if ("TotalFetchRequestsPerSec".equalsIgnoreCase(metricName)) {
      accumulate(fetchRequestsFifteenMinuteRate, topic, value);
    } else if ("TotalProduceRequestsPerSec".equalsIgnoreCase(metricName)) {
      accumulate(produceRequestsFifteenMinuteRate, topic, value);
    }
  }
}

// Adds {@code value} to the running per-topic sum held in {@code rates}.
private static void accumulate(Map<String, BigDecimal> rates, String topic, BigDecimal value) {
  rates.compute(topic, (k, v) -> v == null ? value : v.add(value));
}

/**
 * Remembers the id of the active controller node. A node counts as the active
 * controller when its KafkaController {@code ActiveControllerCount} metric
 * reports 1. Detection is first-match: once {@code controller} is non-negative,
 * subsequent samples are ignored.
 */
private void updateController(Node node, RawMetric rawMetric) {
  if (controller >= 0) {
    return; // already detected on an earlier sample
  }
  if (!containsIgnoreCase(rawMetric.name(), "KafkaController")) {
    return;
  }
  if (!"ActiveControllerCount".equals(rawMetric.labels().get("name"))) {
    return;
  }
  if (rawMetric.value().intValue() == 1) {
    controller = node.id();
  }
}

}
30 changes: 30 additions & 0 deletions kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2318,6 +2318,12 @@ components:
- TOTAL_PARTITIONS
- REPLICATION_FACTOR
- SIZE
- BYTESIN_PERSEC
- BYTESOUT_PERSEC
- MSG_RATE
- MSG_5_RATE
- FETCH_RATE
- FETCH_5_RATE

ConnectorColumnsToSort:
type: string
Expand Down Expand Up @@ -2357,6 +2363,30 @@ components:
type: number
bytesOutPerSec:
type: number
messageInMeanRate:
type: number
messageInOneMinuteRate:
type: number
messageInFiveMinuteRate:
type: number
messageInFifteenMinuteRate:
type: number
fetchRequestsMeanRate:
type: number
fetchRequestsOneMinuteRate:
type: number
fetchRequestsFiveMinuteRate:
type: number
fetchRequestsFifteenMinuteRate:
type: number
produceRequestsMeanRate:
type: number
produceRequestsOneMinuteRate:
type: number
produceRequestsFiveMinuteRate:
type: number
produceRequestsFifteenMinuteRate:
type: number
underReplicatedPartitions:
type: integer
cleanUpPolicy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ const BrokersList: React.FC = () => {
partitionsSkew: broker?.partitionsSkew,
leadersSkew: broker?.leadersSkew,
inSyncPartitions: broker?.inSyncPartitions,
bytesInPerSec: broker?.bytesInPerSec,
bytesOutPerSec: broker?.bytesOutPerSec,
};
});
}, [diskUsage, brokers]);
Expand Down Expand Up @@ -160,6 +162,8 @@ const BrokersList: React.FC = () => {
header: 'Host',
accessorKey: 'host',
},
{ header: 'IN /sec', accessorKey: 'bytesInPerSec', cell: SizeCell },
{ header: 'OUT /sec', accessorKey: 'bytesOutPerSec', cell: SizeCell },
],
[]
);
Expand Down
Loading
Loading