Source code for luna_remote_sdk.crutches_on_wheels.cow.plugins.plugin_examples.request_monitoring_plugin_example
"""
Module request monitoring plugin example
"""
import asyncio
from abc import abstractmethod
from aiohttp import ClientSession
from luna_plugins.base.plugins_meta.base_plugins import BaseEventPlugin
[docs]
class BaseRequestMonitoringPlugin(BaseEventPlugin):
    """
    Common base for plugins that forward request monitoring points.

    Subclasses implement `flushPointToMonitoring`; `handleEvent` fans a batch
    of points out to that method.
    """

    # event name for triggering callback
    eventName = "monitoring_event"

    @abstractmethod
    async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None:
        """
        Send a single monitoring point. Every plugin must implement this method.

        Called after the end of a request.

        Args:
            point: point for monitoring
            logger: logger
        """

    async def handleEvent(self, points: list["MonitoringPoint"], logger):
        """
        Flush every received point concurrently.

        Args:
            points: points for monitoring
            logger: logger passed through to `flushPointToMonitoring`
        """
        # one coroutine per point, awaited together
        flushes = (self.flushPointToMonitoring(point, logger) for point in points)
        await asyncio.gather(*flushes)
[docs]
class RequestMonitoringPlugin(BaseRequestMonitoringPlugin):
    """
    Example plugin that sends request monitoring data to a third-party source.

    Only one instance of this class exists during the program execution.
    """

    def __init__(self, app: "LunaApplication"):
        super().__init__(app)
        # base url the bucket requests are sent to
        self.url = "http://127.0.0.1:5020/1/buckets"
        # http session; created in `initialize`, closed in `close`
        self.session: ClientSession | None = None
        # name of the bucket the points are posted to
        self.bucket = "plugin_test_bucket"

    async def close(self):
        """
        Stop plugin.

        Close the http session if one was opened.
        """
        if self.session is not None:
            await self.session.close()

    async def initialize(self):
        """
        Initialize plugin.

        Open the http session and request creation of the bucket. The session
        is stored on the plugin only if the request succeeds; on any failure
        it is closed so that no connection is leaked.

        Raises:
            RuntimeError: if the response status is neither 201 nor 409
        """
        session = ClientSession()
        try:
            # `params` lets the http client url-encode the bucket name
            async with session.post(self.url, params={"bucket": self.bucket}) as resp:
                # NOTE(review): 409 is presumably "bucket already exists" - confirm against the service
                if resp.status not in (201, 409):
                    # read the body as text: an error reply is not guaranteed to be json
                    response = await resp.text()
                    raise RuntimeError(f"failed create bucket, {self.bucket}, response: {response}")
        except BaseException:
            # also covers cancellation; the error is always re-raised
            await session.close()
            raise
        self.session = session

    async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None:
        """
        Callback for sending a request monitoring data.

        Posts the point's tags and fields as json to the bucket and logs the
        response status and body.

        Args:
            point: point for monitoring
            logger: logger
        """
        logger.debug(f"Plugin 'flushPointToMonitoring' get point, series: {point.series}, time: {point.eventTime}")
        msg = {"tags": point.tags, "fields": point.fields}
        async with self.session.post(f"{self.url}/{self.bucket}/objects", json=msg) as resp:
            logger.info(resp.status)
            logger.info(await resp.text())