""" Receive events on handler """
from datetime import datetime
from typing import List, Tuple, Type, Union
from sanic.response import HTTPResponse
from vlutils.helpers import convertTimeToString
from vlutils.jobs.async_runner import AsyncRunner
from app.global_vars.enums import HandlerTarget
from app.handlers.base_handler import BaseHandlerWithMultipart
from app.handlers.custom_query_getters import (
externalIdValidator,
locationGetter,
sourceValidator,
tagsValidator,
userDataValidator,
)
from app.handlers.mixins import HandlerCacheMixin, ISOLicenseCheckerMixin
from classes.event import EventMetadata, GeoPosition, Location
from classes.multipart_processing import HandlersMultipartProcessor
from classes.schemas.base_schema import BaseSchema
from classes.schemas.handler import CachedHandlerModel, HandlerInputEstimationsModel
from classes.schemas.policies import HandlerConfig, Policies
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.monitoring.points import monitorTime
from crutches_on_wheels.web.query_getters import (
boolFrom01Getter,
floatInRangeGetterFactory,
int01Getter,
timeFilterGetter,
trackIdGetter,
uuidGetter,
)
from sdk.sdk_loop.models.image import ImageType, InputImage
# longitude range
LONGITUDE_BOUNDS = (-180, 180)
# latitude range
LATITUDE_BOUNDS = (-90, 90)
[docs]class HandlerEventsHandler(BaseHandlerWithMultipart, HandlerCacheMixin, ISOLicenseCheckerMixin):
"""
Receiver of events.
Resource: "/{api_version}/handlers/{handlerId}/events"
"""
asyncRunner: AsyncRunner = None
[docs] def preparePolicyConfig(self):
"""Prepare config container for policies."""
return HandlerConfig(
aggregate=self.getQueryParam("aggregate_attributes", int01Getter),
useExifInfo=self.getQueryParam("use_exif_info", boolFrom01Getter, default=True),
faceDescriptorVersion=self.config.defaultFaceDescriptorVersion,
bodyDescriptorVersion=self.config.defaultHumanDescriptorVersion,
useAutoRotation=self.config.useAutoRotation,
facesBucket=self.config.faceSamplesStorage.bucket,
bodiesBucket=self.config.bodySamplesStorage.bucket,
originBucket=self.config.imageOriginStorage.bucket,
lunaEventsUsage=self.config.additionalServicesUsage.lunaEvents,
lunaSenderUsage=self.config.additionalServicesUsage.lunaSender,
)
[docs] async def getDataFromMultipart(
self, imageType: ImageType = ImageType.IMAGE
) -> Tuple[List[InputImage], Union[dict, None]]:
"""
Get data from multipart request
Args:
imageType: image type
Returns:
list of Images or list warps and optionally dict with policies (for multipart request with policies)
"""
dataFromRequest = await HandlersMultipartProcessor().getData(self.request)
estimationDataFromMultiPart = self._getDataFromMultipart(
dataFromRequest.images, imageType, allowRawDescriptors=True
)
return estimationDataFromMultiPart, dataFromRequest.policies
[docs] def checkPolicyLicensing(self, policies: Policies):
"""
Check handler policies licensing
Args:
policies: handler policies
"""
super().checkPolicyLicensing(policies=policies)
if policies.detectPolicy.isFaceQualityChecksEnabled():
self.checkIsoLicensing()
[docs] def loadLocation(self) -> Location:
"""
Load location data from the query parameters.
Returns:
location
"""
latitude = self.getQueryParam("latitude", validator=floatInRangeGetterFactory(*LATITUDE_BOUNDS))
longitude = self.getQueryParam("longitude", validator=floatInRangeGetterFactory(*LONGITUDE_BOUNDS))
try:
geoPosition = GeoPosition(latitude=latitude, longitude=longitude)
except ValueError:
raise VLException(Error.BadQueryParams.format("Geo position params is not properly specified"), 400, False)
location = Location(
street=self.getQueryParam("street", validator=locationGetter),
houseNumber=self.getQueryParam("house_number", validator=locationGetter),
district=self.getQueryParam("district", validator=locationGetter),
area=self.getQueryParam("area", validator=locationGetter),
city=self.getQueryParam("city", validator=locationGetter),
geoPosition=geoPosition,
)
return location
[docs] def parseTimeFromHeader(self, name: str, defaultTime: datetime) -> datetime:
"""
Parse datetime from header by name.
Args:
name: header name
defaultTime: default time
Returns:
user datetime if correct header or default datetime
"""
timeFromHeader = self.request.headers.get(name)
if timeFromHeader is not None:
try:
return timeFilterGetter(timeFromHeader)
except ValueError:
self.logger.warning(f"Incorrect '{name}' header: '{timeFromHeader}', " f"replaced with '{defaultTime}'")
return defaultTime
[docs] def getEventTimes(self) -> tuple[str, str]:
"""
Get event times from header.
Supported headers:
Luna-Event-Time - event creation time
Luna-Event-End-Time - event end time
Returns:
tuple with create and end event times
"""
defaultEventTime = datetime.utcnow()
inUTC = self.config.storageTime == "UTC"
createEventTime = convertTimeToString(
self.parseTimeFromHeader(name="Luna-Event-Time", defaultTime=defaultEventTime), inUTC=inUTC
)
endEventTime = convertTimeToString(
self.parseTimeFromHeader(name="Luna-Event-End-Time", defaultTime=defaultEventTime), inUTC=inUTC
)
return createEventTime, endEventTime
[docs] async def post(self, handlerId: str) -> HTTPResponse:
"""
Receive events handler. See `spec receive_events`_.
.. _`spec receive_events`:
_static/api.html#operation/receiveEvents
Args:
handlerId: handler id
Returns:
Response with events and detections
Raises:
VLException(Error.BadMultipartInput, 400, isCriticalError=False): if failed to read multipart
"""
self.addDataForMonitoring(tags={"handler_id": handlerId})
self.accountId = self.getQueryParam("account_id", uuidGetter, require=True)
createEventTime, endEventTime = self.getEventTimes()
eventMetadata = EventMetadata(
accountId=self.accountId,
handlerId=handlerId,
createEventTime=createEventTime,
endEventTime=endEventTime,
source=self.getQueryParam("source", sourceValidator),
tags=self.getQueryParam("tags", tagsValidator),
userData=self.getQueryParam("user_data", userDataValidator, default=""),
externalId=self.getQueryParam("external_id", externalIdValidator, default=""),
location=self.loadLocation(),
trackId=self.getQueryParam("track_id", trackIdGetter),
)
handler, inputData, handlerVersion = await self.loadHandlerAndInputImages(handlerId=handlerId)
result, monitoringData = await handler.policies.execute(
inputData=inputData,
eventMetadata=eventMetadata,
config=self.preparePolicyConfig(),
luna3Client=self.luna3Client,
redisContext=self.redisContext,
plugins=self.request.app.ctx.pluginManager,
)
self.countLivenessEstimationsPerformed(monitoringData.sdkUsages.livenessEstimator)
self.handleMonitoringData(monitoringData)
# Provide info about caching to clients
extraHeaders = {"Cache-Handler-Version": handlerVersion} if handlerVersion else None
return self.success(201, outputJson=result, extraHeaders=extraHeaders)