Common human and face detections functions
from abc import ABC, abstractmethod
from enum import Enum
from functools import cmp_to_key
from typing import List, TypeVar, Generic, Union, Optional, Type
from lunavl.sdk.detectors.base import ImageForRedetection, BaseDetection, ImageForDetection
from lunavl.sdk.detectors.humandetector import HumanDetection, HumanDetector
from lunavl.sdk.errors.errors import LunaVLError, ErrorInfo
from lunavl.sdk.errors.exceptions import LunaSDKException
from lunavl.sdk.estimators.image_estimators.orientation_mode import OrientationType, OrientationModeEstimator
from lunavl.sdk.image_utils.image import VLImage
from .crutches_on_wheels.utils.log import Logger
from .estimation_targets import SDKEstimationTargets, BaseEstimationTargets
from .monitoring import MonitoringField, TaskMonitoringData
from .sdk_task import SDKTask, SDKDetectableImage
from .utils.rotation import rotateImage
from .utils.worker_state import State
CONF_THR = 0.99 # confidence detection comparison threshold
AREA_THR = 1.5 # area detection comparison threshold
#: generic type for allowed values type of detections
DetectorType = TypeVar("DetectorType", "VLFaceDetection", HumanDetection) # pylint: disable-msg=C0103
#: generic type for detector subtasks
DetectorSubTask = TypeVar("DetectorSubTask", bound="BaseDetectorSubTask")
#: generic type for detector state
DetectorState = TypeVar("DetectorState", bound="BaseDetectorState")
[docs]class Detections(Generic[DetectorType]):
Detection comparison class
confThreshold: confidence detection comparison threshold
areaThreshold: area detection comparison threshold
image: image
detections: face detections from the image sorted by combined criteria of detection confidence, detection area
and manhattan distance from detection center to the center of the image
bestDetection: best face detection from the image
__slots__ = ("confThreshold", "areaThreshold", "image", "detections", "bestDetection")
def __init__(self, image: VLImage, detections: List[DetectorType]):
image: image
detections: face detections from the image
self.confThreshold = CONF_THR
self.areaThreshold = AREA_THR
self.image = image
self.detections = detections
self.bestDetection = self.getBestDetection() if detections else None
def _compareCentreManhattan(self, detection1: DetectorType, detection2: DetectorType) -> int:
Compare two detections by manhattan distance to the center of the image
detection1: first detection to compare
detection2: second detection to compare
1 if detection1 is better than detection2 by manhattan distance criterion else -1
imageCenter = self.image.rect.center
center1 = detection1.boundingBox.rect.center
center2 = detection2.boundingBox.rect.center
distance1 = abs(center1.x - imageCenter.x) + abs(center1.y - imageCenter.y)
distance2 = abs(center2.x - imageCenter.x) + abs(center2.y - imageCenter.y)
if distance1 < distance2:
return 1
return -1
def _compareConfidence(detection1: DetectorType, detection2: DetectorType) -> int:
Compare two detections by confidence
detection1: first detection to compare
detection2: second detection to compare
1 if detection1 is better than detection2 by confidence criterion else -1
if detection1.boundingBox.score > detection2.boundingBox.score:
return 1
return -1
def _comparerV3(self, detection1: DetectorType, detection2: DetectorType) -> int:
Compare two detections by combined criteria of detection confidence, detection area and
manhattan distance from detection center to the center of the image
detection1: first detection to compare
detection2: second detection to compare
1 if detection1 is better than detection2 by combined criterion else -1
if detection1.boundingBox.score > self.confThreshold and detection2.boundingBox.score > self.confThreshold:
areaRatio = detection1.boundingBox.rect.getArea() / detection2.boundingBox.rect.getArea()
if areaRatio > self.areaThreshold:
return 1
if areaRatio < 1 / self.areaThreshold:
return -1
return self._compareCentreManhattan(detection1, detection2)
return Detections._compareConfidence(detection1, detection2)
[docs] def getBestDetection(self) -> DetectorType:
Get best detection from the image.
best face detection from the image
self.detections.sort(key=cmp_to_key(self._comparerV3), reverse=True)
return self.detections[0]
[docs]class BaseDetectorState(State, ABC):
Detector worker state, process local. Final class.
State contains:
- logger for worker
- instance of detector
# detector and estimators
_detector: Union[HumanDetector, "VLFaceDetector"]
# face engine
_faceEngine: "VLFaceEngine"
# image orientation mode estimator
orientationModeEstimator: OrientationModeEstimator
def faceEngine(self) -> "VLFaceEngine":
Get instance of FaceEngine
return self._faceEngine
def detector(self) -> Union["VLFaceDetector", HumanDetector]:
Get detector
detector from state
[docs]class BaseDetectorSubTask(ABC):
Sub task for detector.
taskId (int): original task id
image (SDKDetectableImage): an image for detection and further estimation of the detect attributes.
detections (List[VLFaceDetection]): human detections from the image
sdkImage (Optional[VLImage, ImageForRedetection]): loaded sdk image if the image is a valid image otherwise None
isValid (bool): whether the image is loaded into a sdkImage
estimations (SDKEstimationTargets): estimation targets, set this param for estimations.
getBestDetection (bool): whether to get best detection
originTaskMonitoring (TaskMonitoringData): monitoring data of the original task
error (LunaVLError): error occurred during task execution
imageOrientation (Optional[OrientationType]): image orientation mode
__slots__ = (
def __init__(
image: SDKDetectableImage,
getBestDetection: bool,
taskId: int,
taskMonitoring: TaskMonitoringData,
estimations: SDKEstimationTargets,
self.taskId = taskId
self.detections: List[BaseDetection] = []
self.image: SDKDetectableImage = image
self.error: Union[ErrorInfo, None] = None
self.sdkImage: Optional[Union[VLImage, ImageForRedetection]] = self.createImageForDetection(self.image)
self.imageOrientation: Optional[OrientationType] = None
self.isValid = bool(self.sdkImage)
self.getBestDetection = getBestDetection
self.estimations = self.getEstimationTargets(estimations)
self.originTaskMonitoring: TaskMonitoringData = taskMonitoring
[docs] @staticmethod
def getEstimationTargets(estimations: SDKEstimationTargets) -> BaseEstimationTargets:
Get estimation targets for sub task.
estimations: task estimations
[docs] @staticmethod
def createImageForDetection(img: SDKDetectableImage) -> Union[ImageForRedetection, VLImage, None]:
Create sdk image from SDKDetectableImage
img: image
VLImage if image has not bounding box, ImageForDetection if has bounding box, None if loading is failed
[docs] def updateMonitoringData(self, estimation: Enum, estimationTime: float):
Update monitoring data of the origin task. Add estimation time of a estimation to times from other subtasks.
estimation: estimation type
estimationTime: estimation time
monitoringField: MonitoringField = getattr(self.originTaskMonitoring, f"{estimation.value}EstimationTime")
if monitoringField.value is not None:
monitoringField.value += estimationTime
monitoringField.value = estimationTime
[docs] @classmethod
def createImageSubTasksForDetector(cls: Type[DetectorSubTask], tasks: List[SDKTask]) -> List[DetectorSubTask]:
Create sub tasks for each image in tasks
tasks: tasks
list of sub tasks
[docs]class BaseLoopDetector(Generic[DetectorState]):
Base class for detectors
# state class
_state: Type[DetectorState]
def state(self) -> DetectorState:
Get state of detector.
state instance
return self._state()
def detector(self) -> Union["VLFaceDetector", HumanDetector]:
Get detector
detector from state
return self.state.detector
def logger(self) -> Logger:
Get Logger.
logger from state
return self.state.logger
[docs] def detectAndRetry(
self, vlImages: List[Union[VLImage, ImageForDetection]], toDetectSubTasks: List[BaseDetectorSubTask]
) -> None:
Batch detect and if any errors occurred - collect errors and try to one more time for sub tasks without errors
vlImages: images for batch detection
toDetectSubTasks: detect subtasks
message = "$".join(f"rect: {image.rect}" for image in vlImages)
self.logger.info(f"batch to detect: {message}")
detections = self.detector.detect(vlImages, limit=self.state.settings.maxObjectCount)
except LunaSDKException as exc:
if exc.error.errorCode not in (
vlDetectImagesForSecondTry = []
for idx, error in enumerate(exc.context):
if LunaVLError.Ok.errorCode == error.errorCode:
toDetectSubTasks[idx].error = error
detections = []
if vlDetectImagesForSecondTry:
detections = self.detector.detect(vlDetectImagesForSecondTry, limit=self.state.settings.maxObjectCount)
for index, taskDetections in enumerate(detections):
if toDetectSubTasks[index].getBestDetection:
bestDetection = Detections(toDetectSubTasks[index].sdkImage, taskDetections).bestDetection
toDetectSubTasks[index].detections = [bestDetection] if bestDetection else []
toDetectSubTasks[index].detections = taskDetections
[docs] def redetectAndRetry(
self, vlRedectImages: List[ImageForRedetection], toRedetectSubTasks: List[BaseDetectorSubTask]
) -> None:
Batch redetect and if any errors occurred - collect errors and try to one more time for sub tasks without errors
vlImages: images for batch detection
toRedetectSubTasks: redetect subtasks
message = "$".join(
f"rect: {image.image.rect}; bboxes: {','.join(str(bbox) for bbox in image.bBoxes)}"
for image in vlRedectImages
self.logger.info(f"batch to redetect: {message}")
detections = self.detector.redetect(vlRedectImages)
except LunaSDKException as exc:
if exc.error.errorCode not in (
vlDetectImagesForSecondTry = []
for idx, error in enumerate(exc.context):
if LunaVLError.Ok.errorCode == error.errorCode:
toRedetectSubTasks[idx].error = error
detections = []
if vlDetectImagesForSecondTry:
detections = self.detector.redetect(vlDetectImagesForSecondTry)
for index, taskDetections in enumerate(detections):
toRedetectSubTasks[index].detections = [
notEmptyDetection for notEmptyDetection in taskDetections if notEmptyDetection is not None
[docs] def getImageOrientation(self, image: Union[VLImage, ImageForRedetection]) -> OrientationType:
Get image orientation mode
image: original image
image orientation
if isinstance(image, VLImage):
return self.state.orientationModeEstimator.estimate(image)
if isinstance(image, ImageForRedetection):
return self.state.orientationModeEstimator.estimate(image.image)
raise RuntimeError(f"Unsupported image type: {image.__class__}")
[docs] @staticmethod
def getResultDetectionAsDict(subTask: BaseDetectorSubTask, detection: BaseDetection):
Get detection result as dict. Rotate detection and all its coordinates if original image has been rotated.
subTask: detector subtask
detection: detection
dict with detection results
[docs] @staticmethod
def collectResultsFromSubTasksToTasks(tasks: List[SDKTask], subTasks: List[BaseDetectorSubTask]):
Collect result from sub tasks to corresponding tasks.
tasks: tasks
subTasks: sub tasks
[docs] def batchDetect(self, subTasks: List[BaseDetectorSubTask]) -> None:
Batch detect faces for valid sub tasks.
subTasks: sub tasks
vlImages = []
vlRedectImages = []
toDetectSubTasks = []
toRedetectSubTasks = []
for task in subTasks:
if task.isValid:
if self.state.settings.useAutoRotation:
task.imageOrientation = self.getImageOrientation(task.sdkImage)
task.sdkImage = rotateImage(task.sdkImage, task.imageOrientation)
task.imageOrientation = None
task.sdkImage = task.sdkImage
if isinstance(task.sdkImage, ImageForRedetection):
if toDetectSubTasks:
self.detectAndRetry(vlImages, toDetectSubTasks)
if toRedetectSubTasks:
self.redetectAndRetry(vlRedectImages, toRedetectSubTasks)
[docs] def estimateDetectionAttributes(self, subTask: BaseDetectorSubTask):
Estimate detection attributes for a sub task.
subTask: sub task