Source code for luna_handlers.app.handlers.handler_raw_events_handler
"""Handler for saving an event."""
from sanic.response import HTTPResponse
from app.global_vars.enums import HandlerTarget
from app.handlers.base_handler import BaseHandler
from app.handlers.mixins import HandlerCacheMixin
from classes.schemas.event_raw import RawEvent
from configs.configs.configs.settings.classes import ImageStoreAddressSettings
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.utils.functions import currentDateTime
from crutches_on_wheels.web.query_getters import boolFrom01Getter
class RawEventHandler(BaseHandler, HandlerCacheMixin):
    """
    Handler for saving user-generated events.

    Resource: "/{api_version}/handlers"
    """

    def enrichEvent(self, event: RawEvent) -> None:
        """
        Fill in server-side defaults of the input event (urls, create time, ...).

        Missing times, ids and urls are generated from the service configuration. The event url itself is
        always set when the luna-events service is in use. The aggregate estimations of the event are
        updated at the end.

        Args:
            event: input event, modified in place
        """
        config = self.config
        # one timestamp is shared by every time field generated for this event
        currentTime = currentDateTime(config.storageTime)

        def generateSampleUrl(sampleId, settings: ImageStoreAddressSettings) -> str:
            """Build the url of the sample `sampleId` in the image store bucket described by `settings`."""
            return f"{settings.origin}/{settings.apiVersion}/buckets/{settings.bucket}/{sampleId}"

        if config.additionalServicesUsage.lunaEvents:
            eventsAddress = config.eventsAddress
            event.internal.url = f"{eventsAddress.origin}/{eventsAddress.apiVersion}/events/{event.eventId}"

        if not event.createTime:
            event.createTime = currentTime
        if not event.endTime:
            event.endTime = currentTime

        face = event.face
        if face:
            if not face.eventId:
                face.eventId = event.eventId
            if not face.url:
                baseUri = f"{config.facesAddress.origin}/{config.facesAddress.apiVersion}"
                face.url = f"{baseUri}/faces/{face.faceId}"

        for detection in event.detections:
            if not detection.detectTime:
                detection.detectTime = currentTime

            # NOTE(review): isFieldSet("url") presumably tells whether the url was present in the input;
            # in that case the given value is kept. Face and body samples follow the same rule.
            faceSample = detection.samples.face
            if faceSample and faceSample.sampleId and not faceSample.isFieldSet("url"):
                faceSample.url = generateSampleUrl(faceSample.sampleId, config.faceSamplesStorage)

            bodySample = detection.samples.body
            if bodySample and bodySample.sampleId and not bodySample.isFieldSet("url"):
                bodySample.url = generateSampleUrl(bodySample.sampleId, config.bodySamplesStorage)

        faceAttributes = event.faceAttributes
        if faceAttributes and faceAttributes.attributeId and faceAttributes.url is None:
            baseUri = f"{config.facesAddress.origin}/{config.facesAddress.apiVersion}"
            faceAttributes.url = f"{baseUri}/attributes/{faceAttributes.attributeId}"

        event.updateAggregateEstimations(self.request.json, config.livenessSettings.realThreshold)

    async def post(self, handlerId) -> HTTPResponse:
        """
        Save user generated event. See `spec_save_event`_.

        .. _spec_save_event:
            _static/api.html#operation/saveEvent

        Args:
            handlerId: id of the handler the event is saved for
        Returns:
            response with event id and event location
        Raises:
            VLException(Error.BadContentType, 400): the request content type is not "application/json"
            VLException(Error.HandlerMustByDynamic, 403): the handler is not dynamic
            VLException(Error.LunaEventsIsDisabled, 403): usage of the luna-events service is disabled
        """
        if self.request.content_type != "application/json":
            raise VLException(Error.BadContentType, 400, isCriticalError=False)

        noCache = self.getQueryParam("no_cache", boolFrom01Getter)
        waitSaving = self.getQueryParam("wait_saving", boolFrom01Getter, default=True)

        event: RawEvent = self.loadDataFromJson(self.request.json, RawEvent)
        self.enrichEvent(event)

        handler, handlerVersion = await self.getHandlerWrapper(handlerId, event.accountId, noCache)
        if not handler[HandlerTarget.isDynamic.value]:
            raise VLException(isCriticalError=False, error=Error.HandlerMustByDynamic, statusCode=403)
        if not self.config.additionalServicesUsage.lunaEvents:
            raise VLException(isCriticalError=False, error=Error.LunaEventsIsDisabled, statusCode=403)

        event3 = event.toLuna3(handlerId=handlerId)

        # work handed over to the async runner after the response is prepared
        coros = [
            self.app.ctx.pluginManager.sendEventToPlugins(
                "sending_event",
                [event],
                handlerId,
                event.accountId,
                self.requestId,
                event.createTime,
                event.endTime,
            )
        ]
        if self.app.ctx.serviceConfig.additionalServicesUsage.lunaSender:
            coros.append(self.redisContext.publishRawEvent(event, handlerId=handlerId, requestId=self.requestId))

        saveEvents = self.luna3Client.lunaEvents.saveEvents([event3], raiseError=True, waitEventsSaving=waitSaving)
        if waitSaving:
            # the event is saved before replying, so the reply is 201
            await saveEvents
            statusCode = 201
        else:
            # saving is handed over to the async runner (first in the list), so the reply is 202
            coros.insert(0, saveEvents)
            statusCode = 202
        self.app.ctx.asyncRunner.runNoWait(coros)

        location = event.url
        self.respHeaders["Location"] = location
        # Provide info about caching to clients
        extraHeaders = {"Cache-Handler-Version": handlerVersion} if handlerVersion else None
        return self.success(
            outputJson={"event_id": event3.eventId, "url": location}, statusCode=statusCode, extraHeaders=extraHeaders
        )