Source code for luna_handlers.crutches_on_wheels.cow.plugins.plugin_examples.request_monitoring_plugin_example

"""
Example of a request monitoring plugin.
"""
import asyncio
from abc import abstractmethod

from aiohttp import ClientSession
from luna_plugins.base.plugins_meta.base_plugins import BaseEventPlugin


class BaseRequestMonitoringPlugin(BaseEventPlugin):
    """
    Common base for plugins that forward request monitoring points.

    Subclasses implement ``flushPointToMonitoring``; ``handleEvent`` fans a
    batch of points out to that method.
    """

    # name of the event that triggers the callback
    eventName = "monitoring_event"

    @abstractmethod
    async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None:
        """
        Send a single monitoring point; every concrete plugin must implement this.

        It is called after a request has ended.

        Args:
            point: point for monitoring
            logger: logger
        """

    async def handleEvent(self, points: list["MonitoringPoint"], logger):
        """
        Flush every received point, running all flushes concurrently.

        Args:
            points: monitoring points to flush
            logger: logger passed through to each flush call
        """
        # One coroutine per point; gather schedules them all and waits for completion.
        pendingFlushes = (self.flushPointToMonitoring(item, logger) for item in points)
        await asyncio.gather(*pendingFlushes)
class RequestMonitoringPlugin(BaseRequestMonitoringPlugin):
    """
    Example plugin that sends request monitoring data to a third-party source.

    Only one instance of this class exists during the program execution.
    """

    def __init__(self, app: "LunaApplication"):
        """
        Store the target endpoint settings; no connection is opened here.

        Args:
            app: application instance, passed to the base plugin constructor
        """
        super().__init__(app)
        # base url of the buckets resource on the monitoring receiver
        self.url = "http://127.0.0.1:5020/1/buckets"
        # HTTP session; created in `initialize`, stays None until then
        self.session: ClientSession | None = None
        # bucket the monitoring points are written to
        self.bucket = "plugin_test_bucket"

    async def close(self):
        """
        Stop plugin. Close the HTTP session if one was opened.
        """
        if self.session:
            await self.session.close()

    async def initialize(self):
        """
        Initialize plugin: open the HTTP session and create the bucket.

        Response statuses 201 and 409 are both accepted.

        Raises:
            RuntimeError: if the bucket creation request returns any other status
        """
        session = ClientSession()
        try:
            async with session.post(f"{self.url}?bucket={self.bucket}") as resp:
                if resp.status not in (201, 409):
                    # Read the body as text: an error reply is not guaranteed to be JSON,
                    # and a failed JSON decode would hide the real failure reason.
                    response = await resp.text()
                    raise RuntimeError(f"failed create bucket, {self.bucket}, response: {response}")
        except BaseException:
            # Do not leak the session when initialization fails (or is cancelled).
            await session.close()
            raise
        # Publish the session only once the plugin is fully initialized.
        self.session = session

    async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None:
        """
        Callback for sending request monitoring data.

        Posts the point's tags and fields as JSON to the bucket's objects
        resource and logs the response status and body.

        Args:
            point: point for monitoring
            logger: logger
        """
        logger.debug(f"Plugin 'flushPointToMonitoring' get point, series: {point.series}, time: {point.eventTime}")
        msg = {"tags": point.tags, "fields": point.fields}
        async with self.session.post(f"{self.url}/{self.bucket}/objects", json=msg) as resp:
            logger.info(resp.status)
            logger.info(await resp.text())