# Snippet created by Emre Şahin
import sys
from os import path
import os
# print(os.environ['LD_LIBRARY_PATH'])
# print(os.environ['PATH'])

import gc

import cv2
import numpy as np
import datetime as dt
import time
import io

from PySide2 import QtCore as qtc
from PySide2 import QtWidgets as qtw
from PySide2 import QtGui as qtg

import face_recognition as fr
import database_api as db

import camera_dialog as cd
import person_dialog as pd
import history_dialog as hd

import camera_controller as cc

import history_recorder
import video_recorder

import redis
import rq

from utils import *

log = init_logging()

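# Shared Redis connection: live frames are buffered here and background jobs
# are dispatched through the rq queues below. The "history" and "video"
# queues are assumed to be drained by separate rq workers, e.g. started with
# `rq worker history video` against the same Redis instance.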
R = redis.Redis(host='localhost', port=6379)
history_queue = rq.Queue("history", connection=R)
video_queue = rq.Queue("video", connection=R)
VIDEO_RECORD_TIMEOUT = 3600


class FaceRecognitionWidget(qtw.QWidget):
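    """Widget that shows a live camera feed with face-recognition overlays.

    Each frame read from the camera is pushed to Redis for the video
    recorder, run through the face recognizer and matched against the person
    database; detections are enqueued for the history recorder before the
    annotated frame is painted onto the widget.
    """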
    def __init__(self, camera_controller, parent=None):
        super().__init__(parent)
        self.camera_controller = camera_controller
        self.video_device = camera_controller.device
        self.recognizer = fr.FaceRecognizer_v1()
        self.font = cv2.FONT_HERSHEY_COMPLEX_SMALL
        self.text_color = (255, 255, 0)
        self.rectangle_color = (255, 0, 255)

        self.image = qtg.QImage()
        self.timer = qtc.QBasicTimer()
        self.camera = cv2.VideoCapture(self.video_device)

    def start_timer(self):
        self.timer.start(0, self)

    def stop_timer(self):
        self.timer.stop()

    def timerEvent(self, event):
        if event.timerId() != self.timer.timerId():
            return

        read, image = self.camera.read()
        if read:
            self.process_image(image)

    def add_frame_to_redis(self, image_data):
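        """Store the raw frame and its metadata in a Redis hash.

        The key is appended to the 'framekeys' list so a consumer (presumably
        the video_recorder worker) can rebuild the frame, roughly:

            raw = R.hgetall(key)
            shape = (int(raw[b'image_shape_x']), int(raw[b'image_shape_y']),
                     int(raw[b'image_shape_z']))
            frame = np.frombuffer(raw[b'image_data'],
                                  dtype=raw[b'image_dtype'].decode()).reshape(shape)

        (sketch only; the actual consumer is not part of this file)
        """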
        t = time.time()
        key = 'frame:{}'.format(t)
        log.debug("key: %s", key)
        filename = video_recorder.get_video_filename(
            t, self.camera_controller.camera_id)
        # memfile = io.BytesIO()
        # we can use savez here if memory becomes an issue, I prefer speed
        # np.save(memfile, image_data)
        # hmset() is deprecated in newer redis-py releases in favour of
        # hset(key, mapping={...}); kept here as in the original code.
        R.hmset(
            key,
            {
                'time': t,
                'camera_id': self.camera_controller.camera_id,
                # 'image_string': memfile.getvalue(),
                # tobytes() replaces the deprecated ndarray.tostring()
                'image_data': image_data.tobytes(),
                'image_shape_x': image_data.shape[0],
                'image_shape_y': image_data.shape[1],
                'image_shape_z': image_data.shape[2],
                # store the dtype as a string so redis-py can encode it
                'image_dtype': str(image_data.dtype),
                'filename': filename
            })
        R.rpush('framekeys', key)
        log.debug("R.llen(framekeys): %s", R.llen('framekeys'))
        return filename

    def process_image(self, image_data):
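        """Record, recognize, annotate and display a single frame.

        The frame is buffered in Redis, faces are predicted and looked up in
        the person database, a history job is enqueued per detected face,
        and the frame is annotated, resized to the widget and repainted.
        """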
        video_filename = self.add_frame_to_redis(image_data)
        faces = self.recognizer.predict_faces(image_data)
        # image_data_processed = image_data.copy()
        log.debug("faces: %s", faces)
        for coords, person_id in faces:
            log.debug(coords)
            log.debug(person_id)
            (x, y, w, h) = coords
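            # In this code x/w index image rows and y/h index columns,
            # as used in the slicing and drawing calls below.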
            db_result = []
            if person_id is not None:
                person_id = int(person_id)
                db_result = db.person_by_id(person_id)
            log.debug("db_result: %s", db_result)
            if db_result == []:
                name = "Unknown Unknown"
                title = "Unknown"
                notes = ""
            else:
                assert (len(db_result) == 1)
                (person_id_, name, title, notes) = db_result[0]
            log.debug("person_id: {} title: {} name: {}".format(
                person_id, title, name))
            face_image = image_data[x:(x + w), y:(y + h)]

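            # Dispatch the history record to a background rq worker instead
            # of handling it in the GUI thread (see the commented-out
            # synchronous call below).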
            history_queue.enqueue(history_recorder.record, time.time(),
                                  self.camera_controller.camera_id, person_id,
                                  image_data, face_image, video_filename)

            # SYNC CALL FOR DEBUG history_recorder.record(time.time(),
            # self.camera_controller.camera_id, person_id, image_data,
            # face_image, video_filename)

            cv2.putText(image_data, name, (y, x - 5), self.font, 1.0,
                        self.text_color, 2)
            cv2.rectangle(image_data, (y, x), (y + h, x + w),
                          self.rectangle_color, 2)
        qs = self.size()
        # cv2.resize takes its target size as (width, height)
        image_data = cv2.resize(image_data, (qs.width(), qs.height()))
        self.image = get_qimage(image_data)
        # if self.image.size() != self.size():
        #     self.setFixedSize(self.image.size())

        self.update()

    def paintEvent(self, event):
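        """Paint the most recent annotated frame, then drop the reference."""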
        painter = qtg.QPainter(self)
        painter.drawImage(0, 0, self.image)
        del self.image
        self.image = qtg.QImage()


class MainWidget(qtw.QWidget):
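    """Top-level widget: one FaceRecognitionWidget per available camera.

    Camera controllers whose device path does not exist are filtered out;
    the remaining cameras are shown in a single-camera or 2x2 grid layout
    together with the control buttons.
    """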
    def __init__(self, parent=None):
        super().__init__(parent)

        self.camera_controllers = cc.get_camera_controllers()
        self.camera_controllers = {
            k: c
            for k, c in self.camera_controllers.items()
            if (c.device != '' and os.path.exists(c.device))
        }

        self.run_button = qtw.QPushButton('Start')
        self.camera_configuration_button = qtw.QPushButton('Camera Config')
        self.camera_configuration_button.clicked.connect(
            self.camera_config_dialog)
        self.history_button = qtw.QPushButton('History')
        self.history_button.clicked.connect(self.history_dialog)
        self.people_button = qtw.QPushButton('People')
        self.people_button.clicked.connect(self.people_dialog)
        self.close_button = qtw.QPushButton('Close')
        self.close_button.clicked.connect(self.close)

        self.recognizers = []
        log.debug("Cameras: {}".format(self.camera_controllers))
        for cam_id, cam in self.camera_controllers.items():
            log.debug("Running for {}: {}".format(cam.name, cam.command))
            cam.run_command()
            frw = FaceRecognitionWidget(camera_controller=cam, parent=self)
            self.recognizers.append(frw)
            self.run_button.clicked.connect(frw.start_timer)

        # Connect the image data signal and slot together
        # image_data_slot = self.face_detection_widget.image_data_slot
        # self.record_video.image_data.connect(image_data_slot)
        # #
        # connect the run button to the start recording slot
        # self.run_button.clicked.connect(self.record_video.start_recording)

        # Create and set the layout

        if len(self.recognizers) == 0:
            qtw.QMessageBox.warning(self, "No Cams", "No Cameras Are Present")
            self.camera_config_dialog()
        elif len(self.recognizers) > 1:
            self.setLayout(self.layout_for_4_cameras())
        else:
            self.setLayout(self.layout_for_1_camera())

        for frw in self.recognizers:
            frw.start_timer()

    def closeEvent(self, event):
        log.debug("camera_controllers: %s", self.camera_controllers)

        for cam_id, cam in self.camera_controllers.items():
            log.debug("cam_id: %s", cam_id)
            cam.kill_command()
            log.debug("cam.process: %s", cam.process)

        for frw in self.recognizers:
            frw.stop_timer()

        log.debug("before event: %s", event)
        event.accept()
        log.debug("after event: %s", event)

    def history_dialog(self):
        history_dialog = hd.HistoryDialog(self.recognizers[0].recognizer, self)
        history_dialog.exec_()

    def camera_config_dialog(self):
        dialog = cd.CameraConfigurationDialog(self)
        dialog.exec_()

    def people_dialog(self):
        dialog = pd.PersonDialog(self.recognizers[0].recognizer, self)
        dialog.exec_()

    def layout_for_buttons(self):
        layout = qtw.QHBoxLayout()
        layout.addWidget(self.run_button, 1)
        layout.addSpacing(1)

        layout.addWidget(self.camera_configuration_button, 1)
        layout.addSpacing(1)

        layout.addWidget(self.history_button, 1)
        layout.addSpacing(1)

        layout.addWidget(self.people_button, 1)
        layout.addSpacing(1)

        layout.addWidget(self.close_button, 1)
        layout.addSpacing(1)

        return layout

    def layout_for_1_camera(self):
        layout = qtw.QVBoxLayout()
        layout.addWidget(self.recognizers[0], 2)
        layout.addLayout(self.layout_for_buttons(), 0)
        return layout

    def layout_for_4_cameras(self):
        layout = qtw.QVBoxLayout()
        camera_layout = qtw.QGridLayout()
        try:
            camera_layout.addWidget(self.recognizers[0], 0, 0, 1, 1)
            camera_layout.addWidget(self.recognizers[1], 0, 1, 1, 1)
            camera_layout.addWidget(self.recognizers[2], 1, 0, 1, 1)
            camera_layout.addWidget(self.recognizers[3], 1, 1, 1, 1)
        except IndexError:
            pass

        layout.addLayout(camera_layout, 2)
        layout.addLayout(self.layout_for_buttons(), 0)
        return layout


def main():
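    """Create the Qt application and show the main widget full screen."""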
    app = qtw.QApplication(sys.argv)
    main_window = qtw.QMainWindow()
    main_widget = MainWidget()
    main_widget.setAttribute(qtc.Qt.WA_DeleteOnClose, True)
    main_window.setAttribute(qtc.Qt.WA_DeleteOnClose, True)
    main_window.setCentralWidget(main_widget)
    main_widget.close_button.clicked.connect(main_window.close)
    main_window.showFullScreen()
    app_return = app.exec_()
    log.debug("app_return: %s", app_return)
    sys.exit(app_return)


if __name__ == '__main__':
    script_dir = path.dirname(path.realpath(__file__))
    # cascade_filepath = path.join(script_dir,
    #                              'haarcascade_frontalface_default.xml')

    # cascade_filepath = path.abspath(cascade_filepath)
    main()
