Source code for luna_tasks.crutches_on_wheels.monitoring.base_monitoring

"""
Module implement base class for monitoring
"""
from abc import ABC, abstractmethod
from typing import List, Iterable, Optional, TypeVar

from .influx_adapter import InfluxSettings, InfluxMonitoringAdapter
from .points import BaseMonitoringPoint

T_INFLUX_CREDENTIALS = TypeVar("T_INFLUX_CREDENTIALS")


[docs]class BaseLunaMonitoring(ABC): """ Base class for monitoring """
[docs] @abstractmethod def flushPoints(self, points: List[BaseMonitoringPoint]) -> None: """ Flush point to monitoring. Args: points: point """
[docs]class LunaRequestInfluxMonitoring(BaseLunaMonitoring): """ Class for sending data which is associated with request to influx Attributes: settings (InfluxSettings): influxdb settings flushingPeriod (int): period of flushing points (in seconds) """ adapter: Optional[InfluxMonitoringAdapter] = None def __init__( self, credentials: T_INFLUX_CREDENTIALS, host: str = "localhost", port: int = 8086, ssl: bool = False, flushingPeriod: int = 1, ): self.settings = self.prepareSettings(host=host, port=port, credentials=credentials, ssl=ssl) self.flushingPeriod = flushingPeriod
[docs] @staticmethod def prepareSettings(credentials: T_INFLUX_CREDENTIALS, host: str, port: int, ssl: bool,) -> InfluxSettings: """ Prepare influxdb settings Args: credentials: database credentials host: influx host port: influx port ssl: use or not ssl for connecting to influx Returns: influxdb settings container """ scheme = "https" if ssl else "http" return InfluxSettings( url=f"{scheme}://{host}:{port}", bucket=credentials.bucket, organization=credentials.organization, token=credentials.token, ssl=ssl, )
def _initializeAdapter(self, settings: InfluxSettings, flushingPeriod: int = 1) -> None: """ Initialize adapter Args: settings: influxdb settings flushingPeriod: period of flushing points (in seconds) """ self.adapter = InfluxMonitoringAdapter(settings=settings, flushingPeriod=flushingPeriod)
[docs] def initializeMonitoring(self) -> None: """ Initialize monitoring """ self._initializeAdapter(settings=self.settings, flushingPeriod=self.flushingPeriod) self.adapter.initializeMonitoring()
[docs] async def stopMonitoring(self) -> None: """ Stop monitoring. """ self.adapter.stopMonitoring() self.adapter = None
[docs] def flushPoints(self, points: Iterable[BaseMonitoringPoint]) -> None: """ Flush point to influx. Args: points: point """ self.adapter.addPointsToBuffer(points)