Source code for luna_api.app.handlers.objects_proxy_handler

""" Handlers for certain objects. """
from copy import deepcopy
from http.client import HTTPResponse
from typing import AsyncIterable

from luna3.common.luna_response import LunaResponse
from sanic.views import stream

from app.handlers.base_handler import BaseProxyHandler, ProxyRequest
from configs.configs.configs.settings.classes import ImageStoreAddressSettings, ServiceTimeoutsSettings


class _BaseObjectsProxyHandler(BaseProxyHandler):
    """
    Common base for the objects proxy handlers.

    Exposes the address and timeout settings of the objects origin storage,
    both taken from the handler config.
    """

    @property
    def serviceAddress(self) -> ImageStoreAddressSettings:
        """
        Get an image store service address.

        Returns:
            the `objectsOriginStorage` settings from the handler config
        """
        address = self.config.objectsOriginStorage
        return address

    @property
    def serviceTimeouts(self) -> ServiceTimeoutsSettings:
        """
        Get the image store service timeouts.

        Returns:
            the `objectsOriginStorageTimeouts` settings from the handler config
        """
        timeouts = self.config.objectsOriginStorageTimeouts
        return timeouts


class ObjectsHandler(_BaseObjectsProxyHandler):
    """
    Handler for creating new objects. See `spec_objects`_.

    .. _spec_objects:
        _static/api.html#tag/objects

    Resource: "/{api_version}/objects"
    """

    allowedMethods = ("POST",)

    @stream
    async def post(self, *args, **kwargs) -> HTTPResponse:
        """
        Streaming 'post' method: forward the request to the service.

        Returns:
            proxy response
        """
        proxyResponse = await self.makeRequestToService()
        return proxyResponse

    async def options(self, *args, **kwargs) -> HTTPResponse:
        """Options method"""
        # Works around the sanic error log `body not consumed`: sanic marks all
        # view methods as `stream`, so the request body has to be loaded manually.
        # https://github.com/sanic-org/sanic/issues/2473
        await self.request.receive_body()
        optionsResponse = await super().options(*args, **kwargs)
        return optionsResponse

    async def requestBodyStream(self) -> AsyncIterable[bytes]:
        """
        Read request body chunks by stream.

        Yields:
            body chunks in arrival order; iteration stops once the stream returns `None`
        """
        chunk = await self.request.stream.read()
        while chunk is not None:
            yield chunk
            chunk = await self.request.stream.read()

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare proxy request for method `post`.

        Returns:
            proxy request built from the streamed body, the prepared headers and the prepared query
        """
        body = self.requestBodyStream()
        headers = self.prepareHeaders()
        query = self.prepareQuery()
        return ProxyRequest(body, headers, query)

    async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse:
        """
        Default post processing of the service response.

        Copies the service json, adds the api url of the created object under the "url" key
        and sets the same url as the "Location" response header.

        Args:
            response: response from the service
        Returns:
            response in api format with status code 201
        """
        payload = deepcopy(response.json)
        createdId = response.json["object_id"]
        objectUrl = f"/{self.app.ctx.apiVersion}/objects/{createdId}"
        # header first, then the body field - same order as a chained assignment
        self.respHeaders["Location"] = objectUrl
        payload["url"] = objectUrl
        return self.success(201, outputJson=payload)

    def prepareUrl(self) -> str:
        """
        Prepare url to the LIS service.

        Returns:
            A str, url of the objects collection in the configured bucket
        """
        baseUrl = self.serviceUrl
        bucket = self.serviceAddress.bucket
        return f"{baseUrl}/buckets/{bucket}/objects"


class ObjectHandler(_BaseObjectsProxyHandler):
    """
    Handler for working with a single object. See `spec_objects`_.

    .. _spec_objects:
        _static/api.html#tag/objects

    Resource: "/{api_version}/objects/{objectId}"
    """

    allowedMethods = ("GET", "DELETE", "HEAD")

    def prepareUrl(self) -> str:
        """
        Prepare url to the LIS service.

        Returns:
            A str, url of the object (id taken from the `objectId` route parameter) in the configured bucket
        """
        targetId = self.request.match_info.get("objectId")
        bucketUrl = f"{self.serviceUrl}/buckets/{self.serviceAddress.bucket}"
        return f"{bucketUrl}/objects/{targetId}"