"""
Module contains base task
"""
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
from time import perf_counter
from typing import Iterable, List, Optional, Type, final
import attr
from attr import dataclass
from .filters import Filters
from ..enums import LoopEstimations, MultifacePolicy
from ..errors.errors import EstimatorWasNotInitializedError, LoopError
from ..estimators.engine import SDKEngine
from ..models.body_sample import BodySample
from ..models.face_sample import FaceSample
from ..models.filtration.base import FiltrationResult
from ..models.image import Image, ImageType, InputImage
from ..models.sample import AggregatedSample, Sample
from ..monitoring_utils.storages import TASK_EXECUTION_MONITORING
from ..utils.logger import logger
@dataclass(slots=True)
class LivenessV1Params:
    """
    Parameters of the liveness v1 estimation.

    Attributes:
        qualityThreshold (float): liveness quality threshold, 0.5 by default
        scoreThreshold (Optional[float]): score threshold, not set (None) by default
    """

    # quality threshold of the liveness estimation ("Unknown or ..." decision)
    qualityThreshold: float = 0.5
    # score threshold ("Real or spoof" decision)
    scoreThreshold: Optional[float] = None
@dataclass(slots=True)
class TaskEstimationParams:
    """
    Parameters of the estimations performed by a task.

    Attributes:
        livenessv1 (Optional[LivenessV1Params]): liveness v1 params, a fresh `LivenessV1Params` by default
        faceDescriptorVersion (Optional[int]): face descriptor version, not set (None) by default
        bodyDescriptorVersion (Optional[int]): body descriptor version, not set (None) by default
    """

    # params of the livenessV1 estimation; the factory builds a separate instance per container
    livenessv1: Optional[LivenessV1Params] = attr.field(factory=LivenessV1Params)
    # version of the face descriptor used for estimations
    faceDescriptorVersion: Optional[int] = None
    # version of the body descriptor used for estimations
    bodyDescriptorVersion: Optional[int] = None
class TaskParams:
    """
    Container for the user defined task parameters: everything a user configures except the images.

    The parameters are meant to be constant across a group of tasks: a user builds this structure once
    for a set of targets of interest and reuses it, so `requiredEstimations` is not recalculated on
    each request.

    Attributes:
        targets (set[LoopEstimations]): user defined targets
        filters (Filters): user defined filters
        useExifInfo (bool): use exif or not for correct loading image
        estimatorsParams (TaskEstimationParams): params for estimations (thresholds and etc)
        autoRotation (bool): try detect rotated images or not
        aggregate (bool): estimate or not aggregated attributes
        multifacePolicy: multiface policy
        requiredEstimations (set[LoopEstimations]): predicted required estimations for correct processing all params
    """

    __slots__ = (
        "targets",
        "filters",
        "requiredEstimations",
        "useExifInfo",
        "estimatorsParams",
        "autoRotation",
        "multifacePolicy",
        "aggregate",
    )

    def __init__(
        self,
        targets: Optional[Iterable[LoopEstimations]] = None,
        filters: Optional[Filters] = None,
        estimatorsParams: Optional[TaskEstimationParams] = None,
        multifacePolicy: MultifacePolicy = MultifacePolicy.allowed,
        useExifInfo: bool = True,
        autoRotation: bool = False,
        aggregate: bool = False,
    ):
        # the targets are always stored as an own set, whatever iterable was given
        self.targets = set() if targets is None else set(targets)
        # the prediction receives the filters exactly as the user passed them (before the default is applied)
        self.requiredEstimations = predictRequiredEstimations(self.targets, filters, useExifInfo)
        # a falsy `filters` / `estimatorsParams` argument is replaced with a default instance
        self.filters = filters if filters else Filters()
        self.useExifInfo = useExifInfo
        self.estimatorsParams = estimatorsParams if estimatorsParams else TaskEstimationParams()
        self.autoRotation = autoRotation
        self.multifacePolicy = multifacePolicy
        self.aggregate = aggregate
class TaskResult:
    """
    Container with the results of a task.

    Attributes:
        images (Optional[list[Image]]): loaded images, None if image load is failed by unexpected reason
        aggregatedSample (Optional[AggregatedSample]): aggregated sample
        error (Optional[LoopError]): processing task error, always None right after construction
    """

    __slots__ = ("images", "aggregatedSample", "error")

    def __init__(self, images: Optional[list[Image]], aggregatedSample: Optional[AggregatedSample]):
        # a fresh result never carries an error
        self.error: Optional[LoopError] = None
        self.images, self.aggregatedSample = images, aggregatedSample
class TaskMonitoringPoint:
    """
    Task monitoring point

    Attributes:
        executionTime (float): task execution time
        eventTime (float): start task timestamp
        error: task error, None while no error was reported for the point
        imageCount (int): task image count
    """

    __slots__ = ("executionTime", "eventTime", "error", "imageCount")
    # series name
    name: str = "sdkloop_task"

    def __init__(self, eventTime: Optional[float] = None):
        """
        Init monitoring point.

        Args:
            eventTime: start task timestamp, current time is used if it is not set
        """
        # float from the very beginning, the same type as a measured execution time
        self.executionTime = 0.0
        self.eventTime = eventTime if eventTime is not None else datetime.now().timestamp()
        # every declared slot gets a value: reading a never-assigned slot raises AttributeError
        self.error = None
        self.imageCount = 0

    def getFields(self) -> dict:
        """Get monitoring fields"""
        return {"execution_time": self.executionTime}
class TaskContent:
    """
    Input of a task: what to process and how.

    Attributes:
        params: task params
        images: source images for estimations
    """

    __slots__ = ("params", "images")

    def __init__(self, images: list[InputImage], params: TaskParams):
        # both arguments are stored as given, nothing is copied
        self.images = images
        self.params = params
# Estimation dependencies graph: estimation -> estimations which must be done before it.
# NOTE: keep the insertion order of the keys stable, code iterating this mapping may rely on it.
_DEPENDENCIES_GRAPH = {
    # face estimations
    LoopEstimations.headPose: [LoopEstimations.faceDetection],
    LoopEstimations.fisheye: [LoopEstimations.faceDetection],
    LoopEstimations.faceDetectionBackground: [LoopEstimations.faceDetection],
    LoopEstimations.mask: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.gaze: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.glasses: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.livenessV1: [LoopEstimations.faceDetection, LoopEstimations.faceLandmarks5],
    LoopEstimations.emotions: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.faceDescriptor: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.basicAttributes: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.faceLandmarks68: [LoopEstimations.faceDetection],
    LoopEstimations.faceLandmarks5: [LoopEstimations.faceDetection],
    LoopEstimations.eyes: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.redEyes: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.mouthAttributes: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.faceWarpQuality: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.headwear: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.faceNaturalLight: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.imageColorType: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.eyebrowExpression: [LoopEstimations.faceDetection, LoopEstimations.faceWarp],
    LoopEstimations.faceWarp: [LoopEstimations.faceDetection, LoopEstimations.faceLandmarks5],
    # body estimations
    LoopEstimations.bodyWarp: [LoopEstimations.bodyDetection],
    LoopEstimations.bodyLandmarks17: [LoopEstimations.bodyDetection],
    LoopEstimations.bodyDescriptor: [LoopEstimations.bodyDetection, LoopEstimations.bodyWarp],
    LoopEstimations.bodyAttributes: [LoopEstimations.bodyDetection, LoopEstimations.bodyWarp],
}
def _addFaceDetectionTargets(targets: set[LoopEstimations]):
    """Helper, extend `targets` in place with the face detection and the face warp estimations."""
    targets.update((LoopEstimations.faceDetection, LoopEstimations.faceWarp))
def predictRequiredEstimations(
    targets: set[LoopEstimations],
    filters: Optional[Filters] = None,
    needExifInfo: bool = False,
) -> set[LoopEstimations]:
    """
    Predict required estimations for correct filters work, autorotation and etc

    The result contains the user targets, the estimations needed by the enabled face detection
    filters, the exif estimation (if it is requested) and all direct and indirect dependencies of
    these estimations from the dependencies graph.

    Args:
        targets: user targets, the collection is not modified
        filters: user filters
        needExifInfo: need or not exif
    Returns:
        set all estimations.
    """
    # work with an own set: the caller collection stays untouched and any iterable is accepted
    required = set(targets)
    if needExifInfo:
        required.add(LoopEstimations.exif)
    if filters:
        faceFilters = filters.faceDetection
        if (
            faceFilters.yawFilter is not None
            or faceFilters.pitchFilter is not None
            or faceFilters.rollFilter is not None
        ):
            required.add(LoopEstimations.headPose)
            _addFaceDetectionTargets(required)
        if faceFilters.maskFilter:
            required.add(LoopEstimations.mask)
            _addFaceDetectionTargets(required)
        if faceFilters.livenessFilter:
            required.add(LoopEstimations.livenessV1)
            _addFaceDetectionTargets(required)
        if faceFilters.gcFilter:
            # the garbage score filter works with a face descriptor
            required.add(LoopEstimations.faceDescriptor)
            _addFaceDetectionTargets(required)

    # Transitive closure over the dependencies graph. A worklist makes the result independent of
    # the graph key order: dependencies of dependencies are always resolved.
    pending = list(required)
    while pending:
        for dependency in _DEPENDENCIES_GRAPH.get(pending.pop(), ()):
            if dependency not in required:
                required.add(dependency)
                pending.append(dependency)
    return required
def getNotFilteredSamples(images: list[Image]) -> tuple[list[FaceSample], list[BodySample]]:
    """
    Helper, collect face and body samples which passed the filtration.

    Args:
        images: images with samples
    Returns:
        not filtered face samples and not filtered body samples, both in the order of the traversal
    """
    faces, bodies = [], []
    for picture in images:
        for item in picture.samples:
            # a sample may miss the face or the body part, such parts are skipped
            face = item.face
            if face and not face.filters.isFiltered():
                faces.append(face)
            body = item.body
            if body and not body.filters.isFiltered():
                bodies.append(body)
    return faces, bodies
class TaskState(Enum):
    """
    Task state enum.

    Members:
        PENDING: task wait to execution start
        IN_PROCESS: task in progress
        DONE: task was done
    """

    # NOTE: the numeric values are kept as they are (there is no member with the value 2)
    PENDING = 0
    IN_PROCESS = 1
    DONE = 3
class BaseTask(ABC):
    """
    A base task class.

    The task is an entity for a customization estimation business logic. Task determines what and how
    will be estimated. For example, user can realize task class with own a multiface policy processing
    logic, filtration, some else.

    Class attributes:
        engine (SDKEngine): engine shared by all tasks, set by `initialize`
        monitoringPointClass (Type[TaskMonitoringPoint]): class of the monitoring points created for tasks

    Attributes:
        monitoringPoint (TaskMonitoringPoint): execution task monitoring point (execution time, error, some else)
        executionTime (float): task execution time
        content (TaskContent): task content
        result (TaskResult): task result
        taskId (task id): pseudo unique id for logging
    """

    __slots__ = ("_requiredEstimations", "content", "result", "taskId", "executionTime", "monitoringPoint", "_state")

    engine: SDKEngine
    # task counter, unique for python interpreter, task id generator
    _taskCounter = 0
    # monitoring point class, user can add some fields or tags to monitoring
    monitoringPointClass = TaskMonitoringPoint

    @classmethod
    def initialize(
        cls,
        engine: SDKEngine,
        monitoringPointClass: Optional[Type[TaskMonitoringPoint]] = None,
        **kwargs,
    ):
        """
        Task initialization. Setup globals: engine, monitoring point class

        Args:
            engine: engine
            monitoringPointClass: user defined monitoring class (for custom fields and tags),
                `TaskMonitoringPoint` is used if it is not set
            **kwargs: some args for Task class realization
        """
        cls.engine = engine
        cls.monitoringPointClass = monitoringPointClass if monitoringPointClass else TaskMonitoringPoint

    def __init__(
        self,
        data: List[InputImage],
        params: Optional[TaskParams] = None,
        **kwargs: object,
    ):
        """
        Init task.

        Args:
            data: source images
            params: task params, default `TaskParams` are used if they are not set
            **kwargs: some args for Task class realization
        """
        self.monitoringPoint = self.monitoringPointClass()
        self.executionTime = 0.0
        self.__class__._taskCounter += 1
        self.taskId = self._taskCounter
        self.content = TaskContent(
            params=params if params else TaskParams(),
            images=data,
        )
        # read from the resolved params: the `params` argument may be None
        self._requiredEstimations = self.content.params.requiredEstimations
        self.result = TaskResult(None, None)
        self._state = TaskState.PENDING

    def filterSampleByMask(self, sample: FaceSample) -> Optional[FiltrationResult]:
        """
        Filter sample by mask logic. The filtration result is appended to the sample filters.

        Args:
            sample: face sample
        Returns:
            None if filter is None or mask not estimated otherwise filtration result
        """
        if (maskFilter := self.content.params.filters.faceDetection.maskFilter) is None:
            return None
        if sample.mask is None:
            return None
        result: FiltrationResult = maskFilter.filter(sample.mask.predominateMask)
        sample.filters.append(result)
        return result

    def filterSampleByLiveness(self, sample: FaceSample) -> Optional[FiltrationResult]:
        """
        Filter sample by livenessV1 logic. The filtration result is appended to the sample filters.

        Args:
            sample: face sample
        Returns:
            None if filter is None or liveness not estimated otherwise filtration result
        """
        if (livenessFilter := self.content.params.filters.faceDetection.livenessFilter) is None:
            return None
        if sample.livenessV1 is None:
            return None
        result: FiltrationResult = livenessFilter.filter(sample.livenessV1.prediction)
        sample.filters.append(result)
        return result

    def filterByHeadPoseYaw(self, sample: FaceSample) -> Optional[FiltrationResult]:
        """
        Filter sample by yaw angle logic. The filtration result is appended to the sample filters.

        Args:
            sample: face sample
        Returns:
            None if filter is None or head pose not estimated otherwise filtration result
        """
        if (yawFilter := self.content.params.filters.faceDetection.yawFilter) is None:
            return None
        if sample.headPose is None:
            return None
        result: FiltrationResult = yawFilter.filter(sample.headPose.yaw)
        sample.filters.append(result)
        return result

    def filterByHeadPosePitch(self, sample: FaceSample) -> Optional[FiltrationResult]:
        """
        Filter sample by pitch angle logic. The filtration result is appended to the sample filters.

        Args:
            sample: face sample
        Returns:
            None if filter is None or head pose not estimated otherwise filtration result
        """
        if (pitchFilter := self.content.params.filters.faceDetection.pitchFilter) is None:
            return None
        if sample.headPose is None:
            return None
        result: FiltrationResult = pitchFilter.filter(sample.headPose.pitch)
        sample.filters.append(result)
        return result

    def filterByHeadPoseRoll(self, sample: FaceSample) -> Optional[FiltrationResult]:
        """
        Filter sample by roll angle logic. The filtration result is appended to the sample filters.

        Args:
            sample: face sample
        Returns:
            None if filter is None or head pose not estimated otherwise filtration result
        """
        if (rollFilter := self.content.params.filters.faceDetection.rollFilter) is None:
            return None
        if sample.headPose is None:
            return None
        result: FiltrationResult = rollFilter.filter(sample.headPose.roll)
        sample.filters.append(result)
        return result

    def filterFaceGS(self, sample: FaceSample) -> Optional[FiltrationResult]:
        """
        Filter sample by garbage score logic. The filtration result is appended to the sample filters.

        Args:
            sample: face sample
        Returns:
            None if filter is None or descriptor not estimated otherwise filtration result
        """
        if (gcFilter := self.content.params.filters.faceDetection.gcFilter) is None:
            return None
        if sample.descriptor is None:
            return None
        result: FiltrationResult = gcFilter.filter(sample.descriptor.garbageScore)
        sample.filters.append(result)
        return result

    def needEstimate(self, target: LoopEstimations) -> bool:
        """need estimate target or not"""
        return target in self._requiredEstimations

    @abstractmethod
    async def _execute(self) -> None:
        """
        Execution task method.

        All logic must be realized here. What need to estimated, condition for estimations and etc.
        """

    @final
    async def execute(self) -> "BaseTask":
        """
        Execution task method.

        Runs `_execute`, measures the execution time and fills the monitoring point. An
        `EstimatorWasNotInitializedError` is not raised, it is stored to `result.error`; any other
        exception is propagated to the caller (the monitoring point is filled in this case too).

        Returns:
            the task itself, also when the execution finished with a stored error
        """
        self._state = TaskState.IN_PROCESS
        start = perf_counter()
        logger.info(f"start task {self.taskId} execution")
        try:
            await self._execute()
        except EstimatorWasNotInitializedError as e:
            self.result.error = e
        finally:
            executionTime = perf_counter() - start
            self.executionTime = executionTime
            self.monitoringPoint.executionTime = executionTime
            self.monitoringPoint.error = self.result.error.error if self.result.error else None
            self.monitoringPoint.imageCount = len(self.content.images)
            if TASK_EXECUTION_MONITORING.enable:
                TASK_EXECUTION_MONITORING.append(self.monitoringPoint)
            self._state = TaskState.DONE
            logger.info(f"finish task {self.taskId}, execution time {executionTime}")
        # returned after the handlers: a handled error must not turn the result into None
        return self

    async def prepare(self):
        """
        Prepare data for estimations.

        Images are prepared by the engine. If the image orientation estimation is required, it is
        estimated for every image without an error and without an already known orientation. An
        image which is a face or a body warp gets a single sample with the image as the warp.

        Returns:
            prepared images (the same list is stored to `result.images`)
        """
        logger.debug(f"start prepare data for the task {self.taskId}")
        images = await self.engine.prepareImages(
            list(self.content.images),
            useExifInfo=self.content.params.useExifInfo,
            autoRotation=self.content.params.autoRotation,
            estimateExif=self.needEstimate(LoopEstimations.exif),
        )
        if self.needEstimate(LoopEstimations.imageOrientation):
            for image in images:
                if image.orientation is None and image.error is None:
                    # the estimator is requested only when an image really needs it
                    orientationEstimator = self.engine.estimators.getImageOrientationsEstimator()
                    orientation = await orientationEstimator.estimate(image.sdkImage)
                    image.orientation = orientation
        for image in images:
            if image.origin.imageType == ImageType.FACE_WARP:
                # the warp is the sdk image of this very image (instance attribute, not the class one)
                sample = Sample(face=FaceSample(warp=image.sdkImage))
                # explicit assignment is kept: it pins the stored warp whatever the constructor does
                sample.face.warp = image.sdkImage
                image.samples = [sample]
            elif image.origin.imageType == ImageType.BODY_WARP:
                sample = Sample(body=BodySample(warp=image.sdkImage))
                sample.body.warp = image.sdkImage
                image.samples = [sample]
        self.result.images = images
        logger.debug(f"finish prepare data for the task {self.taskId}")
        return self.result.images

    @classmethod
    async def close(cls, closeEngine: bool = True):
        """
        Graceful shut down.

        Args:
            closeEngine: close external engine or not
        """
        if closeEngine:
            await cls.engine.close()

    @property
    def state(self) -> TaskState:
        """Get current task state"""
        return self._state