Source code for luna_handlers.app.handlers.handler_events_handler

""" Receive events on handler """

from datetime import datetime
from typing import List, Tuple, Type, Union

from sanic.response import HTTPResponse
from vlutils.helpers import convertTimeToString
from vlutils.jobs.async_runner import AsyncRunner

from app.global_vars.enums import HandlerTarget
from app.handlers.base_handler import BaseHandlerWithMultipart
from app.handlers.mixins import HandlerCacheMixin
from classes.event import EventMetadata, GeoPosition, Location
from classes.multipart_processing import HandlersMultipartProcessor
from classes.query_schemas.emit_events import EmitEventsQueries
from classes.schemas.base_schema import BaseSchema
from classes.schemas.handler import CachedHandlerModel, HandlerInputEstimationsModel
from classes.schemas.policies import HandlerConfig, Policies
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.monitoring.points import monitorTime
from crutches_on_wheels.cow.web.query_getters import timeFilterGetter
from sdk.sdk_loop.models.image import ImageType, InputImage


[docs]class HandlerEventsHandler(BaseHandlerWithMultipart, HandlerCacheMixin): """ Receiver of events. Resource: "/{api_version}/handlers/{handlerId}/events" """ asyncRunner: AsyncRunner = None
[docs] def preparePolicyConfig(self, queries: EmitEventsQueries) -> HandlerConfig: """Prepare config container for policies.""" return HandlerConfig( aggregate=queries.aggregateAttributes, useExifInfo=queries.useExifInfo, faceDescriptorVersion=self.config.defaultFaceDescriptorVersion, bodyDescriptorVersion=self.config.defaultHumanDescriptorVersion, useAutoRotation=self.config.useAutoRotation, facesBucket=self.config.faceSamplesStorage.bucket, bodiesBucket=self.config.bodySamplesStorage.bucket, originBucket=self.config.imageOriginStorage.bucket, lunaEventsUsage=self.config.additionalServicesUsage.lunaEvents, lunaSenderUsage=self.config.additionalServicesUsage.lunaSender, )
[docs] async def getDataFromMultipart( self, imageType: ImageType = ImageType.IMAGE ) -> Tuple[List[InputImage], Union[dict, None]]: """ Get data from multipart request Args: imageType: image type Returns: list of Images or list warps and optionally dict with policies (for multipart request with policies) """ dataFromRequest = await HandlersMultipartProcessor().getData(self.request) estimationDataFromMultiPart = self._getDataFromMultipart( dataFromRequest.images, imageType, allowRawDescriptors=True ) return estimationDataFromMultiPart, dataFromRequest.policies
[docs] def checkPolicyLicensing(self, policies: Policies): """ Check handler policies licensing Args: policies: handler policies """ iso = policies.detectPolicy.isFaceQualityChecksEnabled() bodyAttributes = policies.detectPolicy.isBodyAttributesEnabled() liveness = bool(policies.detectPolicy.estimateLiveness.estimate) self.assertLicence(iso=iso, bodyAttributes=bodyAttributes, liveness=liveness)
[docs] async def getInputEstimationData( self, request, validationModel: Type[BaseSchema], imageType: ImageType = ImageType.IMAGE, allowRawDescriptors: bool = True, ) -> Tuple[List[InputImage], Union[dict, None]]: """ Get images from request body to detect faces and, optionally, policies for a dynamic handler. Args: request: request imageType: imageType validationModel: validation model allowRawDescriptors: whether raw descriptor mimetypes allowed or not Returns: list of Images or list warps and, optionally, policies for a dynamic handler. """ if self.request.content_type.startswith("multipart/form-data"): images, policies = await self.getDataFromMultipart(imageType) else: images, policies = ( await self.getDataFromRequest( request, validationModel, imageType, allowRawDescriptors=allowRawDescriptors ), None, ) return images, policies
[docs] async def loadHandlerAndInputImages( self, queries: EmitEventsQueries, handlerId: str ) -> tuple[CachedHandlerModel, List[InputImage], datetime]: """ Prepare handler and input images for processing. If handler is `static`, policies are loaded from the database. If handler is `dynamic`, policies are required to be in the input request (multipart). Args: queries: prepared queries handlerId: handler id Returns: tuple with handler object, input images, version of cached handler or None """ noCache = bool(queries.noCache) imageType = queries.imageType handlerJson, handlerVersion = await self.getHandlerWrapper(handlerId, self.accountId, noCache) if not handlerJson[HandlerTarget.isDynamic.value]: with monitorTime(self.request.dataForMonitoring, "load_images_for_processing_time"): inputData, policies = await self.getInputEstimationData( self.request, imageType=imageType, validationModel=HandlerInputEstimationsModel, ) if policies is not None: raise VLException( Error.BadMultipartInput.format("Not available to apply new policies to a non-dynamic handler"), 403, False, ) else: if "multipart/form-data" not in self.request.headers.get("Content-Type", ""): raise VLException(Error.BadMultipartInput.format("Required multipart in request"), 400, False) with monitorTime(self.request.dataForMonitoring, "load_images_for_processing_time"): inputData, policies = await self.getInputEstimationData( self.request, imageType=imageType, validationModel=HandlerInputEstimationsModel, ) if policies is None: raise VLException(Error.InvalidInputJson.format("Required policies for dynamic handler."), 400, False) handlerJson[HandlerTarget.policies.value] = policies handler = self.loadDataFromJson(handlerJson, CachedHandlerModel) await handler.policies.checkListsAvailability(self.luna3Client, accountId=handler.accountId) self.checkPolicyLicensing(handler.policies) if not self.config.additionalServicesUsage.lunaEvents and handler.policies.storagePolicy.eventPolicy.storeEvent: raise VLException(Error.LunaEventsIsDisabled, 403, isCriticalError=False) return handler, inputData, handlerVersion
[docs] def loadLocation(self, queries: EmitEventsQueries) -> Location: """ Load location data from the query parameters. Returns: location """ try: geoPosition = GeoPosition(latitude=queries.latitude, longitude=queries.longitude) except ValueError: raise VLException(Error.BadQueryParams.format("Geo position params is not properly specified"), 400, False) location = Location( street=queries.street, houseNumber=queries.houseNumber, district=queries.district, area=queries.area, city=queries.city, geoPosition=geoPosition, ) return location
[docs] def parseTimeFromHeader(self, name: str, defaultTime: datetime) -> datetime: """ Parse datetime from header by name. Args: name: header name defaultTime: default time Returns: user datetime if correct header or default datetime """ timeFromHeader = self.request.headers.get(name) if timeFromHeader is not None: try: return timeFilterGetter(timeFromHeader) except ValueError: self.logger.warning(f"Incorrect '{name}' header: '{timeFromHeader}', " f"replaced with '{defaultTime}'") return defaultTime
[docs] def getEventTimes(self) -> tuple[str, str]: """ Get event times from header. Supported headers: Luna-Event-Time - event creation time Luna-Event-End-Time - event end time Returns: tuple with create and end event times """ defaultEventTime = datetime.utcnow() inUTC = self.config.storageTime == "UTC" createEventTime = convertTimeToString( self.parseTimeFromHeader(name="Luna-Event-Time", defaultTime=defaultEventTime), inUTC=inUTC ) endEventTime = convertTimeToString( self.parseTimeFromHeader(name="Luna-Event-End-Time", defaultTime=defaultEventTime), inUTC=inUTC ) return createEventTime, endEventTime
[docs] async def post(self, handlerId: str) -> HTTPResponse: """ Receive events handler. See `spec receive_events`_. .. _`spec receive_events`: _static/api.html#operation/receiveEvents Args: handlerId: handler id Returns: Response with events and detections Raises: VLException(Error.BadMultipartInput, 400, isCriticalError=False): if failed to read multipart """ queries: EmitEventsQueries = self.loadDataFromQuery(EmitEventsQueries) self.addDataForMonitoring(tags={"handler_id": handlerId}) self.accountId = self.getAccountIdFromHeader() createEventTime, endEventTime = self.getEventTimes() eventMetadata = EventMetadata( accountId=self.accountId, handlerId=handlerId, createEventTime=createEventTime, endEventTime=endEventTime, source=queries.source, tags=queries.tags, userData=queries.userData, externalId=queries.externalId, location=self.loadLocation(queries), trackId=queries.trackId, ) handler, inputData, handlerVersion = await self.loadHandlerAndInputImages(queries, handlerId=handlerId) result, monitoringData = await handler.policies.execute( inputData=inputData, eventMetadata=eventMetadata, config=self.preparePolicyConfig(queries), luna3Client=self.luna3Client, redisContext=self.redisContext, plugins=self.request.app.ctx.pluginManager, ) self.countLivenessEstimationsPerformed(monitoringData.sdkUsages.livenessEstimator) self.handleMonitoringData(monitoringData) # Provide info about caching to clients extraHeaders = {"Cache-Handler-Version": handlerVersion} if handlerVersion else None return self.success(201, outputJson=result, extraHeaders=extraHeaders)