Plugins
The service supports plugins. Plugins must be written in Python.
Plugin types
There are two types of plugins:
On-event plugin. The plugin is triggered when an event occurs. It must implement a callback function, which is called on each event of the corresponding type. The set of event types is defined by the service developers.
Event type          Description
------------------  ----------------------------------------------------------------------------
monitoring_event    The event contains monitoring points to send to a custom monitoring system
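The callback mechanism described above can be pictured with a minimal, self-contained sketch. It does not use the service's real dispatcher or the luna_plugins base classes: `SketchEventPlugin` and `SketchDispatcher` are hypothetical names introduced only for illustration, and the way the service actually routes events to plugins is an assumption here.

```python
import asyncio
from collections import defaultdict


class SketchEventPlugin:
    """Hypothetical stand-in for an event plugin (not the luna_plugins base class)."""

    # Event type this plugin subscribes to
    eventName = "monitoring_event"

    def __init__(self):
        self.received = []

    async def handleEvent(self, points, logger=None):
        # Callback: invoked once per event of the matching type
        self.received.extend(points)


class SketchDispatcher:
    """Hypothetical dispatcher that routes events to plugins by event name."""

    def __init__(self):
        self._plugins = defaultdict(list)

    def register(self, plugin):
        self._plugins[plugin.eventName].append(plugin)

    async def emit(self, eventName, *args, **kwargs):
        # Call every plugin subscribed to this event type concurrently
        callbacks = [p.handleEvent(*args, **kwargs) for p in self._plugins.get(eventName, [])]
        if callbacks:
            await asyncio.gather(*callbacks)
        return len(callbacks)


async def demo():
    dispatcher = SketchDispatcher()
    plugin = SketchEventPlugin()
    dispatcher.register(plugin)
    handled = await dispatcher.emit("monitoring_event", ["point-1", "point-2"])
    ignored = await dispatcher.emit("unknown_event", ["point-3"])
    return plugin.received, handled, ignored


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch an event with no subscribers is simply ignored; whether the service behaves the same way is not stated in this documentation.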
Monitoring plugin example:

    """
    Module request monitoring plugin example
    """

    import asyncio
    from abc import abstractmethod
    from typing import TypeVar

    from aiohttp import ClientSession
    from luna_plugins.base.plugins_meta.base_plugins import BaseEventPlugin

    MonitoringPoint = TypeVar("MonitoringPoint")
    LunaApplication = TypeVar("LunaApplication")


    class BaseRequestMonitoringPlugin(BaseEventPlugin):
        """
        Base class for requests monitoring.
        """

        # event name for triggering callback
        eventName = "monitoring_event"

        @abstractmethod
        async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None:
            """
            All plugins must implement this method.

            This function is called after the end of a request.

            Args:
                point: point for monitoring
                logger: logger
            """

        async def handleEvent(self, points: list["MonitoringPoint"], logger):
            # flush all points of the event concurrently
            await asyncio.gather(*[self.flushPointToMonitoring(point, logger) for point in points])


    class RequestMonitoringPlugin(BaseRequestMonitoringPlugin):
        """
        Example plugin that sends request monitoring data to a third-party source.

        Only one instance of this class exists during the program execution.
        """

        def __init__(self, app: "LunaApplication"):
            super().__init__(app)
            self.url = "http://127.0.0.1:5020/1/buckets"
            self.session: ClientSession | None = None
            self.bucket = "plugin_test_bucket"

        async def close(self):
            """
            Stop plugin.

            Close all open connections, etc.
            """
            if self.session:
                await self.session.close()

        async def initialize(self):
            """
            Initialize plugin.

            Open an HTTP session and request creation of the bucket.
            """
            self.session = ClientSession()
            async with self.session.post(f"{self.url}?bucket={self.bucket}") as resp:
                # statuses 201 and 409 are accepted, any other status is a failure
                if resp.status not in (201, 409):
                    response = await resp.json()
                    raise RuntimeError(f"failed to create bucket {self.bucket}, response: {response}")

        async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None:
            """
            Callback for sending request monitoring data.

            Args:
                point: point for monitoring
                logger: logger
            """
            logger.debug(
                f"Plugin 'flushPointToMonitoring' got point, series: {point.series}, time: {point.eventTime}"
            )
            msg = {"tags": point.tags, "fields": point.fields}
            async with self.session.post(f"{self.url}/{self.bucket}/objects", json=msg) as resp:
                logger.info(resp.status)
                logger.info(await resp.text())

This plugin demonstrates sending request monitoring data to another service. All monitoring plugins must implement the BaseRequestMonitoringPlugin abstract class.
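The example callback reads four attributes of a monitoring point: series, eventTime, tags, and fields. The sketch below models just those attributes to show the shape of the message that is posted. `SketchMonitoringPoint` and `buildMessage` are hypothetical names, and the sample tag and field values are invented for illustration; the real point class may carry more data.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class SketchMonitoringPoint:
    """Hypothetical stand-in: only the attributes read by the example are modeled."""

    series: str
    eventTime: datetime
    tags: dict = field(default_factory=dict)
    fields: dict = field(default_factory=dict)


def buildMessage(point):
    """Build the JSON body in the same way as the example callback: tags and fields only."""
    return {"tags": point.tags, "fields": point.fields}


if __name__ == "__main__":
    # Sample values are assumptions, not real service output
    samplePoint = SketchMonitoringPoint(
        series="requests",
        eventTime=datetime(2024, 1, 1, tzinfo=timezone.utc),
        tags={"route": "GET:/version"},
        fields={"execution_time": 0.012},
    )
    print(buildMessage(samplePoint))
```

Note that the series name and the event time are logged by the example but are not part of the posted message.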
Background plugin. This type of plugin is intended for background work.
A background plugin can implement:
- a custom route
- background monitoring of service resources
- cooperation between an event plugin and a background plugin (for example, batching monitoring points)
- connections to other data sources (Redis, RabbitMQ) and processing of their data
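The batching scenario from the list above can be sketched as follows: the event callback only enqueues points, and a background task drains the queue and sends the points in batches. This is a self-contained illustration under stated assumptions, not the service's implementation. `SketchBatcher`, `maxBatchSize`, and `flushInterval` are hypothetical names, and the `send` callable stands in for whatever transport a real plugin would use.

```python
import asyncio


class SketchBatcher:
    """Hypothetical helper shared by an event plugin and a background plugin."""

    def __init__(self, send, maxBatchSize=100, flushInterval=1.0):
        self.send = send  # async callable that receives a list of points
        self.maxBatchSize = maxBatchSize
        self.flushInterval = flushInterval
        self.queue = asyncio.Queue()
        self.task = None

    def put(self, point):
        # Event-plugin side: enqueue the point without blocking the request
        self.queue.put_nowait(point)

    async def _collectBatch(self):
        # Wait for the first point, then collect more until the batch is full
        # or the flush interval has elapsed
        batch = [await self.queue.get()]
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self.flushInterval
        while len(batch) < self.maxBatchSize:
            timeout = deadline - loop.time()
            if timeout <= 0:
                break
            try:
                batch.append(await asyncio.wait_for(self.queue.get(), timeout))
            except asyncio.TimeoutError:
                break
        return batch

    async def _run(self):
        while True:
            await self.send(await self._collectBatch())

    def start(self):
        # Background-plugin side: start the task without awaiting its end
        self.task = asyncio.create_task(self._run())

    async def close(self):
        if self.task:
            self.task.cancel()
            await asyncio.gather(self.task, return_exceptions=True)


async def demo():
    batches = []

    async def send(batch):
        batches.append(batch)

    batcher = SketchBatcher(send, maxBatchSize=3, flushInterval=0.05)
    batcher.start()
    for i in range(7):
        batcher.put(i)
    await asyncio.sleep(0.3)
    await batcher.close()
    return batches


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A limitation of this sketch: points that are still queued or being collected when `close` is called are dropped. A real plugin would probably flush the remaining points before shutting down.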
Background plugin example:

    """
    Background plugin example module
    """

    import asyncio
    from asyncio import Task
    from typing import TypeVar

    from luna_plugins.base.plugins_meta.base_plugins import (
        BaseBackgroundHandler,
        BaseBackgroundPlugin,
        pluginHTTPResponse,
    )

    # placeholder type for annotations, as in the monitoring plugin example
    LunaApplication = TypeVar("LunaApplication")


    class HandlerExample(BaseBackgroundHandler):
        """
        Handler example
        """

        async def get(self, request):  # pylint: disable=unused-argument
            """
            Method get example.

            Returns:
                response
            """
            return self.response(body="I am teapot", headers={"Content-Type": "text/plain"}, statusCode=418)


    def anotherHandlerExample(request):  # pylint: disable=unused-argument
        """Standalone handler example"""
        return pluginHTTPResponse(statusCode=200, body="T800", headers={"Content-Type": "text/plain"})


    class BackgroundPluginExample(BaseBackgroundPlugin):
        """
        Background plugin example.

        Create a background task and add routes.
        """

        def __init__(self, app: "LunaApplication"):
            super().__init__(app)
            self.task: Task | None = None
            self.temperature = 0

        async def initialize(self):
            """
            Initialize plugin
            """
            self.addRoute("/teapot", HandlerExample)
            self.addRoute("/teapot/version", anotherHandlerExample, methods={"get"})

        async def close(self):
            """
            Stop background process
            """
            if self.task:
                self.task.cancel()

        async def usefulJob(self):
            """
            Some useful async work
            """
            while True:
                await asyncio.sleep(1)
                self.temperature = min(100, self.temperature + 1)
                if self.temperature < 100:
                    self.app.ctx.logger.info(f"I boil water, temperature: {self.temperature}")
                else:
                    self.app.ctx.logger.info("boiling water is ready, would you care for a cup of tea?")

        async def start(self):
            """
            Run background process

            .. warning::
                This coroutine must only start the process.
                It must not await the end of the process.
            """
            self.task = asyncio.create_task(self.usefulJob())

This plugin demonstrates background work and implements routes. All background plugins must implement the BaseBackgroundPlugin abstract class.
Enable plugin
To enable a custom plugin, add the plugin file to the luna_remote_sdk/plugins directory of the service and add the plugin filename to the LUNA_REMOTE_SDK_ACTIVE_PLUGINS configuration setting.