"""
Module contains a functional for batch human detection
"""
import os
from operator import itemgetter
from typing import List, Union, Type, Tuple
from lunavl.sdk.detectors.base import ImageForRedetection
from lunavl.sdk.detectors.humandetector import HumanDetector, HumanDetection
from lunavl.sdk.errors.exceptions import LunaSDKException
from lunavl.sdk.estimators.body_estimators.humanwarper import HumanWarper
from lunavl.sdk.estimators.image_estimators.orientation_mode import OrientationModeEstimator, OrientationType
from lunavl.sdk.faceengine.setting_provider import FaceEngineSettingsProvider, RuntimeSettingsProvider
from lunavl.sdk.image_utils.geometry import Rect
from lunavl.sdk.image_utils.image import VLImage
from .base_detector import BaseDetectorSubTask, BaseDetectorState, BaseLoopDetector, DetectorSubTask
from .enums import MultifacePolicy
from .estimation_targets import SDKEstimationTargets, SDKHumanEstimationTargets
from .monitoring import TaskMonitoringData
from .sdk_task import (
SDKTask,
SDKDetectableImage,
tasksTimeMonitoring,
HumanWarp,
BoundingBox,
SDKEstimation,
FaceEstimation,
HumanEstimation,
FilteredEstimation,
)
from .settings import RuntimeSettings, HumanDetectorSettings
from .utils.rotation import getDetectionRotationAngle, rotateRect
LANDMARKS17_FACE_INDEXES = range(3)
[docs]class HumanDetectorState(BaseDetectorState):
"""
Detector worker state, process local. Final class.
State contains:
- logger for worker
- instance of detector
"""
# detector and estimators
_detector: HumanDetector
# human warper
_warper: HumanWarper
# worker settings
_settings: HumanDetectorSettings
# orientation mode estimator
orientationModeEstimator: OrientationModeEstimator
@property
def detector(self) -> HumanDetector:
"""
Get detector
Returns:
vl detector
"""
return self._detector
@property
def warper(self) -> HumanWarper:
"""
Get human warper
Returns:
warper
"""
return self._warper
[docs] @classmethod
def initialize(cls, workerName: str, settings: HumanDetectorSettings) -> bool:
"""
Initialize state. Singleton.
Initialize FaceEngine, detector, orientation mode estimator.
Args:
workerName: worker name
settings: settings for worker
Returns:
True if it is first call of initialize (for process) otherwise False
"""
if not super().initialize(workerName, settings):
return False
faceEngineSettingsProvider, runtimeSettingsProvider = FaceEngineSettingsProvider(), RuntimeSettingsProvider()
runtimeSettings: RuntimeSettings = cls._settings.runtimeSettings
bodyDetSettings = faceEngineSettingsProvider.humanDetectorSettings
bodyDetSettings.scoreThreshold = cls._settings.bodyDetSettings.scoreThreshold
bodyDetSettings.imageSize = cls._settings.bodyDetSettings.imageSize
bodyDetSettings.redetectScoreThreshold = cls._settings.bodyDetSettings.redetectScoreThreshold
bodyDetSettings.landmarks17Threshold = cls._settings.bodyDetSettings.landmarks17Threshold
runtimeSettingsProvider.runtimeSettings.deviceClass = runtimeSettings.deviceClass
runtimeSettingsProvider.runtimeSettings.numThreads = runtimeSettings.numThreads
runtimeSettingsProvider.runtimeSettings.numComputeStreams = runtimeSettings.numComputeStreams
if cls._settings.runtimeSettings.deviceClass.value == "cpu":
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
from lunavl.sdk.faceengine.engine import VLFaceEngine # pylint: disable-msg=C0415
faceEngine = VLFaceEngine(faceEngineConf=faceEngineSettingsProvider, runtimeConf=runtimeSettingsProvider)
cls._detector = faceEngine.createHumanDetector()
cls._warper = faceEngine.createHumanWarper()
cls._faceEngine = faceEngine
cls.orientationModeEstimator = faceEngine.createOrientationModeEstimator()
return True
[docs]class HumanDetectorSubTask(BaseDetectorSubTask):
"""
Sub task for human detector.
Args:
detections (List[HumanDetection]): human detection list
"""
def __init__(
self,
image: SDKDetectableImage,
estimations: SDKEstimationTargets,
getBestDetection: bool,
taskId: int,
taskMonitoring: TaskMonitoringData,
):
super().__init__(
image=image,
getBestDetection=getBestDetection,
taskId=taskId,
taskMonitoring=taskMonitoring,
estimations=estimations,
)
self.detections: List[HumanDetection] = []
[docs] @staticmethod
def createImageForDetection(img: SDKDetectableImage) -> Union[ImageForRedetection, VLImage, None]:
"""
Create sdk image from SDKDetectableImage
Args:
img: image
Returns:
VLImage if image has not bounding box, ImageForDetection if has bounding box, None if loading is failed
"""
try:
vlImage = VLImage(img.body, filename=img.id)
if img.humanBoundingBoxes:
return ImageForRedetection(vlImage, [Rect(**img.humanBoundingBoxes[0].asDict())])
return vlImage
except LunaSDKException as e:
img.error = e.error
return None
[docs] @classmethod
def createImageSubTasksForDetector(cls: Type[DetectorSubTask], tasks: List[SDKTask]) -> List[DetectorSubTask]:
"""
Create sub tasks for each image in tasks
Args:
tasks: tasks
Returns:
list of sub tasks
"""
detectSubTasks = []
for task in tasks:
for image in task.images:
if image.error:
# Its possible that the error was already set by a handler
continue
detectSubTasks.append(
cls(
image=image.image,
estimations=task.toEstimation,
getBestDetection=False,
taskId=task.taskId,
taskMonitoring=task.monitoringData,
)
)
return detectSubTasks
[docs] @staticmethod
def getEstimationTargets(estimations: SDKEstimationTargets) -> SDKHumanEstimationTargets:
"""
Get estimation from task.
Args:
estimations: task estimations
Returns:
human estimations
"""
return estimations.humanEstimationTargets
[docs]class LoopHumanDetector(BaseLoopDetector[HumanDetectorState]):
"""
Loop human detector
"""
# state class
_state = HumanDetectorState
@property
def warper(self) -> HumanWarper:
"""
Get warper
Returns:
warper from state
"""
return self.state.warper
[docs] @staticmethod
def getResultDetectionAsDict(subTask: HumanDetectorSubTask, detection: HumanDetection) -> dict:
"""
Get detection result as dict. Rotate detection if needed
Args:
subTask: human detector subtask
detection: human detection
Returns:
dict with detection results
"""
result = detection.asDict()
if subTask.imageOrientation in (None, OrientationType.NORMAL):
return result
imageRect = subTask.sdkImage.rect if isinstance(subTask.sdkImage, VLImage) else subTask.sdkImage.image.rect
imageSize = imageRect.width, imageRect.height
angle = getDetectionRotationAngle(subTask.imageOrientation)
result["rect"] = rotateRect(result["rect"], imageSize, angle)
return result
[docs] @staticmethod
def collectResultsFromSubTasksToTasks(tasks: List[SDKTask], subTasks: List[HumanDetectorSubTask]):
"""
Collect result from sub tasks to corresponding tasks.
Args:
tasks: tasks
subTasks: sub tasks
"""
for task in tasks:
for imageSubTask in subTasks:
if task.taskId != imageSubTask.taskId:
continue
if imageSubTask.error is not None:
for image in task.images:
if image.image.id == imageSubTask.image.id:
image.image.error = imageSubTask.error
break
for detection in imageSubTask.detections:
detectionRes = LoopHumanDetector.getResultDetectionAsDict(imageSubTask, detection)
detector = LoopHumanDetector()
warp = HumanWarp(
detector.warper.warp(detection).warpedImage.asNPArray(),
imageId=imageSubTask.image.id,
filename=imageSubTask.image.filename,
)
warp.isFiltered = False
updateTaskWithDetectionResults(
task=task, warp=warp, detection=detectionRes, isFiltered=warp.isFiltered
)
[docs]def checkLandmarkInRect(landmark: Tuple[int, int], rect: BoundingBox):
"""
Check landmark in detection rect
Args:
landmark: landmark
rect: detection rect
Returns:
True if landmark in detection rect, otherwise False
"""
if rect.x <= landmark[0] <= rect.x + rect.width:
if rect.y <= landmark[1] <= rect.y + rect.height:
return True
return False
[docs]def checkLandmarkReliable(landmark: dict) -> bool:
"""
Check landmark reliable.
There may be unreliable landmarks among the landmarks17 that do not need to be taken into account when check
detections related. Unreliable landmark takes the value (0, 0) and its score is below the threshold.
Args:
landmark: landmark
Returns:
True if landmark reliable, otherwise False
"""
return landmark["score"] >= LoopHumanDetector().state.settings.bodyDetSettings.landmarks17Threshold
[docs]def updateTaskWithDetectionResults(task: SDKTask, warp: HumanWarp, detection: dict, isFiltered: bool):
"""
Update task with collected human body detection results.
Args:
task: task
warp: human body warp
detection: human body detection result
isFiltered: whether the detection is filtered
"""
sdkEstimation = SDKEstimation(body=HumanEstimation(warp=warp, **detection))
if isFiltered:
task.filteredEstimations.append(FilteredEstimation(filename=warp.filename, estimation=sdkEstimation.body))
return
for image in task.images:
if image.image.id == warp.imageId:
sourceImage = image
break
else:
raise RuntimeError("Human body warp source is not found in task images.")
for result in sourceImage.estimations:
if result.body:
continue
if checkDetectionsRelated(detection, faceDetection=result.face):
result.body = HumanEstimation(warp=warp, **detection)
return
# Related estimation not found, create a new
if task.multifacePolicy is MultifacePolicy.allowed:
sourceImage.estimations.append(sdkEstimation)
[docs]def detect(tasks: List[SDKTask]) -> List[SDKTask]:
"""
Detect human bodies
Args:
tasks: tasks
Returns:
task with estimated attributes and cropped warps
"""
detector = LoopHumanDetector()
detector.logger.info(f"gotten {len(tasks)} tasks")
detectorSubTasks: List[HumanDetectorSubTask] = HumanDetectorSubTask.createImageSubTasksForDetector(tasks)
with tasksTimeMonitoring(fieldName="humanDetectTime", tasks=tasks):
detector.batchDetect(detectorSubTasks)
for subTask in detectorSubTasks:
detector.estimateDetectionAttributes(subTask)
detector.collectResultsFromSubTasksToTasks(tasks, detectorSubTasks)
detector.logger.info(f"performed {len(tasks)} tasks")
return tasks
[docs]def initWorker(settings: HumanDetectorSettings):
"""
Initialize detector worker. Init logger, initialize FSDK, create detector, initialize estimators.
Args:
settings: detector settings
"""
HumanDetectorState.initialize("luna-handlers-h-detector", settings=settings)
HumanDetectorState().logger.info("human detector worker is initialized")