# Source code for cow.monitoring.base_adapter

"""
Module contains classes for sending data to monitoring.
"""

from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Optional

from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from tzlocal import get_localzone_name

from ..utils.log import Logger  # pylint: disable-msg=C0413
from .points import BaseMonitoringPoint, BaseRequestMonitoringPoint  # pylint: disable-msg=C0413


class BaseMonitoringAdapter(ABC):
    """
    Base monitoring adapter.

    Buffers monitoring points and periodically flushes them with a background
    scheduler job. Concrete adapters define how points are stored in the buffer
    (``addPointsToBuffer`` / ``clearBuffer``) and how they are delivered
    (``_writePoints``).

    Attributes:
        backgroundScheduler (AsyncIOScheduler): runner for periodic flushing monitoring points
        _buffer (Any): container of buffered points waiting to be sent to monitoring
        flushingPeriod (float): period of flushing points (in seconds)
        logger (Logger): logger
        _job (Optional[Job]): sending monitoring data job; ``None`` until the scheduler is initialized
    """

    _buffer: Any

    def __init__(self, flushingPeriod: float):
        """
        Create the adapter; the flushing job is not scheduled until ``initializeScheduler`` is called.

        Args:
            flushingPeriod: period of flushing points (in seconds)
        """
        self._buffer: List[BaseRequestMonitoringPoint] = []
        self.flushingPeriod = flushingPeriod
        self.logger = Logger()
        self.backgroundScheduler = AsyncIOScheduler(timezone=get_localzone_name())
        self._job: Optional[Job] = None

    async def initializeMonitoring(self) -> None:
        """
        Initialize monitoring.
        """
        self.initializeScheduler()

    async def stopMonitoring(self) -> None:
        """
        Stop monitoring (cancel all requests and stop getting new).
        """
        self.stopScheduler()

    @abstractmethod
    async def _writePoints(self, points: Any) -> None:
        """
        Write points.

        Args:
            points: content of the buffer as returned by ``clearBuffer``
        """

    def initializeScheduler(self) -> None:
        """
        Start the loop for sending data from the buffer to monitoring.
        """
        self.logger.info("start sending data to monitoring")
        self._job = self.backgroundScheduler.add_job(
            self._flushPoints,
            trigger="interval",
            seconds=self.flushingPeriod,
        )
        self.backgroundScheduler.start()

    async def _flushPoints(self) -> None:
        """
        Send buffered points to monitoring.

        Any failure is logged and swallowed so that the periodic job keeps running.
        The buffer is cleared before writing, so points of a failed batch are not re-queued.
        """
        try:
            if self._buffer:
                copyPoints = self.clearBuffer()
                await self._writePoints(copyPoints)
                self.logger.debug(f"send {len(copyPoints)} points to monitoring")
        except Exception as exc:  # pylint: disable-msg=broad-except
            self.logger.error(f"failed write monitoring point, message: {exc}")

    def stopScheduler(self) -> None:
        """
        Stop monitoring.
        """
        # the scheduler is shut down only when it is actually running
        if self.backgroundScheduler.running:
            self.backgroundScheduler.shutdown()
        self.logger.info("stop sending data to monitoring")

    def updateFlushingPeriod(self, newPeriod: float) -> None:
        """
        Update flushing period.

        If the flushing job has not been scheduled yet, only the stored period is
        updated; it is applied when ``initializeScheduler`` creates the job.

        Args:
            newPeriod: new period (in seconds)
        """
        # `_job` stays None until `initializeScheduler` is called
        if self._job is not None:
            self._job.reschedule(trigger="interval", seconds=newPeriod)
        self.flushingPeriod = newPeriod

    @abstractmethod
    def addPointsToBuffer(self, points: Iterable[BaseMonitoringPoint]) -> None:
        """
        Add points to the buffer.

        Args:
            points: points
        """

    @abstractmethod
    def clearBuffer(self) -> Any:
        """Clear monitoring buffer and return its content"""