-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_live.py
More file actions
238 lines (205 loc) · 8.13 KB
/
api_live.py
File metadata and controls
238 lines (205 loc) · 8.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import os
import cv2
import csv
import time
import torch
import pickle
import random
import numpy as np
import requests
from collections import deque, defaultdict
from datetime import datetime
from insightface.app import FaceAnalysis
from train_arcface import SimpleClassifier
import mediapipe as mp
# YOLO 로드
try:
from ultralytics import YOLO
yolo_model = YOLO("yolov8s.pt")
yolo_model.to("cpu") # CUDA 오류 방지
print("✅ YOLO 모델 로드 완료")
except Exception as e:
print(f"❌ YOLO 모델 로드 실패: {e}")
exit()
# 설정값
CONFIRM_FRAMES = 5
DUPLICATE_INTERVAL_SEC = 10
LIVENESS_DURATION = 3
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# 레이블 매핑
with open('label_to_id.pkl', 'rb') as f: label2id = pickle.load(f)
id2label = {v: k for k, v in label2id.items()}
# 분류기 로드
classifier = SimpleClassifier(512, len(label2id)).to(device)
ckpt = torch.load('classifier.pth', map_location=device)
classifier.load_state_dict(ckpt['model'])
classifier.eval()
# 얼굴 감지기
provider = 'CUDAExecutionProvider' if torch.cuda.is_available() else 'CPUExecutionProvider'
app = FaceAnalysis(name='buffalo_l', providers=[provider])
app.prepare(ctx_id=0, det_size=(864, 576), det_thresh=0.5)
# FaceMesh 초기화
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, refine_landmarks=True)
instructions = ["Please open your mouth", "Turn your head to the left", "Turn your head to the right"]
current_instruction = None
instruction_start_time = None
instruction_active_until = None
waiting_for_instruction = True
liveness_passed_flag = defaultdict(bool)
user_status = defaultdict(lambda: 'EXIT')
buffer = deque(maxlen=CONFIRM_FRAMES)
last_logged = defaultdict(lambda: datetime.min)
frame_width, frame_height = 864, 576
box_size = 300
center_x, center_y = frame_width // 2, frame_height // 2
fixed_box = (
center_x - box_size // 2,
center_y - box_size // 2,
center_x + box_size // 2,
center_y + box_size // 2
)
def get_embedding(img):
faces = app.get(img)
if not faces: return None
e = faces[0].normed_embedding
return e / np.linalg.norm(e)
def send_access_log(identifier, access_type, similarity):
url = "http://13.125.140.66:8080/api/access/face"
payload = {
"identifier": identifier,
"accessType": access_type,
"similarity": round(similarity, 4)
}
try:
response = requests.post(url, json=payload, timeout=3)
if response.status_code == 200:
print(f"✅ 서버 전송 완료: {payload}")
else:
print(f"⚠️ 서버 응답 오류: {response.status_code} - {response.text}")
except Exception as e:
print(f"❌ 서버 전송 실패: {e}")
def get_face_landmarks(image):
img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
result = face_mesh.process(img_rgb)
if not result.multi_face_landmarks:
return None
h, w, _ = image.shape
return [(int(p.x * w), int(p.y * h)) for p in result.multi_face_landmarks[0].landmark]
def check_instruction_performed(instruction, landmarks):
if not landmarks: return False
if instruction == "Please open your mouth":
return abs(landmarks[14][1] - landmarks[13][1]) > 15
if instruction == "Turn your head to the right":
cx = (landmarks[33][0] + landmarks[263][0]) // 2
return landmarks[1][0] < cx - 15
if instruction == "Turn your head to the left":
cx = (landmarks[33][0] + landmarks[263][0]) // 2
return landmarks[1][0] > cx + 15
return False
def liveness_check(frame, name):
global current_instruction, instruction_start_time, instruction_active_until, waiting_for_instruction
now = time.time()
landmarks = get_face_landmarks(frame)
if waiting_for_instruction:
current_instruction = random.choice(instructions)
instruction_start_time = now
instruction_active_until = now + LIVENESS_DURATION
waiting_for_instruction = False
print(f"[{name}] Instruction: {current_instruction}")
if now <= instruction_active_until:
if check_instruction_performed(current_instruction, landmarks):
print(f"[{name}] Liveness success")
liveness_passed_flag[name] = True
waiting_for_instruction = True
else:
print(f"[{name}] Liveness failed")
liveness_passed_flag[name] = False
waiting_for_instruction = True
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, frame_width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_height)
if not cap.isOpened():
print("❌ 웹캠 열기 실패")
exit()
print("▶ Face recognition with liveness + YOLO started (press Q to quit)")
while True:
ret, frame = cap.read()
if not ret:
print("❌ 프레임 수신 실패")
break
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
spoofing_detected = False
try:
results = yolo_model(frame, conf=0.25, verbose=False)[0]
if results.boxes is not None:
for box in results.boxes.data.tolist():
x1, y1, x2, y2 = map(int, box[:4])
cls = int(box[5])
name_obj = yolo_model.names.get(cls, "unknown").lower()
inter_x1 = max(x1, fixed_box[0])
inter_y1 = max(y1, fixed_box[1])
inter_x2 = min(x2, fixed_box[2])
inter_y2 = min(y2, fixed_box[3])
inter_area = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)
overlap_ratio = inter_area / (box_size * box_size)
if name_obj in ["cell phone", "laptop", "tablet"] and overlap_ratio > 0.3:
spoofing_detected = True
print(f"[YOLO] Spoofing detected: {name_obj}")
break
except Exception as e:
print(f"❌ YOLO 분석 오류: {e}")
emb = get_embedding(rgb)
name = "No Face"
display_text = ""
color = (255, 255, 255)
if emb is not None and not spoofing_detected:
inp = torch.tensor(emb, dtype=torch.float32).unsqueeze(0).to(device)
out = classifier(inp)
probs = torch.softmax(out, 1)
conf, pred = probs.max(1)
conf = conf.item(); pid = pred.item()
if conf > 0.7:
name = id2label[pid]
buffer.append(name)
if buffer.count(name) >= CONFIRM_FRAMES:
liveness_check(frame, name)
if liveness_passed_flag[name]:
now = datetime.now()
if (now - last_logged[name]).total_seconds() >= DUPLICATE_INTERVAL_SEC:
last_logged[name] = now
liveness_passed_flag[name] = False
prev_status = user_status[name]
new_status = "ENTRY" if prev_status == "EXIT" else "EXIT"
user_status[name] = new_status
print(f"[LOG] {name} access logged (score={conf:.4f})")
send_access_log(name, new_status, conf)
display_text = f"{name} ✅ ACCESS GRANTED"
color = (0, 255, 0)
else:
display_text = f"{name} ❌ LIVENESS FAILED"
color = (0, 0, 255)
else:
display_text = f"{name} ⏳ RECOGNIZING..."
else:
buffer.append("Unknown")
display_text = f"Unknown ({conf:.2f})"
color = (0, 0, 255)
elif spoofing_detected:
display_text = "🚨 Spoofing Device Detected"
color = (0, 0, 255)
else:
buffer.append("No Face")
cv2.rectangle(frame, (fixed_box[0], fixed_box[1]), (fixed_box[2], fixed_box[3]), color, 2)
cv2.putText(frame, display_text, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
if current_instruction:
cv2.putText(frame, f"[Instruction] {current_instruction}", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 0), 2)
try:
cv2.imshow('Face Recognition with Liveness + YOLO', frame)
except Exception as e:
print(f"❌ imshow 오류: {e}")
break
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()