Source code for luna_licenses.crutches_on_wheels.cow.monitoring.manager
"""
Module implementing the base class for monitoring.
"""
from collections.abc import Iterable
from typing import Generic, Literal, Protocol, TypeVar
from luna_plugins.base.manager import PluginManager
from ..utils import mixins
from ..utils.log import Logger
from .influx_adapter import BaseMonitoringAdapter, InfluxMonitoringAdapter, InfluxSettings
from .points import BaseMonitoringPoint
# Type variable for the monitoring adapter type; bounded by BaseMonitoringAdapter.
T_MONITORING_ADAPTER = TypeVar("T_MONITORING_ADAPTER", bound=BaseMonitoringAdapter)
# Type variable for the monitoring point type; bounded by BaseMonitoringPoint.
T_MONITORING_POINT = TypeVar("T_MONITORING_POINT", bound=BaseMonitoringPoint)
# Module-level logger; it is also handed to plugins together with monitoring events.
logger = Logger("luna.monitoring")
[docs]
class MonitoringSettings(Protocol):
    """
    Structural type describing the settings object the monitoring manager reads.

    Attributes:
        sendData: 0 or 1; the manager does not create a storage adapter when it is falsy
        useSsl: 0 or 1; selects the `https` (truthy) or `http` (falsy) scheme of the storage url
        flushingPeriod: value forwarded to the storage adapter as its `flushingPeriod`
        host: host part of the storage url
        port: port part of the storage url
        credentials: storage credentials (organization, token, bucket)
    """

    class InfluxCredentials:
        """
        Credentials section of the monitoring settings.

        Attributes:
            organization: organization name passed to the storage adapter settings
            token: access token passed to the storage adapter settings
            bucket: bucket name passed to the storage adapter settings
        """

        organization: str
        token: str
        bucket: str

    sendData: Literal[0, 1]
    useSsl: Literal[0, 1]
    flushingPeriod: int
    host: str
    port: str
    credentials: InfluxCredentials
[docs]
class LunaMonitoringManager(mixins.Initializable, Generic[T_MONITORING_ADAPTER, T_MONITORING_POINT]):
    """
    Monitoring manager. Sends data to the monitoring storage and monitoring plugins.

    Attributes:
        settings: monitoring storage settings
        pluginManager: plugin manager which receives every flushed batch of points
        adapter: monitoring storage adapter; `None` until `initialize` creates it
            (it stays `None` when `settings.sendData` is falsy) and again after `close`
    """

    adapter: T_MONITORING_ADAPTER | None = None

    def __init__(self, settings: MonitoringSettings, pluginManager: PluginManager):
        """
        Store the settings and the plugin manager; no adapter is created here.

        Args:
            settings: monitoring storage settings
            pluginManager: plugin manager used to deliver monitoring events to plugins
        """
        self.settings = settings
        self.pluginManager = pluginManager

    async def initialize(self) -> None:
        """
        Initialize monitoring.

        Does nothing when `settings.sendData` is falsy. Otherwise builds the storage
        settings from `self.settings`, creates the influx adapter and starts it.
        """
        if not self.settings.sendData:
            return
        adapterSettings = InfluxSettings(
            url=f"{'https' if self.settings.useSsl else 'http'}://{self.settings.host}:{self.settings.port}",
            bucket=self.settings.credentials.bucket,
            organization=self.settings.credentials.organization,
            token=self.settings.credentials.token,
        )
        self.adapter = InfluxMonitoringAdapter(settings=adapterSettings, flushingPeriod=self.settings.flushingPeriod)
        self.adapter.initializeMonitoring()

    async def close(self) -> None:
        """
        Stop monitoring.

        Stops the adapter (if there is one) and resets `adapter` to `None`.
        """
        if self.adapter:
            await self.adapter.stopMonitoring()
            self.adapter = None

    def flushPoints(self, points: Iterable[T_MONITORING_POINT]) -> None:
        """
        Flush points to monitoring.

        The points are sent to the plugins as a "monitoring_event" (when the plugin
        manager is truthy) and added to the adapter buffer (when an adapter exists).

        Args:
            points: monitoring points; any iterable, including one-shot iterators
        """
        if self.pluginManager and self.adapter and not isinstance(points, (list, tuple)):
            # Both consumers iterate over `points`; a one-shot iterable (e.g. a generator)
            # would be exhausted by whichever reads it first, leaving the other with
            # nothing. Materialize it once so both see every point.
            points = list(points)
        if self.pluginManager:
            self.pluginManager.sendEventToPlugins("monitoring_event", points, logger)
        if self.adapter:
            self.adapter.addPointsToBuffer(points)