"""
Module contains classes for sending data to monitoring.
"""
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Optional
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from tzlocal import get_localzone_name
from ..utils.log import Logger # pylint: disable-msg=C0413
from .points import BaseMonitoringPoint, BaseRequestMonitoringPoint # pylint: disable-msg=C0413
[docs]
class BaseMonitoringAdapter(ABC):
"""
Base monitoring adapter.
Attributes:
backgroundScheduler (AsyncIOScheduler): runner for periodic flushing monitoring points
_buffer (Any): container of buffering points which is waiting sending to monitoring
flushingPeriod (float): period of flushing points (in seconds)
logger (Logger): logger
_job (Job): sending monitoring data job
"""
_buffer: Any
def __init__(self, flushingPeriod: float):
self._buffer: List[BaseRequestMonitoringPoint] = []
self.flushingPeriod = flushingPeriod
self.logger = Logger()
self.backgroundScheduler = AsyncIOScheduler(timezone=get_localzone_name())
self._job: Optional[Job] = None
[docs]
async def initializeMonitoring(self) -> None:
"""
Initialize monitoring.
"""
self.initializeScheduler()
[docs]
async def stopMonitoring(self) -> None:
"""
Stop monitoring (cancel all requests and stop getting new).
"""
self.stopScheduler()
@abstractmethod
async def _writePoints(self, points):
"""
Write points.
Args:
points
"""
[docs]
def initializeScheduler(self) -> None:
"""
Start the loop for sending data from the buffer to monitoring.
"""
self.logger.info("start sending data to monitoring")
self._job = self.backgroundScheduler.add_job(
self._flushPoints,
trigger="interval",
seconds=self.flushingPeriod,
)
self.backgroundScheduler.start()
async def _flushPoints(self) -> None:
"""
Send request to monitoring
"""
try:
if self._buffer:
copyPoints = self.clearBuffer()
await self._writePoints(copyPoints)
self.logger.debug(f"send {len(copyPoints)} points to monitoring")
except Exception as exc: # pylint: disable-msg=broad-except
self.logger.error(f"failed write monitoring point, message: {exc}")
[docs]
def stopScheduler(self) -> None:
"""
Stop monitoring.
"""
if self.backgroundScheduler.running:
self.backgroundScheduler.shutdown()
self.logger.info("stop sending data to monitoring")
[docs]
def updateFlushingPeriod(self, newPeriod: int):
"""
Update flushing period
Args:
newPeriod: new period
"""
self._job.reschedule(trigger="interval", seconds=newPeriod)
self.flushingPeriod = newPeriod
[docs]
@abstractmethod
def addPointsToBuffer(self, points: Iterable[BaseMonitoringPoint]) -> None:
"""
Add points to the buffer.
Args:
points: points
"""
[docs]
@abstractmethod
def clearBuffer(self) -> Any:
"""Clear monitoring buffer and return it's content"""