Plugins

The service supports a plugin system. Plugins must be written in the Python programming language.

Plugin types

There are two sorts of plugins:

  • On-event plugin. The plugin is triggered when an event occurs. The plugin must implement a callback function, which is called on each event of the corresponding type. The set of event types is defined by the service developers.

    event type

    description

    monitoring_event

    Event contains monitoring points for sending to a custom monitoring system

    sending_event

    Event contains handler-generated event data for sending to the third-party source

    Monitoring plugin example:

    """
    Module request monitoring plugin example
    """
    import asyncio
    from abc import abstractmethod
    
    from aiohttp import ClientSession
    from luna_plugins.base.plugins_meta.base_plugins import BaseEventPlugin
    
    
    class BaseRequestMonitoringPlugin(BaseEventPlugin):
        """
        Abstract base for plugins that forward request monitoring points.

        Each batch of points received by `handleEvent` is fanned out to
        `flushPointToMonitoring`, one call per point.
        """

        # name of the event that triggers the callback
        eventName = "monitoring_event"

        @abstractmethod
        async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None:
            """
            Deliver a single monitoring point. Every concrete plugin must implement this method.

            The method is called after the end of a request.

            Args:
                point: point for monitoring
                logger: logger
            """

        async def handleEvent(self, points: list["MonitoringPoint"], logger):
            """
            Flush all received points concurrently.

            Args:
                points: monitoring points carried by the event
                logger: logger passed through to `flushPointToMonitoring`
            """
            pendingFlushes = (self.flushPointToMonitoring(point, logger) for point in points)
            await asyncio.gather(*pendingFlushes)
    
    
    class RequestMonitoringPlugin(BaseRequestMonitoringPlugin):
        """
        Example plugin that sends request monitoring data to a third-party source.

        Only one instance of this class exists during the program execution.
        """

        def __init__(self, app: "LunaApplication"):
            super().__init__(app)
            self.url = "http://127.0.0.1:5020/1/buckets"
            # opened in `initialize`, released in `close`
            self.session: ClientSession | None = None
            self.bucket = "plugin_test_bucket"

        async def close(self):
            """
            Stop plugin.

            Close the HTTP session if one was opened.
            """
            if self.session:
                await self.session.close()

        async def initialize(self):
            """
            Initialize plugin.

            Open the HTTP session and request creation of the target bucket.
            Response statuses 201 and 409 are both accepted.

            Raises:
                RuntimeError: if the bucket creation request returns any other status
            """
            self.session = ClientSession()
            try:
                async with self.session.post(f"{self.url}?bucket={self.bucket}") as resp:
                    if resp.status not in (201, 409):
                        # read the body as text: `resp.json()` raises on a non-JSON error body
                        # and would hide the real failure
                        response = await resp.text()
                        raise RuntimeError(f"failed create bucket, {self.bucket}, response: {response}")
            except BaseException:
                # do not leave an open session behind when initialization fails
                await self.session.close()
                self.session = None
                raise

        async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None:
            """
            Callback for sending a request monitoring data.

            Posts the point tags and fields to the bucket and logs the response status and body.

            Args:
                point: point for monitoring
                logger: logger
            """
            logger.debug(f"Plugin 'flushPointToMonitoring' get point, series: {point.series}, time: {point.eventTime}")
            msg = {"tags": point.tags, "fields": point.fields}
            async with self.session.post(f"{self.url}/{self.bucket}/objects", json=msg) as resp:
                logger.info(resp.status)
                logger.info(await resp.text())
    

    This plugin demonstrates sending request monitoring data to another service. All monitoring plugins must implement the BaseRequestMonitoringPlugin abstract class.

    Event sending plugin example:

    from abc import abstractmethod
    from typing import List, Optional, Union
    
    from aiohttp import ClientSession
    
    from classes.event import Event
    from classes.schemas.event_raw import RawEvent
    from crutches_on_wheels.cow.plugins.plugins_meta.base_plugins import BaseEventPlugin
    from crutches_on_wheels.cow.utils.log import Logger
    from crutches_on_wheels.cow.web.application import LunaApplication
    
    
    class BaseEventSendingPlugin(BaseEventPlugin):
        """
        Abstract base for plugins that send handler-generated events.

        `handleEvent` forwards its arguments unchanged to `sendEvents`, which concrete plugins implement.
        """

        # name of the event that triggers the callback
        eventName = "sending_event"

        @abstractmethod
        async def sendEvents(
            self,
            events: Union[List[Event], List[RawEvent]],
            handlerId: str,
            accountId: str,
            requestId: str,
            createEventTime: str,
            endEventTime: str,
            logger: Logger,
        ) -> None:
            """
            Send events. The callback is triggered on every successful request to handlers.

            Args:
                events: event list
                handlerId: handler id
                accountId: account id
                requestId: request id
                createEventTime: event creation time
                endEventTime: event end time
                logger: logger
            """

        async def handleEvent(
            self,
            events: Union[List[Event], List[RawEvent]],
            handlerId: str,
            accountId: str,
            requestId: str,
            createEventTime: str,
            endEventTime: str,
            logger: Logger,
        ):
            """
            Handle events by delegating to `sendEvents`.

            Args:
                events: event list
                handlerId: handler id
                accountId: account id
                requestId: request id
                createEventTime: event creation time
                endEventTime: event end time
                logger: logger
            """
            await self.sendEvents(
                events,
                handlerId,
                accountId,
                requestId,
                createEventTime,
                endEventTime,
                logger,
            )
    
    
    class EventSendingPlugin(BaseEventSendingPlugin):
        """Sends events to the third-party source. Only one instance of this class exists during the program execution."""

        def __init__(self, app: LunaApplication):
            super().__init__(app)
            self.url = "http://127.0.0.1:5020/1/buckets"
            # shared HTTP session: opened in `initialize`, reused by `sendEvents`, released in `close`
            self.session: Optional[ClientSession] = None
            self.bucket = "plugin_test_bucket"

        async def close(self):
            """Stop plugin. Close the HTTP session if one was opened."""
            if self.session:
                await self.session.close()

        async def initialize(self):
            """
            Initialize plugin.

            Open the HTTP session and request creation of the target bucket.
            Response statuses 201 and 409 are both accepted.

            Raises:
                RuntimeError: if the bucket creation request returns any other status
            """
            self.session = ClientSession()
            try:
                async with self.session.post(f"{self.url}?bucket={self.bucket}") as resp:
                    if resp.status not in (201, 409):
                        # read the body as text: `resp.json()` raises on a non-JSON error body
                        # and would hide the real failure
                        response = await resp.text()
                        raise RuntimeError(f"failed create bucket, {self.bucket}, response: {response}")
            except BaseException:
                # do not leave an open session behind when initialization fails
                await self.session.close()
                self.session = None
                raise

        async def sendEvents(
            self,
            events: Union[List[Event], List[RawEvent]],
            handlerId: str,
            accountId: str,
            requestId: str,
            createEventTime: str,
            endEventTime: str,
            logger: Logger,
        ) -> None:
            """
            Serialize events and post them to the bucket as a single object.

            Args:
                events: event list; `Event` items are serialized with `asDict`,
                    all other items with `asHandlerEventDict`
                handlerId: handler id
                accountId: account id
                requestId: request id
                createEventTime: event creation time
                endEventTime: event end time
                logger: logger
            """
            logger.debug(
                f"Plugin 'EventsOnFinishExampleClass' get events, request_id: {requestId}, "
                f"event_create_time: {createEventTime}, event_end_time: {endEventTime}"
            )
            prepareEvents = [
                event.asDict() if isinstance(event, Event) else event.asHandlerEventDict() for event in events
            ]
            msg = {
                "handler_id": handlerId,
                "account_id": accountId,
                "Luna-Request-Id": requestId,
                "events": prepareEvents,
                "event_create_time": createEventTime,
                "event_end_time": endEventTime,
            }
            # reuse the plugin-wide session instead of opening a new one per call;
            # open it lazily so sending still works if no usable session exists
            if self.session is None or self.session.closed:
                self.session = ClientSession()
            async with self.session.post(f"{self.url}/{self.bucket}/objects", json=msg) as resp:
                logger.debug(resp.status)
                logger.debug(await resp.text())
    

    This plugin demonstrates sending handler-generated event data to the third-party source. All event sending plugins must implement the BaseEventSendingPlugin abstract class.

  • Background plugin. This sort of plugin is intended for background work.

    The background plugin can implement:

    • custom route

    • background monitoring of service resources

    • collaboration of an event plugin and a background plugin (batching monitoring points)

    • connection to other data sources (Redis, RabbitMQ) and their data processing

    Plugin example:

    """
    Module realizes background plugin example
    """
    import asyncio
    from asyncio import Task
    
    from luna_plugins.base.plugins_meta.base_plugins import BaseBackgroundHandler, BaseBackgroundPlugin, pluginHTTPResponse
    
    
    class HandlerExample(BaseBackgroundHandler):
        """
        Example of a class-based handler for a plugin route.
        """

        async def get(self, request):  # pylint: disable=unused-argument
            """
            Answer a GET request with a fixed plain-text body and status code 418.

            Args:
                request: incoming request (not used)

            Returns:
                response
            """
            plainTextHeaders = {"Content-Type": "text/plain"}
            return self.response(body="I am teapot", headers=plainTextHeaders, statusCode=418)
    
    
    def anotherHandlerExample(request):  # pylint: disable=unused-argument
        """
        Standalone (function-based) handler example.

        Args:
            request: incoming request (not used)

        Returns:
            plain-text response with body "T800" and status code 200
        """
        plainTextHeaders = {"Content-Type": "text/plain"}
        return pluginHTTPResponse(statusCode=200, body="T800", headers=plainTextHeaders)
    
    
    class BackgroundPluginExample(BaseBackgroundPlugin):
        """
        Background plugin example.

        Registers two routes and runs a background task that "boils water".
        """

        def __init__(self, app: "LunaApplication"):
            super().__init__(app)
            # background task handle; set by `start`, cancelled by `close`
            self.task: Task | None = None
            self.temperature = 0

        async def initialize(self):
            """
            Initialize plugin: register a class-based route and a function-based route.
            """
            self.addRoute("/teapot", HandlerExample)
            self.addRoute("/teapot/version", anotherHandlerExample, methods={"get"})

        async def close(self):
            """
            Stop background process by cancelling the task, if it was started.
            """
            if not self.task:
                return
            self.task.cancel()

        async def usefulJob(self):
            """
            Some useful async work.

            Once per second raise the temperature by one (capped at 100) and log the current state.
            The loop never exits on its own.
            """
            while True:
                await asyncio.sleep(1)
                self.temperature = min(100, self.temperature + 1)
                if self.temperature >= 100:
                    message = "boiling water is ready, would you care for a cup of tea?"
                else:
                    message = f"I boil water, temperature: {self.temperature}"
                self.app.ctx.logger.info(message)

        async def start(self):
            """
            Run background process.

            .. warning::
                This coroutine must only start the process and return; it must not await
                the end of the process.
            """
            backgroundJob = self.usefulJob()
            self.task = asyncio.create_task(backgroundJob)
    

    This plugin demonstrates background work and implements a route. All background plugins must implement the BaseBackgroundPlugin abstract class.

Enable plugin

If the user implements a plugin, the file with the plugin should be added to the luna_handlers/plugins directory of the service. The plugin filename should be added to the LUNA_HANDLERS_ACTIVE_PLUGINS configuration setting.

LUNA_HANDLERS_ACTIVE_PLUGINS = [event_sending_plugin_example]

The list should contain filenames without the extension (.py).