Source code for luna_handlers.app.handlers.handlers_handler
""" Handlers / handlers count handler. """
from sanic.response import HTTPResponse
from app.handlers.base_handler import BaseHandler
from classes.schemas.handler import CreateHandlerModel
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.web.query_getters import boolFrom01Getter, uuidGetter
[docs]class HandlersHandler(BaseHandler):
"""
Handler for create new handlers.
Resource: "/{api_version}/handlers"
"""
[docs] async def post(self) -> HTTPResponse:
"""
Create new handler. See `spec_create_handler`_.
.. _spec_create_handler:
_static/api.html#operation/createHandler
Returns:
response with handler id and handler location
"""
if self.request.content_type == "application/json":
newHandler = self.loadDataFromJson(self.request.json, CreateHandlerModel)
if newHandler.isDynamic:
policies = None
else:
await newHandler.policies.checkListsAvailability(self.luna3Client, accountId=newHandler.accountId)
policies = newHandler.policies.asDict()
handlerId = await self.dbContext.createHandler(
policies=policies,
description=newHandler.description,
accountId=str(newHandler.accountId),
isDynamic=newHandler.isDynamic,
)
else:
raise VLException(Error.BadContentType, 400, isCriticalError=False)
location = f"/{self.app.ctx.apiVersion}/handlers/{handlerId}"
self.respHeaders["Location"] = location
return self.success(outputJson={"handler_id": handlerId, "url": location}, statusCode=201)
[docs] async def get(self) -> HTTPResponse:
"""
Get handlers with filters. See `spec_get_handlers`_.
.. _spec_get_handlers:
_static/api.html#operation/getHandlers
Returns:
response with a list of handlers
"""
page, pageSize = self.getPagination()
accountId = self.getQueryParam("account_id", uuidGetter)
description = self.getQueryParam("description")
isDynamic = self.getQueryParam("is_dynamic", boolFrom01Getter)
handlers = await self.dbContext.getHandlers(
accountId=accountId, description=description, isDynamic=isDynamic, pageSize=pageSize, page=page
)
return self.success(200, outputJson=handlers)
[docs]class HandlerCountHandler(BaseHandler):
"""
Handler for getting handler count
Resource: "/{api_version}/handlers/count"
"""
[docs] async def get(self) -> HTTPResponse:
"""
Get handler count which satisfy filters. See `spec_get_handler_count`_.
.. _spec_get_handler_count:
_static/api.html#operation/getHandlerCount
Returns:
Response with count of faces
"""
description = self.getQueryParam("description")
accountId = self.getQueryParam("account_id", uuidGetter)
isDynamic = self.getQueryParam("is_dynamic", boolFrom01Getter)
handlerCount = await self.dbContext.getHandlerCount(
accountId=accountId, description=description, isDynamic=isDynamic
)
return self.success(200, outputJson={"handlers_count": handlerCount})