Skip to content

Commit bdfa8ce

Browse files
update projedt name + send method
1 parent aa1fbd3 commit bdfa8ce

File tree

15 files changed

+32
-39
lines changed

15 files changed

+32
-39
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ env:
1818
- PROJECT_DIR=spring-kafka-boot
1919
- PROJECT_DIR=spring-kafka-helloworld
2020
- PROJECT_DIR=spring-kafka-json
21-
- PROJECT_DIR=spring-kafka-multiple-consumers
21+
- PROJECT_DIR=spring-kafka-multiple-topics
2222
- PROJECT_DIR=spring-kafka-spring-integration-helloworld
2323
- PROJECT_DIR=spring-kafka-test-embedded
2424

spring-kafka-multiple-consumers/src/main/resources/application.yml

-10
This file was deleted.
+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# spring-kafka-multiple-topics
2+
3+
[![Quality Gate](https://sonarqube.com/api/badges/gate?key=com.codenotfound:spring-kafka-multiple-topics)](https://sonarqube.com/dashboard/index/com.codenotfound:spring-kafka-multiple-topics)
4+
5+
A detailed step-by-step tutorial on how to send/receive JSON messages using Spring Kafka and Spring Boot.
6+
7+
[https://www.codenotfound.com/spring-kafka-consume-multiple-topics-example.html](https://www.codenotfound.com/spring-kafka-consume-multiple-topics-example.html)

spring-kafka-multiple-consumers/pom.xml renamed to spring-kafka-multiple-topics/pom.xml

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44
<modelVersion>4.0.0</modelVersion>
55

66
<groupId>com.codenotfound</groupId>
7-
<artifactId>spring-kafka-multiple-consumers</artifactId>
7+
<artifactId>spring-kafka-multiple-topics</artifactId>
88
<version>0.0.1-SNAPSHOT</version>
99

10-
<name>spring-kafka-multiple-consumers</name>
11-
<description>Spring Kafka - Multiple Consumers Example</description>
12-
<url>https://www.codenotfound.com/spring-kafka</url>
10+
<name>spring-kafka-multiple-topics</name>
11+
<description>Spring Kafka - Consume Multiple Topics Example</description>
12+
<url>https://www.codenotfound.com/spring-kafka-consume-multiple-topics-example.html</url>
1313

1414
<parent>
1515
<groupId>org.springframework.boot</groupId>

spring-kafka-multiple-consumers/src/main/java/com/codenotfound/kafka/consumer/Receiver.java renamed to spring-kafka-multiple-topics/src/main/java/com/codenotfound/kafka/consumer/Receiver.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ public CountDownLatch getLatch() {
1919
return latch;
2020
}
2121

22-
@KafkaListener(topics = "${topic.bar}")
22+
@KafkaListener(topics = "${kafka.topic.bar}")
2323
public void receiveBar(Bar bar) {
2424
LOGGER.info("received {}", bar.toString());
2525
latch.countDown();
2626
}
2727

28-
@KafkaListener(topics = "${topic.foo}")
28+
@KafkaListener(topics = "${kafka.topic.foo}")
2929
public void receiveFoo(Foo foo) {
3030
LOGGER.info("received {}", foo.toString());
3131
latch.countDown();

spring-kafka-multiple-consumers/src/main/java/com/codenotfound/kafka/producer/Sender.java renamed to spring-kafka-multiple-topics/src/main/java/com/codenotfound/kafka/producer/Sender.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
import org.slf4j.LoggerFactory;
55
import org.springframework.beans.factory.annotation.Autowired;
66
import org.springframework.kafka.core.KafkaTemplate;
7-
import org.springframework.messaging.Message;
7+
import org.springframework.kafka.support.KafkaHeaders;
8+
import org.springframework.messaging.support.MessageBuilder;
89

910
public class Sender {
1011

@@ -13,8 +14,9 @@ public class Sender {
1314
@Autowired
1415
private KafkaTemplate<String, ?> kafkaTemplate;
1516

16-
public void send(Message<?> message) {
17-
LOGGER.info("sending {}", message.toString());
18-
kafkaTemplate.send(message);
17+
public void send(String topic, Object payload) {
18+
LOGGER.info("sending payload='{}' to topic='{}'", payload.toString(), topic);
19+
kafkaTemplate
20+
.send(MessageBuilder.withPayload(payload).setHeader(KafkaHeaders.TOPIC, topic).build());
1921
}
2022
}

spring-kafka-multiple-consumers/src/main/java/com/codenotfound/kafka/producer/SenderConfig.java renamed to spring-kafka-multiple-topics/src/main/java/com/codenotfound/kafka/producer/SenderConfig.java

-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ public class SenderConfig {
2222
@Bean
2323
public Map<String, Object> producerConfigs() {
2424
Map<String, Object> props = new HashMap<>();
25-
2625
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
2726
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
2827
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
kafka:
2+
bootstrap-servers: localhost:9092
3+
topic:
4+
bar: bar.t
5+
foo: foo.t

spring-kafka-multiple-consumers/src/main/resources/logback.xml renamed to spring-kafka-multiple-topics/src/main/resources/logback.xml

-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
<logger name="com.codenotfound" level="INFO" />
1111
<logger name="org.springframework" level="WARN" />
1212

13-
1413
<root level="WARN">
1514
<appender-ref ref="STDOUT" />
1615
</root>
+2-16
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,15 @@
55
import java.util.concurrent.TimeUnit;
66

77
import org.junit.Before;
8-
import org.junit.BeforeClass;
98
import org.junit.ClassRule;
109
import org.junit.Test;
1110
import org.junit.runner.RunWith;
1211
import org.springframework.beans.factory.annotation.Autowired;
1312
import org.springframework.boot.test.context.SpringBootTest;
1413
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
1514
import org.springframework.kafka.listener.MessageListenerContainer;
16-
import org.springframework.kafka.support.KafkaHeaders;
1715
import org.springframework.kafka.test.rule.KafkaEmbedded;
1816
import org.springframework.kafka.test.utils.ContainerTestUtils;
19-
import org.springframework.messaging.Message;
20-
import org.springframework.messaging.support.MessageBuilder;
2117
import org.springframework.test.context.junit4.SpringRunner;
2218

2319
import com.codenotfound.kafka.consumer.Receiver;
@@ -44,11 +40,6 @@ public class SpringKafkaApplicationTest {
4440
@ClassRule
4541
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, BAR_TOPIC, FOO_TOPIC);
4642

47-
@BeforeClass
48-
public static void setUpBeforeClass() throws Exception {
49-
System.setProperty("kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
50-
}
51-
5243
@Before
5344
public void setUp() throws Exception {
5445
// wait until the partitions are assigned
@@ -61,13 +52,8 @@ public void setUp() throws Exception {
6152

6253
@Test
6354
public void testReceive() throws Exception {
64-
Message<Bar> bar =
65-
MessageBuilder.withPayload(new Bar("bar")).setHeader(KafkaHeaders.TOPIC, BAR_TOPIC).build();
66-
sender.send(bar);
67-
68-
Message<Foo> foo =
69-
MessageBuilder.withPayload(new Foo("foo")).setHeader(KafkaHeaders.TOPIC, FOO_TOPIC).build();
70-
sender.send(foo);
55+
sender.send(BAR_TOPIC, new Bar("bar"));
56+
sender.send(FOO_TOPIC, new Foo("foo"));
7157

7258
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
7359
assertThat(receiver.getLatch().getCount()).isEqualTo(0);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
kafka:
2+
bootstrap-servers: ${spring.embedded.kafka.brokers}
3+
topic:
4+
bar: bar.t
5+
foo: foo.t

0 commit comments

Comments
 (0)