Source code for luna_handlers.classes.schemas.verifier

"""
Module contains schemas for verifier
"""
import asyncio
from typing import Optional, List, Union, Tuple, Dict
from uuid import UUID

from lunavl.sdk.estimators.face_estimators.livenessv1 import LivenessPrediction
from lunavl.sdk.estimators.face_estimators.mask import MaskState
from pydantic import Field

from classes.raw_descriptor_data import RawDescriptorData
from luna3.client import Client
from luna3.common.luna_response import LunaResponse
from app.api_sdk_adaptors.handler import APISDKHandlerAdaptor
from app.global_vars.constants import MAX_ANGLE
from classes.event import Event
from classes.event_parts import ImageDetectorResult, RawDescriptorResult
from classes.monitoring import HandlersMonitoringData as DataForMonitoring
from classes.schemas import types
from classes.schemas.base_schema import BaseSchema
from classes.schemas.detect_policies import BaseDetectPolicy
from classes.schemas.extract_policies import BaseExtractPolicy
from classes.schemas.handler import FaceInputEstimationsModel
from classes.schemas.match_policy import BaseMatchPolicy
from classes.schemas.policies import getImagesReplyData
from classes.schemas.storage_policy import saveSamples
from crutches_on_wheels.enums.attributes import Liveness
from crutches_on_wheels.monitoring.points import monitorTime
from crutches_on_wheels.utils.log import Logger
from sdk.sdk_loop.enums import MultifacePolicy
from sdk.sdk_loop.estimation_targets import SDKFaceEstimationTargets, SDKEstimationTargets
from sdk.sdk_loop.sdk_task import SDKDetectableImage, SDKTask, SDKTaskFilters, TaskDataSource, FaceWarp
from sdk.sdk_loop.task_loop import SDKTaskLoop


[docs]class VerifierMatchPolicy(BaseMatchPolicy): """Verifier match policy schema"""
[docs]class VerifierDetectPolicy(BaseDetectPolicy): """Verifier detect policy"""
[docs]class VerifierExtractPolicy(BaseExtractPolicy): """Verifier extract policy"""
[docs]class VerifierAttributeStorePolicy(BaseSchema): """Verifier attribute storage policy""" # whether to store attribute storeAttribute: types.Int01 = 0
[docs] async def execute(self, events: List[Event], accountId: str, luna3Client: Client) -> None: """ Save attributes. Args: events: events accountId: account id luna3Client: client """ if not self.storeAttribute: return futures, eventsToUpdate = [], [] for event in events: if event.faceAttributes is None: continue kwargs = event.faceAttributes._getAttributeKwargs() eventsToUpdate.append(event) futures.append( luna3Client.lunaFaces.createAttribute(accountId=accountId, ttl=None, **kwargs, raiseError=True,) ) responses: List[LunaResponse] = await asyncio.gather(*futures) for event, response in zip(eventsToUpdate, responses): event.faceAttributes.sdkAttribute.attributeId = response.json["attribute_id"]
[docs]class VerifierFaceSampleStorePolicy(BaseSchema): """Verifier face sample storage policy""" # whether to store face sample storeSample: types.Int01 = 0
[docs] async def execute(self, events: List[Event], bucket: str, accountId: str, luna3Client: Client) -> None: """ Save face samples. Args: events: events bucket: bucket name accountId: account id luna3Client: client """ if not self.storeSample: return warps = [] for event in events: for detection in event.detections: face = detection.detection.face if face is not None: warps.append(face.sdkEstimation.warp) face.url = ( f"{luna3Client.lunaFaceSamplesStore.baseUri}/buckets/{bucket}" f"/images/{face.sdkEstimation.warp.sampleId}" ) await saveSamples( warpsToSave=warps, bucket=bucket, accountId=accountId, storeApiClient=luna3Client.lunaFaceSamplesStore )
[docs]class VerifierStoragePolicy(BaseSchema): """Verifier storage policy""" # attribute storage policy attributePolicy: Optional[VerifierAttributeStorePolicy] = VerifierAttributeStorePolicy() # face sample storage policy faceSamplePolicy: Optional[VerifierFaceSampleStorePolicy] = VerifierFaceSampleStorePolicy()
[docs] async def execute( self, events: List[Event], accountId: str, luna3Client: Client, facesBucket: str ) -> DataForMonitoring: """ Execute storage policy - save objects. Args: events: events accountId: account id luna3Client: luna 3 client facesBucket: faces samples bucket Returns: monitoring data """ async def _faceSample() -> None: with monitorTime(monitoringData.request, "face_sample_storage_policy_time"): await self.faceSamplePolicy.execute(events, facesBucket, accountId, luna3Client) async def _attribute() -> None: with monitorTime(monitoringData.request, "face_attribute_storage_policy_time"): await self.attributePolicy.execute(events, accountId, luna3Client) monitoringData = DataForMonitoring() await _faceSample() # save attribute and face only after executing previous policies (^^^ samples are updated here ^^^) await _attribute() return monitoringData
[docs]class VerifierPoliciesModel(BaseSchema): """Verifier policies""" # detect policy detectPolicy: VerifierDetectPolicy = Field(default_factory=lambda: VerifierDetectPolicy()) # extract policy extractPolicy: VerifierExtractPolicy = VerifierExtractPolicy() # storage policy storagePolicy: VerifierStoragePolicy = VerifierStoragePolicy() # verification threshold verificationThreshold: types.Float01 = 0.9
[docs] def prepareSDKTask(self, sdkData: List[Union[SDKDetectableImage, FaceWarp]], aggregate: int) -> SDKTask: """ Prepare sdk task Args: sdkData: a list of input images or warps aggregate: aggregate all extracted samples to one or not Returns: sdk task """ faceTargets = SDKFaceEstimationTargets( estimateQuality=self.detectPolicy.estimateQuality, estimateMouthAttributes=self.detectPolicy.estimateMouthAttributes, estimateAGS=0, estimateGaze=self.detectPolicy.estimateGaze, estimateEyesAttributes=self.detectPolicy.estimateEyesAttributes, estimateEmotions=self.detectPolicy.estimateEmotions, estimateMask=self.detectPolicy.estimateMask, estimateHeadPose=self.detectPolicy.estimateHeadPose, estimateBasicAttributes=self.extractPolicy.extractBasicAttributes, estimateFaceDescriptor=1, estimateLiveness=self.detectPolicy.estimateLiveness.asSDKPolicy(), ) toEstimate = SDKEstimationTargets(estimateHuman=0, estimateFace=1, faceEstimationTargets=faceTargets) def suitFilter(x): """ Return useful thresholds. """ if x != MAX_ANGLE: return x return None maskStates = None if self.detectPolicy.maskStates: maskStates = [MaskState(x) for x in self.detectPolicy.maskStates] livenessStates = None if self.detectPolicy.livenessStates is not None: livenessStates = [LivenessPrediction(Liveness(x).name) for x in self.detectPolicy.livenessStates] filters = SDKTaskFilters( yawThreshold=suitFilter(self.detectPolicy.yawThreshold), pitchThreshold=suitFilter(self.detectPolicy.pitchThreshold), rollThreshold=suitFilter(self.detectPolicy.rollThreshold), garbageScoreThreshold=self.extractPolicy.fdScoreThreshold, maskStates=maskStates, livenessStates=livenessStates, ) return SDKTask( toEstimate, data=sdkData, filters=filters, aggregateAttributes=bool(aggregate), multifacePolicy=MultifacePolicy(self.detectPolicy.multifacePolicy), )
[docs] async def execute( self, accountId: str, inputData: List[Union[SDKDetectableImage, FaceWarp, RawDescriptorData]], sdkLoop: SDKTaskLoop, luna3Client: Client, logger: Logger, matchPolicies: List[VerifierMatchPolicy], facesBucket: str, ) -> Tuple[List[dict], List[Event], Dict[str, List[dict]], DataForMonitoring]: """ Executes given policies against provided data. Args: accountId: A str, account id inputData: A list of images/descriptors sdkLoop: A sdk loop instance luna3Client: A luna3 client instance logger: A Logger instance matchPolicies: MatchingPolicy instances facesBucket: faces samples bucket Returns: tuple, first - all detection, second - events, third - monitoring data """ processedDataSources: List[Union[ImageDetectorResult, RawDescriptorResult]] = [...] * len(inputData) # Split input data into lists of images and descriptors rawDescriptorIdxs, rawDescriptorData, rawImageIdxs, sdkData = [], [], [], [] for i, item in enumerate(inputData): if isinstance(item, RawDescriptorData): rawDescriptorIdxs.append(i) rawDescriptorData.append(item) else: rawImageIdxs.append(i) sdkData.append(item) sdkAdapter = APISDKHandlerAdaptor(logger=logger, accountId=accountId, sdkLoop=sdkLoop,) # Process images if sdkData: task = self.prepareSDKTask(sdkData, aggregate=0) detectLandmarks68 = self.detectPolicy.detectLandmarks68 and task.source == TaskDataSource.images events, filteredDetections, processedImages, monitoringData = await sdkAdapter.handle( task=task, detectLandmarks68=detectLandmarks68 ) for index, processedImage in zip(rawImageIdxs, processedImages): processedDataSources[index] = processedImage else: events, filteredDetections = [], sdkAdapter.prepareFilteredDetections() monitoringData = DataForMonitoring() # Process descriptors rawDescriptorEvents, rawDescriptorProcessedImages = sdkAdapter.handleRawDescriptors(rawDescriptorData) events += rawDescriptorEvents for index, processedDescriptor in zip(rawDescriptorIdxs, rawDescriptorProcessedImages): processedDataSources[index] = processedDescriptor with monitorTime(monitoringData.request, "match_policy_time"): await asyncio.gather( *[matchByListPolicy.execute(events, luna3Client) for matchByListPolicy in matchPolicies] ) # storage monitoringData += await self.storagePolicy.execute( events=events, facesBucket=facesBucket, accountId=accountId, luna3Client=luna3Client, ) imagesReply = getImagesReplyData( sdkData=sdkData, images=processedDataSources, extractExif=self.detectPolicy.extractExif ) return imagesReply, events, filteredDetections, monitoringData
[docs]class VerifierModel(BaseSchema): """Verifier""" # verifier description description: types.Str128 = "" # verifier policies policies: VerifierPoliciesModel = Field(default_factory=lambda: VerifierPoliciesModel()) # verifier account id accountId: UUID
[docs]class VerifierInputEstimationsModel(FaceInputEstimationsModel): """Verifier model for incoming estimations: urls, image (only face bounding boxes available) or samples"""