Skip to content

Commit 9b8d233

Browse files
committed
Consumer Code with Multiple Camera
1 parent 6a1f1ba commit 9b8d233

File tree

4 files changed

+122
-8
lines changed

4 files changed

+122
-8
lines changed

deployment/raspbi/consumer.py

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import datetime
2+
from flask import Flask, Response, render_template
3+
from kafka import KafkaConsumer
4+
5+
# Fire up the Kafka Consumer
6+
topic = "testpico"
7+
brokers = ["35.189.130.4:9092"]
8+
9+
consumer = KafkaConsumer(
10+
topic,
11+
bootstrap_servers=brokers,
12+
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
13+
14+
15+
# Set the consumer in a Flask App
16+
app = Flask(__name__)
17+
18+
@app.route('/')
19+
def index():
20+
return render_template('index.html')
21+
22+
@app.route('/camera_1', methods=['GET'])
23+
def camera_1():
24+
"""
25+
This is the heart of our video display. Notice we set the mimetype to
26+
multipart/x-mixed-replace. This tells Flask to replace any old images with
27+
new values streaming through the pipeline.
28+
"""
29+
return Response(
30+
get_video_stream(id),
31+
mimetype='multipart/x-mixed-replace; boundary=frame')
32+
33+
@app.route('/camera_2', methods=['GET'])
34+
def camera_2():
35+
"""
36+
This is the heart of our video display. Notice we set the mimetype to
37+
multipart/x-mixed-replace. This tells Flask to replace any old images with
38+
new values streaming through the pipeline.
39+
"""
40+
return Response(
41+
get_video_stream(id),
42+
mimetype='multipart/x-mixed-replace; boundary=frame')
43+
44+
45+
@app.route('/camera_3', methods=['GET'])
46+
def camera_3():
47+
"""
48+
This is the heart of our video display. Notice we set the mimetype to
49+
multipart/x-mixed-replace. This tells Flask to replace any old images with
50+
new values streaming through the pipeline.
51+
"""
52+
return Response(
53+
get_video_stream(id),
54+
mimetype='multipart/x-mixed-replace; boundary=frame')
55+
56+
def get_video_stream(id):
57+
"""
58+
Here is where we recieve streamed images from the Kafka Server and convert
59+
them to a Flask-readable format.
60+
"""
61+
for msg in consumer:
62+
if str(msg.value['camera_id']) == str(id):
63+
yield (b'--frame\r\n'
64+
b'Content-Type: image/jpg\r\n\r\n' + base64.b64decode(msg.value['image_bytes']) + b'\r\n\r\n')
65+
66+
if __name__ == "__main__":
67+
app.run(host='0.0.0.0', debug=True)
+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<html>
2+
<head>
3+
<title>Video Streaming Demonstration</title>
4+
</head>
5+
<body>
6+
<h1>Video Streaming Demonstration</h1>
7+
<img src="{{ url_for('camera_1') }}">
8+
9+
<img src="{{ url_for('camera_2') }}">
10+
11+
<img src="{{ url_for('camera_3') }}">
12+
13+
14+
</body>
15+
</html>

kafka/consumer.py

+33-7
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
# Fire up the Kafka Consumer
66
topic = "testpico"
7+
brokers = ["35.189.130.4:9092"]
78

89
consumer = KafkaConsumer(
910
topic,
10-
bootstrap_servers=['localhost:9093'])
11+
bootstrap_servers=brokers,
12+
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
1113

1214

1315
# Set the consumer in a Flask App
@@ -17,25 +19,49 @@
1719
def index():
1820
return render_template('index.html')
1921

20-
@app.route('/video_feed', methods=['GET'])
21-
def video_feed():
22+
@app.route('/camera_1', methods=['GET'])
23+
def camera_1():
2224
"""
2325
This is the heart of our video display. Notice we set the mimetype to
2426
multipart/x-mixed-replace. This tells Flask to replace any old images with
2527
new values streaming through the pipeline.
2628
"""
2729
return Response(
28-
get_video_stream(),
30+
get_video_stream(id),
2931
mimetype='multipart/x-mixed-replace; boundary=frame')
3032

31-
def get_video_stream():
33+
@app.route('/camera_2', methods=['GET'])
34+
def camera_2():
35+
"""
36+
This is the heart of our video display. Notice we set the mimetype to
37+
multipart/x-mixed-replace. This tells Flask to replace any old images with
38+
new values streaming through the pipeline.
39+
"""
40+
return Response(
41+
get_video_stream(id),
42+
mimetype='multipart/x-mixed-replace; boundary=frame')
43+
44+
45+
@app.route('/camera_3', methods=['GET'])
46+
def camera_3():
47+
"""
48+
This is the heart of our video display. Notice we set the mimetype to
49+
multipart/x-mixed-replace. This tells Flask to replace any old images with
50+
new values streaming through the pipeline.
51+
"""
52+
return Response(
53+
get_video_stream(id),
54+
mimetype='multipart/x-mixed-replace; boundary=frame')
55+
56+
def get_video_stream(id):
3257
"""
3358
Here is where we recieve streamed images from the Kafka Server and convert
3459
them to a Flask-readable format.
3560
"""
3661
for msg in consumer:
37-
yield (b'--frame\r\n'
38-
b'Content-Type: image/jpg\r\n\r\n' + msg.value + b'\r\n\r\n')
62+
if str(msg.value['camera_id']) == str(id):
63+
yield (b'--frame\r\n'
64+
b'Content-Type: image/jpg\r\n\r\n' + base64.b64decode(msg.value['image_bytes']) + b'\r\n\r\n')
3965

4066
if __name__ == "__main__":
4167
app.run(host='0.0.0.0', debug=True)

kafka/templates/index.html

+7-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@
44
</head>
55
<body>
66
<h1>Video Streaming Demonstration</h1>
7-
<img src="{{ url_for('video_feed') }}">
7+
<img src="{{ url_for('camera_1') }}">
8+
9+
<img src="{{ url_for('camera_2') }}">
10+
11+
<img src="{{ url_for('camera_3') }}">
12+
13+
814
</body>
915
</html>

0 commit comments

Comments
 (0)