# Source code for luna_python_matcher.app.app

"""Sanic application server."""
import asyncio
import multiprocessing
import os
import signal
from typing import Optional, List, Tuple, Awaitable

from pymatcherlib.pymatcherlib import Matcher
from sanic.mixins.listeners import ListenerEvent

from app.version import VERSION
from app_common.base_application import MatcherBaseApp, MatcherBaseCtx
from classes.cross_match import CrossMatcher
from classes.match_processing import MatchLPMProcessor
from classes.matcher_utils import setLogger
from classes.matcherlib_match_tool import CachedMatcherMatchTool
from classes.python_matcher_match_tools import AttributesToolMatcherlib
from configs.config_common import MATCHER_ADDRESS
from configs.configs.configs.services.python_matcher import SettingsPythonMatcher
from crutches_on_wheels.utils.healthcheck import checkSql, checkRedis
from crutches_on_wheels.utils.log import Logger


async def checkCachedMatcherProcess(app: MatcherBaseApp, loop: asyncio.BaseEventLoop) -> None:
    """
    Verify that the cached matcher answers at ``MATCHER_ADDRESS``.

    When the connection check fails, the failure is logged, the service configuration
    values are printed and SIGTERM is sent to the current process.

    Args:
        app: python matcher application
        loop: event loop (not used by the check itself)
    """
    isReachable = await CachedMatcherMatchTool.checkConnection(MATCHER_ADDRESS)
    if isReachable:
        return

    app.ctx.logger.error(f"Failed check connection to cached matcher: {MATCHER_ADDRESS}")
    app.ctx.serviceConfig.printConfigValues()
    # deliver SIGTERM to this very process
    currentPid = os.getpid()
    os.kill(currentPid, signal.SIGTERM)
class PythonMatcherCtx(MatcherBaseCtx):
    """Application context of the python matcher service."""

    #: matcherlib matcher instance (annotated as optional)
    matcher: Optional[Matcher]
class PythonMatcherApp(MatcherBaseApp):
    """
    Python matcher application
    """

    def __init__(self):
        super().__init__(
            configClass=SettingsPythonMatcher,
            apiVersion=VERSION["Version"]["api"],
            defaultPort=5100,
            ctxClass=PythonMatcherCtx,
        )
        # check cached matcher process is healthy before app process start
        # NOTE(review): the listener is registered regardless of `descriptorsCache.cacheEnabled`,
        # while every other cached matcher interaction in this class is gated on it - confirm intended.
        self.register_listener(checkCachedMatcherProcess, ListenerEvent.MAIN_PROCESS_START)

    def initializeMatcher(self):
        """
        Initialize matcher and matcherlib and pass reference to the matcher to clients.

        It's important to configure logging first.
        """
        setLogger(Logger("matcherlib"))

        limits = self.ctx.serviceConfig.platformLimits
        self.ctx.matcher = Matcher(
            # leave one CPU free for the rest of the process, but always keep at least one matcher thread
            numOfMatcherThreads=max(multiprocessing.cpu_count() - 1, 1),
            # sized to the larger of the "match" and "cross match" candidate limits
            storageCapacity=max(limits.match.resultCandidateLimit, limits.crossMatch.resultCandidateLimit),
        )
        CrossMatcher.initialize(self.ctx.matcher, limits.crossMatch)
        AttributesToolMatcherlib.initialize(self.ctx.matcher)

    async def initialize(self):
        """
        Initialize application.

        Runs the base initialization, creates the matcher and, if the descriptors cache is
        enabled, initializes the cached matcher tool and waits for it before enabling the
        cache in the match processor.
        """
        await super().initialize()
        self.initializeMatcher()

        cacheSettings = self.ctx.serviceConfig.descriptorsCache
        if cacheSettings.cacheEnabled:
            CachedMatcherMatchTool.initialize(MATCHER_ADDRESS, rpcSettings=cacheSettings.rpcSettings)
            self.ctx.logger.info("Wait until cached matcher becomes ready before serving http requests")
            await CachedMatcherMatchTool.wait()
            MatchLPMProcessor.cacheEnabled = True

    async def shutdown(self):
        """
        Shutdown application services.

        The base class shutdown is always executed, even if closing the cached matcher
        tool raises; the original exception is still propagated afterwards.
        """
        try:
            if self.ctx.serviceConfig.descriptorsCache.cacheEnabled:
                CachedMatcherMatchTool.close()
        finally:
            # base cleanup must not be skipped because of a failure above
            await super().shutdown()

    def getRuntimeChecks(self, includeLunaServices: bool = False) -> List[Tuple[str, Awaitable]]:
        """
        Returns configured system checks, pairs of (name, awaitable).

        Args:
            includeLunaServices: A bool, whether to include checks for luna services.
                Not consulted by this implementation.
        Returns:
            list of (check name, awaitable) pairs; "cached_matcher" and "events_db" are
            present only when the corresponding settings are enabled
        """
        serviceConfig = self.ctx.serviceConfig
        checks = [
            ("faces_db", checkSql(self.ctx.facesDBAdaptor)),
            ("attributes_db", checkRedis(self.ctx.redisClient)),
        ]
        if serviceConfig.descriptorsCache.cacheEnabled:
            checks.append(("cached_matcher", CachedMatcherMatchTool.healthcheck()))
        if serviceConfig.additionalServicesUsage.lunaEvents:
            checks.append(("events_db", checkSql(self.ctx.eventsDBAdaptor)))
        return checks