""" Handler for a certain handler. """
import asyncio
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from typing import Awaitable, Callable, List, Union
import dpath.util
import ujson
from luna3.common.exceptions import LunaApiException
from luna3.common.luna_response import LunaResponse
from sanic.response import HTTPResponse
from app.handlers.base_handler import HandlersServiceBaseHandler, ProxyRequest
from app.handlers.schemas import schemas
from crutches_on_wheels.errors.errors import Error, ErrorInfo
from crutches_on_wheels.errors.exception import VLException
# Helper collecting the values found at a glob path of a nested object; path parts are separated
# with "." (e.g. "face.face_id") instead of the dpath default separator.
getValueByPath = partial(dpath.util.values, separator=".")
@dataclass
class RawEventObjectContainer:
    """
    Description of one kind of event object whose existence in a service has to be checked.

    Attributes:
        globPathToObject: glob path to the object in the input json (e.g. "face.face_id")
        queryName: query name in camel case, used as the keyword argument name for the client
        client: request client, an awaitable callable resolving to a luna response
        objectNotFoundError: possible error if the object for the account was not found
    """

    globPathToObject: str
    queryName: str
    client: Callable[..., Awaitable[LunaResponse]]
    objectNotFoundError: ErrorInfo
class HandlersProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for creating new handlers. See `spec_handlers`_.

    .. _spec_handlers:
        _static/api.html#tag/handlers

    Resource: "/{api_version}/handlers"
    """

    allowedMethods = ("POST", "GET")

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        The default permission check is applied to the "handler" object.
        """
        self.checkTokenPermissionsDefault(objectName="handler")

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare proxy request for method `post`.

        Returns:
            proxy request built by `prepareRequestCreation` once `updatePolicies` has been called
        """
        self.updatePolicies()
        proxyRequest = self.prepareRequestCreation()
        return proxyRequest

    async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse:
        """
        Post processing response from the service.

        Args:
            response: response received from the service
        Returns:
            success reply with the service status code and the result of `convertIncomingUrls`
        """
        statusCode = response.statusCode
        convertedJson = self.convertIncomingUrls(response)
        return self.success(statusCode, outputJson=convertedJson)
class HandlerProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for replacing a handler. See `spec_handlers_handler`_.

    .. _spec_handlers_handler:
        _static/api.html#tag/handlers

    Resource: "/{api_version}/handlers/{handlerId}"
    """

    allowedMethods = ("PUT", "GET", "DELETE", "HEAD")

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        The default permission check is applied to the "handler" object.
        """
        self.checkTokenPermissionsDefault(objectName="handler")

    async def prepareRequestPut(self) -> ProxyRequest:
        """
        Prepare proxy request for method `put`.

        Returns:
            proxy request built by `prepareRequestCreation` once `updatePolicies` has been called
        """
        self.updatePolicies()
        proxyRequest = self.prepareRequestCreation()
        return proxyRequest
class CountHandlerProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for getting handler count. See `spec_handlers_count`_.

    .. _spec_handlers_count:
        _static/api.html#operation/getHandlerCount

    Resource: "/{api_version}/handlers/count"
    """

    # only reading is supported by this resource
    allowedMethods = ("GET",)

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        The default permission check is applied to the "handler" object.
        """
        self.checkTokenPermissionsDefault(objectName="handler")
class HandlersValidatorProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for validating handler policies. See `spec_validate_handler_policies`_.

    .. _spec_validate_handler_policies:
        _static/api.html#operation/validatePolicies

    Resource: "/{api_version}/handlers/validator"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        This override performs no checks.
        """

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare proxy request for method `post`.

        Returns:
            proxy request built by `prepareRequestCreation` once `updatePolicies` has been called
        """
        self.updatePolicies()
        proxyRequest = self.prepareRequestCreation()
        return proxyRequest
class DetectorProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for detecting faces on images. See `spec_detector`_.

    .. _spec_detector:
        _static/api.html#operation/detectFaces

    Resource: "/{api_version}/detector"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        This override performs no checks.
        """

    async def postProcessingPost(self, response: LunaResponse):
        """
        Post processing response from the service.

        Args:
            response: response received from the service
        Returns:
            success reply with the service status code and the response json passed through
            `convertDetectorSampleUrls`
        """
        statusCode = response.statusCode
        convertedJson = self.convertDetectorSampleUrls(response.json)
        return self.success(statusCode, outputJson=convertedJson)
class ExtractorProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for extracting attributes such as gender, age, ethnicity, descriptor from samples.
    See `spec_extractor`_.

    .. _spec_extractor:
        _static/api.html#operation/extractAttributes

    Resource: "/{api_version}/extractor"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        The default permission check is applied to the "attribute" object.
        """
        self.checkTokenPermissionsDefault(objectName="attribute")

    async def postProcessingPost(self, response: LunaResponse):
        """
        Post processing response from the service.

        Args:
            response: response received from the service
        Returns:
            success reply with the service status code and the response json passed through
            `convertAttributeUrls`
        """
        statusCode = response.statusCode
        convertedJson = self.convertAttributeUrls(response.json)
        return self.success(statusCode, outputJson=convertedJson)
class SDKHandlerProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for SDK estimations. See `spec_sdk`_.

    .. _spec_sdk:
        _static/api.html#tag/sdk

    Resource: "/{api_version}/sdk"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        Nothing is checked when the credentials carry no permissions or the method is not `POST`.

        Raises:
            VLException(Error.ResourceForbiddenByToken, 403) if "sdk" is not among the permitted resources
        """
        permissions = self.request.credentials.permissions
        if permissions is None or self.request.method != "POST":
            return
        resource = "sdk"
        if resource not in permissions.resources:
            raise VLException(Error.ResourceForbiddenByToken.format(resource), 403, False)
class ISOHandlerProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for ISO estimations. See `spec_iso`_.

    .. _spec_iso:
        _static/api.html#operation/checkISO

    Resource: "/{api_version}/iso"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        Nothing is checked when the credentials carry no permissions or the method is not `POST`.

        Raises:
            VLException(Error.ResourceForbiddenByToken, 403) if "iso" is not among the permitted resources
        """
        permissions = self.request.credentials.permissions
        if permissions is None or self.request.method != "POST":
            return
        resource = "iso"
        if resource not in permissions.resources:
            raise VLException(Error.ResourceForbiddenByToken.format(resource), 403, False)
class EventHandlerProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for emitting (generating) events. See `spec_emit_events`_.

    .. _spec_emit_events:
        _static/api.html#operation/generateEvents

    Resource: "/{api_version}/handlers/{handler_id}/events"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        Nothing is checked when the credentials carry no permissions.

        Raises:
            VLException(Error.ForbiddenByTokenEventsGeneration, 403) if emitting events is not allowed at all
            VLException(Error.ForbiddenByTokenEmitEvents, 403) if the handler is in the black list or
                is absent from the white list (each list is only consulted when it is set)
        """
        permissions = self.request.credentials.permissions
        if permissions is None:
            return
        if not permissions.emitEvents.allowed:
            raise VLException(Error.ForbiddenByTokenEventsGeneration, 403, False)

        handlerId = self.request.match_info["handlerId"]
        emitPolicy = permissions.emitEvents
        # the black list is consulted first; the white list only if the black list did not reject the handler
        isForbidden = (emitPolicy.blackList is not None and handlerId in emitPolicy.blackList) or (
            emitPolicy.whiteList is not None and handlerId not in emitPolicy.whiteList
        )
        if isForbidden:
            raise VLException(Error.ForbiddenByTokenEmitEvents, 403, False)

    async def postProcessingPost(self, response: LunaResponse):
        """
        Post processing response from the service.

        Args:
            response: response received from the service
        Returns:
            success reply with the service status code and the response json passed through
            `convertHandlerEventsUrls`
        """
        statusCode = response.statusCode
        convertedJson = self.convertHandlerEventsUrls(response.json)
        return self.success(statusCode, outputJson=convertedJson)
class EventRawHandlerProxyHandler(HandlersServiceBaseHandler):
    """
    Proxy handler for saving a raw event. See `spec_save_event`_.

    .. _spec_save_event:
        _static/api.html#operation/saveEvent

    Resource: "/{api_version}/handlers/{handler_id}/events/raw"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        The default permission check is applied to the "event" object.
        """
        self.checkTokenPermissionsDefault(objectName="event")

    async def checkExistingObject(self, container: RawEventObjectContainer, queryValue: Union[str, List[str]]) -> None:
        """
        Check existing object by account id.

        Args:
            container: container class with request client and possible error
            queryValue: query value
        Raises:
            VLException(Error.ObjectNotFound, 404) if object not found
            LunaApiException: any client error other than 404 is re-raised unchanged
        """
        try:
            response = await container.client(
                **{container.queryName: queryValue},
                accountId=self.accountId,
                raiseError=True,
            )
        except LunaApiException as e:
            if e.statusCode == 404:
                # keep the client error as the explicit cause of the translated exception
                raise VLException(
                    container.objectNotFoundError.format(queryValue),
                    e.statusCode,
                    isCriticalError=False,
                ) from e
            raise

        if container.objectNotFoundError == Error.ListsNotFound:
            # the lists request answers successfully even if some lists are absent, so the reply
            # has to be compared with the requested ids
            # NOTE(review): assumes every requested list is returned in a single reply (no pagination
            # cut-off) - confirm against the client
            existingListIds = {list_["list_id"] for list_ in response.json["lists"]}
            missingListIds = [listId for listId in queryValue if listId not in existingListIds]
            if missingListIds:
                # report the first missing id in request order, so the error is deterministic
                raise VLException(
                    container.objectNotFoundError.format(missingListIds[0]),
                    404,
                    isCriticalError=False,
                )

    async def checkObjectsBelongToAccount(self) -> ProxyRequest:
        """
        Check that event objects are associated with an account.

        The input json is validated against `schemas.EVENT_RAW_SCHEMA`, then every non-null object
        reference found in it (face, lists, attribute, face/body samples) is checked concurrently.

        Returns:
            Proxy request with raw event
        Raises:
            VLException: if one of the referenced objects was not found, see `checkExistingObject`
        """
        inputJson = self.request.json
        self.validateJson(inputJson, schemas.EVENT_RAW_SCHEMA)
        structObjects = [
            RawEventObjectContainer(
                globPathToObject="face.face_id",
                queryName="faceId",
                client=self.luna3Client.lunaFaces.checkFace,
                objectNotFoundError=Error.FaceNotFound,
            ),
            RawEventObjectContainer(
                globPathToObject="face.lists",
                queryName="listIds",
                client=self.luna3Client.lunaFaces.getLists,
                objectNotFoundError=Error.ListsNotFound,
            ),
            RawEventObjectContainer(
                globPathToObject="face_attributes.attribute_id",
                queryName="attributeId",
                client=self.luna3Client.lunaFaces.checkAttribute,
                objectNotFoundError=Error.AttributesNotFound,
            ),
            RawEventObjectContainer(
                globPathToObject="detections.*.samples.face.sample_id",
                queryName="imageId",
                client=partial(
                    self.luna3Client.lunaFaceSamplesStore.checkImage,
                    bucketName=self.config.faceSamplesStorage.bucket,
                ),
                objectNotFoundError=Error.SampleNotFound,
            ),
            RawEventObjectContainer(
                globPathToObject="detections.*.samples.body.sample_id",
                queryName="imageId",
                client=partial(
                    self.luna3Client.lunaBodySamplesStore.checkImage,
                    bucketName=self.config.bodySamplesStorage.bucket,
                ),
                objectNotFoundError=Error.SampleNotFound,
            ),
        ]
        # one check per found value; null values are skipped
        tasks = [
            self.checkExistingObject(container=struct, queryValue=queryValue)
            for struct in structObjects
            for queryValue in getValueByPath(obj=inputJson, glob=struct.globPathToObject)
            if queryValue is not None
        ]
        await asyncio.gather(*tasks)
        return ProxyRequest(
            ujson.dumps(inputJson, ensure_ascii=False),
            self.prepareHeaders(),
            self.prepareQuery(),
        )

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare proxy request for method `post`.

        A json body is validated and its objects are checked, any other content type is passed to
        `prepareRequestCreation`.

        Returns:
            Proxy request with raw event
        """
        # The header may carry parameters ("application/json; charset=utf-8") and media types are
        # case-insensitive, so only the normalized media type is compared. An exact comparison would
        # let such json requests skip the validation and the object checks.
        mediaType = (self.request.content_type or "").split(";", 1)[0].strip().lower()
        if mediaType == "application/json":
            return await self.checkObjectsBelongToAccount()
        return self.prepareRequestCreation()

    async def postProcessingPost(self, response: LunaResponse):
        """
        Post processing response from the service.

        The reply json is a copy of the service json with "url" set to the event location; the same
        location is set as the `Location` response header.

        Args:
            response: response received from the service
        Returns:
            success reply with the service status code
        """
        outputJson = deepcopy(response.json)
        location = f"/{self.app.ctx.apiVersion}/events/{outputJson['event_id']}"
        self.respHeaders["Location"] = outputJson["url"] = location
        return self.success(response.statusCode, outputJson=outputJson)
class VerifiersHandlerProxy(HandlersServiceBaseHandler):
    """
    Proxy handler for verifiers. See `spec_verifiers`_.

    .. _spec_verifiers:
        _static/api.html#tag/verifiers

    Resource: "/{api_version}/verifiers"
    """

    allowedMethods = ("POST", "GET")

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        The default permission check is applied to the "verifier" object.
        """
        self.checkTokenPermissionsDefault(objectName="verifier")

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare proxy request for method `post`.

        Returns:
            proxy request built by `prepareRequestCreation`
        """
        proxyRequest = self.prepareRequestCreation()
        return proxyRequest

    async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse:
        """
        Post processing response from the service.

        Args:
            response: response received from the service
        Returns:
            success reply with the service status code and the result of `convertIncomingUrls`
        """
        statusCode = response.statusCode
        convertedJson = self.convertIncomingUrls(response)
        return self.success(statusCode, outputJson=convertedJson)
class VerifierCountHandlerProxy(HandlersServiceBaseHandler):
    """
    Proxy handler for getting verifier count. See `spec_verifiers`_.

    .. _spec_verifiers:
        _static/api.html#tag/verifiers

    Resource: "/{api_version}/verifiers/count"
    """

    # only reading is supported by this resource
    allowedMethods = ("GET",)

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        The default permission check is applied to the "verifier" object.
        """
        self.checkTokenPermissionsDefault(objectName="verifier")
class VerifierHandlerProxy(HandlersServiceBaseHandler):
    """
    Proxy handler for a single verifier. See `spec_verifiers`_.

    .. _spec_verifiers:
        _static/api.html#tag/verifiers

    Resource: "/{api_version}/verifiers/{verifier_id}"
    """

    allowedMethods = ("GET", "PUT", "HEAD", "DELETE")

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        The default permission check is applied to the "verifier" object.
        """
        self.checkTokenPermissionsDefault(objectName="verifier")

    async def prepareRequestPut(self) -> ProxyRequest:
        """
        Prepare proxy request for method `put`.

        Returns:
            proxy request built by `prepareRequestCreation`
        """
        proxyRequest = self.prepareRequestCreation()
        return proxyRequest
class HandlerVerifyRawProxy(HandlersServiceBaseHandler):
    """
    Proxy handler for the verifier "raw" resource. See `spec_verifiers`_.

    .. _spec_verifiers:
        _static/api.html#tag/verifiers

    Resource: "/{api_version}/verifiers/{verifier_id}/raw"
    """

    # only `POST` requests are served by this resource
    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        This override performs no checks.
        """
class VerifierVerificationsHandlerProxy(HandlersServiceBaseHandler):
    """
    Proxy handler for verifications performed by a verifier. See `spec_verifiers`_.

    .. _spec_verifiers:
        _static/api.html#tag/verifiers

    Resource: "/{api_version}/verifiers/{verifier_id}/verifications"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        This override performs no checks.
        """

    async def postProcessingPost(self, response: LunaResponse):
        """
        Post processing response from the service.

        Args:
            response: response received from the service
        Returns:
            success reply with the service status code and the response json passed through
            `convertVerifierUrls`
        """
        statusCode = response.statusCode
        convertedJson = self.convertVerifierUrls(response.json)
        return self.success(statusCode, outputJson=convertedJson)