Estimator Replacement Plugin Example

This document provides a detailed analysis of the people count estimator replacement plugin (people_count_onnx_plugin.py), which demonstrates how to replace the standard Luna SDK people counting estimator with a custom ONNX Runtime implementation using density map models.

Overview

The example plugin replaces the built-in people counting estimator with a custom implementation that:

  • Uses ONNX Runtime for inference with density map models (CSRNet, MCNN)

  • Supports both CPU and GPU execution

  • Implements asynchronous batch processing

  • Extracts people counts and coordinate locations from density maps

  • Handles image cropping and coordinate transformation

Plugin architecture

The plugin consists of three main components:

  1. ONNXPeopleCountEstimator - Custom estimator implementing the estimation logic

  2. OnnxruntimeWrapper - Wrapper for asynchronous ONNX Runtime execution

  3. PeopleCountReplacementPlugin - Plugin class that integrates the estimator into Luna SDK

Note

For plugin developers: To explore this example’s dependencies and understand the interfaces it implements, install luna-plugins with development dependencies:

pip install luna-plugins[remote_sdk]

This lets you navigate to the source code of lunavl.sdk.estimators.image_estimators.people_count.PeopleCountEstimatorV2 and other classes to inspect their interfaces and return types.

Asynchronous ONNX Runtime execution

Why asynchrony is needed

Luna Remote SDK is built on an asynchronous architecture using Python’s asyncio. When processing multiple estimation requests concurrently, blocking the event loop would:

  • Prevent other requests from being processed

  • Reduce overall throughput

  • Cause latency spikes

The challenge with ONNX Runtime

ONNX Runtime’s standard session.run() method is synchronous and blocking. When called, it blocks the calling thread until inference completes. This creates a problem:

  • Calling session.run() directly in an async function would block the entire event loop

  • Simply wrapping it in async def doesn’t make it non-blocking

  • Using asyncio.to_thread() or run_in_executor() would work but adds overhead
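For contrast, a minimal sketch of the executor-based alternative mentioned above (`blocking_run` is a hypothetical stand-in for a blocking `session.run()` call): it works, but every request pays thread-dispatch overhead.

```python
import asyncio


# Hypothetical stand-in for a blocking session.run() call
def blocking_run(value):
    return value * 2


async def infer_via_executor(value):
    # Off-load the blocking call to the default thread pool executor;
    # correct, but incurs per-call thread-dispatch overhead
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_run, value)


print(asyncio.run(infer_via_executor(21)))  # prints 42
```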

The solution: ONNX Runtime’s async API

ONNX Runtime provides a run_async() method that allows truly asynchronous execution:

import asyncio

import onnxruntime as ort


def setFutureResult(future: asyncio.Future, result, error: str):
    """Set the (error, result) pair on the future; runs in the event loop thread."""
    if not future.cancelled():
        future.set_result((error, result))


def ortCallback(result, waiter: tuple[asyncio.AbstractEventLoop, asyncio.Future], error: str):
    """Callback invoked from ONNX Runtime's worker thread."""
    loop, future = waiter
    loop.call_soon_threadsafe(setFutureResult, future, result, error)


async def execute(output_names, input_feed, session: ort.InferenceSession, loop: asyncio.AbstractEventLoop):
    """Async execute onnx prediction using run_async."""
    future = loop.create_future()
    session.run_async(output_names=output_names, input_feed=input_feed,
                      callback=ortCallback, user_data=(loop, future))
    error, res = await future
    if error:
        raise RuntimeError(f"ONNX inference error: {error}")
    return res

How this works:

  1. Create a Future: loop.create_future() creates a Future object that will hold the result

  2. Start async inference: session.run_async() starts inference in ONNX Runtime’s thread pool

  3. Register callback: When inference completes, ONNX Runtime calls ortCallback from its thread

  4. Thread-safe result passing: The callback uses loop.call_soon_threadsafe() to safely pass the result back to the asyncio event loop

  5. Await result: The await future suspends the coroutine until the result is ready, allowing other coroutines to run
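The same hand-off can be demonstrated without ONNX Runtime by simulating run_async() with a worker thread (`fake_run_async` and the doubling "inference" are stand-ins; only the callback/Future mechanics mirror the real code):

```python
import asyncio
import threading


def fake_run_async(input_value, callback, user_data):
    """Stand-in for session.run_async(): compute on a worker thread, then call back."""
    def worker():
        result = input_value * 2       # pretend inference
        callback(result, user_data, "")  # empty string = no error
    threading.Thread(target=worker).start()


def set_future_result(future, result, error):
    # Runs in the event loop thread, so setting the result is safe here
    if not future.cancelled():
        future.set_result((error, result))


def on_done(result, waiter, error):
    # Runs in the worker thread: hand the result to the loop thread-safely
    loop, future = waiter
    loop.call_soon_threadsafe(set_future_result, future, result, error)


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    fake_run_async(21, on_done, (loop, future))
    error, result = await future  # suspends; other coroutines may run meanwhile
    return result


print(asyncio.run(main()))  # prints 42
```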

Why this approach:

  • Non-blocking: The event loop is not blocked during inference

  • Efficient: No thread pool executor overhead or thread creation per request

  • Native support: Uses ONNX Runtime’s built-in async capabilities

  • Scalable: Multiple inferences can run concurrently without blocking each other

OnnxruntimeWrapper class

The wrapper encapsulates the async execution logic:

class OnnxruntimeWrapper:
    """Wrapper for asynchronous ONNX Runtime execution."""

    def __init__(self, onnxSession):
        self.onnxSession = onnxSession

    async def forward(self, output_names, input_feed):
        """Execute inference asynchronously."""
        loop = asyncio.get_running_loop()
        return await execute(output_names, input_feed, self.onnxSession, loop)

This provides a clean interface: you call await wrapper.forward() and the inference happens asynchronously without blocking the event loop.

Custom estimator implementation

ONNXPeopleCountEstimator class

The estimator class handles model initialization, inference, and postprocessing:

Initialization

def __init__(self, modelPath: Path, useGpu: bool = False, gpuDeviceId: int = 0, maxWorkers: int = 4):
    # Setup execution providers
    if useGpu:
        providers = [
            ("CUDAExecutionProvider", {
                "device_id": gpuDeviceId,
                "arena_extend_strategy": "kNextPowerOfTwo",
                "gpu_mem_limit": 2 * 1024 * 1024 * 1024,  # 2GB
                "cudnn_conv_algo_search": "EXHAUSTIVE",
                "do_copy_in_default_stream": True,
            }),
            "CPUExecutionProvider",
        ]
    else:
        providers = ["CPUExecutionProvider"]

    # Create ONNX Runtime session
    sess_options = ort.SessionOptions()
    sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

    self.session = ort.InferenceSession(
        str(modelPath),
        sess_options=sess_options,
        providers=providers,
    )

Key points:

  • Execution providers: CUDA for GPU, with CPU fallback

  • CUDA configuration:

    • device_id: Which GPU to use

    • arena_extend_strategy: Memory allocation strategy

    • gpu_mem_limit: Maximum GPU memory usage

    • cudnn_conv_algo_search: cuDNN algorithm selection (EXHAUSTIVE for best performance)

  • Graph optimization: Enable all ONNX Runtime optimizations

  • Provider fallback: If the CUDA provider cannot be initialized, ONNX Runtime automatically falls back to the CPU provider

Batch estimation

async def estimateBatch(
    self,
    images: list[VLImage | ImageForPeopleEstimation | tuple[VLImage, Rect]],
    estimationTargets: EstimationTargets = EstimationTargets.T1,
    asyncEstimate: Literal[True] = True,
) -> list[PeopleCount]:
    """Estimate people count in multiple images asynchronously."""
    if not asyncEstimate:
        raise NotImplementedError("Synchronous estimation is not supported")

    # Process all images concurrently
    tasks = [self._estimate(image, estimationTargets) for image in images]
    return await asyncio.gather(*tasks)

Key points:

  • Async by default: Only asynchronous mode is supported

  • Concurrent processing: All images are processed concurrently using asyncio.gather()

  • Return type: Returns list[PeopleCount] matching the original estimator interface
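A useful property of this pattern (illustrated below with a toy coroutine in place of `_estimate`): asyncio.gather() preserves input order even when tasks finish out of order, so results line up with the input images.

```python
import asyncio


async def fake_estimate(delay, label):
    # Stand-in for _estimate(): finishes after 'delay' seconds
    await asyncio.sleep(delay)
    return label


async def main():
    # "second" finishes first, but gather() returns results in call order
    return await asyncio.gather(
        fake_estimate(0.02, "first"),
        fake_estimate(0.01, "second"),
    )


print(asyncio.run(main()))  # ['first', 'second']
```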

Single image estimation

The _estimate() method processes a single image:

async def _estimate(
    self,
    image: VLImage | ImageForPeopleEstimation | tuple[VLImage, Rect],
    estimationTargets: EstimationTargets
) -> PeopleCount:
    # Handle different input formats
    if isinstance(image, VLImage):
        vlImage = image
        detectArea = None
    elif isinstance(image, ImageForPeopleEstimation):
        vlImage = image.image
        detectArea = image.detectArea
    elif isinstance(image, tuple) and len(image) == 2:
        vlImage, detectArea = image
    else:
        raise ValueError("Unsupported image format")

    # Crop image if detection area specified
    if detectArea is not None:
        processedImage = vlImage.crop(detectArea)
    else:
        processedImage = vlImage

    # Get dimensions
    imgH, imgW = processedImage.rect.height, processedImage.rect.width

    # Convert to numpy and run inference
    npArray = processedImage.asNPArray()
    input_feed = {self.input_name: npArray}
    outputs = await self.onnxWrapper.forward([self.output_name], input_feed)

    # Postprocess
    estimation = self._postprocessModelOutput(outputs[0], imgH, imgW, estimationTargets)

    # Adjust coordinates back to original image space if cropped
    if detectArea is not None and estimation.points.coordinates:
        adjusted_coords = adjustCoordsToOriginal(estimation.points.coordinates, detectArea)
        estimation = Estimation(estimation.count, adjusted_coords)

    return PeopleCount(estimation)

Processing steps:

  1. Input handling: Supports multiple input formats (VLImage, ImageForPeopleEstimation, tuple)

  2. Image cropping: Applies detection area cropping if specified

  3. Inference: Runs async ONNX inference

  4. Postprocessing: Extracts count and coordinates from density map

  5. Coordinate adjustment: Maps coordinates back to original image space if cropped

  6. Return: Creates PeopleCount object matching the original estimator’s output format
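The helper adjustCoordsToOriginal is not shown in the excerpt; a plausible sketch (function name and signature here are assumptions) simply shifts each crop-relative point by the detection area's origin:

```python
# Hypothetical sketch: map crop-relative points back to full-image space
# by adding the detection area's top-left corner to each coordinate.
def adjust_coords_to_original(coords, area_x, area_y):
    return [(x + area_x, y + area_y) for x, y in coords]


# A point at (10, 20) inside a crop whose origin is (100, 50)
print(adjust_coords_to_original([(10, 20)], 100, 50))  # [(110, 70)]
```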

Density map postprocessing

The plugin processes density map outputs to extract people counts and locations:

def _postprocessModelOutput(
    self,
    modelOutput: np.ndarray,
    imgHeight: int,
    imgWidth: int,
    estimationTargets: EstimationTargets
) -> Estimation:
    # For density map models (CSRNet, MCNN)
    # Sum all values in density map to get count
    count = int(np.sum(modelOutput))
    count = max(0, count)  # Ensure non-negative

    # Extract coordinates if requested (T1)
    if estimationTargets == EstimationTargets.T1:
        coordinates = extractCoordinatesFromDensityMap(modelOutput, imgHeight, imgWidth, count)
    else:
        # T2: only count, no coordinates
        coordinates = []

    return Estimation(count, coordinates)

Density map interpretation:

  • Count extraction: Sum all density map values to get total people count

  • Coordinate extraction: Find peaks in density map and convert to image coordinates

  • Target modes:

    • T1: Extract both count and coordinates

    • T2: Extract only count
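The count step can be checked on a toy map (values chosen to be exactly representable in float32 so the sum is exact):

```python
import numpy as np

# Toy density map whose values sum to exactly 3.0: three "people"
density = np.zeros((4, 4), dtype=np.float32)
density[1, 1] = 1.25
density[2, 3] = 0.9375
density[3, 0] = 0.8125

count = max(0, int(np.sum(density)))  # same extraction as _postprocessModelOutput
print(count)  # 3
```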

Coordinate extraction:

def extractCoordinatesFromDensityMap(
    densityMap: np.ndarray,
    imgHeight: int,
    imgWidth: int,
    count: int
) -> list[tuple[int, int]]:
    if count == 0:
        return []

    # Remove batch and channel dimensions
    if densityMap.ndim == 4:
        densityMap = densityMap[0, 0]  # [height, width]
    elif densityMap.ndim == 3:
        densityMap = densityMap[0]  # [height, width]

    # Flatten and get top count positions
    flatMap = densityMap.flatten()
    topIndices = np.argsort(flatMap)[-count:]  # Get top 'count' values

    # Convert flat indices to 2D coordinates
    coordinates = []
    mapH, mapW = densityMap.shape
    imgH, imgW = imgHeight, imgWidth

    for idx in topIndices:
        yMap = idx // mapW
        xMap = idx % mapW

        # Scale coordinates from density map to original image size
        xImg = int((xMap / mapW) * imgW)
        yImg = int((yMap / mapH) * imgH)

        coordinates.append((xImg, yImg))

    return coordinates

Algorithm:

  1. Normalize dimensions: Remove batch and channel dimensions

  2. Find peaks: Use np.argsort() to find the highest density values

  3. Convert indices: Transform flat array indices to 2D coordinates

  4. Scale coordinates: Map from density map resolution to original image resolution
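A worked example of these four steps: a 4x4 density map with two peaks, scaled to a hypothetical 100x100 source image (the inline arithmetic mirrors extractCoordinatesFromDensityMap above):

```python
import numpy as np

# 4x4 density map with two peaks, mapped onto a 100x100 image
density = np.zeros((4, 4), dtype=np.float32)
density[1, 1] = 1.0  # strongest peak
density[2, 3] = 0.8  # second peak
mapH, mapW, imgH, imgW = 4, 4, 100, 100

top = np.argsort(density.flatten())[-2:]   # indices of the 2 largest values
coords = [(int((i % mapW) / mapW * imgW),  # x: column scaled to image width
           int((i // mapW) / mapH * imgH)) # y: row scaled to image height
          for i in top]
print(sorted(coords))  # [(25, 25), (75, 50)]
```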

Plugin class implementation

PeopleCountReplacementPlugin class

The plugin class integrates the custom estimator:

class PeopleCountReplacementPlugin(EstimatorReplacementPlugin):
    """Plugin for replacing the default people count estimator with ONNX implementation."""

    def close(self) -> None: ...

    @property
    def estimatorBrokerName(self) -> str:
        """Get the system name of the estimator to replace."""
        return "peopleCountEstimator"

    @property
    def modelDirPath(self) -> Path:
        """Get the directory containing the model file."""
        return Path(__file__).parent

    @property
    def modelName(self) -> str:
        """Get the model filename."""
        return "csrnet_uint8.onnx"

    @property
    def modelPath(self) -> Path:
        """Get the full path to the model file."""
        return self.modelDirPath / self.modelName

    @property
    def deviceClass(self) -> Literal["cpu", "gpu"]:
        """Get the device class for this estimator.

        This implementation demonstrates the recommended pattern for respecting
        estimator-specific configuration settings.
        """
        # Access people counter specific configuration
        # 'peopleCounter' is the config name for peopleCountEstimator
        # (found in config.conf or configurator service)
        estimatorDeviceType = self.appConfig.estimatorsSettings.peopleCounter.deviceClass

        # If set to "global", fall back to global device class
        if estimatorDeviceType == "global":
            return self.globalDeviceClass

        # Otherwise use the estimator-specific setting
        return estimatorDeviceType

    def createReplacementEstimator(self) -> ONNXPeopleCountEstimator:
        """Create a new ONNX people count estimator instance."""
        return ONNXPeopleCountEstimator(
            modelPath=self.modelPath,
            useGpu=True if self.deviceClass == "gpu" else False,
            gpuDeviceId=self.defaultGpuDevice,
            maxWorkers=4,
        )

Configuration:

  • estimatorBrokerName: Identifies which estimator to replace in EstimatorsCollection

  • Model path: Constructs path to the ONNX model file located next to the plugin

  • Device selection: Implements the recommended pattern for respecting estimator-specific configuration. The plugin checks peopleCounter.deviceClass in the service configuration (the name peopleCounter corresponds to the peopleCountEstimator being replaced and can be found in config.conf or via the configurator service). If set to “global”, it falls back to the global device class setting.

  • Estimator creation: Instantiates ONNXPeopleCountEstimator with appropriate settings
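The device-selection fallback can be isolated into a pure function for clarity (a standalone sketch; the config values shown are illustrative):

```python
# Standalone sketch of the deviceClass fallback logic
def resolve_device_class(estimator_setting: str, global_setting: str) -> str:
    # "global" in the estimator-specific config defers to the service-wide value
    if estimator_setting == "global":
        return global_setting
    return estimator_setting


print(resolve_device_class("global", "gpu"))  # gpu
print(resolve_device_class("cpu", "gpu"))     # cpu
```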

Usage in Luna Remote SDK

Plugin structure

estimator_replacement_plugin/
├── __init__.py
├── people_count_onnx_plugin.py    # Main plugin implementation
├── csrnet_uint8.onnx              # ONNX model file
├── requirements.txt               # Dependencies
└── README.md                      # Documentation

Dependencies

The plugin requires the following Python packages:

onnxruntime>=1.22.0
onnxruntime-gpu>=1.16.0

Warning

These dependencies must be installed in the service environment before enabling the plugin. The plugin will fail to load if onnxruntime is not available.

Integration steps

  1. Copy plugin to container and install dependencies:

    FROM my-registry/luna-remote-sdk:v.x.x.x
    # Copy plugin files
    COPY --chown=1001:0 estimator_replacement_plugin ./luna_remote_sdk/plugins/estimator_replacement_plugin
    # Install dependencies BEFORE enabling the plugin
    RUN pip install --no-cache-dir -r ./luna_remote_sdk/plugins/estimator_replacement_plugin/requirements.txt
    

    Important

    The pip install step is mandatory. Without it, the plugin cannot import onnxruntime and will fail during initialization.

  2. Enable the plugin:

    export LUNA_REMOTE_SDK_ACTIVE_PLUGINS="estimator_replacement_plugin.people_count_onnx_plugin"
    
  3. Restart the service:

    docker restart <container_id>
    

Advanced topics

Performance optimization

Batch processing:

The plugin processes all images in a batch concurrently using asyncio.gather(). This allows:

  • Multiple inferences to run in parallel

  • Efficient use of GPU resources

  • Better throughput for multi-image requests

Memory management:

CUDA execution provider is configured with:

  • Memory limit: 2GB per estimator

  • Arena extension strategy: Powers of 2 for efficient allocation

  • Graph optimization: All ONNX Runtime optimizations enabled

Error handling

Async errors:

The execute() function checks for ONNX Runtime errors passed via callback:

error, res = await future
if error:
    raise RuntimeError(f"ONNX inference error: {error}")

Input validation:

The estimator validates input formats and raises ValueError for unsupported types.

Extending the example

Different models:

To use a different density map model:

  1. Replace csrnet_uint8.onnx with your model

  2. Adjust modelName property

  3. Ensure the model outputs a density map with compatible format

Custom postprocessing:

Override _postprocessModelOutput() to implement different postprocessing logic:

  • Different peak detection algorithms

  • Filtering of low-confidence detections

  • Custom coordinate extraction strategies
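As one example of an alternative peak-detection strategy, a sketch that keeps only local maxima above a confidence threshold instead of the global top-k (function name and threshold are illustrative; border cells are skipped for simplicity):

```python
import numpy as np


def peaks_above_threshold(density_map: np.ndarray, threshold: float):
    """Return (x, y) of interior cells that are local 3x3 maxima above threshold."""
    h, w = density_map.shape
    peaks = []
    for y in range(1, h - 1):
        for x in range(1, w - 1):
            v = density_map[y, x]
            if v > threshold and v == density_map[y - 1:y + 2, x - 1:x + 2].max():
                peaks.append((x, y))
    return peaks


dm = np.zeros((5, 5), dtype=np.float32)
dm[2, 2] = 1.0  # a clear peak
dm[1, 3] = 0.1  # below threshold, filtered out
print(peaks_above_threshold(dm, 0.5))  # [(2, 2)]
```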

Additional estimators:

Follow the same pattern to replace other estimators:

  1. Identify the estimator broker name in EstimatorsCollection

  2. Find the return type in lunavl.sdk.estimators

  3. Implement your custom estimator returning compatible objects

  4. Create a plugin class with the appropriate estimatorBrokerName

Summary

This example demonstrates:

  • Async ONNX inference: Using run_async() with callbacks and Futures

  • Proper integration: Matching return types from the original estimator

  • Flexible configuration: Supporting both CPU and GPU execution

  • Complete implementation: From model loading to coordinate extraction

  • Production-ready: Error handling, resource management, and optimization

The key insight is the asynchronous execution pattern: ONNX Runtime’s run_async() method with thread-safe callbacks allows true non-blocking inference that integrates seamlessly with Luna SDK’s asyncio-based architecture.