"""
Common human and face detections functions
"""
from abc import ABC, abstractmethod
from enum import Enum
from functools import cmp_to_key
from typing import List, TypeVar, Generic, Union, Optional, Type
from lunavl.sdk.detectors.base import ImageForRedetection, BaseDetection, ImageForDetection
from lunavl.sdk.detectors.humandetector import HumanDetection, HumanDetector
from lunavl.sdk.errors.errors import LunaVLError, ErrorInfo
from lunavl.sdk.errors.exceptions import LunaSDKException
from lunavl.sdk.estimators.image_estimators.orientation_mode import OrientationType, OrientationModeEstimator
from lunavl.sdk.image_utils.image import VLImage
from .crutches_on_wheels.utils.log import Logger
from .estimation_targets import SDKEstimationTargets, BaseEstimationTargets
from .monitoring import MonitoringField, TaskMonitoringData
from .sdk_task import SDKTask, SDKDetectableImage
from .utils.rotation import rotateImage
from .utils.worker_state import State
# Detections with a score above this threshold are considered equally "confident";
# they are then compared by area / distance to the image center (see Detections._comparerV3).
CONF_THR = 0.99  # confidence detection comparison threshold
# A detection wins by size only when its area exceeds the other's by this factor.
AREA_THR = 1.5  # area detection comparison threshold
#: generic type for allowed values type of detections
DetectorType = TypeVar("DetectorType", "VLFaceDetection", HumanDetection)  # pylint: disable-msg=C0103
#: generic type for detector subtasks
DetectorSubTask = TypeVar("DetectorSubTask", bound="BaseDetectorSubTask")
#: generic type for detector state
DetectorState = TypeVar("DetectorState", bound="BaseDetectorState")
class Detections(Generic[DetectorType]):
    """
    Detection comparison class.

    Attributes:
        confThreshold: confidence detection comparison threshold
        areaThreshold: area detection comparison threshold
        image: image
        detections: face detections from the image sorted by combined criteria of detection confidence, detection area
            and manhattan distance from detection center to the center of the image (sorted in place by
            `getBestDetection`, best detection first)
        bestDetection: best face detection from the image (None if there are no detections)
    """

    __slots__ = ("confThreshold", "areaThreshold", "image", "detections", "bestDetection")

    def __init__(
        self,
        image: VLImage,
        detections: List[DetectorType],
        confThreshold: Optional[float] = None,
        areaThreshold: Optional[float] = None,
    ):
        """
        Args:
            image: image
            detections: face detections from the image
            confThreshold: confidence comparison threshold, defaults to module-level CONF_THR
            areaThreshold: area comparison threshold, defaults to module-level AREA_THR
        """
        self.confThreshold = CONF_THR if confThreshold is None else confThreshold
        self.areaThreshold = AREA_THR if areaThreshold is None else areaThreshold
        self.image = image
        self.detections = detections
        self.bestDetection = self.getBestDetection() if detections else None

    def _compareCentreManhattan(self, detection1: DetectorType, detection2: DetectorType) -> int:
        """
        Compare two detections by manhattan distance to the center of the image.

        Args:
            detection1: first detection to compare
            detection2: second detection to compare
        Returns:
            1 if detection1 is better than detection2 by manhattan distance criterion, -1 if it is worse,
            0 if the distances are equal
        """
        imageCenter = self.image.rect.center
        center1 = detection1.boundingBox.rect.center
        center2 = detection2.boundingBox.rect.center
        distance1 = abs(center1.x - imageCenter.x) + abs(center1.y - imageCenter.y)
        distance2 = abs(center2.x - imageCenter.x) + abs(center2.y - imageCenter.y)
        # smaller distance to the image center is better
        if distance1 < distance2:
            return 1
        if distance1 > distance2:
            return -1
        # comparator contract: equal elements must compare as 0
        return 0

    @staticmethod
    def _compareConfidence(detection1: DetectorType, detection2: DetectorType) -> int:
        """
        Compare two detections by confidence.

        Args:
            detection1: first detection to compare
            detection2: second detection to compare
        Returns:
            1 if detection1 is better than detection2 by confidence criterion, -1 if it is worse,
            0 if the scores are equal
        """
        if detection1.boundingBox.score > detection2.boundingBox.score:
            return 1
        if detection1.boundingBox.score < detection2.boundingBox.score:
            return -1
        # comparator contract: equal elements must compare as 0
        return 0

    def _comparerV3(self, detection1: DetectorType, detection2: DetectorType) -> int:
        """
        Compare two detections by combined criteria of detection confidence, detection area and
        manhattan distance from detection center to the center of the image.

        Args:
            detection1: first detection to compare
            detection2: second detection to compare
        Returns:
            1 if detection1 is better than detection2 by combined criterion, -1 if it is worse, 0 on a tie
        """
        if detection1.boundingBox.score > self.confThreshold and detection2.boundingBox.score > self.confThreshold:
            # both detections are confident enough: prefer a markedly larger detection,
            # otherwise the one closer to the image center
            areaRatio = detection1.boundingBox.rect.getArea() / detection2.boundingBox.rect.getArea()
            if areaRatio > self.areaThreshold:
                return 1
            if areaRatio < 1 / self.areaThreshold:
                return -1
            return self._compareCentreManhattan(detection1, detection2)
        return Detections._compareConfidence(detection1, detection2)

    def getBestDetection(self) -> DetectorType:
        """
        Get best detection from the image.

        Side effect: sorts `self.detections` in place, best detection first.

        Returns:
            best face detection from the image
        """
        self.detections.sort(key=cmp_to_key(self._comparerV3), reverse=True)
        return self.detections[0]
class BaseDetectorState(State, ABC):
    """
    Detector worker state, process local. Final class.

    State contains:

        - logger for worker
        - instance of detector
    """

    # detector instance (face or human, depending on the concrete subclass)
    _detector: Union[HumanDetector, "VLFaceDetector"]
    # face engine
    _faceEngine: "VLFaceEngine"
    # image orientation mode estimator (used when auto-rotation is enabled)
    orientationModeEstimator: OrientationModeEstimator

    @property
    def faceEngine(self) -> "VLFaceEngine":
        """
        Get instance of FaceEngine.

        Returns:
            _faceEngine
        """
        return self._faceEngine

    @property
    @abstractmethod
    def detector(self) -> Union["VLFaceDetector", HumanDetector]:
        """
        Get detector.

        Returns:
            detector from state
        """
class BaseDetectorSubTask(ABC):
    """
    Sub task for detector.

    Attributes:
        taskId (int): original task id
        image (SDKDetectableImage): an image for detection and further estimation of the detect attributes.
        detections (List[VLFaceDetection]): human detections from the image
        sdkImage (Optional[VLImage, ImageForRedetection]): loaded sdk image if the image is a valid image otherwise None
        isValid (bool): whether the image is loaded into a sdkImage
        estimations (SDKEstimationTargets): estimation targets, set this param for estimations.
        getBestDetection (bool): whether to get best detection
        originTaskMonitoring (TaskMonitoringData): monitoring data of the original task
        error (LunaVLError): error occurred during task execution
        imageOrientation (Optional[OrientationType]): image orientation mode
    """

    __slots__ = (
        "taskId",
        "detections",
        "image",
        "sdkImage",
        "isValid",
        "estimations",
        "getBestDetection",
        "originTaskMonitoring",
        "error",
        "imageOrientation",
    )

    def __init__(
        self,
        image: SDKDetectableImage,
        getBestDetection: bool,
        taskId: int,
        taskMonitoring: TaskMonitoringData,
        estimations: SDKEstimationTargets,
    ):
        """
        Args:
            image: source image for detection
            getBestDetection: whether only the best detection should be kept
            taskId: id of the original task
            taskMonitoring: monitoring data of the original task
            estimations: estimation targets of the original task
        """
        self.taskId = taskId
        self.image: SDKDetectableImage = image
        self.detections: List[BaseDetection] = []
        self.error: Union[ErrorInfo, None] = None
        self.imageOrientation: Optional[OrientationType] = None
        # load the sdk image eagerly; a falsy result marks the sub task as invalid
        self.sdkImage: Optional[Union[VLImage, ImageForRedetection]] = self.createImageForDetection(self.image)
        self.isValid = bool(self.sdkImage)
        self.getBestDetection = getBestDetection
        self.estimations = self.getEstimationTargets(estimations)
        self.originTaskMonitoring: TaskMonitoringData = taskMonitoring

    @staticmethod
    @abstractmethod
    def getEstimationTargets(estimations: SDKEstimationTargets) -> BaseEstimationTargets:
        """
        Get estimation targets for sub task.

        Args:
            estimations: task estimations
        """

    @staticmethod
    @abstractmethod
    def createImageForDetection(img: SDKDetectableImage) -> Union[ImageForRedetection, VLImage, None]:
        """
        Create sdk image from SDKDetectableImage.

        Args:
            img: image
        Returns:
            VLImage if image has not bounding box, ImageForDetection if has bounding box, None if loading is failed
        """

    def updateMonitoringData(self, estimation: Enum, estimationTime: float):
        """
        Update monitoring data of the origin task. Add estimation time of a estimation to times from other subtasks.

        Args:
            estimation: estimation type
            estimationTime: estimation time
        """
        field: MonitoringField = getattr(self.originTaskMonitoring, f"{estimation.value}EstimationTime")
        # accumulate across sub tasks; first contribution initializes the value
        field.value = estimationTime if field.value is None else field.value + estimationTime

    @classmethod
    @abstractmethod
    def createImageSubTasksForDetector(cls: Type[DetectorSubTask], tasks: List[SDKTask]) -> List[DetectorSubTask]:
        """
        Create sub tasks for each image in tasks.

        Args:
            tasks: tasks
        Returns:
            list of sub tasks
        """
[docs]class BaseLoopDetector(Generic[DetectorState]):
"""
Base class for detectors
"""
# state class
_state: Type[DetectorState]
    @property
    def state(self) -> DetectorState:
        """
        Get state of detector.

        Returns:
            state instance
        """
        # NOTE(review): `_state` is a class (Type[DetectorState]), so this constructs/calls it
        # on every access — presumably the State base makes this a cheap process-local
        # singleton lookup; confirm against .utils.worker_state.State.
        return self._state()
    @property
    def detector(self) -> Union["VLFaceDetector", HumanDetector]:
        """
        Get detector.

        Returns:
            detector from state
        """
        return self.state.detector
    @property
    def logger(self) -> Logger:
        """
        Get Logger.

        Returns:
            logger from state
        """
        return self.state.logger
[docs] def detectAndRetry(
self, vlImages: List[Union[VLImage, ImageForDetection]], toDetectSubTasks: List[BaseDetectorSubTask]
) -> None:
"""
Batch detect and if any errors occurred - collect errors and try to one more time for sub tasks without errors
Args:
vlImages: images for batch detection
toDetectSubTasks: detect subtasks
"""
message = "$".join(f"rect: {image.rect}" for image in vlImages)
self.logger.info(f"batch to detect: {message}")
try:
detections = self.detector.detect(vlImages, limit=self.state.settings.maxObjectCount)
except LunaSDKException as exc:
if exc.error.errorCode not in (
LunaVLError.BatchedInternalError.errorCode,
LunaVLError.ValidationFailed.errorCode,
):
raise
vlDetectImagesForSecondTry = []
for idx, error in enumerate(exc.context):
if LunaVLError.Ok.errorCode == error.errorCode:
vlDetectImagesForSecondTry.append(vlImages[idx])
else:
toDetectSubTasks[idx].error = error
detections = []
if vlDetectImagesForSecondTry:
detections = self.detector.detect(vlDetectImagesForSecondTry, limit=self.state.settings.maxObjectCount)
for index, taskDetections in enumerate(detections):
if toDetectSubTasks[index].getBestDetection:
bestDetection = Detections(toDetectSubTasks[index].sdkImage, taskDetections).bestDetection
toDetectSubTasks[index].detections = [bestDetection] if bestDetection else []
else:
toDetectSubTasks[index].detections = taskDetections
[docs] def redetectAndRetry(
self, vlRedectImages: List[ImageForRedetection], toRedetectSubTasks: List[BaseDetectorSubTask]
) -> None:
"""
Batch redetect and if any errors occurred - collect errors and try to one more time for sub tasks without errors
Args:
vlImages: images for batch detection
toRedetectSubTasks: redetect subtasks
"""
message = "$".join(
f"rect: {image.image.rect}; bboxes: {','.join(str(bbox) for bbox in image.bBoxes)}"
for image in vlRedectImages
)
self.logger.info(f"batch to redetect: {message}")
try:
detections = self.detector.redetect(vlRedectImages)
except LunaSDKException as exc:
if exc.error.errorCode not in (
LunaVLError.BatchedInternalError.errorCode,
LunaVLError.ValidationFailed.errorCode,
):
raise
vlDetectImagesForSecondTry = []
for idx, error in enumerate(exc.context):
if LunaVLError.Ok.errorCode == error.errorCode:
vlDetectImagesForSecondTry.append(vlRedectImages[idx])
else:
toRedetectSubTasks[idx].error = error
detections = []
if vlDetectImagesForSecondTry:
detections = self.detector.redetect(vlDetectImagesForSecondTry)
for index, taskDetections in enumerate(detections):
toRedetectSubTasks[index].detections = [
notEmptyDetection for notEmptyDetection in taskDetections if notEmptyDetection is not None
]
[docs] def getImageOrientation(self, image: Union[VLImage, ImageForRedetection]) -> OrientationType:
"""
Get image orientation mode
Args:
image: original image
Returns:
image orientation
"""
if isinstance(image, VLImage):
return self.state.orientationModeEstimator.estimate(image)
if isinstance(image, ImageForRedetection):
return self.state.orientationModeEstimator.estimate(image.image)
raise RuntimeError(f"Unsupported image type: {image.__class__}")
    @staticmethod
    @abstractmethod
    def getResultDetectionAsDict(subTask: BaseDetectorSubTask, detection: BaseDetection):
        """
        Get detection result as dict. Rotate detection and all its coordinates if original image has been rotated.

        Implementations are expected to undo the rotation recorded in `subTask.imageOrientation`
        (set during `batchDetect` when auto-rotation is enabled).

        Args:
            subTask: detector subtask
            detection: detection
        Returns:
            dict with detection results
        """
    @staticmethod
    @abstractmethod
    def collectResultsFromSubTasksToTasks(tasks: List[SDKTask], subTasks: List[BaseDetectorSubTask]):
        """
        Collect result from sub tasks to corresponding tasks.

        Args:
            tasks: tasks
            subTasks: sub tasks
        """
[docs] def batchDetect(self, subTasks: List[BaseDetectorSubTask]) -> None:
"""
Batch detect faces for valid sub tasks.
Args:
subTasks: sub tasks
"""
vlImages = []
vlRedectImages = []
toDetectSubTasks = []
toRedetectSubTasks = []
for task in subTasks:
if task.isValid:
if self.state.settings.useAutoRotation:
task.imageOrientation = self.getImageOrientation(task.sdkImage)
task.sdkImage = rotateImage(task.sdkImage, task.imageOrientation)
else:
task.imageOrientation = None
task.sdkImage = task.sdkImage
if isinstance(task.sdkImage, ImageForRedetection):
vlRedectImages.append(task.sdkImage)
toRedetectSubTasks.append(task)
else:
vlImages.append(task.sdkImage)
toDetectSubTasks.append(task)
if toDetectSubTasks:
self.detectAndRetry(vlImages, toDetectSubTasks)
if toRedetectSubTasks:
self.redetectAndRetry(vlRedectImages, toRedetectSubTasks)
[docs] def estimateDetectionAttributes(self, subTask: BaseDetectorSubTask):
"""
Estimate detection attributes for a sub task.
Args:
subTask: sub task
"""