""" Receive events on handler """
from datetime import datetime
from typing import List, Tuple, Type, Union
from sanic.response import HTTPResponse
from vlutils.helpers import convertTimeToString
from vlutils.jobs.async_runner import AsyncRunner
from app.global_vars.enums import HandlerTarget
from app.handlers.base_handler import BaseHandlerWithMultipart
from app.handlers.mixins import HandlerCacheMixin
from classes.event import EventMetadata, GeoPosition, Location
from classes.multipart_processing import HandlersMultipartProcessor
from classes.query_schemas.emit_events import EmitEventsQueries
from classes.schemas.base_schema import BaseSchema
from classes.schemas.handler import CachedHandlerModel, HandlerInputEstimationsModel
from classes.schemas.policies import HandlerConfig, Policies
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.monitoring.points import monitorTime
from crutches_on_wheels.cow.web.query_getters import timeFilterGetter
from sdk.sdk_loop.models.image import ImageType, InputImage
class HandlerEventsHandler(BaseHandlerWithMultipart, HandlerCacheMixin):
    """
    Receiver of events.

    Resource: "/{api_version}/handlers/{handlerId}/events"
    """

    # Class-level default; nothing in this class assigns it.
    asyncRunner: AsyncRunner = None

    def preparePolicyConfig(self, queries: EmitEventsQueries) -> HandlerConfig:
        """
        Prepare config container for policies.

        Args:
            queries: request queries; `aggregateAttributes` and `useExifInfo` are taken from here
        Returns:
            handler config combining the two query flags with values read from `self.config`
        """
        return HandlerConfig(
            aggregate=queries.aggregateAttributes,
            useExifInfo=queries.useExifInfo,
            faceDescriptorVersion=self.config.defaultFaceDescriptorVersion,
            bodyDescriptorVersion=self.config.defaultHumanDescriptorVersion,
            useAutoRotation=self.config.useAutoRotation,
            facesBucket=self.config.faceSamplesStorage.bucket,
            bodiesBucket=self.config.bodySamplesStorage.bucket,
            originBucket=self.config.imageOriginStorage.bucket,
            lunaEventsUsage=self.config.additionalServicesUsage.lunaEvents,
            lunaSenderUsage=self.config.additionalServicesUsage.lunaSender,
        )

    async def getDataFromMultipart(
        self, imageType: ImageType = ImageType.IMAGE
    ) -> Tuple[List[InputImage], Union[dict, None]]:
        """
        Get data from multipart request.

        Args:
            imageType: image type
        Returns:
            list of images or list of warps and optionally dict with policies
            (for multipart request with policies)
        """
        dataFromRequest = await HandlersMultipartProcessor().getData(self.request)
        # Raw descriptors are explicitly allowed for this resource.
        estimationDataFromMultiPart = self._getDataFromMultipart(
            dataFromRequest.images, imageType, allowRawDescriptors=True
        )
        return estimationDataFromMultiPart, dataFromRequest.policies

    def checkPolicyLicensing(self, policies: Policies):
        """
        Check handler policies licensing.

        Collects which features the detect policy enables (face quality checks, body attributes,
        liveness estimation) and passes these flags to `self.assertLicence`.

        Args:
            policies: handler policies
        """
        iso = policies.detectPolicy.isFaceQualityChecksEnabled()
        bodyAttributes = policies.detectPolicy.isBodyAttributesEnabled()
        liveness = bool(policies.detectPolicy.estimateLiveness.estimate)
        self.assertLicence(iso=iso, bodyAttributes=bodyAttributes, liveness=liveness)

    def loadLocation(self, queries: EmitEventsQueries) -> Location:
        """
        Load location data from the query parameters.

        Args:
            queries: request queries with address fields and latitude/longitude
        Returns:
            location
        Raises:
            VLException(Error.BadQueryParams, 400): if `GeoPosition` rejects the latitude/longitude
                pair with a ValueError
        """
        try:
            geoPosition = GeoPosition(latitude=queries.latitude, longitude=queries.longitude)
        except ValueError as err:
            # Chain explicitly so the original validation error is kept as the cause.
            raise VLException(
                Error.BadQueryParams.format("Geo position params is not properly specified"), 400, False
            ) from err
        location = Location(
            street=queries.street,
            houseNumber=queries.houseNumber,
            district=queries.district,
            area=queries.area,
            city=queries.city,
            geoPosition=geoPosition,
        )
        return location

    def parseTimeFromHeader(self, name: str, defaultTime: datetime) -> datetime:
        """
        Parse datetime from header by name.

        Args:
            name: header name
            defaultTime: default time
        Returns:
            user datetime if correct header or default datetime
        """
        timeFromHeader = self.request.headers.get(name)
        if timeFromHeader is not None:
            try:
                return timeFilterGetter(timeFromHeader)
            except ValueError:
                # A malformed header is not fatal: log it and fall through to the default.
                self.logger.warning(
                    f"Incorrect '{name}' header: '{timeFromHeader}', " f"replaced with '{defaultTime}'"
                )
        return defaultTime

    def getEventTimes(self) -> tuple[str, str]:
        """
        Get event times from header.

        Supported headers:
            Luna-Event-Time - event creation time
            Luna-Event-End-Time - event end time

        Returns:
            tuple with create and end event times
        """
        # One shared fallback, so both times are equal when neither header is usable.
        defaultEventTime = datetime.utcnow()
        inUTC = self.config.storageTime == "UTC"
        createEventTime = convertTimeToString(
            self.parseTimeFromHeader(name="Luna-Event-Time", defaultTime=defaultEventTime), inUTC=inUTC
        )
        endEventTime = convertTimeToString(
            self.parseTimeFromHeader(name="Luna-Event-End-Time", defaultTime=defaultEventTime), inUTC=inUTC
        )
        return createEventTime, endEventTime

    async def post(self, handlerId: str) -> HTTPResponse:
        """
        Receive events handler. See `spec receive_events`_.

        .. _`spec receive_events`:
            _static/api.html#operation/receiveEvents

        Args:
            handlerId: handler id
        Returns:
            Response with events and detections
        Raises:
            VLException(Error.BadMultipartInput, 400, isCriticalError=False): if failed to read multipart
        """
        queries: EmitEventsQueries = self.loadDataFromQuery(EmitEventsQueries)
        self.addDataForMonitoring(tags={"handler_id": handlerId})
        self.accountId = self.getAccountIdFromHeader()

        createEventTime, endEventTime = self.getEventTimes()
        eventMetadata = EventMetadata(
            accountId=self.accountId,
            handlerId=handlerId,
            createEventTime=createEventTime,
            endEventTime=endEventTime,
            source=queries.source,
            tags=queries.tags,
            userData=queries.userData,
            externalId=queries.externalId,
            location=self.loadLocation(queries),
            trackId=queries.trackId,
        )

        handler, inputData, handlerVersion = await self.loadHandlerAndInputImages(queries, handlerId=handlerId)
        result, monitoringData = await handler.policies.execute(
            inputData=inputData,
            eventMetadata=eventMetadata,
            config=self.preparePolicyConfig(queries),
            luna3Client=self.luna3Client,
            redisContext=self.redisContext,
            plugins=self.request.app.ctx.pluginManager,
        )
        self.countLivenessEstimationsPerformed(monitoringData.sdkUsages.livenessEstimator)
        self.handleMonitoringData(monitoringData)

        # Provide info about caching to clients
        extraHeaders = {"Cache-Handler-Version": handlerVersion} if handlerVersion else None
        return self.success(201, outputJson=result, extraHeaders=extraHeaders)