Source code for luna_python_matcher.app_common.base_application

"""
Module contains the base application class and its application context class.
"""

from abc import ABC
from typing import Optional

import aioredis
from aioredis import Redis

from app_common.handlers.schemas import initializeSchemas
from configs.config_common import DATABASE_NUMBER, RESPONSE_TIMEOUT
from configs.configs.configs.settings.classes import DBSetting, RedisStorageSetting
from crutches_on_wheels.adaptors.adaptor import DBAdaptor
from crutches_on_wheels.utils.check_connection import (
    checkConnectionToDB,
    checkConnectionToRedis,
    checkConnectionToSentinel,
)
from crutches_on_wheels.web.application import BaseLunaApplicationCtx, LunaApplication
from crutches_on_wheels.web.cmd_parser import ApplicationCMDParser
from crutches_on_wheels.web.request import VLRequest
from db.context_attributes import RedisContext as AttributesDBContext
from db.context_events import DBContext as EventsDBContext
from db.context_faces import DBContext as FacesDBContext
from db.events_db_tools.models.config import DBConfig as EventsDBConfig
from db.events_db_tools.models.events_models import Base as EventsBase
from db.faces_db_tools.models.config import DBConfig as FacesDBConfig
from db.faces_db_tools.models.faces_models import Base as FacesBase


class MatcherBaseCtx(BaseLunaApplicationCtx):
    """
    Base luna-python-matcher context.

    Declares the attributes that `MatcherBaseApp` stores on its application context:
    database adaptors, the redis client and the per-storage database contexts.
    """

    #: adaptor for the faces database
    facesDBAdaptor: Optional[DBAdaptor]

    #: adaptor for the events database
    eventsDBAdaptor: Optional[DBAdaptor]

    #: client for the redis storage
    redisClient: Optional[Redis]

    #: context for the faces database
    facesDBContext: FacesDBContext

    #: context for the events database
    eventsDBContext: EventsDBContext

    #: context for the attributes database
    attributesDBContext: AttributesDBContext
class MatcherBaseApp(LunaApplication, ABC):
    """
    Base service application.

    Sets up the faces/events database contexts and the redis-backed attributes context on
    `self.ctx` during `initialize` and releases the underlying connections in `shutdown`.
    """

    ctx: MatcherBaseCtx

    def __init__(self, *args, **kwargs):
        """
        Create the application.

        Args:
            *args: positional arguments forwarded to the parent application
            **kwargs: keyword arguments forwarded to the parent application
        """
        super().__init__(*args, requestClass=VLRequest, **kwargs)
        self.config["RESPONSE_TIMEOUT"] = RESPONSE_TIMEOUT

    def initContexts(self) -> None:
        """
        Initialize faces, events and attributes databases context.

        Must be called after `initRedisDB`: the attributes context is built around `self.ctx.redisClient`.
        """
        logger = self.ctx.logger
        self.ctx.facesDBContext = FacesDBContext(logger)
        self.ctx.eventsDBContext = EventsDBContext(logger)
        self.ctx.attributesDBContext = AttributesDBContext(client=self.ctx.redisClient, logger=logger)

    @staticmethod
    def _getCMDArgParser(defaultPort: int, serviceConfig: "BasePythonServiceSettingsClass") -> ApplicationCMDParser:
        """
        Get command line arguments parser. Overloading `_getCMDArgParser` to get the service type.

        Args:
            defaultPort: default port passed to the parser
            serviceConfig: service settings class; its setting names are passed as tagable settings

        Returns:
            parser with options, including the additional `--service-type` option
        """
        argParser = ApplicationCMDParser(
            defaultPort=defaultPort,
            tagableSettings=serviceConfig.getServiceSettingNames(),
        )
        argParser.argParser.add_argument(
            "--service-type",
            type=str,
            choices=["matcher", "proxy"],
            help='service type: one of "matcher" or "proxy"',
        )
        return argParser

    async def initialize(self) -> None:
        """
        Initialize application.

        Order: request schemas, faces/events database contexts, parent application, redis client and,
        last, the context objects (the attributes context needs the redis client).
        """
        initializeSchemas(self.ctx.serviceConfig.platformLimits)
        await self.initDBContext()
        await super().initialize()
        await self.initRedisDB(self.ctx.serviceConfig.attributesDB)
        self.initContexts()

    async def initRedisDB(self, redisDBSettings: RedisStorageSetting) -> None:
        """
        Initialize Redis client.

        A plain connection pool is created when no sentinels are configured; otherwise the client
        of the configured sentinel master is used.

        Args:
            redisDBSettings: redis db settings
        """
        # an empty password is passed to aioredis as "no password"
        password = redisDBSettings.password or None
        sentinelSettings = redisDBSettings.sentinel

        if not sentinelSettings.sentinels:
            self.ctx.redisClient = await aioredis.create_redis_pool(
                address=f"redis://{redisDBSettings.host}:{redisDBSettings.port}",
                db=redisDBSettings.dbNumber,
                password=password,
            )
            return

        sentinel = await aioredis.create_sentinel(
            sentinelSettings.sentinels,
            db=redisDBSettings.dbNumber,
            password=password,
        )
        self.ctx.redisClient = sentinel.master_for(sentinelSettings.masterName)

    async def initDBContext(self) -> None:
        """
        Initialize faces & events database context.

        The events database is initialized only if the usage of luna-events is enabled in the service
        config; otherwise the events adaptor is explicitly set to `None`.
        """
        serviceConfig = self.ctx.serviceConfig

        if serviceConfig.additionalServicesUsage.lunaEvents:
            EventsDBConfig.initialize(dbType=serviceConfig.eventsDB.type)
            await EventsDBContext.initDBContext(
                dbSettings=serviceConfig.eventsDB,
                storageTime=serviceConfig.storageTime,
            )
            self.ctx.eventsDBAdaptor = EventsDBContext.adaptor
        else:
            # keep the attribute defined so that later readers (e.g. `shutdown`) do not fail on a missing attribute
            self.ctx.eventsDBAdaptor = None

        FacesDBConfig.initialize(dbType=serviceConfig.facesDB.type, databaseNumber=DATABASE_NUMBER)
        await FacesDBContext.initDBContext(
            dbSettings=serviceConfig.facesDB,
            storageTime=serviceConfig.storageTime,
        )
        self.ctx.facesDBAdaptor = FacesDBContext.adaptor

    @staticmethod
    async def checkConnectionToAttributesDB(dbSettings: RedisStorageSetting) -> bool:
        """
        Check connections to temporary attributes db

        Args:
            dbSettings: redis settings

        Returns:
            True, if check was passed successfully, else False.
        """
        sentinelSettings = dbSettings.sentinel
        if sentinelSettings.sentinels:
            return await checkConnectionToSentinel(
                sentinels=sentinelSettings.sentinels,
                redisPassword=dbSettings.password,
                masterName=sentinelSettings.masterName,
                asyncCheck=True,
            )
        return await checkConnectionToRedis(
            redisHost=dbSettings.host,
            redisPort=dbSettings.port,
            redisPassword=dbSettings.password,
            asyncCheck=True,
        )

    @staticmethod
    async def checkConnectionToFacesDB(dbSettings: DBSetting) -> bool:
        """
        Check connections to faces db

        Args:
            dbSettings: database settings

        Returns:
            True, if check was passed successfully, else False.
        """
        facesTableNames = [table.__tablename__ for table in FacesBase.__subclasses__()]
        return await checkConnectionToDB(
            dbSetting=dbSettings,
            checkMigrationVersion=False,
            modelNames=facesTableNames,
            postfix="luna-faces",
            asyncCheck=True,
        )

    @staticmethod
    async def checkConnectionToEventsDB(dbSettings: DBSetting) -> bool:
        """
        Check connections to events db

        Args:
            dbSettings: database settings

        Returns:
            True, if check was passed successfully, else False.
        """
        eventsTableNames = [table.__tablename__ for table in EventsBase.__subclasses__()]
        return await checkConnectionToDB(
            dbSetting=dbSettings,
            checkMigrationVersion=False,
            modelNames=eventsTableNames,
            postfix="luna-events",
            asyncCheck=True,
        )

    async def shutdown(self) -> None:
        """
        Shutdown application services.

        Closes the faces and events database adaptors and the redis client (in that order) and then
        shuts down the parent application.
        """
        # `initialize` may have failed before these attributes were assigned, so a missing
        # attribute is treated the same way as an unset (`None`) one instead of raising
        for adaptorName in ("facesDBAdaptor", "eventsDBAdaptor"):
            adaptor = getattr(self.ctx, adaptorName, None)
            if adaptor:
                await adaptor.close()

        redisClient = getattr(self.ctx, "redisClient", None)
        if redisClient:
            redisClient.close()
            await redisClient.wait_closed()

        await super().shutdown()