Source code for luna_accounts.crutches_on_wheels.cow.monitoring.manager

"""
Module implement base class for monitoring
"""
from collections.abc import Iterable
from typing import Generic, Literal, Protocol, TypeVar

from luna_plugins.base.manager import PluginManager

from ..utils import mixins
from ..utils.log import Logger
from .influx_adapter import BaseMonitoringAdapter, InfluxMonitoringAdapter, InfluxSettings
from .points import BaseMonitoringPoint

T_MONITORING_ADAPTER = TypeVar("T_MONITORING_ADAPTER", bound=BaseMonitoringAdapter)
T_MONITORING_POINT = TypeVar("T_MONITORING_POINT", bound=BaseMonitoringPoint)


logger = Logger("luna.monitoring")


[docs] class MonitoringSettings(Protocol): """ Monitoring settings protocol """
[docs] class InfluxCredentials: """ Monitoring credentials """ organization: str token: str bucket: str
sendData: Literal[0, 1] useSsl: Literal[0, 1] flushingPeriod: int host: str port: str credentials: InfluxCredentials
[docs] class LunaMonitoringManager(mixins.Initializable, Generic[T_MONITORING_ADAPTER, T_MONITORING_POINT]): """ Monitoring manager. Sends data to the monitoring storage and monitoring plugins. Attributes: settings: monitoring storage settings """ adapter: T_MONITORING_ADAPTER | None = None def __init__(self, settings: MonitoringSettings, pluginManager: PluginManager): self.settings = settings self.pluginManager = pluginManager
[docs] async def initialize(self) -> None: """ Initialize monitoring """ if not self.settings.sendData: return adapterSettings = InfluxSettings( url=f"{'https' if self.settings.useSsl else 'http'}://{self.settings.host}:{self.settings.port}", bucket=self.settings.credentials.bucket, organization=self.settings.credentials.organization, token=self.settings.credentials.token, ) self.adapter = InfluxMonitoringAdapter(settings=adapterSettings, flushingPeriod=self.settings.flushingPeriod) self.adapter.initializeMonitoring()
[docs] async def close(self) -> None: """ Stop monitoring. """ if self.adapter: await self.adapter.stopMonitoring() self.adapter = None
[docs] def flushPoints(self, points: Iterable[T_MONITORING_POINT]) -> None: """ Flush point to monitoring. Args: points: point """ if self.pluginManager: self.pluginManager.sendEventToPlugins("monitoring_event", points, logger) if self.adapter: self.adapter.addPointsToBuffer(points)