Source code for luna_faces.app.handlers.lists_handler

from uuid import uuid4

from sanic.response import HTTPResponse

from app.handlers.base_handler import BaseRequestHandler
from app.handlers.helpers import SearchListsFilters
from app.handlers.query_validators import validateIntAsBool
from app.handlers.schemas import CREATE_LIST_SCHEMAS, DELETE_LISTS_SCHEMA, GET_LISTS_WITH_KEYS_SCHEMA
from app.version import VERSION
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.web.query_getters import int01Getter, uuidGetter


[docs]class ListsHandler(BaseRequestHandler): """ Handler for work with lists Resource: "/{api_version}/lists" """
[docs] async def get(self) -> HTTPResponse: """ Get lists by filters. See `spec_get_lists`_. .. _spec_get_lists: _static/api.html#operation/getLists Returns: response json with lists """ page, pageSize = self.getPagination(maxPageSize=10_000) filters = SearchListsFilters().loadFromQuery(self.getQueryParam) lists = await self.facesContext.getLists(filters=filters, page=page, pageSize=pageSize) return self.success(200, outputJson={"lists": lists})
[docs] async def post(self) -> HTTPResponse: """ Create new list. See `spec_create_list`_. .. _spec_create_list: _static/api.html#operation/createList Returns: response json with new list id """ data = self.request.json listId = self.getQueryParam("list_id", uuidGetter, default=str(uuid4())) self.validateJson(data, CREATE_LIST_SCHEMAS, False) data["account_id"] = self.getAccountIdFromHeader() await self.facesContext.createList(**data, listId=listId) location = f"/{VERSION['Version']['api']}/lists/{listId}" self.respHeaders["Location"] = location return self.success(201, outputJson={"list_id": listId, "url": location})
[docs] async def delete(self) -> HTTPResponse: """ Delete several lists. See `spec_delete_lists`_. .. _spec_delete_lists: _static/api.html#operation/deleteLists Returns: response with status code 202 or 204 """ accountId = self.getQueryParam("account_id", uuidGetter) removeWithFaces = self.getQueryParam("with_faces", validateIntAsBool, default=0) data = self.request.json self.validateJson(data, DELETE_LISTS_SCHEMA, False) listIds = set(data["list_ids"]) nonExistListId = await self.facesContext.getNonexistentListId(listIds, accountId) if nonExistListId is not None: return self.error(400, error=Error.ListsNotFound.format(nonExistListId)) await self.facesContext.deleteLists(listIds, withFaces=removeWithFaces) if removeWithFaces: return self.success(202) return self.success(204)
[docs]class ListsLinkKeysHandler(BaseRequestHandler): """ Handler for getting link keys """
[docs] async def post(self) -> HTTPResponse: """ Get lists of faces with last link id and last unlink id. See `spec_get_link_keys`_. .. _spec_get_link_keys: _static/api.html#operation/getListsLinkkeys Returns: response with status code 200 """ data = self.request.json self.validateJson(data, GET_LISTS_WITH_KEYS_SCHEMA, False) parity = self.getQueryParam("use_parity", int01Getter, default=0) isMVUsing = False if self.config.useMaterialViews: isMVUsing = bool(parity) == bool(self.config.databaseNumber) if not isMVUsing: self.logger.warning("Request does not use material views in database.") listsInRequest = set(data["list_ids"]) self.addDataForMonitoring(tags={"use_parity": parity, "list_count": len(listsInRequest)}) if self.config.useMaterialViews and isMVUsing: result = await self.facesContext.getListsAndKeysFromMV(listsInRequest, parity) else: result = await self.facesContext.getListsWithKeys(listsInRequest, parity) return self.success(200, outputJson={"lists": result})