# Source code for luna_api.app.handlers.attributes_proxy_handler
""" Handlers for the attributes resources. """
from typing import Union
import ujson
from aiohttp import MultipartWriter
from luna3.common.luna_response import LunaResponse
from sanic.request import File
from sanic.response import HTTPResponse
from app.handlers.base_handler import FacesServiceBaseHandler, ProxyRequest
from app.handlers.schemas import schemas
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.web.request import Field
class AttributesHandler(FacesServiceBaseHandler):
    """
    Handler for creating and getting attributes. See `spec_attributes`_.

        .. _spec_attributes:
            _static/api.html#tag/attributes

    Resource: "/{api_version}/attributes"
    """

    allowedMethods = ("POST", "GET")

    def _getMeta(self, metaPart: Union[File, Field]) -> dict:
        """
        Get meta from input create attribute request.

        The part body is parsed as json, validated against ``schemas.OBJECT_SCHEMA`` and
        the account id of the current request is written to the "account_id" key.

        Args:
            metaPart: meta part from request
        Returns:
            meta for faces
        Raises:
            VLException(Error.BadMultipartInput): if the part body is not a valid json
        """
        try:
            meta = ujson.loads(metaPart.body)
        except ValueError as exc:
            raise VLException(Error.BadMultipartInput.format("meta are not json"), 400, False) from exc
        self.validateJson(meta, schemas.OBJECT_SCHEMA)
        # always use the account of the current request, whatever the client sent
        meta["account_id"] = self.accountId
        return meta

    def _createProxyMultipart(self) -> ProxyRequest:
        """
        Create proxy request for a creating attribute request with multipart body.

        All "xpk_file" and "meta" parts (sent either as form fields or as files) are
        copied to a new multipart body; every meta part passes through `_getMeta`.

        Returns:
            proxy request
        Raises:
            VLException(Error.BadMultipartInput): if a required part ("xpk_file" or "meta") is missing
                                                  or the multipart body cannot be processed
        """
        try:
            forms = self.request.form
            files = self.request.files
            # a part may arrive as a plain form field or as a file, collect both kinds
            metaParts = forms.getlist("meta", []) + files.getlist("meta", [])
            xpkParts = forms.getlist("xpk_file", []) + files.getlist("xpk_file", [])
            if not xpkParts:
                raise VLException(
                    Error.BadMultipartInput.format("Part 'xpk_file' is required"),
                    400,
                    False,
                )
            if not metaParts:
                raise VLException(
                    Error.BadMultipartInput.format("Part 'meta' is required"),
                    400,
                    False,
                )

            # NOTE(review): "attachement" is not a registered disposition type (multipart/form-data
            # parts normally use "form-data"); the value is left unchanged because the expectations
            # of the receiving service are not visible here - confirm before changing.
            with MultipartWriter("form-data", self.luna3Client.lunaFaces.multipartBoundary) as mpWriter:
                for part in xpkParts:
                    xpkPart = mpWriter.append(part.body, {"Content-Type": part.type})
                    xpkPart.set_content_disposition(
                        "attachement",
                        name="xpk_file",
                        filename="xpk-file",
                        quote_fields=False,
                    )
                for part in metaParts:
                    meta = self._getMeta(part)
                    metaPart = mpWriter.append_json(meta, {"Content-Type": part.type})
                    metaPart.set_content_disposition("attachement", name="meta", filename="meta", quote_fields=False)

            headers = self.prepareHeaders()
            # the content type of the writer carries the boundary of the new body
            headers["Content-Type"] = mpWriter.content_type
            return ProxyRequest(mpWriter, headers, self.prepareQuery())
        except VLException:
            # already a well-formed api error, pass it through untouched
            raise
        except Exception as e:
            # any other failure here is reported to the client as a bad multipart input
            raise VLException(Error.BadMultipartInput.format(e), statusCode=400, isCriticalError=False) from e

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare new attribute.

        Returns:
            Proxy request with new attribute
        Raises:
            VLException(Error.BadContentType): if request has incorrect Content Type
        """
        # Compare the media type only: the header may carry parameters
        # (e.g. "multipart/form-data; boundary=..." or "application/json; charset=utf-8").
        mediaType = (self.request.content_type or "").split(";", 1)[0].strip().lower()
        if mediaType == "application/json":
            return self.prepareRequestCreation()
        if mediaType == "multipart/form-data":
            return self._createProxyMultipart()
        raise VLException(Error.BadContentType, statusCode=400, isCriticalError=False)

    async def postProcessingPost(self, response: LunaResponse) -> HTTPResponse:
        """
        Post processing new attribute.

        Args:
            response: response from faces
        Returns:
            response with new attribute
        """
        return self.success(response.statusCode, outputJson=self.convertIncomingUrls(response))
class AttributeHandler(FacesServiceBaseHandler):
    """
    Handler for work with a single attribute by its id. See `spec_attributes`_.

        .. _spec_attributes:
            _static/api.html#tag/attributes

    Resource: "/{api_version}/attributes/{attributeId}"

    The class only declares the allowed http methods; it defines no methods of its own.
    """

    # http methods accepted by the resource
    allowedMethods = ("HEAD", "GET", "DELETE")
class AttributeSamplesHandler(FacesServiceBaseHandler):
    """
    Handler for work with samples of an attribute. See `spec_attributes`_.

        .. _spec_attributes:
            _static/api.html#tag/attributes

    Resource: "/{api_version}/attributes/{attributeId}/samples"

    The class only declares the allowed http methods; it defines no methods of its own.
    """

    # http methods accepted by the resource
    allowedMethods = ("GET",)