
Commit d74045f (0 parents)

Initial commit.

7 files changed: 1,085 additions, 0 deletions

.gitignore

+41
/target/
/.classpath
/.project
/target/
*#
*.iml
*.ipr
*.iws
*.jar
*.sw?
*~
.#*
.*.md.html
.DS_Store
.classpath
.factorypath
.gradle
.idea
.metadata
.project
.recommenders
.settings
.springBeans
/build
/code
MANIFEST.MF
_site/
activemq-data
bin
build
build.log
dependency-reduced-pom.xml
dump.rdb
interpolated*.xml
lib/
manifest.yml
overridedb.*
settings.xml
target
transaction-logs
/application.properties

LICENSE

+674
Large diffs are not rendered by default.

pom.xml

+54
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.contactsunny.poc</groupId>
    <artifactId>SimpleKafkaProducer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>SimpleKafkaProducer</name>
    <url>http://maven.apache.org</url>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <build>
        <plugins>
            <!-- Package as an executable jar/war -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jettison</groupId>
            <artifactId>jettison</artifactId>
            <version>1.2</version>
        </dependency>
    </dependencies>

</project>
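
The spring-boot-maven-plugin declared above repackages the module as an executable jar during the Maven build. Given the artifactId and version in this POM, the default Maven jar name would be SimpleKafkaProducer-1.0-SNAPSHOT.jar (assuming finalName is not overridden), so building and running it would look roughly like this:

```shell
mvn clean install
java -jar target/SimpleKafkaProducer-1.0-SNAPSHOT.jar
```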

readme.md

+31
# Simple Kafka Producer

## Kafka Topic

```shell
thetechcheck
```

## Creating a Kafka Topic

`cd` into the Kafka directory and run the following command to create a new topic:

```shell
./bin/kafka-topics.sh --create --topic thetechcheck --replication-factor 1 --partitions 1 --zookeeper localhost:2181
```
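
To confirm that the topic was created, you can list the topics registered in ZooKeeper from the same Kafka directory (assuming the same local ZooKeeper address as above):

```shell
./bin/kafka-topics.sh --list --zookeeper localhost:2181
```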

## Running the SpringBoot application

`cd` into the project directory and run the following command to create a `.jar` file of the project:

```shell
mvn clean install
```

This will create a `.jar` file in the `target` directory inside the project directory. To run the project, run the following command from the same project directory:

```shell
java -jar target/<name_of_jar_file>.jar
```

You should now see the output in the terminal.
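
Before running the jar, note that the application reads its Kafka and ZooKeeper settings from an application.properties file, which the .gitignore above deliberately keeps out of the repository. A minimal sketch of the properties the @Value annotations in SimpleKafkaProducerApplication (shown below) expect, placed on the classpath (typically src/main/resources); the values here are assumptions for a local setup:

```properties
# Assumed local values; the property keys come from the @Value annotations in the main class.
kafka.topic.thetechcheck=thetechcheck
kafka.bootstrap.servers=localhost:9092
zookeeper.groupId=thetechcheck-group
zookeeper.host=localhost:2181
```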

SimpleKafkaProducerApplication.java

+178
package com.contactsunny.poc.SimpleKafkaProducer;

import com.contactsunny.poc.SimpleKafkaProducer.kafkaConsumers.SimpleKafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Properties;

@SpringBootApplication
public class SimpleKafkaProducerApplication implements CommandLineRunner {

    @Value("${kafka.topic.thetechcheck}")
    private String theTechCheckTopicName;

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    @Value("${zookeeper.groupId}")
    private String zookeeperGroupId;

    @Value("${zookeeper.host}")
    String zookeeperHost;

    private static final Logger logger = Logger.getLogger(SimpleKafkaProducerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(SimpleKafkaProducerApplication.class, args);
    }

    @Override
    public void run(String... args) {

        /*
         * Defining producer properties.
         */
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaBootstrapServers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        /*
        Creating a Kafka Producer object with the configuration above.
        */
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        /*
        The sendTestMessagesToKafka method will generate some random test messages
        and send them to Kafka.
        */
        sendTestMessagesToKafka(producer);

        /*
        Now that we've produced some test messages, let's see how to consume them using a Kafka consumer object.
        */
        /*
         * Defining Kafka consumer properties.
         */
        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", kafkaBootstrapServers);
        consumerProperties.put("group.id", zookeeperGroupId);
        consumerProperties.put("zookeeper.session.timeout.ms", "6000");
        consumerProperties.put("zookeeper.sync.time.ms", "2000");
        consumerProperties.put("auto.commit.enable", "false");
        consumerProperties.put("auto.commit.interval.ms", "1000");
        consumerProperties.put("consumer.timeout.ms", "-1");
        consumerProperties.put("max.poll.records", "1");
        consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        /*
         * Creating a thread to listen to the Kafka topic.
         */
        Thread kafkaConsumerThread = new Thread(() -> {
            logger.info("Starting Kafka consumer thread.");

            SimpleKafkaConsumer simpleKafkaConsumer = new SimpleKafkaConsumer(
                    theTechCheckTopicName,
                    consumerProperties
            );

            simpleKafkaConsumer.runSingleWorker();
        });

        /*
         * Starting the consumer thread.
         */
        kafkaConsumerThread.start();
    }

    /**
     * Function to send some test messages to Kafka.
     * We'll get the Kafka producer object as a parameter to this function.
     * We'll generate some test messages, both simple strings and JSON objects, in a couple of
     * loops inside the function. We'll send these test messages to the topic in Kafka.
     *
     * @param producer The Kafka producer we created in the run() method earlier.
     */
    private void sendTestMessagesToKafka(KafkaProducer<String, String> producer) {
        /*
        Creating a loop which iterates 10 times, from 0 to 9, and sending a
        simple message to Kafka.
        */
        for (int index = 0; index < 10; index++) {
            sendKafkaMessage("The index is now: " + index, producer, theTechCheckTopicName);
        }

        /*
        Creating a loop which iterates 10 times, from 0 to 9, and creates an instance of JSONObject
        in each iteration. We'll use this simple JSON object to illustrate how we can send a JSON
        object as a message in Kafka.
        */
        for (int index = 0; index < 10; index++) {

            /*
            We'll create a JSON object which will have a bunch of fields, and another JSON object,
            which will be nested inside the first JSON object. This is just to demonstrate how
            complex objects could be serialized and sent to topics in Kafka.
            */
            JSONObject jsonObject = new JSONObject();
            JSONObject nestedJsonObject = new JSONObject();

            try {
                /*
                Adding some random data into the JSON object.
                */
                jsonObject.put("index", index);
                jsonObject.put("message", "The index is now: " + index);

                /*
                We're adding a field in the nested JSON object.
                */
                nestedJsonObject.put("nestedObjectMessage", "This is a nested JSON object with index: " + index);

                /*
                Adding the nested JSON object to the main JSON object.
                */
                jsonObject.put("nestedJsonObject", nestedJsonObject);

            } catch (JSONException e) {
                logger.error(e.getMessage());
            }

            /*
            We'll now serialize the JSON object we created above, and send it to the same topic in Kafka,
            using the same function we used earlier.
            You can use any JSON library for this, just make sure it serializes your objects properly.
            A popular alternative to the one I've used is Gson.
            */
            sendKafkaMessage(jsonObject.toString(), producer, theTechCheckTopicName);
        }
    }

    /**
     * Function to send a message to Kafka.
     *
     * @param payload The string message to send to the Kafka topic.
     * @param producer The Kafka producer created in the run() method.
     * @param topic The topic to which the message is sent.
     */
    private static void sendKafkaMessage(String payload,
                                         KafkaProducer<String, String> producer,
                                         String topic) {
        logger.info("Sending Kafka message: " + payload);
        producer.send(new ProducerRecord<>(topic, payload));
    }
}
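
The SimpleKafkaConsumer class referenced above (in the kafkaConsumers package) is part of this commit, but its diff is not shown in this view. As a rough idea of what a runSingleWorker() poll loop typically looks like with the kafka-clients 0.11 API, here is a minimal, hypothetical sketch; the constructor signature matches the call above, but the body is an assumption, not the repository's actual code:

```java
package com.contactsunny.poc.SimpleKafkaProducer.kafkaConsumers;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

import java.util.Collections;
import java.util.Properties;

// Hypothetical sketch of a single-worker consumer; not the actual class from this commit.
public class SimpleKafkaConsumer {

    private static final Logger logger = Logger.getLogger(SimpleKafkaConsumer.class);

    private final KafkaConsumer<String, String> kafkaConsumer;

    public SimpleKafkaConsumer(String topic, Properties consumerProperties) {
        // Build the consumer from the properties assembled in run() and subscribe to the topic.
        this.kafkaConsumer = new KafkaConsumer<>(consumerProperties);
        this.kafkaConsumer.subscribe(Collections.singletonList(topic));
    }

    public void runSingleWorker() {
        // Poll the topic in an endless loop, log each message, and commit offsets manually.
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                logger.info("Received Kafka message: " + record.value());
            }

            kafkaConsumer.commitSync();
        }
    }
}
```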
