Source code for luna_api.app.handlers.handlers_proxy_handler

""" Handler for a certain handler. """
import asyncio
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from typing import Awaitable, Callable, List, Union

import dpath.util
import ujson
from luna3.common.exceptions import LunaApiException
from luna3.common.luna_response import LunaResponse
from sanic.response import HTTPResponse

from app.handlers.base_handler import HandlersServiceBaseHandler, ProxyRequest
from app.handlers.schemas import schemas
from crutches_on_wheels.errors.errors import Error, ErrorInfo
from crutches_on_wheels.errors.exception import VLException

getValueByPath = partial(dpath.util.values, separator=".")


[docs]@dataclass class RawEventObjectContainer: """ Event object container for checking its existence in the service. """ # glob path to object in input json (e.g. "face.face_id") globPathToObject: str # query name in camel case for client queryName: str # request client client: Callable[..., Awaitable[LunaResponse]] # possible error if the object for account was not found objectNotFoundError: ErrorInfo
[docs]class HandlersProxyHandler(HandlersServiceBaseHandler): """ Proxy handler for create new handlers. See `spec_handlers`_. .. _spec_handlers: _static/api.html#tag/handlers Resource: "/{api_version}/handlers" """ allowedMethods = ("POST", "GET")
[docs] async def prepareRequestPost(self) -> ProxyRequest: return self.prepareRequestCreation()
[docs] async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse: return self.success(response.statusCode, outputJson=self.convertIncomingUrls(response))
[docs]class HandlerProxyHandler(HandlersServiceBaseHandler): """ Proxy handler replace handler. See `spec_handlers_handler`_. .. _spec_handlers_handler: _static/api.html#tag/handlers Resource: "/{api_version}/handlers/{handlerId}" """ allowedMethods = ("PUT", "GET", "DELETE", "HEAD")
[docs] async def prepareRequestPut(self) -> ProxyRequest: return self.prepareRequestCreation()
[docs]class CountHandlerProxyHandler(HandlersServiceBaseHandler): """ Proxy handler replace handler. See `spec_handlers_count`_. .. _spec_handlers_count: _static/api.html#operation/getHandlerCount Resource: "/{api_version}/handlers/count" """ allowedMethods = ("GET",)
[docs]class HandlersValidatorProxyHandler(HandlersServiceBaseHandler): """ Proxy handler for validate handler policies. See `spec_validate_handler_policies`_. .. _spec_validate_handler_policies: _static/api.html#operation/validatePolicies Resource: "/{api_version}/handlers/validator" """ allowedMethods = ("POST",)
[docs] async def prepareRequestPost(self) -> ProxyRequest: return self.prepareRequestCreation()
[docs]class DetectorProxyHandler(HandlersServiceBaseHandler): """ Proxy handler for detecting faces on images. See `spec_detector`_. .. _spec_detector: _static/api.html#operation/detectFaces Resource: "/{api_version}/detector" """ allowedMethods = ("POST",)
[docs] async def postProcessingPost(self, response: LunaResponse): """Post processing response from the service.""" return self.success( response.statusCode, outputJson=self.convertDetectorSampleUrls(response.json), )
[docs]class ExtractorProxyHandler(HandlersServiceBaseHandler): """ Proxy handler for extracting attributes such as gender, age, ethnicity, descriptor from samples. See `spec_extractor`_. .. _spec_extractor: _static/api.html#operation/extractAttributes Resource: "/{api_version}/extractor" """ allowedMethods = ("POST",)
[docs] async def postProcessingPost(self, response: LunaResponse): """Post processing response from the service.""" return self.success(response.statusCode, outputJson=self.convertAttributeUrls(response.json))
[docs]class SDKHandlerProxyHandler(HandlersServiceBaseHandler): """ Proxy handler for SDK estimations.See `spec_sdk`_. .. _spec_sdk: _static/api.html#tag/sdk Resource: "/{api_version}/sdk" """ allowedMethods = ("POST",)
[docs]class ISOHandlerProxyHandler(HandlersServiceBaseHandler): """ Proxy handler for ISO estimations.See `spec_iso`_. .. _spec_iso: _static/api.html#operation/checkISO Resource: "/{api_version}/iso" """ allowedMethods = ("POST",)
[docs]class EventHandlerProxyHandler(HandlersServiceBaseHandler): """ Proxy handler for SDK estimations. See `spec_emit_events`_. .. _spec_emit_events: _static/api.html#operation/generateEvents Resource: "/{api_version}/handlers/{handler_id}/events" """ allowedMethods = ("POST",)
[docs] async def postProcessingPost(self, response: LunaResponse): """Post processing response from the service.""" return self.success(response.statusCode, outputJson=self.convertHandlerEventsUrls(response.json))
[docs]class EventRawHandlerProxyHandler(HandlersServiceBaseHandler): """ Proxy handler for save raw event. See `spec_save_event`_. .. _spec_save_event: _static/api.html#operation/saveEvent Resource: "/{api_version}/handlers/{handler_id}/events/raw" """ allowedMethods = ("POST",)
[docs] async def checkExistingObject(self, container: RawEventObjectContainer, queryValue: Union[str, List[str]]) -> None: """ Check existing object by account id. Args: container: container class with request client and possible error queryValue: query value Raises: VLException(Error.ObjectNotFound, 404) if object not found """ try: response = await container.client( **{container.queryName: queryValue}, accountId=self.accountId, raiseError=True, ) except LunaApiException as e: if e.statusCode == 404: raise VLException( container.objectNotFoundError.format(queryValue), e.statusCode, isCriticalError=False, ) raise if container.objectNotFoundError == Error.ListsNotFound: diffs = set(queryValue) - {list_["list_id"] for list_ in response.json["lists"]} if diffs: raise VLException( container.objectNotFoundError.format(diffs.pop()), 404, isCriticalError=False, )
[docs] async def checkObjectsBelongToAccount(self) -> ProxyRequest: """ Check that event objects are associated with an account. Returns: Proxy request with raw event """ inputJson = self.request.json self.validateJson(inputJson, schemas.EVENT_RAW_SCHEMA) structObjects = [ RawEventObjectContainer( globPathToObject="face.face_id", queryName="faceId", client=self.luna3Client.lunaFaces.checkFace, objectNotFoundError=Error.FaceNotFound, ), RawEventObjectContainer( globPathToObject="face.lists", queryName="listIds", client=self.luna3Client.lunaFaces.getLists, objectNotFoundError=Error.ListsNotFound, ), RawEventObjectContainer( globPathToObject="face_attributes.attribute_id", queryName="attributeId", client=self.luna3Client.lunaFaces.checkAttribute, objectNotFoundError=Error.AttributesNotFound, ), RawEventObjectContainer( globPathToObject="detections.*.samples.face.sample_id", queryName="imageId", client=partial( self.luna3Client.lunaFaceSamplesStore.checkImage, bucketName=self.config.faceSamplesStorage.bucket, ), objectNotFoundError=Error.SampleNotFound, ), RawEventObjectContainer( globPathToObject="detections.*.samples.body.sample_id", queryName="imageId", client=partial( self.luna3Client.lunaBodySamplesStore.checkImage, bucketName=self.config.bodySamplesStorage.bucket, ), objectNotFoundError=Error.SampleNotFound, ), ] tasks = [] for struct in structObjects: queryValues = filter( lambda v: v is not None, getValueByPath(obj=inputJson, glob=struct.globPathToObject), ) for queryValue in queryValues: tasks.append(self.checkExistingObject(container=struct, queryValue=queryValue)) await asyncio.gather(*tasks) inputJson["account_id"] = self.accountId return ProxyRequest( ujson.dumps(inputJson, ensure_ascii=False), self.prepareHeaders(), self.prepareQuery(), )
[docs] async def prepareRequestPost(self) -> ProxyRequest: """ Prepare proxy request for method `post`. Returns: Proxy request with raw event """ if self.request.content_type == "application/json": return await self.checkObjectsBelongToAccount() return self.prepareRequestCreation()
[docs] async def postProcessingPost(self, response: LunaResponse): """Post processing response from the service.""" outputJson = deepcopy(response.json) location = f"/{self.app.ctx.apiVersion}/events/{outputJson['event_id']}" self.respHeaders["Location"] = outputJson["url"] = location return self.success(response.statusCode, outputJson=outputJson)
[docs]class VerifiersHandlerProxy(HandlersServiceBaseHandler): """ Proxy handler for verifiers. See `spec_verifiers`_. .. _spec_verifiers: _static/api.html#tag/verifiers Resource: "/{api_version}/verifiers" """ allowedMethods = ("POST", "GET")
[docs] async def prepareRequestPost(self) -> ProxyRequest: return self.prepareRequestCreation()
[docs] async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse: return self.success(response.statusCode, outputJson=self.convertIncomingUrls(response))
[docs]class VerifierCountHandlerProxy(HandlersServiceBaseHandler): """ Proxy handler for SDK estimations. See `spec_verifiers`_. .. _spec_verifiers: _static/api.html#tag/verifiers Resource: "/{api_version}/verifiers/count" """ allowedMethods = ("GET",)
[docs]class VerifierHandlerProxy(HandlersServiceBaseHandler): """ Proxy handler for SDK estimations. See `spec_verifiers`_. .. _spec_verifiers: _static/api.html#tag/verifiers Resource: "/{api_version}/verifiers/{verifier_id}" """ allowedMethods = ("GET", "PUT", "HEAD", "DELETE")
[docs] async def prepareRequestPut(self) -> ProxyRequest: return self.prepareRequestCreation()
[docs]class HandlerVerifyRawProxy(HandlersServiceBaseHandler): """ Proxy handler for SDK estimations. See `spec_verifiers`_. .. _spec_verifiers: _static/api.html#tag/verifiers Resource: "/{api_version}/verifiers/{verifier_id}/raw" """ allowedMethods = ("POST",)
[docs]class VerifierVerificationsHandlerProxy(HandlersServiceBaseHandler): """ Proxy handler for SDK estimations. See `spec_verifiers`_. .. _spec_verifiers: _static/api.html#tag/verifiers Resource: "/{api_version}/verifiers/{verifier_id}/verifications" """ allowedMethods = ("POST",)