# Source code for luna_image_store.app.handlers.images_handler
""" Handler for work with a images """
import uuid
from sanic.response import HTTPResponse
from app.handlers.base_handler import BaseRequestHandler
from app.handlers.helpers import convertImageToCurrentFormat, isImageCorrectContentType
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.web.query_getters import uuidGetter
class ImagesHandler(BaseRequestHandler):
    """
    Handler for work with single image or several images.

    Resource: "/{api_version}/buckets/{bucketName}/images"
    """

    # Maximum number of entries accepted in the "images" field of one DELETE request.
    MAX_IMAGES_PER_DELETE = 1000

    async def post(self, bucketName: str) -> HTTPResponse:
        """
        Post image to bucket. See `spec_post_images`_.

        .. _spec_post_images:
            _static/api.html#operation/createImages

        The request body is converted to the configured default image extension,
        stored under a freshly generated UUID4 id and the id/location are returned.

        Args:
            bucketName: bucket name
        Returns:
            response with image id and location, if status code 201,
            Error.BadContentType with status code 400, if bad content type,
            Error.ConvertImageError with status code 400, if failed convert image bytes.
        """
        accountId = self.getAccountIdFromHeader(required=False)

        contentType = self.request.headers.get("Content-Type", None)
        if not isImageCorrectContentType(contentType):
            return self.error(400, error=Error.BadContentType)

        imgBytes = self.request.body
        # Read the target extension once so that the conversion and the error
        # message can never refer to two different configuration objects.
        imageExtension = self.app.ctx.serviceConfig.storage.defaultImageExtension
        image = convertImageToCurrentFormat(imgBytes, imageExtension)
        if image is None:
            return self.error(400, error=Error.ConvertImageError.format(imageExtension))

        imageId = str(uuid.uuid4())
        await self.storageCtx.saveImage(image, imageId, bucketName, accountId=accountId)

        location = f"/{self.apiVersion}/buckets/{bucketName}/images/{imageId}"
        response = {"image_id": imageId, "url": location}
        return self.success(outputJson=response, statusCode=201, extraHeaders={"Location": location})

    async def delete(self, bucketName: str) -> HTTPResponse:
        """
        Delete images from bucket. See `spec_delete_images`_.

        .. _spec_delete_images:
            _static/api.html#operation/deleteImages

        Args:
            bucketName: bucket name
        Returns:
            response with status code 204,
            Error.EmptyJson with status code 400, if request has no json body,
            Error.FieldNotInJSON with status code 400, if json body is not an object
            or field "images" not found in it,
            Error.ImageCountExceededLimit with status code 400, if image count exceeded
            limit ``MAX_IMAGES_PER_DELETE`` (1000).
        """
        accountId = self.getQueryParam("account_id", uuidGetter)
        # NOTE(review): the route value is re-read from the request and overrides the
        # argument; presumably both are identical - confirm and drop if so.
        bucketName = self.request.match_info.get("bucketName")

        reqJson = self.request.json
        if reqJson is None:
            return self.error(400, Error.EmptyJson)
        # A JSON array/string/number body cannot carry an "images" field; reject it
        # here instead of failing with TypeError on the lookup below.
        if not isinstance(reqJson, dict) or "images" not in reqJson:
            error = Error.FieldNotInJSON.format("images")
            return self.error(400, error)

        # NOTE(review): presumably a list of image ids; its type is not validated here.
        images = reqJson["images"]
        if len(images) > self.MAX_IMAGES_PER_DELETE:
            return self.error(400, Error.ImageCountExceededLimit)

        await self.storageCtx.deleteImages(images, bucketName, accountId=accountId)
        return self.success(204)