Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new metric kafka_consumergroup_topicmembers #435

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ kafka_exporter
# Test configuration
test/
.DS_Store

*.strimzi
6 changes: 6 additions & 0 deletions .promu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,9 @@ build:
tarball:
files:
- LICENSE
crossbuild:
platforms:
- linux/amd64
- linux/s390x
- linux/arm64
- linux/ppc64le
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ docker: build
@docker build -t "$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG)" --build-arg BIN_DIR=. .

push: crossbuild
@echo ">> building and pushing multi-arch docker images, $(DOCKER_USERNAME),$(DOCKER_IMAGE_NAME),$(GIT_TAG_NAME)"
ifneq (, $(DOCKER_PASSWORD))
@docker login -u $(DOCKER_USERNAME) -p $(DOCKER_PASSWORD)
endif
@echo ">> building and pushing multi-arch docker images, $(DOCKER_USERNAME),$(DOCKER_IMAGE_NAME),$(GIT_TAG_NAME)"
@docker buildx create --use
@docker buildx build -t "$(DOCKER_USERNAME)/$(DOCKER_IMAGE_NAME):$(GIT_TAG_NAME)" \
@docker buildx build -t "$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG)" \
--output "$(PUSHTAG)" \
--platform "$(DOCKER_PLATFORMS)" \
.
Expand Down Expand Up @@ -106,8 +108,8 @@ lint: golangci-lint
golangci-lint:
ifeq (, $(shell which golangci-lint))
@GOOS=$(shell uname -s | tr A-Z a-z) \
GOARCH=$(subst x86_64,amd64,$(patsubst i%86,386,$(shell uname -m))) \
$(GO) install github.com/golangci/golangci-lint/cmd/[email protected]
GOARCH=$(subst x86_64,amd64,$(patsubst i%86,386,$(shell uname -m))) \
$(GO) install github.com/golangci/golangci-lint/cmd/[email protected]
GOLANG_LINT=$(shell go env GOPATH)/bin/golangci-lint
else
GOLANG_LINT=$(shell which golangci-lint)
Expand Down
27 changes: 27 additions & 0 deletions kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var (
consumergroupLagSum *prometheus.Desc
consumergroupLagZookeeper *prometheus.Desc
consumergroupMembers *prometheus.Desc
consumergroupTopicMembers *prometheus.Desc
)

// Exporter collects Kafka stats from the given server and exports them using
Expand Down Expand Up @@ -594,9 +595,29 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
}
}
}

ch <- prometheus.MustNewConstMetric(
consumergroupMembers, prometheus.GaugeValue, float64(len(group.Members)), group.GroupId,
)

uniqueTopics := make(map[string]int)
for _, member := range group.Members {
assignment, err := member.GetMemberAssignment()
if err != nil || assignment == nil {
klog.Errorf("Cannot get GetMemberAssignment of group member %v : %v", member, err)
continue
}
for topic := range assignment.Topics {
uniqueTopics[topic]++
}
}

for topic, count := range uniqueTopics {
ch <- prometheus.MustNewConstMetric(
consumergroupTopicMembers, prometheus.GaugeValue, float64(count), group.GroupId, topic,
)
}

offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest)
if err != nil {
klog.Errorf("Cannot get offset of group %s: %v", group.GroupId, err)
Expand Down Expand Up @@ -896,6 +917,12 @@ func setup(
[]string{"consumergroup"}, labels,
)

consumergroupTopicMembers = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "topicmembers"),
"Amount of members in a consumer group against a topic",
[]string{"consumergroup", "topic"}, labels,
)

if logSarama {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
Expand Down