Source code for luna_handlers.sdk.sdk_loop.sdk_task

"""
Module contains sdk task
"""
import dataclasses
import io
from contextlib import contextmanager
from enum import Enum
from itertools import chain
from time import perf_counter as perfTime
from time import time
from typing import List, Union, Optional, Iterable, Dict, Any
from uuid import uuid4

from PIL import Image
from PIL.Image import Image as PilImage
from lunavl.sdk.errors.errors import ErrorInfo as SDKErrorInfo
from lunavl.sdk.estimators.face_estimators.head_pose import HeadPose
from lunavl.sdk.estimators.face_estimators.livenessv1 import LivenessPrediction, LivenessV1
from lunavl.sdk.estimators.face_estimators.mask import Mask, MaskState
from numpy import ndarray
from vlutils.structures.dataclasses import dataclass

from .crutches_on_wheels.enums.attributes import Liveness
from .crutches_on_wheels.errors.errors import ErrorInfo, Error
from .enums import Stages, MultifacePolicy
from .estimation_targets import SDKEstimationTargets
from .liveness_predictor import LivenessPredicted
from .monitoring import TaskMonitoringData, MonitoringField


[docs]class TaskDataSource(Enum): """ Enum for a task data source """ images = 1 # images faceWarps = 2 #: face warped images humanWarps = 3 #: human body warped images
[docs]class SDKTaskFilters: """ Task filter's. Attributes: yawThreshold (Optional[int]): filter by yaw head angle for detection ([0...180]) pitchThreshold (Optional[int]): filter by pitch head angle for detection ([0...180]) rollThreshold (Optional[int]): filter by roll head angle for detection ([0...180]) maskStates (Optional[List[MaskState]]): filter by medical mask states for detection livenessStates (Optional[List[LivenessPrediction]]): filter by liveness states for detection garbageScoreThreshold (Optional[float]): filter by garbage score for descriptor ([0...1]) """ __slots__ = ( "yawThreshold", "rollThreshold", "pitchThreshold", "maskStates", "livenessStates", "garbageScoreThreshold", ) def __init__( self, yawThreshold: Optional[int] = None, rollThreshold: Optional[int] = None, pitchThreshold: Optional[int] = None, livenessStates: Optional[List[LivenessPrediction]] = None, maskStates: Optional[List[MaskState]] = None, garbageScoreThreshold: Optional[float] = None, ): self.yawThreshold: Optional[int] = yawThreshold self.rollThreshold: Optional[int] = rollThreshold self.pitchThreshold: Optional[int] = pitchThreshold self.maskStates: Optional[List[MaskState]] = maskStates self.livenessStates: Optional[List[LivenessPrediction]] = livenessStates self.garbageScoreThreshold: Optional[float] = garbageScoreThreshold def __repr__(self): return "SDKTaskFilters({})".format({key: getattr(self, key) for key in self.__slots__})
[docs] def needFilterByAngles(self) -> bool: """ Need or not filter gy head angles. Returns: True if any of angle threshold is not none. """ return any((self.yawThreshold is not None, self.pitchThreshold is not None, self.rollThreshold is not None))
[docs] def checkFilterByAngles(self, headPose: HeadPose) -> dict: """ Check filter by angles. Args: headPose: estimate head pose Returns: dict in api format:: { "is_filtered": <bool>, "filter_reasons": [<reasons>] } """ res = {"is_filtered": False, "filter_reasons": []} if not self.needFilterByAngles(): return res if self.yawThreshold is not None and self.yawThreshold < abs(headPose.yaw): res["is_filtered"] = True res["filter_reasons"].append( {"filter_name": "yaw_threshold", "object_value": headPose.yaw, "threshold_value": self.yawThreshold} ) if self.pitchThreshold is not None and self.pitchThreshold < abs(headPose.pitch): res["is_filtered"] = True res["filter_reasons"].append( { "filter_name": "pitch_threshold", "object_value": headPose.pitch, "threshold_value": self.pitchThreshold, } ) if self.rollThreshold is not None and self.rollThreshold < abs(headPose.roll): res["is_filtered"] = True res["filter_reasons"].append( {"filter_name": "roll_threshold", "object_value": headPose.roll, "threshold_value": self.rollThreshold} ) return res
[docs] def checkFilterByMaskState(self, mask: Mask) -> dict: """ Check filter by medical mask state. Args: mask: medical mask estimation Returns: dict in api format:: { "is_filtered": <bool>, "filter_reasons": [<reasons>] } """ res = {"is_filtered": False, "filter_reasons": []} maskState = mask.predominateMask if self.maskStates and maskState not in self.maskStates: res["is_filtered"] = True res["filter_reasons"].append( { "filter_name": "mask_states", "object_value": maskState.value, "threshold_value": [state.value for state in self.maskStates], } ) return res
[docs] def checkFilterByLiveness(self, liveness: Union[LivenessV1, LivenessPredicted]) -> dict: """ Check filter by liveness. Args: liveness: liveness estimation Returns: dict in api format:: { "is_filtered": <bool>, "filter_reasons": [<reasons>] } """ res = {"is_filtered": False, "filter_reasons": []} if self.livenessStates and liveness.prediction not in self.livenessStates: res["is_filtered"] = True res["filter_reasons"].append( { "filter_name": "liveness_states", "object_value": Liveness[liveness.prediction.value].value, "threshold_value": [Liveness[state.value].value for state in self.livenessStates], } ) return res
[docs] def checkFilterByGS(self, gs: float) -> dict: # pylint: disable-msg=C0103 """ Check garbage score filter. Args: gs: estimated garbage score Returns: dict in api format:: { "is_filtered": <bool>, "filter_reasons": [<reasons>] } """ isFiltered = gs < self.garbageScoreThreshold res = {"is_filtered": isFiltered} if isFiltered: res["filter_reasons"] = [ {"filter_name": "score_threshold", "object_value": gs, "threshold_value": self.garbageScoreThreshold} ] return res
[docs]class WarpAttributes: """ Warp attributes. Attributes: emotions (dict): dict with emotions from lunavl mouthState (dict): dict with mouth state from lunavl warpQuality (dict): dict with quality state from lunavl mask (dict): dict with mask state from lunavl glasses (dict): dict with glasses state from lunavl """ __slots__ = ("emotions", "mouthState", "warpQuality", "mask", "glasses") def __init__(self): self.emotions: Optional[dict] = None self.mouthState: Optional[dict] = None self.warpQuality: Optional[dict] = None self.mask: Optional[dict] = None self.glasses: Optional[dict] = None
[docs] def asDict(self) -> Dict[str, dict]: """ Convert to dict without not extracted attributes. Returns: dict in api format:: { "emotions": <self.emotions>, "mouth_attributes": <self.mouthState>, "quality": <self.warpQuality> } """ res = {} if self.emotions is not None: res["emotions"] = self.emotions if self.mouthState is not None: res["mouth_attributes"] = self.mouthState # fsdk v.5.0.0.0 does not calculate mouth attributes score res["mouth_attributes"]["score"] = 1.0 if self.warpQuality is not None: res["quality"] = self.warpQuality if self.mask is not None: res["mask"] = self.mask if self.glasses is not None: res["glasses"] = self.glasses return res
[docs]class SDKWarp: """ SDK Warp. Attributes: body (Union[bytes, bytearray, PilImage]): binary body outerSampleId (Optional[str]): id of sample if warps is obtained from the sample storage filename (Optional[str]): warp source filename detectTime (Optional[str]): detection time in ISO format id (str): warp id isFiltered (bool): warp is filtered or not in a task processing (by garbage score) imageId (Optional[str]): image id which is a source for warp error (Optional[Union[ErrorInfo, SDKErrorInfo]]): error which is occurred during task processing isSaved (bool): whether sample is saved in luna-image-store imageOrigin (Optional[str]): detection image origin """ __slots__ = ( "body", "outerSampleId", "filename", "id", "isFiltered", "attributes", "imageId", "error", "isSaved", "detectTime", "imageOrigin", ) def __init__( self, body: Union[bytes, bytearray, ndarray], sampleId: Optional[str] = None, filename: Optional[str] = None, imageId: Optional[str] = None, detectTime: Optional[str] = None, imageOrigin: Optional[str] = None, ): self.body: Union[bytes, bytearray, ndarray] = body self.outerSampleId: Optional[str] = sampleId self.filename: Optional[str] = filename self.detectTime: Optional[str] = detectTime self.imageOrigin: Optional[str] = imageOrigin self.id: str = str(uuid4()) # pylint: disable-msg=C0103 self.isFiltered: bool = False self.isSaved = self.outerSampleId is not None self.imageId: Optional[str] = imageId self.error: Optional[Union[ErrorInfo, SDKErrorInfo]] = None @property def sampleId(self) -> str: """ Get sample id of warp. Returns: outerSampleId if it is not None otherwise self.id """ return self.outerSampleId if self.outerSampleId else self.id
[docs] def asBytes(self) -> bytes: """ Get warped image as bytes. Raises: RuntimeError: if body has incorrect type Returns: image as bytes """ if isinstance(self.body, ndarray): with io.BytesIO() as imageData: pillowImage = Image.fromarray(self.body) pillowImage.save(imageData, format="JPEG") return imageData.getvalue() if isinstance(self.body, (bytes, bytearray)): return bytes(self.body) raise RuntimeError(f"Bad warped image type: {type(self.body)}")
[docs] def asPillow(self) -> PilImage: """ Get image data as pillow object. Raises: RuntimeError: if body has incorrect type Returns: PIL image """ if isinstance(self.body, ndarray): return Image.fromarray(self.body) if isinstance(self.body, (bytes, bytearray)): return Image.open(io.BytesIO(bytes(self.body))) raise RuntimeError(f"Bad warped image type: {type(self.body)}")
[docs]class FaceWarp(SDKWarp): """ Face warp """
[docs]class HumanWarp(SDKWarp): """ Human warp """
[docs]class FaceAttributes: """ Face attributes: descriptor, gender, ethnicity, age. Attributes: basicAttributes (Optional[dict]): dict with basic attributes from lunavl descriptor (Optional[dict]): dict with descriptor from lunavl warps (List[str]): list of warps filtered (bool): attributes is filtered or not in a task processing (by garbage score) attributeId (Optional[str]): attribute id """ __slots__ = ("basicAttributes", "descriptor", "warps", "filtered", "attributeId") def __init__( self, warps: List[SDKWarp], basicAttributes: Optional[dict] = None, descriptor: Optional[dict] = None, attributeId: Optional[str] = None, ): self.basicAttributes: Optional[dict] = basicAttributes self.descriptor: Optional[dict] = descriptor self.warps: List[SDKWarp] = warps self.filtered: bool = False self.attributeId: str = attributeId
[docs]class HumanAttributes: """ Face attributes: descriptor, gender, ethnicity, age. Attributes: descriptor (Optional[dict]): dict with descriptor from lunavl warps (List[HumanWarp]): warp list filtered (bool): attributes is filtered or not in a task processing (by garbage score) attributeId (Optional[str]): attribute id """ __slots__ = ("descriptor", "warps", "filtered", "attributeId") def __init__( self, warps: List[HumanWarp], descriptor: Optional[dict] = None, attributeId: Optional[str] = None, ): self.descriptor: dict = descriptor self.warps: List[HumanWarp] = warps self.filtered: bool = False self.attributeId: str = attributeId
[docs]class BoundingBox: """ Face bounding box. Attributes: width (float): width height (float): height x (float): x coordinate of top left angle y (float): y coordinate of top left angle """ __slots__ = ("x", "y", "width", "height") def __init__(self, width: int, height: int, x: int, y: int): self.x: float = float(x) # pylint: disable-msg=C0103 self.y: float = float(y) # pylint: disable-msg=C0103 self.width: float = float(width) self.height: float = float(height)
[docs]class SDKDetectableImage: """ Image for detect faces. Attributes: id (str): image id body (Union[bytes, bytearray, PilImage]): body faceBoundingBoxes (Optional[List[BoundingBox]]): list face detections humanBoundingBoxes (Optional[List[BoundingBox]]): list human body detections filename (Optional[str]): image filename (meta data) error (Optional[SDKErrorInfo]): error which is occurred during task processing url (Optional[str]): image source detectTime (Optional[str]): detection time in ISO format imageOrigin (Optional[str]): detection image origin """ __slots__ = ( "id", "body", "faceBoundingBoxes", "humanBoundingBoxes", "filename", "error", "detectTime", "url", "imageOrigin", ) def __init__( self, body: Union[bytes, bytearray, PilImage], filename: Optional[str] = None, faceBoundingBoxes: Optional[List[BoundingBox]] = None, humanBoundingBoxes: Optional[List[BoundingBox]] = None, url: Optional[str] = None, detectTime: Optional[str] = None, imageOrigin: Optional[str] = None, ): self.id: str = str(uuid4()) # pylint: disable-msg=C0103 self.body: bytes = self.asBytes(body) self.faceBoundingBoxes: Optional[List[BoundingBox]] = faceBoundingBoxes self.humanBoundingBoxes: Optional[List[BoundingBox]] = humanBoundingBoxes self.filename: Optional[str] = filename self.detectTime: Optional[str] = detectTime self.error: Optional[SDKErrorInfo] = None self.url: Optional[str] = url self.imageOrigin: Optional[str] = imageOrigin
[docs] @staticmethod def asBytes( imageData: Union[bytes, bytearray, PilImage], imageFormat: Optional[str] = None, exif: Optional[bytes] = None ) -> bytes: """ Get image data as bytes. Convert Pil image to bytes if needed. Args: imageData: pillow object imageFormat: image format (determined from the filename extension) Returns: image data as bytes """ if isinstance(imageData, PilImage): with io.BytesIO() as outputData: if exif is not None: imageData.save(outputData, format=imageFormat or imageData.format, exif=exif) else: imageData.save(outputData, format=imageFormat or imageData.format) return outputData.getvalue() return bytes(imageData)
[docs] def asPillow(self) -> Optional[PilImage]: """ Get image data as pillow object. Returns: PIL image """ try: return Image.open(io.BytesIO(self.body)) except OSError: self.error = Error.InvalidType.format("Unsupported type") return None
[docs] def exifTranspose(self) -> None: """ Based on genuine function from pillow https://pillow.readthedocs.io/en/latest/_modules/PIL/ImageOps.html#exif_transpose The only difference is that manipulation is being done inplace, allowing us to leverage internal caching of exif data. NOTE: Transposed image might have incorrect values in particular tags. For example, orientation, length, width. """ pilImage = self.asPillow() if pilImage: exif = pilImage.getexif() orientation = exif.get(0x0112) method = { 2: Image.FLIP_LEFT_RIGHT, 3: Image.ROTATE_180, 4: Image.FLIP_TOP_BOTTOM, 5: Image.TRANSPOSE, 6: Image.ROTATE_270, 7: Image.TRANSVERSE, 8: Image.ROTATE_90, }.get(orientation) if method is not None: transposedImage = pilImage.transpose(method) self.body = self.asBytes(transposedImage, imageFormat=pilImage.format, exif=exif)
[docs]@dataclass(withSlots=True) class BaseEstimation: """ Base container for estimation result """ # (SDKWarp): estimation warp warp: SDKWarp # (Optional[dict]): detection bounding rectangle rect: Optional[dict] = None # (Optional[dict]): estimated attributes attributes: Optional[dict] = None # (Optional[Union[HumanAttributes, FaceAttributes]]): extracted attributes extractedAttributes: Optional[Union[HumanAttributes, FaceAttributes]] = None # (Optional[dict]): image visual properties estimation quality: Optional[dict] = None # (Optional[dict]): estimation filter filter: Optional[dict] = None
[docs]@dataclass(withSlots=True) class HumanEstimation(BaseEstimation): """ Container for human body estimation result """ # (Optional[dict]): array of 17 body detection landmarks landmarks17: Optional[dict] = None # (Optional[float]): estimation score score: Optional[float] = None
[docs]@dataclass(withSlots=True) class FaceEstimation(BaseEstimation): """ Container for face estimation result """ # (Optional[dict]): array of 5 face detection landmarks landmarks5: Optional[dict] = None # (Optional[dict]): array of 68 face detection landmarks landmarks68: Optional[dict] = None
[docs]@dataclass(withSlots=True) class SDKEstimation: """ Container for sdk estimation result. Attributes: id: unique estimation ID """ # (str): unique estimation ID id: str = None # pylint: disable-msg=C0103 # (Optional[HumanEstimation]): human body estimation body: Optional[HumanEstimation] = None # (Optional[FaceEstimation]): face estimation face: Optional[FaceEstimation] = None def __post_init__(self): """ Init container with unique id. """ self.id = str(uuid4()) @property def errors(self) -> List[ErrorInfo]: """ Estimation errors Returns: list of all estimation errors """ return [estimation.warp.error for estimation in (self.face, self.body) if estimation and estimation.warp.error]
[docs]class ImageEstimation: """ Container for image estimation result. Attributes: image: source image filename: source image filename estimations: image estimations """ def __init__(self, image: Union[SDKDetectableImage, HumanWarp, FaceWarp]): """ Init container with source image. Args: image: source image """ if isinstance(image, SDKDetectableImage): self.estimations: List[SDKEstimation] = [] elif isinstance(image, FaceWarp): self.estimations = [SDKEstimation(face=FaceEstimation(warp=image))] elif isinstance(image, HumanWarp): self.estimations = [SDKEstimation(body=HumanEstimation(warp=image))] else: raise RuntimeError("Unsupported source image type.") self.image = image self.filename = self.image.filename @property def errors(self) -> List[ErrorInfo]: """ Estimation errors. Returns: list of all estimation errors """ errors = [] if self.image.error: errors.append(self.image.error) errors.extend(chain(*(estimation.errors for estimation in self.estimations))) return errors @property def error(self) -> Optional[ErrorInfo]: """ Estimation error. Returns: first estimation error if any errors occur, otherwise None """ return self.errors[0] if self.errors else None
[docs]@dataclass(withSlots=True) class AggregatedDetectionFaceAttributes: """ Container for aggregated face detection result """ # (List[SDKWarp]): list of warps on which aggregation is performed warps: List[SDKWarp] # (dict): dict with liveness prediction and estimation data liveness: dict # (bool): attributes is filtered or not in a task processing (by garbage score) filtered: bool
[docs]@dataclass(withSlots=True) class AggregatedDetectionHumanAttributes: """ Container for aggregated human detection result """
[docs]@dataclass(withSlots=True) class AggregatedFaceWarpAttributes: """ Container for the aggregated result of face warp attributes """ # (Optional[Dict[str, Any]]): dict with predominant emotion and estimation data emotions: Optional[Dict[str, Any]] = None # (Optional[Dict[str, Any]]): dict with predominant mask and estimation data mask: Optional[Dict[str, Any]] = None
[docs]@dataclass(withSlots=True) class AggregatedHumanWarpAttributes: """ Container for the aggregated result of human warp attributes """
[docs]@dataclass(withSlots=True) class AggregatedDetectionEstimations: """ Container for aggregated detection result """ # (Optional[FaceAttributes]): face aggregated attributes face: Optional[AggregatedDetectionFaceAttributes] = None # (Optional[HumanAttributes]): body aggregated attributes body: Optional[AggregatedDetectionHumanAttributes] = None
[docs]@dataclass(withSlots=True) class AggregatedExtractionEstimations: """ Container for aggregated extraction result """ # (Optional[FaceAttributes]): face aggregated attributes face: Optional[FaceAttributes] = None # (Optional[HumanAttributes]): body aggregated attributes body: Optional[HumanAttributes] = None
[docs]@dataclass(withSlots=True) class AggregatedWarpEstimations: """ Container for aggregated estimation result """ # (Optional[AggregatedFaceWarpAttributes]): face aggregated attributes face: Optional[AggregatedFaceWarpAttributes] = None # (Optional[AggregatedHumanWarpAttributes]): body aggregated attributes body: Optional[AggregatedHumanWarpAttributes] = None
[docs]@dataclass(withSlots=True) class AggregatedEstimations: """ Container for aggregated estimation result """ # (AggregatedDetectionEstimations): aggregated estimations of detector stage detection: AggregatedDetectionEstimations = dataclasses.field(default_factory=AggregatedDetectionEstimations) # (AggregatedExtractionEstimations): aggregated estimations of extractor stage extraction: AggregatedExtractionEstimations = dataclasses.field(default_factory=AggregatedExtractionEstimations) # (AggregatedWarpEstimations): aggregated estimations of warp estimator stage estimation: AggregatedWarpEstimations = dataclasses.field(default_factory=AggregatedWarpEstimations)
[docs]@dataclass(withSlots=True) class FilteredEstimation: """ Container for filtered image estimation result """ # (str): image filename filename: str # (Union[HumanEstimation, FaceEstimation]): image estimations estimation: Union[HumanEstimation, FaceEstimation] @property def filterReasons(self): """ Estimation filter reason. """ return self.estimation.filter["filter_reasons"]
[docs]class SDKTask: """ Task for execution in the sdk loop. Attributes: createTime (float): task create time, timestamp taskId (int): unique task id source (TaskDataSource): source of data for task toEstimation (SDKEstimationTargets): set of estimations for task aggregateAttributes (bool): aggregate face attributes to one from input data or not _pipeline (List[Stages]): list of stages for task filters (Optional[SDKTaskFilters]): filters for processing estimation values error (Optional[SDKErrorInfo]): error which is occurred during task processing multifacePolicy (MultifacePolicy): multiple face detection policy monitoringData (TaskMonitoringData): task processing monitoring data startExecutionTime: start execution time lastCheckpointTime: last checkpoint time images (List[ImageEstimation]): task image processing results aggregatedEstimations (AggregatedEstimations): task aggregated estimations filteredEstimations(List[FilteredEstimation]): task filtered estimations Raises: RuntimeError: if input data is not correct """ #: generator for task id _taskCounter = 0 __slots__ = ( "createTime", "taskId", "source", "toEstimation", "aggregateAttributes", "_pipeline", "isFiltered", "filters", "multifacePolicy", "error", "monitoringData", "startExecutionTime", "lastCheckpointTime", "images", "aggregatedEstimations", "filteredEstimations", ) def __init__( self, toEstimation: SDKEstimationTargets, data: Union[List[SDKDetectableImage], List[HumanWarp], List[FaceWarp]], filters: Optional[SDKTaskFilters] = None, aggregateAttributes: bool = False, multifacePolicy: MultifacePolicy = MultifacePolicy.allowed, ): self.createTime = time() SDKTask._taskCounter += 1 self.taskId = SDKTask._taskCounter self.images: List[ImageEstimation] = [] self.aggregatedEstimations: AggregatedEstimations = AggregatedEstimations() self.filteredEstimations: List[FilteredEstimation] = [] if not data: raise RuntimeError("Empty source for sdk task") if isinstance(data[0], SDKDetectableImage): self.source: TaskDataSource = TaskDataSource.images elif isinstance(data[0], FaceWarp): self.source = TaskDataSource.faceWarps elif isinstance(data[0], HumanWarp): self.source = TaskDataSource.humanWarps else: raise RuntimeError("sdk task does not assume simultaneously images and warps source of task") self.images = [ImageEstimation(image) for image in data] self.filters: SDKTaskFilters = filters or SDKTaskFilters() if self.filters.needFilterByAngles(): toEstimation.faceEstimationTargets.estimateHeadPose = 1 if self.filters.livenessStates: toEstimation.faceEstimationTargets.estimateLiveness.estimate = 1 if self.filters.maskStates: toEstimation.faceEstimationTargets.estimateMask = 1 self.toEstimation: SDKEstimationTargets = toEstimation self.aggregateAttributes: bool = aggregateAttributes self._pipeline: List[Stages] = self.getPipeline() self.multifacePolicy: MultifacePolicy = multifacePolicy self.error: Union[ErrorInfo, SDKErrorInfo, None] = None self.monitoringData: TaskMonitoringData = TaskMonitoringData() self.startExecutionTime = None self.lastCheckpointTime = None @property def pipeline(self) -> List[Stages]: """ Getter for pipe line. Returns: order list of stages """ return self._pipeline
[docs] def checkMultiFacesRule(self): """ Check task multiple face detection. Returns: True if multiple faces or humans not detected else False """ for image in self.images: faceDetections = [estimation.face for estimation in image.estimations if estimation.face] if len(faceDetections) > 1: bBoxes = [detection.rect for detection in faceDetections] self.error = Error.MultipleFaces.format(image.filename, bBoxes) return False return True
@property def faceWarps(self) -> List[FaceWarp]: """ Task face warps. """ faceWarps = [] for image in self.images: for estimation in image.estimations: if estimation.face: faceWarps.append(estimation.face.warp) return faceWarps @property def humanWarps(self) -> List[HumanWarp]: """ Task human body warps. """ humanWarps = [] for image in self.images: for estimation in image.estimations: if estimation.body: humanWarps.append(estimation.body.warp) return humanWarps
[docs] def getNextStage(self, currentStage: Stages) -> Union[Stages, None]: """ Get next stage of the task. Args: currentStage: current stage Returns: next stage or None (task need put to results queue) """ if all((image.errors for image in self.images)): return None currentStageIndex = self.pipeline.index(currentStage) nextStageIndex = currentStageIndex + 1 if len(self.pipeline) == nextStageIndex: return None nextStage = self.pipeline[nextStageIndex] if nextStage == Stages.faceExtractor: if self.aggregateAttributes: if not self.checkMultiFacesRule(): return None if all((sample.isFiltered for sample in self.faceWarps)): nextStageIndex += 1 if len(self.pipeline) == nextStageIndex: return None nextStage = self.pipeline[nextStageIndex] if nextStage == Stages.humanExtractor: if all((sample.isFiltered for sample in self.humanWarps)): return None return nextStage
[docs] def getObjectCountForStage(self, stage: Stages) -> int: """ Get count objects (images or warps) for task processing for stage. Args: stage: stage Returns: count images if stage is detector otherwise warps count """ return len(self.images) if stage == Stages.faceDetector else len(self.faceWarps)
[docs] def getPipeline(self) -> List[Stages]: """ Get task pipeline based on estimations. Returns: order list of stages list """ pipeline = [] if self.toEstimation.estimateFace and (self.source != TaskDataSource.humanWarps): faceEstimationTargets = self.toEstimation.faceEstimationTargets if self.source == TaskDataSource.images: pipeline.append(Stages.faceDetector) if any( ( faceEstimationTargets.estimateQuality, faceEstimationTargets.estimateEmotions, faceEstimationTargets.estimateMouthAttributes, faceEstimationTargets.estimateMask, faceEstimationTargets.estimateGlasses, ) ): pipeline.append(Stages.faceEstimator) if any((faceEstimationTargets.estimateBasicAttributes, faceEstimationTargets.estimateFaceDescriptor)): pipeline.append(Stages.faceExtractor) if self.toEstimation.estimateHuman and (self.source != TaskDataSource.faceWarps): humanEstimationTargets = self.toEstimation.humanEstimationTargets if self.source == TaskDataSource.images: pipeline.append(Stages.humanDetector) if humanEstimationTargets.estimateHumanDescriptor: pipeline.append(Stages.humanExtractor) return pipeline
[docs] def updateEstimationsWithExtractionResult(self, subtask: "BaseExtractorSubTask"): """ Update task estimations with attribute extraction result. Args: subtask: extractor subtask """ for image in self.images: for estimation in image.estimations: if estimation.id == subtask.estimationId: if isinstance(subtask.attributes, HumanAttributes): estimation.body.extractedAttributes = subtask.attributes elif isinstance(subtask.attributes, FaceAttributes): estimation.face.extractedAttributes = subtask.attributes else: raise RuntimeError("Unsupported extracted attribute type.") return
[docs]@contextmanager def tasksTimeMonitoring(fieldName, tasks: Iterable[SDKTask]): """ Context manager for updating execution time of tasks. Args: fieldName: monitoring field name tasks: tasks """ start = perfTime() yield executionTime = perfTime() - start for task in tasks: field: MonitoringField = getattr(task.monitoringData, fieldName) field.value = executionTime