Estimator Replacement Plugin Example¶
This document provides a detailed analysis of the people count estimator replacement plugin
(people_count_onnx_plugin.py), which demonstrates how to replace the standard Luna SDK people
counting estimator with a custom ONNX Runtime implementation using density map models.
Overview¶
The example plugin replaces the built-in people counting estimator with a custom implementation that:
Uses ONNX Runtime for inference with density map models (CSRNet, MCNN)
Supports both CPU and GPU execution
Implements asynchronous batch processing
Extracts people counts and coordinate locations from density maps
Handles image cropping and coordinate transformation
Plugin architecture¶
The plugin consists of three main components:
ONNXPeopleCountEstimator - Custom estimator implementing the estimation logic
OnnxruntimeWrapper - Wrapper for asynchronous ONNX Runtime execution
PeopleCountReplacementPlugin - Plugin class that integrates the estimator into Luna SDK
Note
For plugin developers: To explore this example’s dependencies and understand the interfaces it implements,
install luna-plugins with development dependencies:
pip install luna-plugins[remote_sdk]
This allows you to navigate to the source code of lunavl.sdk.estimators.image_estimators.people_count.PeopleCountEstimatorV2
and other classes to understand their interfaces and return types.
Asynchronous ONNX Runtime execution¶
Why asynchrony is needed¶
Luna Remote SDK is built on an asynchronous architecture using Python’s asyncio. When processing
multiple estimation requests concurrently, blocking the event loop would:
Prevent other requests from being processed
Reduce overall throughput
Cause latency spikes
The challenge with ONNX Runtime¶
ONNX Runtime’s standard session.run() method is synchronous and blocking. When called,
it blocks the calling thread until inference completes. This creates a problem:
Calling session.run() directly in an async function would block the entire event loop
Simply wrapping it in async def doesn’t make it non-blocking
Using asyncio.to_thread() or run_in_executor() would work but adds overhead
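The blocking problem and the thread-based workaround can be sketched without ONNX Runtime at all; the `blocking_inference` stand-in below is hypothetical and only simulates a blocking `session.run()` call:

```python
import asyncio
import time


def blocking_inference(x):
    # Stand-in for session.run(): blocks its thread until done
    time.sleep(0.05)
    return x * 2


async def main():
    # asyncio.to_thread keeps the event loop responsive, but dispatches
    # a worker thread for every request -- the overhead mentioned above
    return await asyncio.gather(
        asyncio.to_thread(blocking_inference, 1),
        asyncio.to_thread(blocking_inference, 2),
    )


print(asyncio.run(main()))  # [2, 4]
```

Calling `blocking_inference` directly inside `main()` would instead serialize the two requests and stall the loop for their full duration.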
The solution: ONNX Runtime’s async API¶
ONNX Runtime provides a run_async() method that allows truly asynchronous execution:
def ortCallback(result, waiter: tuple[asyncio.AbstractEventLoop, asyncio.Future], error: str):
    """Callback for onnx thread."""
    loop = waiter[0]
    future = waiter[1]
    loop.call_soon_threadsafe(setFutureResult, future, result, error)


async def execute(output_names, input_feed, session: ort.InferenceSession, loop: asyncio.AbstractEventLoop):
    """Async execute onnx prediction using run_async."""
    future = loop.create_future()
    session.run_async(output_names=output_names, input_feed=input_feed,
                      callback=ortCallback, user_data=(loop, future))
    error, res = await future
    if error:
        raise RuntimeError(f"ONNX inference error: {error}")
    return res
How this works:
Create a Future: loop.create_future() creates a Future object that will hold the result
Start async inference: session.run_async() starts inference in ONNX Runtime’s thread pool
Register callback: When inference completes, ONNX Runtime calls ortCallback from its thread
Thread-safe result passing: The callback uses loop.call_soon_threadsafe() to safely pass the result back to the asyncio event loop
Await result: The await future suspends the coroutine until the result is ready, allowing other coroutines to run
Why this approach:
Non-blocking: The event loop is not blocked during inference
Efficient: No thread pool executor overhead or thread creation per request
Native support: Uses ONNX Runtime’s built-in async capabilities
Scalable: Multiple inferences can run concurrently without blocking each other
OnnxruntimeWrapper class¶
The wrapper encapsulates the async execution logic:
class OnnxruntimeWrapper:
    """Wrapper for asynchronous ONNX Runtime execution."""

    def __init__(self, onnxSession):
        self.onnxSession = onnxSession

    async def forward(self, output_names, input_feed):
        """Execute inference asynchronously."""
        loop = asyncio.get_running_loop()
        return await execute(output_names, input_feed, self.onnxSession, loop)
This provides a clean interface: you call await wrapper.forward() and the inference
happens asynchronously without blocking the event loop.
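The callback handoff can be exercised end to end without a model: the `FakeSession` below is a hypothetical stand-in that mimics `run_async()` by invoking the callback from a worker thread, which is exactly the thread boundary `call_soon_threadsafe()` bridges:

```python
import asyncio
import threading


def setFutureResult(future, result, error):
    # Runs on the event-loop thread; deliver (error, result) to the awaiter
    if not future.done():
        future.set_result((error, result))


def ortCallback(result, waiter, error):
    loop, future = waiter
    loop.call_soon_threadsafe(setFutureResult, future, result, error)


class FakeSession:
    """Hypothetical stand-in for ort.InferenceSession with run_async()."""

    def run_async(self, output_names, input_feed, callback, user_data):
        def work():
            outputs = [sum(input_feed["x"])]  # pretend inference
            callback(outputs, user_data, "")

        threading.Thread(target=work).start()


async def forward(session, output_names, input_feed):
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    session.run_async(output_names, input_feed, ortCallback, (loop, future))
    error, res = await future
    if error:
        raise RuntimeError(f"ONNX inference error: {error}")
    return res


print(asyncio.run(forward(FakeSession(), ["y"], {"x": [1, 2, 3]})))  # [6]
```

Swapping `FakeSession` for a real `ort.InferenceSession` leaves the awaiting code unchanged, which is the point of the wrapper.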
Custom estimator implementation¶
ONNXPeopleCountEstimator class¶
The estimator class handles model initialization, inference, and postprocessing:
Initialization¶
def __init__(self, modelPath: Path, useGpu: bool = False, gpuDeviceId: int = 0, maxWorkers: int = 4):
    # Setup execution providers
    if useGpu:
        providers = [
            ("CUDAExecutionProvider", {
                "device_id": gpuDeviceId,
                "arena_extend_strategy": "kNextPowerOfTwo",
                "gpu_mem_limit": 2 * 1024 * 1024 * 1024,  # 2GB
                "cudnn_conv_algo_search": "EXHAUSTIVE",
                "do_copy_in_default_stream": True,
            }),
            "CPUExecutionProvider",
        ]
    else:
        providers = ["CPUExecutionProvider"]
    # Create ONNX Runtime session
    sess_options = ort.SessionOptions()
    sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    self.session = ort.InferenceSession(
        str(modelPath),
        sess_options=sess_options,
        providers=providers,
    )
Key points:
Execution providers: CUDA for GPU, with CPU fallback
CUDA configuration:
device_id: Which GPU to use
arena_extend_strategy: Memory allocation strategy
gpu_mem_limit: Maximum GPU memory usage
cudnn_conv_algo_search: cuDNN algorithm selection (EXHAUSTIVE for best performance)
Graph optimization: Enable all ONNX Runtime optimizations
Provider fallback: If CUDA fails, automatically falls back to CPU
Batch estimation¶
async def estimateBatch(
    self,
    images: list[VLImage | ImageForPeopleEstimation | tuple[VLImage, Rect]],
    estimationTargets: EstimationTargets = EstimationTargets.T1,
    asyncEstimate: Literal[True] = True,
) -> list[PeopleCount]:
    """Estimate people count in multiple images asynchronously."""
    if not asyncEstimate:
        raise NotImplementedError("Synchronous estimation is not supported")
    # Process all images concurrently
    tasks = [self._estimate(image, estimationTargets) for image in images]
    return await asyncio.gather(*tasks)
Key points:
Async by default: Only asynchronous mode is supported
Concurrent processing: All images are processed concurrently using asyncio.gather()
Return type: Returns list[PeopleCount], matching the original estimator interface
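The fan-out pattern of `estimateBatch()` can be isolated in a toy form; the `_estimate` stub below is hypothetical and just returns a fake count:

```python
import asyncio


async def _estimate(image):
    # Stub for a single async inference call
    await asyncio.sleep(0)
    return len(image)  # pretend "people count"


async def estimateBatch(images):
    # One coroutine per image, all awaited together
    tasks = [_estimate(image) for image in images]
    return await asyncio.gather(*tasks)


print(asyncio.run(estimateBatch(["ab", "abc", "a"])))  # [2, 3, 1]
```

`asyncio.gather()` preserves input order, so results line up with the request batch regardless of which inference finishes first.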
Single image estimation¶
The _estimate() method processes a single image:
async def _estimate(
    self,
    image: VLImage | ImageForPeopleEstimation | tuple[VLImage, Rect],
    estimationTargets: EstimationTargets,
) -> PeopleCount:
    # Handle different input formats
    if isinstance(image, VLImage):
        vlImage = image
        detectArea = None
    elif isinstance(image, ImageForPeopleEstimation):
        vlImage = image.image
        detectArea = image.detectArea
    elif isinstance(image, tuple) and len(image) == 2:
        vlImage, detectArea = image
    else:
        raise ValueError("Unsupported image format")
    # Crop image if detection area specified
    if detectArea is not None:
        processedImage = vlImage.crop(detectArea)
    else:
        processedImage = vlImage
    # Get dimensions
    imgH, imgW = processedImage.rect.height, processedImage.rect.width
    # Convert to numpy and run inference
    npArray = processedImage.asNPArray()
    input_feed = {self.input_name: npArray}
    outputs = await self.onnxWrapper.forward([self.output_name], input_feed)
    # Postprocess
    estimation = self._postprocessModelOutput(outputs[0], imgH, imgW, estimationTargets)
    # Adjust coordinates back to original image space if cropped
    if detectArea is not None and estimation.points.coordinates:
        adjusted_coords = adjustCoordsToOriginal(estimation.points.coordinates, detectArea)
        estimation = Estimation(estimation.count, adjusted_coords)
    return PeopleCount(estimation)
Processing steps:
Input handling: Supports multiple input formats (VLImage, ImageForPeopleEstimation, tuple)
Image cropping: Applies detection area cropping if specified
Inference: Runs async ONNX inference
Postprocessing: Extracts count and coordinates from density map
Coordinate adjustment: Maps coordinates back to original image space if cropped
Return: Creates a PeopleCount object matching the original estimator’s output format
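The helper adjustCoordsToOriginal() is not shown in the excerpt; a plausible minimal sketch, assuming the detection area exposes the x/y of its top-left corner the way lunavl’s Rect does, is a simple offset by the crop origin:

```python
from types import SimpleNamespace


def adjustCoordsToOriginal(coordinates, detectArea):
    # Hypothetical reconstruction: points were found in crop space,
    # so shift them by the crop's top-left corner
    return [(x + detectArea.x, y + detectArea.y) for (x, y) in coordinates]


area = SimpleNamespace(x=10, y=20)  # stand-in for a lunavl Rect
print(adjustCoordsToOriginal([(1, 2), (3, 4)], area))  # [(11, 22), (13, 24)]
```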
Density map postprocessing¶
The plugin processes density map outputs to extract people counts and locations:
def _postprocessModelOutput(
    self,
    modelOutput: np.ndarray,
    imgHeight: int,
    imgWidth: int,
    estimationTargets: EstimationTargets,
) -> Estimation:
    # For density map models (CSRNet, MCNN):
    # sum all values in the density map to get the count
    count = int(np.sum(modelOutput))
    count = max(0, count)  # Ensure non-negative
    # Extract coordinates if requested (T1)
    if estimationTargets == EstimationTargets.T1:
        coordinates = extractCoordinatesFromDensityMap(modelOutput, imgHeight, imgWidth, count)
    else:
        # T2: only count, no coordinates
        coordinates = []
    return Estimation(count, coordinates)
Density map interpretation:
Count extraction: Sum all density map values to get total people count
Coordinate extraction: Find peaks in density map and convert to image coordinates
Target modes:
T1: Extract both count and coordinates
T2: Extract only count
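Count extraction on a toy density map, using the same int(np.sum(...)) rule as the postprocessor; the values below are illustrative and chosen to be exactly representable in float32 so the sum is exact:

```python
import numpy as np

# Toy 1x1x4x4 density map whose mass sums to exactly 2.0
densityMap = np.zeros((1, 1, 4, 4), dtype=np.float32)
densityMap[0, 0, 1, 1] = 1.25
densityMap[0, 0, 2, 3] = 0.5
densityMap[0, 0, 0, 2] = 0.25

# Same rule as _postprocessModelOutput: sum, truncate, clamp at zero
count = max(0, int(np.sum(densityMap)))
print(count)  # 2
```

Note that int() truncates, so real float sums close to an integer boundary can round the count down by one.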
Coordinate extraction:
def extractCoordinatesFromDensityMap(
    densityMap: np.ndarray,
    imgHeight: int,
    imgWidth: int,
    count: int,
) -> list[tuple[int, int]]:
    if count == 0:
        return []
    # Remove batch and channel dimensions
    if densityMap.ndim == 4:
        densityMap = densityMap[0, 0]  # [height, width]
    elif densityMap.ndim == 3:
        densityMap = densityMap[0]  # [height, width]
    # Flatten and get top 'count' positions
    flatMap = densityMap.flatten()
    topIndices = np.argsort(flatMap)[-count:]  # Get top 'count' values
    # Convert flat indices to 2D coordinates
    coordinates = []
    mapH, mapW = densityMap.shape
    imgH, imgW = imgHeight, imgWidth
    for idx in topIndices:
        yMap = idx // mapW
        xMap = idx % mapW
        # Scale coordinates from density map to original image size
        xImg = int((xMap / mapW) * imgW)
        yImg = int((yMap / mapH) * imgH)
        coordinates.append((xImg, yImg))
    return coordinates
Algorithm:
Normalize dimensions: Remove batch and channel dimensions
Find peaks: Use np.argsort() to find the highest density values
Convert indices: Transform flat array indices to 2D coordinates
Scale coordinates: Map from density map resolution to original image resolution
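A condensed, self-contained rerun of the same algorithm on a 2x2 map with one clear peak, scaled up to a 100x100 image:

```python
import numpy as np


def extractCoordinatesFromDensityMap(densityMap, imgHeight, imgWidth, count):
    # Condensed version of the function above
    if count == 0:
        return []
    if densityMap.ndim == 4:
        densityMap = densityMap[0, 0]
    elif densityMap.ndim == 3:
        densityMap = densityMap[0]
    flat = densityMap.flatten()
    topIndices = np.argsort(flat)[-count:]  # indices of the 'count' largest values
    mapH, mapW = densityMap.shape
    return [
        (int((idx % mapW) / mapW * imgWidth), int((idx // mapW) / mapH * imgHeight))
        for idx in topIndices
    ]


# Peak at map cell (row=1, col=0) maps to image point (0, 50) on a 100x100 image
dm = np.array([[0.1, 0.2], [0.9, 0.1]], dtype=np.float32)
print(extractCoordinatesFromDensityMap(dm, 100, 100, 1))  # [(0, 50)]
```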
Plugin class implementation¶
PeopleCountReplacementPlugin class¶
The plugin class integrates the custom estimator:
class PeopleCountReplacementPlugin(EstimatorReplacementPlugin):
    """Plugin for replacing the default people count estimator with ONNX implementation."""

    def close(self) -> None: ...

    @property
    def estimatorBrokerName(self) -> str:
        """Get the system name of the estimator to replace."""
        return "peopleCountEstimator"

    @property
    def modelDirPath(self) -> Path:
        """Get the directory containing the model file."""
        return Path(__file__).parent

    @property
    def modelName(self) -> str:
        """Get the model filename."""
        return "csrnet_uint8.onnx"

    @property
    def modelPath(self) -> Path:
        """Get the full path to the model file."""
        return self.modelDirPath / self.modelName

    @property
    def deviceClass(self) -> Literal["cpu", "gpu"]:
        """Get the device class for this estimator.

        This implementation demonstrates the recommended pattern for respecting
        estimator-specific configuration settings.
        """
        # Access people counter specific configuration.
        # 'peopleCounter' is the config name for peopleCountEstimator
        # (found in config.conf or the configurator service).
        estimatorDeviceType = self.appConfig.estimatorsSettings.peopleCounter.deviceClass
        # If set to "global", fall back to the global device class
        if estimatorDeviceType == "global":
            return self.globalDeviceClass
        # Otherwise use the estimator-specific setting
        return estimatorDeviceType

    def createReplacementEstimator(self) -> ONNXPeopleCountEstimator:
        """Create a new ONNX people count estimator instance."""
        return ONNXPeopleCountEstimator(
            modelPath=self.modelPath,
            useGpu=self.deviceClass == "gpu",
            gpuDeviceId=self.defaultGpuDevice,
            maxWorkers=4,
        )
Configuration:
estimatorBrokerName: Identifies which estimator to replace in EstimatorsCollection
Model path: Constructs the path to the ONNX model file located next to the plugin
Device selection: Implements the recommended pattern for respecting estimator-specific configuration. The plugin checks peopleCounter.deviceClass in the service configuration (the name peopleCounter corresponds to the peopleCountEstimator being replaced and can be found in config.conf or via the configurator service). If set to “global”, it falls back to the global device class setting.
Estimator creation: Instantiates ONNXPeopleCountEstimator with appropriate settings
Usage in Luna Remote SDK¶
Plugin structure¶
estimator_replacement_plugin/
├── __init__.py
├── people_count_onnx_plugin.py # Main plugin implementation
├── csrnet_uint8.onnx # ONNX model file
├── requirements.txt # Dependencies
└── README.md # Documentation
Dependencies¶
The plugin requires the following Python packages:
onnxruntime>=1.22.0
onnxruntime-gpu>=1.16.0
Warning
These dependencies must be installed in the service environment before enabling the plugin.
The plugin will fail to load if onnxruntime is not available.
Integration steps¶
Copy plugin to container and install dependencies:
FROM my-registry/luna-remote-sdk:v.x.x.x

# Copy plugin files
COPY --chown=1001:0 estimator_replacement_plugin ./luna_remote_sdk/plugins/estimator_replacement_plugin

# Install dependencies BEFORE enabling the plugin
RUN pip install --no-cache-dir -r ./luna_remote_sdk/plugins/estimator_replacement_plugin/requirements.txt
Important
The pip install step is mandatory. Without it, the plugin cannot import onnxruntime and will fail during initialization.
Enable the plugin:
export LUNA_REMOTE_SDK_ACTIVE_PLUGINS="estimator_replacement_plugin.people_count_onnx_plugin"
Restart the service:
docker restart <container_id>
Advanced topics¶
Performance optimization¶
Batch processing:
The plugin processes all images in a batch concurrently using asyncio.gather(). This allows:
Multiple inferences to run in parallel
Efficient use of GPU resources
Better throughput for multi-image requests
Memory management:
CUDA execution provider is configured with:
Memory limit: 2GB per estimator
Arena extension strategy: Powers of 2 for efficient allocation
Graph optimization: All ONNX Runtime optimizations enabled
Error handling¶
Async errors:
The execute() function checks for ONNX Runtime errors passed via callback:
error, res = await future
if error:
    raise RuntimeError(f"ONNX inference error: {error}")
Input validation:
The estimator validates input formats and raises ValueError for unsupported types.
Extending the example¶
Different models:
To use a different density map model:
Replace csrnet_uint8.onnx with your model
Adjust the modelName property
Ensure the model outputs a density map in a compatible format
Custom postprocessing:
Override _postprocessModelOutput() to implement different postprocessing logic:
Different peak detection algorithms
Filtering of low-confidence detections
Custom coordinate extraction strategies
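As one illustration of the filtering idea, a minimal sketch of an alternative extractor that keeps density cells above a fixed threshold instead of taking the top-N values (the function name and threshold are hypothetical, not part of the plugin):

```python
import numpy as np


def extractThresholdedPeaks(densityMap, imgHeight, imgWidth, threshold=0.5):
    """Alternative peak extraction: keep density cells above a fixed
    threshold instead of the top-N values (illustrative only)."""
    if densityMap.ndim == 4:
        densityMap = densityMap[0, 0]
    elif densityMap.ndim == 3:
        densityMap = densityMap[0]
    mapH, mapW = densityMap.shape
    # Row/column indices of every cell exceeding the threshold
    ys, xs = np.nonzero(densityMap > threshold)
    return [
        (int(x / mapW * imgWidth), int(y / mapH * imgHeight))
        for y, x in zip(ys, xs)
    ]


dm = np.array([[0.1, 0.9], [0.6, 0.2]], dtype=np.float32)
print(extractThresholdedPeaks(dm, 100, 100))  # [(50, 0), (0, 50)]
```

Unlike the top-N approach, this variant lets the detection count disagree with int(np.sum(...)), so the two would need to be reconciled in _postprocessModelOutput().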
Additional estimators:
Follow the same pattern to replace other estimators:
Identify the estimator broker name in EstimatorsCollection
Find the return type in lunavl.sdk.estimators
Implement your custom estimator returning compatible objects
Create a plugin class with the appropriate estimatorBrokerName
Summary¶
This example demonstrates:
Async ONNX inference: Using run_async() with callbacks and Futures
Proper integration: Matching return types from the original estimator
Flexible configuration: Supporting both CPU and GPU execution
Complete implementation: From model loading to coordinate extraction
Production-ready: Error handling, resource management, and optimization
The key insight is the asynchronous execution pattern: ONNX Runtime’s run_async() method
with thread-safe callbacks allows true non-blocking inference that integrates seamlessly
with Luna SDK’s asyncio-based architecture.