-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
139 lines (107 loc) · 3.83 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import json
import os
import boto3
from chalice import Chalice
from chalice import NotFoundError
from chalicelib import db
from chalicelib import rekognition
app = Chalice(app_name='media-query')
_MEDIA_DB = None
_REKOGNITION_CLIENT = None
_SUPPORTED_IMAGE_EXTENSIONS = (
'.jpg',
'.jpeg',
'.png',
)
_SUPPORTED_VIDEO_EXTENSIONS = (
'.mp4',
'.flv',
'.mov',
)
def get_media_db():
global _MEDIA_DB
if _MEDIA_DB is None:
_MEDIA_DB = db.DynamoMediaDB(
boto3.resource('dynamodb').Table(
os.environ['MEDIA_TABLE_NAME']))
return _MEDIA_DB
def get_rekognition_client():
global _REKOGNITION_CLIENT
if _REKOGNITION_CLIENT is None:
_REKOGNITION_CLIENT = rekognition.RekognitonClient(
boto3.client('rekognition'))
return _REKOGNITION_CLIENT
# Start of Event Handlers.
@app.on_s3_event(bucket=os.environ['MEDIA_BUCKET_NAME'],
events=['s3:ObjectCreated:*'])
def handle_object_created(event):
if _is_image(event.key):
_handle_created_image(bucket=event.bucket, key=event.key)
elif _is_video(event.key):
_handle_created_video(bucket=event.bucket, key=event.key)
@app.on_s3_event(bucket=os.environ['MEDIA_BUCKET_NAME'],
events=['s3:ObjectRemoved:*'])
def handle_object_removed(event):
if _is_image(event.key) or _is_video(event.key):
get_media_db().delete_media_file(event.key)
@app.on_sns_message(topic=os.environ['VIDEO_TOPIC_NAME'])
def add_video_file(event):
message = json.loads(event.message)
labels = get_rekognition_client().get_video_job_labels(message['JobId'])
get_media_db().add_media_file(
name=message['Video']['S3ObjectName'],
media_type=db.VIDEO_TYPE,
labels=labels)
@app.on_sns_message(topic=os.environ['VIDEO_TOPIC_FACE_NAME'])
def add_video_file_face(event):
message = json.loads(event.message)
emotions = get_rekognition_client().get_video_job_faces(message['JobId'])
get_media_db().add_media_file_face(
name=message['Video']['S3ObjectName'],
media_type=db.VIDEO_TYPE,
emotions=emotions)
@app.route('/',cors=True)
def list_media_files():
params = {}
if app.current_request.query_params:
params = _extract_db_list_params(app.current_request.query_params)
return get_media_db().list_media_files(**params)
@app.route('/{name}',cors=True)
def get_media_file(name):
item = get_media_db().get_media_file(name)
if item is None:
raise NotFoundError('Media file (%s) not found' % name)
return item
# End of Event Handlers.
def _extract_db_list_params(query_params):
valid_query_params = [
'startswith',
'media-type',
'label'
]
return {
k.replace('-', '_'): v
for k, v in query_params.items() if k in valid_query_params
}
def _is_image(key):
return key.endswith(_SUPPORTED_IMAGE_EXTENSIONS)
def _handle_created_image(bucket, key):
labels = get_rekognition_client().get_image_labels(bucket=bucket, key=key)
emotions = get_rekognition_client().get_image_emotions(bucket=bucket, key=key)
if 'public/' in key:
index = key.find('public/')
key = key[index+len('public/'):]
get_media_db().add_media_file(key, media_type=db.IMAGE_TYPE, labels=labels)
get_media_db().add_media_file_face(key, media_type=db.IMAGE_TYPE, emotions=emotions)
def _is_video(key):
return key.endswith(_SUPPORTED_VIDEO_EXTENSIONS)
def _handle_created_video(bucket, key):
get_rekognition_client().start_video_label_job(
bucket=bucket, key=key, topic_arn=os.environ['VIDEO_TOPIC_ARN'],
role_arn=os.environ['VIDEO_ROLE_ARN']
)
get_rekognition_client().start_video_face_job(
bucket=bucket, key=key,
topic_arn=os.environ['VIDEO_TOPIC_FACE_ARN'],
role_arn=os.environ['VIDEO_ROLE_ARN']
)