"""
Module contains classes for sending a data to monitoring.
"""
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Optional
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from tzlocal import get_localzone_name
from ..utils.log import Logger # pylint: disable-msg=C0413
from .points import BaseMonitoringPoint, BaseRequestMonitoringPoint # pylint: disable-msg=C0413
[docs]
class BaseMonitoringAdapter(ABC):
"""
Base monitoring adapter.
Attributes:
backgroundScheduler (AsyncIOScheduler): runner for periodic flushing monitoring points
_buffer (Any): container of buffering points which is waiting sending to monitoring
flushingPeriod (float): period of flushing points (in seconds)
logger (Logger): logger
_job (Job): sending monitoring data job
"""
_buffer: Any
def __init__(self, flushingPeriod: int):
self._buffer: List[BaseRequestMonitoringPoint] = []
self.flushingPeriod = flushingPeriod
self.logger = Logger()
self.backgroundScheduler = AsyncIOScheduler(timezone=get_localzone_name())
self._job: Optional[Job] = None
[docs]
async def initializeMonitoring(self) -> None:
"""
Initialize monitoring.
"""
self.initializeScheduler()
[docs]
async def stopMonitoring(self) -> None:
"""
Stop monitoring (cancel all request and stop getting new).
"""
self.stopScheduler()
@abstractmethod
async def _writePoints(self, points):
"""
Write points.
Args:
points
"""
[docs]
def initializeScheduler(self) -> None:
"""
Start the loop for sending data from the buffer to monitoring.
"""
self.logger.info("start sending data to monitoring")
self._job = self.backgroundScheduler.add_job(
self._flushPoints,
trigger="interval",
seconds=self.flushingPeriod,
)
self.backgroundScheduler.start()
async def _flushPoints(self) -> None:
"""
Send request to monitoring
"""
try:
if self._buffer:
copyPoints = self.clearBuffer()
await self._writePoints(copyPoints)
self.logger.debug(f"send {len(copyPoints)} points to monitoring")
except Exception as exc: # pylint: disable-msg=broad-except
self.logger.error(f"failed write monitoring point, message: {exc}")
[docs]
def stopScheduler(self) -> None:
"""
Stop monitoring.
"""
if self.backgroundScheduler.running:
self.backgroundScheduler.shutdown()
self.logger.info("stop sending data to monitoring")
[docs]
def updateFlushingPeriod(self, newPeriod: int):
"""
Update flushing period
Args:
newPeriod: new period
"""
self._job.reschedule(trigger="interval", seconds=newPeriod)
self.flushingPeriod = newPeriod
[docs]
@abstractmethod
def addPointsToBuffer(self, points: Iterable[BaseMonitoringPoint]) -> None:
"""
Add points to buffer.
Args:
points: points
"""
[docs]
@abstractmethod
def clearBuffer(self) -> Any:
"""Clear monitoring buffer and return it`s content"""