"""
Module contains functionality for batch face detection and for estimation of detection attributes.
"""
import os
from itertools import chain
from statistics import mean
from typing import List, Union, Type, Dict, Any, Optional
from time import perf_counter
from lunavl.sdk.detectors.base import ImageForRedetection
from lunavl.sdk.errors.exceptions import LunaSDKException
from lunavl.sdk.estimators.image_estimators.orientation_mode import OrientationModeEstimator, OrientationType
from lunavl.sdk.faceengine.setting_provider import FaceEngineSettingsProvider, RuntimeSettingsProvider
from lunavl.sdk.image_utils.geometry import Rect
from lunavl.sdk.image_utils.image import VLImage
from .base_detector import BaseDetectorSubTask, BaseDetectorState, BaseLoopDetector, DetectorSubTask
from .enums import MultifacePolicy, SDKFaceEstimations
from .estimation_targets import SDKEstimationTargets, SDKFaceEstimationTargets
from .liveness_predictor import LivenessPredicted
from .monitoring import TaskMonitoringData
from .sdk_task import (
SDKTask,
SDKDetectableImage,
FaceWarp,
tasksTimeMonitoring,
SDKEstimation,
FaceEstimation,
FilteredEstimation,
AggregatedDetectionFaceAttributes,
)
from .settings import FaceDetectorSettings, FaceDetV12Settings, FaceDetV3Settings
from .utils.rotation import getDetectionRotationAngle, rotateRect, rotatePoints
class DetectorState(BaseDetectorState):
    """
    Process-local state of the face detector worker. Final class.

    The state keeps:
        - the worker logger
        - the detector instance and the orientation mode estimator
    """

    # detector and estimators
    _detector: "VLFaceDetector"
    # worker settings
    _settings: FaceDetectorSettings
    # orientation mode estimator
    orientationModeEstimator: OrientationModeEstimator

    @property
    def detector(self) -> "VLFaceDetector":
        """
        Detector shared by all instances of the state.

        Returns:
            vl detector stored on the class
        """
        return DetectorState._detector

    @classmethod
    def initialize(cls, workerName: str, settings: FaceDetectorSettings) -> bool:
        """
        Initialize the state (singleton): configure FaceEngine, create the detector and
        the orientation mode estimator.

        Args:
            workerName: worker name
            settings: settings for worker
        Returns:
            True if it is the first call of initialize (for the process), otherwise False
        """
        isFirstCall = super().initialize(workerName, settings)
        if not isFirstCall:
            return False

        engineConf = FaceEngineSettingsProvider()
        runtimeConf = RuntimeSettingsProvider()

        # worker-side sections the FaceEngine configuration is filled from
        detV1Source: FaceDetV12Settings = cls._settings.faceDetV1Settings
        detV2Source: FaceDetV12Settings = cls._settings.faceDetV2Settings
        detV3Source: FaceDetV3Settings = cls._settings.faceDetV3Settings

        engineConf.systemSettings.defaultDetectorType = cls._settings.detectorType

        # FaceDetV1
        engineConf.faceDetV1Settings.minFaceSize = detV1Source.minFaceSize
        engineConf.faceDetV1Settings.scaleFactor = detV1Source.scaleFactor

        # FaceDetV2
        engineConf.faceDetV2Settings.minFaceSize = detV2Source.minFaceSize
        engineConf.faceDetV2Settings.scaleFactor = detV2Source.scaleFactor
        engineConf.faceDetV2Settings.useLNet = detV2Source.useLNet

        # FaceDetV3
        engineConf.faceDetV3Settings.minFaceSize = detV3Source.minFaceSize
        engineConf.faceDetV3Settings.maxFaceSize = detV3Source.maxFaceSize
        engineConf.faceDetV3Settings.scoreThreshold = detV3Source.scoreThreshold
        engineConf.faceDetV3Settings.redetectFaceTargetSize = detV3Source.redetectFaceTargetSize
        engineConf.faceDetV3Settings.redetectTensorSize = detV3Source.redetectTensorSize
        engineConf.faceDetV3Settings.redetectScoreThreshold = detV3Source.redetectScoreThreshold

        # liveness: the threshold is read back from the provider after it has been written
        engineConf.livenessV1Estimator.realThreshold = cls._settings.livenessV1Settings.realThreshold
        LivenessPredicted.initialize(engineConf.livenessV1Estimator.realThreshold)

        # runtime
        runtimeConf.runtimeSettings.deviceClass = cls._settings.runtimeSettings.deviceClass
        runtimeConf.runtimeSettings.numThreads = cls._settings.runtimeSettings.numThreads
        runtimeConf.runtimeSettings.numComputeStreams = cls._settings.runtimeSettings.numComputeStreams

        if cls._settings.runtimeSettings.deviceClass.value == "cpu":
            # set before luna_faces is imported below; presumably hides GPUs from the SDK -- confirm
            os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

        from lunavl.sdk.luna_faces import VLFaceEngine, VLFaceDetector  # pylint: disable-msg=C0415

        engine = VLFaceEngine(faceEngineConf=engineConf, runtimeConf=runtimeConf)
        cls._detector = VLFaceDetector(faceEngine=engine)
        cls.orientationModeEstimator = engine.createOrientationModeEstimator()
        return True
def aggregateLivenessFromEstimates(
    livenessEstimates: List[Dict[str, Any]], qualityThreshold: float, scoreThreshold: Optional[float] = None
) -> LivenessPredicted:
    """
    Build an aggregated liveness from several liveness estimates.

    The aggregated score and quality are the arithmetic means of the corresponding
    values found under the "estimations" key of every estimate.

    Args:
        livenessEstimates: liveness input estimates
        qualityThreshold: liveness quality threshold
        scoreThreshold: liveness score threshold
    Returns:
        class: `LivenessPredicted` with the prediction already made
    """
    scores = [estimate["estimations"]["score"] for estimate in livenessEstimates]
    qualities = [estimate["estimations"]["quality"] for estimate in livenessEstimates]
    aggregated = LivenessPredicted(score=mean(scores), quality=mean(qualities))
    aggregated.makePrediction(scoreThreshold=scoreThreshold, qualityThreshold=qualityThreshold)
    return aggregated
class FaceDetectorSubTask(BaseDetectorSubTask):
    """
    Face detector sub task.

    Attributes:
        detections (List[VLFaceDetection]): list of face detections
    """

    def __init__(
        self,
        image: SDKDetectableImage,
        estimations: SDKEstimationTargets,
        getBestDetection: bool,
        taskId: int,
        taskMonitoring: TaskMonitoringData,
    ):
        super().__init__(
            image=image,
            estimations=estimations,
            getBestDetection=getBestDetection,
            taskId=taskId,
            taskMonitoring=taskMonitoring,
        )
        # starts empty; nothing in this class fills it
        self.detections: List["VLFaceDetection"] = []

    @staticmethod
    def createImageForDetection(img: SDKDetectableImage) -> Union[ImageForRedetection, VLImage, None]:
        """
        Convert an SDKDetectableImage to an image the sdk detector accepts.

        Args:
            img: image
        Returns:
            VLImage if the image has no bounding box, ImageForRedetection (built from the first
            bounding box) if it has one, None if loading failed; in case of a LunaSDKException
            the error is stored in `img.error`
        """
        try:
            pillowImage = img.asPillow()
            if not pillowImage:
                return None
            sdkImage = VLImage(pillowImage, filename=img.id)
            boundingBoxes = img.faceBoundingBoxes
            if not boundingBoxes:
                return sdkImage
            # only the first bounding box is used for redetection
            firstBox = Rect(**boundingBoxes[0].asDict())
            return ImageForRedetection(sdkImage, [firstBox])
        except LunaSDKException as e:
            img.error = e.error
            return None

    @classmethod
    def createImageSubTasksForDetector(cls: Type[DetectorSubTask], tasks: List[SDKTask]) -> List[DetectorSubTask]:
        """
        Create one sub task per task image that has no error yet.

        Args:
            tasks: tasks
        Returns:
            list of sub tasks
        """
        # images with an error are skipped: the error may already have been set by a handler
        return [
            cls(
                image=taskImage.image,
                estimations=task.toEstimation,
                getBestDetection=task.multifacePolicy == MultifacePolicy.getBest,
                taskId=task.taskId,
                taskMonitoring=task.monitoringData,
            )
            for task in tasks
            for taskImage in task.images
            if not taskImage.error
        ]

    @staticmethod
    def getEstimationTargets(estimations: SDKEstimationTargets) -> SDKFaceEstimationTargets:
        """
        Select the face part of the task estimation targets.

        Args:
            estimations: task estimations
        Returns:
            task face estimation targets
        """
        faceTargets = estimations.faceEstimationTargets
        return faceTargets
class FaceDetector(BaseLoopDetector):
    """
    Loop face detector
    """

    # state class
    _state = DetectorState

    @staticmethod
    def estimateDetectionAttributes(subTask: BaseDetectorSubTask):
        """
        Estimate detection attributes for a sub task and record the time of every estimation.

        Args:
            subTask: sub task
        """
        requiredEstimation = subTask.estimations.getTargetsForFaceDetector()
        for detection in subTask.detections:
            for estimation in requiredEstimation:
                startEstimate = perf_counter()
                # the value is discarded: the attribute is read only for its side effect
                # (presumably the detection estimates it lazily on first access -- TODO confirm)
                getattr(detection, estimation.value)
                subTask.updateMonitoringData(estimation, perf_counter() - startEstimate)

    @staticmethod
    def getResultDetectionAsDict(subTask: FaceDetectorSubTask, detection: "VLFaceDetection") -> dict:
        """
        Get detection result as dict. Rotate detection if needed

        Nothing is rotated when the sub task image orientation is None or NORMAL.

        Args:
            subTask: face detector subtask
            detection: face detection
        Returns:
            dict with detection results
        """
        result = detection.asDict()
        if subTask.imageOrientation in (None, OrientationType.NORMAL):
            return result

        # the sdk image is either a VLImage or a wrapper holding the VLImage in `.image`
        sdkImage = subTask.sdkImage
        imageRect = sdkImage.rect if isinstance(sdkImage, VLImage) else sdkImage.image.rect
        imageSize = imageRect.width, imageRect.height
        angle = getDetectionRotationAngle(subTask.imageOrientation)
        result["rect"] = rotateRect(result["rect"], imageSize, angle)

        # landmarks are rotated against the detection size, not the image size
        # (presumably they are relative to the detection rect -- confirm)
        detectionRect = detection.boundingBox.rect
        detectionSize = detectionRect.width, detectionRect.height
        for landmarksKey in ("landmarks5", "landmarks68"):
            if landmarksKey in result:
                result[landmarksKey] = rotatePoints(result[landmarksKey], detectionSize, angle)
        if "eyes_attributes" in result["attributes"]:
            eyesAttributes = result["attributes"]["eyes_attributes"]
            for eye in ("left_eye", "right_eye"):
                for eyeLandmarks in ("iris_landmarks", "eyelid_landmarks"):
                    eyesAttributes[eye][eyeLandmarks] = rotatePoints(
                        eyesAttributes[eye][eyeLandmarks], detectionSize, angle
                    )
        return result

    @staticmethod
    def makeLivenessPrediction(task: SDKTask, detection: "VLFaceDetection") -> None:
        """
        Update SDK liveness estimation using task settings

        The liveness stored on the detection is replaced with a `LivenessPredicted` whose
        prediction is made with the task quality and score thresholds.

        Args:
            task: SDK task
            detection: SDK detection
        """
        livenessTargets = task.toEstimation.faceEstimationTargets.estimateLiveness
        livenessPredicted = LivenessPredicted(
            score=detection.liveness.score,
            quality=detection.liveness.quality,
            prediction=detection.liveness.prediction,
        )
        livenessPredicted.makePrediction(
            qualityThreshold=livenessTargets.qualityThreshold,
            scoreThreshold=livenessTargets.scoreThreshold,
        )
        detection._liveness = livenessPredicted  # pylint: disable-msg=W0212

    @staticmethod
    def aggregateLivenessAndCheckState(task: SDKTask) -> None:
        """
        Aggregate liveness over the task face estimations and apply the liveness filter to it.

        Estimations of images without an error are aggregated. If the aggregated liveness is
        filtered, every estimation still attached to the task images is moved to the task
        filtered estimations. The aggregated result is stored in
        `task.aggregatedEstimations.detection.face`. Nothing is done when there is nothing
        to aggregate.

        Args:
            task: task
        """
        faceEstimationsToAggregate = [
            estimation.face for image in task.images if not image.error for estimation in image.estimations
        ]
        livenessEstimates = [face.attributes["liveness"] for face in faceEstimationsToAggregate]
        if not livenessEstimates:
            return

        livenessTargets = task.toEstimation.faceEstimationTargets.estimateLiveness
        livenessAggregated = aggregateLivenessFromEstimates(
            livenessEstimates=livenessEstimates,
            scoreThreshold=livenessTargets.scoreThreshold,
            qualityThreshold=livenessTargets.qualityThreshold,
        )
        filtrationRes = task.filters.checkFilterByLiveness(livenessAggregated)
        if filtrationRes["is_filtered"]:
            for image in task.images:
                # An image may have no estimations (e.g. it has an error) or several of them,
                # so all of them are moved instead of popping exactly one per image.
                for estimation in image.estimations:
                    estimation.face.warp.isFiltered = True
                    estimation.face.filter = filtrationRes
                    task.filteredEstimations.append(
                        FilteredEstimation(filename=estimation.face.warp.filename, estimation=estimation.face)
                    )
                image.estimations.clear()
        task.aggregatedEstimations.detection.face = AggregatedDetectionFaceAttributes(
            liveness=livenessAggregated.asDict(),
            warps=[face.warp for face in faceEstimationsToAggregate],
            filtered=filtrationRes["is_filtered"],
        )

    @staticmethod
    def handleSubtaskDetections(subTask: FaceDetectorSubTask, task: SDKTask) -> None:
        """
        Handle subtask detections and collect results to task

        For every detection: make the liveness prediction (if liveness is requested), build the
        result dict, apply the per-detection filters and pass the warp with the result to the task.

        Args:
            subTask: detection subtask
            task: detection task
        """
        faceTargets = task.toEstimation.faceEstimationTargets
        targetToFilterMap = {
            SDKFaceEstimations.headPose: task.filters.checkFilterByAngles,
            # with attribute aggregation the liveness filter is applied to the aggregated value instead
            SDKFaceEstimations.liveness: None if task.aggregateAttributes else task.filters.checkFilterByLiveness,
        }
        filterTargets = faceTargets.getTargetsForFaceDetector()
        superestimates = []
        if faceTargets.estimateLiveness.estimate:
            superestimates.append(FaceDetector.makeLivenessPrediction)

        for detection in subTask.detections:
            for estimate in superestimates:
                estimate(task, detection)
            detectionRes = FaceDetector.getResultDetectionAsDict(subTask, detection)
            detectionFilter = {"is_filtered": False, "filter_reasons": []}
            detectionRes["filter"] = detectionFilter
            for target in filterTargets:
                checkFilter = targetToFilterMap.get(target)
                if not checkFilter:
                    continue
                filtrationRes = checkFilter(getattr(detection, target.value))
                detectionFilter["is_filtered"] |= filtrationRes["is_filtered"]
                detectionFilter["filter_reasons"].extend(filtrationRes["filter_reasons"])
            warp = FaceWarp(
                detection.warp.warpedImage.asNPArray(), imageId=subTask.image.id, filename=subTask.image.filename,
            )
            warp.isFiltered = detectionFilter["is_filtered"]
            updateTaskWithDetectionResults(task=task, warp=warp, detection=detectionRes)

    @staticmethod
    def collectResultsFromSubTasksToTasks(tasks: List[SDKTask], subTasks: List[FaceDetectorSubTask]):
        """
        Collect result from sub tasks to corresponding tasks.

        Args:
            tasks: tasks
            subTasks: sub tasks
        """
        # group sub tasks by task id once instead of scanning all sub tasks for every task
        subTasksByTaskId: Dict[Any, List[FaceDetectorSubTask]] = {}
        for imageSubTask in subTasks:
            subTasksByTaskId.setdefault(imageSubTask.taskId, []).append(imageSubTask)

        for task in tasks:
            for imageSubTask in subTasksByTaskId.get(task.taskId, ()):
                if imageSubTask.error is not None:
                    # propagate the sub task error to the task image it was created from
                    for image in task.images:
                        if image.image.id == imageSubTask.image.id:
                            image.image.error = imageSubTask.error
                            break
                FaceDetector.handleSubtaskDetections(subTask=imageSubTask, task=task)
            if task.aggregateAttributes and task.toEstimation.faceEstimationTargets.estimateLiveness.estimate:
                FaceDetector.aggregateLivenessAndCheckState(task)
            if task.multifacePolicy == MultifacePolicy.notAllowed:
                task.checkMultiFacesRule()
def updateTaskWithDetectionResults(task: SDKTask, warp: FaceWarp, detection: dict):
    """
    Store one face detection result in the task.

    A filtered warp goes to the task filtered estimations; any other warp is appended to
    the estimations of the task image whose id equals the warp image id.

    Args:
        task: task
        warp: face warp
        detection: face detection result
    Raises:
        RuntimeError: if the warp is not filtered and no task image has the warp image id
    """
    estimation = SDKEstimation(face=FaceEstimation(warp=warp, **detection))
    if warp.isFiltered:
        filtered = FilteredEstimation(filename=warp.filename, estimation=estimation.face)
        task.filteredEstimations.append(filtered)
        return

    # first task image the warp was cut from, None when there is no such image
    sourceImage = next((taskImage for taskImage in task.images if taskImage.image.id == warp.imageId), None)
    if sourceImage is None:
        raise RuntimeError("Face warp source is not found in task images.")
    sourceImage.estimations.append(estimation)
def detect(tasks: List[SDKTask]) -> List[SDKTask]:
    """
    Run the face detection stage for a batch of tasks: detect faces, estimate detection
    attributes and collect the results back to the tasks.

    Args:
        tasks: tasks
    Returns:
        the same tasks, updated with estimated attributes and cropped warps
    """
    faceDetector = FaceDetector()
    faceDetector.logger.info(f"gotten {len(tasks)} tasks")
    imageSubTasks = FaceDetectorSubTask.createImageSubTasksForDetector(tasks)
    # NOTE(review): the extent of the timed section could not be recovered from the reviewed
    # copy of this file; it is assumed to cover detection, attribute estimation and result
    # collection -- confirm against the repository.
    with tasksTimeMonitoring(fieldName="faceDetectTime", tasks=tasks):
        faceDetector.batchDetect(imageSubTasks)
        for imageSubTask in imageSubTasks:
            faceDetector.estimateDetectionAttributes(imageSubTask)
        faceDetector.collectResultsFromSubTasksToTasks(tasks, imageSubTasks)
    faceDetector.logger.info(f"performed {len(tasks)} tasks")
    return tasks
def initWorker(settings: FaceDetectorSettings):
    """
    Initialize the face detector worker: set up the detector state with the given settings
    and log that the worker is ready.

    Args:
        settings: detector settings
    """
    workerName = "luna-handlers-f-detector"
    DetectorState.initialize(workerName, settings=settings)
    workerState = DetectorState()
    workerState.logger.info("detector worker is initialized")