Source code for luna_api.app.handlers.handlers_proxy_handler

""" Proxy handlers for the resources of the handlers service: handlers, detector, extractor, sdk, iso, events and verifiers. """
import asyncio
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from typing import Awaitable, Callable, List, Union

import dpath.util
import ujson
from luna3.common.exceptions import LunaApiException
from luna3.common.luna_response import LunaResponse
from sanic.response import HTTPResponse

from app.handlers.base_handler import HandlersServiceBaseHandler, ProxyRequest
from app.handlers.schemas import schemas
from crutches_on_wheels.cow.errors.errors import Error, ErrorInfo
from crutches_on_wheels.cow.errors.exception import VLException

# `dpath.util.values` with "." pre-bound as the path separator; called below with `obj=` and `glob=`
# keyword arguments and glob paths such as "face.face_id" or "detections.*.samples.face.sample_id".
getValueByPath = partial(dpath.util.values, separator=".")


@dataclass
class RawEventObjectContainer:
    """
    Description of one kind of event object whose existence in the service has to be checked.

    Attributes:
        globPathToObject: glob path to the object in the input json (e.g. "face.face_id")
        queryName: query name in camel case, passed to the client as a keyword argument
        client: request client (an awaitable callable)
        objectNotFoundError: error to report if the object was not found for the account
    """

    globPathToObject: str
    queryName: str
    client: Callable[..., Awaitable[LunaResponse]]
    objectNotFoundError: ErrorInfo
class HandlersProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for the handlers collection (creation via POST; GET is allowed as well).
    See `spec_handlers`_.

    .. _spec_handlers:
        _static/api.html#tag/handlers

    Resource: "/{api_version}/handlers"
    """

    allowedMethods = ("POST", "GET")

    def checkTokenPermissions(self) -> None:
        """
        Check token permissions for the "handler" object.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        self.checkTokenPermissionsDefault(objectName="handler")

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare proxy request for method `post`: update policies, then build the creation request.

        Returns:
            proxy request
        """
        self.updatePolicies()
        proxyRequest = self.prepareRequestCreation()
        return proxyRequest

    async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse:
        """
        Post processing response from the service.

        Args:
            response: response from the service
        Returns:
            response with the service status code and a body with converted incoming urls
        """
        statusCode = response.statusCode
        convertedBody = self.convertIncomingUrls(response)
        return self.success(statusCode, outputJson=convertedBody)
class HandlerProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for a single handler (replace via PUT; GET, DELETE and HEAD are allowed as well).
    See `spec_handlers_handler`_.

    .. _spec_handlers_handler:
        _static/api.html#tag/handlers

    Resource: "/{api_version}/handlers/{handlerId}"
    """

    allowedMethods = ("PUT", "GET", "DELETE", "HEAD")

    def checkTokenPermissions(self) -> None:
        """
        Check token permissions for the "handler" object.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        self.checkTokenPermissionsDefault(objectName="handler")

    async def prepareRequestPut(self) -> ProxyRequest:
        """
        Prepare proxy request for method `put`: update policies, then build the creation request.

        Returns:
            proxy request
        """
        self.updatePolicies()
        proxyRequest = self.prepareRequestCreation()
        return proxyRequest
class CountHandlerProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for the handlers count resource. See `spec_handlers_count`_.

    .. _spec_handlers_count:
        _static/api.html#operation/getHandlerCount

    Resource: "/{api_version}/handlers/count"
    """

    allowedMethods = ("GET",)

    def checkTokenPermissions(self) -> None:
        """
        Check token permissions for the "handler" object.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        self.checkTokenPermissionsDefault(objectName="handler")
class HandlersValidatorProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for validate handler policies. See `spec_validate_handler_policies`_.

    .. _spec_validate_handler_policies:
        _static/api.html#operation/validatePolicies

    Resource: "/{api_version}/handlers/validator"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        No token permission check is performed by this handler.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare proxy request for method `post`: update policies, then build the creation request.

        Returns:
            proxy request
        """
        self.updatePolicies()
        proxyRequest = self.prepareRequestCreation()
        return proxyRequest
class DetectorProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for detecting faces on images. See `spec_detector`_.

    .. _spec_detector:
        _static/api.html#operation/detectFaces

    Resource: "/{api_version}/detector"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        No token permission check is performed by this handler.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """

    async def postProcessingPost(self, response: LunaResponse):
        """
        Post processing response from the service.

        Args:
            response: response from the service
        Returns:
            response with the service status code and a body with converted detector sample urls
        """
        statusCode = response.statusCode
        convertedBody = self.convertDetectorSampleUrls(response.json)
        return self.success(statusCode, outputJson=convertedBody)
class ExtractorProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for extracting attributes such as gender, age, ethnicity, descriptor from samples.
    See `spec_extractor`_.

    .. _spec_extractor:
        _static/api.html#operation/extractAttributes

    Resource: "/{api_version}/extractor"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Check token permissions for the "attribute" object.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        self.checkTokenPermissionsDefault(objectName="attribute")

    async def postProcessingPost(self, response: LunaResponse):
        """
        Post processing response from the service.

        Args:
            response: response from the service
        Returns:
            response with the service status code and a body with converted attribute urls
        """
        statusCode = response.statusCode
        convertedBody = self.convertAttributeUrls(response.json)
        return self.success(statusCode, outputJson=convertedBody)
class SDKHandlerProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for SDK estimations. See `spec_sdk`_.

    .. _spec_sdk:
        _static/api.html#tag/sdk

    Resource: "/{api_version}/sdk"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Check that the token permissions, if any, include the "sdk" resource for a POST request.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        Raises:
            VLException(Error.ResourceForbiddenByToken, 403) if the resource is not in the permissions
        """
        permissions = self.request.credentials.permissions
        if permissions is None:
            # credentials carry no permissions: nothing to check
            return
        if self.request.method != "POST":
            return
        resource = "sdk"
        if resource not in permissions.resources:
            raise VLException(Error.ResourceForbiddenByToken.format(resource), 403, False)
class ISOHandlerProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for ISO estimations. See `spec_iso`_.

    .. _spec_iso:
        _static/api.html#operation/checkISO

    Resource: "/{api_version}/iso"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Check that the token permissions, if any, include the "iso" resource for a POST request.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        Raises:
            VLException(Error.ResourceForbiddenByToken, 403) if the resource is not in the permissions
        """
        permissions = self.request.credentials.permissions
        if permissions is None:
            # credentials carry no permissions: nothing to check
            return
        if self.request.method != "POST":
            return
        resource = "iso"
        if resource not in permissions.resources:
            raise VLException(Error.ResourceForbiddenByToken.format(resource), 403, False)
class EventHandlerProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for generating events with a handler. See `spec_emit_events`_.

    .. _spec_emit_events:
        _static/api.html#operation/generateEvents

    Resource: "/{api_version}/handlers/{handler_id}/events"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Check that the token permissions, if any, allow emitting events with the requested handler.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        Raises:
            VLException(Error.ForbiddenByTokenEventsGeneration, 403) if events emitting is not allowed
            VLException(Error.ForbiddenByTokenEmitEvents, 403) if the handler is in the black list
                or is absent from the white list
        """
        if (permissions := self.request.credentials.permissions) is None:
            # credentials carry no permissions: nothing to check
            return

        emitEvents = permissions.emitEvents
        if not emitEvents.allowed:
            raise VLException(Error.ForbiddenByTokenEventsGeneration, 403, False)

        handlerId = self.request.match_info["handlerId"]
        blackList = emitEvents.blackList
        isBlackListed = blackList is not None and handlerId in blackList
        # the white list is consulted only when the handler is not black-listed
        if isBlackListed or (emitEvents.whiteList is not None and handlerId not in emitEvents.whiteList):
            raise VLException(Error.ForbiddenByTokenEmitEvents, 403, False)

    async def postProcessingPost(self, response: LunaResponse):
        """
        Post processing response from the service.

        Args:
            response: response from the service
        Returns:
            response with the service status code and a body with converted handler events urls
        """
        statusCode = response.statusCode
        convertedBody = self.convertHandlerEventsUrls(response.json)
        return self.success(statusCode, outputJson=convertedBody)
class EventRawHandlerProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for save raw event. See `spec_save_event`_.

    .. _spec_save_event:
        _static/api.html#operation/saveEvent

    Resource: "/{api_version}/handlers/{handler_id}/events/raw"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Check token permissions for the "event" object.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        self.checkTokenPermissionsDefault(objectName="event")

    async def checkExistingObject(self, container: RawEventObjectContainer, queryValue: Union[str, List[str]]) -> None:
        """
        Check existing object by account id.

        Args:
            container: container class with request client and possible error
            queryValue: query value (a list of list ids for the lists check)
        Raises:
            VLException(container.objectNotFoundError, 404) if object not found
            LunaApiException: any other client error is re-raised unchanged
        """
        try:
            response = await container.client(
                **{container.queryName: queryValue},
                accountId=self.accountId,
                raiseError=True,
            )
        except LunaApiException as e:
            if e.statusCode == 404:
                # keep the client error as the explicit cause of the translated exception
                raise VLException(
                    container.objectNotFoundError.format(queryValue),
                    e.statusCode,
                    isCriticalError=False,
                ) from e
            raise

        if container.objectNotFoundError != Error.ListsNotFound:
            return

        # The lists request answers successfully even if some of the requested lists are absent from
        # the reply, so the returned ids are compared with the requested ones.
        foundListIds = {list_["list_id"] for list_ in response.json["lists"]}
        missingListIds = [listId for listId in queryValue if listId not in foundListIds]
        if missingListIds:
            # report the first missing list in request order, so the error is deterministic
            raise VLException(
                container.objectNotFoundError.format(missingListIds[0]),
                404,
                isCriticalError=False,
            )

    async def checkObjectsBelongToAccount(self) -> ProxyRequest:
        """
        Check that event objects are associated with an account.

        The input json is validated against `schemas.EVENT_RAW_SCHEMA`, then every non-null face id,
        lists value, attribute id and face/body sample id found in it is checked concurrently.

        Returns:
            Proxy request with raw event
        Raises:
            VLException: if one of the referenced objects was not found (see `checkExistingObject`)
        """
        inputJson = self.request.json
        self.validateJson(inputJson, schemas.EVENT_RAW_SCHEMA)

        structObjects = [
            RawEventObjectContainer(
                globPathToObject="face.face_id",
                queryName="faceId",
                client=self.luna3Client.lunaFaces.checkFace,
                objectNotFoundError=Error.FaceNotFound,
            ),
            RawEventObjectContainer(
                globPathToObject="face.lists",
                queryName="listIds",
                client=self.luna3Client.lunaFaces.getLists,
                objectNotFoundError=Error.ListsNotFound,
            ),
            RawEventObjectContainer(
                globPathToObject="face_attributes.attribute_id",
                queryName="attributeId",
                client=self.luna3Client.lunaFaces.checkAttribute,
                objectNotFoundError=Error.AttributesNotFound,
            ),
            RawEventObjectContainer(
                globPathToObject="detections.*.samples.face.sample_id",
                queryName="imageId",
                client=partial(
                    self.luna3Client.lunaFaceSamplesStore.checkImage,
                    bucketName=self.config.faceSamplesStorage.bucket,
                ),
                objectNotFoundError=Error.SampleNotFound,
            ),
            RawEventObjectContainer(
                globPathToObject="detections.*.samples.body.sample_id",
                queryName="imageId",
                client=partial(
                    self.luna3Client.lunaBodySamplesStore.checkImage,
                    bucketName=self.config.bodySamplesStorage.bucket,
                ),
                objectNotFoundError=Error.SampleNotFound,
            ),
        ]

        tasks = []
        for struct in structObjects:
            # null values in the event are skipped: there is nothing to check for them
            tasks.extend(
                self.checkExistingObject(container=struct, queryValue=queryValue)
                for queryValue in getValueByPath(obj=inputJson, glob=struct.globPathToObject)
                if queryValue is not None
            )
        await asyncio.gather(*tasks)

        return ProxyRequest(
            ujson.dumps(inputJson, ensure_ascii=False),
            self.prepareHeaders(),
            self.prepareQuery(),
        )

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare proxy request for method `post`.

        Returns:
            Proxy request with raw event
        """
        # NOTE(review): this is an exact comparison; if `request.content_type` can carry parameters
        # (e.g. "application/json; charset=utf-8") such a request takes the fallback branch and the
        # objects are not checked against the account - confirm that this is intended.
        if self.request.content_type == "application/json":
            return await self.checkObjectsBelongToAccount()
        return self.prepareRequestCreation()

    async def postProcessingPost(self, response: LunaResponse):
        """
        Post processing response from the service.

        Adds the event url to a copy of the response json and to the `Location` response header.

        Args:
            response: response from the service
        Returns:
            response with the service status code and the extended json
        """
        # work on a copy so that the json held by the service response stays untouched
        outputJson = deepcopy(response.json)
        location = f"/{self.app.ctx.apiVersion}/events/{outputJson['event_id']}"
        self.respHeaders["Location"] = outputJson["url"] = location
        return self.success(response.statusCode, outputJson=outputJson)
class VerifiersHandlerProxy(HandlersServiceBaseHandler):
    """
    Proxy handler for the verifiers collection. See `spec_verifiers`_.

    .. _spec_verifiers:
        _static/api.html#tag/verifiers

    Resource: "/{api_version}/verifiers"
    """

    allowedMethods = ("POST", "GET")

    def checkTokenPermissions(self) -> None:
        """
        Check token permissions for the "verifier" object.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        self.checkTokenPermissionsDefault(objectName="verifier")

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare proxy request for method `post`.

        Returns:
            proxy request
        """
        proxyRequest = self.prepareRequestCreation()
        return proxyRequest

    async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse:
        """
        Post processing response from the service.

        Args:
            response: response from the service
        Returns:
            response with the service status code and a body with converted incoming urls
        """
        statusCode = response.statusCode
        convertedBody = self.convertIncomingUrls(response)
        return self.success(statusCode, outputJson=convertedBody)
class VerifierCountHandlerProxy(HandlersServiceBaseHandler):
    """
    Proxy handler for the verifiers count resource. See `spec_verifiers`_.

    .. _spec_verifiers:
        _static/api.html#tag/verifiers

    Resource: "/{api_version}/verifiers/count"
    """

    allowedMethods = ("GET",)

    def checkTokenPermissions(self) -> None:
        """
        Check token permissions for the "verifier" object.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        self.checkTokenPermissionsDefault(objectName="verifier")
class VerifierHandlerProxy(HandlersServiceBaseHandler):
    """
    Proxy handler for a single verifier. See `spec_verifiers`_.

    .. _spec_verifiers:
        _static/api.html#tag/verifiers

    Resource: "/{api_version}/verifiers/{verifier_id}"
    """

    allowedMethods = ("GET", "PUT", "HEAD", "DELETE")

    def checkTokenPermissions(self) -> None:
        """
        Check token permissions for the "verifier" object.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        self.checkTokenPermissionsDefault(objectName="verifier")

    async def prepareRequestPut(self) -> ProxyRequest:
        """
        Prepare proxy request for method `put`.

        Returns:
            proxy request
        """
        proxyRequest = self.prepareRequestCreation()
        return proxyRequest
class HandlerVerifyRawProxy(HandlersServiceBaseHandler):
    """
    Proxy handler for the verifier "raw" resource. See `spec_verifiers`_.

    .. _spec_verifiers:
        _static/api.html#tag/verifiers

    Resource: "/{api_version}/verifiers/{verifier_id}/raw"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        No token permission check is performed by this handler.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
class VerifierVerificationsHandlerProxy(HandlersServiceBaseHandler):
    """
    Proxy handler for the verifier verifications resource. See `spec_verifiers`_.

    .. _spec_verifiers:
        _static/api.html#tag/verifiers

    Resource: "/{api_version}/verifiers/{verifier_id}/verifications"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        No token permission check is performed by this handler.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """

    async def postProcessingPost(self, response: LunaResponse):
        """
        Post processing response from the service.

        Args:
            response: response from the service
        Returns:
            response with the service status code and a body with converted verifier urls
        """
        statusCode = response.statusCode
        convertedBody = self.convertVerifierUrls(response.json)
        return self.success(statusCode, outputJson=convertedBody)