Source code for luna_python_matcher.crutches_on_wheels.cow.monitoring.manager

"""
Module implements the base class for monitoring.
"""

import os
from collections.abc import Iterable
from typing import Literal, Protocol

from luna_plugins.base.manager import PluginManager

from ..utils import mixins
from ..utils.check_connection import checkConnectionToInflux
from ..utils.log import Logger
from .base_adapter import BaseMonitoringAdapter
from .clickhouse_adapter import ClickhouseMonitoringAdaptor, ClickhouseSettings
from .influx_adapter import InfluxMonitoringAdapter, InfluxSettings
from .points import BaseMonitoringPoint

# Module-level logger; it is handed to plugins together with every "monitoring_event".
logger = Logger("luna.monitoring")

# Monitoring backend selector, read from the environment once at import time.
# `LunaMonitoringManager.initialize` handles "influx" (the default) and "clickhouse"
# and raises ValueError for any other value.
MONITORING_TYPE = os.getenv("MONITORING_TYPE", "influx")


class MonitoringSettings(Protocol):
    """
    Structural description of the settings object consumed by the monitoring manager.

    Any object exposing the attributes declared below satisfies this protocol.
    """

    class InfluxCredentials:
        """
        Credentials of the monitoring storage: organization, access token and bucket.
        """

        organization: str
        token: str
        bucket: str

    # 0/1 switch: a falsy value makes the manager skip adapter creation.
    sendData: Literal[0, 1]
    # 0/1 switch: selects the "https" scheme when truthy, "http" otherwise.
    useSsl: Literal[0, 1]
    # Flushing period forwarded unchanged to the storage adapter.
    flushingPeriod: int
    # Host and port of the monitoring storage.
    host: str
    port: int
    # Organization / token / bucket used to access the storage.
    credentials: InfluxCredentials
[docs] class LunaMonitoringManager[TAdaptor: BaseMonitoringAdapter, TPoint: BaseMonitoringPoint](mixins.Initializable): """ Monitoring manager. Sends data to the monitoring storage and monitoring plugins. Attributes: settings: monitoring storage settings """ adapter: TAdaptor | None = None def __init__(self, settings: MonitoringSettings, name: str, pluginManager: PluginManager): self.settings = settings self.serviceName = name self.pluginManager = pluginManager
[docs] async def initialize(self) -> None: """ Initialize monitoring """ if not self.settings.sendData: return if MONITORING_TYPE == "influx": adapterSettings = InfluxSettings( url=f"{'https' if self.settings.useSsl else 'http'}://{self.settings.host}:{self.settings.port}", bucket=self.settings.credentials.bucket, organization=self.settings.credentials.organization, token=self.settings.credentials.token, ) self.adapter = InfluxMonitoringAdapter( settings=adapterSettings, flushingPeriod=self.settings.flushingPeriod ) elif MONITORING_TYPE == "clickhouse": self.adapter = ClickhouseMonitoringAdaptor( ClickhouseSettings( url=os.getenv("CLICKHOUSE_ADDRESS", "http://127.0.0.1:8123"), user=os.getenv("CLICKHOUSE_USER", "luna"), password=None, database=os.getenv("CLICKHOUSE_DB", "luna_monitoring"), ), flushingPeriod=self.settings.flushingPeriod, ) else: raise ValueError(f"Bad monitoring type {MONITORING_TYPE}") await self.adapter.initializeMonitoring()
[docs] async def close(self) -> None: """ Stop monitoring. """ if self.adapter: await self.adapter.stopMonitoring() self.adapter = None
[docs] def flushPoints(self, points: Iterable[TPoint]) -> None: """ Flush point to monitoring. Args: points: point """ if self.pluginManager: self.pluginManager.sendEventToPlugins("monitoring_event", points, logger) if self.adapter: self.adapter.addPointsToBuffer(points)
[docs] async def probe(self): """ Check that we can connect to the monitoring service with current configuration. Can be used without initialization """ if self.settings.sendData: return await checkConnectionToInflux( host=self.settings.host, port=self.settings.port, bucket=self.settings.credentials.bucket, organization=self.settings.credentials.organization, token=self.settings.credentials.token, ssl=bool(self.settings.useSsl), asyncCheck=True, ) return True