Plugins¶
The service supports a plugin system. Plugins must be written in Python.
Plugin types¶
There are three types of plugins:
Event plugin. The plugin is triggered when an event occurs. It must implement a callback function, which is called on each event of the corresponding type. The set of event types is defined by the service developers.
| Event type | Description |
|---|---|
| monitoring_event | The event contains monitoring points to send to a custom monitoring system. |
Monitoring plugin example:

    """
    Module request monitoring plugin example
    """

    import asyncio
    from abc import abstractmethod
    from typing import TypeVar

    from aiohttp import ClientSession
    from luna_plugins.base.plugins_meta.base_plugins import BaseEventPlugin

    MonitoringPoint = TypeVar("MonitoringPoint")
    LunaApplication = TypeVar("LunaApplication")


    class BaseRequestMonitoringPlugin(BaseEventPlugin):
        """
        Base class for requests monitoring.
        """

        # event name for triggering callback
        eventName = "monitoring_event"

        @abstractmethod
        async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None:
            """
            All plugins must implement this method.

            This function is called after the end of a request.

            Args:
                point: point for monitoring
                logger: logger
            """

        async def handleEvent(self, points: list["MonitoringPoint"], logger):
            await asyncio.gather(*[self.flushPointToMonitoring(point, logger) for point in points])


    class RequestMonitoringPlugin(BaseRequestMonitoringPlugin):
        """
        Example plugin that sends request data for monitoring to a third-party source.
        Only one instance of this class exists during the program execution.
        """

        def __init__(self, app: "LunaApplication"):
            super().__init__(app)
            self.url = "http://127.0.0.1:5020/1/buckets"
            self.session: ClientSession | None = None
            self.bucket = "plugin_test_bucket"

        async def close(self):
            """
            Stop plugin.

            Close all open connections, etc.
            """
            if self.session:
                await self.session.close()

        async def initialize(self):
            """
            Initialize plugin.

            Open the HTTP session and create the bucket if it does not exist yet.
            """
            self.session = ClientSession()
            async with self.session.post(f"{self.url}?bucket={self.bucket}") as resp:
                if resp.status not in (201, 409):
                    response = await resp.json()
                    raise RuntimeError(f"failed create bucket, {self.bucket}, response: {response}")

        async def flushPointToMonitoring(self, point: "MonitoringPoint", logger) -> None:
            """
            Callback for sending request monitoring data.

            Args:
                point: point for monitoring
                logger: logger
            """
            logger.debug(f"Plugin 'flushPointToMonitoring' get point, series: {point.series}, time: {point.eventTime}")
            msg = {"tags": point.tags, "fields": point.fields}
            async with self.session.post(f"{self.url}/{self.bucket}/objects", json=msg) as resp:
                logger.info(resp.status)
                logger.info(await resp.text())

This plugin demonstrates sending request monitoring data to another service. All monitoring plugins must implement the BaseRequestMonitoringPlugin abstract class.
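The callback flow of the monitoring example can be tried without the service or aiohttp. The following is a minimal sketch under stated assumptions: `Point`, `CollectingPlugin` and `dispatch` are hypothetical stand-ins written for illustration and are not part of luna_plugins; only the names `eventName`, `handleEvent` and `flushPointToMonitoring` are taken from the example. How the service actually dispatches events is not shown in this document.

```python
import asyncio
import logging
from dataclasses import dataclass, field


@dataclass
class Point:
    """Hypothetical stand-in for a monitoring point."""

    series: str
    tags: dict = field(default_factory=dict)
    fields: dict = field(default_factory=dict)


class CollectingPlugin:
    """Stand-in event plugin: stores points instead of sending them over HTTP."""

    eventName = "monitoring_event"

    def __init__(self):
        self.received = []

    async def flushPointToMonitoring(self, point, logger):
        # A real plugin would perform a network call here.
        await asyncio.sleep(0)
        self.received.append({"tags": point.tags, "fields": point.fields})
        logger.debug("flushed point from series %s", point.series)

    async def handleEvent(self, points, logger):
        # Same fan-out as in the example: one coroutine per point.
        await asyncio.gather(*(self.flushPointToMonitoring(p, logger) for p in points))


async def dispatch(plugins, eventName, points, logger):
    """Hypothetical dispatcher: call every plugin subscribed to eventName."""
    subscribed = [p for p in plugins if p.eventName == eventName]
    await asyncio.gather(*(p.handleEvent(points, logger) for p in subscribed))


def demo():
    plugin = CollectingPlugin()
    points = [
        Point("requests", {"route": "/sdk"}, {"time": 0.12}),
        Point("requests", {"route": "/health"}, {"time": 0.01}),
    ]
    asyncio.run(dispatch([plugin], "monitoring_event", points, logging.getLogger("plugin_demo")))
    return plugin.received


if __name__ == "__main__":
    print(demo())
```

Because `handleEvent` gathers one coroutine per point, a slow monitoring endpoint delays the whole batch only by its slowest request rather than by the sum of all requests.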
Background plugin. This type of plugin is intended for background work.
A background plugin can implement:
custom route
background monitoring of service resources
collaboration of an event plugin and a background plugin (batching monitoring points)
connection to other data sources (Redis, RabbitMQ) and their data processing
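The batching item in the list above (an event plugin handing monitoring points to a background plugin) could look roughly like the sketch below. It is an illustration only: `PointBatcher`, `addPoints`, `send` and the batch parameters are hypothetical names, and the `start`/`close` methods merely imitate the shape of a background plugin rather than subclassing the real base class.

```python
import asyncio
from collections import deque


class PointBatcher:
    """Hypothetical helper shared by an event plugin and a background plugin."""

    def __init__(self, batchSize=3, flushInterval=0.01):
        self.batchSize = batchSize
        self.flushInterval = flushInterval
        self.pending = deque()  # points waiting to be sent
        self.sentBatches = []  # stands in for requests to a monitoring system
        self.task = None

    def addPoints(self, points):
        """Event side: called from the event callback, never blocks."""
        self.pending.extend(points)

    async def send(self, batch):
        """Placeholder for one network call per batch."""
        self.sentBatches.append(batch)

    async def flush(self):
        """Send everything that is pending, at most batchSize points per call."""
        while self.pending:
            size = min(self.batchSize, len(self.pending))
            await self.send([self.pending.popleft() for _ in range(size)])

    async def _run(self):
        while True:
            await asyncio.sleep(self.flushInterval)
            await self.flush()

    async def start(self):
        """Background side: start the periodic flush without awaiting it."""
        self.task = asyncio.create_task(self._run())

    async def close(self):
        """Stop the periodic task, then send what is left."""
        if self.task:
            self.task.cancel()
            try:
                await self.task
            except asyncio.CancelledError:
                pass
        await self.flush()


async def main():
    batcher = PointBatcher(batchSize=3, flushInterval=0.01)
    await batcher.start()
    batcher.addPoints(range(7))
    await asyncio.sleep(0.05)
    await batcher.close()
    return batcher.sentBatches


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The event callback only appends to an in-memory queue, so request handling is not slowed down by the monitoring backend; the final `flush` in `close` keeps points from being lost on shutdown.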
Plugin example:

    """
    Module implements a background plugin example
    """

    import asyncio
    from asyncio import Task

    from luna_plugins.base.plugins_meta.base_plugins import BaseBackgroundHandler, BaseBackgroundPlugin, pluginHTTPResponse


    class HandlerExample(BaseBackgroundHandler):
        """
        Handler example
        """

        async def get(self, request):  # pylint: disable=unused-argument
            """
            Method get example.

            Returns:
                response
            """
            return self.response(body="I am teapot", headers={"Content-Type": "text/plain"}, statusCode=418)


    def anotherHandlerExample(request):  # pylint: disable=unused-argument
        """Standalone handler example"""
        return pluginHTTPResponse(statusCode=200, body="T800", headers={"Content-Type": "text/plain"})


    class BackgroundPluginExample(BaseBackgroundPlugin):
        """
        Background plugin example.

        Create background task and add a route.
        """

        def __init__(self, app: "LunaApplication"):
            super().__init__(app)
            self.task: Task | None = None
            self.temperature = 0

        async def initialize(self):
            """
            Initialize plugin
            """
            self.addRoute("/teapot", HandlerExample)
            self.addRoute("/teapot/version", anotherHandlerExample, methods={"get"})

        async def close(self):
            """
            Stop background process
            """
            if self.task:
                self.task.cancel()

        async def usefulJob(self):
            """
            Some useful async work
            """
            while True:
                await asyncio.sleep(1)
                self.temperature = min(100, self.temperature + 1)
                if self.temperature < 100:
                    self.app.ctx.logger.info(f"I boil water, temperature: {self.temperature}")
                else:
                    self.app.ctx.logger.info("boiling water is ready, would you care for a cup of tea?")

        async def start(self):
            """
            Run background process

            .. warning::
                The background process is expected to be launched from this coroutine.
                The coroutine must only start the process, without awaiting its end.
            """
            self.task = asyncio.create_task(self.usefulJob())

This plugin demonstrates background work and implements a route. All background plugins must implement the BaseBackgroundPlugin abstract class.
Estimator replacement plugin. This type of plugin allows replacing standard Luna SDK estimators with custom implementations without modifying the core service code. This enables integration of custom neural networks, alternative algorithms, or optimized implementations for specific use cases.
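The idea of replacement can be pictured with a small registry. The sketch below is an assumption-laden illustration, not the service's implementation: the registry dictionary, `applyReplacements` and both counter classes are hypothetical; only the member names `estimatorBrokerName` and `createReplacementEstimator` are borrowed from the plugin example in this section.

```python
class DefaultPeopleCounter:
    """Hypothetical stand-in for a built-in estimator."""

    def estimate(self, image):
        return {"count": 0, "source": "default"}


class CustomPeopleCounter:
    """Hypothetical custom estimator with the same interface."""

    def estimate(self, image):
        # Toy logic: treat the length of the input as the people count.
        return {"count": len(image), "source": "custom"}


class ReplacementPlugin:
    """Hypothetical plugin: names the estimator to replace and builds the substitute."""

    estimatorBrokerName = "peopleCountEstimator"

    def createReplacementEstimator(self):
        return CustomPeopleCounter()


def applyReplacements(estimators, plugins):
    """Return a copy of the registry with plugin-provided estimators swapped in."""
    replaced = dict(estimators)
    for plugin in plugins:
        name = plugin.estimatorBrokerName
        if name not in replaced:
            raise KeyError(f"unknown estimator: {name}")
        replaced[name] = plugin.createReplacementEstimator()
    return replaced


if __name__ == "__main__":
    registry = {"peopleCountEstimator": DefaultPeopleCounter()}
    patched = applyReplacements(registry, [ReplacementPlugin()])
    print(patched["peopleCountEstimator"].estimate("abc"))
```

The calling code keeps looking up the estimator by name, which is why the core service code does not need to change when the implementation behind that name is swapped.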
Plugin example:
    import asyncio
    from pathlib import Path
    from typing import Literal

    import numpy as np
    import onnxruntime as ort
    from luna_plugins.estimator_plugins.estimator_replacement_plugin import EstimatorBase, EstimatorReplacementPlugin
    from lunavl.sdk.estimators.image_estimators.people_count import EstimationTargets, ImageForPeopleEstimation, PeopleCount
    from lunavl.sdk.image_utils.geometry import Rect, Vec2D
    from lunavl.sdk.image_utils.image import VLImage


    def setFutureResult(future, result, error):
        """Set result to asyncio future."""
        if not future.done():
            future.set_result((error, result))


    def ortCallback(result, waiter: tuple[asyncio.AbstractEventLoop, asyncio.Future], error: str):
        """Callback for onnx thread."""
        loop = waiter[0]
        future = waiter[1]
        loop.call_soon_threadsafe(setFutureResult, future, result, error)


    async def execute(output_names, input_feed, session: ort.InferenceSession, loop: asyncio.AbstractEventLoop):
        """Async execute onnx prediction using run_async.

        Args:
            output_names: name of the outputs
            input_feed: dictionary { input_name: numpy_array }
            session: ort session
            loop: current asyncio event loop

        Return:
            prediction result
        """
        future = loop.create_future()
        session.run_async(output_names=output_names, input_feed=input_feed, callback=ortCallback, user_data=(loop, future))
        error, res = await future
        if error:
            raise RuntimeError(f"ONNX inference error: {error}")
        return res


    class OnnxruntimeWrapper:
        """Wrapper for asynchronous ONNX Runtime execution."""

        def __init__(self, onnxSession):
            self.onnxSession = onnxSession

        async def forward(self, output_names, input_feed):
            """Execute inference asynchronously.

            Args:
                output_names: List of output names
                input_feed: Dict with numpy array inputs

            Returns:
                List of numpy arrays (outputs)
            """
            loop = asyncio.get_running_loop()
            return await execute(output_names, input_feed, self.onnxSession, loop)


    class Estimation:
        """Container for people counting estimation results.

        Holds the count of detected people and their coordinate positions.

        Attributes:
            count: The number of people detected in the image.
            points: Points object containing coordinate information.
        """

        class Points:
            """Container for point coordinates in an estimation.

            Attributes:
                coordinates: List of (x, y) coordinate tuples representing people positions.
            """

            def __init__(self, coordinates: list[tuple[int, int]]):
                """Initialize Points with coordinate data.

                Args:
                    coordinates: List of (x, y) tuples representing point locations.
                """
                self.coordinates = coordinates

            def getPoints(self):
                """Convert coordinates to Vec2D objects.

                Returns:
                    List of Vec2D objects representing the point coordinates.
                """
                return [Vec2D(x, y) for x, y in self.coordinates]

        def __init__(self, count: int, coordinates: list[tuple[int, int]]):
            """Initialize an Estimation with count and coordinates.

            Args:
                count: The number of people detected.
                coordinates: List of (x, y) tuples for each detected person.
            """
            self.count = count
            self.points = self.Points(coordinates)


    def extractCoordinatesFromDensityMap(
        densityMap: np.ndarray, imgHeight: int, imgWidth: int, count: int
    ) -> list[tuple[int, int]]:
        """Extract person coordinates from a density map.

        Finds the top peak locations in a density map and converts them to image
        coordinates scaled to the original image dimensions.

        Args:
            densityMap: The density map output from the model.
            imgHeight: Height of the original image.
            imgWidth: Width of the original image.
            count: Number of people to extract coordinates for.

        Returns:
            List of (x, y) tuples representing person locations in image coordinates.
        """
        if count == 0:
            return []

        # Remove batch and channel dimensions
        if densityMap.ndim == 4:
            densityMap = densityMap[0, 0]  # [height, width]
        elif densityMap.ndim == 3:
            densityMap = densityMap[0]  # [height, width]

        # Flatten and get top count positions
        flatMap = densityMap.flatten()
        topIndices = np.argsort(flatMap)[-count:]  # Get top 'count' values

        # Convert flat indices to 2D coordinates
        coordinates = []
        mapH, mapW = densityMap.shape
        imgH, imgW = imgHeight, imgWidth

        for idx in topIndices:
            yMap = idx // mapW
            xMap = idx % mapW

            # Scale coordinates from density map to original image size
            xImg = int((xMap / mapW) * imgW)
            yImg = int((yMap / mapH) * imgH)

            coordinates.append((xImg, yImg))

        return coordinates


    def adjustCoordsToOriginal(coords: list[tuple[int, int]], cropArea: Rect) -> list[tuple[int, int]]:
        """Adjust coordinates from cropped space back to original image space.

        Args:
            coords: List of (x, y) coordinates in cropped image space.
            cropArea: Rectangle defining the crop region offset.

        Returns:
            List of (x, y) coordinates adjusted to original image space.
        """
        offsetX, offsetY = int(cropArea.x), int(cropArea.y)
        return [(x + offsetX, y + offsetY) for x, y in coords]


    class ONNXPeopleCountEstimator(EstimatorBase):
        """ONNX-based people counting estimator using density map models.

        This estimator uses ONNX Runtime to perform inference with density map models
        like CSRNet or MCNN for counting people in images.

        Attributes:
            onnxWrapper: Async ONNX Runtime wrapper.
            session: ONNX Runtime inference session.
            input_name: Name of the model's input node.
            output_name: Name of the model's output node.
        """

        def __init__(self, modelPath: Path, useGpu: bool = False, gpuDeviceId: int = 0, maxWorkers: int = 4) -> None:
            """Initialize the ONNX people count estimator.

            Args:
                modelPath: Path to the ONNX model file.
                useGpu: Whether to use GPU acceleration.
                gpuDeviceId: GPU device ID to use if GPU is enabled.
                maxWorkers: Maximum number of worker threads for async operations.

            Raises:
                ImportError: If onnxruntime is not installed.
            """
            # Setup execution providers
            if useGpu:
                providers = [
                    (
                        "CUDAExecutionProvider",
                        {
                            "device_id": gpuDeviceId,
                            "arena_extend_strategy": "kNextPowerOfTwo",
                            "gpu_mem_limit": 2 * 1024 * 1024 * 1024,  # 2GB
                            "cudnn_conv_algo_search": "EXHAUSTIVE",
                            "do_copy_in_default_stream": True,
                        },
                    ),
                    "CPUExecutionProvider",
                ]
            else:
                providers = ["CPUExecutionProvider"]

            # Create ONNX Runtime session
            sess_options = ort.SessionOptions()
            sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

            self.session = ort.InferenceSession(
                str(modelPath),
                sess_options=sess_options,
                providers=providers,
            )

            # Get input/output names
            self.input_name = self.session.get_inputs()[0].name
            self.output_name = self.session.get_outputs()[0].name

            # Create async wrapper
            self.onnxWrapper = OnnxruntimeWrapper(self.session)

        def close(self):
            """Close the estimator and release resources."""

        def _postprocessModelOutput(
            self, modelOutput: np.ndarray, imgHeight: int, imgWidth: int, estimationTargets: EstimationTargets
        ) -> Estimation:
            """Postprocess the model output to extract count and coordinates.

            Args:
                modelOutput: Raw output from the density map model.
                imgHeight: Height of the processed image.
                imgWidth: Width of the processed image.
                estimationTargets: Target type (T1 with coordinates, T2 count only).

            Returns:
                Estimation object containing count and optional coordinates.
            """
            # For density map models (CSRNet, MCNN)
            # Sum all values in density map to get count
            count = int(np.sum(modelOutput))
            count = max(0, count)  # Ensure non-negative

            # Extract coordinates if requested (T1)
            if estimationTargets == EstimationTargets.T1:
                coordinates = extractCoordinatesFromDensityMap(modelOutput, imgHeight, imgWidth, count)
            else:
                # T2: only count, no coordinates
                coordinates = []

            return Estimation(count, coordinates)

        async def _estimate(
            self, image: VLImage | ImageForPeopleEstimation | tuple[VLImage, Rect], estimationTargets: EstimationTargets
        ) -> PeopleCount:
            """Internal async method to estimate people count in a single image.

            Args:
                image: Input image in various supported formats.
                estimationTargets: Target type for estimation.

            Returns:
                PeopleCount object with estimation results.

            Raises:
                ValueError: If image format is not supported.
            """
            if isinstance(image, VLImage):
                vlImage = image
                detectArea = None
            elif isinstance(image, ImageForPeopleEstimation):
                vlImage = image.image
                detectArea = image.detectArea
            elif (
                isinstance(image, tuple)
                and len(image) == 2
                and isinstance(image[0], VLImage)
                and isinstance(image[1], Rect)
            ):
                vlImage, detectArea = image
            else:
                raise ValueError("Unsupported image format")

            # Crop image if detection area is specified
            if detectArea is not None:
                processedImage = vlImage.crop(detectArea)
            else:
                processedImage = vlImage

            # Get image dimensions
            imgH, imgW = processedImage.rect.height, processedImage.rect.width

            # Convert to numpy array
            npArray = processedImage.asNPArray()

            # Run async inference
            input_feed = {self.input_name: npArray}
            outputs = await self.onnxWrapper.forward([self.output_name], input_feed)

            # Postprocess output to get count and coordinates
            estimation = self._postprocessModelOutput(outputs[0], imgH, imgW, estimationTargets)

            # Adjust coordinates back to original image space if cropped
            if detectArea is not None and estimation.points.coordinates:
                adjusted_coords = adjustCoordsToOriginal(estimation.points.coordinates, detectArea)
                estimation = Estimation(estimation.count, adjusted_coords)

            return PeopleCount(estimation)

        async def estimateBatch(
            self,
            images: list[VLImage | ImageForPeopleEstimation | tuple[VLImage, Rect]],
            estimationTargets: EstimationTargets = EstimationTargets.T1,
            asyncEstimate: Literal[True] = True,
        ) -> list[PeopleCount]:
            """Estimate people count in multiple images asynchronously.

            Args:
                images: List of input images in various supported formats.
                estimationTargets: Target type (T1 with coordinates, T2 count only). Defaults to T1.
                asyncEstimate: Must be True (only async mode supported).

            Returns:
                List of PeopleCount objects with estimation results.
            """
            print("estimate with ONNXPeopleCountEstimator: started")
            if not asyncEstimate:
                raise NotImplementedError("Synchronous estimation is not supported")

            # Process all images concurrently
            tasks = [self._estimate(image, estimationTargets) for image in images]
            results = await asyncio.gather(*tasks)
            # Report completion only after all estimations have finished
            print("estimate with ONNXPeopleCountEstimator: completed")
            return results


    class PeopleCountReplacementPlugin(EstimatorReplacementPlugin):
        """Plugin for replacing the default people count estimator with ONNX implementation.

        This plugin integrates the ONNXPeopleCountEstimator into the Luna SDK,
        replacing the built-in estimator with a custom ONNX-based implementation.
        """

        def close(self) -> None: ...

        @property
        def estimatorBrokerName(self) -> str:
            """Get the system name of the estimator to replace.

            Returns:
                The name "peopleCountEstimator".
            """
            return "peopleCountEstimator"

        @property
        def modelDirPath(self) -> Path:
            """Get the directory containing the model file.

            Returns:
                Path to the directory containing this plugin file.
            """
            return Path(__file__).parent

        @property
        def modelName(self) -> str:
            """Get the model filename.

            Returns:
                The ONNX model filename.
            """
            return "csrnet_uint8.onnx"

        @property
        def modelPath(self) -> Path:
            """Get the full path to the model file.

            Returns:
                Complete path to the ONNX model file.
            """
            return self.modelDirPath / self.modelName

        @property
        def deviceClass(self) -> Literal["cpu", "gpu"]:
            """Get the device class for this estimator.

            Checks the configuration for people counter device settings,
            falling back to global device class if set to "global".

            Returns:
                The device class, either "cpu" or "gpu".
            """
            estimatorDeviceType = self.appConfig.estimatorsSettings.peopleCounter.deviceClass
            if estimatorDeviceType == "global":
                return self.globalDeviceClass
            return estimatorDeviceType

        def createReplacementEstimator(self) -> ONNXPeopleCountEstimator:
            """Create a new ONNX people count estimator instance.

            Returns:
                Configured ONNXPeopleCountEstimator instance.
            """
            return ONNXPeopleCountEstimator(
                modelPath=self.modelPath,
                useGpu=self.deviceClass == "gpu",
                gpuDeviceId=self.defaultGpuDevice,
                maxWorkers=4,
            )

This plugin demonstrates replacement of the people count estimator with a custom ONNX Runtime implementation. All estimator replacement plugins must extend the EstimatorReplacementPlugin abstract class.
For detailed information about creating and integrating estimator replacement plugins, see:
Enable plugin¶
To enable a custom plugin:
Add plugin files: Copy the plugin directory to the luna_remote_sdk/plugins directory of the service.
Install dependencies: If the plugin has custom dependencies (listed in requirements.txt), they must be installed before enabling the plugin:
pip install -r luna_remote_sdk/plugins/my_plugin/requirements.txt
For containerized deployments, see the “Using plugins with containers” section below.
Activate plugin: Add the plugin filename to the LUNA_REMOTE_SDK_ACTIVE_PLUGINS configuration setting.
Restart service: Restart the service for changes to take effect.
Warning
Plugins with unmet dependencies will fail to load and may cause service startup errors. Always install plugin dependencies before activating the plugin in the configuration.
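Before activating a plugin, its dependencies can be checked with a short pre-flight script. The sketch below is not part of the service; `parseRequirementNames`, `findMissing` and `checkPlugin` are hypothetical helpers, and the parser is an assumption that handles only simple `name` or `name<specifier>` lines (it skips option lines such as `-r other.txt` and does not understand URL or VCS requirements). It checks only whether a distribution is installed, not whether the installed version satisfies the specifier.

```python
import re
from importlib import metadata
from pathlib import Path


def parseRequirementNames(text):
    """Extract distribution names from simple requirements.txt content."""
    names = []
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments
        if not line or line.startswith("-"):  # skip blanks and pip options
            continue
        match = re.match(r"[A-Za-z0-9][A-Za-z0-9._-]*", line)
        if match:
            names.append(match.group(0))
    return names


def findMissing(names):
    """Return the names that have no installed distribution."""
    missing = []
    for name in names:
        try:
            metadata.version(name)
        except metadata.PackageNotFoundError:
            missing.append(name)
    return missing


def checkPlugin(requirementsPath):
    """Return missing dependencies for a plugin, or [] if the file is absent."""
    path = Path(requirementsPath)
    if not path.is_file():
        return []
    return findMissing(parseRequirementNames(path.read_text()))


if __name__ == "__main__":
    missing = checkPlugin("luna_remote_sdk/plugins/my_plugin/requirements.txt")
    if missing:
        raise SystemExit("missing plugin dependencies: " + ", ".join(missing))
    print("all plugin dependencies are installed")
```

Running such a check in the same Python environment as the service, before the restart step, turns a startup failure into an explicit list of packages to install.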
Using plugins with containers¶
If the service runs in a container, there are several ways to include plugins in the container:
Build your own image based on the original one. See the example Dockerfile below.
Mount the plugin files into the container by using the --mount flag.
Copy the plugin files into a running container, commit the changes to a new image (docker commit), and use the newly created image.
FROM my-registry/luna-remote-sdk:v.x.x.x
# Copy plugin files
COPY --chown=1001:0 my_plugin ./luna_remote_sdk/plugins/my_plugin
# Install plugin dependencies (if requirements.txt exists)
RUN pip install --no-cache-dir -r ./luna_remote_sdk/plugins/my_plugin/requirements.txt
Warning
Every non-abstract class that implements a base plugin class and is imported into a plugin module will be activated. A careless import may therefore load the same class twice. Do not import plugin modules through top-level packages (from plugins.plugin_name.foo import bar); use relative imports where possible (from .foo import bar).
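The effect behind this warning can be reproduced in plain Python. The sketch below is a general illustration and makes no claim about how the service discovers plugins: it loads one source file under two different module names (both names, the file and `MyPlugin` are hypothetical) and shows that Python then creates two distinct class objects from the same code.

```python
import importlib.util
import tempfile
from pathlib import Path

# Hypothetical plugin source containing a single class.
PLUGIN_SOURCE = "class MyPlugin:\n    pass\n"


def loadAs(moduleName, path):
    """Execute the file at path as a module with the given name."""
    spec = importlib.util.spec_from_file_location(moduleName, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "foo.py"
        path.write_text(PLUGIN_SOURCE)
        # The same file reached through two different import paths.
        first = loadAs("plugins.my_plugin.foo", path)
        second = loadAs("my_plugin.foo", path)
        return (
            first.MyPlugin is second.MyPlugin,
            first.MyPlugin.__module__,
            second.MyPlugin.__module__,
        )


if __name__ == "__main__":
    print(demo())
```

Since the two classes are unrelated objects, any mechanism that collects plugin classes by scanning modules would see both, which is consistent with the advice to keep a single import path per plugin module.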