Source code for luna_api.app.handlers.images_proxy_handler
""" Handler for a certain images. """
from copy import deepcopy
from http.client import HTTPResponse
from luna3.common.luna_response import LunaResponse
from app.handlers.available_content_types import IMG_CONTENT_TYPES
from app.handlers.base_handler import BaseProxyHandler, ProxyRequest
from configs.configs.configs.settings.classes import ImageStoreAddressSettings, ServiceTimeoutsSettings
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
class _BaseImagesProxyHandler(BaseProxyHandler):
"""
Base images proxy.
"""
def checkTokenPermissions(self) -> None:
"""
Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
"""
self.checkTokenPermissionsDefault("image")
@property
def serviceAddress(self) -> ImageStoreAddressSettings:
"""
Get a image store service address
Returns:
image store service address
"""
return self.config.imageOriginStorage
@property
def serviceTimeouts(self) -> ServiceTimeoutsSettings:
"""
Get a image store service timeouts
Returns:
image store service timeouts
"""
return self.config.imageOriginStorageTimeouts
[docs]class ImagesHandler(_BaseImagesProxyHandler):
"""
Handler for create new images. See `spec_images`_.
.. _spec_images:
_static/api.html#tag/images
Resource: "/{api_version}/images"
"""
allowedMethods = ("POST",)
[docs] async def prepareRequestPost(self) -> ProxyRequest:
"""
Prepare proxy request for method `post`
Returns:
proxy request
"""
contentType = self.request.headers.get("Content-Type")
if contentType not in IMG_CONTENT_TYPES:
raise VLException(Error.BadContentType, 400, isCriticalError=False)
return ProxyRequest(self.request.body, self.prepareHeaders(), self.prepareQuery())
[docs] async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse:
"""
Default post processing response from the service
Args:
response: response
Returns:
response in api format
"""
outputJson = deepcopy(response.json)
imageId = response.json["image_id"]
self.respHeaders["Location"] = outputJson["url"] = f"/{self.app.ctx.apiVersion}/images/{imageId}"
return self.success(201, outputJson=outputJson)
[docs] def prepareUrl(self) -> str:
"""
Prepare url to the LIS service.
Returns:
A str, url
"""
return f"{self.serviceUrl}/buckets/{self.serviceAddress.bucket}/images"
[docs]class ImageHandler(_BaseImagesProxyHandler):
"""
Handler for work with an image. See `spec_images`_.
.. _spec_images:
_static/api.html#tag/images
Resource: "/{api_version}/images/{imageId}"
"""
allowedMethods = ("GET", "DELETE", "HEAD")
[docs] def prepareUrl(self) -> str:
"""
Prepare url to the LIS service.
Returns:
same url with correct api version
"""
imageId = self.request.match_info.get("imageId")
return f"{self.serviceUrl}/buckets/{self.serviceAddress.bucket}/images/{imageId}"