Source code for luna_handlers.app.handlers.handler_handler
""" Handler for a certain handler. """
from sanic.response import HTTPResponse
from app.handlers.base_handler import BaseHandler
from classes.query_schemas.handlers import HandlerQueries
from classes.schemas.handler import CreateHandlerModel
from crutches_on_wheels.cow.errors.errors import Error
[docs]class AccountHandlerHandler(BaseHandler):
"""
Handler for work with the handler.
Resource: "/{api_version}/handlers/{handlerId}"
"""
[docs] async def put(self, handlerId: str) -> HTTPResponse: # pylint: disable-msg=W0221
"""
Update the handler. See `spec_put_handler`_.
.. _spec_put_handler:
_static/api.html#operation/putHandler
Args:
handlerId: handler id
Returns:
Response with empty body
"""
handler = self.loadDataFromJson(self.request.json, CreateHandlerModel)
accountId = self.getAccountIdFromHeader()
if handler.isDynamic:
policies = None
else:
await handler.policies.checkListsAvailability(self.luna3Client, accountId=accountId)
policies = handler.policies.asDict()
res = await self.dbContext.putHandler(
handlerId=handlerId,
policies=policies,
description=handler.description,
accountId=accountId,
isDynamic=handler.isDynamic,
)
if res:
return self.success(204)
return self.error(404, Error.HandlerNotFound.format(handlerId))
[docs] async def get(self, handlerId: str) -> HTTPResponse:
"""
Get handler by id. See `spec_get_handler`_.
.. _spec_get_handler:
_static/api.html#operation/getHandler
Args:
handlerId: handler id
Returns:
handler in response
"""
queries: HandlerQueries = self.loadDataFromQuery(HandlerQueries)
handler = await self.dbContext.getHandler(handlerId, accountId=queries.accountId)
return self.success(200, outputJson=handler)
[docs] async def head(self, handlerId: str) -> HTTPResponse:
"""
Check a handler with id=handlerId existence. See `spec_head_handler`_.
.. _spec_head_handler:
_static/api.html#operation/checkHandler
Args:
handlerId: handler id
Returns:
response with status code 200, if handler exist and 404 if handler does not exist
"""
queries: HandlerQueries = self.loadDataFromQuery(HandlerQueries)
handlerExist = await self.dbContext.doesHandlerExist(handlerId, accountId=queries.accountId)
if handlerExist:
return self.success(200, extraHeaders={"Content-Type": "application/json"})
return self.error(404, Error.HandlerNotFound.format(handlerId))
[docs] async def delete(self, handlerId: str) -> HTTPResponse:
"""
Remove handler by id. See `spec_delete_handler`_.
.. _spec_delete_handler:
_static/api.html#operation/removeHandler
Args:
handlerId: handler id
Returns:
response with status code 204
"""
queries: HandlerQueries = self.loadDataFromQuery(HandlerQueries)
if await self.dbContext.deleteHandler(handlerId, queries.accountId):
return self.success(204)
else:
return self.error(404, Error.HandlerNotFound.format(handlerId))