# Source code for luna_api.app.handlers.sample_handler
""" Handler to work with a sample """
from luna3.common.luna_response import LunaResponse
from sanic.response import HTTPResponse
from app.handlers.base_handler import BaseBodySampleProxyHandler, BaseFaceSampleProxyHandler, BaseProxyHandler
from configs.configs.configs.settings.classes import ImageStoreAddressSettings, ServiceTimeoutsSettings
from crutches_on_wheels.cow.errors.errors import Error
class _BaseSampleProxyHandler(BaseProxyHandler):
    """
    Base proxy handler for Luna Image Store.

    Concrete handlers are expected to provide ``serviceAddress`` and ``serviceTimeouts``.
    """

    allowedMethods = {"GET", "HEAD", "DELETE"}
    serviceAddress: ImageStoreAddressSettings
    # timeouts settings, not an address: matches the return type of the subclasses' properties
    serviceTimeouts: ServiceTimeoutsSettings

    def prepareUrl(self) -> str:
        """
        Prepare url to the LIS service.

        The url is built from the service url, the bucket taken from ``serviceAddress``
        and the ``sampleId`` taken from the request match info.

        Returns:
            url of the sample image in the image store bucket
        """
        sampleId = self.request.match_info.get("sampleId")
        return f"{self.serviceUrl}/buckets/{self.serviceAddress.bucket}/images/{sampleId}"

    async def postProcessingHead(self, response: LunaResponse) -> HTTPResponse:
        """
        Post process LIS response.

        Args:
            response: response received from the image store
        Returns:
            response in api format: 200 with the upstream content type on success,
            404 with an "image not found" error if the image store answered 404,
            500 with an unknown error otherwise
        """
        if response.success:
            return self.success(200, extraHeaders={"Content-Type": response.contentType})
        if response.statusCode == 404:
            # sample id is needed only to format the "not found" error
            sampleId = self.request.match_info.get("sampleId")
            return self.error(404, error=Error.ImageNotFoundError.format(sampleId))
        self.logger.error("Unknown error in image-store")
        return self.error(500, Error.UnknownError)
class FaceSampleProxyHandler(_BaseSampleProxyHandler, BaseFaceSampleProxyHandler):
    """
    Face sample proxy. See `spec face sample <_static/api.html#operation/getFaceSample>`_.

    Resource: "/{api_version}/sample/faces/{sample_id}"
    """

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        permissionObject = "faceSample"
        self.checkTokenPermissionsDefault(objectName=permissionObject)

    @property
    def serviceAddress(self) -> ImageStoreAddressSettings:
        """
        Address settings of the image store that keeps face samples.

        Returns:
            ``faceSamplesStorage`` from the handler config
        """
        settings = self.config
        return settings.faceSamplesStorage

    @property
    def serviceTimeouts(self) -> ServiceTimeoutsSettings:
        """
        Timeouts settings of the image store that keeps face samples.

        Returns:
            ``faceSamplesStorageTimeouts`` from the handler config
        """
        settings = self.config
        return settings.faceSamplesStorageTimeouts
class BodySampleProxyHandler(_BaseSampleProxyHandler, BaseBodySampleProxyHandler):
    """
    Body sample proxy. See `spec body sample <_static/api.html#operation/getBodySample>`_.

    Resource: "/{api_version}/sample/bodies/{sample_id}"
    """

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        permissionObject = "bodySample"
        self.checkTokenPermissionsDefault(objectName=permissionObject)

    @property
    def serviceAddress(self) -> ImageStoreAddressSettings:
        """
        Address settings of the image store that keeps body samples.

        Returns:
            ``bodySamplesStorage`` from the handler config
        """
        settings = self.config
        return settings.bodySamplesStorage

    @property
    def serviceTimeouts(self) -> ServiceTimeoutsSettings:
        """
        Timeouts settings of the image store that keeps body samples.

        Returns:
            ``bodySamplesStorageTimeouts`` from the handler config
        """
        settings = self.config
        return settings.bodySamplesStorageTimeouts
class SampleProxyHandlerPreviousVersion(FaceSampleProxyHandler):
    """
    Proxy handler for LIS for previous version

    Resource: "/{previous_version}/sample/{sample_id}"
    """

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        # the previous-version resource is checked against the face sample permissions
        permissionObject = "faceSample"
        self.checkTokenPermissionsDefault(objectName=permissionObject)