Source code for luna_backport3.app.handlers.linker_handler

# -*- coding: utf-8 -*-
"""
Linker Handler

This module implements the list linker handler.
"""

from sanic.response import HTTPResponse

from app.handlers.base_handler import BaseHandler
from app.handlers.schemas import LINKER_SCHEMA
from app.mixins import LinkerMixin, ListMixin
from classes.enums import ListType
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException


class LinkerHandler(ListMixin, LinkerMixin, BaseHandler):
    """
    Handler for resource `/storage/linker`.
    """

    async def patch(self) -> HTTPResponse:
        """
        Attach or detach persons or descriptors from a list, see `spec patch linker`_.

        The request body is validated against `LINKER_SCHEMA`. For a persons list only
        `person_ids` are accepted. For any other list type the descriptors of the given
        persons are collected and linked together with the explicitly given `descriptor_ids`.

        .. _`spec patch linker`:
            _static/api.html#operation/patchLinker

        Returns:
            result of `self.success(204)`
        Raises:
            VLException(Error.ListNotFound) re-raised with status code 400 if list not found
            VLException(Error.WrongListType, 400, isCriticalError=False) if descriptors are sent for a persons list
            VLException(Error.PersonNotFound) re-raised with status code 400 if person not found
            VLException(Error.Backport3DescriptorNotFound) re-raised with status code 400 if descriptor not found
        """
        payload = self.request.json
        self.validateJson(payload, LINKER_SCHEMA, useJsonSchema=False)

        targetListId = payload["list_id"]
        linkAction = payload["action"]
        requestedPersons = payload.get("person_ids", [])
        requestedDescriptors = payload.get("descriptor_ids", [])

        try:
            targetListType = await self.getListType(listId=targetListId)
        except VLException as listError:
            # only the "list not found" case gets its status code rewritten; everything is re-raised
            if listError.error == Error.ListNotFound:
                listError.statusCode = 400
            raise

        isPersonsList = targetListType == ListType.persons
        try:
            if not isPersonsList:
                # work on a copy so the list taken from the request json is not mutated
                descriptorsToLink = requestedDescriptors[:]
                if requestedPersons:
                    # PLATFORM 3 feature: persons are replaced by their descriptors
                    descriptorsOfPersons = await self.getPersonsDescriptors(personIds=requestedPersons)
                    descriptorsToLink.extend(descriptorsOfPersons)
                if descriptorsToLink:
                    await self.linkDescriptors(
                        listId=targetListId, descriptorIds=descriptorsToLink, action=linkAction
                    )
            elif requestedDescriptors:
                # PLATFORM 3 bug: descriptors can not be linked to a persons list
                raise VLException(
                    Error.WrongListType.format("descriptors", "persons", targetListId), 400, isCriticalError=False
                )
            elif requestedPersons:
                await self.linkPersons(listId=targetListId, personIds=requestedPersons, action=linkAction)
        except VLException as linkError:
            # "not found" errors for persons/descriptors get status code 400; everything is re-raised
            if linkError.error in (Error.PersonNotFound, Error.Backport3DescriptorNotFound):
                linkError.statusCode = 400
            raise

        return self.success(204)