Source code for cow.monitoring.base_adapter

"""
Module contains classes for sending a data to monitoring.
"""

from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Optional

from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from tzlocal import get_localzone_name

from ..utils.log import Logger  # pylint: disable-msg=C0413
from .points import BaseMonitoringPoint, BaseRequestMonitoringPoint  # pylint: disable-msg=C0413


[docs] class BaseMonitoringAdapter(ABC): """ Base monitoring adapter. Attributes: backgroundScheduler (AsyncIOScheduler): runner for periodic flushing monitoring points _buffer (Any): container of buffering points which is waiting sending to monitoring flushingPeriod (float): period of flushing points (in seconds) logger (Logger): logger _job (Job): sending monitoring data job """ _buffer: Any def __init__(self, flushingPeriod: int): self._buffer: List[BaseRequestMonitoringPoint] = [] self.flushingPeriod = flushingPeriod self.logger = Logger() self.backgroundScheduler = AsyncIOScheduler(timezone=get_localzone_name()) self._job: Optional[Job] = None
[docs] async def initializeMonitoring(self) -> None: """ Initialize monitoring. """ self.initializeScheduler()
[docs] async def stopMonitoring(self) -> None: """ Stop monitoring (cancel all request and stop getting new). """ self.stopScheduler()
@abstractmethod async def _writePoints(self, points): """ Write points. Args: points """
[docs] def initializeScheduler(self) -> None: """ Start the loop for sending data from the buffer to monitoring. """ self.logger.info("start sending data to monitoring") self._job = self.backgroundScheduler.add_job( self._flushPoints, trigger="interval", seconds=self.flushingPeriod, ) self.backgroundScheduler.start()
async def _flushPoints(self) -> None: """ Send request to monitoring """ try: if self._buffer: copyPoints = self.clearBuffer() await self._writePoints(copyPoints) self.logger.debug(f"send {len(copyPoints)} points to monitoring") except Exception as exc: # pylint: disable-msg=broad-except self.logger.error(f"failed write monitoring point, message: {exc}")
[docs] def stopScheduler(self) -> None: """ Stop monitoring. """ if self.backgroundScheduler.running: self.backgroundScheduler.shutdown() self.logger.info("stop sending data to monitoring")
[docs] def updateFlushingPeriod(self, newPeriod: int): """ Update flushing period Args: newPeriod: new period """ self._job.reschedule(trigger="interval", seconds=newPeriod) self.flushingPeriod = newPeriod
[docs] @abstractmethod def addPointsToBuffer(self, points: Iterable[BaseMonitoringPoint]) -> None: """ Add points to buffer. Args: points: points """
[docs] @abstractmethod def clearBuffer(self) -> Any: """Clear monitoring buffer and return it`s content"""