Source code for luna_tasks.crutches_on_wheels.web.health_handler

"""Health Handler

Module implements the healthcheck handlers.
"""
from time import perf_counter

from sanic import response
from sanic.response import HTTPResponse

from .application import LunaApplication
from .query_getters import int01Getter
from ..web.handlers import BaseHandler


class HealthCheckHandler(BaseHandler):  # pylint: disable-msg=W0223
    """
    Handler serving the service health check resource.
    """

    @property
    def app(self) -> LunaApplication:
        """
        Application the current request belongs to.

        Returns:
            running application taken from the request
        """
        currentRequest = self.request
        return currentRequest.app

    @property
    def config(self):
        """
        Service configuration of the running application.

        Returns:
            `serviceConfig` stored in the application context
        """
        appContext = self.request.app.ctx
        return appContext.serviceConfig

    async def get(self) -> HTTPResponse:
        """
        Check health of service, see `spec`_

        .. _spec:
            _static/api.html#operation/healthcheck

        Resource is reached by address '/healthcheck'

        Returns:
            response with status 200 and the execution time when every check passes,
            otherwise response with status 502 and the results of all checks
        """
        startedAt = perf_counter()
        rawFlag = self.getQueryParam("include_luna_services", int01Getter, default=0)
        healthReports = await self.app.checkHealth(includeLunaServices=bool(rawFlag))

        if any(not report.status for report in healthReports):
            # the failure payload lists every check result, not only the failed ones
            failurePayload = {"errors": [report.asDict() for report in healthReports]}
            return response.json(body=failurePayload, status=502, headers=self.respHeaders)

        successPayload = {"execution_time": perf_counter() - startedAt}
        return response.json(body=successPayload, status=200, headers=self.respHeaders)
class HealthCheckRedirectHandler(BaseHandler):
    """
    Handler redirecting a request to the same resource under the API version prefix.
    """

    @property
    def app(self) -> LunaApplication:
        """
        Application the current request belongs to.

        Returns:
            running application taken from the request
        """
        currentRequest = self.request
        return currentRequest.app

    @property
    def config(self):
        """
        Service configuration of the running application.

        Returns:
            `serviceConfig` stored in the application context
        """
        appContext = self.request.app.ctx
        return appContext.serviceConfig

    async def get(self) -> HTTPResponse:
        """
        Redirect request to the /{API}/{resource}

        Returns:
            redirect response with status 308 pointing at the raw request url
            prefixed with the application api version
        """
        apiVersion = self.app.ctx.apiVersion
        # raw_url is bytes, decode it before building the target location
        requestedUrl = self.request.raw_url.decode()
        return response.redirect(f"/{apiVersion}{requestedUrl}", status=308)