Source code for luna_backport3.app.handlers.person_handler
from sanic.response import HTTPResponse
from app.handlers.base_handler import BaseHandler
from app.handlers.schemas import PATCH_PERSON_SCHEMA
from app.mixins import LinkerMixin
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
[docs]class PersonHandler(LinkerMixin, BaseHandler):
    """
    Person handler
    """
[docs]    async def get(self, personId: str) -> HTTPResponse:
        """
        Get person. See `spec_get_person`_.
        .. _spec_get_person:
            _static/api.html#operation/getPerson
        Returns:
            dict with person count and list with persons
        """
        person = await self.dbContext.getPerson(accountId=self.accountId, personId=personId)
        person["id"] = person.pop("person_id")
        return self.success(200, outputJson=person) 
[docs]    async def patch(self, personId: str) -> HTTPResponse:
        """
        Patch person. See `spec_patch_person`_.
        .. _spec_patch_person:
            _static/api.html#operation/patchPerson
        Raises:
            VLException(Error.PersonNotFound, 404, isCriticalError=False) if person not found
        """
        reqJson = self.request.json
        self.validateJson(reqJson, PATCH_PERSON_SCHEMA, useJsonSchema=False)
        res = await self.dbContext.updatePerson(
            accountId=self.accountId,
            personId=personId,
            userData=reqJson.get("user_data"),
            externalId=reqJson.get("external_id"),
        )
        if not res:
            raise VLException(Error.PersonNotFound.format(personId), 404, False)
        return self.success(204) 
[docs]    async def delete(self, personId: str) -> HTTPResponse:
        """
        Delete person. See `spec_delete_person`_.
        .. _spec_delete_person:
            _static/api.html#operation/deletePerson
        Raises:
            RuntimeError if person removing failed
        """
        personDescriptors = await self.dbContext.getPersonDescriptors(personId=personId, accountId=self.accountId)
        # person existence check here ^^^
        if personDescriptors:
            personLists = await self.dbContext.getPersonLists(personId=personId)
            if personLists:
                for list_ in personLists:
                    await self.linkDescriptors(listId=list_, descriptorIds=personDescriptors, action="detach")
        res = await self.dbContext.deletePerson(accountId=self.accountId, personId=personId)
        if not res:
            raise RuntimeError(f"Person {personId} was not removed")
        return self.success(204)