Source code for luna_handlers.sdk.sdk_loop.human_detector

"""
Module contains functionality for batch human detection
"""
import os
from operator import itemgetter
from typing import List, Union, Type, Tuple

from lunavl.sdk.detectors.base import ImageForRedetection
from lunavl.sdk.detectors.humandetector import HumanDetector, HumanDetection
from lunavl.sdk.errors.exceptions import LunaSDKException
from lunavl.sdk.estimators.body_estimators.humanwarper import HumanWarper
from lunavl.sdk.estimators.image_estimators.orientation_mode import OrientationModeEstimator, OrientationType
from lunavl.sdk.faceengine.setting_provider import FaceEngineSettingsProvider, RuntimeSettingsProvider
from lunavl.sdk.image_utils.geometry import Rect
from lunavl.sdk.image_utils.image import VLImage

from .base_detector import BaseDetectorSubTask, BaseDetectorState, BaseLoopDetector, DetectorSubTask
from .enums import MultifacePolicy
from .estimation_targets import SDKEstimationTargets, SDKHumanEstimationTargets
from .monitoring import TaskMonitoringData
from .sdk_task import (
    SDKTask,
    SDKDetectableImage,
    tasksTimeMonitoring,
    HumanWarp,
    BoundingBox,
    SDKEstimation,
    FaceEstimation,
    HumanEstimation,
    FilteredEstimation,
)
from .settings import RuntimeSettings, HumanDetectorSettings
from .utils.rotation import getDetectionRotationAngle, rotateRect

LANDMARKS17_FACE_INDEXES = range(3)


class HumanDetectorState(BaseDetectorState):
    """
    Detector worker state, process local. Final class.

    State contains:
        - logger for worker
        - instance of detector
    """

    # detector and estimators
    _detector: HumanDetector
    # human warper
    _warper: HumanWarper
    # worker settings
    _settings: HumanDetectorSettings
    # orientation mode estimator
    orientationModeEstimator: OrientationModeEstimator

    @property
    def detector(self) -> HumanDetector:
        """
        Get detector

        Returns:
            vl detector
        """
        return self._detector

    @property
    def warper(self) -> HumanWarper:
        """
        Get human warper

        Returns:
            warper
        """
        return self._warper

    @classmethod
    def initialize(cls, workerName: str, settings: HumanDetectorSettings) -> bool:
        """
        Initialize state. Singleton. Initialize FaceEngine, detector, orientation mode estimator.

        Args:
            workerName: worker name
            settings: settings for worker

        Returns:
            True if it is the first call of initialize (for the process), otherwise False
        """
        if not super().initialize(workerName, settings):
            return False
        faceEngineSettingsProvider, runtimeSettingsProvider = FaceEngineSettingsProvider(), RuntimeSettingsProvider()
        runtimeSettings: RuntimeSettings = cls._settings.runtimeSettings
        bodyDetSettings = faceEngineSettingsProvider.humanDetectorSettings
        bodyDetSettings.scoreThreshold = cls._settings.bodyDetSettings.scoreThreshold
        bodyDetSettings.imageSize = cls._settings.bodyDetSettings.imageSize
        bodyDetSettings.redetectScoreThreshold = cls._settings.bodyDetSettings.redetectScoreThreshold
        bodyDetSettings.landmarks17Threshold = cls._settings.bodyDetSettings.landmarks17Threshold
        runtimeSettingsProvider.runtimeSettings.deviceClass = runtimeSettings.deviceClass
        runtimeSettingsProvider.runtimeSettings.numThreads = runtimeSettings.numThreads
        runtimeSettingsProvider.runtimeSettings.numComputeStreams = runtimeSettings.numComputeStreams

        if cls._settings.runtimeSettings.deviceClass.value == "cpu":
            os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
        from lunavl.sdk.faceengine.engine import VLFaceEngine  # pylint: disable-msg=C0415

        faceEngine = VLFaceEngine(faceEngineConf=faceEngineSettingsProvider, runtimeConf=runtimeSettingsProvider)
        cls._detector = faceEngine.createHumanDetector()
        cls._warper = faceEngine.createHumanWarper()
        cls._faceEngine = faceEngine
        cls.orientationModeEstimator = faceEngine.createOrientationModeEstimator()
        return True
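
    # Note (added comment, not in the original source): the detector thresholds above come from
    # HumanDetectorSettings, while device class, thread count and compute stream count come from the
    # runtime settings. VLFaceEngine is imported locally on purpose: on a CPU-only worker
    # CUDA_VISIBLE_DEVICES is set to "-1" before the engine module is imported, so the SDK does not
    # try to initialize a GPU device.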


class HumanDetectorSubTask(BaseDetectorSubTask):
    """
    Sub task for human detector.

    Attributes:
        detections (List[HumanDetection]): human detection list
    """

    def __init__(
        self,
        image: SDKDetectableImage,
        estimations: SDKEstimationTargets,
        getBestDetection: bool,
        taskId: int,
        taskMonitoring: TaskMonitoringData,
    ):
        super().__init__(
            image=image,
            getBestDetection=getBestDetection,
            taskId=taskId,
            taskMonitoring=taskMonitoring,
            estimations=estimations,
        )
        self.detections: List[HumanDetection] = []

    @staticmethod
    def createImageForDetection(img: SDKDetectableImage) -> Union[ImageForRedetection, VLImage, None]:
        """
        Create sdk image from SDKDetectableImage

        Args:
            img: image

        Returns:
            VLImage if the image has no bounding box, ImageForRedetection if it has one,
            None if loading failed
        """
        try:
            vlImage = VLImage(img.body, filename=img.id)
            if img.humanBoundingBoxes:
                return ImageForRedetection(vlImage, [Rect(**img.humanBoundingBoxes[0].asDict())])
            return vlImage
        except LunaSDKException as e:
            img.error = e.error
            return None
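
    # Illustrative note (added comment, not in the original source): an SDKDetectableImage without
    # body bounding boxes is detected from scratch on the full VLImage, while an image that already
    # carries a human bounding box is wrapped into ImageForRedetection so the detector only
    # re-detects inside that rect. A minimal sketch of the calling side, assuming `img` is a loaded
    # SDKDetectableImage:
    #
    #     sdkImage = HumanDetectorSubTask.createImageForDetection(img)
    #     if sdkImage is None:
    #         ...  # loading failed, img.error has already been set
    #     elif isinstance(sdkImage, ImageForRedetection):
    #         ...  # redetect inside the provided rect
    #     else:
    #         ...  # run full detection on the VLImage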

    @classmethod
    def createImageSubTasksForDetector(cls: Type[DetectorSubTask], tasks: List[SDKTask]) -> List[DetectorSubTask]:
        """
        Create sub tasks for each image in tasks

        Args:
            tasks: tasks

        Returns:
            list of sub tasks
        """
        detectSubTasks = []
        for task in tasks:
            for image in task.images:
                if image.error:
                    # It is possible that the error was already set by a handler
                    continue
                detectSubTasks.append(
                    cls(
                        image=image.image,
                        estimations=task.toEstimation,
                        getBestDetection=False,
                        taskId=task.taskId,
                        taskMonitoring=task.monitoringData,
                    )
                )
        return detectSubTasks

    @staticmethod
    def getEstimationTargets(estimations: SDKEstimationTargets) -> SDKHumanEstimationTargets:
        """
        Get human estimation targets from the task estimations.

        Args:
            estimations: task estimations

        Returns:
            human estimations
        """
        return estimations.humanEstimationTargets


class LoopHumanDetector(BaseLoopDetector[HumanDetectorState]):
    """
    Loop human detector
    """

    # state class
    _state = HumanDetectorState

    @property
    def warper(self) -> HumanWarper:
        """
        Get warper

        Returns:
            warper from state
        """
        return self.state.warper

    @staticmethod
    def getResultDetectionAsDict(subTask: HumanDetectorSubTask, detection: HumanDetection) -> dict:
        """
        Get detection result as dict. Rotate detection if needed.

        Args:
            subTask: human detector subtask
            detection: human detection

        Returns:
            dict with detection results
        """
        result = detection.asDict()
        if subTask.imageOrientation in (None, OrientationType.NORMAL):
            return result
        imageRect = subTask.sdkImage.rect if isinstance(subTask.sdkImage, VLImage) else subTask.sdkImage.image.rect
        imageSize = imageRect.width, imageRect.height
        angle = getDetectionRotationAngle(subTask.imageOrientation)
        result["rect"] = rotateRect(result["rect"], imageSize, angle)
        return result
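
    # Illustrative note (added comment, not in the original source): detection runs on the possibly
    # rotated image, so for any orientation other than NORMAL the resulting rect is mapped back into
    # the source image coordinate system via rotateRect, using the source image size and the rotation
    # angle derived from the estimated orientation. Only the "rect" entry of the result is rewritten;
    # the remaining detection fields are returned as-is.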

    @staticmethod
    def collectResultsFromSubTasksToTasks(tasks: List[SDKTask], subTasks: List[HumanDetectorSubTask]):
        """
        Collect results from sub tasks to the corresponding tasks.

        Args:
            tasks: tasks
            subTasks: sub tasks
        """
        for task in tasks:
            for imageSubTask in subTasks:
                if task.taskId != imageSubTask.taskId:
                    continue
                if imageSubTask.error is not None:
                    for image in task.images:
                        if image.image.id == imageSubTask.image.id:
                            image.image.error = imageSubTask.error
                            break
                for detection in imageSubTask.detections:
                    detectionRes = LoopHumanDetector.getResultDetectionAsDict(imageSubTask, detection)
                    detector = LoopHumanDetector()
                    warp = HumanWarp(
                        detector.warper.warp(detection).warpedImage.asNPArray(),
                        imageId=imageSubTask.image.id,
                        filename=imageSubTask.image.filename,
                    )
                    warp.isFiltered = False
                    updateTaskWithDetectionResults(
                        task=task, warp=warp, detection=detectionRes, isFiltered=warp.isFiltered
                    )


def checkLandmarkInRect(landmark: Tuple[int, int], rect: BoundingBox) -> bool:
    """
    Check that a landmark is inside a detection rect.

    Args:
        landmark: landmark
        rect: detection rect

    Returns:
        True if the landmark is in the detection rect, otherwise False
    """
    if rect.x <= landmark[0] <= rect.x + rect.width:
        if rect.y <= landmark[1] <= rect.y + rect.height:
            return True
    return False
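
# Illustrative usage (added comment, not in the original source); the keyword arguments of
# BoundingBox are assumed from the attributes used above:
#
#     rect = BoundingBox(x=100, y=50, width=80, height=120)   # hypothetical construction
#     checkLandmarkInRect((150, 100), rect)   # True: point lies inside [100, 180] x [50, 170]
#     checkLandmarkInRect((90, 100), rect)    # False: x=90 is left of the rect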


def checkLandmarkReliable(landmark: dict) -> bool:
    """
    Check that a landmark is reliable.

    There may be unreliable landmarks among the landmarks17 that should not be taken into account
    when checking whether detections are related. An unreliable landmark takes the value (0, 0)
    and its score is below the threshold.

    Args:
        landmark: landmark

    Returns:
        True if the landmark is reliable, otherwise False
    """
    return landmark["score"] >= LoopHumanDetector().state.settings.bodyDetSettings.landmarks17Threshold


def checkDetectionsRelated(bodyDetection: dict, faceDetection: FaceEstimation) -> bool:
    """
    Check that the body detection and the face detection are related to the same event.

    Args:
        bodyDetection: body detection
        faceDetection: face detection

    Returns:
        True if the face landmarks of the body detection are in the face detection rect, otherwise False
    """
    if "landmarks17" not in bodyDetection:
        return False
    bodyLandmarks = bodyDetection["landmarks17"]
    faceDetectionRect = BoundingBox(**faceDetection.rect)
    checkDone = False
    for landmark in filter(checkLandmarkReliable, itemgetter(*LANDMARKS17_FACE_INDEXES)(bodyLandmarks)):
        if not checkLandmarkInRect(landmark["point"], faceDetectionRect):
            return False
        checkDone = True
    return checkDone
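
# Illustrative note (added comment, not in the original source): a body detection is treated as
# related to a face detection only when every reliable landmark among the first three landmarks17
# points (LANDMARKS17_FACE_INDEXES) lies inside the face rect and at least one such reliable
# landmark exists. If all three points are unreliable (score below landmarks17Threshold), the
# detections are considered unrelated.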


def updateTaskWithDetectionResults(task: SDKTask, warp: HumanWarp, detection: dict, isFiltered: bool):
    """
    Update task with collected human body detection results.

    Args:
        task: task
        warp: human body warp
        detection: human body detection result
        isFiltered: whether the detection is filtered
    """
    sdkEstimation = SDKEstimation(body=HumanEstimation(warp=warp, **detection))
    if isFiltered:
        task.filteredEstimations.append(FilteredEstimation(filename=warp.filename, estimation=sdkEstimation.body))
        return
    for image in task.images:
        if image.image.id == warp.imageId:
            sourceImage = image
            break
    else:
        raise RuntimeError("Human body warp source is not found in task images.")
    for result in sourceImage.estimations:
        if result.body:
            continue
        if checkDetectionsRelated(detection, faceDetection=result.face):
            result.body = HumanEstimation(warp=warp, **detection)
            return
    # Related estimation not found, create a new one
    if task.multifacePolicy is MultifacePolicy.allowed:
        sourceImage.estimations.append(sdkEstimation)
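
# Illustrative note (added comment, not in the original source): a body detection is first matched
# against existing face estimations of the source image via checkDetectionsRelated; only when no
# related face is found does it become a standalone estimation, and only if the task allows
# multiple faces per image (MultifacePolicy.allowed). Otherwise the unmatched body detection is
# dropped.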


def detect(tasks: List[SDKTask]) -> List[SDKTask]:
    """
    Detect human bodies.

    Args:
        tasks: tasks

    Returns:
        tasks with estimated attributes and cropped warps
    """
    detector = LoopHumanDetector()
    detector.logger.info(f"gotten {len(tasks)} tasks")
    detectorSubTasks: List[HumanDetectorSubTask] = HumanDetectorSubTask.createImageSubTasksForDetector(tasks)
    with tasksTimeMonitoring(fieldName="humanDetectTime", tasks=tasks):
        detector.batchDetect(detectorSubTasks)
        for subTask in detectorSubTasks:
            detector.estimateDetectionAttributes(subTask)
        detector.collectResultsFromSubTasksToTasks(tasks, detectorSubTasks)
    detector.logger.info(f"performed {len(tasks)} tasks")
    return tasks


def initWorker(settings: HumanDetectorSettings):
    """
    Initialize detector worker. Init logger, initialize FSDK, create detector, initialize estimators.

    Args:
        settings: detector settings
    """
    HumanDetectorState.initialize("luna-handlers-h-detector", settings=settings)
    HumanDetectorState().logger.info("human detector worker is initialized")
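
# Illustrative worker flow (added comment, not in the original source); how HumanDetectorSettings
# is built is an assumption and depends on the service configuration:
#
#     settings = HumanDetectorSettings(...)   # hypothetical: filled from the loop/service config
#     initWorker(settings)                    # one-time, per-process initialization of FSDK state
#     processedTasks = detect(tasks)          # tasks is a List[SDKTask] prepared by the handlers loop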