Source code for luna_python_matcher.app_proxy.app

"""Application server."""
from typing import Awaitable, List, Optional, Tuple

from luna3.client import Luna3Session
from luna3.settings import HTTPSettings

from app.version import VERSION
from app_common.base_application import MatcherBaseApp, MatcherBaseCtx
from app_proxy.matcher.dispatcher import Dispatcher, createDispatcher
from configs.configs.configs.services.matcher_proxy import SettingsPythonMatcherProxy
from crutches_on_wheels.cow.utils.healthcheck import checkRedis, checkService, checkSql
from crutches_on_wheels.cow.web.base_proxy_handler_class import SessionPool


[docs]class PythonMatcherProxyCtx(MatcherBaseCtx): """Python matcher application context""" #: dispatcher for matching dispatcher: Dispatcher #: luna3 client luna3Session: Optional[Luna3Session]
[docs]class PythonMatcherProxyApp(MatcherBaseApp): """ Python matcher proxy application """ ctx: PythonMatcherProxyCtx def __init__(self): super().__init__( configClass=SettingsPythonMatcherProxy, apiVersion=VERSION["Version"]["api"], defaultPort=5110, ctxClass=PythonMatcherProxyCtx, )
[docs] async def initialize(self): """Initialize application.""" await super().initialize() await SessionPool.initialize() self.initLunaClient(self.ctx.serviceConfig) self.ctx.dispatcher = createDispatcher(self)
[docs] def initLunaClient(self, settings: SettingsPythonMatcherProxy): """Initialize session to internal services.""" lunaKwargs = dict( pythonMatcherSettings=HTTPSettings( origin=settings.lunaPythonMatcherAddress.origin, api=settings.lunaPythonMatcherAddress.apiVersion, connectTimeout=settings.lunaPythonMatcherTimeouts.connectTimeout, totalTimeout=settings.lunaPythonMatcherTimeouts.totalTimeout, sockConnectTimeout=settings.lunaPythonMatcherTimeouts.sockConnectTimeout, sockReadTimeout=settings.lunaPythonMatcherTimeouts.sockReadTimeout, ) ) self.ctx.luna3Session = Luna3Session(**lunaKwargs)
[docs] def getRuntimeChecks(self, includeLunaServices: bool = False) -> List[Tuple[str, Awaitable]]: """ Returns configured system checks, pairs of (name, coroutine). Args: includeLunaServices: A bool, whether to include checks for luna services. """ checks = [ ("faces_db", checkSql(self.ctx.facesDBAdaptor)), ("attributes_db", checkRedis(self.ctx.redisClient)), ] if self.ctx.serviceConfig.additionalServicesUsage.lunaEvents: checks += [("events_db", checkSql(self.ctx.eventsDBAdaptor))] if includeLunaServices: pythonMatcherClient = self.ctx.luna3Session.pythonMatcherSession.getClient() checks += [ ("python_matcher", checkService(pythonMatcherClient, includeLunaServices)), ] return checks
[docs] async def shutdown(self): """Shutdown application services.""" await SessionPool.close() if self.ctx.luna3Session: await self.ctx.luna3Session.close() await super().shutdown()