Source code for luna_image_store.app.handlers.images_handler

""" Handler for work with a images """
import uuid

from sanic.response import HTTPResponse

from app.handlers.base_handler import BaseRequestHandler
from app.handlers.helpers import convertImageToCurrentFormat, isImageCorrectContentType
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.web.query_getters import uuidGetter


class ImagesHandler(BaseRequestHandler):
    """
    Handler for work with a single image or several images.

    Resource: "/{api_version}/buckets/{bucketName}/images"
    """

    # Upper bound on the number of image ids accepted by one delete request.
    MAX_IMAGES_PER_DELETE = 1000

    async def post(self, bucketName: str) -> HTTPResponse:
        """
        Post image to bucket. See `spec_post_images`_.

        .. _spec_post_images:
            _static/api.html#operation/createImages

        Args:
            bucketName: bucket name

        Returns:
            response with image id and location, if status code 201,
            Error.BadContentType with status code 400, if bad content type,
            Error.ConvertImageError with status code 400, if failed convert image bytes.
        """
        accountId = self.getAccountIdFromHeader(required=False)

        contentType = self.request.headers.get("Content-Type", None)
        if not isImageCorrectContentType(contentType):
            return self.error(400, error=Error.BadContentType)

        # Resolve the target extension once so that the conversion and the
        # error message are guaranteed to refer to the same value.
        imageExtension = self.app.ctx.serviceConfig.storage.defaultImageExtension
        image = convertImageToCurrentFormat(self.request.body, imageExtension)
        if image is None:
            return self.error(400, error=Error.ConvertImageError.format(imageExtension))

        imageId = str(uuid.uuid4())
        await self.storageCtx.saveImage(image, imageId, bucketName, accountId=accountId)

        location = f"/{self.apiVersion}/buckets/{bucketName}/images/{imageId}"
        response = {"image_id": imageId, "url": location}
        return self.success(outputJson=response, statusCode=201, extraHeaders={"Location": location})

    async def delete(self, bucketName: str) -> HTTPResponse:
        """
        Delete images from bucket. See `spec_delete_images`_.

        .. _spec_delete_images:
            _static/api.html#operation/deleteImages

        Args:
            bucketName: bucket name

        Returns:
            response with status code 204,
            Error.EmptyJson with status code 400, if request contain empty json,
            Error.FieldNotInJSON with status code 400, if field "images" not found in json
            (including the case when the json body is not an object),
            Error.ImageCountExceededLimit with status code 400, if image count exceeded
            limit `MAX_IMAGES_PER_DELETE` (1000).
        """
        accountId = self.getQueryParam("account_id", uuidGetter)

        reqJson = self.request.json
        if reqJson is None:
            return self.error(400, error=Error.EmptyJson)

        # A body that is valid JSON but not an object (list, string, number, ...)
        # cannot carry an "images" field. Without the isinstance guard the
        # membership test or the subscript below would raise TypeError.
        if not isinstance(reqJson, dict) or "images" not in reqJson:
            return self.error(400, error=Error.FieldNotInJSON.format("images"))

        images = reqJson["images"]
        if len(images) > self.MAX_IMAGES_PER_DELETE:
            return self.error(400, error=Error.ImageCountExceededLimit)

        await self.storageCtx.deleteImages(images, bucketName, accountId=accountId)
        return self.success(204)