"""Module realize Handlers task. This task logic based on luna-handlers service logic."""
import asyncio
from typing import Optional, Type, Union
from lunavl.sdk.errors.exceptions import LunaSDKException
from lunavl.sdk.estimators.body_estimators.body_attributes import BodyAttributesEstimator
from .enums import LoopEstimations, MultifacePolicy
from .errors import errors
from .estimators.face_estimators.liveness1_estimator import aggregateLivenessFromEstimates
from .models.body_sample import AggregatedBodySample, BodySample
from .models.face_sample import AggregatedEmotions, AggregatedFaceSample, AggregatedMask, FaceSample
from .models.filtration.base import FiltrationResult
from .models.filtration.face_filters import FaceGCFilter, LivenessFilter
from .models.image import Image, ImageType
from .models.sample import AggregatedSample
from .tasks.task import BaseTask, getNotFilteredSamples
from .utils.detection_comporators import getBestDetection
def generateMultiEntitiesError(
samples: Union[list[BodySample], list[FaceSample]],
image: Image,
error: Union[Type[errors.MultipleFaces], Type[errors.MultipleBodies]],
) -> errors.LoopError:
"""Generete error for several detections on image"""
bBoxes = [sample.detection.boundingBox.rect.asDict() if sample.detection else {} for sample in samples]
return error.format(image.origin.filename, bBoxes)
class HandlersTask(BaseTask):
"""
    Handlers task, correlates with the luna-handlers API logic (multiface policy, filtration logic, ...)
"""
    def executeAggregatedMultiFacePolicy(self, images: list[Image]):
"""
Execute multiface policy logic for task with aggregation.
Args:
images: list images
"""
multifacePolicy = self.content.params.multifacePolicy
for image in images:
if image.error or not image.samples:
continue
if multifacePolicy == MultifacePolicy.getBest:
best = getBestDetection(image.samples)
if best.face is None:
                    # none of the samples contains a face
image.samples = []
else:
image.samples = [best]
else:
                # aggregation assumes at most one detection per image
samplesWithFaces = [sample.face for sample in image.samples if sample.face is not None]
if len(samplesWithFaces) > 1:
self.result.error = generateMultiEntitiesError(samplesWithFaces, image, errors.MultipleFaces)
samplesWithBodies = [sample.body for sample in image.samples if sample.body is not None]
                # the face multiface error takes precedence (luna-handlers behavior)
if len(samplesWithBodies) > 1 and not self.result.error:
self.result.error = generateMultiEntitiesError(samplesWithBodies, image, errors.MultipleBodies)
return images
    def executeMultiFacePolicy(self, image: Image):
"""
Execute multiface policy logic.
Args:
image: image
"""
if image.samples:
multifacePolicy = self.content.params.multifacePolicy
if multifacePolicy == MultifacePolicy.getBest:
best = getBestDetection(image.samples)
if best.face is None:
                    # none of the samples contains a face
image.samples = []
else:
image.samples = [best]
elif multifacePolicy == MultifacePolicy.notAllowed:
samplesWithFaces = [sample for sample in image.samples if sample.face is not None]
if len(samplesWithFaces) > 1:
faceSamples = [sample.face for sample in samplesWithFaces]
image.error = generateMultiEntitiesError(faceSamples, image, errors.MultipleFaces)
else:
                    # at most one face was found; drop samples without a face
image.samples = samplesWithFaces
    async def estimateFaceWarpAttributes(self, faceSamples: list[FaceSample]):
"""Parallel estimate face warp attributes"""
estimations = []
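        # collect a coroutine for each requested warp-based estimation and run them concurrently below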
if self.needEstimate(LoopEstimations.fisheye):
estimations.append(self.engine.estimateFishEyeB(faceSamples))
if self.needEstimate(LoopEstimations.mask):
estimations.append(self.engine.estimateMaskB(faceSamples))
if self.needEstimate(LoopEstimations.glasses):
estimations.append(self.engine.estimateGlassesB(faceSamples))
if self.needEstimate(LoopEstimations.mouthAttributes):
estimations.append(self.engine.estimateMouthB(faceSamples))
if self.needEstimate(LoopEstimations.emotions):
estimations.append(self.engine.estimateEmotionsB(faceSamples))
if self.needEstimate(LoopEstimations.faceWarpQuality):
estimations.append(self.engine.estimateFaceWarpQualityB(faceSamples))
if self.needEstimate(LoopEstimations.headwear):
estimations.append(self.engine.estimateHeadwearB(faceSamples))
if self.needEstimate(LoopEstimations.faceNaturalLight):
estimations.append(self.engine.estimateFaceNaturalLightB(faceSamples))
if self.needEstimate(LoopEstimations.eyebrowExpression):
estimations.append(self.engine.estimateEyebrowExpressionB(faceSamples))
if self.needEstimate(LoopEstimations.imageColorType):
estimations.append(self.engine.estimateImageColorTypeB(faceSamples))
if estimations:
await asyncio.gather(*estimations)
    async def estimateFaceDetectionAttributes(self, faceSamplesOnImage: list[FaceSample]):
"""
Estimate face detection attributes
Args:
faceSamplesOnImage: face samples with detections
"""
estimations = []
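        # each estimation below works on the face detection samples and is scheduled only if its target is requested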
if self.needEstimate(LoopEstimations.headPose):
estimations.append(self.engine.estimateHeadPoseB(faceSamplesOnImage))
if self.needEstimate(LoopEstimations.livenessV1):
scoreThreshold = self.content.params.estimatorsParams.livenessv1.scoreThreshold
qualityThreshold = self.content.params.estimatorsParams.livenessv1.qualityThreshold
estimations.append(
self.engine.estimateLivenessV1B(
faceSamplesOnImage, scoreThreshold=scoreThreshold, qualityThreshold=qualityThreshold
)
)
if self.needEstimate(LoopEstimations.eyes):
estimations.append(self.engine.estimateEyesB(faceSamplesOnImage))
if self.needEstimate(LoopEstimations.gaze):
estimations.append(self.engine.estimateGazeB(faceSamplesOnImage))
if self.needEstimate(LoopEstimations.redEyes):
estimations.append(self.engine.estimateRedEyesB(faceSamplesOnImage))
if self.needEstimate(LoopEstimations.dynamicRange):
estimations.append(self.engine.estimateDynamicRangeB(faceSamplesOnImage))
if self.needEstimate(LoopEstimations.faceDetectionBackground):
estimations.append(self.engine.estimateFaceDetectionBackgroundB(faceSamplesOnImage))
if self.needEstimate(LoopEstimations.faceLandmarks68):
estimations.append(self.engine.estimateLandmarks68B(faceSamplesOnImage))
await asyncio.gather(*estimations)
    async def estimateBodyWarpAttributes(self, bodySamplesOnImage: list[BodySample]):
"""
        Estimate body warp attributes
Args:
bodySamplesOnImage: body samples with detections
"""
estimations = []
if self.needEstimate(LoopEstimations.bodyAttributes):
estimations.append(self.engine.estimateBodyAttributesB(bodySamplesOnImage))
await asyncio.gather(*estimations)
    async def estimateBodyAttributesFromImage(self, bodySamples: list[BodySample]) -> list[BodySample]:
"""
Run body attributes estimations on samples which generated from detection.
        Args:
            bodySamples: body samples with detections
Returns:
same body samples with estimation inside
"""
if not bodySamples:
return bodySamples
if self.needEstimate(LoopEstimations.bodyWarp):
await self.engine.warpBodySampleB(bodySamples)
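        # warp-based estimations operate on the body warps produced above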
await self.estimateBodyWarpAttributes(bodySamples)
return bodySamples
    async def estimateFaceAttributesFromImage(self, faceSamples: list[FaceSample]) -> list[FaceSample]:
"""
Run face attributes estimations on samples which generated from detection.
        Args:
faceSamples: face samples with detections
Returns:
same face samples with estimation inside
"""
if not faceSamples:
return faceSamples
if self.needEstimate(LoopEstimations.faceLandmarks5):
await self.engine.estimateLandmarks5B(faceSamples)
if self.needEstimate(LoopEstimations.faceWarp):
await self.engine.warpFaceSampleB(faceSamples)
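        # detection-based and warp-based estimations are independent of each other, so both groups run concurrently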
await asyncio.gather(
self.estimateFaceDetectionAttributes(faceSamples), self.estimateFaceWarpAttributes(faceSamples)
)
return faceSamples
    async def estimateDetectionAttributes(self, image: Image):
"""
Estimate detection attributes
        Estimates everything except the face descriptor and basic attributes (the /detector logic of luna-handlers)
"""
targets = self.content.params.requiredEstimations
estimations = []
if image.origin.imageType == ImageType.IMAGE:
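            # full source image: detect faces/bodies first, then estimate attributes on the resulting samples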
await self.engine.detectSamples(
image,
detectFaces=LoopEstimations.faceDetection in targets,
detectBodies=LoopEstimations.bodyDetection in targets,
detect17Landmarks=LoopEstimations.bodyLandmarks17 in targets,
)
faceSamples = [sample.face for sample in image.samples if sample.face]
bodySamples = [sample.body for sample in image.samples if sample.body]
bodyEstimationCoro = self.estimateBodyAttributesFromImage(bodySamples)
faceEstimationCoro = self.estimateFaceAttributesFromImage(faceSamples)
estimations = [bodyEstimationCoro, faceEstimationCoro]
elif image.origin.imageType == ImageType.FACE_WARP:
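            # the image is already a face warp, so detection is skipped and only warp attributes are estimated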
estimations.append(self.estimateFaceWarpAttributes([image.samples[0].face]))
else:
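            # otherwise the image is a body warp: estimate body warp attributes directly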
estimations.append(self.estimateBodyWarpAttributes([image.samples[0].body]))
if estimations:
await asyncio.gather(*estimations)
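        # apply mask and head pose filters to every face sample once the estimations are done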
for sample in image.samples:
faceSample = sample.face
if faceSample is None:
continue
self.filterSampleByMask(faceSample)
self.filterByHeadPoseYaw(faceSample)
self.filterByHeadPoseRoll(faceSample)
self.filterByHeadPosePitch(faceSample)
    async def estimatePeople(self, image: Image):
"""Estimate people"""
if image.origin.imageType == ImageType.IMAGE:
if self.needEstimate(LoopEstimations.peopleCount):
await self.engine.estimatePeopleCount(image)
    async def estimateHuman(self, image: Image):
        """Estimate all requested face and body attributes for one image and apply the filtration logic."""
# detection attributes
taskParams = self.content.params
await self.estimateDetectionAttributes(image)
for sample in image.samples:
faceSample = sample.face
if faceSample is None:
break
self.filterSampleByLiveness(faceSample)
self.executeMultiFacePolicy(image)
faceSamples = [sample.face for sample in image.samples if sample.face and not sample.face.filters.isFiltered()]
estimations = []
# other attributes, after filtration
if self.needEstimate(LoopEstimations.faceDescriptor):
await self.engine.estimateFaceDescriptorB(
faceSamples, version=taskParams.estimatorsParams.faceDescriptorVersion
)
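            # the garbage score produced by descriptor extraction drives the GS filter below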
for sample in image.samples:
if faceSample := sample.face:
self.filterFaceGS(faceSample)
if self.needEstimate(LoopEstimations.basicAttributes):
faceSamples = [
sample.face for sample in image.samples if sample.face and not sample.face.filters.isFiltered()
]
estimations.append(self.engine.estimateBasicAttributesB(faceSamples))
if self.needEstimate(LoopEstimations.bodyDescriptor):
bodySamples = [
sample.body for sample in image.samples if sample.body and not sample.body.filters.isFiltered()
]
estimations.append(
self.engine.estimateBodyDescriptorB(
bodySamples, version=taskParams.estimatorsParams.bodyDescriptorVersion
)
)
await asyncio.gather(*estimations)
    async def executeOnImage(self, image: Image):
"""Execute task on one image (without aggregation)"""
try:
await asyncio.gather(self.estimateHuman(image), self.estimatePeople(image))
except LunaSDKException as e:
image.error = errors.SDKError(e.error)
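            # store the error on the image so that the remaining images continue to be processed independently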
return
    def aggregateFaceAttributes(self, aggregatedSample: AggregatedSample):
"""Aggregate face attributes"""
if not (samples := aggregatedSample.face.samples):
return
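        # each Aggregated* container combines the per-sample estimations into a single aggregated value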
if self.needEstimate(LoopEstimations.emotions):
emotions = AggregatedEmotions([sample.emotions for sample in samples])
aggregatedSample.face.emotions = emotions
if self.needEstimate(LoopEstimations.mask):
mask = AggregatedMask([sample.mask for sample in samples])
aggregatedSample.face.mask = mask
    async def estimateAggregatedFaceDescriptor(
self, aggregatedSample: AggregatedFaceSample, version, gcFilter: Optional[FaceGCFilter]
) -> bool:
"""
Estimate aggregated face descriptor
Args:
aggregatedSample: aggregated face sample
version: descriptor version
gcFilter: optional gc filter from task
Returns:
            true: if the sample is not filtered out (after filtration)
            false: if the sample is filtered out; all source samples are marked as filtered
"""
if self.needEstimate(LoopEstimations.faceDescriptor) and aggregatedSample.samples:
await self.engine.estimateAggregatedFaceDescriptor(aggregatedSample, version=version)
if gcFilter is not None:
result = gcFilter.filter(aggregatedSample.descriptor.garbageScore)
aggregatedSample.filters.append(result)
for sample in aggregatedSample.samples:
sample.filters.append(result)
if result.result:
return False
return True
    def estimateAggregatedLivenessV1(
self,
faceSamples: list[FaceSample],
aggregatedSample: AggregatedFaceSample,
qualityThreshold: float,
scoreThreshold: float,
livenessFilter: Optional[LivenessFilter],
) -> bool:
"""
Estimate aggregated livenessV1.
Args:
faceSamples: source face samples
aggregatedSample: aggregated sample
qualityThreshold: quality threshold
scoreThreshold: score threshold
livenessFilter: liveness filter
Returns:
            true: if the sample is not filtered out (after filtration)
            false: if the sample is filtered out; all source samples are marked as filtered
"""
livenessEstimations = [sample.livenessV1 for sample in faceSamples if sample.livenessV1]
if livenessEstimations and self.needEstimate(LoopEstimations.livenessV1):
aggregatedLiveness = aggregateLivenessFromEstimates(
livenessEstimations,
qualityThreshold=qualityThreshold,
scoreThreshold=scoreThreshold,
)
aggregatedSample.liveness = aggregatedLiveness
            # filtration by liveness, following luna-handlers logic
if livenessFilter is not None:
result: FiltrationResult = livenessFilter.filter(aggregatedLiveness.prediction)
aggregatedSample.filters.append(result)
for sample in faceSamples:
sample.filters.append(result)
if result.result:
                    # do not estimate anything further
return False
return True
    def aggregateBodyAttributes(self, aggregatedSample: AggregatedSample):
"""Aggregate body attributes"""
if not (samples := aggregatedSample.body.samples):
return
if self.needEstimate(LoopEstimations.bodyAttributes):
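            # aggregation is delegated to the underlying SDK BodyAttributesEstimator taken from the engine's worker pool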
estimator: BodyAttributesEstimator = (
self.engine.estimators.getBodyAttributesEstimator()._workers[0]._estimator
)
aggregatedSample.body.attributes = estimator.aggregate([sample.attributes for sample in samples])
    async def executeAggregatedTask(self, images: list[Image]):
"""Execute task with aggregation"""
try:
taskParams = self.content.params
estimationOnImage = []
for image in images:
estimationOnImage.append(self.estimateDetectionAttributes(image))
if self.needEstimate(LoopEstimations.exif) and image.exif is None:
estimationOnImage.append(self.engine.extractExif(image))
estimationOnImage.append(self.estimatePeople(image))
await asyncio.gather(*estimationOnImage)
self.executeAggregatedMultiFacePolicy(images)
faceSamples, bodySamples = getNotFilteredSamples(images)
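            # only samples that passed per-image filtration take part in aggregation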
# filter by aggregated liveness + liveness aggregation
aggregatedSample = AggregatedSample(
face=AggregatedFaceSample(samples=faceSamples),
body=AggregatedBodySample(samples=bodySamples),
)
self.result.aggregatedSample = aggregatedSample
self.estimateAggregatedLivenessV1(
faceSamples,
aggregatedSample.face,
qualityThreshold=taskParams.estimatorsParams.livenessv1.qualityThreshold,
scoreThreshold=taskParams.estimatorsParams.livenessv1.scoreThreshold,
livenessFilter=taskParams.filters.faceDetection.livenessFilter,
)
self.aggregateFaceAttributes(aggregatedSample)
self.aggregateBodyAttributes(aggregatedSample)
isFiltered = await self.estimateAggregatedFaceDescriptor(
aggregatedSample.face,
version=taskParams.estimatorsParams.faceDescriptorVersion,
gcFilter=taskParams.filters.faceDetection.gcFilter,
)
if not isFiltered:
# aggregated sample is filtered
return
estimations = []
if self.needEstimate(LoopEstimations.basicAttributes) and aggregatedSample.face.samples:
estimations.append(self.engine.estimateAggregatedBasicAttributes(aggregatedSample.face))
if self.needEstimate(LoopEstimations.bodyDescriptor) and aggregatedSample.body.samples:
estimations.append(
self.engine.estimateAggregatedBodyDescriptor(
aggregatedSample.body, version=taskParams.estimatorsParams.bodyDescriptorVersion
)
)
await asyncio.gather(*estimations)
except LunaSDKException as e:
error = errors.SDKError(e.error)
self.result.error = error
for image in images:
image.error = error
async def _execute(self):
"""
Execute task.
Warnings:
            - If a target cannot be estimated (the image is broken or the image type does not support this estimation),
              the executor will skip it.
"""
# prepare images and samples
await self.prepare()
validImages = [image for image in self.result.images if not image.error]
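        # images that failed during preparation keep their error and are excluded from further processing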
if self.content.params.aggregate:
            # process all images together so that their results can be aggregated
await self.executeAggregatedTask(validImages)
else:
# parallel image processing on independent images
estimations = []
for image in validImages:
estimations.append(self.executeOnImage(image))
await asyncio.gather(*estimations)
return self