Skip to content

Commit 4d0b50e

Browse files
Merge pull request #38057 from Wzy19930507
* pr/38057: Polish "Add observationEnabled properties for Apache Kafka" Add observationEnabled properties for Apache Kafka Closes gh-38057
2 parents 9ba46f5 + 55ab56f commit 4d0b50e

File tree

4 files changed

+32
-1
lines changed

4 files changed

+32
-1
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ private void configureContainer(ContainerProperties container) {
236236
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
237237
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
238238
map.from(properties::isImmediateStop).to(container::setStopImmediate);
239+
map.from(properties::getObservationEnabled).to(container::setObservationEnabled);
239240
map.from(this.transactionManager).to(container::setTransactionManager);
240241
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
241242
map.from(this.listenerTaskExecutor).to(container::setListenerTaskExecutor);

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ PropertiesKafkaConnectionDetails kafkaConnectionDetails(KafkaProperties properti
9898
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
9999
map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
100100
map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
101+
map.from(this.properties.getTemplate().getObservationEnabled()).to(kafkaTemplate::setObservationEnabled);
101102
return kafkaTemplate;
102103
}
103104

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,11 @@ public static class Template {
984984
*/
985985
private String transactionIdPrefix;
986986

987+
/**
988+
* Whether to enable observation.
989+
*/
990+
private boolean observationEnabled;
991+
987992
public String getDefaultTopic() {
988993
return this.defaultTopic;
989994
}
@@ -1000,6 +1005,14 @@ public void setTransactionIdPrefix(String transactionIdPrefix) {
10001005
this.transactionIdPrefix = transactionIdPrefix;
10011006
}
10021007

1008+
public boolean getObservationEnabled() {
1009+
return this.observationEnabled;
1010+
}
1011+
1012+
public void setObservationEnabled(boolean observationEnabled) {
1013+
this.observationEnabled = observationEnabled;
1014+
}
1015+
10031016
}
10041017

10051018
public static class Listener {
@@ -1117,6 +1130,11 @@ public enum Type {
11171130
*/
11181131
private Boolean changeConsumerThreadName;
11191132

1133+
/**
1134+
* Whether to enable observation.
1135+
*/
1136+
private boolean observationEnabled;
1137+
11201138
public Type getType() {
11211139
return this.type;
11221140
}
@@ -1261,6 +1279,14 @@ public void setChangeConsumerThreadName(Boolean changeConsumerThreadName) {
12611279
this.changeConsumerThreadName = changeConsumerThreadName;
12621280
}
12631281

1282+
public boolean getObservationEnabled() {
1283+
return this.observationEnabled;
1284+
}
1285+
1286+
public void setObservationEnabled(boolean observationEnabled) {
1287+
this.observationEnabled = observationEnabled;
1288+
}
1289+
12641290
}
12651291

12661292
public static class Ssl {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,8 @@ void listenerProperties() {
618618
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
619619
"spring.kafka.listener.immediate-stop=true", "spring.kafka.producer.transaction-id-prefix=foo",
620620
"spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE",
621-
"spring.kafka.jaas.options.useKeyTab=true", "spring.kafka.listener.async-acks=true")
621+
"spring.kafka.jaas.options.useKeyTab=true", "spring.kafka.listener.async-acks=true",
622+
"spring.kafka.template.observation-enabled=true", "spring.kafka.listener.observation-enabled=true")
622623
.run((context) -> {
623624
DefaultKafkaProducerFactory<?, ?> producerFactory = context.getBean(DefaultKafkaProducerFactory.class);
624625
DefaultKafkaConsumerFactory<?, ?> consumerFactory = context.getBean(DefaultKafkaConsumerFactory.class);
@@ -629,6 +630,7 @@ void listenerProperties() {
629630
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("producerFactory", producerFactory);
630631
assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
631632
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("transactionIdPrefix", "txOverride");
633+
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("observationEnabled", true);
632634
assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory);
633635
ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties();
634636
assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL);
@@ -645,6 +647,7 @@ void listenerProperties() {
645647
assertThat(containerProperties.isLogContainerConfig()).isTrue();
646648
assertThat(containerProperties.isMissingTopicsFatal()).isTrue();
647649
assertThat(containerProperties.isStopImmediate()).isTrue();
650+
assertThat(containerProperties.isObservationEnabled()).isTrue();
648651
assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3);
649652
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();
650653
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("autoStartup", true);

0 commit comments

Comments
 (0)