Plugins

The service supports a plugin system. Plugins must be written in Python.

Plugin types

There are three types of plugins:

  • Event plugin. The plugin is triggered when an event occurs. The plugin must implement a callback function, which is called on each event of the corresponding type. The set of event types is defined by the service developers.

    Event type: monitoring_event

    Description: the event contains monitoring points for sending to a custom monitoring system

    Monitoring plugin example:

    """
    Module request monitoring plugin example
    """
    
    import asyncio
    from abc import abstractmethod
    from typing import TypeVar
    
    from aiohttp import ClientSession
    from luna_plugins.base.plugins_meta.base_plugins import BaseEventPlugin
    
    MonitoringPoint = TypeVar("MonitoringPoint")
    LunaApplication = TypeVar("LunaApplication")
    
    
    class BaseRequestMonitoringPlugin(BaseEventPlugin):
        """
        Base class for requests monitoring.
        """
    
        # event name for triggering callback
        eventName = "monitoring_event"
    
        @abstractmethod
        async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None:
            """
            All plugins must implement this method.

            This function is called after the request completes.
    
            Args:
                point: point for monitoring
                logger: logger
            """
    
        async def handleEvent(self, points: list["MonitoringPoint"], logger):
            await asyncio.gather(*[self.flushPointToMonitoring(point, logger) for point in points])
    
    
    class RequestMonitoringPlugin(BaseRequestMonitoringPlugin):
        """
        Example plugin that sends request data to a third-party monitoring source.
        Only one instance of this class exists during program execution.
        """
    
        def __init__(self, app: "LunaApplication"):
            super().__init__(app)
            self.url = "http://127.0.0.1:5020/1/buckets"
            self.session: ClientSession | None = None
            self.bucket = "plugin_test_bucket"
    
        async def close(self):
            """
            Stop plugin.
    
            Close all open connections, etc.
            """
            if self.session:
                await self.session.close()
    
        async def initialize(self):
            """
            Initialize plugin.
    
            Create the client session and the monitoring bucket.
            """
            self.session = ClientSession()
            async with self.session.post(f"{self.url}?bucket={self.bucket}") as resp:
                if resp.status not in (201, 409):
                    response = await resp.json()
                    raise RuntimeError(f"failed create bucket, {self.bucket}, response: {response}")
    
        async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None:
            """
            Callback for sending request monitoring data.
    
            Args:
                point: point for monitoring
                logger: logger
            """
            logger.debug(f"Plugin 'flushPointToMonitoring' get point, series: {point.series}, time: {point.eventTime}")
            msg = {"tags": point.tags, "fields": point.fields}
            async with self.session.post(f"{self.url}/{self.bucket}/objects", json=msg) as resp:
                logger.info(resp.status)
                logger.info(await resp.text())
    

    This plugin demonstrates sending request monitoring data to another service. All monitoring plugins must implement the BaseRequestMonitoringPlugin abstract class.

  • Background plugin. This type of plugin is intended for background work.

    The background plugin can implement:

    • custom route

    • background monitoring of service resources

    • collaboration of an event plugin and a background plugin (batching monitoring points)

    • connection to other data sources (Redis, RabbitMQ) and their data processing

    Plugin example:

    """
    Module realizes background plugin example
    """
    
    import asyncio
    from asyncio import Task
    from typing import TypeVar

    from luna_plugins.base.plugins_meta.base_plugins import BaseBackgroundHandler, BaseBackgroundPlugin, pluginHTTPResponse

    LunaApplication = TypeVar("LunaApplication")
    
    
    class HandlerExample(BaseBackgroundHandler):
        """
        Handler example
        """
    
        async def get(self, request):  # pylint: disable=unused-argument
            """
            Method get example.
    
            Returns:
                response
            """
            return self.response(body="I am teapot", headers={"Content-Type": "text/plain"}, statusCode=418)
    
    
    def anotherHandlerExample(request):  # pylint: disable=unused-argument
        """Standalone handler example"""
        return pluginHTTPResponse(statusCode=200, body="T800", headers={"Content-Type": "text/plain"})
    
    
    class BackgroundPluginExample(BaseBackgroundPlugin):
        """
        Background plugin example.
    
        Create background task and add a route.
        """
    
        def __init__(self, app: "LunaApplication"):
            super().__init__(app)
            self.task: Task | None = None
            self.temperature = 0
    
        async def initialize(self):
            """
            Initialize plugin
            """
            self.addRoute("/teapot", HandlerExample)
            self.addRoute("/teapot/version", anotherHandlerExample, methods={"get"})
    
        async def close(self):
            """
            Stop background process
            Returns:
    
            """
            if self.task:
                self.task.cancel()
    
        async def usefulJob(self):
            """
            Some useful async work
            """
            while True:
                await asyncio.sleep(1)
                self.temperature = min(100, self.temperature + 1)
                if self.temperature < 100:
                    self.app.ctx.logger.info(f"I boil water, temperature: {self.temperature}")
                else:
                    self.app.ctx.logger.info("boiling water is ready, would you care for a cup of tea?")
    
        async def start(self):
            """
            Run background process
    
            .. warning::
                This coroutine must only start the background process; it must not
                await the process's completion.
            """
            self.task = asyncio.create_task(self.usefulJob())
    

    This plugin demonstrates background work and implements a route. All background plugins must implement the BaseBackgroundPlugin abstract class.
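The "batching monitoring points" collaboration listed above (an event plugin buffering points that a background plugin periodically flushes) can be sketched with a plain asyncio queue. The PointBatcher class and its method names are illustrative only and not part of the plugin API:

```python
import asyncio


class PointBatcher:
    """Shared buffer: an event plugin appends points, a background plugin drains them in batches."""

    def __init__(self, maxBatch: int = 10):
        self.queue: asyncio.Queue = asyncio.Queue()
        self.maxBatch = maxBatch

    def addPoints(self, points):
        """Would be called from the event plugin's handleEvent callback."""
        for point in points:
            self.queue.put_nowait(point)

    async def drainBatch(self):
        """Would be called from the background plugin's task; waits for at least one point."""
        batch = [await self.queue.get()]
        while not self.queue.empty() and len(batch) < self.maxBatch:
            batch.append(self.queue.get_nowait())
        return batch


async def demo():
    batcher = PointBatcher(maxBatch=3)
    batcher.addPoints(["p1", "p2", "p3", "p4"])
    first = await batcher.drainBatch()   # up to maxBatch points
    second = await batcher.drainBatch()  # the remainder
    return first, second


first, second = asyncio.run(demo())
```

In a real plugin pair, the event plugin's handleEvent would call addPoints, while the background plugin's start would launch a task that loops over drainBatch and flushes each batch in a single request, reducing the number of calls to the monitoring backend.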

  • Estimator replacement plugin. This type of plugin allows replacing standard Luna SDK estimators with custom implementations without modifying the core service code. This enables integration of custom neural networks, alternative algorithms, or optimized implementations for specific use cases.

    Plugin example:

    import asyncio
    from pathlib import Path
    from typing import Literal
    
    import numpy as np
    import onnxruntime as ort
    from luna_plugins.estimator_plugins.estimator_replacement_plugin import EstimatorBase, EstimatorReplacementPlugin
    from lunavl.sdk.estimators.image_estimators.people_count import EstimationTargets, ImageForPeopleEstimation, PeopleCount
    from lunavl.sdk.image_utils.geometry import Rect, Vec2D
    from lunavl.sdk.image_utils.image import VLImage
    
    
    def setFutureResult(future, result, error):
        """Set result to asyncio future."""
        if not future.done():
            future.set_result((error, result))
    
    
    def ortCallback(result, waiter: tuple[asyncio.AbstractEventLoop, asyncio.Future], error: str):
        """Callback for onnx thread."""
        loop = waiter[0]
        future = waiter[1]
        loop.call_soon_threadsafe(setFutureResult, future, result, error)
    
    
    async def execute(output_names, input_feed, session: ort.InferenceSession, loop: asyncio.AbstractEventLoop):
        """Async execute onnx prediction using run_async.
    
        Args:
            output_names: name of the outputs
            input_feed: dictionary { input_name: numpy_array }
            session: ort session
            loop: current asyncio event loop
        Return:
            prediction result
        """
        future = loop.create_future()
        session.run_async(output_names=output_names, input_feed=input_feed, callback=ortCallback, user_data=(loop, future))
        error, res = await future
        if error:
            raise RuntimeError(f"ONNX inference error: {error}")
        return res
    
    
    class OnnxruntimeWrapper:
        """Wrapper for asynchronous ONNX Runtime execution."""
    
        def __init__(self, onnxSession):
            self.onnxSession = onnxSession
    
        async def forward(self, output_names, input_feed):
            """Execute inference asynchronously.
    
            Args:
                output_names: List of output names
                input_feed: Dict with numpy array inputs
    
            Returns:
                List of numpy arrays (outputs)
            """
            loop = asyncio.get_running_loop()
            return await execute(output_names, input_feed, self.onnxSession, loop)
    
    
    class Estimation:
        """Container for people counting estimation results.
    
        Holds the count of detected people and their coordinate positions.
    
        Attributes:
            count: The number of people detected in the image.
            points: Points object containing coordinate information.
        """
    
        class Points:
            """Container for point coordinates in an estimation.
    
            Attributes:
                coordinates: List of (x, y) coordinate tuples representing people positions.
            """
    
            def __init__(self, coordinates: list[tuple[int, int]]):
                """Initialize Points with coordinate data.
    
                Args:
                    coordinates: List of (x, y) tuples representing point locations.
                """
                self.coordinates = coordinates
    
            def getPoints(self):
                """Convert coordinates to Vec2D objects.
    
                Returns:
                    List of Vec2D objects representing the point coordinates.
                """
                return [Vec2D(x, y) for x, y in self.coordinates]
    
        def __init__(self, count: int, coordinates: list[tuple[int, int]]):
            """Initialize an Estimation with count and coordinates.
    
            Args:
                count: The number of people detected.
                coordinates: List of (x, y) tuples for each detected person.
            """
            self.count = count
            self.points = self.Points(coordinates)
    
    
    def extractCoordinatesFromDensityMap(
        densityMap: np.ndarray, imgHeight: int, imgWidth: int, count: int
    ) -> list[tuple[int, int]]:
        """Extract person coordinates from a density map.
    
        Finds the top peak locations in a density map and converts them to
        image coordinates scaled to the original image dimensions.
    
        Args:
            densityMap: The density map output from the model.
            imgHeight: Height of the original image.
            imgWidth: Width of the original image.
            count: Number of people to extract coordinates for.
    
        Returns:
            List of (x, y) tuples representing person locations in image coordinates.
        """
        if count == 0:
            return []
    
        # Remove batch and channel dimensions
        if densityMap.ndim == 4:
            densityMap = densityMap[0, 0]  # [height, width]
        elif densityMap.ndim == 3:
            densityMap = densityMap[0]  # [height, width]
    
        # Flatten and get top count positions
        flatMap = densityMap.flatten()
        topIndices = np.argsort(flatMap)[-count:]  # Get top 'count' values
    
        # Convert flat indices to 2D coordinates
        coordinates = []
        mapH, mapW = densityMap.shape
        imgH, imgW = imgHeight, imgWidth
    
        for idx in topIndices:
            yMap = idx // mapW
            xMap = idx % mapW
    
            # Scale coordinates from density map to original image size
            xImg = int((xMap / mapW) * imgW)
            yImg = int((yMap / mapH) * imgH)
    
            coordinates.append((xImg, yImg))
    
        return coordinates
    
    
    def adjustCoordsToOriginal(coords: list[tuple[int, int]], cropArea: Rect) -> list[tuple[int, int]]:
        """Adjust coordinates from cropped space back to original image space.
    
        Args:
            coords: List of (x, y) coordinates in cropped image space.
            cropArea: Rectangle defining the crop region offset.
    
        Returns:
            List of (x, y) coordinates adjusted to original image space.
        """
        offsetX, offsetY = int(cropArea.x), int(cropArea.y)
        return [(x + offsetX, y + offsetY) for x, y in coords]
    
    
    class ONNXPeopleCountEstimator(EstimatorBase):
        """ONNX-based people counting estimator using density map models.
    
        This estimator uses ONNX Runtime to perform inference with density map
        models like CSRNet or MCNN for counting people in images.
    
        Attributes:
            onnxWrapper: Async ONNX Runtime wrapper.
            session: ONNX Runtime inference session.
            input_name: Name of the model's input node.
            output_name: Name of the model's output node.
        """
    
        def __init__(self, modelPath: Path, useGpu: bool = False, gpuDeviceId: int = 0, maxWorkers: int = 4) -> None:
            """Initialize the ONNX people count estimator.
    
            Args:
                modelPath: Path to the ONNX model file.
                useGpu: Whether to use GPU acceleration.
                gpuDeviceId: GPU device ID to use if GPU is enabled.
                maxWorkers: Maximum number of worker threads for async operations.
    
            Raises:
                ImportError: If onnxruntime is not installed.
            """
            # Setup execution providers
            if useGpu:
                providers = [
                    (
                        "CUDAExecutionProvider",
                        {
                            "device_id": gpuDeviceId,
                            "arena_extend_strategy": "kNextPowerOfTwo",
                            "gpu_mem_limit": 2 * 1024 * 1024 * 1024,  # 2GB
                            "cudnn_conv_algo_search": "EXHAUSTIVE",
                            "do_copy_in_default_stream": True,
                        },
                    ),
                    "CPUExecutionProvider",
                ]
            else:
                providers = ["CPUExecutionProvider"]
    
            # Create ONNX Runtime session
            sess_options = ort.SessionOptions()
            sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    
            self.session = ort.InferenceSession(
                str(modelPath),
                sess_options=sess_options,
                providers=providers,
            )
    
            # Get input/output names
            self.input_name = self.session.get_inputs()[0].name
            self.output_name = self.session.get_outputs()[0].name
    
            # Create async wrapper
            self.onnxWrapper = OnnxruntimeWrapper(self.session)
    
        def close(self):
            """Close the estimator and release resources."""
    
        def _postprocessModelOutput(
            self, modelOutput: np.ndarray, imgHeight: int, imgWidth: int, estimationTargets: EstimationTargets
        ) -> Estimation:
            """Postprocess the model output to extract count and coordinates.
    
            Args:
                modelOutput: Raw output from the density map model.
                imgHeight: Height of the processed image.
                imgWidth: Width of the processed image.
                estimationTargets: Target type (T1 with coordinates, T2 count only).
    
            Returns:
                Estimation object containing count and optional coordinates.
            """
            # For density map models (CSRNet, MCNN)
            # Sum all values in density map to get count
            count = int(np.sum(modelOutput))
            count = max(0, count)  # Ensure non-negative
    
            # Extract coordinates if requested (T1)
            if estimationTargets == EstimationTargets.T1:
                coordinates = extractCoordinatesFromDensityMap(modelOutput, imgHeight, imgWidth, count)
            else:
                # T2: only count, no coordinates
                coordinates = []
    
            return Estimation(count, coordinates)
    
        async def _estimate(
            self, image: VLImage | ImageForPeopleEstimation | tuple[VLImage, Rect], estimationTargets: EstimationTargets
        ) -> PeopleCount:
            """Internal async method to estimate people count in a single image.
    
            Args:
                image: Input image in various supported formats.
                estimationTargets: Target type for estimation.
    
            Returns:
                PeopleCount object with estimation results.
    
            Raises:
                Exception: If image format is not supported.
            """
            if isinstance(image, VLImage):
                vlImage = image
                detectArea = None
            elif isinstance(image, ImageForPeopleEstimation):
                vlImage = image.image
                detectArea = image.detectArea
            elif (
                isinstance(image, tuple)
                and len(image) == 2
                and isinstance(image[0], VLImage)
                and isinstance(image[1], Rect)
            ):
                vlImage, detectArea = image
            else:
                raise ValueError("Unsupported image format")
    
            # Crop image if detection area is specified
            if detectArea is not None:
                processedImage = vlImage.crop(detectArea)
            else:
                processedImage = vlImage
    
            # Get image dimensions
            imgH, imgW = processedImage.rect.height, processedImage.rect.width
    
            # Convert to numpy array
            npArray = processedImage.asNPArray()
    
            # Run async inference
            input_feed = {self.input_name: npArray}
            outputs = await self.onnxWrapper.forward([self.output_name], input_feed)
    
            # Postprocess output to get count and coordinates
            estimation = self._postprocessModelOutput(outputs[0], imgH, imgW, estimationTargets)
    
            # Adjust coordinates back to original image space if cropped
            if detectArea is not None and estimation.points.coordinates:
                adjusted_coords = adjustCoordsToOriginal(estimation.points.coordinates, detectArea)
                estimation = Estimation(estimation.count, adjusted_coords)
    
            return PeopleCount(estimation)
    
        async def estimateBatch(
            self,
            images: list[VLImage | ImageForPeopleEstimation | tuple[VLImage, Rect]],
            estimationTargets: EstimationTargets = EstimationTargets.T1,
            asyncEstimate: Literal[True] = True,
        ) -> list[PeopleCount]:
            """Estimate people count in multiple images asynchronously.
    
            Args:
                images: List of input images in various supported formats.
                estimationTargets: Target type (T1 with coordinates, T2 count only).
                    Defaults to T1.
                asyncEstimate: Must be True (only async mode supported).
    
            Returns:
                List of PeopleCount objects with estimation results.
            """
            print("estimate with ONNXPeopleCountEstimator: started")
            if not asyncEstimate:
                raise NotImplementedError("Synchronous estimation is not supported")
            # Process all images concurrently
            tasks = [self._estimate(image, estimationTargets) for image in images]
            print("estimate with ONNXPeopleCountEstimator: completed")
            return await asyncio.gather(*tasks)
    
    
    class PeopleCountReplacementPlugin(EstimatorReplacementPlugin):
        """Plugin for replacing the default people count estimator with ONNX implementation.
    
        This plugin integrates the ONNXPeopleCountEstimator into the Luna SDK,
        replacing the built-in estimator with a custom ONNX-based implementation.
        """
    
        def close(self) -> None: ...
    
        @property
        def estimatorBrokerName(self) -> str:
            """Get the system name of the estimator to replace.
    
            Returns:
                The name "peopleCountEstimator".
            """
            return "peopleCountEstimator"
    
        @property
        def modelDirPath(self) -> Path:
            """Get the directory containing the model file.
    
            Returns:
                Path to the directory containing this plugin file.
            """
            return Path(__file__).parent
    
        @property
        def modelName(self) -> str:
            """Get the model filename.
    
            Returns:
                The ONNX model filename.
            """
            return "csrnet_uint8.onnx"
    
        @property
        def modelPath(self) -> Path:
            """Get the full path to the model file.
    
            Returns:
                Complete path to the ONNX model file.
            """
            return self.modelDirPath / self.modelName
    
        @property
        def deviceClass(self) -> Literal["cpu", "gpu"]:
            """Get the device class for this estimator.
    
            Checks the configuration for people counter device settings,
            falling back to global device class if set to "global".
    
            Returns:
                The device class, either "cpu" or "gpu".
            """
            estimatorDeviceType = self.appConfig.estimatorsSettings.peopleCounter.deviceClass
            if estimatorDeviceType == "global":
                return self.globalDeviceClass
            return estimatorDeviceType
    
        def createReplacementEstimator(self) -> ONNXPeopleCountEstimator:
            """Create a new ONNX people count estimator instance.
    
            Returns:
                Configured ONNXPeopleCountEstimator instance.
            """
            return ONNXPeopleCountEstimator(
                modelPath=self.modelPath,
                useGpu=True if self.deviceClass == "gpu" else False,
                gpuDeviceId=self.defaultGpuDevice,
                maxWorkers=4,
            )
    

    This plugin demonstrates replacement of the people count estimator with a custom ONNX Runtime implementation. All estimator replacement plugins must extend the EstimatorReplacementPlugin abstract class.

    For detailed information about creating and integrating estimator replacement plugins, see:

Enable plugin

To enable a custom plugin:

  1. Add plugin files: Copy the plugin directory to the luna_remote_sdk/plugins directory of the service.

  2. Install dependencies: If the plugin has custom dependencies (listed in requirements.txt), they must be installed before enabling the plugin:

    pip install -r luna_remote_sdk/plugins/my_plugin/requirements.txt
    

    For containerized deployments, see the “Using plugins with containers” section below.

  3. Activate plugin: Add the plugin filename to the LUNA_REMOTE_SDK_ACTIVE_PLUGINS configuration setting.

  4. Restart service: Restart the service for changes to take effect.
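For example, assuming the plugin file is luna_remote_sdk/plugins/my_plugin.py, the activation setting from step 3 might look as follows; the exact value format depends on your configuration backend, so treat this as an assumption to verify against your deployment's configuration reference:

```
LUNA_REMOTE_SDK_ACTIVE_PLUGINS = ["my_plugin"]
```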

Warning

Plugins with unmet dependencies will fail to load and may cause service startup errors. Always install plugin dependencies before activating the plugin in the configuration.

Using plugins with containers

If using a containerized application, there are several ways to include plugins in the container:

  1. Build your own image based on the original one. See the example Dockerfile below.

  2. Mount plugin files into the container using the --mount flag.

  3. Copy plugin files into the running container, commit the changes to a new image (docker commit), and use the newly created image.

FROM my-registry/luna-remote-sdk:v.x.x.x

# Copy plugin files
COPY --chown=1001:0 my_plugin ./luna_remote_sdk/plugins/my_plugin

# Install plugin dependencies (if requirements.txt exists)
RUN pip install --no-cache-dir -r ./luna_remote_sdk/plugins/my_plugin/requirements.txt
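Option 2 (mounting) might look like the following docker run invocation; the host path, container path, and image tag are placeholders to adapt to your deployment, since the plugin directory's absolute path inside the container depends on the image's working directory:

```shell
# Hypothetical paths and image tag; adjust to your environment
docker run --rm \
  --mount type=bind,source=/opt/plugins/my_plugin,target=/srv/luna_remote_sdk/plugins/my_plugin,readonly \
  my-registry/luna-remote-sdk:v.x.x.x
```

Note that mounting only provides the plugin files; any dependencies from the plugin's requirements.txt must still be installed inside the container (options 1 and 3 can bake them into the image).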

Warning

Every non-abstract class imported from a plugin module that subclasses a base plugin class will be activated. An unwary import can therefore cause the same class to be activated twice. Do not import plugin modules via top-level paths (from plugins.plugin_name.foo import bar); use relative imports (from .foo import bar) where possible.