Source code for luna_api.app.handlers.images_proxy_handler

""" Handler for a certain images. """
from copy import deepcopy
from http.client import HTTPResponse

from luna3.common.luna_response import LunaResponse

from app.handlers.available_content_types import IMG_CONTENT_TYPES
from app.handlers.base_handler import BaseProxyHandler, ProxyRequest
from configs.configs.configs.settings.classes import ImageStoreAddressSettings, ServiceTimeoutsSettings
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException


class _BaseImagesProxyHandler(BaseProxyHandler):
    """
    Base images proxy.
    """

    def checkTokenPermissions(self) -> None:
        """
        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        self.checkTokenPermissionsDefault("image")

    @property
    def serviceAddress(self) -> ImageStoreAddressSettings:
        """
        Get a image store service address
        Returns:
            image store service address
        """
        return self.config.imageOriginStorage

    @property
    def serviceTimeouts(self) -> ServiceTimeoutsSettings:
        """
        Get a image store service timeouts
        Returns:
            image store service timeouts
        """
        return self.config.imageOriginStorageTimeouts


[docs]class ImagesHandler(_BaseImagesProxyHandler): """ Handler for create new images. See `spec_images`_. .. _spec_images: _static/api.html#tag/images Resource: "/{api_version}/images" """ allowedMethods = ("POST",)
[docs] async def prepareRequestPost(self) -> ProxyRequest: """ Prepare proxy request for method `post` Returns: proxy request """ contentType = self.request.headers.get("Content-Type") if contentType not in IMG_CONTENT_TYPES: raise VLException(Error.BadContentType, 400, isCriticalError=False) return ProxyRequest(self.request.body, self.prepareHeaders(), self.prepareQuery())
[docs] async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse: """ Default post processing response from the service Args: response: response Returns: response in api format """ outputJson = deepcopy(response.json) imageId = response.json["image_id"] self.respHeaders["Location"] = outputJson["url"] = f"/{self.app.ctx.apiVersion}/images/{imageId}" return self.success(201, outputJson=outputJson)
[docs] def prepareUrl(self) -> str: """ Prepare url to the LIS service. Returns: A str, url """ return f"{self.serviceUrl}/buckets/{self.serviceAddress.bucket}/images"
[docs]class ImageHandler(_BaseImagesProxyHandler): """ Handler for work with an image. See `spec_images`_. .. _spec_images: _static/api.html#tag/images Resource: "/{api_version}/images/{imageId}" """ allowedMethods = ("GET", "DELETE", "HEAD")
[docs] def prepareUrl(self) -> str: """ Prepare url to the LIS service. Returns: same url with correct api version """ imageId = self.request.match_info.get("imageId") return f"{self.serviceUrl}/buckets/{self.serviceAddress.bucket}/images/{imageId}"