"""Application server."""
from typing import Awaitable, List, Optional, Tuple
from luna3.client import Luna3Session
from luna3.settings import HTTPSettings, QueueSettings
from app.version import VERSION
from app_common.base_application import MatcherBaseApp, MatcherBaseCtx
from app_proxy.matcher.dispatcher import Dispatcher, createDispatcher
from configs.configs.configs.services.matcher_proxy import SettingsPythonMatcherProxy
from configs.matcher_clusters import MatcherClusters
from crutches_on_wheels.cow.utils.healthcheck import checkCoreService, checkRedis, checkService, checkSql
from crutches_on_wheels.cow.web.base_proxy_handler_class import SessionPool
[docs]class PythonMatcherProxyCtx(MatcherBaseCtx):
"""Python matcher application context"""
#: dispatcher for matching
dispatcher: Dispatcher
#: luna3 client
luna3Session: Optional[Luna3Session]
[docs]class PythonMatcherProxyApp(MatcherBaseApp):
"""
Python matcher proxy application
"""
ctx: PythonMatcherProxyCtx
def __init__(self):
super().__init__(
configClass=SettingsPythonMatcherProxy,
apiVersion=VERSION["Version"]["api"],
defaultPort=5110,
ctxClass=PythonMatcherProxyCtx,
)
[docs] async def initialize(self):
"""Initialize application."""
await super().initialize()
await SessionPool.initialize()
self.initLunaClient(self.ctx.serviceConfig)
self.ctx.dispatcher = createDispatcher(self)
[docs] def getQueueSettings(self, settings: SettingsPythonMatcherProxy) -> QueueSettings:
"""
Get queue settings for matcher-proxy.
Args:
settings: settings
Returns:
:class:`QueueSettings`
"""
queueSettings = QueueSettings(**settings.queueSettings.asLuna3QueueSettingsKwargs())
clusters = MatcherClusters(settings)
queueSettings.indexedMatcherExchange = clusters.indexedMatcherCluster.rabbitExchange
queueSettings.indexedMatcherRoutingKey = clusters.indexedMatcherCluster.routingKey
return queueSettings
[docs] def initLunaClient(self, settings: SettingsPythonMatcherProxy):
"""Initialize session to internal services."""
lunaKwargs = dict(
pythonMatcherSettings=HTTPSettings(
origin=settings.lunaPythonMatcherAddress.origin,
api=settings.lunaPythonMatcherAddress.apiVersion,
connectTimeout=settings.lunaPythonMatcherTimeouts.connectTimeout,
totalTimeout=settings.lunaPythonMatcherTimeouts.totalTimeout,
sockConnectTimeout=settings.lunaPythonMatcherTimeouts.sockConnectTimeout,
sockReadTimeout=settings.lunaPythonMatcherTimeouts.sockReadTimeout,
)
)
if settings.additionalServicesUsage.indexedMatcher:
lunaKwargs["queueSettings"] = self.getQueueSettings(settings)
lunaKwargs["indexManagerSettings"] = HTTPSettings(
settings.indexManagerAddress.apiVersion,
settings.indexManagerAddress.origin,
totalTimeout=settings.indexManagerTimeouts.totalTimeout,
connectTimeout=settings.indexManagerTimeouts.connectTimeout,
sockConnectTimeout=settings.indexManagerTimeouts.sockConnectTimeout,
sockReadTimeout=settings.indexManagerTimeouts.sockReadTimeout,
asyncRequest=True,
)
self.ctx.luna3Session = Luna3Session(**lunaKwargs)
[docs] def getRuntimeChecks(self, includeLunaServices: bool = False) -> List[Tuple[str, Awaitable]]:
"""
Returns configured system checks, pairs of (name, coroutine).
Args:
includeLunaServices: A bool, whether to include checks for luna services.
"""
checks = [
("faces_db", checkSql(self.ctx.facesDBAdaptor)),
("attributes_db", checkRedis(self.ctx.redisClient)),
]
if self.ctx.serviceConfig.additionalServicesUsage.lunaEvents:
checks += [("events_db", checkSql(self.ctx.eventsDBAdaptor))]
if includeLunaServices:
pythonMatcherClient = self.ctx.luna3Session.pythonMatcherSession.getClient()
checks += [
("python_matcher", checkService(pythonMatcherClient, includeLunaServices)),
]
if self.ctx.serviceConfig.additionalServicesUsage.indexedMatcher:
coreClient = self.ctx.luna3Session.queueSession.getClient()
checks += [
("indexed_matcher", checkCoreService(coreClient, "Indexed Matcher")),
]
if includeLunaServices:
indexManagerClient = self.ctx.luna3Session.indexManagerSession.getClient()
checks += [
("index_manager", checkService(indexManagerClient, includeLunaServices)),
]
return checks
[docs] async def shutdown(self):
"""Shutdown application services."""
await SessionPool.close()
if self.ctx.luna3Session:
await self.ctx.luna3Session.close()
await super().shutdown()