Source code for luna_handlers.app.handlers.handlers_handler

""" Handlers / handlers count handler. """
from sanic.response import HTTPResponse

from app.handlers.base_handler import BaseHandler
from classes.query_schemas.handlers import HandlersCountQueries, HandlersQueries
from classes.schemas.handler import CreateHandlerModel
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException


[docs]class HandlersHandler(BaseHandler): """ Handler for create new handlers. Resource: "/{api_version}/handlers" """
[docs] async def post(self) -> HTTPResponse: """ Create new handler. See `spec_create_handler`_. .. _spec_create_handler: _static/api.html#operation/createHandler Returns: response with handler id and handler location """ if self.request.content_type == "application/json": accountId = self.getAccountIdFromHeader() newHandler = self.loadDataFromJson(self.request.json, CreateHandlerModel) if newHandler.isDynamic: policies = None else: await newHandler.policies.checkListsAvailability(self.luna3Client, accountId=accountId) policies = newHandler.policies.asDict() handlerId = await self.dbContext.createHandler( policies=policies, description=newHandler.description, accountId=accountId, isDynamic=newHandler.isDynamic, ) else: raise VLException(Error.BadContentType, 400, isCriticalError=False) location = f"/{self.app.ctx.apiVersion}/handlers/{handlerId}" self.respHeaders["Location"] = location return self.success(outputJson={"handler_id": handlerId, "url": location}, statusCode=201)
[docs] async def get(self) -> HTTPResponse: """ Get handlers with filters. See `spec_get_handlers`_. .. _spec_get_handlers: _static/api.html#operation/getHandlers Returns: response with a list of handlers """ page, pageSize = self.getPagination() queries: HandlersQueries = self.loadDataFromQuery(HandlersQueries) handlers = await self.dbContext.getHandlers( accountId=queries.accountId, description=queries.description, isDynamic=queries.isDynamic, pageSize=pageSize, page=page, ) return self.success(200, outputJson=handlers)
[docs]class HandlerCountHandler(BaseHandler): """ Handler for getting handler count Resource: "/{api_version}/handlers/count" """
[docs] async def get(self) -> HTTPResponse: """ Get handler count which satisfy filters. See `spec_get_handler_count`_. .. _spec_get_handler_count: _static/api.html#operation/getHandlerCount Returns: Response with count of handlers """ queries: HandlersCountQueries = self.loadDataFromQuery(HandlersCountQueries) handlerCount = await self.dbContext.getHandlerCount( accountId=queries.accountId, description=queries.description, isDynamic=queries.isDynamic ) return self.success(200, outputJson={"handlers_count": handlerCount})