# Source code for luna_handlers.app.handlers.handler_events_handler

""" Receive events on handler """

from datetime import datetime
from typing import List, Tuple, Type, Union

from sanic.response import HTTPResponse
from vlutils.helpers import convertTimeToString
from vlutils.jobs.async_runner import AsyncRunner

from app.global_vars.enums import HandlerTarget
from app.handlers.base_handler import BaseHandlerWithMultipart
from app.handlers.custom_query_getters import (
    externalIdValidator,
    locationGetter,
    sourceValidator,
    tagsValidator,
    userDataValidator,
)
from app.handlers.mixins import HandlerCacheMixin
from classes.event import EventMetadata, GeoPosition, Location
from classes.multipart_processing import HandlersMultipartProcessor
from classes.schemas.base_schema import BaseSchema
from classes.schemas.handler import CachedHandlerModel, HandlerInputEstimationsModel
from classes.schemas.policies import HandlerConfig, Policies
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.monitoring.points import monitorTime
from crutches_on_wheels.web.query_getters import (
    boolFrom01Getter,
    floatInRangeGetterFactory,
    int01Getter,
    timeFilterGetter,
    trackIdGetter,
    uuidGetter,
)
from sdk.sdk_loop.models.image import ImageType, InputImage

# Allowed longitude range in degrees as (min, max); unpacked into `floatInRangeGetterFactory`
# when the "longitude" query parameter is validated.
LONGITUDE_BOUNDS = (-180, 180)
# Allowed latitude range in degrees as (min, max); unpacked into `floatInRangeGetterFactory`
# when the "latitude" query parameter is validated.
LATITUDE_BOUNDS = (-90, 90)


class HandlerEventsHandler(BaseHandlerWithMultipart, HandlerCacheMixin):
    """
    Receiver of events.

    Resource: "/{api_version}/handlers/{handlerId}/events"
    """

    # no runner is attached at class level
    asyncRunner: Union[AsyncRunner, None] = None

    def preparePolicyConfig(self) -> HandlerConfig:
        """
        Prepare config container for policies.

        `aggregate` and `useExifInfo` are taken from the query parameters `aggregate_attributes` and
        `use_exif_info`; every other field is copied from the service configuration.

        Returns:
            config container for policies
        """
        return HandlerConfig(
            aggregate=self.getQueryParam("aggregate_attributes", int01Getter),
            useExifInfo=self.getQueryParam("use_exif_info", boolFrom01Getter, default=True),
            faceDescriptorVersion=self.config.defaultFaceDescriptorVersion,
            bodyDescriptorVersion=self.config.defaultHumanDescriptorVersion,
            useAutoRotation=self.config.useAutoRotation,
            facesBucket=self.config.faceSamplesStorage.bucket,
            bodiesBucket=self.config.bodySamplesStorage.bucket,
            originBucket=self.config.imageOriginStorage.bucket,
            lunaEventsUsage=self.config.additionalServicesUsage.lunaEvents,
            lunaSenderUsage=self.config.additionalServicesUsage.lunaSender,
        )

    async def getDataFromMultipart(
        self, imageType: ImageType = ImageType.IMAGE
    ) -> Tuple[List[InputImage], Union[dict, None]]:
        """
        Get data from multipart request.

        Raw descriptors are always allowed for the multipart images.

        Args:
            imageType: image type
        Returns:
            list of Images or list warps and optionally dict with policies (for multipart request with policies)
        """
        dataFromRequest = await HandlersMultipartProcessor().getData(self.request)
        estimationDataFromMultiPart = self._getDataFromMultipart(
            dataFromRequest.images, imageType, allowRawDescriptors=True
        )
        return estimationDataFromMultiPart, dataFromRequest.policies

    def checkPolicyLicensing(self, policies: Policies):
        """
        Check handler policies licensing.

        Collects which licensed features (face quality checks, body attributes, liveness) the detect policy
        enables and passes them to `assertLicence`.

        Args:
            policies: handler policies
        """
        iso = policies.detectPolicy.isFaceQualityChecksEnabled()
        bodyAttributes = policies.detectPolicy.isBodyAttributesEnabled()
        liveness = bool(policies.detectPolicy.estimateLiveness.estimate)
        self.assertLicence(iso=iso, bodyAttributes=bodyAttributes, liveness=liveness)

    async def getInputEstimationData(
        self,
        request,
        validationModel: Type[BaseSchema],
        imageType: ImageType = ImageType.IMAGE,
        allowRawDescriptors: bool = True,
    ) -> Tuple[List[InputImage], Union[dict, None]]:
        """
        Get images from request body to detect faces and, optionally, policies for a dynamic handler.

        Args:
            request: request
            imageType: imageType
            validationModel: validation model
            allowRawDescriptors: whether raw descriptor mimetypes allowed or not; used for non-multipart
                requests only, the multipart path always allows raw descriptors
        Returns:
            list of Images or list warps and, optionally, policies for a dynamic handler
            (policies are always None for a non-multipart request).
        """
        # NOTE(review): the content type is read from `self.request` while `request` is what gets forwarded
        # for non-multipart bodies; callers in this class pass `self.request`, so both are the same object here.
        if self.request.content_type.startswith("multipart/form-data"):
            images, policies = await self.getDataFromMultipart(imageType)
        else:
            images, policies = (
                await self.getDataFromRequest(
                    request, validationModel, imageType, allowRawDescriptors=allowRawDescriptors
                ),
                None,
            )
        return images, policies

    async def loadHandlerAndInputImages(
        self, handlerId
    ) -> tuple[CachedHandlerModel, List[InputImage], Union[datetime, None]]:
        """
        Prepare handler and input images for processing.

        If handler is `static`, policies are loaded from the database.
        If handler is `dynamic`, policies are required to be in the input request (multipart).

        Args:
            handlerId: handler id
        Returns:
            tuple with handler object, input images, version of cached handler or None
        Raises:
            VLException(Error.BadMultipartInput, 403, isCriticalError=False): if policies are sent to a
                non-dynamic handler
            VLException(Error.BadMultipartInput, 400, isCriticalError=False): if a dynamic handler is called
                with a non-multipart request
            VLException(Error.InvalidInputJson, 400, isCriticalError=False): if a dynamic handler is called
                without policies
            VLException(Error.LunaEventsIsDisabled, 403, isCriticalError=False): if the handler stores events
                while luna-events usage is disabled
        """
        noCache = self.getQueryParam("no_cache", boolFrom01Getter)
        imageType = self.getQueryParam("image_type", lambda x: ImageType(int(x)), default=ImageType.IMAGE)
        handlerJson, handlerVersion = await self.getHandlerWrapper(handlerId, self.accountId, noCache)
        isDynamic = handlerJson[HandlerTarget.isDynamic.value]

        # a dynamic handler is rejected before its body is read if the request is not multipart
        if isDynamic and "multipart/form-data" not in self.request.headers.get("Content-Type", ""):
            raise VLException(Error.BadMultipartInput.format("Required multipart in request"), 400, False)

        with monitorTime(self.request.dataForMonitoring, "load_images_for_processing_time"):
            inputData, policies = await self.getInputEstimationData(
                self.request,
                imageType=imageType,
                validationModel=HandlerInputEstimationsModel,
            )

        if isDynamic:
            if policies is None:
                raise VLException(Error.InvalidInputJson.format("Required policies for dynamic handler."), 400, False)
            handlerJson[HandlerTarget.policies.value] = policies
        elif policies is not None:
            raise VLException(
                Error.BadMultipartInput.format("Not available to apply new policies to a non-dynamic handler"),
                403,
                False,
            )

        handler = self.loadDataFromJson(handlerJson, CachedHandlerModel)
        await handler.policies.checkListsAvailability(self.luna3Client, accountId=handler.accountId)
        self.checkPolicyLicensing(handler.policies)
        if (
            not self.config.additionalServicesUsage.lunaEvents
            and handler.policies.storagePolicy.eventPolicy.storeEvent
        ):
            raise VLException(Error.LunaEventsIsDisabled, 403, isCriticalError=False)
        return handler, inputData, handlerVersion

    def loadLocation(self) -> Location:
        """
        Load location data from the query parameters.

        Returns:
            location
        Raises:
            VLException(Error.BadQueryParams, 400, isCriticalError=False): if `GeoPosition` rejects the
                latitude/longitude pair
        """
        latitude = self.getQueryParam("latitude", validator=floatInRangeGetterFactory(*LATITUDE_BOUNDS))
        longitude = self.getQueryParam("longitude", validator=floatInRangeGetterFactory(*LONGITUDE_BOUNDS))
        try:
            geoPosition = GeoPosition(latitude=latitude, longitude=longitude)
        except ValueError as error:
            # keep the original validation failure attached as the explicit cause
            raise VLException(
                Error.BadQueryParams.format("Geo position params is not properly specified"), 400, False
            ) from error

        location = Location(
            street=self.getQueryParam("street", validator=locationGetter),
            houseNumber=self.getQueryParam("house_number", validator=locationGetter),
            district=self.getQueryParam("district", validator=locationGetter),
            area=self.getQueryParam("area", validator=locationGetter),
            city=self.getQueryParam("city", validator=locationGetter),
            geoPosition=geoPosition,
        )
        return location

    def parseTimeFromHeader(self, name: str, defaultTime: datetime) -> datetime:
        """
        Parse datetime from header by name.

        A missing header, or a header that `timeFilterGetter` rejects with `ValueError`, falls back to
        the default time; the latter case is logged as a warning.

        Args:
            name: header name
            defaultTime: default time
        Returns:
            user datetime if correct header or default datetime
        """
        timeFromHeader = self.request.headers.get(name)
        if timeFromHeader is not None:
            try:
                return timeFilterGetter(timeFromHeader)
            except ValueError:
                self.logger.warning(f"Incorrect '{name}' header: '{timeFromHeader}', " f"replaced with '{defaultTime}'")
        return defaultTime

    def getEventTimes(self) -> tuple[str, str]:
        """
        Get event times from header.

        Supported headers:
            Luna-Event-Time - event creation time
            Luna-Event-End-Time - event end time

        Both times fall back to the same "now" value when their header is missing or incorrect.

        Returns:
            tuple with create and end event times
        """
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; moving to a timezone-aware value
        # requires checking how convertTimeToString treats aware datetimes first.
        defaultEventTime = datetime.utcnow()
        inUTC = self.config.storageTime == "UTC"
        createEventTime = convertTimeToString(
            self.parseTimeFromHeader(name="Luna-Event-Time", defaultTime=defaultEventTime), inUTC=inUTC
        )
        endEventTime = convertTimeToString(
            self.parseTimeFromHeader(name="Luna-Event-End-Time", defaultTime=defaultEventTime), inUTC=inUTC
        )
        return createEventTime, endEventTime

    async def post(self, handlerId: str) -> HTTPResponse:
        """
        Receive events handler. See `spec receive_events`_.

        .. _`spec receive_events`:
            _static/api.html#operation/receiveEvents

        Args:
            handlerId: handler id
        Returns:
            Response with events and detections
        Raises:
            VLException(Error.BadMultipartInput, 400, isCriticalError=False): if failed to read multipart
        """
        self.addDataForMonitoring(tags={"handler_id": handlerId})
        self.accountId = self.getQueryParam("account_id", uuidGetter, require=True)
        createEventTime, endEventTime = self.getEventTimes()

        # event metadata is built from headers and query parameters before the handler itself is loaded
        eventMetadata = EventMetadata(
            accountId=self.accountId,
            handlerId=handlerId,
            createEventTime=createEventTime,
            endEventTime=endEventTime,
            source=self.getQueryParam("source", sourceValidator),
            tags=self.getQueryParam("tags", tagsValidator),
            userData=self.getQueryParam("user_data", userDataValidator, default=""),
            externalId=self.getQueryParam("external_id", externalIdValidator, default=""),
            location=self.loadLocation(),
            trackId=self.getQueryParam("track_id", trackIdGetter),
        )
        handler, inputData, handlerVersion = await self.loadHandlerAndInputImages(handlerId=handlerId)

        result, monitoringData = await handler.policies.execute(
            inputData=inputData,
            eventMetadata=eventMetadata,
            config=self.preparePolicyConfig(),
            luna3Client=self.luna3Client,
            redisContext=self.redisContext,
            plugins=self.request.app.ctx.pluginManager,
        )
        self.countLivenessEstimationsPerformed(monitoringData.sdkUsages.livenessEstimator)
        self.handleMonitoringData(monitoringData)

        # Provide info about caching to clients
        extraHeaders = {"Cache-Handler-Version": handlerVersion} if handlerVersion else None
        return self.success(201, outputJson=result, extraHeaders=extraHeaders)