# Source code for luna_handlers.sdk.sdk_loop.task

"""Module implementing the Handlers task. The task logic is based on the luna-handlers service logic."""
import asyncio
from typing import Optional, Type, Union

from lunavl.sdk.errors.exceptions import LunaSDKException
from lunavl.sdk.estimators.body_estimators.body_attributes import BodyAttributesEstimator

from .enums import LoopEstimations, MultifacePolicy
from .errors import errors
from .estimators.face_estimators.liveness1_estimator import aggregateLivenessFromEstimates
from .models.body_sample import AggregatedBodySample, BodySample
from .models.face_sample import AggregatedEmotions, AggregatedFaceSample, AggregatedMask, FaceSample
from .models.filtration.base import FiltrationResult
from .models.filtration.face_filters import FaceGCFilter, LivenessFilter
from .models.image import Image, ImageType
from .models.sample import AggregatedSample
from .tasks.task import BaseTask, getNotFilteredSamples
from .utils.detection_comporators import getBestDetection


def generateMultiEntitiesError(
    samples: Union[list[BodySample], list[FaceSample]],
    image: Image,
    error: Union[Type[errors.MultipleFaces], Type[errors.MultipleBodies]],
) -> errors.LoopError:
    """
    Generate an error for several detections found on one image.

    Args:
        samples: face or body samples found on the image
        image: image the samples belong to
        error: error to format (multiple faces or multiple bodies)
    Returns:
        result of `error.format` called with the image origin filename and the collected bounding boxes
    """
    bBoxes = []
    for sample in samples:
        if sample.detection:
            bBoxes.append(sample.detection.boundingBox.rect.asDict())
        else:
            # a sample without a detection is reported as an empty bounding box
            bBoxes.append({})
    return error.format(image.origin.filename, bBoxes)
class HandlersTask(BaseTask):
    """
    Handlers task, correlates with the luna-handlers API logic (multiface policy, filtration logic, ...).
    """

    def executeAggregatedMultiFacePolicy(self, images: list[Image]) -> list[Image]:
        """
        Execute multiface policy logic for a task with aggregation.

        Images with an error or without samples are skipped. With the `getBest` policy only the best
        detection is kept on each image. With any other policy several faces (or several bodies) on one
        image set `self.result.error`.

        Args:
            images: list of images
        Returns:
            the same list of images
        """
        multifacePolicy = self.content.params.multifacePolicy
        for image in images:
            if image.error or not image.samples:
                continue
            if multifacePolicy == MultifacePolicy.getBest:
                best = getBestDetection(image.samples)
                # the best sample has no face only when none of the samples has one
                image.samples = [] if best.face is None else [best]
                continue
            # aggregation supposes one detection per image
            samplesWithFaces = [sample.face for sample in image.samples if sample.face is not None]
            if len(samplesWithFaces) > 1:
                self.result.error = generateMultiEntitiesError(samplesWithFaces, image, errors.MultipleFaces)
            samplesWithBodies = [sample.body for sample in image.samples if sample.body is not None]
            # the multiple faces error takes priority over the multiple bodies one (luna-handlers)
            if len(samplesWithBodies) > 1 and not self.result.error:
                self.result.error = generateMultiEntitiesError(samplesWithBodies, image, errors.MultipleBodies)
        return images

    def executeMultiFacePolicy(self, image: Image):
        """
        Execute multiface policy logic for one image.

        With the `getBest` policy only the best detection is kept. With the `notAllowed` policy several
        faces on the image set `image.error`, otherwise samples without a face are removed. Any other
        policy leaves the image untouched.

        Args:
            image: image
        """
        if not image.samples:
            return
        multifacePolicy = self.content.params.multifacePolicy
        if multifacePolicy == MultifacePolicy.getBest:
            best = getBestDetection(image.samples)
            # the best sample has no face only when none of the samples has one
            image.samples = [] if best.face is None else [best]
        elif multifacePolicy == MultifacePolicy.notAllowed:
            samplesWithFaces = [sample for sample in image.samples if sample.face is not None]
            if len(samplesWithFaces) > 1:
                faceSamples = [sample.face for sample in samplesWithFaces]
                image.error = generateMultiEntitiesError(faceSamples, image, errors.MultipleFaces)
            else:
                # at most one sample has a face, remove all samples without a face
                image.samples = samplesWithFaces

    async def estimateFaceWarpAttributes(self, faceSamples: list[FaceSample]):
        """
        Estimate the required face warp attributes in parallel.

        Args:
            faceSamples: face samples to run the estimations on
        """
        engine = self.engine
        # (target, batch estimator) pairs, the order defines the order of the scheduled estimations
        warpEstimators = (
            (LoopEstimations.fisheye, engine.estimateFishEyeB),
            (LoopEstimations.mask, engine.estimateMaskB),
            (LoopEstimations.glasses, engine.estimateGlassesB),
            (LoopEstimations.mouthAttributes, engine.estimateMouthB),
            (LoopEstimations.emotions, engine.estimateEmotionsB),
            (LoopEstimations.faceWarpQuality, engine.estimateFaceWarpQualityB),
            (LoopEstimations.headwear, engine.estimateHeadwearB),
            (LoopEstimations.faceNaturalLight, engine.estimateFaceNaturalLightB),
            (LoopEstimations.eyebrowExpression, engine.estimateEyebrowExpressionB),
            (LoopEstimations.imageColorType, engine.estimateImageColorTypeB),
        )
        estimations = [estimate(faceSamples) for target, estimate in warpEstimators if self.needEstimate(target)]
        if estimations:
            await asyncio.gather(*estimations)

    async def estimateFaceDetectionAttributes(self, faceSamplesOnImage: list[FaceSample]):
        """
        Estimate the required face detection attributes in parallel.

        Args:
            faceSamplesOnImage: face samples with detections
        """
        engine = self.engine
        estimations = []
        if self.needEstimate(LoopEstimations.headPose):
            estimations.append(engine.estimateHeadPoseB(faceSamplesOnImage))
        if self.needEstimate(LoopEstimations.livenessV1):
            # liveness is the only detection estimation which takes extra parameters
            livenessParams = self.content.params.estimatorsParams.livenessv1
            estimations.append(
                engine.estimateLivenessV1B(
                    faceSamplesOnImage,
                    scoreThreshold=livenessParams.scoreThreshold,
                    qualityThreshold=livenessParams.qualityThreshold,
                )
            )
        detectionEstimators = (
            (LoopEstimations.eyes, engine.estimateEyesB),
            (LoopEstimations.gaze, engine.estimateGazeB),
            (LoopEstimations.redEyes, engine.estimateRedEyesB),
            (LoopEstimations.dynamicRange, engine.estimateDynamicRangeB),
            (LoopEstimations.portraitStyle, engine.estimatePortraitStyleB),
            (LoopEstimations.faceDetectionBackground, engine.estimateFaceDetectionBackgroundB),
            (LoopEstimations.faceLandmarks68, engine.estimateLandmarks68B),
        )
        estimations.extend(
            estimate(faceSamplesOnImage) for target, estimate in detectionEstimators if self.needEstimate(target)
        )
        if estimations:
            await asyncio.gather(*estimations)

    async def estimateBodyWarpAttributes(self, bodySamplesOnImage: list[BodySample]):
        """
        Estimate the required body warp attributes.

        Args:
            bodySamplesOnImage: body samples to run the estimations on
        """
        estimations = []
        if self.needEstimate(LoopEstimations.bodyAttributes):
            estimations.append(self.engine.estimateBodyAttributesB(bodySamplesOnImage))
        if estimations:
            await asyncio.gather(*estimations)

    async def estimateBodyAttributesFromImage(self, bodySamples: list[BodySample]) -> list[BodySample]:
        """
        Run body attributes estimations on samples which were generated from a detection.

        Args:
            bodySamples: body samples with detections
        Returns:
            same body samples with estimations inside
        """
        if not bodySamples:
            return bodySamples
        if self.needEstimate(LoopEstimations.bodyWarp):
            await self.engine.warpBodySampleB(bodySamples)
        await self.estimateBodyWarpAttributes(bodySamples)
        return bodySamples

    async def estimateFaceAttributesFromImage(self, faceSamples: list[FaceSample]) -> list[FaceSample]:
        """
        Run face attributes estimations on samples which were generated from a detection.

        Landmarks5 and the face warp are processed sequentially first, then detection and warp
        attributes are estimated in parallel.

        Args:
            faceSamples: face samples with detections
        Returns:
            same face samples with estimations inside
        """
        if not faceSamples:
            return faceSamples
        if self.needEstimate(LoopEstimations.faceLandmarks5):
            await self.engine.estimateLandmarks5B(faceSamples)
        if self.needEstimate(LoopEstimations.faceWarp):
            await self.engine.warpFaceSampleB(faceSamples)
        await asyncio.gather(
            self.estimateFaceDetectionAttributes(faceSamples), self.estimateFaceWarpAttributes(faceSamples)
        )
        return faceSamples

    async def estimateDetectionAttributes(self, image: Image):
        """
        Estimate detection attributes.

        Everything except the face descriptor and basic attributes (/detector of luna-handlers logic).
        After the estimations every face sample is passed through the mask and head pose filters.

        Args:
            image: image to process
        """
        targets = self.content.params.requiredEstimations
        imageType = image.origin.imageType
        if imageType == ImageType.IMAGE:
            await self.engine.detectSamples(
                image,
                detectFaces=LoopEstimations.faceDetection in targets,
                detectBodies=LoopEstimations.bodyDetection in targets,
                detect17Landmarks=LoopEstimations.bodyLandmarks17 in targets,
            )
            faceSamples = [sample.face for sample in image.samples if sample.face]
            bodySamples = [sample.body for sample in image.samples if sample.body]
            estimations = [
                self.estimateBodyAttributesFromImage(bodySamples),
                self.estimateFaceAttributesFromImage(faceSamples),
            ]
        elif imageType == ImageType.FACE_WARP:
            # a warp image carries exactly one sample, only warp attributes can be estimated
            estimations = [self.estimateFaceWarpAttributes([image.samples[0].face])]
        else:
            estimations = [self.estimateBodyWarpAttributes([image.samples[0].body])]
        await asyncio.gather(*estimations)

        for sample in image.samples:
            faceSample = sample.face
            if faceSample is None:
                continue
            self.filterSampleByMask(faceSample)
            self.filterByHeadPoseYaw(faceSample)
            self.filterByHeadPoseRoll(faceSample)
            self.filterByHeadPosePitch(faceSample)

    async def estimatePeople(self, image: Image):
        """
        Estimate people count, only for images of type `ImageType.IMAGE` and only if it is required.

        Args:
            image: image to process
        """
        if image.origin.imageType == ImageType.IMAGE and self.needEstimate(LoopEstimations.peopleCount):
            await self.engine.estimatePeopleCount(image)

    async def estimateHuman(self, image: Image):
        """
        Estimate face and body attributes on one image (without aggregation).

        Order: detection attributes, liveness filtration, multiface policy, face descriptor with
        garbage score filtration, then basic attributes and the body descriptor in parallel.

        Args:
            image: image to process
        """
        taskParams = self.content.params
        # detection attributes
        await self.estimateDetectionAttributes(image)
        for sample in image.samples:
            faceSample = sample.face
            if faceSample is None:
                # a sample without a face must not stop the filtration of the following samples
                continue
            self.filterSampleByLiveness(faceSample)
        self.executeMultiFacePolicy(image)

        # other attributes, after filtration
        if self.needEstimate(LoopEstimations.faceDescriptor):
            faceSamples = [
                sample.face for sample in image.samples if sample.face and not sample.face.filters.isFiltered()
            ]
            await self.engine.estimateFaceDescriptorB(
                faceSamples, version=taskParams.estimatorsParams.faceDescriptorVersion
            )
            for sample in image.samples:
                if faceSample := sample.face:
                    self.filterFaceGS(faceSample)

        estimations = []
        if self.needEstimate(LoopEstimations.basicAttributes):
            # collected after the garbage score filtration, which may have filtered more samples
            faceSamples = [
                sample.face for sample in image.samples if sample.face and not sample.face.filters.isFiltered()
            ]
            estimations.append(self.engine.estimateBasicAttributesB(faceSamples))
        if self.needEstimate(LoopEstimations.bodyDescriptor):
            bodySamples = [
                sample.body for sample in image.samples if sample.body and not sample.body.filters.isFiltered()
            ]
            estimations.append(
                self.engine.estimateBodyDescriptorB(
                    bodySamples, version=taskParams.estimatorsParams.bodyDescriptorVersion
                )
            )
        if estimations:
            await asyncio.gather(*estimations)

    async def executeOnImage(self, image: Image):
        """
        Execute the task on one image (without aggregation).

        A `LunaSDKException` is not propagated, it is stored in `image.error` as an `errors.SDKError`.

        Args:
            image: image to process
        """
        try:
            await asyncio.gather(self.estimateHuman(image), self.estimatePeople(image))
        except LunaSDKException as e:
            image.error = errors.SDKError(e.error)

    def aggregateFaceAttributes(self, aggregatedSample: AggregatedSample):
        """
        Aggregate face attributes (emotions and mask) of the source face samples, if they are required.

        Args:
            aggregatedSample: aggregated sample, its face part is updated in place
        """
        if not (samples := aggregatedSample.face.samples):
            return
        if self.needEstimate(LoopEstimations.emotions):
            aggregatedSample.face.emotions = AggregatedEmotions([sample.emotions for sample in samples])
        if self.needEstimate(LoopEstimations.mask):
            aggregatedSample.face.mask = AggregatedMask([sample.mask for sample in samples])

    async def estimateAggregatedFaceDescriptor(
        self, aggregatedSample: AggregatedFaceSample, version, gcFilter: Optional[FaceGCFilter]
    ) -> bool:
        """
        Estimate aggregated face descriptor.

        Args:
            aggregatedSample: aggregated face sample
            version: descriptor version
            gcFilter: optional gc filter from task
        Returns:
            true: if sample is not filtered (after filtration)
            false: if sample is filtered, all source samples are marked as filtered
        """
        if not (self.needEstimate(LoopEstimations.faceDescriptor) and aggregatedSample.samples):
            return True
        await self.engine.estimateAggregatedFaceDescriptor(aggregatedSample, version=version)
        if gcFilter is None:
            return True
        result = gcFilter.filter(aggregatedSample.descriptor.garbageScore)
        # the filtration result is stored on the aggregated sample and on every source sample
        aggregatedSample.filters.append(result)
        for sample in aggregatedSample.samples:
            sample.filters.append(result)
        return not result.result

    def estimateAggregatedLivenessV1(
        self,
        faceSamples: list[FaceSample],
        aggregatedSample: AggregatedFaceSample,
        qualityThreshold: float,
        scoreThreshold: float,
        livenessFilter: Optional[LivenessFilter],
    ) -> bool:
        """
        Estimate aggregated livenessV1.

        Args:
            faceSamples: source face samples
            aggregatedSample: aggregated sample
            qualityThreshold: quality threshold
            scoreThreshold: score threshold
            livenessFilter: liveness filter
        Returns:
            true: if sample is not filtered (after filtration)
            false: if sample is filtered, all source samples are marked as filtered
        """
        livenessEstimations = [sample.livenessV1 for sample in faceSamples if sample.livenessV1]
        if not (livenessEstimations and self.needEstimate(LoopEstimations.livenessV1)):
            return True
        aggregatedLiveness = aggregateLivenessFromEstimates(
            livenessEstimations,
            qualityThreshold=qualityThreshold,
            scoreThreshold=scoreThreshold,
        )
        aggregatedSample.liveness = aggregatedLiveness
        if livenessFilter is None:
            return True
        # filtration by liveness, handlers logic
        result: FiltrationResult = livenessFilter.filter(aggregatedLiveness.prediction)
        aggregatedSample.filters.append(result)
        for sample in faceSamples:
            sample.filters.append(result)
        # a truthy result means nothing should be estimated further
        return not result.result

    def aggregateBodyAttributes(self, aggregatedSample: AggregatedSample):
        """
        Aggregate body attributes of the source body samples, if they are required.

        Args:
            aggregatedSample: aggregated sample, its body part is updated in place
        """
        if not (samples := aggregatedSample.body.samples):
            return
        if self.needEstimate(LoopEstimations.bodyAttributes):
            # the aggregation method lives on the underlying estimator, which is reachable only
            # through private attributes of the engine estimator wrapper
            estimator: BodyAttributesEstimator = (
                self.engine.estimators.getBodyAttributesEstimator()._workers[0]._estimator
            )
            aggregatedSample.body.attributes = estimator.aggregate([sample.attributes for sample in samples])

    async def executeAggregatedTask(self, images: list[Image]):
        """
        Execute the task with aggregation.

        A `LunaSDKException` is not propagated, it is stored as an `errors.SDKError` in
        `self.result.error` and in the `error` of every image.

        Args:
            images: images to aggregate
        """
        try:
            taskParams = self.content.params
            estimationOnImage = []
            for image in images:
                estimationOnImage.append(self.estimateDetectionAttributes(image))
                if self.needEstimate(LoopEstimations.exif) and image.exif is None:
                    estimationOnImage.append(self.engine.extractExif(image))
                estimationOnImage.append(self.estimatePeople(image))
            if estimationOnImage:
                await asyncio.gather(*estimationOnImage)
            self.executeAggregatedMultiFacePolicy(images)

            faceSamples, bodySamples = getNotFilteredSamples(images)
            aggregatedSample = AggregatedSample(
                face=AggregatedFaceSample(samples=faceSamples),
                body=AggregatedBodySample(samples=bodySamples),
            )
            self.result.aggregatedSample = aggregatedSample

            # filter by aggregated liveness + liveness aggregation
            # NOTE(review): the returned "not filtered" flag is ignored here, so estimations continue
            # even when the aggregated sample is filtered by liveness - confirm this is intended
            self.estimateAggregatedLivenessV1(
                faceSamples,
                aggregatedSample.face,
                qualityThreshold=taskParams.estimatorsParams.livenessv1.qualityThreshold,
                scoreThreshold=taskParams.estimatorsParams.livenessv1.scoreThreshold,
                livenessFilter=taskParams.filters.faceDetection.livenessFilter,
            )
            self.aggregateFaceAttributes(aggregatedSample)
            self.aggregateBodyAttributes(aggregatedSample)

            isNotFiltered = await self.estimateAggregatedFaceDescriptor(
                aggregatedSample.face,
                version=taskParams.estimatorsParams.faceDescriptorVersion,
                gcFilter=taskParams.filters.faceDetection.gcFilter,
            )
            if not isNotFiltered:
                # aggregated sample is filtered
                return

            estimations = []
            if self.needEstimate(LoopEstimations.basicAttributes) and aggregatedSample.face.samples:
                estimations.append(self.engine.estimateAggregatedBasicAttributes(aggregatedSample.face))
            if self.needEstimate(LoopEstimations.bodyDescriptor) and aggregatedSample.body.samples:
                estimations.append(
                    self.engine.estimateAggregatedBodyDescriptor(
                        aggregatedSample.body, version=taskParams.estimatorsParams.bodyDescriptorVersion
                    )
                )
            if estimations:
                await asyncio.gather(*estimations)
        except LunaSDKException as e:
            error = errors.SDKError(e.error)
            self.result.error = error
            for image in images:
                image.error = error

    async def _execute(self):
        """
        Execute task.

        Warnings:
            - If a target can not be estimated (image is broken or image type does not support this
              estimation) the executor will skip it.

        Returns:
            the task itself
        """
        # prepare images and samples
        await self.prepare()
        validImages = [image for image in self.result.images if not image.error]
        if self.content.params.aggregate:
            await self.executeAggregatedTask(validImages)
        elif validImages:
            # parallel image processing on independent images
            await asyncio.gather(*(self.executeOnImage(image) for image in validImages))
        return self