Source code for luna_backport3.app.handlers.person_handler

from sanic.response import HTTPResponse

from app.handlers.base_handler import BaseHandler
from app.handlers.schemas import PATCH_PERSON_SCHEMA
from app.mixins import LinkerMixin
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException


[docs]class PersonHandler(LinkerMixin, BaseHandler): """ Person handler """
[docs] async def get(self, personId: str) -> HTTPResponse: """ Get person. See `spec_get_person`_. .. _spec_get_person: _static/api.html#operation/getPerson Returns: dict with person count and list with persons """ person = await self.dbContext.getPerson(accountId=self.accountId, personId=personId) person["id"] = person.pop("person_id") return self.success(200, outputJson=person)
[docs] async def patch(self, personId: str) -> HTTPResponse: """ Patch person. See `spec_patch_person`_. .. _spec_patch_person: _static/api.html#operation/patchPerson Raises: VLException(Error.PersonNotFound, 404, isCriticalError=False) if person not found """ reqJson = self.request.json self.validateJson(reqJson, PATCH_PERSON_SCHEMA, useJsonSchema=False) res = await self.dbContext.updatePerson( accountId=self.accountId, personId=personId, userData=reqJson.get("user_data"), externalId=reqJson.get("external_id"), ) if not res: raise VLException(Error.PersonNotFound.format(personId), 404, False) return self.success(204)
[docs] async def delete(self, personId: str) -> HTTPResponse: """ Delete person. See `spec_delete_person`_. .. _spec_delete_person: _static/api.html#operation/deletePerson Raises: RuntimeError if person removing failed """ personDescriptors = await self.dbContext.getPersonDescriptors(personId=personId, accountId=self.accountId) # person existence check here ^^^ if personDescriptors: personLists = await self.dbContext.getPersonLists(personId=personId) if personLists: for list_ in personLists: await self.linkDescriptors(listId=list_, descriptorIds=personDescriptors, action="detach") res = await self.dbContext.deletePerson(accountId=self.accountId, personId=personId) if not res: raise RuntimeError(f"Person {personId} was not removed") return self.success(204)