Source code for luna_api.app.handlers.base_handler

# -*- coding: utf-8 -*-
""" Base handler

Module realize base class for all handlers.
"""
from copy import deepcopy
from typing import Any, Dict, NamedTuple, Tuple, Union

import msgpack
import ujson
from luna3.client import Client
from luna3.common.luna_response import LunaResponse
from luna3.common.requests import makeRequest
from multidict import CIMultiDict, CIMultiDictProxy
from sanic.response import HTTPResponse

from app.app import ApiApp, ApiRequest
from app.global_vars.context_vars import requestIdCtx
from app.handlers.schemas import schemas
from configs.configs.configs.services import SettingsApi
from configs.configs.configs.settings.classes import (
    ImageStoreAddressSettings,
    ServiceAddressSettings,
    ServiceTimeoutsSettings,
)
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.web.base_proxy_handler_class import SessionPool
from crutches_on_wheels.web.handlers import BaseHandler

FACE_SAMPLE_RELATIVE_URL = "/{apiVersion}/samples/faces/{sampleId}"
BODY_SAMPLE_RELATIVE_URL = "/{apiVersion}/samples/bodies/{sampleId}"
ATTRIBUTE_RELATIVE_URL = "/{apiVersion}/attributes/{attributeId}"
EVENT_RELATIVE_URL = "/{apiVersion}/events/{eventId}"
FACE_RELATIVE_URL = "/{apiVersion}/faces/{faceId}"
HEADERS_WHITE_LIST = {
    "content-type",
    "luna-request-id",
    "content-disposition",
    "luna-event-time",
    "luna-event-end-time",
    "accept",
}


[docs]class ProxyRequest(NamedTuple): """ Proxy request """ body: Any # body for sending to a proxed service headers: Union[Dict[str, str], CIMultiDictProxy[str]] # headers for sending to a proxed service query: Dict[str, Union[None, str]] # query for sending to a proxed service
[docs]class BaseRequestHandler(BaseHandler): """ Base handler for other handlers. Attributes: luna3Client (luna3.client.Client): luna3 client accountId (str): account id """ # IDE type hint stub request: ApiRequest def __init__(self, request: ApiRequest): super().__init__(request) self.accountId = request.accountId self.addDataForMonitoring(tags={"account_id": self.accountId}) requestIdCtx.set(self.requestId) self.luna3Client: Client = self.app.ctx.luna3session.getClient(self.requestId) @property def app(self) -> ApiApp: """ Get running app Returns: app """ return self.request.app @property def config(self) -> SettingsApi: """ Get app config Returns: app config """ return self.app.ctx.serviceConfig
[docs] @staticmethod def filterAllowedHeaders(headers: CIMultiDict[str, str]) -> dict: """ Filter only allowed headers. Args: allHeaders: headers to filter Returns: filtered headers """ return {k: v for k, v in headers.items() if k.lower() in HEADERS_WHITE_LIST}
[docs] async def options(self, *args, **kwargs) -> HTTPResponse: """ Default 'options' method """ allowHeaders = {"Allow": ", ".join(self.getAllowedMethods())} return self.success(body=None, extraHeaders=allowHeaders)
[docs]class BaseProxyHandler(BaseRequestHandler): """ Base proxy handler for other handlers. """ # list of allowed proxy methods allowedMethods: Tuple[str, ...] = () # session pool session = SessionPool() @property def serviceAddress(self) -> ServiceAddressSettings: """ Get a proxed service address Returns: a proxed service address """ raise NotImplemented @property def serviceTimeouts(self) -> ServiceTimeoutsSettings: """ Get a proxed service timeouts Returns: a proxed service timeouts """ raise NotImplemented @property def serviceUrl(self) -> str: """Service url""" return f"{self.serviceAddress.origin}/{self.serviceAddress.apiVersion}"
[docs] def prepareRequestDefault(self) -> ProxyRequest: """ Prepare proxy request by default Returns: default proxy request """ return ProxyRequest(self.request.body, self.prepareHeaders(), self.prepareQuery())
[docs] async def prepareRequestPost(self): """ Prepare proxy request for method `post` Returns: proxy request """ return self.prepareRequestDefault()
[docs] async def prepareRequestPatch(self): """ Prepare proxy request for method `patch` Returns: proxy request """ return self.prepareRequestDefault()
[docs] async def prepareRequestDelete(self): """ Prepare proxy request for method `delete` Returns: proxy request """ return self.prepareRequestDefault()
[docs] async def prepareRequestPut(self): """ Prepare proxy request for method `put` Returns: proxy request """ return self.prepareRequestDefault()
[docs] async def prepareProxyRequest(self) -> ProxyRequest: """ Prepare proxy request Returns: proxy request """ if self.request.method in ("GET", "OPTIONS", "HEAD"): return ProxyRequest(b"", self.prepareHeaders(), self.prepareQuery()) elif self.request.method in ("POST", "PUT", "DELETE", "PATCH"): return await getattr(self, f"prepareRequest{self.request.method.capitalize()}")() raise RuntimeError(f"bad request method {self.request.method}")
[docs] def postProcessingDefault(self, response: LunaResponse) -> HTTPResponse: """ Default post processing response from the service Args: response: response Returns: response in api format """ headers = self.filterAllowedHeaders(response.headers) return self.success(response.statusCode, body=response.body, extraHeaders=headers)
[docs] async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse: """ Default post processing response from the service Args: response: response Returns: response in api format """ return self.postProcessingDefault(response)
[docs] async def postProcessingGet(self, response: LunaResponse) -> HTTPResponse: """ Post processing response from the service for method `get` Args: response: response Returns: response in api format """ return self.postProcessingDefault(response)
[docs] async def postProcessingPatch(self, response: LunaResponse) -> HTTPResponse: """ Post processing response from the service for method `patch` Args: response: response Returns: response in api format """ return self.postProcessingDefault(response)
[docs] async def postProcessingDelete(self, response: LunaResponse) -> HTTPResponse: """ Post processing response from the service for method `delete` Args: response: response Returns: response in api format """ return self.postProcessingDefault(response)
[docs] async def postProcessingPut(self, response: LunaResponse) -> HTTPResponse: """ Post processing response from the service for method `put` Args: response: response Returns: response in api format """ return self.postProcessingDefault(response)
[docs] async def postProcessingOptions(self, response: LunaResponse) -> HTTPResponse: """ Post processing response from the service for method `post` Args: response: response Returns: response in api format """ return self.postProcessingDefault(response)
[docs] async def postProcessingHead(self, response: LunaResponse) -> HTTPResponse: """ Post processing response from the service for method `head` Args: response: response Returns: response in api format """ return self.postProcessingDefault(response)
[docs] async def postProcessing(self, response: LunaResponse) -> HTTPResponse: """ Post processing response Args: response: response Returns: response in api format """ return await getattr(self, f"postProcessing{self.request.method.capitalize()}")(response)
[docs] def prepareRequestCreation(self) -> ProxyRequest: """ Add account id to root json for creating some objects Returns: proxy request Raises: VLException(Error.BadContentType): if request has incorrect content type """ headers = self.prepareHeaders() contentType = self.request.content_type if contentType == "application/json": inputJson = self.request.json elif contentType == "application/octet-stream": inputJson = {} headers["Content-Type"] = "application/json" elif contentType == "application/msgpack": inputJson = self.request.json headers["Content-Type"] = "application/msgpack" else: raise VLException(Error.BadContentType, statusCode=400, isCriticalError=False) self.validateJson(inputJson, schemas.OBJECT_SCHEMA) inputJson["account_id"] = self.accountId if contentType == "application/msgpack": body = msgpack.packb(inputJson, use_bin_type=True) else: body = ujson.dumps(inputJson, ensure_ascii=False) return ProxyRequest(body, headers, self.prepareQuery())
[docs] def prepareQuery(self) -> Dict[str, str]: """ Prepare headers for a proxy request Returns: dict with queries + account_id Raises: VLException(Error.BadQueryParams) if input request contains an account_id query """ queryParams = dict(self.request.args) if "account_id" in queryParams: raise VLException( Error.BadQueryParams.format("account_id"), statusCode=400, isCriticalError=False, ) queryParams["account_id"] = self.accountId return queryParams
[docs] def prepareHeaders(self) -> Dict[str, str]: """ Prepare headers for a proxy request Returns: headers with `Luna-Request-Id` and other input headers """ headers = self.filterAllowedHeaders(self.request.headers) headers["Luna-Request-Id"] = self.requestId headers["Accept-Encoding"] = "identity, deflate, gzip" return headers
[docs] def convertIncomingUrls(self, response: LunaResponse) -> Dict[str, str]: """ Convert service URLs to output JSON and update Location in handler Returns: output JSON with converted service URLs """ outputJson = deepcopy(response.json) location = outputJson["url"].replace(f"/{self.serviceAddress.apiVersion}", f"/{self.app.ctx.apiVersion}", 1) self.respHeaders["Location"] = location outputJson["url"] = location return outputJson
[docs] def prepareUrl(self) -> str: """ Prepare url to a service Returns: same url with correct api version """ uriWithoutApiVersion = self.request.server_path.replace(f"/{self.app.ctx.apiVersion}", "", 1) return f"{self.serviceUrl}{uriWithoutApiVersion}"
[docs] async def makeRequestToService(self) -> HTTPResponse: """ Make request to custom service Returns: response """ if self.request.method not in self.allowedMethods: return self.setMethodNotAllowed() url = self.prepareUrl() preparedRequest = await self.prepareProxyRequest() kwargs = dict( url=url, method=self.request.method, queryParams=preparedRequest.query, headers=preparedRequest.headers, asyncRequest=True, totalTimeout=self.serviceTimeouts.totalTimeout, connectTimeout=self.serviceTimeouts.connectTimeout, sockConnectTimeout=self.serviceTimeouts.sockConnectTimeout, sockReadTimeout=self.serviceTimeouts.sockReadTimeout, session=self.session, ) reply = await makeRequest(body=preparedRequest.body, **kwargs) if reply.success: return await self.postProcessing(reply) return self.postProcessingDefault(reply)
[docs] async def post(self, *args, **kwargs) -> HTTPResponse: """ Default 'post' method Returns: proxy response """ return await self.makeRequestToService()
[docs] async def put(self, *args, **kwargs) -> HTTPResponse: """ Default 'put' method Returns: proxy response """ return await self.makeRequestToService()
[docs] async def get(self, *args, **kwargs) -> HTTPResponse: """ Default 'get' method Returns: proxy response """ return await self.makeRequestToService()
[docs] async def delete(self, *args, **kwargs) -> HTTPResponse: """ Default 'delete' method Returns: proxy response """ return await self.makeRequestToService()
[docs] async def patch(self, *args, **kwargs) -> HTTPResponse: """ Default 'patch' method Returns: proxy response """ return await self.makeRequestToService()
[docs] async def head(self, *args, **kwargs): """ Default 'head' method Returns: proxy response """ return await self.makeRequestToService()
[docs]class EventServiceBaseHandler(BaseProxyHandler): """ Class for proxy handlers to events """ @property def serviceAddress(self) -> ServiceAddressSettings: """ Get a events service address Returns: a events service address """ return self.config.eventsAddress @property def serviceTimeouts(self) -> ServiceTimeoutsSettings: """ Get a events service timeouts Returns: a events service timeouts """ return self.config.eventsTimeouts def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if not self.config.additionalServicesUsage.lunaEvents: raise VLException(Error.LunaEventsIsDisabled, statusCode=403, isCriticalError=False)
[docs]class FacesServiceBaseHandler(BaseProxyHandler): """ Class for proxy handlers to faces """ @property def serviceAddress(self) -> ServiceAddressSettings: """ Get a faces service address Returns: a faces service address """ return self.config.facesAddress @property def serviceTimeouts(self) -> ServiceTimeoutsSettings: """ Get a faces service timeouts Returns: a faces service timeouts """ return self.config.facesTimeouts
[docs]class HandlersServiceBaseHandler(BaseProxyHandler): """ Class for proxy handlers to handlers """ @property def serviceAddress(self) -> ServiceAddressSettings: """ Get a handlers service address Returns: a handlers service address """ return self.config.handlersAddress @property def serviceTimeouts(self) -> ServiceTimeoutsSettings: """ Get a handlers service timeouts Returns: a handlers service timeouts """ return self.config.handlersTimeouts
[docs] def convertAttributeUrls(self, replyJson: dict) -> dict: """ Convert attribute URLs in output JSON Args: replyJson: JSON Returns: output JSON with converted attribute URLs """ outputJson = deepcopy(replyJson) for attribute in outputJson: if attribute["attribute_id"]: attribute["url"] = ATTRIBUTE_RELATIVE_URL.format( apiVersion=self.app.ctx.apiVersion, attributeId=attribute["attribute_id"], ) return outputJson
[docs] def convertDetectorSampleUrls(self, replyJson: dict) -> dict: """ Convert detections sample URLs in output JSON Args: replyJson: JSON Returns: output JSON with converted sample URLs """ outputJson = deepcopy(replyJson) for image in outputJson["images"]: for sample in image["detections"]["samples"]: sample["face"]["url"] = FACE_SAMPLE_RELATIVE_URL.format( apiVersion=self.app.ctx.apiVersion, sampleId=sample["face"]["sample_id"], ) return outputJson
[docs] def convertHandlerEventsUrls(self, replyJson: dict) -> dict: """ Convert events/faces/detections samples URLs in output JSON Args: replyJson: JSON Returns: output JSON with converted sample URLs """ outputJson = deepcopy(replyJson) for event in outputJson["events"]: if event["url"] is not None: event["url"] = EVENT_RELATIVE_URL.format(apiVersion=self.app.ctx.apiVersion, eventId=event["event_id"]) if event["face"] is not None: event["face"]["url"] = FACE_RELATIVE_URL.format( apiVersion=self.app.ctx.apiVersion, faceId=event["face"]["face_id"] ) if event["face_attributes"] is not None and event["face_attributes"]["url"] is not None: event["face_attributes"]["url"] = ATTRIBUTE_RELATIVE_URL.format( apiVersion=self.app.ctx.apiVersion, attributeId=event["face_attributes"]["attribute_id"], ) for detection in event["detections"]: samples = detection["samples"] if samples["face"] and samples["face"]["url"]: samples["face"]["url"] = FACE_SAMPLE_RELATIVE_URL.format( apiVersion=self.app.ctx.apiVersion, sampleId=samples["face"]["sample_id"], ) if samples["body"] and samples["body"]["url"]: samples["body"]["url"] = BODY_SAMPLE_RELATIVE_URL.format( apiVersion=self.app.ctx.apiVersion, sampleId=samples["body"]["sample_id"], ) return outputJson
[docs]class TasksServiceBaseHandler(BaseProxyHandler): """ Class for proxy handlers to tasks """ @property def serviceAddress(self) -> ServiceAddressSettings: """ Get a tasks service address Returns: a tasks service address """ return self.config.tasksAddress @property def serviceTimeouts(self) -> ServiceTimeoutsSettings: """ Get a tasks service timeouts Returns: a tasks service timeouts """ return self.config.tasksTimeouts def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if not self.config.additionalServicesUsage.lunaTasks: raise VLException(Error.LunaTasksIsDisabled, statusCode=403, isCriticalError=False)
[docs]class PythonMatcherServiceBaseHandler(BaseProxyHandler): """ Class for proxy handlers to python-matcher/python-matcher-proxy """ @property def serviceAddress(self) -> ServiceAddressSettings: """ Get a python matcher service address Returns: a python matcher service address """ return ( self.config.matcherProxyAddress if self.config.additionalServicesUsage.lunaMatcherProxy else self.config.pythonMatcherAddress ) @property def serviceTimeouts(self) -> ServiceTimeoutsSettings: """ Get a python matcher service timeouts Returns: a python matcher service timeouts """ return ( self.config.matcherProxyTimeouts if self.config.additionalServicesUsage.lunaMatcherProxy else self.config.pythonMatcherTimeouts )
[docs]class BaseFaceSampleProxyHandler(BaseProxyHandler): """ Base face sample proxy. """ @property def serviceAddress(self) -> ImageStoreAddressSettings: """ Get a image store service address Returns: a image store service address """ return self.config.faceSamplesStorage @property def serviceTimeouts(self) -> ServiceTimeoutsSettings: """ Get a image store service timeouts Returns: a image store service timeouts """ return self.config.faceSamplesStorageTimeouts
[docs]class BaseBodySampleProxyHandler(BaseProxyHandler): """ Base body sample proxy. """ @property def serviceAddress(self) -> ImageStoreAddressSettings: """ Get a image store service address Returns: a image store service address """ return self.config.bodySamplesStorage @property def serviceTimeouts(self) -> ServiceTimeoutsSettings: """ Get a image store service timeouts Returns: a image store service timeouts """ return self.config.bodySamplesStorageTimeouts