Source code for luna_api.app.handlers.objects_proxy_handler
""" Handlers for certain objects. """
from copy import deepcopy
from http.client import HTTPResponse
from typing import AsyncIterable
from luna3.common.luna_response import LunaResponse
from sanic.views import stream
from app.handlers.base_handler import BaseProxyHandler, ProxyRequest
from configs.configs.configs.settings.classes import ImageStoreAddressSettings, ServiceTimeoutsSettings
class _BaseObjectsProxyHandler(BaseProxyHandler):
    """
    Common base for the objects proxy handlers.

    Supplies the address and timeouts settings of the objects origin storage
    taken from `self.config`.
    """

    @property
    def serviceTimeouts(self) -> ServiceTimeoutsSettings:
        """
        Timeouts of the image store service.

        Returns:
            the `objectsOriginStorageTimeouts` settings from the handler config
        """
        timeouts = self.config.objectsOriginStorageTimeouts
        return timeouts

    @property
    def serviceAddress(self) -> ImageStoreAddressSettings:
        """
        Address of the image store service.

        Returns:
            the `objectsOriginStorage` settings from the handler config
        """
        address = self.config.objectsOriginStorage
        return address
class ObjectsHandler(_BaseObjectsProxyHandler):
    """
    Handler for creating new objects. See `spec_objects`_.

    .. _spec_objects:
        _static/api.html#tag/objects

    Resource: "/{api_version}/objects"
    """

    allowedMethods = ("POST",)

    def prepareUrl(self) -> str:
        """
        Build the url of the objects collection in the LIS service.

        Returns:
            url of the form "<service url>/buckets/<bucket>/objects"
        """
        baseUrl = self.serviceUrl
        bucket = self.serviceAddress.bucket
        return f"{baseUrl}/buckets/{bucket}/objects"

    async def requestBodyStream(self) -> AsyncIterable[bytes]:
        """
        Yield the request body chunk by chunk.

        Chunks are read from `self.request.stream` until `read()` returns None.
        """
        chunk = await self.request.stream.read()
        while chunk is not None:
            yield chunk
            chunk = await self.request.stream.read()

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Build the proxy request for the `post` method.

        The body is passed as an async stream rather than as loaded bytes.

        Returns:
            proxy request made of the body stream, prepared headers and prepared query
        """
        bodyStream = self.requestBodyStream()
        headers = self.prepareHeaders()
        query = self.prepareQuery()
        return ProxyRequest(bodyStream, headers, query)

    @stream
    async def post(self, *args, **kwargs) -> HTTPResponse:
        """
        Streaming `post` method: forward the request to the service.

        Returns:
            proxy response
        """
        proxyResponse = await self.makeRequestToService()
        return proxyResponse

    async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse:
        """
        Convert the service response for a created object to the api format.

        Args:
            response: response of the service; its json must contain "object_id"
        Returns:
            201 response whose json is a copy of the service json with "url" set
            to the api location of the object; the same location is put into the
            "Location" response header
        """
        createdId = response.json["object_id"]
        # work on a copy so the json held by the service response stays untouched
        replyBody = deepcopy(response.json)
        location = f"/{self.app.ctx.apiVersion}/objects/{createdId}"
        self.respHeaders["Location"] = location
        replyBody["url"] = location
        return self.success(201, outputJson=replyBody)

    async def options(self, *args, **kwargs) -> HTTPResponse:
        """Options method: consume the request body, then delegate to the parent."""
        # fix sanic error log `body not consumed`
        # sanic marks all view methods as `stream` and need manual load request body.
        # https://github.com/sanic-org/sanic/issues/2473
        await self.request.receive_body()
        parentResponse = await super().options(*args, **kwargs)
        return parentResponse
class ObjectHandler(_BaseObjectsProxyHandler):
    """
    Handler for working with a single object. See `spec_objects`_.

    .. _spec_objects:
        _static/api.html#tag/objects

    Resource: "/{api_version}/objects/{objectId}"
    """

    allowedMethods = ("GET", "DELETE", "HEAD")

    def prepareUrl(self) -> str:
        """
        Build the url of the requested object in the LIS service.

        The object id is taken from the "objectId" route parameter of the request.

        Returns:
            url of the form "<service url>/buckets/<bucket>/objects/<objectId>"
        """
        requestedId = self.request.match_info.get("objectId")
        baseUrl = self.serviceUrl
        bucket = self.serviceAddress.bucket
        return f"{baseUrl}/buckets/{bucket}/objects/{requestedId}"