Source code for luna_api.app.handlers.attributes_proxy_handler

""" Handler for a certain attributes. """
from typing import Union

import ujson
from aiohttp import MultipartWriter
from luna3.common.luna_response import LunaResponse
from sanic.request import File
from sanic.response import HTTPResponse

from app.handlers.base_handler import FacesServiceBaseHandler, ProxyRequest
from app.handlers.schemas import schemas
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.web.request import Field


class AttributesHandler(FacesServiceBaseHandler):
    """
    Handler for the attributes collection (allowed methods: POST and GET). See `spec_attributes`_.

    A POST request is proxied either as-is (json body) or as a re-assembled
    multipart body built from the incoming "xpk_file" and "meta" parts.

    .. _spec_attributes:
        _static/api.html#tag/attributes

    Resource: "/{api_version}/attributes"
    """

    allowedMethods = ("POST", "GET")

    def _getMeta(self, metaPart: Union[File, Field]):
        """
        Get meta from input create attribute request.

        The part body is parsed as json, validated against `schemas.OBJECT_SCHEMA`
        and extended with the account id of the current request.

        Args:
            metaPart: meta part from request
        Returns:
            meta for faces (parsed json with the "account_id" key set)
        Raises:
            VLException(Error.BadMultipartInput): if the part body is not a valid json
        """
        try:
            meta = ujson.loads(metaPart.body)
        except ValueError as exc:
            raise VLException(Error.BadMultipartInput.format("meta are not json"), 400, False) from exc
        # NOTE(review): validateJson presumably raises on a schema mismatch - confirm in the base handler
        self.validateJson(meta, schemas.OBJECT_SCHEMA)
        meta["account_id"] = self.accountId
        return meta

    def _createProxyMultipart(self) -> ProxyRequest:
        """
        Create proxy request for a creating attribute request with multipart.

        Parts named "xpk_file" and "meta" are collected from both the form fields
        and the files of the incoming request and re-packed into a new multipart body.

        Returns:
            proxy request
        Raises:
            VLException(Error.BadMultipartInput): if request contain not valid multipart body
                (a required part is missing, meta is not a json or any other failure while
                building the outgoing body)
        """
        try:
            forms = self.request.form
            files = self.request.files
            metaParts = forms.getlist("meta", []) + files.getlist("meta", [])
            xpkParts = forms.getlist("xpk_file", []) + files.getlist("xpk_file", [])

            if not xpkParts:
                raise VLException(Error.BadMultipartInput.format("Part 'xpk_file' is required"), 400, False)
            if not metaParts:
                raise VLException(Error.BadMultipartInput.format("Part 'meta' is required"), 400, False)

            with MultipartWriter("form-data", self.luna3Client.lunaFaces.multipartBoundary) as mpWriter:
                # NOTE(review): the disposition type "attachement" looks like a misspelling of
                # "attachment"; it is kept unchanged because it is sent to the faces service as-is.
                for part in xpkParts:
                    xpkPart = mpWriter.append(part.body, {"Content-Type": part.type})
                    xpkPart.set_content_disposition(
                        "attachement", name="xpk_file", filename="xpk-file", quote_fields=False
                    )
                for part in metaParts:
                    meta = self._getMeta(part)
                    metaPart = mpWriter.append_json(meta, {"Content-Type": part.type})
                    metaPart.set_content_disposition("attachement", name="meta", filename="meta", quote_fields=False)

                headers = self.prepareHeaders()
                # the outgoing Content-Type must be the one generated by the writer
                headers["Content-Type"] = mpWriter.content_type
                return ProxyRequest(mpWriter, headers, self.prepareQuery())
        except VLException:
            # already a well-formed api error, pass it through untouched
            raise
        except Exception as e:
            # any other failure while reading parts is reported as a bad multipart input
            raise VLException(Error.BadMultipartInput.format(e), statusCode=400, isCriticalError=False) from e

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare new attribute.

        Returns:
            Proxy request with new attribute
        Raises:
            VLException(Error.BadContentType): if request has incorrect Content Type
        """
        # Compare the media type only: a Content-Type header may carry parameters
        # (e.g. "multipart/form-data; boundary=..." or "application/json; charset=utf-8")
        # and media types are case-insensitive.
        mediaType = (self.request.content_type or "").split(";", 1)[0].strip().lower()
        if mediaType == "application/json":
            return self.prepareRequestCreation()
        if mediaType == "multipart/form-data":
            return self._createProxyMultipart()
        raise VLException(Error.BadContentType, statusCode=400, isCriticalError=False)

    async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse:
        """
        Post processing new attribute.

        Args:
            response: response from faces
        Returns:
            response with new attribute (status code of the faces response, json produced by
            `convertIncomingUrls`)
        """
        return self.success(response.statusCode, outputJson=self.convertIncomingUrls(response))
class AttributeHandler(FacesServiceBaseHandler):
    """
    Handler for a single attribute addressed by its id. See `spec_attributes`_.

    .. _spec_attributes:
        _static/api.html#tag/attributes

    Resource: "/{api_version}/attributes/{attributeId}"
    """

    # HTTP methods accepted by this resource
    allowedMethods = ("HEAD", "GET", "DELETE")
class AttributeSamplesHandler(FacesServiceBaseHandler):
    """
    Handler for the samples of an attribute addressed by its id. See `spec_attributes`_.

    .. _spec_attributes:
        _static/api.html#tag/attributes

    Resource: "/{api_version}/attributes/{attributeId}/samples"
    """

    # HTTP methods accepted by this resource
    allowedMethods = ("GET",)