Source code for luna_handlers.sdk.sdk_loop.tasks.task

"""
Module contains base task
"""
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
from time import perf_counter
from typing import Iterable, List, Optional, Type, final

import attr
from attr import dataclass

from .filters import Filters
from ..enums import LoopEstimations, MultifacePolicy
from ..errors.errors import EstimatorWasNotInitializedError, LoopError
from ..estimators.engine import SDKEngine
from ..models.body_sample import BodySample
from ..models.face_sample import FaceSample
from ..models.filtration.base import FiltrationResult
from ..models.image import Image, ImageType, InputImage
from ..models.sample import AggregatedSample, Sample
from ..monitoring_utils.storages import TASK_EXECUTION_MONITORING
from ..utils.logger import logger


[docs]@dataclass(slots=True) class LivenessV1Params: """ Liveness v1 estimation params """ # liveness quality threshold ( Unknown or ...) qualityThreshold: float = 0.5 # score threshold (Real or spoof) scoreThreshold: Optional[float] = None
[docs]@dataclass(slots=True) class TaskEstimationParams: """ Task estimation params """ # livenessV1 params livenessv1: Optional[LivenessV1Params] = attr.field(factory=LivenessV1Params) # face descriptor version for estimations faceDescriptorVersion: Optional[int] = None # body descriptor version for estimations bodyDescriptorVersion: Optional[int] = None
[docs]class TaskParams: """ A container for user defined task parameters. It is one place for all user parameters except images. These parameters are constant across a task group. We suppose that user defines a set of interest parameters and create task with these parameters. User can to instance this structure and does not calculate to the `requiredEstimations` on each request. Attributes: targets (set[LoopEstimations]): user defined targets filters (Filters): user defined filters useExifInfo (bool): use exif or not for correct loading image estimatorsParams (TaskEstimationParams): params for estimations (thresholds and etc) autoRotation (bool): try detect rotated images or not aggregate (bool): estimate or not aggregated attributes multifacePolicy: multiface policy requiredEstimations (set[LoopEstimations]): predicted required estimations for correct processing all params """ __slots__ = ( "targets", "filters", "requiredEstimations", "useExifInfo", "estimatorsParams", "autoRotation", "multifacePolicy", "aggregate", ) def __init__( self, targets: Optional[Iterable[LoopEstimations]] = None, filters: Optional[Filters] = None, estimatorsParams: Optional[TaskEstimationParams] = None, multifacePolicy: MultifacePolicy = MultifacePolicy.allowed, useExifInfo: bool = False, autoRotation: bool = False, aggregate: bool = False, ): if targets is None: self.targets = set() else: self.targets = set(targets) self.requiredEstimations = predictRequiredEstimations(self.targets, filters, useExifInfo) if filters: self.filters = filters else: self.filters = Filters() self.useExifInfo = useExifInfo if estimatorsParams: self.estimatorsParams = estimatorsParams else: self.estimatorsParams = TaskEstimationParams() self.autoRotation = autoRotation self.multifacePolicy = multifacePolicy self.aggregate = aggregate
[docs]class TaskResult: """ Task result container Attributes: images (Optional[list[Image]]): loaded images, None if image load is failed by unexpected reason aggregatedSample (Optional[AggregatedSample]): aggregated sample error (Optional[LoopError]): processing task error """ __slots__ = ("images", "aggregatedSample", "error") def __init__(self, images: Optional[list[Image]], aggregatedSample: Optional[AggregatedSample]): self.images = images self.aggregatedSample = aggregatedSample self.error: Optional[LoopError] = None
[docs]class TaskMonitoringPoint: """ Task monitoring point Attributes: executionTime (float): task execution time eventTime (float): start task timestamp imageCount (int): task image count """ __slots__ = ("executionTime", "eventTime", "error", "imageCount") # series name name: str = "sdkloop_task" def __init__(self, eventTime: Optional[float] = None): self.executionTime = 0 self.eventTime = eventTime if eventTime is not None else datetime.now().timestamp() self.imageCount = 0
[docs] def getFields(self) -> dict: """Get monitoring fields""" return {"execution_time": self.executionTime}
[docs] def getTags(self) -> dict: """Get monitoring tags""" return {"image_count": self.imageCount}
[docs]class TaskContent: """ Task content. Attributes: params: task params images: source images for estimations """ __slots__ = ("params", "images") def __init__( self, images: list[InputImage], params: TaskParams, ): self.params = params self.images = images
# estimation dependencies graph (what need to estimated for estimate it) _DEPENDENCIES_GRAPH = { LoopEstimations.headPose: [LoopEstimations.faceDetection], LoopEstimations.fisheye: [LoopEstimations.faceDetection], LoopEstimations.dynamicRange: [LoopEstimations.faceDetection], LoopEstimations.faceDetectionBackground: [LoopEstimations.faceDetection], LoopEstimations.mask: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.gaze: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.glasses: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.livenessV1: [LoopEstimations.faceDetection, LoopEstimations.faceLandmarks5], LoopEstimations.emotions: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.faceDescriptor: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.basicAttributes: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.faceLandmarks68: [LoopEstimations.faceDetection], LoopEstimations.faceLandmarks5: [LoopEstimations.faceDetection], LoopEstimations.eyes: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.redEyes: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.mouthAttributes: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.faceWarpQuality: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.headwear: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.faceNaturalLight: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.imageColorType: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.eyebrowExpression: [LoopEstimations.faceDetection, LoopEstimations.faceWarp], LoopEstimations.faceWarp: [LoopEstimations.faceDetection, LoopEstimations.faceLandmarks5], LoopEstimations.bodyWarp: [LoopEstimations.bodyDetection], LoopEstimations.bodyLandmarks17: [LoopEstimations.bodyDetection], LoopEstimations.bodyDescriptor: [LoopEstimations.bodyDetection, LoopEstimations.bodyWarp], LoopEstimations.bodyAttributes: [LoopEstimations.bodyDetection, LoopEstimations.bodyWarp], } def _addFaceDetectionTargets(targets: set[LoopEstimations]): """helper, add face detection targets""" targets.add(LoopEstimations.faceDetection) targets.add(LoopEstimations.faceWarp)
[docs]def predictRequiredEstimations( targets: set[LoopEstimations], filters: Optional[Filters] = None, needExifInfo: bool = False, ) -> set[LoopEstimations]: """ Predict required estimations for correct filters work, autorotation and etc Args: targets: user targets filters: user filters needExifInfo: need or not exif Returns: set all estimations. """ _targets = targets.copy() if needExifInfo: _targets.add(LoopEstimations.exif) if filters: if any( ( filters.faceDetection.yawFilter is not None, filters.faceDetection.pitchFilter is not None, filters.faceDetection.rollFilter is not None, ) ): _targets.add(LoopEstimations.headPose) _addFaceDetectionTargets(_targets) if filters.faceDetection.maskFilter: _targets.add(LoopEstimations.mask) _addFaceDetectionTargets(_targets) if filters.faceDetection.livenessFilter: _targets.add(LoopEstimations.livenessV1) _addFaceDetectionTargets(_targets) if filters.faceDetection.gcFilter: _targets.add(LoopEstimations.faceDescriptor) _addFaceDetectionTargets(_targets) for target, deps in _DEPENDENCIES_GRAPH.items(): if target in _targets: for dep in deps: _targets.add(dep) return _targets
[docs]def getNotFilteredSamples(images: list[Image]) -> tuple[list[FaceSample], list[BodySample]]: """Helper, get non filtered face and bodies samples from images""" faceSamples = [] bodySamples = [] for image in images: for sample in image.samples: if (faceSample := sample.face) and not faceSample.filters.isFiltered(): faceSamples.append(faceSample) if (bodySample := sample.body) and not bodySample.filters.isFiltered(): bodySamples.append(bodySample) return faceSamples, bodySamples
[docs]class TaskState(Enum): """Task state enum""" PENDING = 0 # task wait to execution start IN_PROCESS = 1 # task in progress DONE = 3 # task was done
[docs]class BaseTask(ABC): """ A base task class. The task is an entity for a customization estimation business logic. Task determines what and how will be estimated. For example, user can realize task class with own a multiface policy processing logic, filtration, some else. Attributes: monitoringPoint (TaskMonitoringPoint): execution task monitoring point (execution time, error, some else) executionTime (float): task execution time content (TaskContent): task content result (TaskResult): task result taskId (task id): pseudo unique id for logging """ __slots__ = ("_requiredEstimations", "content", "result", "taskId", "executionTime", "monitoringPoint", "_state") engine: SDKEngine # task counter, unique for python interpretator, task id generator _taskCounter = 0 # monitoring point class, user can add some fields or tags to monitoring monitoringPointClass = TaskMonitoringPoint
[docs] @classmethod def initialize(cls, engine: SDKEngine, monitoringPointClass: Optional[Type[TaskMonitoringPoint]] = None, **kwargs): """ Task initialization. Setup globals: engine, monitoring point class Args: engine: engine monitoringPointClass: user defined monitoring class (for custom fields and tags) **kwargs: some args for Task class realization """ cls.engine = engine cls.monitoringPointClass = monitoringPointClass if monitoringPointClass else TaskMonitoringPoint
def __init__( self, data: List[InputImage], params: Optional[TaskParams] = None, **kwargs: object, ): self.monitoringPoint = self.monitoringPointClass() self.executionTime = 0 self.__class__._taskCounter += 1 self.taskId = self._taskCounter self.content = TaskContent( params=params if params else TaskParams(), images=data, ) self._requiredEstimations = params.requiredEstimations self.result = TaskResult(None, None) self._state = TaskState.PENDING
[docs] def filterSampleByMask(self, sample: FaceSample) -> Optional[FiltrationResult]: """ Filter sample by mask logic Args: sample: face sample Returns: None if filter is None or mask not estimated (body sample) otherwise filtration result """ if (maskFilter := self.content.params.filters.faceDetection.maskFilter) is None: return None if sample.mask is None: return None result: FiltrationResult = maskFilter.filter(sample.mask.predominateMask) sample.filters.append(result) return result
[docs] def filterSampleByLiveness(self, sample: FaceSample) -> Optional[FiltrationResult]: """ Filter sample by livenessV1 logic. Args: sample: face sample Returns: None if filter is None or sample not estimated (body sample) otherwise filtration result """ if (livenessFilter := self.content.params.filters.faceDetection.livenessFilter) is None: return None if sample.livenessV1 is None: return None result: FiltrationResult = livenessFilter.filter(sample.livenessV1.prediction) sample.filters.append(result) return result
[docs] def filterByHeadPoseYaw(self, sample: FaceSample) -> Optional[FiltrationResult]: """ Filter sample by yaw angle logic. Args: sample: face sample Returns: None if filter is None or sample not estimated (body sample) otherwise filtration result """ if (yawFilter := self.content.params.filters.faceDetection.yawFilter) is None: return None if sample.headPose is None: return None result: FiltrationResult = yawFilter.filter(sample.headPose.yaw) sample.filters.append(result) return result
[docs] def filterByHeadPosePitch(self, sample: FaceSample) -> Optional[FiltrationResult]: """ Filter sample by pitch angle logic. Args: sample: face sample Returns: None if filter is None or head pose not estimated (body sample) otherwise filtration result """ if (pitchFilter := self.content.params.filters.faceDetection.pitchFilter) is None: return None if sample.headPose is None: return None result: FiltrationResult = pitchFilter.filter(sample.headPose.pitch) sample.filters.append(result) return result
[docs] def filterByHeadPoseRoll(self, sample: FaceSample) -> Optional[FiltrationResult]: """ Filter sample by roll angle logic. Args: sample: face sample Returns: None if filter is None or head pose not estimated (body sample) otherwise filtration result """ if (rollFilter := self.content.params.filters.faceDetection.rollFilter) is None: return None if sample.headPose is None: return None result: FiltrationResult = rollFilter.filter(sample.headPose.roll) sample.filters.append(result) return result
[docs] def filterFaceGS(self, sample: FaceSample) -> Optional[FiltrationResult]: """ Filter sample by garbage score logic. Args: sample: face sample Returns: None if filter is None or head pose not estimated (body sample) otherwise filtration result """ if (gcFilter := self.content.params.filters.faceDetection.gcFilter) is None: return None if sample.descriptor is None: return None result: FiltrationResult = gcFilter.filter(sample.descriptor.garbageScore) sample.filters.append(result) return result
[docs] def needEstimate(self, target: LoopEstimations) -> bool: """need estimate target or not""" return target in self._requiredEstimations
@abstractmethod async def _execute(self) -> None: """ Execution task method. All logic must be realized here. What need to estimated, condition for estimations and ect. """
[docs] @final async def execute(self) -> "BaseTask": """ Execution task method """ self._state = TaskState.IN_PROCESS start = perf_counter() logger.info(f"start task {self.taskId} execution") try: await self._execute() return self except EstimatorWasNotInitializedError as e: self.result.error = e finally: executionTime = perf_counter() - start self.executionTime = executionTime self.monitoringPoint.executionTime = executionTime self.monitoringPoint.error = self.result.error.error if self.result.error else None self.monitoringPoint.imageCount = len(self.content.images) if TASK_EXECUTION_MONITORING.enable: TASK_EXECUTION_MONITORING.append(self.monitoringPoint) self._state = TaskState.DONE logger.info(f"finish task {self.taskId}, execution time {executionTime}")
[docs] async def prepare(self): """Prepare date for estimations""" logger.debug(f"start prepare data for the task {self.taskId}") images = await self.engine.prepareImages( [image for image in self.content.images], useExifInfo=self.content.params.useExifInfo, autoRotation=self.content.params.autoRotation, estimateExif=self.needEstimate(LoopEstimations.exif), ) if self.needEstimate(LoopEstimations.imageOrientation): for image in images: if image.orientation is None and image.error is None: orientationEstimator = self.engine.estimators.getImageOrientationsEstimator() orientation = await orientationEstimator.estimate(image.sdkImage) image.orientation = orientation for image in images: if image.origin.imageType == ImageType.FACE_WARP: sample = Sample(face=FaceSample(warp=Image.sdkImage)) sample.face.warp = image.sdkImage image.samples = [sample] elif image.origin.imageType == ImageType.BODY_WARP: sample = Sample(body=BodySample(warp=Image.sdkImage)) sample.body.warp = image.sdkImage image.samples = [sample] self.result.images = images logger.debug(f"finish prepare data for the task {self.taskId}") return self.result.images
[docs] @classmethod async def close(cls, closeEngine: bool = True): """ Grace full shut down. Args: closeEngine: close external engine or not """ if closeEngine: await cls.engine.close()
@property def state(self) -> TaskState: """Get current task state""" return self._state