Source code for luna_admin.crutches_on_wheels.monitoring.base_monitoring
"""
Module implement base class for monitoring
"""
from abc import ABC, abstractmethod
from typing import List, Iterable, Optional, TypeVar
from .influx_adapter import InfluxSettings, InfluxMonitoringAdapter
from .points import BaseMonitoringPoint
T_INFLUX_CREDENTIALS = TypeVar("T_INFLUX_CREDENTIALS")
[docs]class BaseLunaMonitoring(ABC):
"""
Base class for monitoring
"""
[docs] @abstractmethod
def flushPoints(self, points: List[BaseMonitoringPoint]) -> None:
"""
Flush point to monitoring.
Args:
points: point
"""
[docs]class LunaRequestInfluxMonitoring(BaseLunaMonitoring):
"""
Class for sending data which is associated with request to influx
Attributes:
settings (InfluxSettings): influxdb settings
flushingPeriod (int): period of flushing points (in seconds)
"""
adapter: Optional[InfluxMonitoringAdapter] = None
def __init__(
self,
credentials: T_INFLUX_CREDENTIALS,
host: str = "localhost",
port: int = 8086,
ssl: bool = False,
flushingPeriod: int = 1,
):
self.settings = self.prepareSettings(host=host, port=port, credentials=credentials, ssl=ssl)
self.flushingPeriod = flushingPeriod
[docs] @staticmethod
def prepareSettings(credentials: T_INFLUX_CREDENTIALS, host: str, port: int, ssl: bool,) -> InfluxSettings:
"""
Prepare influxdb settings
Args:
credentials: database credentials
host: influx host
port: influx port
ssl: use or not ssl for connecting to influx
Returns:
influxdb settings container
"""
scheme = "https" if ssl else "http"
return InfluxSettings(
url=f"{scheme}://{host}:{port}",
bucket=credentials.bucket,
organization=credentials.organization,
token=credentials.token,
ssl=ssl,
)
def _initializeAdapter(self, settings: InfluxSettings, flushingPeriod: int = 1) -> None:
"""
Initialize adapter
Args:
settings: influxdb settings
flushingPeriod: period of flushing points (in seconds)
"""
self.adapter = InfluxMonitoringAdapter(settings=settings, flushingPeriod=flushingPeriod)
[docs] def initializeMonitoring(self) -> None:
"""
Initialize monitoring
"""
self._initializeAdapter(settings=self.settings, flushingPeriod=self.flushingPeriod)
self.adapter.initializeMonitoring()
[docs] async def stopMonitoring(self) -> None:
"""
Stop monitoring.
"""
self.adapter.stopMonitoring()
self.adapter = None
[docs] def flushPoints(self, points: Iterable[BaseMonitoringPoint]) -> None:
"""
Flush point to influx.
Args:
points: point
"""
self.adapter.addPointsToBuffer(points)