Source code for luna_handlers.app.handlers.handler_raw_events_handler
"""Handler for saving an event."""
from sanic.response import HTTPResponse
from app.global_vars.enums import HandlerTarget
from app.handlers.base_handler import BaseHandler
from app.handlers.mixins import HandlerCacheMixin
from classes.schemas.event_raw import RawEvent
from configs.configs.configs.settings.classes import ImageStoreAddressSettings
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.utils.functions import currentDateTime
from crutches_on_wheels.web.query_getters import boolFrom01Getter
class RawEventHandler(BaseHandler, HandlerCacheMixin):
    """
    Handler for saving user-generated events.

    Resource: "/{api_version}/handlers"
    """

    def enrichEvent(self, event: RawEvent) -> None:
        """
        Enrich the input event with server defaults (urls, create time, ...).

        The event is modified in place; values are generated only for fields that the checks below
        consider empty or unset.

        Args:
            event: input event
        """

        def generateSampleUrl(sampleId, settings: ImageStoreAddressSettings) -> str:
            """Build the sample url inside the bucket described by the given image store settings."""
            return f"{settings.origin}/{settings.apiVersion}/buckets/{settings.bucket}/{sampleId}"

        currentTime = currentDateTime(self.config.storageTime)

        # the event url is generated only when the events service is in use
        if self.config.additionalServicesUsage.lunaEvents:
            url = f"{self.config.eventsAddress.origin}/{self.config.eventsAddress.apiVersion}/events/{event.eventId}"
            event.internal.url = url

        # the same timestamp is the default for every missing time field of the event
        if not event.createTime:
            event.createTime = currentTime
        if not event.endTime:
            event.endTime = currentTime

        if event.face:
            # a falsy event id (None included) is replaced with the id of the event itself
            if not event.face.eventId:
                event.face.eventId = event.eventId
            if not event.face.url:
                baseUri = f"{self.config.facesAddress.origin}/{self.config.facesAddress.apiVersion}"
                event.face.url = f"{baseUri}/faces/{event.face.faceId}"

        for detection in event.detections:
            if not detection.detectTime:
                detection.detectTime = currentTime

            if (
                detection.samples.face
                and detection.samples.face.sampleId
                and not detection.samples.face.isFieldSet("url")
            ):
                detection.samples.face.url = generateSampleUrl(
                    detection.samples.face.sampleId, self.config.faceSamplesStorage
                )

            # NOTE(review): unlike the face sample, the body sample url is generated only when the url
            # also equals "" - confirm this asymmetry is intended and matches the schema default
            if (
                detection.samples.body
                and detection.samples.body.sampleId
                and detection.samples.body.url == ""
                and not detection.samples.body.isFieldSet("url")
            ):
                detection.samples.body.url = generateSampleUrl(
                    detection.samples.body.sampleId, self.config.bodySamplesStorage
                )

        if event.faceAttributes and event.faceAttributes.attributeId and event.faceAttributes.url is None:
            baseUri = f"{self.config.facesAddress.origin}/{self.config.facesAddress.apiVersion}"
            event.faceAttributes.url = f"{baseUri}/attributes/{event.faceAttributes.attributeId}"

        event.updateAggregateEstimations(self.request.json, self.config.livenessSettings.realThreshold)

    async def post(self, handlerId) -> HTTPResponse:
        """
        Save user generated event. See `spec_save_event`_.

        .. _spec_save_event:
            _static/api.html#operation/saveEvent

        Args:
            handlerId: id of the handler the event is saved for
        Returns:
            response with event id and event location; the status code is 201 if the request waited for
            the event saving ("wait_saving" query parameter, enabled by default), otherwise 202
        Raises:
            VLException(Error.BadContentType, 400): if the request content type is not "application/json"
            VLException(Error.HandlerMustByDynamic, 403): if the requested handler is not dynamic
            VLException(Error.LunaEventsIsDisabled, 403): if the usage of the events service is disabled
        """
        if self.request.content_type != "application/json":
            raise VLException(Error.BadContentType, 400, isCriticalError=False)

        noCache = self.getQueryParam("no_cache", boolFrom01Getter)
        waitSaving = self.getQueryParam("wait_saving", boolFrom01Getter, default=True)

        event: RawEvent = self.loadDataFromJson(self.request.json, RawEvent)
        self.enrichEvent(event)

        handler, handlerVersion = await self.getHandlerWrapper(handlerId, event.accountId, noCache)
        if not handler[HandlerTarget.isDynamic.value]:
            raise VLException(isCriticalError=False, error=Error.HandlerMustByDynamic, statusCode=403)
        if not self.config.additionalServicesUsage.lunaEvents:
            raise VLException(isCriticalError=False, error=Error.LunaEventsIsDisabled, statusCode=403)

        event3 = event.toLuna3(handlerId=handlerId)

        # work handed over to the async runner at the end of the request instead of being awaited here
        coros = [
            self.app.ctx.pluginManager.sendEventToPlugins(
                "sending_event",
                [event],
                handlerId,
                event.accountId,
                self.requestId,
                event.createTime,
                event.endTime,
            )
        ]
        if self.app.ctx.serviceConfig.additionalServicesUsage.lunaSender:
            coros.append(self.redisContext.publishRawEvent(event, handlerId=handlerId, requestId=self.requestId))

        saveEvents = self.luna3Client.lunaEvents.saveEvents([event3], raiseError=True, waitEventsSaving=waitSaving)
        if waitSaving:
            # the event is saved before the response is sent
            await saveEvents
            statusCode = 201
        else:
            # saving is put first in the list passed to the async runner
            coros.insert(0, saveEvents)
            statusCode = 202
        self.app.ctx.asyncRunner.runNoWait(coros)

        location = event.url
        self.respHeaders["Location"] = location
        # Provide info about caching to clients
        extraHeaders = {"Cache-Handler-Version": handlerVersion} if handlerVersion else None
        return self.success(
            outputJson={"event_id": event3.eventId, "url": location}, statusCode=statusCode, extraHeaders=extraHeaders
        )