Source code for luna_python_matcher.app.app
"""Sanic application server."""
import asyncio
import multiprocessing
import os
import signal
from typing import Awaitable, List, Optional, Tuple
from pymatcherlib.pymatcherlib import Matcher
from sanic.mixins.listeners import ListenerEvent
from app.version import VERSION
from app_common.base_application import MatcherBaseApp, MatcherBaseCtx
from classes.cross_match import CrossMatcher
from classes.match_processing import MatchLPMProcessor
from classes.matcher_utils import setLogger
from classes.matcherlib_match_tool import CachedMatcherMatchTool
from classes.python_matcher_match_tools import AttributesToolMatcherlib
from configs.config_common import MATCHER_ADDRESS
from configs.configs.configs.services.python_matcher import SettingsPythonMatcher
from crutches_on_wheels.utils.healthcheck import checkRedis, checkSql
from crutches_on_wheels.utils.log import Logger
async def checkCachedMatcherProcess(app: MatcherBaseApp, loop: asyncio.BaseEventLoop) -> None:
    """
    Verify the connection to the cached matcher and terminate the current process on failure.

    Args:
        app: python matcher application; its context supplies the logger and the service config
        loop: event loop (part of the listener signature, not used by this check)
    """
    isReachable = await CachedMatcherMatchTool.checkConnection(MATCHER_ADDRESS)
    if isReachable:
        return

    # The check failed: report it, dump the configuration values, then send SIGTERM to this process.
    ctx = app.ctx
    ctx.logger.error(f"Failed check connection to cached matcher: {MATCHER_ADDRESS}")
    ctx.serviceConfig.printConfigValues()
    os.kill(os.getpid(), signal.SIGTERM)
class PythonMatcherCtx(MatcherBaseCtx):
    """Context of the python matcher application."""

    #: matcherlib matcher instance; declared optional, so it may be None
    matcher: Optional[Matcher]
class PythonMatcherApp(MatcherBaseApp):
    """
    Python matcher application.

    Creates the matcherlib ``Matcher``, hands it to the matching tools and, when the
    descriptors cache is enabled in the service config, sets up the cached matcher client.
    """

    def __init__(self):
        super().__init__(
            configClass=SettingsPythonMatcher,
            apiVersion=VERSION["Version"]["api"],
            defaultPort=5100,
            ctxClass=PythonMatcherCtx,
        )
        # check cached matcher process is healthy before app process start
        # NOTE(review): the listener is registered regardless of `descriptorsCache.cacheEnabled`,
        # while every other cached matcher interaction in this class is gated on it - confirm intended.
        self.register_listener(checkCachedMatcherProcess, ListenerEvent.MAIN_PROCESS_START)

    def initializeMatcher(self):
        """
        Initialize matcher and matcherlib and pass reference to the matcher to clients.

        It's important to configure logging first.
        """
        logger = Logger("matcherlib")
        setLogger(logger)

        platformLimits = self.ctx.serviceConfig.platformLimits
        self.ctx.matcher = Matcher(
            # one thread fewer than the CPU count, but never less than one
            numOfMatcherThreads=max(multiprocessing.cpu_count() - 1, 1),
            # the larger of the match and cross match candidate limits
            storageCapacity=max(
                platformLimits.match.resultCandidateLimit,
                platformLimits.crossMatch.resultCandidateLimit,
            ),
        )
        CrossMatcher.initialize(self.ctx.matcher, platformLimits.crossMatch)
        AttributesToolMatcherlib.initialize(self.ctx.matcher)

    async def initialize(self):
        """
        Initialize application.

        Runs the base initialization, creates the matcher and, if the descriptors cache is
        enabled, initializes the cached matcher client and waits for it before continuing.
        """
        await super().initialize()
        self.initializeMatcher()
        if self.ctx.serviceConfig.descriptorsCache.cacheEnabled:
            CachedMatcherMatchTool.initialize(
                MATCHER_ADDRESS, rpcSettings=self.ctx.serviceConfig.descriptorsCache.rpcSettings
            )
            self.ctx.logger.info("Wait until cached matcher becomes ready before serving http requests")
            await CachedMatcherMatchTool.wait()
            MatchLPMProcessor.cacheEnabled = True

    async def shutdown(self):
        """
        Shutdown application services.

        The base application shutdown is always awaited, even if closing the cached
        matcher client raises; the original exception still propagates afterwards.
        """
        try:
            if self.ctx.serviceConfig.descriptorsCache.cacheEnabled:
                CachedMatcherMatchTool.close()
        finally:
            # never skip the base shutdown because of a failure while closing the cached matcher client
            await super().shutdown()

    def getRuntimeChecks(self, includeLunaServices: bool = False) -> List[Tuple[str, Awaitable]]:
        """
        Returns configured system checks, pairs of (name, awaitable).

        Args:
            includeLunaServices: A bool, whether to include checks for luna services
                (accepted for interface compatibility; not used by this implementation).

        Returns:
            checks for the faces database and the attributes storage, plus the cached matcher
            check when the descriptors cache is enabled and the events database check when
            luna events usage is enabled
        """
        checks = [
            ("faces_db", checkSql(self.ctx.facesDBAdaptor)),
            ("attributes_db", checkRedis(self.ctx.redisClient)),
        ]
        if self.ctx.serviceConfig.descriptorsCache.cacheEnabled:
            checks.append(("cached_matcher", CachedMatcherMatchTool.healthcheck()))
        if self.ctx.serviceConfig.additionalServicesUsage.lunaEvents:
            checks.append(("events_db", checkSql(self.ctx.eventsDBAdaptor)))
        return checks