Skip to content

Commit 782a550

Browse files
committed
Add Files
1 parent 93d5762 commit 782a550

File tree

5 files changed

+120
-94
lines changed

5 files changed

+120
-94
lines changed

kafka/consumer.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import datetime
2+
from flask import Flask, Response, render_template
3+
from kafka import KafkaConsumer
4+
5+
# Fire up the Kafka Consumer
6+
topic = "testpico"
7+
8+
consumer = KafkaConsumer(
9+
topic,
10+
bootstrap_servers=['localhost:9093'])
11+
12+
13+
# Set the consumer in a Flask App
14+
app = Flask(__name__)
15+
16+
@app.route('/')
17+
def index():
18+
return render_template('index.html')
19+
20+
@app.route('/video_feed', methods=['GET'])
21+
def video_feed():
22+
"""
23+
This is the heart of our video display. Notice we set the mimetype to
24+
multipart/x-mixed-replace. This tells Flask to replace any old images with
25+
new values streaming through the pipeline.
26+
"""
27+
return Response(
28+
get_video_stream(),
29+
mimetype='multipart/x-mixed-replace; boundary=frame')
30+
31+
def get_video_stream():
32+
"""
33+
Here is where we recieve streamed images from the Kafka Server and convert
34+
them to a Flask-readable format.
35+
"""
36+
for msg in consumer:
37+
yield (b'--frame\r\n'
38+
b'Content-Type: image/jpg\r\n\r\n' + msg.value + b'\r\n\r\n')
39+
40+
if __name__ == "__main__":
41+
app.run(host='0.0.0.0', debug=True)

kafka/docker-compose.yml

Lines changed: 59 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,89 +1,73 @@
1-
version: '2'
2-
services:
1+
version: '3.4'
32

4-
zkui:
5-
image: qnib/plain-zkui@sha256:30c4aa1236ee90e4274a9059a5fa87de2ee778d9bfa3cb48c4c9aafe7cfa1a13
6-
ports:
7-
- "9090:9090"
8-
environment:
9-
ZKUI_ZK_SERVER=[192.168.0.26:12181,192.168.0.26:22181,192.168.0.26:32181]
10-
zookeeper-1:
11-
image: confluentinc/cp-zookeeper:latest
12-
hostname: zookeeper-1
13-
ports:
14-
- "12181:12181"
3+
services:
4+
zk-1: &zk
5+
image: confluentinc/cp-zookeeper:4.0.0
6+
env_file:
7+
- zk-common.env
158
environment:
169
ZOOKEEPER_SERVER_ID: 1
17-
ZOOKEEPER_CLIENT_PORT: 12181
18-
ZOOKEEPER_TICK_TIME: 2000
19-
ZOOKEEPER_INIT_LIMIT: 5
20-
ZOOKEEPER_SYNC_LIMIT: 2
21-
ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888
22-
23-
zookeeper-2:
24-
image: confluentinc/cp-zookeeper:latest
25-
hostname: zookeeper-2
26-
ports:
27-
- "22181:22181"
10+
ZOOKEEPER_SERVERS: 0.0.0.0:2888:3888;zk-2:2888:3888;zk-3:2888:3888
11+
volumes:
12+
- zk-1:/var/lib/zookeeper/data
13+
zk-2:
14+
<<: *zk
2815
environment:
2916
ZOOKEEPER_SERVER_ID: 2
30-
ZOOKEEPER_CLIENT_PORT: 22181
31-
ZOOKEEPER_TICK_TIME: 2000
32-
ZOOKEEPER_INIT_LIMIT: 5
33-
ZOOKEEPER_SYNC_LIMIT: 2
34-
ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888
35-
36-
zookeeper-3:
37-
image: confluentinc/cp-zookeeper:latest
38-
hostname: zookeeper-3
39-
ports:
40-
- "32181:32181"
17+
ZOOKEEPER_SERVERS: zk-1:2888:3888;0.0.0.0:2888:3888;zk-3:2888:3888
18+
volumes:
19+
- zk-2:/var/lib/zookeeper/data
20+
zk-3:
21+
<<: *zk
4122
environment:
4223
ZOOKEEPER_SERVER_ID: 3
43-
ZOOKEEPER_CLIENT_PORT: 32181
44-
ZOOKEEPER_TICK_TIME: 2000
45-
ZOOKEEPER_INIT_LIMIT: 5
46-
ZOOKEEPER_SYNC_LIMIT: 2
47-
ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888
24+
ZOOKEEPER_SERVERS: zk-1:2888:3888;zk-2:2888:3888;0.0.0.0:2888:3888
25+
volumes:
26+
- zk-3:/var/lib/zookeeper/data
4827

49-
kafka-1:
50-
image: confluentinc/cp-kafka:latest
51-
hostname: kafka-1
52-
ports:
53-
- "19092:19092"
54-
depends_on:
55-
- zookeeper-1
56-
- zookeeper-2
57-
- zookeeper-3
28+
kafka-1: &kafka
29+
image: confluentinc/cp-kafka:4.0.0
30+
env_file:
31+
- kafka-common.env
5832
environment:
59-
KAFKA_BROKER_ID: 1
60-
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
61-
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092
62-
63-
kafka-2:
64-
image: confluentinc/cp-kafka:latest
65-
hostname: kafka-2
33+
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:9192,EXTERNAL://localhost:9092
34+
KAFKA_JMX_HOSTNAME: kafka-1
6635
ports:
67-
- "29092:29092"
68-
depends_on:
69-
- zookeeper-1
70-
- zookeeper-2
71-
- zookeeper-3
36+
- 9092:9092
37+
volumes:
38+
- kafka-1:/var/lib/kafka/data
39+
kafka-2:
40+
<<: *kafka
7241
environment:
73-
KAFKA_BROKER_ID: 2
74-
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
75-
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092
76-
42+
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:9192,EXTERNAL://localhost:9093
43+
KAFKA_JMX_HOSTNAME: kafka-2
44+
ports:
45+
- 9093:9092
46+
volumes:
47+
- kafka-2:/var/lib/kafka/data
7748
kafka-3:
78-
image: confluentinc/cp-kafka:latest
79-
hostname: kafka-3
49+
<<: *kafka
50+
environment:
51+
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:9192,EXTERNAL://localhost:9094
52+
KAFKA_JMX_HOSTNAME: kafka-3
8053
ports:
81-
- "39092:39092"
82-
depends_on:
83-
- zookeeper-1
84-
- zookeeper-2
85-
- zookeeper-3
54+
- 9094:9092
55+
volumes:
56+
- kafka-3:/var/lib/kafka/data
57+
58+
kafka-manager:
59+
image: sheepkiller/kafka-manager
8660
environment:
87-
KAFKA_BROKER_ID: 3
88-
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
89-
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:39092
61+
ZK_HOSTS: zk-1:2181,zk-2:2181,zk-3:2181
62+
JMX_PORT: 9181
63+
APPLICATION_SECRET: letmein
64+
ports:
65+
- 9000:9000
66+
67+
volumes:
68+
zk-1:
69+
zk-2:
70+
zk-3:
71+
kafka-1:
72+
kafka-2:
73+
kafka-3:

kafka/kafka-common.env

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
KAFKA_ZOOKEEPER_CONNECT=zk-1:2181,zk-2:2181,zk-3:2181
2+
KAFKA_LISTENERS=EXTERNAL://0.0.0.0:9092,INTERNAL://0.0.0.0:9192
3+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
4+
KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
5+
KAFKA_DEFAULT_REPLICATION_FACTOR=3
6+
KAFKA_JMX_PORT=9181

kafka/producer.py

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import sys
22
import time
33
import cv2
4-
from picamera.array import PiRGBArray
5-
from picamera import PiCamera
4+
# from picamera.array import PiRGBArray
5+
# from picamera import PiCamera
66
from kafka import KafkaProducer
7+
from kafka.errors import KafkaError
78

8-
topic = "distributed-video1"
9+
topic = "testpico"
910

1011
def publish_video(video_file):
1112
"""
@@ -15,7 +16,7 @@ def publish_video(video_file):
1516
:param video_file: path to video file <string>
1617
"""
1718
# Start up producer
18-
producer = KafkaProducer(bootstrap_servers='localhost:9092')
19+
producer = KafkaProducer(bootstrap_servers='10.140.0.2:9092')
1920

2021
# Open file
2122
video = cv2.VideoCapture(video_file)
@@ -48,7 +49,7 @@ def publish_camera():
4849
"""
4950

5051
# Start up producer
51-
producer = KafkaProducer(bootstrap_servers='localhost:9092')
52+
producer = KafkaProducer(bootstrap_servers='10.140.0.2:9092')
5253

5354

5455
camera = cv2.VideoCapture(0)
@@ -58,26 +59,19 @@ def publish_camera():
5859

5960
ret, buffer = cv2.imencode('.jpg', frame)
6061
producer.send(topic, buffer.tobytes())
61-
62+
image.write(buffer.tobytes())
63+
64+
6265
# Choppier stream, reduced load on processor
6366
time.sleep(0.2)
64-
67+
6568
except:
6669
print("\nExiting.")
6770
sys.exit(1)
6871

6972

7073
camera.release()
7174

72-
73-
if __name__ == '__main__':
74-
"""
75-
Producer will publish to Kafka Server a video file given as a system arg.
76-
Otherwise it will default by streaming webcam feed.
77-
"""
78-
if(len(sys.argv) > 1):
79-
video_path = sys.argv[1]
80-
publish_video(video_path)
81-
else:
82-
print("publishing feed!")
83-
publish_camera()
75+
76+
77+
publish_video('Countdown1.mp4')

kafka/zk-common.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ZOOKEEPER_CLIENT_PORT=2181

0 commit comments

Comments
 (0)