Source code for luna_streams.app.streams_processes.master
"""
Luna-streams master processor module. The master process runs on only one service instance at a time.
"""
import asyncio
import uuid
from dataclasses import dataclass
from typing import Awaitable, Callable
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from tzlocal import get_localzone_name
from configs.config import STREAM_AUTORESTARTER_INTERVAL, STREAM_DOWNGRADE_INTERVAL
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.utils import mixins
from crutches_on_wheels.cow.utils.log import Logger
from db.context import DBContext, SingleProcessLock
[docs]
@dataclass
class DBJob:
    """
    Description of a periodic job which works with the database.

    Attributes:
        func: function for execution, awaited with a database context as its only argument
        interval: running job interval
        name: job name
    """

    func: Callable[[DBContext], Awaitable]
    interval: float
    name: str
[docs]
class MasterProcess(mixins.Initializable):
    """
    Master process. This class is intended for periodical job execution on only one instance of the service.

    A class instance periodically tries to get a database lock. While the instance holds the lock it executes
    the scheduled jobs and sends heartbeats to the database to stay master; logic entry point is `_lockMaster`.

    Attributes:
        logger: logger
        lockContext: db lock context
        heartbeatInterval: heartbeat interval
        obsoletingPeriod: stream status obsoleting period
        scheduler: scheduler which runs the wrapped database jobs
        _iAmMaster: internal state, master or not now
        _masterFuture: task which runs the `_lockMaster` loop
        sessionId: master lock session id
    """

    lockContext: SingleProcessLock
    scheduler: AsyncIOScheduler | None = None
    _masterFuture: asyncio.Task | None = None

    def __init__(self, hearbeatInterval: int, obsoletingPeriod: int):
        """
        Init master process state. No lock is requested and no job is scheduled here, see `initializeProcess`.

        Args:
            hearbeatInterval: heartbeat interval (parameter name is kept as is for backward compatibility)
            obsoletingPeriod: stream status obsoleting period
        """
        self.name = "master_process"
        self.logger = Logger(self.name)
        self._iAmMaster = False
        self.heartbeatInterval = hearbeatInterval
        self.obsoletingPeriod = obsoletingPeriod
        # unique id of this instance, passed to every lock and heartbeat request
        self.sessionId = str(uuid.uuid4())

    def _wrapJob(self, job: DBJob) -> Callable[[], Awaitable]:
        """
        Wrap job function. New function will run the job only if self is master process.

        Args:
            job: job to wrap
        Returns:
            coroutine function without arguments, suitable for the scheduler
        """

        async def f():
            if self._iAmMaster:
                self.logger.debug(f"start {job.name} job")
                try:
                    await job.func(self.lockContext.context)
                except Exception as e:
                    # a failed run must not break the periodical execution, so the error is only logged
                    self.logger.error(f"job {job.name} is failed", exc_info=e)
            else:
                self.logger.debug(f"skip {job.name}")

        return f

    async def initializeProcess(self):
        """Init master instance processes: start the master lock loop and schedule the database jobs"""
        self.lockContext = SingleProcessLock(self.logger, name=self.name)
        self._masterFuture = asyncio.create_task(self._lockMaster())
        self.scheduler = AsyncIOScheduler(timezone=get_localzone_name())

        async def downgradeStatus(context: DBContext):
            await context.downgradeStreamStatus(self.obsoletingPeriod)

        async def autorestart(context: DBContext):
            await context.autorestartStreams()

        jobs = [
            DBJob(downgradeStatus, STREAM_DOWNGRADE_INTERVAL, "stream_downgrade_status"),
            DBJob(autorestart, STREAM_AUTORESTARTER_INTERVAL, "stream_autorestart"),
        ]
        for job in jobs:
            self.scheduler.add_job(self._wrapJob(job), trigger="interval", seconds=job.interval)
        self.scheduler.start()

    async def close(self) -> None:
        """
        Shutdown scheduler and stop the master lock loop.

        The method is safe to call more than once and before the scheduler has been started.
        """
        # shutdown of a scheduler which is not running raises an error, so check the state first
        if self.scheduler is not None and self.scheduler.running:
            self.scheduler.shutdown(wait=False)
        if self._masterFuture is not None:
            self._masterFuture.cancel()
        # heartbeats are not sent anymore, so the instance must not consider itself a master
        self._iAmMaster = False

    async def _tryToGetMasterLock(self):
        """
        Function try to get the lock from database or send a heartbeat. Heartbeat is update expiration time in a
        database table.

        Returns:
            True if the instance is master after the attempt, otherwise False (including any failure)
        """
        self.logger.debug("try to get lock")
        try:
            if self._iAmMaster:
                isLocked = await self.lockContext.heartbeat(self.name, self.heartbeatInterval, sessionId=self.sessionId)
                if not isLocked:
                    self.logger.info("instance is not master now")
            else:
                isLocked = await self.lockContext.lockProcess(
                    self.name, self.heartbeatInterval, sessionId=self.sessionId
                )
                if isLocked:
                    self.logger.info("instance has become a master")
        except VLException as exc:
            self.logger.error(f"failed to get master lock: {exc.error.detail}")
            isLocked = False
        except Exception as e:
            # any failure is treated as a lost lock, the next attempt is done by `_lockMaster`
            self.logger.error("failed to get master lock", exc_info=e)
            isLocked = False

        if isLocked:
            self.logger.debug("lock is acquired")
        else:
            self.logger.debug("lock miss: already taken")
        self._iAmMaster = isLocked
        return isLocked

    async def _lockMaster(self):
        """
        A master lock infinity getter.

        Infinity loop of attempts to get master lock, the loop is stopped only by the task cancellation.
        """
        while True:
            if await self._tryToGetMasterLock():
                # next attempt must be before the lock expiration time is ended
                delay = self.heartbeatInterval * 0.7
            else:
                # next attempt must be after the lock expiration time is ended
                delay = self.heartbeatInterval + 0.1
            await asyncio.sleep(delay)