|
| 1 | +import numpy as np |
| 2 | +import math |
| 3 | +import time |
| 4 | +import sys |
| 5 | +import cv2 |
| 6 | +import argparse |
| 7 | +from stabilizer import Stabilizer |
| 8 | +try: |
| 9 | + from tflite_runtime.interpreter import Interpreter |
| 10 | +except: |
| 11 | + from tensorflow.lite.python.interpreter import Interpreter |
| 12 | + |
| 13 | +from edgetpu.basic.basic_engine import BasicEngine |
| 14 | +from edgetpu.basic import edgetpu_utils |
| 15 | + |
| 16 | +fps = "" |
| 17 | +detectfps = "" |
| 18 | +framecount = 0 |
| 19 | +detectframecount = 0 |
| 20 | +time1 = 0 |
| 21 | +time2 = 0 |
| 22 | +LABELS = ['face'] |
| 23 | + |
| 24 | + |
| 25 | +class FaceDetector: |
| 26 | + def __init__(self, model_face_detect, num_threads): |
| 27 | + # Init Face Detector |
| 28 | + self.interpreter_face_detect = Interpreter(model_path=model_face_detect) |
| 29 | + try: |
| 30 | + self.interpreter_face_detect.set_num_threads(num_threads) |
| 31 | + except: |
| 32 | + print("WARNING: The installed PythonAPI of Tensorflow/Tensorflow Lite runtime does not support Multi-Thread processing.") |
| 33 | + print("WARNING: It works in single thread mode.") |
| 34 | + print("WARNING: If you want to use Multi-Thread to improve performance on aarch64/armv7l platforms, please refer to one of the below to implement a customized Tensorflow/Tensorflow Lite runtime.") |
| 35 | + print("https://github.com/PINTO0309/Tensorflow-bin.git") |
| 36 | + print("https://github.com/PINTO0309/TensorflowLite-bin.git") |
| 37 | + pass |
| 38 | + self.interpreter_face_detect.allocate_tensors() |
| 39 | + self.input_details = self.interpreter_face_detect.get_input_details()[0]['index'] |
| 40 | + self.box = self.interpreter_face_detect.get_output_details()[0]['index'] |
| 41 | + self.scores = self.interpreter_face_detect.get_output_details()[2]['index'] |
| 42 | + self.count = self.interpreter_face_detect.get_output_details()[3]['index'] |
| 43 | + |
| 44 | + |
| 45 | +class HeadPoseEstimator: |
| 46 | + def __init__(self, model_head_pose): |
| 47 | + # Init Head Pose Estimator |
| 48 | + self.devices = edgetpu_utils.ListEdgeTpuPaths(edgetpu_utils.EDGE_TPU_STATE_UNASSIGNED) |
| 49 | + self.engine = BasicEngine(model_path=model_head_pose, device_path=self.devices[0]) |
| 50 | + self.model_height = self.engine.get_input_tensor_shape()[1] |
| 51 | + self.model_width = self.engine.get_input_tensor_shape()[2] |
| 52 | + |
| 53 | + |
| 54 | +def get_square_box(box): |
| 55 | + """Get a square box out of the given box, by expanding it.""" |
| 56 | + left_x = box[0] |
| 57 | + top_y = box[1] |
| 58 | + right_x = box[2] |
| 59 | + bottom_y = box[3] |
| 60 | + |
| 61 | + box_width = right_x - left_x |
| 62 | + box_height = bottom_y - top_y |
| 63 | + |
| 64 | + # Check if box is already a square. If not, make it a square. |
| 65 | + diff = box_height - box_width |
| 66 | + delta = int(abs(diff) / 2) |
| 67 | + if diff == 0: # Already a square. |
| 68 | + return box |
| 69 | + elif diff > 0: # Height > width, a slim box. |
| 70 | + left_x -= delta |
| 71 | + right_x += delta |
| 72 | + if diff % 2 == 1: |
| 73 | + right_x += 1 |
| 74 | + else: # Width > height, a short box. |
| 75 | + top_y -= delta |
| 76 | + bottom_y += delta |
| 77 | + if diff % 2 == 1: |
| 78 | + bottom_y += 1 |
| 79 | + return [left_x, top_y, right_x, bottom_y] |
| 80 | + |
| 81 | + |
| 82 | +def draw_annotation_box(image, rotation_vector, translation_vector, camera_matrix, dist_coeefs, color=(255, 255, 255), line_width=2): |
| 83 | + """Draw a 3D box as annotation of pose""" |
| 84 | + point_3d = [] |
| 85 | + rear_size = 75 |
| 86 | + rear_depth = 0 |
| 87 | + point_3d.append((-rear_size, -rear_size, rear_depth)) |
| 88 | + point_3d.append((-rear_size, rear_size, rear_depth)) |
| 89 | + point_3d.append((rear_size, rear_size, rear_depth)) |
| 90 | + point_3d.append((rear_size, -rear_size, rear_depth)) |
| 91 | + point_3d.append((-rear_size, -rear_size, rear_depth)) |
| 92 | + |
| 93 | + front_size = 100 |
| 94 | + front_depth = 100 |
| 95 | + point_3d.append((-front_size, -front_size, front_depth)) |
| 96 | + point_3d.append((-front_size, front_size, front_depth)) |
| 97 | + point_3d.append((front_size, front_size, front_depth)) |
| 98 | + point_3d.append((front_size, -front_size, front_depth)) |
| 99 | + point_3d.append((-front_size, -front_size, front_depth)) |
| 100 | + point_3d = np.array(point_3d, dtype=np.float).reshape(-1, 3) |
| 101 | + |
| 102 | + # Map to 2d image points |
| 103 | + (point_2d, _) = cv2.projectPoints(point_3d, rotation_vector, translation_vector, camera_matrix, dist_coeefs) |
| 104 | + point_2d = np.int32(point_2d.reshape(-1, 2)) |
| 105 | + |
| 106 | + # Draw all the lines |
| 107 | + cv2.polylines(image, [point_2d], True, color, line_width, cv2.LINE_AA) |
| 108 | + cv2.line(image, tuple(point_2d[1]), tuple(point_2d[6]), color, line_width, cv2.LINE_AA) |
| 109 | + cv2.line(image, tuple(point_2d[2]), tuple(point_2d[7]), color, line_width, cv2.LINE_AA) |
| 110 | + cv2.line(image, tuple(point_2d[3]), tuple(point_2d[8]), color, line_width, cv2.LINE_AA) |
| 111 | + |
| 112 | + |
| 113 | +if __name__ == '__main__': |
| 114 | + |
| 115 | + parser = argparse.ArgumentParser() |
| 116 | + parser.add_argument("--model_face_detect", default="ssdlite_mobilenet_v2_face_300_integer_quant_with_postprocess.tflite", help="Path of the detection model.") |
| 117 | + parser.add_argument("--model_head_pose", default="head_pose_estimator_full_integer_quant_edgetpu.tflite", help="Path of the detection model.") |
| 118 | + parser.add_argument("--usbcamno", type=int, default=0, help="USB Camera number.") |
| 119 | + parser.add_argument("--camera_width", type=int, default=640, help="width.") |
| 120 | + parser.add_argument("--camera_height", type=int, default=480, help="height.") |
| 121 | + parser.add_argument("--vidfps", type=int, default=30, help="Frame rate.") |
| 122 | + parser.add_argument("--num_threads", type=int, default=4, help="Threads.") |
| 123 | + args = parser.parse_args() |
| 124 | + |
| 125 | + model_face_detect = args.model_face_detect |
| 126 | + model_head_pose = args.model_head_pose |
| 127 | + usbcamno = args.usbcamno |
| 128 | + image_width = args.camera_width |
| 129 | + image_height = args.camera_height |
| 130 | + vidfps = args.vidfps |
| 131 | + num_threads = args.num_threads |
| 132 | + |
| 133 | + # 3D model points. |
| 134 | + model_points = np.array([ |
| 135 | + (0.0, 0.0, 0.0), # Nose tip |
| 136 | + (0.0, -330.0, -65.0), # Chin |
| 137 | + (-225.0, 170.0, -135.0), # Left eye left corner |
| 138 | + (225.0, 170.0, -135.0), # Right eye right corner |
| 139 | + (-150.0, -150.0, -125.0), # Mouth left corner |
| 140 | + (150.0, -150.0, -125.0) # Mouth right corner |
| 141 | + ]) / 4.5 |
| 142 | + |
| 143 | + raw_value = [] |
| 144 | + with open('model.txt') as file: |
| 145 | + for line in file: |
| 146 | + raw_value.append(line) |
| 147 | + model_points_68 = np.array(raw_value, dtype=np.float32) |
| 148 | + model_points_68 = np.reshape(model_points_68, (3, -1)).T |
| 149 | + # Transform the model into a front view. |
| 150 | + model_points_68[:, 2] *= -1 |
| 151 | + |
| 152 | + |
| 153 | + # Camera internals |
| 154 | + focal_length = image_width |
| 155 | + camera_center = (image_width / 2, image_height / 2) |
| 156 | + camera_matrix = np.array([[focal_length, 0, camera_center[0]], [0, focal_length, camera_center[1]], [0, 0, 1]], dtype="double") |
| 157 | + |
| 158 | + # Assuming no lens distortion |
| 159 | + dist_coeefs = np.zeros((4, 1)) |
| 160 | + |
| 161 | + # Rotation vector and translation vector |
| 162 | + r_vec = np.array([[0.01891013], [0.08560084], [-3.14392813]]) |
| 163 | + t_vec = np.array([[-14.97821226], [-10.62040383], [-2053.03596872]]) |
| 164 | + |
| 165 | + # Introduce scalar stabilizers for pose. |
| 166 | + pose_stabilizers = [Stabilizer(state_num=2, measure_num=1, cov_process=0.1, cov_measure=0.1) for _ in range(6)] |
| 167 | + |
| 168 | + # Init Face Detector |
| 169 | + face_detector = FaceDetector(model_face_detect, num_threads) |
| 170 | + interpreter_face_detect = face_detector.interpreter_face_detect |
| 171 | + face_detector_input_details = face_detector.input_details |
| 172 | + face_detector_box = face_detector.box |
| 173 | + face_detector_scores = face_detector.scores |
| 174 | + face_detector_count = face_detector.count |
| 175 | + |
| 176 | + # Init Head Pose Estimator |
| 177 | + head_pose_estimator = HeadPoseEstimator(model_head_pose) |
| 178 | + |
| 179 | + # Init Camera |
| 180 | + cam = cv2.VideoCapture(usbcamno) |
| 181 | + cam.set(cv2.CAP_PROP_FPS, vidfps) |
| 182 | + cam.set(cv2.CAP_PROP_FRAME_WIDTH, image_width) |
| 183 | + cam.set(cv2.CAP_PROP_FRAME_HEIGHT, image_height) |
| 184 | + window_name = "USB Camera" |
| 185 | + cv2.namedWindow(window_name, cv2.WINDOW_AUTOSIZE) |
| 186 | + |
| 187 | + |
| 188 | + while True: |
| 189 | + start_time = time.perf_counter() |
| 190 | + |
| 191 | + ret, image = cam.read() |
| 192 | + if not ret: |
| 193 | + continue |
| 194 | + |
| 195 | + # Resize and normalize image for network input |
| 196 | + frame = cv2.resize(image, (300, 300)) |
| 197 | + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
| 198 | + frame = np.expand_dims(frame, axis=0) |
| 199 | + frame = frame.astype(np.float32) |
| 200 | + cv2.normalize(frame, frame, -1, 1, cv2.NORM_MINMAX) |
| 201 | + |
| 202 | + # run model - Face Detector |
| 203 | + interpreter_face_detect.set_tensor(face_detector_input_details, frame) |
| 204 | + interpreter_face_detect.invoke() |
| 205 | + |
| 206 | + # get results - Face Detector |
| 207 | + boxes = interpreter_face_detect.get_tensor(face_detector_box)[0] |
| 208 | + scores = interpreter_face_detect.get_tensor(face_detector_scores)[0] |
| 209 | + count = interpreter_face_detect.get_tensor(face_detector_count)[0] |
| 210 | + |
| 211 | + for i, (box, score) in enumerate(zip(boxes, scores)): |
| 212 | + probability = score |
| 213 | + if probability >= 0.6: |
| 214 | + if (not math.isnan(box[0]) and |
| 215 | + not math.isnan(box[1]) and |
| 216 | + not math.isnan(box[2]) and |
| 217 | + not math.isnan(box[3])): |
| 218 | + pass |
| 219 | + else: |
| 220 | + continue |
| 221 | + ymin = int(box[0] * image_height) |
| 222 | + xmin = int(box[1] * image_width) |
| 223 | + ymax = int(box[2] * image_height) |
| 224 | + xmax = int(box[3] * image_width) |
| 225 | + if ymin > ymax: |
| 226 | + continue |
| 227 | + if xmin > xmax: |
| 228 | + continue |
| 229 | + |
| 230 | + offset_y = int(abs(ymax - ymin) * 0.1) |
| 231 | + facebox = get_square_box([xmin, ymin + offset_y, xmax, ymax + offset_y]) |
| 232 | + if not (facebox[0] >= 0 and facebox[1] >= 0 and facebox[2] <= image_width and facebox[3] <= image_height): |
| 233 | + continue |
| 234 | + |
| 235 | + face_img = image[facebox[1]:facebox[3], facebox[0]:facebox[2]] |
| 236 | + face_img = cv2.resize(face_img, (128, 128)) |
| 237 | + face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB) |
| 238 | + face_img = np.expand_dims(face_img, axis=0) |
| 239 | + # face_img = face_img.astype(np.float32) |
| 240 | + |
| 241 | + # run model - Head Pose |
| 242 | + face_img = face_img.astype(np.uint8) |
| 243 | + |
| 244 | + # get results - Head Pose |
| 245 | + inference_time, marks = head_pose_estimator.engine.run_inference(face_img.flatten()) |
| 246 | + marks = np.reshape(marks, (-1, 2)) |
| 247 | + |
| 248 | + # Convert the marks locations from local CNN to global image. |
| 249 | + marks *= (facebox[2] - facebox[0]) |
| 250 | + marks[:, 0] += facebox[0] |
| 251 | + marks[:, 1] += facebox[1] |
| 252 | + |
| 253 | + # Try pose estimation with 68 points. |
| 254 | + if r_vec is None: |
| 255 | + (_, rotation_vector, translation_vector) = cv2.solvePnP(model_points_68, marks, camera_matrix, dist_coeefs) |
| 256 | + r_vec = rotation_vector |
| 257 | + t_vec = translation_vector |
| 258 | + (_, rotation_vector, translation_vector) = cv2.solvePnP(model_points_68, |
| 259 | + marks, |
| 260 | + camera_matrix, |
| 261 | + dist_coeefs, |
| 262 | + rvec=r_vec, |
| 263 | + tvec=t_vec, |
| 264 | + useExtrinsicGuess=True) |
| 265 | + pose = (rotation_vector, translation_vector) |
| 266 | + |
| 267 | + # Stabilize the pose. |
| 268 | + steady_pose = [] |
| 269 | + pose_np = np.array(pose).flatten() |
| 270 | + for value, ps_stb in zip(pose_np, pose_stabilizers): |
| 271 | + ps_stb.update([value]) |
| 272 | + steady_pose.append(ps_stb.state[0]) |
| 273 | + steady_pose = np.reshape(steady_pose, (-1, 3)) |
| 274 | + |
| 275 | + # Draw boxes |
| 276 | + draw_annotation_box(image, steady_pose[0], steady_pose[1], camera_matrix, dist_coeefs, color=(128, 255, 128)) |
| 277 | + cv2.putText(image, detectfps, (image_width - 170, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (38, 0, 255), 1, cv2.LINE_AA) |
| 278 | + |
| 279 | + if i >= (count - 1): |
| 280 | + break |
| 281 | + |
| 282 | + cv2.imshow('USB Camera', image) |
| 283 | + |
| 284 | + if cv2.waitKey(1)&0xFF == ord('q'): |
| 285 | + break |
| 286 | + |
| 287 | + detectframecount += 1 |
| 288 | + |
| 289 | + # FPS calculation |
| 290 | + framecount += 1 |
| 291 | + if framecount >= 10: |
| 292 | + fps = "(Playback) {:.1f} FPS".format(time1 / 10) |
| 293 | + detectfps = "(Detection) {:.1f} FPS".format(detectframecount / time2) |
| 294 | + framecount = 0 |
| 295 | + detectframecount = 0 |
| 296 | + time1 = 0 |
| 297 | + time2 = 0 |
| 298 | + end_time = time.perf_counter() |
| 299 | + elapsedTime = end_time - start_time |
| 300 | + time1 += 1 / elapsedTime |
| 301 | + time2 += elapsedTime |
0 commit comments