# Source code for luna_api.app.handlers.sample_handler

""" Handler to work with a sample """
from luna3.common.luna_response import LunaResponse
from sanic.response import HTTPResponse

from app.handlers.base_handler import BaseBodySampleProxyHandler, BaseFaceSampleProxyHandler, BaseProxyHandler
from configs.configs.configs.settings.classes import ImageStoreAddressSettings, ServiceTimeoutsSettings
from crutches_on_wheels.cow.errors.errors import Error


class _BaseSampleProxyHandler(BaseProxyHandler):
    """
    Base proxy handler for Luna Image Store (LIS) samples.

    Subclasses supply `serviceAddress` and `serviceTimeouts` for the concrete sample storage.
    """

    # methods accepted by this handler
    allowedMethods = {"GET", "HEAD", "DELETE"}

    # image store address settings; the `bucket` field is used to build the proxied url
    serviceAddress: ImageStoreAddressSettings
    # image store timeouts settings (subclasses return `ServiceTimeoutsSettings`)
    serviceTimeouts: ServiceTimeoutsSettings

    def prepareUrl(self) -> str:
        """
        Prepare url of the requested sample in the LIS service.

        The sample id is taken from the `sampleId` route parameter, the bucket from `serviceAddress`.

        Returns:
            url of the form "{serviceUrl}/buckets/{bucket}/images/{sampleId}"
        """
        sampleId = self.request.match_info.get("sampleId")
        return f"{self.serviceUrl}/buckets/{self.serviceAddress.bucket}/images/{sampleId}"

    async def postProcessingHead(self, response: LunaResponse) -> HTTPResponse:
        """
        Post process LIS response.

        Args:
            response: response received from the image store
        Returns:
            response in api format:
            200 with the `Content-Type` of the LIS response if the request succeeded,
            404 with `ImageNotFoundError` if LIS answered 404,
            500 with `UnknownError` otherwise
        """
        if response.success:
            return self.success(200, extraHeaders={"Content-Type": response.contentType})
        if response.statusCode == 404:
            # sample id is needed only to format the "not found" error
            sampleId = self.request.match_info.get("sampleId")
            return self.error(404, error=Error.ImageNotFoundError.format(sampleId))
        self.logger.error("Unknown error in image-store")
        return self.error(500, Error.UnknownError)


class FaceSampleProxyHandler(_BaseSampleProxyHandler, BaseFaceSampleProxyHandler):
    """
    Face sample proxy. See `spec face sample <_static/api.html#operation/getFaceSample>`_.

    Resource: "/{api_version}/sample/faces/{sample_id}"
    """

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        self.checkTokenPermissionsDefault(objectName="faceSample")

    @property
    def serviceAddress(self) -> ImageStoreAddressSettings:
        """
        Get the image store service address for face samples.

        Returns:
            image store service address taken from `config.faceSamplesStorage`
        """
        return self.config.faceSamplesStorage

    @property
    def serviceTimeouts(self) -> ServiceTimeoutsSettings:
        """
        Get the image store service timeouts for face samples.

        Returns:
            image store service timeouts taken from `config.faceSamplesStorageTimeouts`
        """
        return self.config.faceSamplesStorageTimeouts
class BodySampleProxyHandler(_BaseSampleProxyHandler, BaseBodySampleProxyHandler):
    """
    Body sample proxy. See `spec body sample <_static/api.html#operation/getBodySample>`_.

    Resource: "/{api_version}/sample/bodies/{sample_id}"
    """

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        self.checkTokenPermissionsDefault(objectName="bodySample")

    @property
    def serviceAddress(self) -> ImageStoreAddressSettings:
        """
        Get the image store service address for body samples.

        Returns:
            image store service address taken from `config.bodySamplesStorage`
        """
        return self.config.bodySamplesStorage

    @property
    def serviceTimeouts(self) -> ServiceTimeoutsSettings:
        """
        Get the image store service timeouts for body samples.

        Returns:
            image store service timeouts taken from `config.bodySamplesStorageTimeouts`
        """
        return self.config.bodySamplesStorageTimeouts
class SampleProxyHandlerPreviousVersion(FaceSampleProxyHandler):
    """
    Proxy handler for LIS for previous version.

    Resource: "/{previous_version}/sample/{sample_id}"
    """

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        self.checkTokenPermissionsDefault(objectName="faceSample")