Source code for luna_handlers.app.handlers.handlers_handler
""" Handlers / handlers count handler. """
from sanic.response import HTTPResponse
from app.handlers.base_handler import BaseHandler
from classes.query_schemas.handlers import HandlersCountQueries, HandlersQueries
from classes.schemas.handler import CreateHandlerModel
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
class HandlersHandler(BaseHandler):
    """
    Handler for creating a handler and for listing existing handlers.

    Resource: "/{api_version}/handlers"
    """

    async def post(self) -> HTTPResponse:
        """
        Create a new handler. See `spec_create_handler`_.

        .. _spec_create_handler:
            _static/api.html#operation/createHandler

        Returns:
            response with status code 201 whose body carries the handler id and its url;
            the same url is also set as the "Location" response header
        Raises:
            VLException(Error.BadContentType, 400): if the request content type is not "application/json"
        """
        # NOTE(review): this is an exact match, so a value carrying parameters
        # (e.g. "application/json; charset=utf-8") is rejected - confirm this is intended.
        if self.request.content_type != "application/json":
            raise VLException(Error.BadContentType, 400, isCriticalError=False)

        account = self.getAccountIdFromHeader()
        model = self.loadDataFromJson(self.request.json, CreateHandlerModel)

        # Dynamic handlers are stored without policies; for the others the lists
        # referenced by the policies are checked before anything is written.
        storedPolicies = None
        if not model.isDynamic:
            await model.policies.checkListsAvailability(self.luna3Client, accountId=account)
            storedPolicies = model.policies.asDict()

        handlerId = await self.dbContext.createHandler(
            policies=storedPolicies,
            description=model.description,
            accountId=account,
            isDynamic=model.isDynamic,
        )

        handlerUrl = f"/{self.app.ctx.apiVersion}/handlers/{handlerId}"
        self.respHeaders["Location"] = handlerUrl
        payload = {"handler_id": handlerId, "url": handlerUrl}
        return self.success(outputJson=payload, statusCode=201)

    async def get(self) -> HTTPResponse:
        """
        Get handlers matching the query filters, one page at a time. See `spec_get_handlers`_.

        .. _spec_get_handlers:
            _static/api.html#operation/getHandlers

        Returns:
            response with status code 200 and the handlers returned by the database context
        """
        pageNumber, perPage = self.getPagination()
        filters: HandlersQueries = self.loadDataFromQuery(HandlersQueries)
        foundHandlers = await self.dbContext.getHandlers(
            accountId=filters.accountId,
            description=filters.description,
            isDynamic=filters.isDynamic,
            pageSize=perPage,
            page=pageNumber,
        )
        return self.success(200, outputJson=foundHandlers)
class HandlerCountHandler(BaseHandler):
    """
    Handler for getting the number of handlers.

    Resource: "/{api_version}/handlers/count"
    """

    async def get(self) -> HTTPResponse:
        """
        Get the number of handlers that satisfy the query filters. See `spec_get_handler_count`_.

        .. _spec_get_handler_count:
            _static/api.html#operation/getHandlerCount

        Returns:
            response with status code 200 and the count under the "handlers_count" key
        """
        filters: HandlersCountQueries = self.loadDataFromQuery(HandlersCountQueries)
        total = await self.dbContext.getHandlerCount(
            accountId=filters.accountId,
            description=filters.description,
            isDynamic=filters.isDynamic,
        )
        payload = {"handlers_count": total}
        return self.success(200, outputJson=payload)