Plugins
The service supports a plugin system. Plugins must be written in Python.
Plugin types
There are two types of plugins:
On-event plugin. The plugin is triggered when an event occurs. It must implement a callback function, which is called on each event of the corresponding type. The set of event types is defined by the service developers.
event type
description
monitoring_event
Event contains monitoring points for sending to a custom monitoring system
sending_event
Event contains handler-generated event data for sending to a third-party source
Monitoring plugin example:
""" Module request monitoring plugin example """ from abc import abstractmethod from logging import Logger from typing import List, Optional from aiohttp import ClientSession from luna_plugins.base.plugins_meta.base_plugins import BaseEventPlugin class BaseRequestMonitoringPlugin(BaseEventPlugin): """ Base class for requests monitoring. """ # event name for triggering callback eventName = "monitoring_event" @abstractmethod async def flushPointToMonitoring(self, points: List["BaseRequestMonitoringPoint"], logger: Logger) -> None: """ All plugins must realize this method. This function call after end of request Args: points: points for monitoring which corresponding the request logger: logger """ async def handleEvent( # pylint: disable-msg=C0204,W0221 self, points: List["BaseRequestMonitoringPoint"], logger: Logger ): await self.flushPointToMonitoring(points=points, logger=logger) class RequestMonitoringPlugin(BaseRequestMonitoringPlugin): """ Example plugin sends a request data for monitoring to third-party source. Only one instance of this class exist during the program execution. """ def __init__(self, app: "LunaApplication"): super().__init__(app) self.url = "http://127.0.0.1:5020/1/buckets" self.session: Optional[ClientSession] = None self.bucket = "plugin_test_bucket" async def close(self): """ Stop plugin. Close all open connections and ect """ if self.session: await self.session.close() async def initialize(self): """ Initialize plugin. Close all open connections and ect """ self.session = ClientSession() async with self.session.post(f"{self.url}?bucket={self.bucket}") as resp: if resp.status not in (201, 409): response = await resp.json() raise RuntimeError(f"failed create bucket, {self.bucket}, response: {response}") async def flushPointToMonitoring(self, points: List["BaseRequestMonitoringPoint"], logger: Logger) -> None: """ Callback for sending a request monitoring data. 
Args: points: point for monitoring which corresponding the request logger: logger """ if not points: return point = points[0] logger.debug( f"Plugin 'flushPointToMonitoring' get point, request_id: {point.requestId}, " f"start time: {point.eventTime}" ) msg = {"tags": point.tags, "fields": point.fields} async with self.session.post(f"{self.url}/{self.bucket}/objects", json=msg) as resp: logger.info(resp.status) logger.info(await resp.text())
This plugin demonstrates sending request monitoring data to another service. All monitoring plugins must implement the BaseRequestMonitoringPlugin abstract class.
Event sending plugin example:
from abc import abstractmethod from typing import List, Optional, Union from aiohttp import ClientSession from classes.event import Event from classes.schemas.event_raw import RawEvent from crutches_on_wheels.cow.plugins.plugins_meta.base_plugins import BaseEventPlugin from crutches_on_wheels.cow.utils.log import Logger from crutches_on_wheels.cow.web.application import LunaApplication class BaseEventSendingPlugin(BaseEventPlugin): """Base class for event sending.""" # event name for triggering callback eventName = "sending_event" @abstractmethod async def sendEvents( self, events: Union[List[Event], List[RawEvent]], handlerId: str, accountId: str, requestId: str, createEventTime: str, endEventTime: str, logger: Logger, ) -> None: """ Callback that is triggered on every success request to handlers. Args: events: event list handlerId: handler id accountId: account id requestId: request id createEventTime: event creation time endEventTime: event end time logger: logger """ async def handleEvent( self, events: Union[List[Event], List[RawEvent]], handlerId: str, accountId: str, requestId: str, createEventTime: str, endEventTime: str, logger: Logger, ): """ Handle events. Args: events: event list handlerId: handler id accountId: account id requestId: request id createEventTime: event creation time endEventTime: event end time logger: logger """ await self.sendEvents(events, handlerId, accountId, requestId, createEventTime, endEventTime, logger) class EventSendingPlugin(BaseEventSendingPlugin): """Sends events to the third-party source. Only one instance of this class exist during the program execution.""" def __init__(self, app: LunaApplication): super().__init__(app) self.url = "http://127.0.0.1:5020/1/buckets" self.session: Optional[ClientSession] = None self.bucket = "plugin_test_bucket" async def close(self): """Stop plugin. 
Close all open connections and etc.""" if self.session: await self.session.close() async def initialize(self): """Initialize plugin.""" self.session = ClientSession() async with self.session.post(f"{self.url}?bucket={self.bucket}") as resp: if resp.status not in (201, 409): response = await resp.json() raise RuntimeError(f"failed create bucket, {self.bucket}, response: {response}") async def sendEvents( self, events: Union[List[Event], List[RawEvent]], handlerId: str, accountId: str, requestId: str, createEventTime: str, endEventTime: str, logger: Logger, ) -> None: logger.debug( f"Plugin 'EventsOnFinishExampleClass' get events, request_id: {requestId}, " f"event_create_time: {createEventTime}, event_end_time: {endEventTime}" ) prepareEvents = [] for event in events: if isinstance(event, Event): serializationEvent = event.asDict() else: serializationEvent = event.asHandlerEventDict() prepareEvents.append(serializationEvent) msg = { "handler_id": handlerId, "account_id": accountId, "Luna-Request-Id": requestId, "events": prepareEvents, "event_create_time": createEventTime, "event_end_time": endEventTime, } async with ClientSession() as session: async with session.post(f"{self.url}/{self.bucket}/objects", json=msg) as resp: logger.debug(resp.status) logger.debug(await resp.text())
This plugin demonstrates sending handler-generated event data to a third-party source. All event sending plugins must implement the BaseEventSendingPlugin abstract class.
Background plugin. This type of plugin is intended for background work.
The background plugin can implement:
custom route
background monitoring of service resources
collaboration of an event plugin and a background plugin (batching monitoring points)
connection to other data sources (Redis, RabbitMQ) and their data processing
Plugin example:
""" Module realizes background plugin example """ import asyncio from asyncio import Task from typing import Optional from luna_plugins.base.plugins_meta.base_plugins import BaseBackgroundPlugin from sanic.response import HTTPResponse from ...web.handlers import BaseHandler class HandlerExample(BaseHandler): """ Handler example """ async def get(self) -> HTTPResponse: """ Method get example. Returns: response """ return self.success(body="I am teapot", contentType="text/plain", statusCode=418) @property def app(self): """ Get app Abstract method of `BaseHandler` Returns: app """ return self.request.app @property def config(self): """ Get app config Abstract method of `BaseHandler` Returns: app config """ return self.request.app.ctx.serviceConfig class BackgroundPluginExample(BaseBackgroundPlugin): """ Background plugin example. Create background task and add a route. """ def __init__(self, app: "LunaApplication"): super().__init__(app) app.router.reset() app.addRoutes([("/teapot", HandlerExample)]) app.finalize() self.task: Optional[Task] = None self.temperature = 0 async def initialize(self): """ Initialize plugin """ async def close(self): """ Stop background process Returns: """ if self.task: self.task.cancel() async def usefulJob(self): """ Some useful async work """ while True: await asyncio.sleep(1) self.temperature = min(100, self.temperature + 1) if self.temperature < 100: self.app.ctx.logger.info(f"I boil water, temperature: {self.temperature}") else: self.app.ctx.logger.info("boiling water is ready, would you care for a cup of tea?") async def start(self): """ Run background process .. warning:: The function suppose that the process is handle in this coroutine. The coroutine must start the process only without awaiting a end of the process """ self.task = asyncio.create_task(self.usefulJob())
This plugin demonstrates background work and implements a custom route. All background plugins must implement the BaseBackgroundPlugin abstract class.
Enable plugin
To enable a custom plugin, add the plugin file to the luna_handlers/plugins directory of the service, then add the plugin filename to the LUNA_HANDLERS_ACTIVE_PLUGINS configuration setting.
LUNA_HANDLERS_ACTIVE_PLUGINS = [event_sending_plugin_example]
The list should contain filenames without the extension (.py).