Plugins¶
The service supports the system of plugins. Plugins must be written in the Python programming language.
Plugin types¶
There are two sorts of plugins:
On event plugin. The plugin is triggered when an event occurs. The plugin should implement a callback function. This function is called on each event of the corresponding type. The set of event types is defined by the service developers.
event type
description
monitoring_event
Event contains monitoring points for sending to a custom monitoring system
sending_event
Event contains handler-generated event data for sending to the third-party source
Monitoring plugin example:
""" Module request monitoring plugin example """ import asyncio from abc import abstractmethod from aiohttp import ClientSession from luna_plugins.base.plugins_meta.base_plugins import BaseEventPlugin class BaseRequestMonitoringPlugin(BaseEventPlugin): """ Base class for requests monitoring. """ # event name for triggering callback eventName = "monitoring_event" @abstractmethod async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None: """ All plugins must realize this method. This function call after end of request Args: point: point for monitoring logger: logger """ async def handleEvent(self, points: list["MonitoringPoint"], logger): await asyncio.gather(*[self.flushPointToMonitoring(point, logger) for point in points]) class RequestMonitoringPlugin(BaseRequestMonitoringPlugin): """ Example plugin sends a request data for monitoring to third-party source. Only one instance of this class exist during the program execution. """ def __init__(self, app: "LunaApplication"): super().__init__(app) self.url = "http://127.0.0.1:5020/1/buckets" self.session: ClientSession | None = None self.bucket = "plugin_test_bucket" async def close(self): """ Stop plugin. Close all open connections and ect """ if self.session: await self.session.close() async def initialize(self): """ Initialize plugin. Close all open connections and ect """ self.session = ClientSession() async with self.session.post(f"{self.url}?bucket={self.bucket}") as resp: if resp.status not in (201, 409): response = await resp.json() raise RuntimeError(f"failed create bucket, {self.bucket}, response: {response}") async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None: """ Callback for sending a request monitoring data. Args: point: point for monitoring logger: logger """ logger.debug(f"Plugin 'flushPointToMonitoring' get point, series: {point.series}, time: {point.eventTime}") msg = {"tags": point.tags, "fields": point.fields} async with self.session.post(f"{self.url}/{self.bucket}/objects", json=msg) as resp: logger.info(resp.status) logger.info(await resp.text())This plugin demonstrates the sending of a request monitoring data to another service. All monitoring plugins must implement the BaseRequestMonitoringPlugin abstract class.
Event sending plugin example:
from aiohttp import ClientSession from log import Logger from luna_plugins.base.plugins_meta.base_plugins import T_LUNA_APPLICATION from luna_plugins.handler_event.base_plugin import BaseEventSendingPlugin, Event, RawEvent class EventSendingPlugin(BaseEventSendingPlugin): """Sends events to the third-party source. Only one instance of this class exist during the program execution.""" def __init__(self, app: T_LUNA_APPLICATION): super().__init__(app) self.url = "http://127.0.0.1:5020/1/buckets" self.session: ClientSession | None = None self.bucket = "plugin_test_bucket" async def close(self): """Stop plugin. Close all open connections and etc.""" if self.session: await self.session.close() async def initialize(self): """Initialize plugin.""" self.session = ClientSession() async with self.session.post(f"{self.url}?bucket={self.bucket}") as resp: if resp.status not in (201, 409): response = await resp.json() raise RuntimeError(f"failed create bucket, {self.bucket}, response: {response}") async def sendEvents( self, events: list[Event | RawEvent], handlerId: str, accountId: str, requestId: str, createEventTime: str, endEventTime: str, logger: Logger, *args, **kwargs, ) -> None: logger.debug( f"Plugin 'EventsOnFinishExampleClass' get events, request_id: {requestId}, " f"event_create_time: {createEventTime}, event_end_time: {endEventTime}" ) prepareEvents = [] for event in events: if isinstance(event, Event): serializationEvent = event.asDict() else: serializationEvent = event.asHandlerEventDict() prepareEvents.append(serializationEvent) msg = { "handler_id": handlerId, "account_id": accountId, "Luna-Request-Id": requestId, "events": prepareEvents, "event_create_time": createEventTime, "event_end_time": endEventTime, } async with self.session.post(f"{self.url}/{self.bucket}/objects", json=msg) as resp: logger.debug(resp.status) logger.debug(await resp.text())This plugin demonstrates the sending of a handler generates event data to the third-party source. All event sending plugins must implement the BaseEventSendingPlugin abstract class.
Background plugin. This sort of plugin is intended for background work.
The background plugin can implement:
custom route
background monitoring of service resources
collaboration of an event plugin and a background plugin (batching monitoring points)
connection to other data sources (Redis, RabbitMQ) and their data processing
Plugin example:
""" Module realizes background plugin example """ import asyncio from asyncio import Task from luna_plugins.base.plugins_meta.base_plugins import BaseBackgroundHandler, BaseBackgroundPlugin, pluginHTTPResponse class HandlerExample(BaseBackgroundHandler): """ Handler example """ async def get(self, request): # pylint: disable=unused-argument """ Method get example. Returns: response """ return self.response(body="I am teapot", headers={"Content-Type": "text/plain"}, statusCode=418) def anotherHandlerExample(request): # pylint: disable=unused-argument """Standalone handler example""" return pluginHTTPResponse(statusCode=200, body="T800", headers={"Content-Type": "text/plain"}) class BackgroundPluginExample(BaseBackgroundPlugin): """ Background plugin example. Create background task and add a route. """ def __init__(self, app: "LunaApplication"): super().__init__(app) self.task: Task | None = None self.temperature = 0 async def initialize(self): """ Initialize plugin """ self.addRoute("/teapot", HandlerExample) self.addRoute("/teapot/version", anotherHandlerExample, methods={"get"}) async def close(self): """ Stop background process Returns: """ if self.task: self.task.cancel() async def usefulJob(self): """ Some useful async work """ while True: await asyncio.sleep(1) self.temperature = min(100, self.temperature + 1) if self.temperature < 100: self.app.ctx.logger.info(f"I boil water, temperature: {self.temperature}") else: self.app.ctx.logger.info("boiling water is ready, would you care for a cup of tea?") async def start(self): """ Run background process .. warning:: The function suppose that the process is handle in this coroutine. The coroutine must start the process only without awaiting a end of the process """ self.task = asyncio.create_task(self.usefulJob())This plugin demonstrates background work and implements a route. All background plugins must implement the BaseRequestMonitoringPlugin abstract class.
Enable plugin¶
If the user implements a plugin, the file with the plugin should be added to the luna_handlers/plugins directory of the service. The plugin filename should be added to the LUNA_HANDLERS_ACTIVE_PLUGINS configuration setting.
LUNA_HANDLERS_ACTIVE_PLUGINS = [event_sending_plugin_example]
List should contains filenames without extension (.py).