Source code for luna_streams.app.streams_processes.master

"""
Luna-streams master processor module. By the way, the process will be done in one instance at time.
"""

import asyncio
import uuid
from dataclasses import dataclass
from typing import Awaitable, Callable

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from tzlocal import get_localzone_name

from configs.config import STREAM_AUTORESTARTER_INTERVAL, STREAM_DOWNGRADE_INTERVAL
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.utils import mixins
from crutches_on_wheels.cow.utils.log import Logger
from db.context import DBContext, SingleProcessLock


[docs] @dataclass class DBJob: """Job which work with database""" # function for execution func: Callable[[DBContext], Awaitable] # running job interval interval: float # job name name: str
[docs] class MasterProcess(mixins.Initializable): """ Master process. This class is intended for a periodical jobs execution on only one instance of the service. A class instance try to get a database lock periodically. If the instance get the lock it will execute jobs and send heartbeat to the database for being a master, logic entry point `_lockMaster`. Attributes: logger: logger lockContext: db lock context heartbeatInterval: heartbeat interval obsoletingPeriod: stream status obsoleting period scheduler: scheduler for the `process` method _iAmMaster: internal state, master or not now _masterFuture: saved get sessionId: master lock session id """ lockContext: SingleProcessLock scheduler: AsyncIOScheduler | None = None _masterFuture: asyncio.Task | None = None def __init__(self, hearbeatInterval: int, obsoletingPeriod: int): self.name = "master_process" self.logger = Logger(self.name) self._iAmMaster = False self.heartbeatInterval = hearbeatInterval self.obsoletingPeriod = obsoletingPeriod self.sessionId = str(uuid.uuid4()) def _wrapJob(self, job: DBJob) -> Callable[[DBContext], Awaitable]: """ Wrap job function. New function will be call only if self is master process. """ async def f(): if self._iAmMaster: self.logger.debug(f"start {job.name} job") try: await job.func(self.lockContext.context) except Exception as e: self.logger.error(f"job {job.name} is failed", exc_info=e) else: self.logger.debug(f"skip {job.name}") return f
[docs] async def initializeProcess(self): """Init master instance processes""" self.lockContext = SingleProcessLock(self.logger, name=self.name) self._masterFuture = asyncio.create_task(self._lockMaster()) self.scheduler = AsyncIOScheduler(timezone=get_localzone_name()) async def downgradeStatus(context: DBContext): await context.downgradeStreamStatus(self.obsoletingPeriod) statusDowngradeProcessJob = DBJob(downgradeStatus, STREAM_DOWNGRADE_INTERVAL, "stream_downgrade_status") async def autorestart(context: DBContext): await context.autorestartStreams() autorestartStreamJob = DBJob(autorestart, STREAM_AUTORESTARTER_INTERVAL, "stream_autorestart") for job in [statusDowngradeProcessJob, autorestartStreamJob]: self.scheduler.add_job(self._wrapJob(job), trigger="interval", seconds=job.interval) self.scheduler.start()
[docs] async def close(self) -> None: """Shutdown scheduler""" if self.scheduler: self.scheduler.shutdown(wait=False) if self._masterFuture: self._masterFuture.cancel()
async def _tryToGetMasterLock(self): """ Function try to get the lock from database or send a heartbeat. Heartbeat is update expiration time in a database table. """ self.logger.debug(f"try to get lock") try: if self._iAmMaster: isLocked = await self.lockContext.heartbeat(self.name, self.heartbeatInterval, sessionId=self.sessionId) if not isLocked: self.logger.info("instance is not master now") else: isLocked = await self.lockContext.lockProcess( self.name, self.heartbeatInterval, sessionId=self.sessionId ) if isLocked: self.logger.info("instance has become a master") except VLException as exc: self.logger.error(f"failed to get master lock: {exc.error.detail}") isLocked = False except Exception as e: self.logger.error("failed to get master lock", exc_info=e) isLocked = False if isLocked: self.logger.debug(f"lock is acquired") else: self.logger.debug(f"lock miss: already taken") self._iAmMaster = isLocked return isLocked async def _lockMaster(self): """ A master lock infinity getter. Infinity loop of attempts to get master lock """ res = await self._tryToGetMasterLock() while True: if res: # next attempt must be before the lock expiration time is ended await asyncio.sleep(self.heartbeatInterval * 0.7) else: # next attempt must be after the lock expiration time is ended await asyncio.sleep(self.heartbeatInterval + 0.1) res = await self._tryToGetMasterLock()