
Commit 2936c44 (0 parents)

kafka consumer template

10 files changed, +375 -0 lines

.gitignore

Lines changed: 23 additions & 0 deletions
```gitignore
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
# vendor/

# Go workspace file
go.work

go.sum
```

README.md

Lines changed: 48 additions & 0 deletions
# Simple Kafka Consumer in Go

This is a template repository for a simple Kafka consumer in Go. It uses the [confluent pkg](https://github.com/confluentinc/confluent-kafka-go).

## Instructions

1. Clone this repository into your `$GOPATH/src` directory.

2. Make sure your imports are correct and run `go mod tidy` to update the `go.sum` file.

3. Start up a local Kafka instance using the command below. The Docker Compose file will automatically create a topic called `test-topic-1`. Kafka REST is also included to make it easy to interact with the Kafka instance via HTTP.

    ```bash
    docker-compose up -d
    ```

    You can check that the topic was created by running the following command:

    ```bash
    curl --request GET --url http://localhost:38082/topics
    ```

4. Run `go run main.go` to start the consumer.

    *Note: If you are using a different Kafka instance, you will need to update the properties in the `config/config.go` file.*

5. Send a message to the topic using the following command:

    ```bash
    curl --request POST \
      --url http://localhost:38082/topics/test-topic-1 \
      --header 'content-type: application/vnd.kafka.json.v2+json' \
      --data '{
        "records": [
          {
            "value": {
              "taskType": "taskType1",
              "data": {
                "id": 1,
                "name": "John Doe"
              }
            }
          }
        ]
      }'
    ```

You should see the message printed to the console, e.g. a log line like `Performing task 1 for id : 1 , name: John Doe`.

config/config.go

Lines changed: 26 additions & 0 deletions
```go
package config

import (
	"github.com/ilyakaznacheev/cleanenv"
	"github.com/sirupsen/logrus"
)

type Configuration struct {
	AppEnv            string `env:"APP_ENV" env-description:"app environment" env-default:"staging"`
	LogFormat         string `env:"LOG_FORMAT" env-description:"format of the go-consumer log" env-default:"text"`
	NoOfThreads       int    `env:"NO_OF_THREADS" env-description:"Total number of threads/go-routines to be spawned" env-default:"4"`
	ChannelBufferSize int    `env:"CHANNEL_BUFFER_SIZE" env-description:"total size of channel buffer" env-default:"5"`

	KafkaBroker string `env:"KAFKA_BROKER" env-description:"kafka broker address host:port" env-default:"localhost:9092"`
	KafkaGroup  string `env:"KAFKA_GROUP" env-description:"kafka consumer group" env-default:"consumer-0"`
	KafkaTopic  string `env:"KAFKA_TOPIC" env-description:"kafka topic" env-default:"test-topic-1"`
}

var Config Configuration

func init() {
	err := cleanenv.ReadEnv(&Config)
	if err != nil {
		logrus.Errorf("Something went wrong while loading configurations from env | Error: %s", err.Error())
	}
}
```
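Since `cleanenv.ReadEnv` runs in the package's `init()`, every field can be overridden through the environment before the process starts (e.g. `KAFKA_BROKER=broker:9092 go run main.go`), with the struct-tag defaults applying otherwise. A minimal sketch of reading the resolved values; the file itself is illustrative, only the `config` package and its fields come from this commit:

```go
package main

import (
	"fmt"

	"github.com/pythonista7/go-kafka-consumer/config"
)

func main() {
	// config.Config is populated by the package's init() from the
	// environment; unset variables fall back to the struct-tag defaults,
	// e.g. KAFKA_BROKER defaults to localhost:9092.
	fmt.Println("broker:", config.Config.KafkaBroker)
	fmt.Println("topic:", config.Config.KafkaTopic)
	fmt.Println("workers:", config.Config.NoOfThreads)
}
```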

consumer/consumer.go

Lines changed: 37 additions & 0 deletions
```go
package consumer

import (
	"github.com/pythonista7/go-kafka-consumer/utils"
	"github.com/pythonista7/go-kafka-consumer/worker"
	"github.com/sirupsen/logrus"
)

// ConsumeQueue is the main consumer loop: it drains payloads off the channel
// and dispatches each one to a worker based on its task type.
func ConsumeQueue(
	log *logrus.Logger,
	consumeQueue <-chan utils.KafkaPayload,
) error {
	for payload := range consumeQueue {
		log.Debugf("Processing payload ...")

		switch payload.TaskType {
		case "taskType1":
			err := worker.PerformTask1(log, payload.Data)
			if err != nil {
				log.Errorf("Error performing task1 : %s", err.Error())
				continue
			}
		// Add more cases as necessary.
		default:
			log.Errorf("Invalid task type: %s", payload.TaskType)
		}
	}

	return nil
}
```
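Because the dispatcher only depends on a channel, it can be exercised without a broker at all. A minimal sketch of driving `ConsumeQueue` directly, e.g. from a test; the payload values are illustrative:

```go
package main

import (
	"github.com/pythonista7/go-kafka-consumer/consumer"
	"github.com/pythonista7/go-kafka-consumer/utils"
	"github.com/sirupsen/logrus"
)

func main() {
	log := logrus.New()
	queue := make(chan utils.KafkaPayload, 1)

	// Enqueue one payload, then close the channel so ConsumeQueue's
	// range loop terminates and the function returns.
	queue <- utils.KafkaPayload{
		TaskType: "taskType1",
		Data:     map[string]interface{}{"id": 1, "name": "John Doe"},
	}
	close(queue)

	_ = consumer.ConsumeQueue(log, queue)
}
```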

docker-compose.yml

Lines changed: 72 additions & 0 deletions
```yaml
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper

    # reachable on 9092 from the host and on 29092 from inside docker compose
    ports:
      - 9092:9092
    expose:
      - '29092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  init-kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - kafka
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka:29092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic test-topic-1 --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server kafka:29092 --list
      "

  kafka-rest:
    image: confluentinc/cp-kafka-rest:4.1.1
    hostname: kafka-rest
    ports:
      - "38082:38082"
    depends_on:
      - kafka
    environment:
      KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_REST_HOST_NAME: kafka-rest
      KAFKA_REST_LISTENERS: http://kafka-rest:38082
```
go.mod

Lines changed: 10 additions & 0 deletions
```
module github.com/pythonista7/go-kafka-consumer

go 1.15

require (
	github.com/confluentinc/confluent-kafka-go v1.9.2 // indirect
	github.com/ilyakaznacheev/cleanenv v1.3.0
	github.com/sirupsen/logrus v1.9.0
	gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2
)
```

main.go

Lines changed: 115 additions & 0 deletions
```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/pythonista7/go-kafka-consumer/config"
	"github.com/pythonista7/go-kafka-consumer/consumer"
	"github.com/pythonista7/go-kafka-consumer/utils"
	"github.com/sirupsen/logrus"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	log := logrus.New()
	log.Out = os.Stdout
	log.SetReportCaller(true)

	if config.Config.AppEnv != "production" {
		log.SetLevel(logrus.TraceLevel)
	}

	if config.Config.LogFormat == "json" {
		log.SetFormatter(&logrus.JSONFormatter{})
	}

	topics := strings.Split(config.Config.KafkaTopic, ",")
	fmt.Printf("Broker : %s\n", config.Config.KafkaBroker)
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"allow.auto.create.topics":        true,
		"bootstrap.servers":               config.Config.KafkaBroker,
		"group.id":                        config.Config.KafkaGroup,
		"session.timeout.ms":              6000,
		"go.events.channel.enable":        true,
		"go.application.rebalance.enable": true,
		// Enable generation of PartitionEOF when the
		// end of a partition is reached.
		"enable.partition.eof": true,
		"auto.offset.reset":    "earliest"})

	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	log.Infof("Created Consumer %v\n", c)

	// Buffered channel that decouples the Kafka event loop from the workers.
	consume := make(chan utils.KafkaPayload, config.Config.ChannelBufferSize)

	// Spawn worker goroutines that block on the channel above.
	for i := 0; i < config.Config.NoOfThreads; i++ {
		go consumer.ConsumeQueue(log, consume)
	}

	// Note: you can also use the non-channel based approach, using the poll API as shown here:
	// https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/consumer_example/consumer_example.go

	err = c.SubscribeTopics(topics, nil)
	if err != nil {
		log.Errorf("Error while subscribing topics: %v | Error: %s", topics, err.Error())
	}

	run := true

	for run {

		select {
		case sig := <-sigchan:
			log.Infof("Caught signal %s: terminating\n", sig.String())
			run = false

		case ev := <-c.Events():
			switch e := ev.(type) {
			case kafka.AssignedPartitions:
				log.Infof("AssignedPartitions : %v\n", e)
				fmt.Fprintf(os.Stderr, "%% %v\n", e)
				c.Assign(e.Partitions)

			case kafka.RevokedPartitions:
				log.Errorf("RevokedPartitions : %v\n", e)
				fmt.Fprintf(os.Stderr, "%% %v\n", e)
				c.Unassign()

			case *kafka.Message:
				log.Debugf("Buffer occupancy %d / %d", len(consume), config.Config.ChannelBufferSize)
				log.Debugf("Picked up kafka message at : %v", time.Now())
				log.Infoln("Processing Kafka Message ...")

				var payload utils.KafkaPayload
				if err := json.Unmarshal(e.Value, &payload); err != nil {
					log.Errorf("Error unmarshalling message: %s", err.Error())
					continue
				}
				consume <- payload

			case kafka.PartitionEOF:
				log.Infof("Reached %v\n", e)

			case kafka.Error:
				// Errors should generally be considered informational; the client
				// will try to recover automatically.
				fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
				log.Errorf("Error: %v\n", e)
			}
		}
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}
```

utils/types.go

Lines changed: 6 additions & 0 deletions
```go
package utils

// KafkaPayload is the envelope every message on the topic is expected to
// follow: a task type used for dispatch plus an arbitrary JSON object as data.
type KafkaPayload struct {
	TaskType string                 `json:"taskType"`
	Data     map[string]interface{} `json:"data"`
}
```
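The README's sample message maps straight onto this type. A small, self-contained sketch; the JSON literal mirrors the `value` field of the curl payload:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/pythonista7/go-kafka-consumer/utils"
)

func main() {
	// The same shape the README's curl command publishes as a record value.
	raw := []byte(`{"taskType":"taskType1","data":{"id":1,"name":"John Doe"}}`)

	var payload utils.KafkaPayload
	if err := json.Unmarshal(raw, &payload); err != nil {
		log.Fatalf("unmarshal failed: %v", err)
	}

	// Prints: taskType1 map[id:1 name:John Doe]
	fmt.Println(payload.TaskType, payload.Data)
}
```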

utils/utils.go

Lines changed: 5 additions & 0 deletions
```go
package utils

/*
	Add generic/frequently used functions here.
*/
```

worker/task1Worker.go

Lines changed: 33 additions & 0 deletions
```go
package worker

import (
	"encoding/json"

	"github.com/sirupsen/logrus"
)

// Task1Payload is the task-specific shape of the generic payload data.
type Task1Payload struct {
	Id   int    `json:"id"`
	Name string `json:"name"`
}

func PerformTask1(log *logrus.Logger, payload map[string]interface{}) error {
	// Convert the generic map into the task-specific struct via a JSON round trip.
	var task1Payload Task1Payload

	raw, err := json.Marshal(payload)
	if err != nil {
		log.Errorf("Error encoding payload to JSON : %v", err.Error())
		return err
	}

	err = json.Unmarshal(raw, &task1Payload)
	if err != nil {
		log.Errorf("Error converting JSON to struct : %v", err.Error())
		return err
	}

	// Implement your custom logic here.
	log.Infof("Performing task 1 for id : %d , name: %s", task1Payload.Id, task1Payload.Name)

	return nil
}
```
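The worker can also be exercised in isolation, without the consumer loop. A minimal sketch with an illustrative payload:

```go
package main

import (
	"github.com/pythonista7/go-kafka-consumer/worker"
	"github.com/sirupsen/logrus"
)

func main() {
	log := logrus.New()

	// Mirrors the "data" object from the README's sample message.
	data := map[string]interface{}{"id": 1, "name": "John Doe"}

	if err := worker.PerformTask1(log, data); err != nil {
		log.Errorf("task1 failed: %v", err)
	}
	// Logs: Performing task 1 for id : 1 , name: John Doe
}
```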
