Source code for luna_handlers.sdk.sdk_loop.base_detector

"""
Common base classes and helpers for human and face detectors
"""
from abc import ABC, abstractmethod
from enum import Enum
from functools import cmp_to_key
from typing import List, TypeVar, Generic, Union, Optional, Type

from lunavl.sdk.detectors.base import ImageForRedetection, BaseDetection, ImageForDetection
from lunavl.sdk.detectors.humandetector import HumanDetection, HumanDetector
from lunavl.sdk.errors.errors import LunaVLError, ErrorInfo
from lunavl.sdk.errors.exceptions import LunaSDKException
from lunavl.sdk.estimators.image_estimators.orientation_mode import OrientationType, OrientationModeEstimator
from lunavl.sdk.image_utils.image import VLImage

from .crutches_on_wheels.utils.log import Logger
from .estimation_targets import SDKEstimationTargets, BaseEstimationTargets
from .monitoring import MonitoringField, TaskMonitoringData
from .sdk_task import SDKTask, SDKDetectableImage
from .utils.rotation import rotateImage
from .utils.worker_state import State

# Confidence threshold used by ``Detections``: two detections are compared by area / distance to the image
# center only when both bounding box scores are strictly greater than this value, otherwise by score.
CONF_THR = 0.99  # confidence detection comparison threshold
# Area ratio threshold used by ``Detections``: a detection wins by area when its bounding box area is more
# than this many times the area of the other detection.
AREA_THR = 1.5  # area detection comparison threshold

#: generic type for allowed values type of detections (face detection or human detection)
DetectorType = TypeVar("DetectorType", "VLFaceDetection", HumanDetection)  # pylint: disable-msg=C0103
#: generic type for detector subtasks (any subclass of ``BaseDetectorSubTask``)
DetectorSubTask = TypeVar("DetectorSubTask", bound="BaseDetectorSubTask")
#: generic type for detector state (any subclass of ``BaseDetectorState``)
DetectorState = TypeVar("DetectorState", bound="BaseDetectorState")


class Detections(Generic[DetectorType]):
    """
    Detection comparison class.

    Attributes:
        confThreshold: confidence detection comparison threshold
        areaThreshold: area detection comparison threshold
        image: image
        detections: detections from the image; the list is sorted in place (best first) by the combined
            criterion of detection confidence, detection area and manhattan distance from detection center
            to the center of the image whenever the best detection is computed
        bestDetection: best detection from the image, None if there are no detections
    """

    __slots__ = ("confThreshold", "areaThreshold", "image", "detections", "bestDetection")

    def __init__(self, image: VLImage, detections: List[DetectorType]):
        """
        Args:
            image: image
            detections: detections from the image
        """
        self.confThreshold = CONF_THR
        self.areaThreshold = AREA_THR
        self.image = image
        self.detections = detections
        self.bestDetection = self.getBestDetection() if detections else None

    def _manhattanToImageCentre(self, detection: DetectorType):
        """
        Get manhattan distance from the detection bounding box center to the center of the image.

        Args:
            detection: detection
        Returns:
            manhattan distance
        """
        imageCenter = self.image.rect.center
        center = detection.boundingBox.rect.center
        return abs(center.x - imageCenter.x) + abs(center.y - imageCenter.y)

    def _compareCentreManhattan(self, detection1: DetectorType, detection2: DetectorType) -> int:
        """
        Compare two detections by manhattan distance to the center of the image.

        Args:
            detection1: first detection to compare
            detection2: second detection to compare
        Returns:
            1 if detection1 is closer to the image center than detection2, -1 if it is farther,
            0 if both distances are equal
        """
        distance1 = self._manhattanToImageCentre(detection1)
        distance2 = self._manhattanToImageCentre(detection2)
        if distance1 < distance2:
            return 1
        if distance1 > distance2:
            return -1
        # a tie must be reported as 0 so the comparator stays consistent for `cmp_to_key`
        return 0

    @staticmethod
    def _compareConfidence(detection1: DetectorType, detection2: DetectorType) -> int:
        """
        Compare two detections by confidence.

        Args:
            detection1: first detection to compare
            detection2: second detection to compare
        Returns:
            1 if detection1 has a greater score than detection2, -1 if it has a lower score,
            0 if both scores are equal
        """
        score1 = detection1.boundingBox.score
        score2 = detection2.boundingBox.score
        if score1 > score2:
            return 1
        if score1 < score2:
            return -1
        return 0

    def _comparerV3(self, detection1: DetectorType, detection2: DetectorType) -> int:
        """
        Compare two detections by combined criteria of detection confidence, detection area and manhattan
        distance from detection center to the center of the image.

        If both scores exceed the confidence threshold, the detection whose area is more than
        `areaThreshold` times the area of the other one wins; when the areas are comparable the detection
        closer to the image center wins. Otherwise the detections are compared by score only.

        Args:
            detection1: first detection to compare
            detection2: second detection to compare
        Returns:
            1 if detection1 is better than detection2 by combined criterion, -1 if it is worse, 0 on a tie
        """
        bothConfident = (
            detection1.boundingBox.score > self.confThreshold and detection2.boundingBox.score > self.confThreshold
        )
        if not bothConfident:
            return Detections._compareConfidence(detection1, detection2)

        area1 = detection1.boundingBox.rect.getArea()
        area2 = detection2.boundingBox.rect.getArea()
        # The ratio test `area1 / area2 > threshold` is written as a multiplication so that a degenerate
        # (zero area) bounding box cannot raise ZeroDivisionError.
        if area1 > self.areaThreshold * area2:
            return 1
        if area1 * self.areaThreshold < area2:
            return -1
        return self._compareCentreManhattan(detection1, detection2)

    def getBestDetection(self) -> Optional[DetectorType]:
        """
        Get best detection from the image.

        Sorts `self.detections` in place, best detection first.

        Returns:
            best detection from the image, None if there are no detections
        """
        if not self.detections:
            return None
        self.detections.sort(key=cmp_to_key(self._comparerV3), reverse=True)
        return self.detections[0]
class BaseDetectorState(State, ABC):
    """
    Base state of a detector worker.

    The state declares:
        - an instance of a detector (exposed by the abstract property `detector`)
        - an instance of the face engine (exposed by the property `faceEngine`)
        - an image orientation mode estimator
    """

    # detector instance; concrete states are expected to expose it through `detector`
    _detector: Union[HumanDetector, "VLFaceDetector"]
    # face engine instance returned by `faceEngine`
    _faceEngine: "VLFaceEngine"
    # image orientation mode estimator
    orientationModeEstimator: OrientationModeEstimator

    @property
    def faceEngine(self) -> "VLFaceEngine":
        """
        Get instance of FaceEngine.

        Returns:
            face engine stored in the state
        """
        return self._faceEngine

    @property
    @abstractmethod
    def detector(self) -> Union["VLFaceDetector", HumanDetector]:
        """
        Get detector. Must be implemented by a concrete state.

        Returns:
            detector from state
        """
class BaseDetectorSubTask(ABC):
    """
    Detector sub task: one image of an original task together with its detection results.

    Attributes:
        taskId (int): original task id
        image (SDKDetectableImage): an image for detection and further estimation of the detect attributes.
        detections (List[BaseDetection]): detections from the image
        sdkImage (Optional[Union[VLImage, ImageForRedetection]]): result of `createImageForDetection` for
            the image
        isValid (bool): truth value of `sdkImage`
        estimations (BaseEstimationTargets): estimation targets of the sub task, result of
            `getEstimationTargets`
        getBestDetection (bool): whether to get best detection
        originTaskMonitoring (TaskMonitoringData): monitoring data of the original task
        error (Optional[ErrorInfo]): error occurred during task execution
        imageOrientation (Optional[OrientationType]): image orientation mode
    """

    __slots__ = (
        "taskId",
        "detections",
        "image",
        "sdkImage",
        "isValid",
        "estimations",
        "getBestDetection",
        "originTaskMonitoring",
        "error",
        "imageOrientation",
    )

    def __init__(
        self,
        image: SDKDetectableImage,
        getBestDetection: bool,
        taskId: int,
        taskMonitoring: TaskMonitoringData,
        estimations: SDKEstimationTargets,
    ):
        """
        Args:
            image: image for detection
            getBestDetection: whether to get best detection
            taskId: original task id
            taskMonitoring: monitoring data of the original task
            estimations: estimation targets of the original task
        """
        self.taskId = taskId
        self.detections: List[BaseDetection] = []
        self.image: SDKDetectableImage = image
        self.error: Union[ErrorInfo, None] = None

        loadedImage = self.createImageForDetection(self.image)
        self.sdkImage: Optional[Union[VLImage, ImageForRedetection]] = loadedImage
        self.imageOrientation: Optional[OrientationType] = None
        self.isValid = True if self.sdkImage else False

        self.getBestDetection = getBestDetection
        self.estimations = self.getEstimationTargets(estimations)
        self.originTaskMonitoring: TaskMonitoringData = taskMonitoring

    @staticmethod
    @abstractmethod
    def getEstimationTargets(estimations: SDKEstimationTargets) -> BaseEstimationTargets:
        """
        Get estimation targets for sub task.

        Args:
            estimations: task estimations
        """

    @staticmethod
    @abstractmethod
    def createImageForDetection(img: SDKDetectableImage) -> Union[ImageForRedetection, VLImage, None]:
        """
        Create sdk image from SDKDetectableImage.

        Args:
            img: image
        Returns:
            VLImage if the image has no bounding box, ImageForRedetection if it has a bounding box,
            None if loading failed
        """

    def updateMonitoringData(self, estimation: Enum, estimationTime: float):
        """
        Update monitoring data of the origin task: accumulate the time of an estimation with the times
        already collected from other sub tasks.

        Args:
            estimation: estimation type
            estimationTime: estimation time
        """
        fieldName = f"{estimation.value}EstimationTime"
        timeField: MonitoringField = getattr(self.originTaskMonitoring, fieldName)
        if timeField.value is None:
            # first measurement for this estimation
            timeField.value = estimationTime
            return
        timeField.value += estimationTime

    @classmethod
    @abstractmethod
    def createImageSubTasksForDetector(cls: Type[DetectorSubTask], tasks: List[SDKTask]) -> List[DetectorSubTask]:
        """
        Create sub tasks for each image in tasks.

        Args:
            tasks: tasks
        Returns:
            list of sub tasks
        """
class BaseLoopDetector(Generic[DetectorState]):
    """
    Base class for detectors.

    NOTE(review): the class does not derive from ABC, so the `@abstractmethod` markers below are presumably
    not enforced at instantiation time - confirm whether subclasses rely on that.
    """

    # state class
    _state: Type[DetectorState]

    @property
    def state(self) -> DetectorState:
        """
        Get state of detector.

        Returns:
            state instance
        """
        return self._state()

    @property
    def detector(self) -> Union["VLFaceDetector", HumanDetector]:
        """
        Get detector

        Returns:
            detector from state
        """
        return self.state.detector

    @property
    def logger(self) -> Logger:
        """
        Get Logger.

        Returns:
            logger from state
        """
        return self.state.logger

    @staticmethod
    def _splitFailedSubTasks(errors, images: list, subTasks: List[BaseDetectorSubTask]) -> tuple:
        """
        Store per-image batch errors in the failed sub tasks and collect the rest for a second try.

        Args:
            errors: per-image errors of the failed batch (context of the sdk exception), in batch order
            images: images of the failed batch
            subTasks: sub tasks of the failed batch, in the same order as images
        Returns:
            tuple (images without errors, sub tasks without errors), both in the original relative order
        """
        retryImages = []
        retrySubTasks = []
        for image, subTask, error in zip(images, subTasks, errors):
            if error.errorCode == LunaVLError.Ok.errorCode:
                retryImages.append(image)
                retrySubTasks.append(subTask)
            else:
                subTask.error = error
        return retryImages, retrySubTasks

    def detectAndRetry(
        self, vlImages: List[Union[VLImage, ImageForDetection]], toDetectSubTasks: List[BaseDetectorSubTask]
    ) -> None:
        """
        Batch detect and if any errors occurred - collect errors and try one more time for sub tasks
        without errors.

        Args:
            vlImages: images for batch detection
            toDetectSubTasks: detect subtasks, in the same order as vlImages
        Raises:
            LunaSDKException: if the detector fails with an error other than a batched internal error or
                a validation error, or if the second try fails
        """
        message = "$".join(f"rect: {image.rect}" for image in vlImages)
        self.logger.info(f"batch to detect: {message}")
        # sub tasks the resulting detections correspond to, position by position
        detectedSubTasks = toDetectSubTasks
        try:
            detections = self.detector.detect(vlImages, limit=self.state.settings.maxObjectCount)
        except LunaSDKException as exc:
            if exc.error.errorCode not in (
                LunaVLError.BatchedInternalError.errorCode,
                LunaVLError.ValidationFailed.errorCode,
            ):
                raise
            # The second try runs on a reduced batch, so its results must be matched with the sub tasks
            # that were actually retried, not with the head of the original list.
            retryImages, detectedSubTasks = self._splitFailedSubTasks(exc.context, vlImages, toDetectSubTasks)
            detections = []
            if retryImages:
                detections = self.detector.detect(retryImages, limit=self.state.settings.maxObjectCount)

        for subTask, taskDetections in zip(detectedSubTasks, detections):
            if subTask.getBestDetection:
                bestDetection = Detections(subTask.sdkImage, taskDetections).bestDetection
                subTask.detections = [bestDetection] if bestDetection else []
            else:
                subTask.detections = taskDetections

    def redetectAndRetry(
        self, vlRedectImages: List[ImageForRedetection], toRedetectSubTasks: List[BaseDetectorSubTask]
    ) -> None:
        """
        Batch redetect and if any errors occurred - collect errors and try one more time for sub tasks
        without errors.

        Args:
            vlRedectImages: images for batch redetection
            toRedetectSubTasks: redetect subtasks, in the same order as vlRedectImages
        Raises:
            LunaSDKException: if the detector fails with an error other than a batched internal error or
                a validation error, or if the second try fails
        """
        message = "$".join(
            f"rect: {image.image.rect}; bboxes: {','.join(str(bbox) for bbox in image.bBoxes)}"
            for image in vlRedectImages
        )
        self.logger.info(f"batch to redetect: {message}")
        # sub tasks the resulting detections correspond to, position by position
        redetectedSubTasks = toRedetectSubTasks
        try:
            detections = self.detector.redetect(vlRedectImages)
        except LunaSDKException as exc:
            if exc.error.errorCode not in (
                LunaVLError.BatchedInternalError.errorCode,
                LunaVLError.ValidationFailed.errorCode,
            ):
                raise
            # see detectAndRetry: results of the second try belong to the retried sub tasks only
            retryImages, redetectedSubTasks = self._splitFailedSubTasks(
                exc.context, vlRedectImages, toRedetectSubTasks
            )
            detections = []
            if retryImages:
                detections = self.detector.redetect(retryImages)

        for subTask, taskDetections in zip(redetectedSubTasks, detections):
            # drop empty (None) redetection results
            subTask.detections = [detection for detection in taskDetections if detection is not None]

    def getImageOrientation(self, image: Union[VLImage, ImageForRedetection]) -> OrientationType:
        """
        Get image orientation mode.

        Args:
            image: original image
        Returns:
            image orientation
        Raises:
            RuntimeError: if the image is neither VLImage nor ImageForRedetection
        """
        if isinstance(image, VLImage):
            return self.state.orientationModeEstimator.estimate(image)
        if isinstance(image, ImageForRedetection):
            return self.state.orientationModeEstimator.estimate(image.image)
        raise RuntimeError(f"Unsupported image type: {image.__class__}")

    @staticmethod
    @abstractmethod
    def getResultDetectionAsDict(subTask: BaseDetectorSubTask, detection: BaseDetection):
        """
        Get detection result as dict. Rotate detection and all its coordinates if original image has been
        rotated.

        Args:
            subTask: detector subtask
            detection: detection
        Returns:
            dict with detection results
        """

    @staticmethod
    @abstractmethod
    def collectResultsFromSubTasksToTasks(tasks: List[SDKTask], subTasks: List[BaseDetectorSubTask]):
        """
        Collect result from sub tasks to corresponding tasks.

        Args:
            tasks: tasks
            subTasks: sub tasks
        """

    def batchDetect(self, subTasks: List[BaseDetectorSubTask]) -> None:
        """
        Batch detect for valid sub tasks.

        Valid sub tasks are split into a detect batch and a redetect batch (sub tasks whose sdk image is
        an ImageForRedetection); invalid sub tasks are skipped. If auto rotation is enabled in settings,
        the image orientation is estimated and the sdk image is rotated before detection.

        Args:
            subTasks: sub tasks
        """
        vlImages = []
        vlRedectImages = []
        toDetectSubTasks = []
        toRedetectSubTasks = []
        for task in subTasks:
            if not task.isValid:
                continue
            if self.state.settings.useAutoRotation:
                task.imageOrientation = self.getImageOrientation(task.sdkImage)
                task.sdkImage = rotateImage(task.sdkImage, task.imageOrientation)
            else:
                task.imageOrientation = None
            if isinstance(task.sdkImage, ImageForRedetection):
                vlRedectImages.append(task.sdkImage)
                toRedetectSubTasks.append(task)
            else:
                vlImages.append(task.sdkImage)
                toDetectSubTasks.append(task)
        if toDetectSubTasks:
            self.detectAndRetry(vlImages, toDetectSubTasks)
        if toRedetectSubTasks:
            self.redetectAndRetry(vlRedectImages, toRedetectSubTasks)

    def estimateDetectionAttributes(self, subTask: BaseDetectorSubTask):
        """
        Estimate detection attributes for a sub task.

        Args:
            subTask: sub task
        """