Source code for luna_sender.crutches_on_wheels.web.health_handler
"""Health Handler
Module implements the healthcheck handler and the healthcheck redirect handler.
"""
from time import perf_counter
from sanic import response
from sanic.response import HTTPResponse
from ..web.handlers import BaseHandler
from .application import LunaApplication
from .query_getters import int01Getter
class HealthCheckHandler(BaseHandler):  # pylint: disable-msg=W0223
    """
    Handler that reports the health of the service.
    """

    @property
    def app(self) -> LunaApplication:
        """
        Application that received the current request.

        Returns:
            the application object attached to the request
        """
        currentRequest = self.request
        return currentRequest.app

    @property
    def config(self):
        """
        Service configuration of the running application.

        Returns:
            the `serviceConfig` object stored in the application context
        """
        appContext = self.request.app.ctx
        return appContext.serviceConfig

    async def get(self) -> HTTPResponse:
        """
        Check health of service, see `spec`_

        .. _spec:
            _static/api.html#operation/healthcheck

        Resource is reached by address '/healthcheck'

        The `include_luna_services` query parameter (parsed with `int01Getter`,
        default 0) is converted to a bool and forwarded to the application
        health check.

        Returns:
            response with status 200 and the elapsed `execution_time` when every
            check reports a truthy status, otherwise response with status 502 and
            the list of all check results under the `errors` key
        """
        startedAt = perf_counter()
        rawFlag = self.getQueryParam("include_luna_services", int01Getter, default=0)
        healthReports = await self.app.checkHealth(includeLunaServices=bool(rawFlag))

        if all(report.status for report in healthReports):
            # every check passed: report only how long the whole check took
            payload = {"execution_time": perf_counter() - startedAt}
            return response.json(status=200, body=payload, headers=self.respHeaders)

        # at least one check failed: return every check result, not only the failed ones
        payload = {"errors": [report.asDict() for report in healthReports]}
        return response.json(status=502, body=payload, headers=self.respHeaders)
class HealthCheckRedirectHandler(BaseHandler):
    """Handler that redirects a request to the same resource under the API version prefix."""

    @property
    def app(self) -> LunaApplication:
        """
        Application that received the current request.

        Returns:
            the application object attached to the request
        """
        currentRequest = self.request
        return currentRequest.app

    @property
    def config(self):
        """
        Service configuration of the running application.

        Returns:
            the `serviceConfig` object stored in the application context
        """
        appContext = self.request.app.ctx
        return appContext.serviceConfig

    async def get(self) -> HTTPResponse:
        """
        Redirect request to the /{API}/{resource}

        Returns:
            redirect response with status 308 whose target is the raw request url
            prefixed with the application api version
        """
        apiVersion = self.app.ctx.apiVersion
        # raw_url is bytes, decode it before building the target location
        requestedUrl = self.request.raw_url.decode()
        return response.redirect(f"/{apiVersion}{requestedUrl}", status=308)