Source code for luna_faces.app.handlers.attribute_handler

"""
Module implementing a handler for requests related to a single temporary attribute.
"""
from sanic.response import HTTPResponse

from app.handlers.base_handler import AttributesBaseRequestHandler
from app.handlers.query_validators import attributeTargetsGetter, ttlGetter
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.maps.vl_maps import TEMPORARY_ATTRIBUTE_TARGET_MAP
from crutches_on_wheels.cow.web.query_getters import boolFrom01Getter, uuidGetter

# Fallback value for the "targets" query parameter of a temporary attribute request:
# every item yielded by iterating TEMPORARY_ATTRIBUTE_TARGET_MAP (its keys, if it is a dict).
_DEFAULT_ATTRIBUTES_TARGETS = [*TEMPORARY_ATTRIBUTE_TARGET_MAP]


class AttributeHandler(AttributesBaseRequestHandler):
    """
    Request handler for operations on a single temporary attribute.
    """

    async def head(self, attributeId: str) -> HTTPResponse:
        """
        Check whether a temporary attribute exists. See `spec_check_attribute`_.

        .. _spec_check_attribute:
            _static/api.html#operation/checkAttribute

        Args:
            attributeId: id of the attribute to check

        Returns:
            response with status code 200 if the attribute exists, otherwise status code 404
        """
        account = self.getQueryParam("account_id", uuidGetter)
        found = await self.attributesContext.check(accountId=account, attributeId=attributeId)
        if found:
            return self.success(200, extraHeaders={"Content-Type": "application/json"})
        return self.error(404, error=Error.AttributesNotFound.format(attributeId))

    async def get(self, attributeId: str) -> HTTPResponse:
        """
        Get a temporary attribute. See `spec_get_attribute`_.

        .. _spec_get_attribute:
            _static/api.html#operation/getAttribute

        Args:
            attributeId: id of the attribute to fetch

        Returns:
            response json with the attribute
        """
        account = self.getQueryParam("account_id", uuidGetter)
        # Without an explicit "targets" parameter the module-level default target list is used.
        requestedTargets = self.getQueryParam("targets", attributeTargetsGetter, default=_DEFAULT_ATTRIBUTES_TARGETS)
        version = self.getQueryParam("descriptor_version", int, default=self.config.defaultFaceDescriptorVersion)

        storedAttribute = await self.attributesContext.get(attributeId=attributeId, accountId=account)
        payload = self.makeOutputAttribute(storedAttribute, targets=requestedTargets, descriptorVersion=version)
        return self.success(outputJson=payload)

    async def put(self, attributeId: str) -> HTTPResponse:
        """
        Put a temporary attribute. See `spec_put_attribute`_.

        .. _spec_put_attribute:
            _static/api.html#operation/putAttribute

        Args:
            attributeId: id of the attribute to store

        Returns:
            response with status code 204 and a "Location" header for the stored attribute
        """
        attribute = await self.getInputAttribute(attributeId)

        storageSettings = self.config.attributesStorageSettings
        # The "ttl" getter receives the configured maxTTL; the configured defaultTTL applies when "ttl" is absent.
        lifetime = self.getQueryParam(
            "ttl",
            lambda rawValue: ttlGetter(rawValue, storageSettings.maxTTL),
            default=storageSettings.defaultTTL,
        )
        overwrite = self.getQueryParam("force", boolFrom01Getter, default=False)

        await self.attributesContext.put(attribute, lifetime, replace=overwrite)

        location = f"/{self.app.ctx.apiVersion}/attributes/{attribute.attributeId}"
        self.respHeaders["Location"] = location
        return self.success(204)

    async def delete(self, attributeId: str) -> HTTPResponse:
        """
        Delete a temporary attribute. See `spec_delete_attribute`_.

        .. _spec_delete_attribute:
            _static/api.html#operation/deleteAttribute

        Args:
            attributeId: id of the attribute to delete

        Returns:
            response with status code 204
        """
        account = self.getQueryParam("account_id", uuidGetter)
        await self.attributesContext.delete(attributeId, accountId=account)
        return self.success(204)