Source code for luna_faces.crutches_on_wheels.cow.monitoring.manager
"""
Module implements the base class for monitoring
"""
import os
from collections.abc import Iterable
from typing import Literal, Protocol
from luna_plugins.base.manager import PluginManager
from ..utils import mixins
from ..utils.check_connection import checkConnectionToInflux
from ..utils.log import Logger
from .base_adapter import BaseMonitoringAdapter
from .clickhouse_adapter import ClickhouseMonitoringAdaptor, ClickhouseSettings
from .influx_adapter import InfluxMonitoringAdapter, InfluxSettings
from .points import BaseMonitoringPoint
# Module-level logger, created with the name "luna.monitoring".
logger = Logger("luna.monitoring")
# Monitoring backend type, read once at import time from the MONITORING_TYPE
# environment variable; falls back to "influx" when the variable is not set.
MONITORING_TYPE = os.getenv("MONITORING_TYPE", "influx")
[docs]
class MonitoringSettings(Protocol):
    """
    Structural type describing the monitoring settings object.

    Attributes:
        sendData: 0 or 1 flag; the manager skips monitoring when it is falsy
        useSsl: 0 or 1 flag; selects "https" instead of "http" for the storage url
        flushingPeriod: value forwarded to the monitoring adapter as its flushing period
        host: monitoring storage host
        port: monitoring storage port
        credentials: storage credentials (organization, token, bucket)
    """

    class InfluxCredentials:
        """
        Credentials for the monitoring storage.

        Attributes:
            organization: organization name
            token: access token
            bucket: bucket name
        """

        organization: str
        token: str
        bucket: str

    sendData: Literal[0, 1]
    useSsl: Literal[0, 1]
    flushingPeriod: int
    host: str
    port: int
    credentials: InfluxCredentials
[docs]
class LunaMonitoringManager[TAdaptor: BaseMonitoringAdapter, TPoint: BaseMonitoringPoint](mixins.Initializable):
"""
Monitoring manager. Sends data to the monitoring storage and monitoring plugins.
Attributes:
settings: monitoring storage settings
"""
adapter: TAdaptor | None = None
def __init__(self, settings: MonitoringSettings, name: str, pluginManager: PluginManager):
self.settings = settings
self.serviceName = name
self.pluginManager = pluginManager
[docs]
async def initialize(self) -> None:
"""
Initialize monitoring
"""
if not self.settings.sendData:
return
if MONITORING_TYPE == "influx":
adapterSettings = InfluxSettings(
url=f"{'https' if self.settings.useSsl else 'http'}://{self.settings.host}:{self.settings.port}",
bucket=self.settings.credentials.bucket,
organization=self.settings.credentials.organization,
token=self.settings.credentials.token,
)
self.adapter = InfluxMonitoringAdapter(
settings=adapterSettings, flushingPeriod=self.settings.flushingPeriod
)
elif MONITORING_TYPE == "clickhouse":
self.adapter = ClickhouseMonitoringAdaptor(
ClickhouseSettings(
url=os.getenv("CLICKHOUSE_ADDRESS", "http://127.0.0.1:8123"),
user=os.getenv("CLICKHOUSE_USER", "luna"),
password=None,
database=os.getenv("CLICKHOUSE_DB", "luna_monitoring"),
),
flushingPeriod=self.settings.flushingPeriod,
)
else:
raise ValueError(f"Bad monitoring type {MONITORING_TYPE}")
await self.adapter.initializeMonitoring()
[docs]
async def close(self) -> None:
"""
Stop monitoring.
"""
if self.adapter:
await self.adapter.stopMonitoring()
self.adapter = None
[docs]
def flushPoints(self, points: Iterable[TPoint]) -> None:
"""
Flush point to monitoring.
Args:
points: point
"""
if self.pluginManager:
self.pluginManager.sendEventToPlugins("monitoring_event", points, logger)
if self.adapter:
self.adapter.addPointsToBuffer(points)
[docs]
async def probe(self):
"""
Check that we can connect to the monitoring service with current configuration.
Can be used without initialization
"""
if self.settings.sendData:
return await checkConnectionToInflux(
host=self.settings.host,
port=self.settings.port,
bucket=self.settings.credentials.bucket,
organization=self.settings.credentials.organization,
token=self.settings.credentials.token,
ssl=bool(self.settings.useSsl),
asyncCheck=True,
)
return True