Source code for luna_handlers.sdk.sdk_loop.face_detector

"""
Module contains functionality for batch face detection and estimation of detection attributes.
"""
import os
from itertools import chain
from statistics import mean
from typing import List, Union, Type, Dict, Any, Optional

from time import perf_counter
from lunavl.sdk.detectors.base import ImageForRedetection
from lunavl.sdk.errors.exceptions import LunaSDKException
from lunavl.sdk.estimators.image_estimators.orientation_mode import OrientationModeEstimator, OrientationType
from lunavl.sdk.faceengine.setting_provider import FaceEngineSettingsProvider, RuntimeSettingsProvider
from lunavl.sdk.image_utils.geometry import Rect
from lunavl.sdk.image_utils.image import VLImage


from .base_detector import BaseDetectorSubTask, BaseDetectorState, BaseLoopDetector, DetectorSubTask
from .enums import MultifacePolicy, SDKFaceEstimations
from .estimation_targets import SDKEstimationTargets, SDKFaceEstimationTargets
from .liveness_predictor import LivenessPredicted
from .monitoring import TaskMonitoringData
from .sdk_task import (
    SDKTask,
    SDKDetectableImage,
    FaceWarp,
    tasksTimeMonitoring,
    SDKEstimation,
    FaceEstimation,
    FilteredEstimation,
    AggregatedDetectionFaceAttributes,
)
from .settings import FaceDetectorSettings, FaceDetV12Settings, FaceDetV3Settings
from .utils.rotation import getDetectionRotationAngle, rotateRect, rotatePoints


class DetectorState(BaseDetectorState):
    """
    Process-local state of a face detector worker. Final class.

    The state keeps:
        - the worker settings
        - the face detector instance
        - the orientation mode estimator
    """

    # face detector instance, assigned in `initialize`
    _detector: "VLFaceDetector"
    # worker settings (read from `cls._settings` after the base class `initialize` call)
    _settings: FaceDetectorSettings
    # orientation mode estimator, created from the face engine in `initialize`
    orientationModeEstimator: OrientationModeEstimator

    @property
    def detector(self) -> "VLFaceDetector":
        """
        Get the detector stored on the class.

        Returns:
            vl detector
        """
        return DetectorState._detector

    @classmethod
    def initialize(cls, workerName: str, settings: FaceDetectorSettings) -> bool:
        """
        Initialize the state: configure the face engine, create the detector and the orientation mode estimator.

        Args:
            workerName: worker name
            settings: settings for worker
        Returns:
            True if it is the first call of initialize (for the process), otherwise False
        """
        if not super().initialize(workerName, settings):
            return False

        engineConf, runtimeConf = FaceEngineSettingsProvider(), RuntimeSettingsProvider()

        v1Source: FaceDetV12Settings = cls._settings.faceDetV1Settings
        v2Source: FaceDetV12Settings = cls._settings.faceDetV2Settings
        v3Source: FaceDetV3Settings = cls._settings.faceDetV3Settings

        engineConf.systemSettings.defaultDetectorType = cls._settings.detectorType

        # (face engine section name, worker settings section, fields copied one-to-one)
        copyPlan = (
            ("faceDetV1Settings", v1Source, ("minFaceSize", "scaleFactor")),
            ("faceDetV2Settings", v2Source, ("minFaceSize", "scaleFactor", "useLNet")),
            (
                "faceDetV3Settings",
                v3Source,
                (
                    "minFaceSize",
                    "maxFaceSize",
                    "scoreThreshold",
                    "redetectFaceTargetSize",
                    "redetectTensorSize",
                    "redetectScoreThreshold",
                ),
            ),
        )
        for sectionName, source, fieldNames in copyPlan:
            for fieldName in fieldNames:
                value = getattr(source, fieldName)
                setattr(getattr(engineConf, sectionName), fieldName, value)

        engineConf.livenessV1Estimator.realThreshold = cls._settings.livenessV1Settings.realThreshold
        # the threshold is read back from the engine configuration, not from the worker settings
        LivenessPredicted.initialize(engineConf.livenessV1Estimator.realThreshold)

        for fieldName in ("deviceClass", "numThreads", "numComputeStreams"):
            value = getattr(cls._settings.runtimeSettings, fieldName)
            setattr(runtimeConf.runtimeSettings, fieldName, value)

        if cls._settings.runtimeSettings.deviceClass.value == "cpu":
            os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

        # NOTE(review): the SDK import is kept after the environment variable is set,
        # presumably so that the SDK observes it when loaded - confirm before moving it.
        from lunavl.sdk.luna_faces import VLFaceEngine, VLFaceDetector  # pylint: disable-msg=C0415

        engine = VLFaceEngine(faceEngineConf=engineConf, runtimeConf=runtimeConf)
        cls._detector = VLFaceDetector(faceEngine=engine)
        cls.orientationModeEstimator = engine.createOrientationModeEstimator()
        return True
def aggregateLivenessFromEstimates(
    livenessEstimates: List[Dict[str, Any]], qualityThreshold: float, scoreThreshold: Optional[float] = None
) -> LivenessPredicted:
    """
    Build an aggregated liveness from several liveness estimates.

    The aggregated score and quality are the arithmetic means of the corresponding
    `["estimations"]["score"]` and `["estimations"]["quality"]` values; the prediction
    is then made with the given thresholds.

    Args:
        livenessEstimates: liveness input estimates (must not be empty, `mean` fails on empty input)
        qualityThreshold: liveness quality threshold
        scoreThreshold: liveness score threshold
    Returns:
        class: `LivenessPredicted`
    """
    averageScore = mean([item["estimations"]["score"] for item in livenessEstimates])
    averageQuality = mean([item["estimations"]["quality"] for item in livenessEstimates])

    aggregated = LivenessPredicted(score=averageScore, quality=averageQuality)
    aggregated.makePrediction(scoreThreshold=scoreThreshold, qualityThreshold=qualityThreshold)
    return aggregated
class FaceDetectorSubTask(BaseDetectorSubTask):
    """
    Sub task for face detector (one sub task per image).

    Attributes:
        detections (List[VLFaceDetection]): list face detections
    """

    def __init__(
        self,
        image: SDKDetectableImage,
        estimations: SDKEstimationTargets,
        getBestDetection: bool,
        taskId: int,
        taskMonitoring: TaskMonitoringData,
    ):
        super().__init__(
            image=image,
            estimations=estimations,
            getBestDetection=getBestDetection,
            taskId=taskId,
            taskMonitoring=taskMonitoring,
        )
        # starts empty; filled outside of this class
        self.detections: List["VLFaceDetection"] = []

    @staticmethod
    def createImageForDetection(img: SDKDetectableImage) -> Union[ImageForRedetection, VLImage, None]:
        """
        Create sdk image from SDKDetectableImage

        Args:
            img: image
        Returns:
            VLImage if the image has no bounding boxes, ImageForRedetection (built from the first
            bounding box only) if it has, None if loading failed or produced no image data
        """
        try:
            pillowImage = img.asPillow()
            if not pillowImage:
                return None
            sdkImage = VLImage(pillowImage, filename=img.id)
            if not img.faceBoundingBoxes:
                return sdkImage
            firstBox = Rect(**img.faceBoundingBoxes[0].asDict())
            return ImageForRedetection(sdkImage, [firstBox])
        except LunaSDKException as exc:
            # remember the failure on the image itself; the caller only receives None
            img.error = exc.error
            return None

    @classmethod
    def createImageSubTasksForDetector(cls: Type[DetectorSubTask], tasks: List[SDKTask]) -> List[DetectorSubTask]:
        """
        Create a sub task for every image of every task, skipping images which already have an error.

        Args:
            tasks: tasks
        Returns:
            list of sub tasks
        """
        return [
            cls(
                image=entry.image,
                estimations=task.toEstimation,
                getBestDetection=task.multifacePolicy == MultifacePolicy.getBest,
                taskId=task.taskId,
                taskMonitoring=task.monitoringData,
            )
            for task in tasks
            for entry in task.images
            # it is possible that the error was already set by a handler
            if not entry.error
        ]

    @staticmethod
    def getEstimationTargets(estimations: SDKEstimationTargets) -> SDKFaceEstimationTargets:
        """
        Pick the face part of the task estimation targets.

        Args:
            estimations: task estimations
        Returns:
            task face estimation targets
        """
        faceTargets = estimations.faceEstimationTargets
        return faceTargets
class FaceDetector(BaseLoopDetector):
    """
    Loop face detector
    """

    # state class
    _state = DetectorState

    @staticmethod
    def estimateDetectionAttributes(subTask: BaseDetectorSubTask):
        """
        Estimate detection attributes for a sub task.

        Every required attribute is read from every detection and the time of each read is
        reported to the sub task monitoring data.

        Args:
            subTask: sub task
        """
        requiredEstimation = subTask.estimations.getTargetsForFaceDetector()
        for detection in subTask.detections:
            for estimation in requiredEstimation:
                startEstimate = perf_counter()
                # the value is discarded on purpose; presumably the attribute access triggers
                # (and caches) the estimation inside the detection - TODO confirm
                getattr(detection, estimation.value)
                subTask.updateMonitoringData(estimation, perf_counter() - startEstimate)

    @staticmethod
    def getResultDetectionAsDict(subTask: FaceDetectorSubTask, detection: "VLFaceDetection") -> dict:
        """
        Get detection result as dict. Rotate detection if needed

        Args:
            subTask: face detector subtask
            detection: face detection
        Returns:
            dict with detection results; the rect, landmarks and eye landmarks are rotated
            when the sub task image orientation is neither None nor NORMAL
        """
        result = detection.asDict()
        if subTask.imageOrientation in (None, OrientationType.NORMAL):
            return result

        # sdkImage is either a VLImage or a wrapper which holds the image in `.image`
        imageRect = subTask.sdkImage.rect if isinstance(subTask.sdkImage, VLImage) else subTask.sdkImage.image.rect
        imageSize = imageRect.width, imageRect.height
        angle = getDetectionRotationAngle(subTask.imageOrientation)

        # the rect is rotated relative to the image, the points relative to the detection
        result["rect"] = rotateRect(result["rect"], imageSize, angle)
        detectionSize = detection.boundingBox.rect.width, detection.boundingBox.rect.height
        for landmarksKey in ("landmarks5", "landmarks68"):
            if landmarksKey in result:
                result[landmarksKey] = rotatePoints(result[landmarksKey], detectionSize, angle)
        if "eyes_attributes" in result["attributes"]:
            eyesAttributes = result["attributes"]["eyes_attributes"]
            for eye in ("left_eye", "right_eye"):
                for eyeLandmarks in ("iris_landmarks", "eyelid_landmarks"):
                    eyesAttributes[eye][eyeLandmarks] = rotatePoints(
                        eyesAttributes[eye][eyeLandmarks], detectionSize, angle
                    )
        return result

    @staticmethod
    def makeLivenessPrediction(task: SDKTask, detection: "VLFaceDetection") -> None:
        """
        Update SDK liveness estimation using task settings

        The liveness of the detection is replaced with a `LivenessPredicted` whose prediction
        is made with the thresholds of the task.

        Args:
            task: SDK task
            detection: SDK detection
        """
        livenessTarget = task.toEstimation.faceEstimationTargets.estimateLiveness
        livenessPredicted = LivenessPredicted(
            score=detection.liveness.score,
            quality=detection.liveness.quality,
            prediction=detection.liveness.prediction,
        )
        livenessPredicted.makePrediction(
            qualityThreshold=livenessTarget.qualityThreshold,
            scoreThreshold=livenessTarget.scoreThreshold,
        )
        detection._liveness = livenessPredicted  # pylint: disable-msg=W0212

    @staticmethod
    def aggregateLivenessAndCheckState(task: SDKTask) -> None:
        """
        Aggregate liveness over the face estimations of the task and apply the liveness filter to the aggregate.

        If the aggregated liveness is filtered, every estimation which took part in the aggregation
        is marked as filtered and moved to `task.filteredEstimations`. The aggregated result is stored
        in `task.aggregatedEstimations.detection.face`. Nothing is done if there is nothing to aggregate.

        Args:
            task: task
        """
        imageEstimations = chain(*(image.estimations for image in task.images if not image.error))
        faceEstimationsToAggregate = [estimation.face for estimation in imageEstimations]
        livenessEstimates = [face.attributes["liveness"] for face in faceEstimationsToAggregate]
        if not livenessEstimates:
            return

        livenessTarget = task.toEstimation.faceEstimationTargets.estimateLiveness
        livenessAggregated = aggregateLivenessFromEstimates(
            livenessEstimates=livenessEstimates,
            scoreThreshold=livenessTarget.scoreThreshold,
            qualityThreshold=livenessTarget.qualityThreshold,
        )
        filtrationRes = task.filters.checkFilterByLiveness(livenessAggregated)
        if filtrationRes["is_filtered"]:
            for image in task.images:
                # Same selection as for the aggregation above. Images may legitimately have no
                # estimations left (no faces found, or all faces filtered earlier), so an
                # unconditional `pop()` would raise IndexError; images with several estimations
                # must have all of them moved, not only the last one.
                if image.error:
                    continue
                for estimation in image.estimations:
                    estimation.face.warp.isFiltered = True
                    estimation.face.filter = filtrationRes
                    task.filteredEstimations.append(
                        FilteredEstimation(filename=estimation.face.warp.filename, estimation=estimation.face)
                    )
                image.estimations.clear()

        task.aggregatedEstimations.detection.face = AggregatedDetectionFaceAttributes(
            liveness=livenessAggregated.asDict(),
            warps=[face.warp for face in faceEstimationsToAggregate],
            filtered=filtrationRes["is_filtered"],
        )

    @staticmethod
    def handleSubtaskDetections(subTask: FaceDetectorSubTask, task: SDKTask) -> None:
        """
        Handle subtask detections and collect results to task

        For every detection: refresh the liveness prediction (if liveness is requested), convert the
        detection to a dict, apply the per-detection filters, build the face warp and store the result
        in the task.

        Args:
            subTask: detection subtask
            task: detection task
        """
        targetToFilterMap = {
            SDKFaceEstimations.headPose: task.filters.checkFilterByAngles,
            # with attribute aggregation the liveness filter is applied to the aggregate, not per detection
            SDKFaceEstimations.liveness: None if task.aggregateAttributes else task.filters.checkFilterByLiveness,
        }
        filterTargets = task.toEstimation.faceEstimationTargets.getTargetsForFaceDetector()

        superestimates = []
        if task.toEstimation.faceEstimationTargets.estimateLiveness.estimate:
            superestimates.append(FaceDetector.makeLivenessPrediction)

        for detection in subTask.detections:
            for estimate in superestimates:
                estimate(task, detection)

            detectionRes = FaceDetector.getResultDetectionAsDict(subTask, detection)
            detectionRes["filter"] = {"is_filtered": False, "filter_reasons": []}
            for target in filterTargets:
                checkFilter = targetToFilterMap.get(target)
                if not checkFilter:
                    # target without a per-detection filter
                    continue
                filtrationRes = checkFilter(getattr(detection, target.value))
                detectionRes["filter"]["is_filtered"] |= filtrationRes["is_filtered"]
                detectionRes["filter"]["filter_reasons"].extend(filtrationRes["filter_reasons"])

            warp = FaceWarp(
                detection.warp.warpedImage.asNPArray(),
                imageId=subTask.image.id,
                filename=subTask.image.filename,
            )
            warp.isFiltered = detectionRes["filter"]["is_filtered"]
            updateTaskWithDetectionResults(task=task, warp=warp, detection=detectionRes)

    @staticmethod
    def collectResultsFromSubTasksToTasks(tasks: List[SDKTask], subTasks: List[FaceDetectorSubTask]):
        """
        Collect result from sub tasks to corresponding tasks.

        Sub task errors are copied to the matching task image, detections are collected into the task,
        then the task-level steps (liveness aggregation, multi-face rule) are applied.

        Args:
            tasks: tasks
            subTasks: sub tasks
        """
        # NOTE(review): the nesting of the statements below was reconstructed from a flattened
        # listing (task-level checks after the sub task loop) - confirm against the repository.
        for task in tasks:
            for imageSubTask in subTasks:
                if task.taskId != imageSubTask.taskId:
                    continue
                if imageSubTask.error is not None:
                    for image in task.images:
                        if image.image.id == imageSubTask.image.id:
                            image.image.error = imageSubTask.error
                            break
                FaceDetector.handleSubtaskDetections(subTask=imageSubTask, task=task)
            if task.aggregateAttributes and task.toEstimation.faceEstimationTargets.estimateLiveness.estimate:
                FaceDetector.aggregateLivenessAndCheckState(task)
            if task.multifacePolicy == MultifacePolicy.notAllowed:
                task.checkMultiFacesRule()
def updateTaskWithDetectionResults(task: SDKTask, warp: FaceWarp, detection: dict):
    """
    Store one face detection result in the task.

    A filtered warp goes to `task.filteredEstimations`; otherwise the estimation is appended
    to the task image whose id equals `warp.imageId`.

    Args:
        task: task
        warp: face warp
        detection: face detection result
    Raises:
        RuntimeError: if the warp is not filtered and no task image has the warp image id
    """
    faceEstimation = FaceEstimation(warp=warp, **detection)
    sdkEstimation = SDKEstimation(face=faceEstimation)

    if warp.isFiltered:
        filtered = FilteredEstimation(filename=warp.filename, estimation=sdkEstimation.face)
        task.filteredEstimations.append(filtered)
        return

    # only the first image with a matching id receives the estimation
    sourceImage = next((candidate for candidate in task.images if candidate.image.id == warp.imageId), None)
    if sourceImage is None:
        raise RuntimeError("Face warp source is not found in task images.")
    sourceImage.estimations.append(sdkEstimation)
def detect(tasks: List[SDKTask]) -> List[SDKTask]:
    """
    Detect faces and estimate detection attributes.

    Args:
        tasks: tasks
    Returns:
        the same tasks, updated with the collected detection results
    """
    loopDetector = FaceDetector()
    log = loopDetector.logger
    log.info(f"gotten {len(tasks)} tasks")

    imageSubTasks = FaceDetectorSubTask.createImageSubTasksForDetector(tasks)

    # NOTE(review): detection and attribute estimation are assumed to be the timed section
    # ("faceDetectTime"); result collection is assumed to be outside of it - confirm.
    with tasksTimeMonitoring(fieldName="faceDetectTime", tasks=tasks):
        loopDetector.batchDetect(imageSubTasks)
        for imageSubTask in imageSubTasks:
            loopDetector.estimateDetectionAttributes(imageSubTask)

    loopDetector.collectResultsFromSubTasksToTasks(tasks, imageSubTasks)
    log.info(f"performed {len(tasks)} tasks")
    return tasks
def initWorker(settings: FaceDetectorSettings):
    """
    Initialize face detector worker: initialize the detector state and log that the worker is ready.

    Args:
        settings: detector settings
    """
    workerName = "luna-handlers-f-detector"
    DetectorState.initialize(workerName, settings=settings)

    state = DetectorState()
    state.logger.info("detector worker is initialized")