Streams

Spring Cloud Stream example

Run

Start kafka

docker compose -f infra/redpanda.yml up redpanda

docker exec -it infra-redpanda-1 /bin/bash
docker exec -it infra-redpanda-1 rpk version
docker exec -it infra-redpanda-1 rpk cluster info
docker exec -it infra-redpanda-1 rpk topic delete state-out-0 city-in-0

# produce
docker exec -it infra-redpanda-1 rpk topic produce all-in-topic -k my-key
{"name": "Red", "city": "nuur", "state": "ca"}
{"name": "Red2", "city": "nuur2", "state": "ca"}
# or
docker exec -it infra-redpanda-1 /bin/bash
echo '{"name": "Red", "city": "nuur", "state": "ca"}' | rpk topic produce all-in-topic -k my-key
# consume
docker exec -it infra-redpanda-1 rpk topic consume all-in-topic
docker exec -it infra-redpanda-1 rpk topic consume state-out-topic
docker exec -it infra-redpanda-1 rpk topic consume city-out-topic
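
To produce more than a couple of records without typing them by hand, the JSON lines can be generated and piped into `rpk`. The following is a minimal sketch; `gen_records` is a hypothetical helper name, and the record shape simply mirrors the samples above.

```shell
# Hypothetical helper: print N sample JSON records, one per line,
# in the same shape as the hand-typed examples above.
gen_records() {
  count="${1:-3}"   # number of records to emit (defaults to 3)
  i=1
  while [ "$i" -le "$count" ]; do
    printf '{"name": "Red%d", "city": "nuur%d", "state": "ca"}\n' "$i" "$i"
    i=$((i + 1))
  done
}

# Usage (requires the Redpanda container to be running).
# Note -i without -t, because stdin is a pipe rather than a terminal:
# gen_records 5 | docker exec -i infra-redpanda-1 rpk topic produce all-in-topic -k my-key
```

Each line on stdin becomes one record, all sharing the key `my-key`.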

Start µService

gradle :services:streams:bootRun
# run with Gradle's own log level set to debug
gradle :services:streams:bootRun --debug

Build

gradle services:streams:spotlessApply
gradle services:streams:build

Test

# list all schema subjects
curl -s \
  "http://localhost:8081/subjects" \
  | jq .
# get version 1 of the schema for `all-in-topic-value`
curl -s \
  "http://localhost:8081/subjects/all-in-topic-value/versions/1" \
  | jq '.schema | fromjson'
# or fetch the latest schema directly; the output should show the `"sensitive": "true"` property
curl -s \
  "http://localhost:8081/subjects/all-in-topic-value/versions/latest/schema" \
  | jq .

Operations

Metrics

http :8080/actuator

http :8080/actuator/health

http :8080/actuator/metrics
http :8080/actuator/metrics/kafka.admin.client.request.total

http :8080/actuator/bindings
http :8080/actuator/bindings/state-out-0
http :8080/actuator/bindings/generate-in-0
http :8080/actuator/bindings/print-in-0

http :8080/actuator/kafkastreamstopology
http :8080/actuator/kafkastreamstopology/<application-id of the processor>
http :8080/actuator/kafkastreamstopology/state-applicationId
http :8080/actuator/kafkastreamstopology/city-applicationId
http :8080/actuator/kafkastreamstopology/print-applicationId

Binding control

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST localhost:8080/actuator/bindings/print-in-0
http :8080/actuator/bindings/print-in-0
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST localhost:8080/actuator/bindings/print-in-0
curl -d '{"state":"PAUSED"}'  -H "Content-Type: application/json" -X POST localhost:8080/actuator/bindings/print-in-0
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST localhost:8080/actuator/bindings/print-in-0
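
The four calls above differ only in the binding name and the target state, so they can be wrapped in a small function. This is a sketch under assumptions: `set_binding_state`, `ACTUATOR_URL`, and `DRY_RUN` are hypothetical names introduced here, and the accepted states are just the four used above.

```shell
# Hypothetical wrapper around the binding-control calls shown above.
# ACTUATOR_URL (base URL) and DRY_RUN (print instead of send) are assumed names.
set_binding_state() {
  binding="$1"
  state="$2"
  # Accept only the states used in this README.
  case "$state" in
    STOPPED|STARTED|PAUSED|RESUMED) ;;
    *) echo "unsupported state: $state" >&2; return 1 ;;
  esac
  url="${ACTUATOR_URL:-http://localhost:8080/actuator}/bindings/$binding"
  body="{\"state\":\"$state\"}"
  if [ -n "${DRY_RUN:-}" ]; then
    # Show what would be sent without contacting the service.
    echo "POST $url $body"
    return 0
  fi
  curl -s -d "$body" -H "Content-Type: application/json" -X POST "$url"
}

# Usage:
# set_binding_state print-in-0 PAUSED
# DRY_RUN=1 set_binding_state print-in-0 RESUMED
```

Rejecting unknown states before sending anything keeps a typo from reaching the actuator endpoint.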

Binders

The Kafka binder must be added for Supplier functions to work. With the Kafka Streams (KStream) binder alone, only Consumer and Function functions can be used.

implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")

Reference

Example projects