# -*- coding: utf-8 -*-
""" Base handler
Module realize base class for all handlers.
"""
from copy import deepcopy
from typing import Any, Dict, NamedTuple, Tuple, Union
import msgpack
import ujson
from luna3.client import Client
from luna3.common.luna_response import LunaResponse
from luna3.common.requests import makeRequest
from multidict import CIMultiDict, CIMultiDictProxy
from sanic.response import HTTPResponse
from app.app import ApiApp, ApiRequest
from app.global_vars.context_vars import requestIdCtx
from app.handlers.schemas import schemas
from configs.configs.configs.services import SettingsApi
from configs.configs.configs.settings.classes import (
ImageStoreAddressSettings,
ServiceAddressSettings,
ServiceTimeoutsSettings,
)
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.web.base_proxy_handler_class import SessionPool
from crutches_on_wheels.web.handlers import BaseHandler
FACE_SAMPLE_RELATIVE_URL = "/{apiVersion}/samples/faces/{sampleId}"
BODY_SAMPLE_RELATIVE_URL = "/{apiVersion}/samples/bodies/{sampleId}"
ATTRIBUTE_RELATIVE_URL = "/{apiVersion}/attributes/{attributeId}"
EVENT_RELATIVE_URL = "/{apiVersion}/events/{eventId}"
FACE_RELATIVE_URL = "/{apiVersion}/faces/{faceId}"
HEADERS_WHITE_LIST = {
"content-type",
"luna-request-id",
"content-disposition",
"luna-event-time",
"luna-event-end-time",
"accept",
}
[docs]class ProxyRequest(NamedTuple):
"""
Proxy request
"""
body: Any # body for sending to a proxed service
headers: Union[Dict[str, str], CIMultiDictProxy[str]] # headers for sending to a proxed service
query: Dict[str, Union[None, str]] # query for sending to a proxed service
[docs]class BaseRequestHandler(BaseHandler):
"""
Base handler for other handlers.
Attributes:
luna3Client (luna3.client.Client): luna3 client
accountId (str): account id
"""
# IDE type hint stub
request: ApiRequest
def __init__(self, request: ApiRequest):
super().__init__(request)
self.accountId = request.accountId
self.addDataForMonitoring(tags={"account_id": self.accountId})
requestIdCtx.set(self.requestId)
self.luna3Client: Client = self.app.ctx.luna3session.getClient(self.requestId)
@property
def app(self) -> ApiApp:
"""
Get running app
Returns:
app
"""
return self.request.app
@property
def config(self) -> SettingsApi:
"""
Get app config
Returns:
app config
"""
return self.app.ctx.serviceConfig
[docs] @staticmethod
def filterAllowedHeaders(headers: CIMultiDict[str, str]) -> dict:
"""
Filter only allowed headers.
Args:
allHeaders: headers to filter
Returns:
filtered headers
"""
return {k: v for k, v in headers.items() if k.lower() in HEADERS_WHITE_LIST}
[docs] async def options(self, *args, **kwargs) -> HTTPResponse:
"""
Default 'options' method
"""
allowHeaders = {"Allow": ", ".join(self.getAllowedMethods())}
return self.success(body=None, extraHeaders=allowHeaders)
[docs]class BaseProxyHandler(BaseRequestHandler):
"""
Base proxy handler for other handlers.
"""
# list of allowed proxy methods
allowedMethods: Tuple[str, ...] = ()
# session pool
session = SessionPool()
@property
def serviceAddress(self) -> ServiceAddressSettings:
"""
Get a proxed service address
Returns:
a proxed service address
"""
raise NotImplemented
@property
def serviceTimeouts(self) -> ServiceTimeoutsSettings:
"""
Get a proxed service timeouts
Returns:
a proxed service timeouts
"""
raise NotImplemented
@property
def serviceUrl(self) -> str:
"""Service url"""
return f"{self.serviceAddress.origin}/{self.serviceAddress.apiVersion}"
[docs] def prepareRequestDefault(self) -> ProxyRequest:
"""
Prepare proxy request by default
Returns:
default proxy request
"""
return ProxyRequest(self.request.body, self.prepareHeaders(), self.prepareQuery())
[docs] async def prepareRequestPost(self):
"""
Prepare proxy request for method `post`
Returns:
proxy request
"""
return self.prepareRequestDefault()
[docs] async def prepareRequestPatch(self):
"""
Prepare proxy request for method `patch`
Returns:
proxy request
"""
return self.prepareRequestDefault()
[docs] async def prepareRequestDelete(self):
"""
Prepare proxy request for method `delete`
Returns:
proxy request
"""
return self.prepareRequestDefault()
[docs] async def prepareRequestPut(self):
"""
Prepare proxy request for method `put`
Returns:
proxy request
"""
return self.prepareRequestDefault()
[docs] async def prepareProxyRequest(self) -> ProxyRequest:
"""
Prepare proxy request
Returns:
proxy request
"""
if self.request.method in ("GET", "OPTIONS", "HEAD"):
return ProxyRequest(b"", self.prepareHeaders(), self.prepareQuery())
elif self.request.method in ("POST", "PUT", "DELETE", "PATCH"):
return await getattr(self, f"prepareRequest{self.request.method.capitalize()}")()
raise RuntimeError(f"bad request method {self.request.method}")
[docs] def postProcessingDefault(self, response: LunaResponse) -> HTTPResponse:
"""
Default post processing response from the service
Args:
response: response
Returns:
response in api format
"""
headers = self.filterAllowedHeaders(response.headers)
return self.success(response.statusCode, body=response.body, extraHeaders=headers)
[docs] async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse:
"""
Default post processing response from the service
Args:
response: response
Returns:
response in api format
"""
return self.postProcessingDefault(response)
[docs] async def postProcessingGet(self, response: LunaResponse) -> HTTPResponse:
"""
Post processing response from the service for method `get`
Args:
response: response
Returns:
response in api format
"""
return self.postProcessingDefault(response)
[docs] async def postProcessingPatch(self, response: LunaResponse) -> HTTPResponse:
"""
Post processing response from the service for method `patch`
Args:
response: response
Returns:
response in api format
"""
return self.postProcessingDefault(response)
[docs] async def postProcessingDelete(self, response: LunaResponse) -> HTTPResponse:
"""
Post processing response from the service for method `delete`
Args:
response: response
Returns:
response in api format
"""
return self.postProcessingDefault(response)
[docs] async def postProcessingPut(self, response: LunaResponse) -> HTTPResponse:
"""
Post processing response from the service for method `put`
Args:
response: response
Returns:
response in api format
"""
return self.postProcessingDefault(response)
[docs] async def postProcessingOptions(self, response: LunaResponse) -> HTTPResponse:
"""
Post processing response from the service for method `post`
Args:
response: response
Returns:
response in api format
"""
return self.postProcessingDefault(response)
[docs] async def postProcessingHead(self, response: LunaResponse) -> HTTPResponse:
"""
Post processing response from the service for method `head`
Args:
response: response
Returns:
response in api format
"""
return self.postProcessingDefault(response)
[docs] async def postProcessing(self, response: LunaResponse) -> HTTPResponse:
"""
Post processing response
Args:
response: response
Returns:
response in api format
"""
return await getattr(self, f"postProcessing{self.request.method.capitalize()}")(response)
[docs] def prepareRequestCreation(self) -> ProxyRequest:
"""
Add account id to root json for creating some objects
Returns:
proxy request
Raises:
VLException(Error.BadContentType): if request has incorrect content type
"""
headers = self.prepareHeaders()
contentType = self.request.content_type
if contentType == "application/json":
inputJson = self.request.json
elif contentType == "application/octet-stream":
inputJson = {}
headers["Content-Type"] = "application/json"
elif contentType == "application/msgpack":
inputJson = self.request.json
headers["Content-Type"] = "application/msgpack"
else:
raise VLException(Error.BadContentType, statusCode=400, isCriticalError=False)
self.validateJson(inputJson, schemas.OBJECT_SCHEMA)
inputJson["account_id"] = self.accountId
if contentType == "application/msgpack":
body = msgpack.packb(inputJson, use_bin_type=True)
else:
body = ujson.dumps(inputJson, ensure_ascii=False)
return ProxyRequest(body, headers, self.prepareQuery())
[docs] def prepareQuery(self) -> Dict[str, str]:
"""
Prepare headers for a proxy request
Returns:
dict with queries + account_id
Raises:
VLException(Error.BadQueryParams) if input request contains an account_id query
"""
queryParams = dict(self.request.args)
if "account_id" in queryParams:
raise VLException(
Error.BadQueryParams.format("account_id"),
statusCode=400,
isCriticalError=False,
)
queryParams["account_id"] = self.accountId
return queryParams
[docs] def prepareHeaders(self) -> Dict[str, str]:
"""
Prepare headers for a proxy request
Returns:
headers with `Luna-Request-Id` and other input headers
"""
headers = self.filterAllowedHeaders(self.request.headers)
headers["Luna-Request-Id"] = self.requestId
headers["Accept-Encoding"] = "identity, deflate, gzip"
return headers
[docs] def convertIncomingUrls(self, response: LunaResponse) -> Dict[str, str]:
"""
Convert service URLs to output JSON and update Location in handler
Returns:
output JSON with converted service URLs
"""
outputJson = deepcopy(response.json)
location = outputJson["url"].replace(f"/{self.serviceAddress.apiVersion}", f"/{self.app.ctx.apiVersion}", 1)
self.respHeaders["Location"] = location
outputJson["url"] = location
return outputJson
[docs] def prepareUrl(self) -> str:
"""
Prepare url to a service
Returns:
same url with correct api version
"""
uriWithoutApiVersion = self.request.server_path.replace(f"/{self.app.ctx.apiVersion}", "", 1)
return f"{self.serviceUrl}{uriWithoutApiVersion}"
[docs] async def makeRequestToService(self) -> HTTPResponse:
"""
Make request to custom service
Returns:
response
"""
if self.request.method not in self.allowedMethods:
return self.setMethodNotAllowed()
url = self.prepareUrl()
preparedRequest = await self.prepareProxyRequest()
kwargs = dict(
url=url,
method=self.request.method,
queryParams=preparedRequest.query,
headers=preparedRequest.headers,
asyncRequest=True,
totalTimeout=self.serviceTimeouts.totalTimeout,
connectTimeout=self.serviceTimeouts.connectTimeout,
sockConnectTimeout=self.serviceTimeouts.sockConnectTimeout,
sockReadTimeout=self.serviceTimeouts.sockReadTimeout,
session=self.session,
)
reply = await makeRequest(body=preparedRequest.body, **kwargs)
if reply.success:
return await self.postProcessing(reply)
return self.postProcessingDefault(reply)
[docs] async def post(self, *args, **kwargs) -> HTTPResponse:
"""
Default 'post' method
Returns:
proxy response
"""
return await self.makeRequestToService()
[docs] async def put(self, *args, **kwargs) -> HTTPResponse:
"""
Default 'put' method
Returns:
proxy response
"""
return await self.makeRequestToService()
[docs] async def get(self, *args, **kwargs) -> HTTPResponse:
"""
Default 'get' method
Returns:
proxy response
"""
return await self.makeRequestToService()
[docs] async def delete(self, *args, **kwargs) -> HTTPResponse:
"""
Default 'delete' method
Returns:
proxy response
"""
return await self.makeRequestToService()
[docs] async def patch(self, *args, **kwargs) -> HTTPResponse:
"""
Default 'patch' method
Returns:
proxy response
"""
return await self.makeRequestToService()
[docs] async def head(self, *args, **kwargs):
"""
Default 'head' method
Returns:
proxy response
"""
return await self.makeRequestToService()
[docs]class EventServiceBaseHandler(BaseProxyHandler):
"""
Class for proxy handlers to events
"""
@property
def serviceAddress(self) -> ServiceAddressSettings:
"""
Get a events service address
Returns:
a events service address
"""
return self.config.eventsAddress
@property
def serviceTimeouts(self) -> ServiceTimeoutsSettings:
"""
Get a events service timeouts
Returns:
a events service timeouts
"""
return self.config.eventsTimeouts
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not self.config.additionalServicesUsage.lunaEvents:
raise VLException(Error.LunaEventsIsDisabled, statusCode=403, isCriticalError=False)
[docs]class FacesServiceBaseHandler(BaseProxyHandler):
"""
Class for proxy handlers to faces
"""
@property
def serviceAddress(self) -> ServiceAddressSettings:
"""
Get a faces service address
Returns:
a faces service address
"""
return self.config.facesAddress
@property
def serviceTimeouts(self) -> ServiceTimeoutsSettings:
"""
Get a faces service timeouts
Returns:
a faces service timeouts
"""
return self.config.facesTimeouts
[docs]class HandlersServiceBaseHandler(BaseProxyHandler):
"""
Class for proxy handlers to handlers
"""
@property
def serviceAddress(self) -> ServiceAddressSettings:
"""
Get a handlers service address
Returns:
a handlers service address
"""
return self.config.handlersAddress
@property
def serviceTimeouts(self) -> ServiceTimeoutsSettings:
"""
Get a handlers service timeouts
Returns:
a handlers service timeouts
"""
return self.config.handlersTimeouts
[docs] def convertAttributeUrls(self, replyJson: dict) -> dict:
"""
Convert attribute URLs in output JSON
Args:
replyJson: JSON
Returns:
output JSON with converted attribute URLs
"""
outputJson = deepcopy(replyJson)
for attribute in outputJson:
if attribute["attribute_id"]:
attribute["url"] = ATTRIBUTE_RELATIVE_URL.format(
apiVersion=self.app.ctx.apiVersion,
attributeId=attribute["attribute_id"],
)
return outputJson
[docs] def convertDetectorSampleUrls(self, replyJson: dict) -> dict:
"""
Convert detections sample URLs in output JSON
Args:
replyJson: JSON
Returns:
output JSON with converted sample URLs
"""
outputJson = deepcopy(replyJson)
for image in outputJson["images"]:
for sample in image["detections"]["samples"]:
sample["face"]["url"] = FACE_SAMPLE_RELATIVE_URL.format(
apiVersion=self.app.ctx.apiVersion,
sampleId=sample["face"]["sample_id"],
)
return outputJson
[docs] def convertHandlerEventsUrls(self, replyJson: dict) -> dict:
"""
Convert events/faces/detections samples URLs in output JSON
Args:
replyJson: JSON
Returns:
output JSON with converted sample URLs
"""
outputJson = deepcopy(replyJson)
for event in outputJson["events"]:
if event["url"] is not None:
event["url"] = EVENT_RELATIVE_URL.format(apiVersion=self.app.ctx.apiVersion, eventId=event["event_id"])
if event["face"] is not None:
event["face"]["url"] = FACE_RELATIVE_URL.format(
apiVersion=self.app.ctx.apiVersion, faceId=event["face"]["face_id"]
)
if event["face_attributes"] is not None and event["face_attributes"]["url"] is not None:
event["face_attributes"]["url"] = ATTRIBUTE_RELATIVE_URL.format(
apiVersion=self.app.ctx.apiVersion,
attributeId=event["face_attributes"]["attribute_id"],
)
for detection in event["detections"]:
samples = detection["samples"]
if samples["face"] and samples["face"]["url"]:
samples["face"]["url"] = FACE_SAMPLE_RELATIVE_URL.format(
apiVersion=self.app.ctx.apiVersion,
sampleId=samples["face"]["sample_id"],
)
if samples["body"] and samples["body"]["url"]:
samples["body"]["url"] = BODY_SAMPLE_RELATIVE_URL.format(
apiVersion=self.app.ctx.apiVersion,
sampleId=samples["body"]["sample_id"],
)
return outputJson
[docs]class TasksServiceBaseHandler(BaseProxyHandler):
"""
Class for proxy handlers to tasks
"""
@property
def serviceAddress(self) -> ServiceAddressSettings:
"""
Get a tasks service address
Returns:
a tasks service address
"""
return self.config.tasksAddress
@property
def serviceTimeouts(self) -> ServiceTimeoutsSettings:
"""
Get a tasks service timeouts
Returns:
a tasks service timeouts
"""
return self.config.tasksTimeouts
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not self.config.additionalServicesUsage.lunaTasks:
raise VLException(Error.LunaTasksIsDisabled, statusCode=403, isCriticalError=False)
[docs]class PythonMatcherServiceBaseHandler(BaseProxyHandler):
"""
Class for proxy handlers to python-matcher/python-matcher-proxy
"""
@property
def serviceAddress(self) -> ServiceAddressSettings:
"""
Get a python matcher service address
Returns:
a python matcher service address
"""
return (
self.config.matcherProxyAddress
if self.config.additionalServicesUsage.lunaMatcherProxy
else self.config.pythonMatcherAddress
)
@property
def serviceTimeouts(self) -> ServiceTimeoutsSettings:
"""
Get a python matcher service timeouts
Returns:
a python matcher service timeouts
"""
return (
self.config.matcherProxyTimeouts
if self.config.additionalServicesUsage.lunaMatcherProxy
else self.config.pythonMatcherTimeouts
)
[docs]class BaseFaceSampleProxyHandler(BaseProxyHandler):
"""
Base face sample proxy.
"""
@property
def serviceAddress(self) -> ImageStoreAddressSettings:
"""
Get a image store service address
Returns:
a image store service address
"""
return self.config.faceSamplesStorage
@property
def serviceTimeouts(self) -> ServiceTimeoutsSettings:
"""
Get a image store service timeouts
Returns:
a image store service timeouts
"""
return self.config.faceSamplesStorageTimeouts
[docs]class BaseBodySampleProxyHandler(BaseProxyHandler):
"""
Base body sample proxy.
"""
@property
def serviceAddress(self) -> ImageStoreAddressSettings:
"""
Get a image store service address
Returns:
a image store service address
"""
return self.config.bodySamplesStorage
@property
def serviceTimeouts(self) -> ServiceTimeoutsSettings:
"""
Get a image store service timeouts
Returns:
a image store service timeouts
"""
return self.config.bodySamplesStorageTimeouts