Source code for luna_backport3.app.handlers.person_handler
from sanic.response import HTTPResponse
from app.handlers.base_handler import BaseHandler
from app.handlers.schemas import PATCH_PERSON_SCHEMA
from app.mixins import LinkerMixin
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
class PersonHandler(LinkerMixin, BaseHandler):
    """
    Handler for a single person resource addressed by its id.

    Every operation is scoped to ``self.accountId`` and performed through ``self.dbContext``.
    """

    async def get(self, personId: str) -> HTTPResponse:
        """
        Get person. See `spec_get_person`_.

        .. _spec_get_person:
            _static/api.html#operation/getPerson

        Args:
            personId: id of the requested person

        Returns:
            response with status code 200 whose json body is the person record,
            with the "person_id" key exposed as "id"
        """
        personRecord = await self.dbContext.getPerson(accountId=self.accountId, personId=personId)
        # the record key "person_id" is renamed to "id" for the reply
        personRecord["id"] = personRecord.pop("person_id")
        return self.success(200, outputJson=personRecord)

    async def patch(self, personId: str) -> HTTPResponse:
        """
        Patch person. See `spec_patch_person`_.

        .. _spec_patch_person:
            _static/api.html#operation/patchPerson

        Args:
            personId: id of the person to update

        Returns:
            response with status code 204 when the update result is truthy

        Raises:
            VLException(Error.PersonNotFound, 404, isCriticalError=False) if person not found
        """
        payload = self.request.json
        self.validateJson(payload, PATCH_PERSON_SCHEMA, useJsonSchema=False)
        isUpdated = await self.dbContext.updatePerson(
            accountId=self.accountId,
            personId=personId,
            userData=payload.get("user_data"),
            externalId=payload.get("external_id"),
        )
        if isUpdated:
            return self.success(204)
        # a falsy update result is reported to the client as "person not found"
        raise VLException(Error.PersonNotFound.format(personId), 404, False)

    async def delete(self, personId: str) -> HTTPResponse:
        """
        Delete person. See `spec_delete_person`_.

        .. _spec_delete_person:
            _static/api.html#operation/deletePerson

        Args:
            personId: id of the person to remove

        Returns:
            response with status code 204 when the removal result is truthy

        Raises:
            RuntimeError if person removing failed
        """
        # NOTE(review): according to the original author's remark this lookup doubles as the
        # person existence check - confirm how dbContext reports an unknown person
        descriptorIds = await self.dbContext.getPersonDescriptors(personId=personId, accountId=self.accountId)
        if descriptorIds:
            # detach the person's descriptors from every list the person is attached to
            listIds = await self.dbContext.getPersonLists(personId=personId)
            for listId in listIds or ():
                await self.linkDescriptors(listId=listId, descriptorIds=descriptorIds, action="detach")

        if await self.dbContext.deletePerson(accountId=self.accountId, personId=personId):
            return self.success(204)
        raise RuntimeError(f"Person {personId} was not removed")