Source code for luna_handlers.app.handlers.handlers_handler

""" Handlers / handlers count handler. """
from sanic.response import HTTPResponse

from app.handlers.base_handler import BaseHandler
from classes.schemas.handler import CreateHandlerModel
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.web.query_getters import boolFrom01Getter, uuidGetter


[docs]class HandlersHandler(BaseHandler): """ Handler for create new handlers. Resource: "/{api_version}/handlers" """
[docs] async def post(self) -> HTTPResponse: """ Create new handler. See `spec_create_handler`_. .. _spec_create_handler: _static/api.html#operation/createHandler Returns: response with handler id and handler location """ if self.request.content_type == "application/json": newHandler = self.loadDataFromJson(self.request.json, CreateHandlerModel) if newHandler.isDynamic: policies = None else: await newHandler.policies.checkListsAvailability(self.luna3Client, accountId=newHandler.accountId) policies = newHandler.policies.asDict() handlerId = await self.dbContext.createHandler( policies=policies, description=newHandler.description, accountId=str(newHandler.accountId), isDynamic=newHandler.isDynamic, ) else: raise VLException(Error.BadContentType, 400, isCriticalError=False) location = f"/{self.app.ctx.apiVersion}/handlers/{handlerId}" self.respHeaders["Location"] = location return self.success(outputJson={"handler_id": handlerId, "url": location}, statusCode=201)
[docs] async def get(self) -> HTTPResponse: """ Get handlers with filters. See `spec_get_handlers`_. .. _spec_get_handlers: _static/api.html#operation/getHandlers Returns: response with a list of handlers """ page, pageSize = self.getPagination() accountId = self.getQueryParam("account_id", uuidGetter) description = self.getQueryParam("description") isDynamic = self.getQueryParam("is_dynamic", boolFrom01Getter) handlers = await self.dbContext.getHandlers( accountId=accountId, description=description, isDynamic=isDynamic, pageSize=pageSize, page=page ) return self.success(200, outputJson=handlers)
[docs]class HandlerCountHandler(BaseHandler): """ Handler for getting handler count Resource: "/{api_version}/handlers/count" """
[docs] async def get(self) -> HTTPResponse: """ Get handler count which satisfy filters. See `spec_get_handler_count`_. .. _spec_get_handler_count: _static/api.html#operation/getHandlerCount Returns: Response with count of faces """ description = self.getQueryParam("description") accountId = self.getQueryParam("account_id", uuidGetter) isDynamic = self.getQueryParam("is_dynamic", boolFrom01Getter) handlerCount = await self.dbContext.getHandlerCount( accountId=accountId, description=description, isDynamic=isDynamic ) return self.success(200, outputJson={"handlers_count": handlerCount})