# Source code for luna_python_matcher.app_proxy.app

"""Application server."""
from typing import Optional, List, Tuple, Awaitable

from luna3.client import Luna3Session
from luna3.settings import QueueSettings, HTTPSettings

from app.version import VERSION
from app_common.base_application import MatcherBaseApp, MatcherBaseCtx
from app_proxy.matcher.dispatcher import Dispatcher, createDispatcher
from configs.configs.configs.services.matcher_proxy import SettingsPythonMatcherProxy
from configs.matcher_clusters import MatcherClusters
from crutches_on_wheels.utils.healthcheck import checkSql, checkRedis, checkService, checkCoreService
from crutches_on_wheels.web.base_proxy_handler_class import SessionPool


class PythonMatcherProxyCtx(MatcherBaseCtx):
    """
    Application context of the python matcher proxy.

    Extends the base matcher context with the two attributes declared below;
    both are assigned by the application while it initializes.
    """

    #: dispatcher used for matching
    dispatcher: Dispatcher

    #: luna3 client session (may be ``None``)
    luna3Session: Optional[Luna3Session]
class PythonMatcherProxyApp(MatcherBaseApp):
    """
    Python matcher proxy application.

    Builds on :class:`MatcherBaseApp`: configures the luna3 session used to reach
    other services, creates the matching dispatcher and exposes the runtime
    health checks of the service.
    """

    #: application context
    ctx: PythonMatcherProxyCtx

    def __init__(self):
        super().__init__(
            configClass=SettingsPythonMatcherProxy,
            apiVersion=VERSION["Version"]["api"],
            defaultPort=5110,
            ctxClass=PythonMatcherProxyCtx,
        )

    async def initialize(self):
        """
        Initialize application.

        Runs the base initialization, initializes the shared session pool, creates
        the luna3 session from the service config and finally creates the dispatcher.
        """
        await super().initialize()
        await SessionPool.initialize()
        self.initLunaClient(self.ctx.serviceConfig)
        # The dispatcher is created last, once the luna3 session is set on the context.
        self.ctx.dispatcher = createDispatcher(self)

    def getQueueSettings(self, settings: SettingsPythonMatcherProxy) -> QueueSettings:
        """
        Get queue settings for matcher-proxy.

        Args:
            settings: settings

        Returns:
            :class:`QueueSettings` with the indexed matcher exchange and routing key
            taken from the matcher clusters configuration
        """
        queueSettings = QueueSettings(**settings.queueSettings.asLuna3QueueSettingsKwargs())
        indexedCluster = MatcherClusters(settings).indexedMatcherCluster
        queueSettings.indexedMatcherExchange = indexedCluster.rabbitExchange
        queueSettings.indexedMatcherRoutingKey = indexedCluster.routingKey
        return queueSettings

    def initLunaClient(self, settings: SettingsPythonMatcherProxy):
        """
        Initialize session to internal services.

        The python matcher settings are always passed to the session; queue and
        index manager settings are added only when the indexed matcher usage is
        enabled in the settings. The session is stored in ``self.ctx.luna3Session``.

        Args:
            settings: settings
        """
        matcherTimeouts = settings.lunaPythonMatcherTimeouts
        lunaKwargs = dict(
            pythonMatcherSettings=HTTPSettings(
                origin=settings.lunaPythonMatcherAddress.origin,
                api=settings.lunaPythonMatcherAddress.apiVersion,
                connectTimeout=matcherTimeouts.connectTimeout,
                totalTimeout=matcherTimeouts.totalTimeout,
                sockConnectTimeout=matcherTimeouts.sockConnectTimeout,
                sockReadTimeout=matcherTimeouts.sockReadTimeout,
            )
        )
        # NOTE(review): block nesting is reconstructed from flattened source; both
        # settings below are assumed to belong to the indexed matcher branch - confirm.
        if settings.additionalServicesUsage.indexedMatcher:
            indexManagerTimeouts = settings.indexManagerTimeouts
            lunaKwargs["queueSettings"] = self.getQueueSettings(settings)
            # origin/api are passed by keyword, same as for the python matcher above,
            # so the call does not depend on the positional order of HTTPSettings.
            lunaKwargs["indexManagerSettings"] = HTTPSettings(
                origin=settings.indexManagerAddress.origin,
                api=settings.indexManagerAddress.apiVersion,
                totalTimeout=indexManagerTimeouts.totalTimeout,
                connectTimeout=indexManagerTimeouts.connectTimeout,
                sockConnectTimeout=indexManagerTimeouts.sockConnectTimeout,
                sockReadTimeout=indexManagerTimeouts.sockReadTimeout,
                asyncRequest=True,
            )
        self.ctx.luna3Session = Luna3Session(**lunaKwargs)

    def getRuntimeChecks(self, includeLunaServices: bool = False) -> List[Tuple[str, Awaitable]]:
        """
        Returns configured system checks, pairs of (name, coroutine).

        The faces db and attributes db checks are always present. The events db
        check is added when luna events usage is enabled, the indexed matcher check
        when indexed matcher usage is enabled. Checks of other luna services
        (python matcher, index manager) are added only on request.

        Args:
            includeLunaServices: A bool, whether to include checks for luna services.

        Returns:
            list of (check name, awaitable) pairs
        """
        servicesUsage = self.ctx.serviceConfig.additionalServicesUsage
        checks = [
            ("faces_db", checkSql(self.ctx.facesDBAdaptor)),
            ("attributes_db", checkRedis(self.ctx.redisClient)),
        ]
        if servicesUsage.lunaEvents:
            checks.append(("events_db", checkSql(self.ctx.eventsDBAdaptor)))
        if includeLunaServices:
            pythonMatcherClient = self.ctx.luna3Session.pythonMatcherSession.getClient()
            checks.append(("python_matcher", checkService(pythonMatcherClient, includeLunaServices)))
        if servicesUsage.indexedMatcher:
            coreClient = self.ctx.luna3Session.queueSession.getClient()
            checks.append(("indexed_matcher", checkCoreService(coreClient, "Indexed Matcher")))
            # NOTE(review): nesting is reconstructed from flattened source; the index
            # manager check is kept under the indexed matcher branch because its
            # settings are only configured there (see initLunaClient) - confirm.
            if includeLunaServices:
                indexManagerClient = self.ctx.luna3Session.indexManagerSession.getClient()
                checks.append(("index_manager", checkService(indexManagerClient, includeLunaServices)))
        return checks

    async def shutdown(self):
        """
        Shutdown application services.

        Closes the session pool, then the luna3 session (if any), then runs the base
        shutdown. Each step runs even if a previous one raised, so a failure while
        closing one resource does not leave the others open; the error still
        propagates to the caller.
        """
        try:
            await SessionPool.close()
        finally:
            try:
                # The session may be missing if initialization failed before it was created.
                luna3Session = getattr(self.ctx, "luna3Session", None)
                if luna3Session:
                    await luna3Session.close()
            finally:
                await super().shutdown()