Source code for luna_handlers.app.handlers.handler_raw_events_handler

"""Handler for saving event. """

from sanic.response import HTTPResponse

from app.global_vars.enums import HandlerTarget
from app.handlers.base_handler import BaseHandler
from app.handlers.mixins import HandlerCacheMixin
from classes.schemas.event_raw import RawEvent
from configs.configs.configs.settings.classes import ImageStoreAddressSettings
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.utils.functions import currentDateTime
from crutches_on_wheels.web.query_getters import boolFrom01Getter


class RawEventHandler(BaseHandler, HandlerCacheMixin):
    """
    Handler for saving user-generated events.

    Resource: "/{api_version}/handlers"
    """

    def enrichEvent(self, event: RawEvent) -> None:
        """
        Enrich the input event with server defaults (urls, create time, ...).

        The internal event url is always set when the luna-events service usage is enabled; every other
        value (create/end time, face event id and url, detection time, sample urls, face attributes url)
        is filled in only when the incoming event does not provide it. Finally the aggregate estimations
        of the event are updated from the raw request json.

        Args:
            event: input event, modified in place
        """

        def generateSampleUrl(sampleId, settings: ImageStoreAddressSettings):
            """Build the image-store url of a sample stored in the bucket described by `settings`."""
            return f"{settings.origin}/{settings.apiVersion}/buckets/{settings.bucket}/{sampleId}"

        # a single timestamp is shared by every defaulted time field of the event
        currentTime = currentDateTime(self.config.storageTime)

        if self.config.additionalServicesUsage.lunaEvents:
            url = f"{self.config.eventsAddress.origin}/{self.config.eventsAddress.apiVersion}/events/{event.eventId}"
            event.internal.url = url

        if not event.createTime:
            event.createTime = currentTime
        if not event.endTime:
            event.endTime = currentTime

        if event.face:
            # a falsy event id (including None) is replaced with the id of the enclosing event
            if not event.face.eventId:
                event.face.eventId = event.eventId
            if not event.face.url:
                baseUri = f"{self.config.facesAddress.origin}/{self.config.facesAddress.apiVersion}"
                event.face.url = f"{baseUri}/faces/{event.face.faceId}"

        for detection in event.detections:
            if not detection.detectTime:
                detection.detectTime = currentTime

            faceSample = detection.samples.face
            if faceSample and faceSample.sampleId:
                if not faceSample.isFieldSet("url"):
                    faceSample.url = generateSampleUrl(faceSample.sampleId, self.config.faceSamplesStorage)

            bodySample = detection.samples.body
            # NOTE(review): unlike the face sample, the body sample additionally requires `url == ""`
            # before the url is generated - confirm that this asymmetry is intended
            if bodySample and bodySample.sampleId and bodySample.url == "":
                if not bodySample.isFieldSet("url"):
                    bodySample.url = generateSampleUrl(bodySample.sampleId, self.config.bodySamplesStorage)

        if event.faceAttributes:
            if event.faceAttributes.attributeId and event.faceAttributes.url is None:
                baseUri = f"{self.config.facesAddress.origin}/{self.config.facesAddress.apiVersion}"
                event.faceAttributes.url = f"{baseUri}/attributes/{event.faceAttributes.attributeId}"

        event.updateAggregateEstimations(self.request.json, self.config.livenessSettings.realThreshold)

    async def post(self, handlerId) -> HTTPResponse:
        """
        Save user generated event. See `spec_save_event`_.

        .. _spec_save_event:
            _static/api.html#operation/saveEvent

        Args:
            handlerId: id of the handler the event is saved for
        Returns:
            response with event id and event location; status code is 201 when the saving was awaited
            (`wait_saving` query parameter, enabled by default) and 202 otherwise
        Raises:
            VLException(Error.BadContentType), status code 400: request content type is not "application/json"
            VLException(Error.HandlerMustByDynamic), status code 403: the handler is not dynamic
            VLException(Error.LunaEventsIsDisabled), status code 403: luna-events service usage is disabled
        """
        # NOTE(review): exact comparison - a content type carrying parameters (e.g. "; charset=utf-8")
        # would presumably be rejected as well; confirm that this is intended
        if self.request.content_type != "application/json":
            raise VLException(Error.BadContentType, 400, isCriticalError=False)

        noCache = self.getQueryParam("no_cache", boolFrom01Getter)
        waitSaving = self.getQueryParam("wait_saving", boolFrom01Getter, default=True)

        event: RawEvent = self.loadDataFromJson(self.request.json, RawEvent)
        self.enrichEvent(event)

        handler, handlerVersion = await self.getHandlerWrapper(handlerId, event.accountId, noCache)
        if not handler[HandlerTarget.isDynamic.value]:
            raise VLException(isCriticalError=False, error=Error.HandlerMustByDynamic, statusCode=403)
        if not self.config.additionalServicesUsage.lunaEvents:
            raise VLException(isCriticalError=False, error=Error.LunaEventsIsDisabled, statusCode=403)

        event3 = event.toLuna3(handlerId=handlerId)

        # coroutines handed over to the async runner below instead of being awaited by this request
        coros = [
            self.app.ctx.pluginManager.sendEventToPlugins(
                "sending_event",
                [event],
                handlerId,
                event.accountId,
                self.requestId,
                event.createTime,
                event.endTime,
            )
        ]
        if self.app.ctx.serviceConfig.additionalServicesUsage.lunaSender:
            coros.append(self.redisContext.publishRawEvent(event, handlerId=handlerId, requestId=self.requestId))

        saveEvents = self.luna3Client.lunaEvents.saveEvents([event3], raiseError=True, waitEventsSaving=waitSaving)
        if waitSaving:
            # the event is saved before the response is built
            await saveEvents
            statusCode = 201
        else:
            # the saving is scheduled first, ahead of the plugin/sender notifications
            coros.insert(0, saveEvents)
            statusCode = 202
        self.app.ctx.asyncRunner.runNoWait(coros)

        location = event.url
        self.respHeaders["Location"] = location
        # Provide info about caching to clients
        extraHeaders = {"Cache-Handler-Version": handlerVersion} if handlerVersion else None
        return self.success(
            outputJson={"event_id": event3.eventId, "url": location}, statusCode=statusCode, extraHeaders=extraHeaders
        )