Source code for luna_api.app.handlers.attributes_proxy_handler
""" Handler for a certain attributes. """
from typing import Union
import ujson
from aiohttp import MultipartWriter
from luna3.common.luna_response import LunaResponse
from sanic.request import File
from sanic.response import HTTPResponse
from app.handlers.base_handler import FacesServiceBaseHandler, ProxyRequest
from app.handlers.schemas import schemas
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.web.request import Field
class AttributesHandler(FacesServiceBaseHandler):
    """
    Handler for creating attributes and getting attributes ids. See `spec_attributes`_.

    .. _spec_attributes:
        _static/api.html#tag/attributes

    Resource: "/{api_version}/attributes"
    """

    allowedMethods = ("POST", "GET")

    def _getMeta(self, metaPart: Union[File, Field]) -> dict:
        """
        Get meta from a "meta" part of a create attribute request.

        The part body is decoded as json, validated against `schemas.OBJECT_SCHEMA` and then the account id of
        the current request is stored in it under the "account_id" key.

        Args:
            metaPart: meta part from request
        Returns:
            meta for faces
        Raises:
            VLException(Error.BadMultipartInput): if the part body is not a json
        """
        body = metaPart.body
        try:
            meta = ujson.loads(body)
        except ValueError as exc:
            raise VLException(Error.BadMultipartInput.format("meta are not json"), 400, False) from exc
        self.validateJson(meta, schemas.OBJECT_SCHEMA)
        meta["account_id"] = self.accountId
        return meta

    def _createProxyMultipart(self) -> ProxyRequest:
        """
        Create proxy request for a creating attribute request with multipart.

        Every "xpk_file" and "meta" part of the incoming request (taken from both form fields and files) is
        written to a new multipart body; each meta is processed by `_getMeta` before it is written.

        Returns:
            proxy request
        Raises:
            VLException(Error.BadMultipartInput): if request contain not valid multipart body or a required
                part ("xpk_file", "meta") is missing
        """
        try:
            forms = self.request.form
            files = self.request.files
            metaParts = forms.getlist("meta", []) + files.getlist("meta", [])
            xpkParts = forms.getlist("xpk_file", []) + files.getlist("xpk_file", [])
            if not xpkParts:
                raise VLException(
                    Error.BadMultipartInput.format("Part 'xpk_file' is required"),
                    400,
                    False,
                )
            if not metaParts:
                raise VLException(
                    Error.BadMultipartInput.format("Part 'meta' is required"),
                    400,
                    False,
                )
            with MultipartWriter("form-data", self.luna3Client.lunaFaces.multipartBoundary) as mpWriter:
                # NOTE(review): "attachement" is a misspelling of "attachment". It is kept byte-for-byte because
                # the value is sent to the receiving service, which may depend on it - confirm before changing.
                for part in xpkParts:
                    xpkPart = mpWriter.append(part.body, {"Content-Type": part.type})
                    xpkPart.set_content_disposition(
                        "attachement",
                        name="xpk_file",
                        filename="xpk-file",
                        quote_fields=False,
                    )
                for part in metaParts:
                    meta = self._getMeta(part)
                    metaPart = mpWriter.append_json(meta, {"Content-Type": part.type})
                    metaPart.set_content_disposition("attachement", name="meta", filename="meta", quote_fields=False)
            headers = self.prepareHeaders()
            # the writer's content type carries the boundary of the newly built body
            headers["Content-Type"] = mpWriter.content_type
            return ProxyRequest(mpWriter, headers, self.prepareQuery())
        except VLException:
            # already a well-formed API error, pass it through untouched
            raise
        except Exception as e:
            # any other failure while reading or rebuilding the body is reported as a bad multipart input
            raise VLException(Error.BadMultipartInput.format(e), statusCode=400, isCriticalError=False) from e

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare new attribute.

        Returns:
            Proxy request with new attribute
        Raises:
            VLException(Error.BadContentType): if request content type is neither "application/json"
                nor "multipart/form-data"
        """
        # Compare only the media type: parameters such as "boundary=..." or "charset=..." and the letter case
        # must not defeat the match.
        contentType = (self.request.content_type or "").split(";", 1)[0].strip().lower()
        if contentType == "application/json":
            return self.prepareRequestCreation()
        if contentType == "multipart/form-data":
            return self._createProxyMultipart()
        raise VLException(Error.BadContentType, statusCode=400, isCriticalError=False)

    async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse:
        """
        Post processing new attribute.

        Args:
            response: response from faces
        Returns:
            response with new attribute
        """
        return self.success(response.statusCode, outputJson=self.convertIncomingUrls(response))
class AttributeHandler(FacesServiceBaseHandler):
    """
    Handler for a single attribute addressed by its id. See `spec_attributes`_.

    .. _spec_attributes:
        _static/api.html#tag/attributes

    Resource: "/{api_version}/attributes/{attributeId}"
    """

    # HTTP methods accepted by this resource
    allowedMethods = ("HEAD", "GET", "DELETE")
class AttributeSamplesHandler(FacesServiceBaseHandler):
    """
    Handler for the samples of an attribute. See `spec_attributes`_.

    .. _spec_attributes:
        _static/api.html#tag/attributes

    Resource: "/{api_version}/attributes/{attributeId}/samples"
    """

    # HTTP methods accepted by this resource
    allowedMethods = ("GET",)