"""
Module contains schemas for verifier
"""
import asyncio
from typing import Optional, List, Union, Tuple, Dict
from uuid import UUID
from lunavl.sdk.estimators.face_estimators.livenessv1 import LivenessPrediction
from lunavl.sdk.estimators.face_estimators.mask import MaskState
from pydantic import Field
from classes.raw_descriptor_data import RawDescriptorData
from luna3.client import Client
from luna3.common.luna_response import LunaResponse
from app.api_sdk_adaptors.handler import APISDKHandlerAdaptor
from app.global_vars.constants import MAX_ANGLE
from classes.event import Event
from classes.event_parts import ImageDetectorResult, RawDescriptorResult
from classes.monitoring import HandlersMonitoringData as DataForMonitoring
from classes.schemas import types
from classes.schemas.base_schema import BaseSchema
from classes.schemas.detect_policies import BaseDetectPolicy
from classes.schemas.extract_policies import BaseExtractPolicy
from classes.schemas.handler import FaceInputEstimationsModel
from classes.schemas.match_policy import BaseMatchPolicy
from classes.schemas.policies import getImagesReplyData
from classes.schemas.storage_policy import saveSamples
from crutches_on_wheels.enums.attributes import Liveness
from crutches_on_wheels.monitoring.points import monitorTime
from crutches_on_wheels.utils.log import Logger
from sdk.sdk_loop.enums import MultifacePolicy
from sdk.sdk_loop.estimation_targets import SDKFaceEstimationTargets, SDKEstimationTargets
from sdk.sdk_loop.sdk_task import SDKDetectableImage, SDKTask, SDKTaskFilters, TaskDataSource, FaceWarp
from sdk.sdk_loop.task_loop import SDKTaskLoop
class VerifierMatchPolicy(BaseMatchPolicy):
    """Match policy schema used by the verifier (adds nothing to the base match policy)."""
class VerifierDetectPolicy(BaseDetectPolicy):
    """Detect policy schema used by the verifier (adds nothing to the base detect policy)."""
class VerifierAttributeStorePolicy(BaseSchema):
    """Storage policy that decides whether the verifier saves face attributes."""

    # switch for attribute storing; off by default
    storeAttribute: types.Int01 = 0

    async def execute(self, events: List[Event], accountId: str, luna3Client: Client) -> None:
        """
        Create an attribute for every event that has face attributes and remember the assigned ids.

        Nothing is done when `storeAttribute` is falsy. Events without face attributes are skipped.

        Args:
            events: events whose face attributes should be saved
            accountId: account id the attributes are created for
            luna3Client: luna3 client used to send the requests
        """
        if not self.storeAttribute:
            return

        storedEvents, requests = [], []
        for candidate in events:
            if candidate.faceAttributes is not None:
                attributeKwargs = candidate.faceAttributes._getAttributeKwargs()
                storedEvents.append(candidate)
                requests.append(
                    luna3Client.lunaFaces.createAttribute(
                        accountId=accountId, ttl=None, **attributeKwargs, raiseError=True
                    )
                )

        # all creation requests are awaited together; replies come back in request order
        replies: List[LunaResponse] = await asyncio.gather(*requests)
        for storedEvent, reply in zip(storedEvents, replies):
            storedEvent.faceAttributes.sdkAttribute.attributeId = reply.json["attribute_id"]
class VerifierFaceSampleStorePolicy(BaseSchema):
    """Storage policy that decides whether the verifier saves face samples (warps)."""

    # switch for face sample storing; off by default
    storeSample: types.Int01 = 0

    async def execute(self, events: List[Event], bucket: str, accountId: str, luna3Client: Client) -> None:
        """
        Collect the warps of all detected faces, set each face url and save the warps.

        Nothing is done when `storeSample` is falsy. Detections without a face are skipped.

        Args:
            events: events whose detections are inspected
            bucket: name of the bucket the samples are saved to
            accountId: account id
            luna3Client: luna3 client; its face samples store client is used for saving
        """
        if not self.storeSample:
            return

        # every face of every detection of every event, in the original iteration order
        faces = (item.detection.face for event in events for item in event.detections)

        collectedWarps = []
        for face in faces:
            if face is None:
                continue
            warp = face.sdkEstimation.warp
            collectedWarps.append(warp)
            # the url is built from the store base uri, the bucket and the warp sample id
            face.url = f"{luna3Client.lunaFaceSamplesStore.baseUri}/buckets/{bucket}" f"/images/{warp.sampleId}"

        await saveSamples(
            warpsToSave=collectedWarps,
            bucket=bucket,
            accountId=accountId,
            storeApiClient=luna3Client.lunaFaceSamplesStore,
        )
class VerifierStoragePolicy(BaseSchema):
    """Verifier storage policy: combines the face sample and the attribute storage policies."""

    # attribute storage policy (may be None, in which case attributes are not stored)
    attributePolicy: Optional[VerifierAttributeStorePolicy] = VerifierAttributeStorePolicy()
    # face sample storage policy (may be None, in which case samples are not stored)
    faceSamplePolicy: Optional[VerifierFaceSampleStorePolicy] = VerifierFaceSampleStorePolicy()

    async def execute(
        self, events: List[Event], accountId: str, luna3Client: Client, facesBucket: str
    ) -> DataForMonitoring:
        """
        Execute storage policy - save objects.

        Face samples are saved first and attributes second. Both fields are declared `Optional`,
        so a policy that is None is skipped instead of being dereferenced.

        Args:
            events: events
            accountId: account id
            luna3Client: luna 3 client
            facesBucket: faces samples bucket
        Returns:
            monitoring data with the time spent in each of the two storage steps
        """
        monitoringData = DataForMonitoring()

        # the timing point is recorded even when the policy is skipped
        with monitorTime(monitoringData.request, "face_sample_storage_policy_time"):
            if self.faceSamplePolicy is not None:
                await self.faceSamplePolicy.execute(events, facesBucket, accountId, luna3Client)

        # save attributes only after the sample policy has run (samples are updated there)
        with monitorTime(monitoringData.request, "face_attribute_storage_policy_time"):
            if self.attributePolicy is not None:
                await self.attributePolicy.execute(events, accountId, luna3Client)

        return monitoringData
class VerifierPoliciesModel(BaseSchema):
    """Verifier policies: detection, extraction, storage and the verification threshold."""

    # detect policy
    detectPolicy: VerifierDetectPolicy = Field(default_factory=lambda: VerifierDetectPolicy())
    # extract policy
    # NOTE(review): VerifierExtractPolicy is neither defined nor imported in the visible part of
    # this file (only BaseExtractPolicy is imported) - confirm the class exists in the module.
    extractPolicy: VerifierExtractPolicy = VerifierExtractPolicy()
    # storage policy
    storagePolicy: VerifierStoragePolicy = VerifierStoragePolicy()
    # verification threshold
    verificationThreshold: types.Float01 = 0.9

    def prepareSDKTask(self, sdkData: List[Union[SDKDetectableImage, FaceWarp]], aggregate: int) -> SDKTask:
        """
        Prepare sdk task

        Args:
            sdkData: a list of input images or warps
            aggregate: aggregate all extracted samples to one or not
        Returns:
            sdk task built from the detect and extract policies
        """
        faceTargets = SDKFaceEstimationTargets(
            estimateQuality=self.detectPolicy.estimateQuality,
            estimateMouthAttributes=self.detectPolicy.estimateMouthAttributes,
            # AGS estimation is always switched off for the verifier
            estimateAGS=0,
            estimateGaze=self.detectPolicy.estimateGaze,
            estimateEyesAttributes=self.detectPolicy.estimateEyesAttributes,
            estimateEmotions=self.detectPolicy.estimateEmotions,
            estimateMask=self.detectPolicy.estimateMask,
            estimateHeadPose=self.detectPolicy.estimateHeadPose,
            estimateBasicAttributes=self.extractPolicy.extractBasicAttributes,
            # the face descriptor is always extracted
            estimateFaceDescriptor=1,
            estimateLiveness=self.detectPolicy.estimateLiveness.asSDKPolicy(),
        )
        toEstimate = SDKEstimationTargets(estimateHuman=0, estimateFace=1, faceEstimationTargets=faceTargets)

        def suitFilter(x):
            """Return the threshold unchanged, or None when it equals MAX_ANGLE."""
            if x != MAX_ANGLE:
                return x
            return None

        # NOTE(review): an empty `maskStates` list becomes None here, while an empty
        # `livenessStates` list is passed on as [] - confirm the asymmetry is intended.
        maskStates = None
        if self.detectPolicy.maskStates:
            maskStates = [MaskState(x) for x in self.detectPolicy.maskStates]
        livenessStates = None
        if self.detectPolicy.livenessStates is not None:
            livenessStates = [LivenessPrediction(Liveness(x).name) for x in self.detectPolicy.livenessStates]

        filters = SDKTaskFilters(
            yawThreshold=suitFilter(self.detectPolicy.yawThreshold),
            pitchThreshold=suitFilter(self.detectPolicy.pitchThreshold),
            rollThreshold=suitFilter(self.detectPolicy.rollThreshold),
            garbageScoreThreshold=self.extractPolicy.fdScoreThreshold,
            maskStates=maskStates,
            livenessStates=livenessStates,
        )
        return SDKTask(
            toEstimate,
            data=sdkData,
            filters=filters,
            aggregateAttributes=bool(aggregate),
            multifacePolicy=MultifacePolicy(self.detectPolicy.multifacePolicy),
        )

    async def execute(
        self,
        accountId: str,
        inputData: List[Union[SDKDetectableImage, FaceWarp, RawDescriptorData]],
        sdkLoop: SDKTaskLoop,
        luna3Client: Client,
        logger: Logger,
        matchPolicies: List[VerifierMatchPolicy],
        facesBucket: str,
    ) -> Tuple[List[dict], List[Event], Dict[str, List[dict]], DataForMonitoring]:
        """
        Executes given policies against provided data.

        Args:
            accountId: A str, account id
            inputData: A list of images/warps/raw descriptors
            sdkLoop: A sdk loop instance
            luna3Client: A luna3 client instance
            logger: A Logger instance
            matchPolicies: MatchingPolicy instances
            facesBucket: faces samples bucket
        Returns:
            tuple of four items: first - reply data for the input images, second - events,
            third - filtered detections, fourth - monitoring data
        """
        # one slot per input item; Ellipsis is a placeholder until the item is processed
        processedDataSources: List[Union[ImageDetectorResult, RawDescriptorResult]] = [...] * len(inputData)

        # Split input data into lists of images and descriptors, remembering original positions
        rawDescriptorIdxs, rawDescriptorData, rawImageIdxs, sdkData = [], [], [], []
        for i, item in enumerate(inputData):
            if isinstance(item, RawDescriptorData):
                rawDescriptorIdxs.append(i)
                rawDescriptorData.append(item)
            else:
                rawImageIdxs.append(i)
                sdkData.append(item)

        sdkAdapter = APISDKHandlerAdaptor(logger=logger, accountId=accountId, sdkLoop=sdkLoop)

        # Process images
        if sdkData:
            task = self.prepareSDKTask(sdkData, aggregate=0)
            # 68 landmarks are requested only when the task source is images
            detectLandmarks68 = self.detectPolicy.detectLandmarks68 and task.source == TaskDataSource.images
            events, filteredDetections, processedImages, monitoringData = await sdkAdapter.handle(
                task=task, detectLandmarks68=detectLandmarks68
            )
            for index, processedImage in zip(rawImageIdxs, processedImages):
                processedDataSources[index] = processedImage
        else:
            events, filteredDetections = [], sdkAdapter.prepareFilteredDetections()
            monitoringData = DataForMonitoring()

        # Process descriptors
        rawDescriptorEvents, rawDescriptorProcessedImages = sdkAdapter.handleRawDescriptors(rawDescriptorData)
        events += rawDescriptorEvents
        for index, processedDescriptor in zip(rawDescriptorIdxs, rawDescriptorProcessedImages):
            processedDataSources[index] = processedDescriptor

        # matching: all match policies run concurrently over the same events
        with monitorTime(monitoringData.request, "match_policy_time"):
            await asyncio.gather(*[matchPolicy.execute(events, luna3Client) for matchPolicy in matchPolicies])

        # storage
        monitoringData += await self.storagePolicy.execute(
            events=events, facesBucket=facesBucket, accountId=accountId, luna3Client=luna3Client
        )

        imagesReply = getImagesReplyData(
            sdkData=sdkData, images=processedDataSources, extractExif=self.detectPolicy.extractExif
        )
        return imagesReply, events, filteredDetections, monitoringData
class VerifierModel(BaseSchema):
    """Top-level verifier schema: description, policies and the owning account."""

    # free-form verifier description, empty by default
    description: types.Str128 = ""
    # verifier policies; a fresh default policies model is created when omitted
    policies: VerifierPoliciesModel = Field(default_factory=VerifierPoliciesModel)
    # id of the account the verifier belongs to (no default, so it must be supplied)
    accountId: UUID