Agent lambda development

This section describes agent lambda development.

More information about lambda types and the differences between the agent lambda and other lambda types is available in the lambda types description.

For more information about LUNA video analytics, in particular luna-video-manager, video agents and their interaction, see the corresponding developer manuals.

Agent lambda requirements

The agent lambda has several requirements in addition to the basic requirements:

  • Luna Video Manager available using credentials from Luna-Configurator

  • Luna Licenses available using credentials from Luna-Configurator

  • Luna Events available using credentials from Luna-Configurator; it can be disabled using the ADDITIONAL_SERVICES_USAGE setting, and in this case the lambda must be able to work without Luna-Events

  • Luna Sender available using credentials from Luna-Configurator; it can be disabled using the ADDITIONAL_SERVICES_USAGE setting, and in this case the lambda must be able to work without Luna-Sender

  • Luna Faces/Images Samples Store available using credentials from Luna-Configurator; it can be disabled using the ADDITIONAL_SERVICES_USAGE setting, and in this case the lambda must be able to work without Luna-Image-Store

Agent lambda configuration

The agent lambda requires several settings from luna-configurator, which can be divided into several groups (an illustrative example follows the list):

  • LUNA_LAMBDA_AGENT_UNIT_LOGGER - lambda logger settings

  • luna-services addresses and timeouts settings (for example, LUNA_EVENTS_ADDRESS and LUNA_EVENTS_TIMEOUTS will be used by the lambda to make requests to the luna-events service)

  • ADDITIONAL_SERVICES_USAGE - determines which luna-services can be used by the lambda (the lambda will not check the connection to disabled services and will raise an error if a user tries to make a request to such a service)

  • LUNA_LAMBDA_AGENT_UNIT_ANALYTICS_SETTINGS - lambda analytics-specific settings, for example, which device (CPU or GPU) a particular analytics must use

  • LUNA_LAMBDA_AGENT_UNIT_VIDEO_SETTINGS/LUNA_LAMBDA_AGENT_UNIT_RUNTIME_SETTINGS - lambda video processing settings
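
For illustration only, the ADDITIONAL_SERVICES_USAGE value might look like the sketch below, with each optional service switched on or off. The key names and the value format here are assumptions; the actual schema is defined by the luna-configurator documentation.

{
    "luna-events": true,
    "luna-sender": true,
    "luna-image-store": false
}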

External agent

The agent can be started as an external service by setting up the agent creation parameters. In this mode the agent is treated as a service external to the main platform, and all communication with the platform is performed via the luna-api service. Running the agent in this mode requires luna account credentials or an access token set via the creation parameters. Note that the token used must be granted specific permissions to work properly. The minimal set of permissions:

{
    "event": ["creation"],
    "image": ["creation"],
    "video_stream": ["creation"],
    "video_analytic": ["view", "creation", "modification"],
    "video_agent": ["creation", "deletion"],
}

Things to note

  • The luna-api, luna-licenses and luna-configurator services must be available for the external lambda agent

  • If some of the analytics were previously created by an internal agent or by an external agent with another account id, the agent will not start, as the analytics update will fail. To overcome this, such analytics must be removed manually

  • Only the luna-api client will be available for use in custom routes

Read the luna-video-agent development documentation for more information.

Agent lambda development

The lambda agent works with analytics; each lambda agent must have at least one video analytics.

It is possible to specify video analytics as a separate package which will be installed as a lambda dependency, or to include the analytics as a module in the lambda. The lambda_main.py and other configuration files do not differ in these cases.

The description below covers the case when the analytics is a separate package; for the case when the analytics is a module inside the lambda, see the analytics development chapter.

In the simplest case, agent lambda development consists of its configuration by specifying several parameters in the lambda_main.py file.

Warning

The names of analytics, features, etc. listed below are fictitious and have nothing in common with the names of real analytics, etc.

The only required variable which must be specified is the list of available analytics; the analytics must also be specified in the lambda requirements, for example:

lambda_main.py
    #: list of analytics for agent lambda
    AVAILABLE_ANALYTICS = ["people"]
requirements.txt
    people_analytics==1.2.0

There are some more parameters which can also be relevant:

The ANALYTICS_LICENSE_FEATURES variable is a map of analytics to their licensing feature names. If an analytics does not require licensing, it must not be included in this map. If no analytics depends on licensing, this map must be empty or may be omitted from lambda_main.py.

The ESTIMATORS_TO_INITIALIZE and ESTIMATORS_TARGETS variables are lists of SDK estimators and their targets which must be initialized before the analytics are used. If no analytics requires SDK estimators, these lists must be empty or may be omitted from lambda_main.py.

Note

In this example the lambda has the people_analytics package of version 1.2.0 as a requirement, which represents a video analytics package with an analytics named people. The usage of this analytics is regulated by a license feature named people_feature. It requires the people estimator from the luna-sdkloop package and the people estimation target to be initialized before the lambda starts.

lambda_main.py
    from sdk_loop.enums import Estimators, LoopEstimations

    #: list of analytics for agent lambda
    AVAILABLE_ANALYTICS = ["people"]
    #: map with analytics and their license features
    ANALYTICS_LICENSE_FEATURES = {"people": "people_feature"}
    #: list of estimators for initialization
    ESTIMATORS_TO_INITIALIZE = [Estimators.people]
    #: list of estimators targets for initialization
    ESTIMATORS_TARGETS = [LoopEstimations.people]
requirements.txt
    people_analytics==1.2.0

There are some more settings which can be specified for the agent lambda in the lambda_main.py file:

lambda_main.py
    # count of video-agent registration attempts in *luna-video-manager*
    REGISTRATION_ATTEMPT_COUNT = 5
    # delay between registration attempts
    REGISTRATION_ATTEMPT_DELAY = 1
    # max size of feedback which can be sent to *luna-video-manager* at one time
    FEEDBACK_MAX_BATCH_COUNT = 1000
    # the code of response for ws-connections when the agent shuts down
    WSCODE_GOING_AWAY = 1001
    # the name of lambda video-agent which will be registered in *luna-video-manager*
    AGENT_NAME = "luna-lambda-great-video-agent"
    # the description of lambda video-agent which will be registered in *luna-video-manager*
    AGENT_DESCRIPTION = "luna-lambda-greatest-video-agent"

There are many other possibilities to modify the lambda agent behavior, its interaction with analytics and so on, which can be realized in the same way as for the regular luna-video-agent; for more information, see the luna-video-agent developer manual.

Agent video analytics development

It is possible to specify video analytics as a separate package which will be installed as a lambda dependency, or to include the analytics as a module in the lambda. The description below describes the analytics itself, which can then be included either as a module or as a package.

Note

The complete user guide for video analytics development is in progress and will be included in future releases.

In the presented example(s) the poetry tool is used for dependency management, so the pyproject.toml file is filled in by the user and the poetry.lock file must be generated using the poetry tool.

Below is an example of video analytics which detects suits on videos/streams and generates events when a suit appears.

Suit analytics description

How it works: The analytics processes video frames through a ResNet-50 neural network to classify whether a person is wearing a suit. It supports both CPU and GPU inference, with configurable frame processing rates and region-of-interest (ROI) analysis.

When events are generated: Events are generated based on configurable policies:

  • Start trigger (default): an event when a suit is first detected

  • End trigger: an event when suit detection ends

  • Periodic trigger: events at regular intervals during detection

By default, an event is generated when a suit is detected. An example of a generated event:


{
    "account_id": "string",
    "event_create_time": "string",
    "event_end_time": "string",
    "event_type": "suit",
    "event": {
        "stream_id": "557d54ec-29ad-4f3c-93b4-c9092ef12515",
        "event_id": "557d54ec-29ad-4f3c-93b4-c9092ef12515",
        "track_id": "557d54ec-29ad-4f3c-93b4-c9092ef12515",
        "source": "string",
        "name": "suit",
        "overview": {
            "image": "string",
            "time_offset": 1.234
        },
        "location": {
            "city": "Moscow",
            "area": "Central",
            "district": "Basmanny",
            "street": "Podsosensky lane",
            "house_number": "23 bldg.3",
            "geo_position": {
                "longitude": 36.616,
                "latitude": 55.752
            }
        },
        "video_segment": {
            "start_time_offset": 0.123,
            "end_time_offset": 1.234
        }
    }
}
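
The event policy is selected via the analytic parameters of the stream creation request. The fragment below is an illustrative sketch based on the spec.yml schema shown later in this chapter; the interval value is arbitrary:

"analytics": [
    {
        "analytic_name": "suit_analytics",
        "parameters": {
            "parameters": {
                "event_policy": {"trigger": "period", "interval": 5.0}
            }
        }
    }
]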

Note

The example below uses an image classification model ("resnet50-v2-7.onnx") and a json file ("imagenet-simple-labels.json") from github. To make this example work, these files must be downloaded from github and placed into the data folder of the lambda agent archive.

Suit analytics example file structure
  ├──__init__.py
  ├──analytics.py
  ├──classes.py
  ├──common.py
  ├──models.py
  ├──spec.html
  ├──spec.yml
  ├──data
     ├──resnet50-v2-7.onnx
     └──imagenet-simple-labels.json
  └──suit_nodes
     ├──__init__.py
     ├──events.py
     ├──suit_estimator.py
     └──suit_nodes.py

Note

The video analytics requires some dependencies which are available on the visionlabs pypi for a local run. All provided examples specify the visionlabs pypi in the tool.poetry.source section of the pyproject.toml file, which makes it possible to install dependencies from this source. A HASP license provided by VisionLabs is also required. For agent lambda creation, all required video analytics dependencies are already included in the base image (which is used for agent lambda building).

If this analytics needs to be built as a separate package, it must include its dependencies, for example using the pyproject.toml/poetry.lock files:

pyproject.toml
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "suit-analytics"
version = "0.0.4"
description = "suit analytics"
authors = [ "VisionLabs", ]

[[tool.poetry.source]]
name = "vlabspypi"
url = "http://pypi.visionlabs.ru/root/public/+simple"
priority = "primary"

[[tool.poetry.source]]
name = "public-pypi"
url = "https://pypi.org/simple"
priority = "supplemental"

[tool.setuptools.package-data]
analytics-docs = ["spec.html"]
onnx = ["data/resnet50-v2-7.onnx"]

[tool.poetry.dependencies]
python = "^3.12"
luna-analytics-manager = {version="*", extras=["sdk"]}
opencv-python-headless = "*"
nvidia-curand-cu11 = "*"
nvidia-cufft-cu11 = "*"
onnxruntime-gpu = "*"
pynvcodec = "^2.6.0"

After dependency resolution, it must be built into a package (for example, using poetry build).

The *__init__.py* module must contain all the presented imported objects and classes to make the analytics work.
__init__.py
"""Suit analytics."""

from video_analytics_helpers.luna_wraps import EventMetadata, LunaApiExternalParams, LunaWraps as PostProcessing

from .analytics import AnalyticsCtx, FrameCtx, initialize
from .common import getDocumentation, mandatoryNodes, targetNodes
from .models import Callbacks, Params, Targets
The *analytics.py* module contains analytics initialization, frame and analytics contexts.
analytics.py
"""Suit analytics."""

from logging import Logger
from pathlib import Path

import attr
import numpy as np
import onnxruntime as ort
from luna_analytics_manager.base import FrameEnviron
from luna_analytics_manager.templates.sdk import SDKAnalyticsCtx, SDKFrameCtx
from lunavl.sdk.image_utils.image import VLImage
from video_analytics_helpers.onnxruntime_wrapper import OnnxruntimeWrapper

from .classes import AnalyticsRes, FrameRes
from .models import Callbacks, Params, Targets

logger = Logger("luna.suit")


async def initialize(dataPath: str | Path | None = None, **kwargs) -> None:
    """
    Initialize analytics onnx wrapper

    Args:
        dataPath: Path to the data directory
        **kwargs: Additional keyword arguments

    Returns:
        None
    """
    if dataPath is None:
        configPrefix = Path(__file__).absolute().parent / "data"
    elif isinstance(dataPath, str):
        configPrefix = Path(dataPath)
    else:
        configPrefix = dataPath

    configPrefix / "imagenet-simple-labels.json"
    runtimeSettings = kwargs.get("runtime_settings", {})
    options = ort.SessionOptions()
    options.intra_op_num_threads = runtimeSettings.get("num_threads", 6)

    backend = (
        [("CUDAExecutionProvider", {"device_id": 0})]
        if runtimeSettings.get("device_class", "cpu") == "gpu"
        else ["CPUExecutionProvider"]
    )

    globalSession = ort.InferenceSession(
        (configPrefix / "resnet50-v2-7.onnx").as_posix(),
        sess_options=options,
        providers=backend,
    )
    globalSession.set_providers(backend)
    onnxWrapper = OnnxruntimeWrapper(options.intra_op_num_threads, globalSession)

    AnalyticsCtx.initialize(onnxWrapper)


@attr.dataclass(slots=True)
class FrameCtx(SDKFrameCtx):
    """Frame processing context."""

    processed: FrameRes | None = None


@attr.dataclass(slots=True)
class AnalyticsCtx(SDKAnalyticsCtx):
    """Analytics context."""

    name: str = "suit"
    params: Params = attr.field(factory=Params)
    targets: Targets = attr.field(factory=Targets)
    callbacks: Callbacks = attr.field(factory=Callbacks)
    aggregated: AnalyticsRes = attr.field(factory=AnalyticsRes)

    @classmethod
    def initialize(cls, onnxWrapper: OnnxruntimeWrapper) -> None:
        """Initialize analytics context"""
        cls.onnxWrapper = onnxWrapper

    async def prepareFrameCtx(self, frame: FrameEnviron[np.ndarray, VLImage]) -> FrameCtx:
        """Prepare frame processing context."""

        return FrameCtx(
            image=frame.frame,
            number=frame.raw.number,
            timestamp=frame.raw.timestamp,
            lastFrame=frame.raw.lastFrame,
        )
The *classes.py* module contains the frame and analytics result containers.
classes.py
"""Analytics structures."""

import attr

from .suit_nodes.events import SuitEvent


@attr.dataclass(slots=True)
class FrameRes:
    """Analytics result on a single frame."""

    events: list[SuitEvent]

    def asDict(self):
        """Build results in API format."""

        return {"events": [event.asDict() for event in self.events]}


@attr.dataclass(slots=True)
class AnalyticsRes:
    """Analytics aggregated result."""

    def asDict(self):
        """Build results in API format."""

        return {}
The *common.py* module contains the analytics target nodes, mandatory nodes and the `getDocumentation` function which returns the raw html analytics documentation for the user.
common.py
"""Analytics setup."""

from pathlib import Path

from .models import Params, Targets
from .suit_nodes.suit_nodes import NodeOverview, NodeSuit

NODES = {NodeOverview, NodeSuit}
_DOCS_PATH = Path(__file__).parent / "spec.html"


def targetNodes(targets: Targets):
    """Pre-select nodes by user-defined targets."""

    yield from (node for node in NODES if node.name in targets)  # type: ignore


def mandatoryNodes(params: Params, targets: Targets):
    """Pre-select mandatory nodes according to configuration parameters."""
    yield from (NodeSuit,)


def getDocumentation() -> str:
    """Get analytics documentation"""
    with open(_DOCS_PATH) as f:
        return f.read()
The *models.py* module contains analytics parameters models.
models.py
"""Analytics models."""

from typing import Annotated, Literal

from annotated_types import Ge
from luna_analytics_manager.templates.mixins.frame_frequency_mixins import RateTime
from luna_analytics_manager.templates.models import WrapUnionError
from luna_analytics_manager.templates.sdk import SDKAnalyticsTargets
from pydantic import Field, Strict
from video_analytics_helpers.models import EventAnalyticCallback, HttpAnalyticCallback, WSAnalyticCallback
from video_analytics_helpers.params import BaseParams
from vlutils.structures.pydantic import BaseModel


class RateFrame(BaseModel):
    """Rate frame model. Sets the rate of frames to be processed."""

    period: Annotated[int, Ge(1), Strict()] = 1
    unit: Literal["frame"] = "frame"


class OnetimeEventPolicyEnd(BaseModel):
    """Onetime event policy end model. Event will be triggered at the end of the track."""

    trigger: Literal["end"] = "end"


class OnetimeEventPolicyStart(BaseModel):
    """Onetime event policy start model. Event will be triggered at the start of the track."""

    trigger: Literal["start"] = "start"


class PeriodEventPolicy(BaseModel):
    """Period event policy model. Event will be triggered at the specified interval."""

    trigger: Literal["period"]
    interval: Annotated[float, Ge(0.0), Strict()]


class Params(BaseParams):
    """Params of analytic"""

    rate: Annotated[RateFrame | RateTime, WrapUnionError, Field(discriminator="unit", default_factory=RateFrame)]
    eventPolicy: Annotated[
        OnetimeEventPolicyStart | OnetimeEventPolicyEnd | PeriodEventPolicy,
        Field(discriminator="trigger", default_factory=OnetimeEventPolicyStart),
    ]


class Targets(SDKAnalyticsTargets[Literal["suit", "overview"]]):
    """Targets of analytic"""

    _default = {"suit", "overview"}


Callbacks = Annotated[
    list[Annotated[HttpAnalyticCallback | WSAnalyticCallback | EventAnalyticCallback, Field(discriminator="type")]],
    Field(max_length=10),
]
The *__init__.py* module in *suit_nodes* path is empty.
__init__.py

The *events.py* module in *suit_nodes* path contains events and track structures.
events.py
import attr
from video_analytics_helpers.containers import (
    AggregatedEvent as BaseAggregatedEvent,
    Event as BaseEvent,
    Overview,
    Track as BaseTrack,
)


@attr.dataclass(slots=True)
class Suit:
    """Event frame result."""

    name: str = ""

    def asDict(self):
        """Build results in API format."""
        return {"name": self.name}


@attr.dataclass(slots=True)
class AggregatedSuitEvent(BaseAggregatedEvent[Overview]):
    """Aggregated suit event."""

    eventType: str = "suit"
    suit: Suit | None = None

    def asDict(self):
        """Build results in API format."""

        result = super().asDict()
        if self.suit is not None:
            result.update(self.suit.asDict())
        return result


@attr.dataclass(slots=True)
class SuitEvent(BaseEvent[AggregatedSuitEvent]):
    """Suit event result."""

    eventType: str = "suit"
    suit: Suit | None = None

    def asDict(self):
        """Build event data in API format."""
        res = super().asDict()
        if self.suit is not None:
            res.update(self.suit.asDict())
        return res


class SuitTrack(BaseTrack[SuitEvent, AggregatedSuitEvent]):
    """Analytics track."""
The *suit_estimator.py* module in *suit_nodes* path contains the suit estimator responsible for image preprocessing, inference and postprocessing of the neural model results.
suit_estimator.py
from pathlib import Path

import cv2
import numpy as np
import ujson
from video_analytics_helpers.onnxruntime_wrapper import OnnxruntimeWrapper


class SuitClassifier(object):
    """
    Suit estimator

    onnxWrapper: Wrapper around ONNX Runtime.
    crop_size: Image crop size. Defaults to 224.
    crop_type: Type of cropping strategy.

    """

    def __init__(
        self,
        onnxWrapper: OnnxruntimeWrapper,
        crop_size: int = 224,
        crop_type: str = "full_frame",
    ):
        self.onnxWrapper = onnxWrapper
        self.onnx_input_keys = [t.name for t in self.onnxWrapper.onnxSession.get_inputs()]

        self.is_new_id = dict()

        self.crop_size = crop_size
        self.crop_type = crop_type

        self.suit_id = dict()
        self._labels = None

    async def crop_and_scale(self, img_rgb):
        """Crops and resizes the input RGB image.

        Args:
            img_rgb: Input image in RGB format.

        Raises:
            Exception: If `crop_type` is not supported.

        Returns:
            numpy.ndarray: Cropped and resized RGB image.
        """
        assert img_rgb.shape[2] == 3
        img_shape = img_rgb.shape[:2]
        if self.crop_type == "center_crop":
            min_side = min(img_shape[0], img_shape[1])
            y_0, x_0 = (img_shape[0] - min_side) // 2, (img_shape[1] - min_side) // 2
            y_1, x_1 = y_0 + min_side, x_0 + min_side
            crop = img_rgb[y_0:y_1, x_0:x_1]
        elif self.crop_type == "full_frame":
            crop = img_rgb.copy()
        else:
            raise Exception(f"Unknown type of crop: {self.crop_type}")

        crop_resized = await self.onnxWrapper.runAsync(cv2.resize, crop, (self.crop_size, self.crop_size))
        return crop_resized

    async def preproc(self, img_rgb):
        """Preprocesses the image for ONNX model input.
        Args:
            img_rgb: Input RGB image.

        Returns:
            numpy.ndarray: Preprocessed image in NCHW format.
        """
        crop_prepared = await self.crop_and_scale(img_rgb)
        crop_prepared = crop_prepared.astype(np.float32) / 255.0
        crop_prepared = (crop_prepared - 0.5) / 0.5
        batch = crop_prepared.reshape((1,) + crop_prepared.shape)
        batch = batch.transpose(0, 3, 1, 2)
        return np.ascontiguousarray(batch)

    async def forward(self, img_rgb):
        """Runs a forward pass through the ONNX model.

        Args:
            img_rgb: Input RGB image, shape (H, W, 3).

        Returns:
            numpy.ndarray: Raw prediction scores for each class.
        """
        batch_numpy = await self.preproc(img_rgb)
        input_tensors_list = {self.onnx_input_keys[0]: batch_numpy}

        net_outs = await self.onnxWrapper.forward(input_tensors_list)
        return net_outs[0][0]

    def get_labels(self):
        """Loads class labels from a JSON file.

        Returns:
            numpy.ndarray: Array of labels as strings.
        """
        if self._labels is None:
            with open(Path(__file__).parent.parent / "data" / "imagenet-simple-labels.json") as f:
                data = ujson.load(f)
            self._labels = np.asarray(data)
        return self._labels

    async def run(self, img_rgb, cam_id):
        """Classifies the input image for a specific camera ID.

        Tracks camera IDs to differentiate between sources.

        Args:
            img_rgb: Input RGB image.
            cam_id: Identifier for the camera.

        Returns:
            Estimated class label.
        """
        if cam_id not in self.suit_id:
            self.suit_id[cam_id] = 0
            self.is_new_id[cam_id] = True

        preds = await self.forward(img_rgb)
        idx = np.argmax(preds)
        return self.get_labels()[idx]
The *suit_nodes.py* module in *suit_nodes* path contains suit nodes structures.
suit_nodes.py
import uuid

from luna_analytics_manager import Node
from luna_analytics_manager.sync import Syncer2 as Syncer
from luna_analytics_manager.templates.image_helpers.roi import getROIRect
from luna_analytics_manager.templates.models import EventStatus, VideoSegment
from video_analytics_helpers.frame_base_analytics import BaseFrameAnalyticsNode
from video_analytics_helpers.node_collections import NodeOverview as NodeOverviewBase

from ..analytics import AnalyticsCtx, FrameCtx
from ..classes import FrameRes
from .events import AggregatedSuitEvent, Overview, Suit, SuitEvent, SuitTrack
from .suit_estimator import SuitClassifier


class NodeSuit(
    BaseFrameAnalyticsNode[
        FrameCtx,
        AnalyticsCtx,
        SuitEvent,
        AggregatedSuitEvent,
        FrameRes,
        Suit,
        SuitTrack,
    ]
):
    """Suit Analytics node."""

    name = "suit"
    requires: list[Node] = []
    frameResCls = FrameRes
    # class for tracks
    trackCls = SuitTrack

    def __init__(self):
        super().__init__()
        self.frameSyncer = Syncer()
        self.track = None
        self.trackLength = 0

        self.suitEstimator = SuitClassifier(onnxWrapper=AnalyticsCtx.onnxWrapper)

    def buildEvent(self, suit: Suit, frameCtx: FrameCtx, ctx: AnalyticsCtx) -> tuple[SuitEvent, AggregatedSuitEvent]:
        """Generate current event and aggregated event."""

        event = SuitEvent(suit=suit, timeOffset=frameCtx.timestamp)
        aggregatedEvent = AggregatedSuitEvent(
            videoSegment=VideoSegment(frameCtx.timestamp, frameCtx.timestamp),
            suit=suit,
            overview=Overview(frameCtx.timestamp, frameCtx.image),
        )
        event.eventId = aggregatedEvent.eventId = str(uuid.uuid4())
        return event, aggregatedEvent

    def updateEvent(self, suit: Suit, frameCtx: FrameCtx, ctx: AnalyticsCtx):
        """Update current event with new frame."""

        event = self.track.currentEvent
        event.suit = suit
        event.timeOffset = frameCtx.timestamp
        event.eventStatus = EventStatus.inProcess

    def updateAggregatedEvent(self, suit: Suit, frameCtx: FrameCtx, ctx: AnalyticsCtx):
        """Update current aggregated event with new frame."""

        aggregatedEvent = self.track.currentAggregatedEvent
        if aggregatedEvent.videoSegment is None:
            aggregatedEvent.videoSegment = VideoSegment(frameCtx.timestamp, frameCtx.timestamp)
        else:
            aggregatedEvent.videoSegment.endTimeOffset = frameCtx.timestamp

        if aggregatedEvent.suit is None:
            aggregatedEvent.suit = suit

    async def estimate(self, frameCtx: FrameCtx, ctx: AnalyticsCtx) -> Suit | None:
        """
        Estimate suit on frame.
        Args:
            frameCtx: current frame context
            ctx: analytic context

        Returns:
            Detected suit label
        """
        img_rgb = frameCtx.image.asNPArray()  # check if it is really RGB format

        if ctx.params.roi:
            roi = getROIRect(frameCtx.image.rect, ctx.params.roi)
            img_rgb = img_rgb[roi.top : roi.bottom, roi.left : roi.right]

        camera_id = "0"
        detectedObj = await self.suitEstimator.run(img_rgb, camera_id)
        return Suit(name=detectedObj)

    def isTrackNeedEnd(self, suit: Suit, frameCtx: FrameCtx, ctx: AnalyticsCtx):
        """
        Determine whether the track needs to be ended after the current frame processing.

        If the function returns `True`, the track will be stopped. If the function returns `False`, the track will be started or continued.
        """
        return suit.name != "suit"


class NodeOverview(NodeOverviewBase[FrameCtx, AnalyticsCtx]):
    """Overview node."""

    requires = [NodeSuit]
The *spec.yml* file contains openapi documentation with analytics parameters description in yaml format.
spec.yml
openapi: 3.0.0
info:
  version: 'v.0.0.1'
  title: 'Suit Analytics'
  description: |
     `Suit Analytics` is intended for determination of:
        - suit_class parameter;
        

     The `suit` analytics decodes stream frames taking into account the `rate` parameter
     from <a href="#tag/suit-analytics/paths/~1suit_analytics/post">stream creation request analytic parameters</a>
     and performs the following actions:
        - Calculates the suit class
        - For each callback, checks its conditions and executes the callbacks that fit the conditions.

     The schema of the data sent by a `callback` is described as the stream creation request callback.

components:
  schemas:
    error:
      type: object
      properties:
        error_code:
          type: integer
          description: Error code.
        desc:
          type: string
          description: Short error description.
        detail:
          type: string
          description: Error details.
        link:
          type: string
          description: Link to the documentation website with the error description.
      required: [error_code, detail, desc, link]
      example:
        error_code: 1
        detail: internal server error
        desc: internal server error
        link: "https://docs.visionlabs.ai/info/luna/troubleshooting/errors-description/code-1"

    int01:
      type: integer
      enum: [0,1]

    roi_int_coordinates:
      type: integer
      minimum: 0
      default: 0
      maximum: 65536
      example: 3327

    roi_float_percent:
      type: number
      format: float
      minimum: 0
      default: 0.0
      maximum: 100.0
      example: 87.4

    roi_float_percent_size:
      type: number
      format: float
      minimum: 0.00001
      maximum: 100.0
      example: 66.3

    roi_abs:
      type: object
      properties:
        x:
          $ref: '#/components/schemas/roi_int_coordinates'
        y:
          $ref: '#/components/schemas/roi_int_coordinates'
        width:
          $ref: '#/components/schemas/roi_int_coordinates'
        height:
          $ref: '#/components/schemas/roi_int_coordinates'
        mode:
          type: string
          enum: [abs]
          example: "abs"
          description: Coordinates and size are set in pixels.
      required: [x, y, width, height, mode]

    roi_percent:
      type: object
      properties:
        x:
          $ref: '#/components/schemas/roi_float_percent'
        y:
          $ref: '#/components/schemas/roi_float_percent'
        width:
          $ref: '#/components/schemas/roi_float_percent_size'
        height:
          $ref: '#/components/schemas/roi_float_percent_size'
        mode:
          type: string
          enum: [ percent ]
          example: "percent"
          description: Coordinates and size are set in percentage.
      required: [x, y, width, height, mode]

    roi:
      oneOf:
        - $ref: '#/components/schemas/roi_abs'
        - $ref: '#/components/schemas/roi_percent'
      description: |
          Region of interest on a frame. Boundaries of the area are described in `x`, `y` coordinates
          of the top left point and `width`, `height` properties.
          **Region must not be any bigger than the original frame**

          The region of interest will be sent to estimator. The smaller the `roi`, the smaller the area the estimator
          will process and, accordingly, work faster.

    number01:
      type: number
      minimum: 0
      maximum: 1

    rate:
      type: object
      description: |
        Analytic rate execution determines on which frames the analytics will be launched.

        Analytic rate execution can be configured in the following ways:
        - execute analytic on each Nth frame (`unit` - `frame`)
        - execute analytic on each frame corresponding to the Nth second (`unit` - `second`)
      properties:
        period:
          type: number
          minimum: 0
          default: 1
          description: Period length.
        unit:
          type: string
          default: second
          enum:
            - frame
            - second
          description: Unit for a period calculation (every `n` seconds or every `n` frames).
      required: [ unit, period ]

    callback_base:
      type: object
      properties:
        enable:
          type: integer
          enum: [0, 1]
          default: 1
          description: Whether callback enabled or not.

    callback_ws:
      allOf:
        - $ref: "#/components/schemas/callback_base"
        - type: object
          properties:
            type:
              type: string
              enum: [luna-ws-notification]
              description: Event will be sent with `suit` type.
      required: [type]

    callback_basic_authorization:
      type: object
      properties:
        type:
          type: string
          description: Authorization type.
          enum: [basic]
        login:
          type: string
          maxLength: 128
          description: Login.
        password:
          type: string
          maxLength: 128
          description: Password.
      required: [ type, login, password]
      description: Callback basic authorization parameters.

    callback_http:
      allOf:
        - $ref: "#/components/schemas/callback_base"
        - type: object
          properties:
            type:
              type: string
              enum: [http]
              description: Event will be sent to http url.
            url:
              type: string
              description: Callback url.
            authorization:
              $ref: '#/components/schemas/callback_basic_authorization'
            params:
              type: object
              properties:
                timeout:
                  type: integer
                  default: 60
                  description: Callback request timeout.
                content_type:
                  type: string
                  default: application/json
                  enum: [application/json]
                  description: Callback request content type.
                headers:
                  type: object
                  description: Callback request headers.
                  additionalProperties: true
              description: Callback request parameters
      required: [url, type]

    callback:
      oneOf:
        - $ref: '#/components/schemas/callback_http'
        - $ref: '#/components/schemas/callback_ws'
      discriminator:
        propertyName: type
        mapping:
          http: '#/components/schemas/callback_http'
          luna-ws-notification: '#/components/schemas/callback_ws'

    image_retain_policy:
      type: object
      description: |
        Image retain policy is applicable when the analytic `overview` target is specified,
        and configures the parameters with which the image will be saved.
      properties:
        mimetype:
          type: string
          enum:
            - PNG
            - JPEG
          default: JPEG
          description: Image format.
        quality:
          allOf:
            - $ref: '#/components/schemas/number01'
            - default: 1
          description: Image quality, on a scale from 0 (worst) to 1 (best). Has no effect on `PNG`.
        max_size:
          type: integer
          minimum: 0
          default: 640
          description: Image max size, in pxl. Neither the width nor the height will exceed this value.

    period_event_policy:
      type: object
      properties:
        trigger:
          type: string
          enum: [period]
        interval:
          type: number
          description: Event generation period interval.
          minimum: 0.0
          default: 1.0
      required: [trigger]

    start_event_policy:
      type: object
      properties:
        trigger:
          type: string
          enum: [start]
      required: [trigger]

    end_event_policy:
      type: object
      properties:
        trigger:
          type: string
          enum: [end]
      required: [trigger]

    parameters:
      type: object
      description: |
        Analytic parameters for stream processing.

        There are default analytic parameters which will be applied for stream processing if none are specified here.
      properties:
        targets:
          type: array
          default: [suit]
          items:
            type: string
            enum: [suit, overview]
          description: |
            Estimations to perform on the video.

            `suit` will calculate some `suit` value if specified.

            `overview` will add image to events.
        callbacks:
          type: array
          description: |
            Callbacks parameters.

            `http` type callback sends events to the specified url by POST request.

            `luna-ws-notification` type callback sends events to websocket using sender.
          maxItems: 10
          items:
            $ref: '#/components/schemas/callback'

        parameters:
          type: object
          description: Estimation parameters.
          properties:
            event_policy:
              description: |
                Event policy.
                
                When a suit appears on a frame, a new `track` starts; when the suit disappears, the `track` stops. Events can be generated:
                
                - when track starts (`trigger` - `start`)
                - when track ends (`trigger` - `end`)
                - periodically while track exists (`trigger` - `period`)
              type: object
              oneOf:
                - $ref: '#/components/schemas/period_event_policy'
                - $ref: '#/components/schemas/start_event_policy'
                - $ref: '#/components/schemas/end_event_policy'
              discriminator:
                propertyName: trigger
                mapping:
                  period: '#/components/schemas/period_event_policy'
                  start: '#/components/schemas/start_event_policy'
                  end: '#/components/schemas/end_event_policy'
              default:
                trigger: start
            image_retain_policy:
              $ref: '#/components/schemas/image_retain_policy'
            probe_count:
              description: |
                The number of consecutive suit estimations with a class other than 'none'. This parameter is intended to
                prevent false positives from analytics.
              type: integer
              minimum: 0
              default: 3
            roi:
              $ref: '#/components/schemas/roi'
            rate:
              allOf:
                - $ref: '#/components/schemas/rate'
                - default:
                    unit: second
                    period: 1

    stream:
      type: object
      description: |
        The full stream creation schema is available in the `Luna-Video-Manager` documentation.

        The presented schema describes only the analytics parameters.
      properties:
        data:
          type: object
          description: stream data
          additionalProperties: true
        analytics:
          type: array
          description: analytics list
          items:
            type: object
            description: analytics
            properties:
              analytic_name:
                type: string
                maxLength: 36
                pattern: '^[a-zA-Z0-9_\-]{1,36}$'
                description: Analytic name.
                enum: [suit_analytics]
              parameters:
                type: object
                $ref: '#/components/schemas/parameters'
            required: [analytic_name]
      additionalProperties: true
      required: [data, analytics]

    string36_nullable:
      type: string
      maxLength: 36
      nullable: true

    longitude:
      type: number
      minimum: -180
      maximum: 180
      example: 36.616

    latitude:
      type: number
      minimum: -90
      maximum: 90
      example: 55.752

    geo_position:
      type: object
      nullable: true
      description: Geo position specified by geographic coordinates - longitude and latitude.
      properties:
        longitude:
          allOf:
            - $ref: '#/components/schemas/longitude'
            - description: Longitude in degrees.
        latitude:
          allOf:
            - $ref: '#/components/schemas/latitude'
            - description: Latitude in degrees.
      required: [longitude, latitude]
      example:
        longitude: 36.616
        latitude: 55.752

    uuid:
      type: string
      format: uuid
      pattern: '^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$'
      example: "557d54ec-29ad-4f3c-93b4-c9092ef12515"

    video_segment:
      type: object
      properties:
        start_time_offset:
          type: number
          example: 0.123
          description: Start video segment offset (seconds).
        end_time_offset:
          type: number
          example: 1.234
          description: End video segment offset (seconds).
      required: [ start_time_offset, end_time_offset ]

    callback_request:
      type: object
      properties:
        event_type:
          type: string
          enum: [suit]
          description: Event type.
        account_id:
          allOf:
            - $ref: '#/components/schemas/uuid'
          description: Account id of event.
        event_create_time:
          type: string
          description: Event creation time.
        event_end_time:
          type: string
          description: Event end time.
        event:
          type: object
          description: Event.
          properties:
            stream_id:
              allOf:
                - $ref: '#/components/schemas/uuid'
              description: Stream ID.
            event_id:
              allOf:
                - $ref: '#/components/schemas/uuid'
              description: Event ID.
            track_id:
              allOf:
                - $ref: '#/components/schemas/uuid'
              description: Track id.
              nullable: true
            video_segment:
              $ref: '#/components/schemas/video_segment'
            overview:
              type: object
              properties:
                time_offset:
                  type: number
                  description: time offset
                image:
                  type: string
                  description: link to image
              required: [time_offset]
            location:
              type: object
              properties:
                city:
                  allOf:
                    - $ref: '#/components/schemas/string36_nullable'
                  example: Moscow
                  description: City the stream belongs to.
                area:
                  allOf:
                    - $ref: '#/components/schemas/string36_nullable'
                  example: Central
                  description: Area the stream belongs to.
                district:
                  allOf:
                    - $ref: '#/components/schemas/string36_nullable'
                  example: Basmanny
                  description: District the stream belongs to.
                street:
                  allOf:
                    - $ref: '#/components/schemas/string36_nullable'
                  example: Podsosensky lane
                  description: Street the stream belongs to.
                house_number:
                  allOf:
                    - $ref: '#/components/schemas/string36_nullable'
                  example: 23 bldg.3
                  description: House number of the building the stream belongs to.
                geo_position:
                  $ref: '#/components/schemas/geo_position'
              description: |
                Stream location parameters.

                The callback `location` target is required to send this value.
          required: [stream_id, event_id, suit_class, track_id]
      required: [event_type, event, account_id, event_create_time, event_end_time]
paths:
  /suit_analytics:
    post:
      tags:
        - suit analytics
      summary: stream creation
      description: |
        A simplified stream creation request with a detailed description of the analytics parameters.

        The full request description (excluding this analytics description) is available in the `Luna-Video-Manager` documentation.
      requestBody:
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/stream'
            example:
              data:
                reference: rtsp://stream_url
                type: stream
                downloadable: false
              analytics:
                - analytic_name: suit_analytics
                  parameters:
                    parameters:
                      probe_count: 2
                      image_retain_policy:
                        mimetype: PNG
                      roi:
                        mode: abs
                        x: 0
                        y: 0
                        width: 1000
                        height: 500
                    callbacks:
                      - url: "http://127.0.0.1:8001"
                        type: http
                      - url: "http://127.0.0.1:8002"
                        type: http
                      - type: luna-ws-notification
                    targets:
                      - suit
                      - overview
      callbacks:
        onData:
          '{$request.body.callbacks.*.url}':
            post:
              requestBody:
                description: subscription payload
                content:
                  application/json:
                    schema:
                      $ref: '#/components/schemas/callback_request'

The *spec.html* file contains openapi documentation with analytics parameters description in html format.


Agent lambda examples

Example of agent lambda with suit video analytics:

To include the video analytics presented in the previous chapter into the lambda agent as a package, the following lambda archive file structure is needed:

Lambda agent with suit analytics example file structure (the case when the analytics is included in the lambda as a package)
  ├──pyproject.toml
  ├──poetry.lock
  └──lambda_main.py

The lambda_main.py module:

lambda_main.py
AVAILABLE_ANALYTICS = ["suit_analytics"]

In this case, the lambda agent dependencies must include only the analytics itself, not the analytics dependencies:

pyproject.toml
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "lambda-agent"
version = "0.0.1"
description = "lambda-agent"
authors = [ "VisionLabs", ]

[[tool.poetry.source]]
name = "vlabspypi"
url = "http://pypi.visionlabs.ru/root/public/+simple"
priority = "primary"

[[tool.poetry.source]]
name = "public-pypi"
url = "https://pypi.org/simple"
priority = "supplemental"

[tool.poetry.dependencies]
python = "^3.12"
suit-analytics = "0.0.4"

To include the presented analytics into the lambda agent as a module, the following lambda archive file structure is needed:

Lambda agent with suit analytics example file structure (the case when the analytics is included in the lambda as a module)
  ├──pyproject.toml
  ├──poetry.lock
  ├──lambda_main.py
  └──suit_analytics
      ├──__init__.py
      ├──analytics.py
      ├──classes.py
      ├──common.py
      ├──models.py
      ├──spec.html
      ├──spec.yml
      ├──data
         ├──resnet50-v2-7.onnx
         └──imagenet-simple-labels.json
      └──suit_nodes
         ├──__init__.py
         ├──events.py
         ├──suit_estimator.py
         └──suit_nodes.py

In this case, the analytics dependencies must be included in the lambda dependencies. The lambda dependencies are described in the pyproject.toml file:

pyproject.toml
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "lambda agent"
version = "0.0.1"
description = "lambda agent"
authors = [ "VisionLabs", ]

[[tool.poetry.source]]
name = "vlabspypi"
url = "http://pypi.visionlabs.ru/root/public/+simple"
priority = "primary"

[[tool.poetry.source]]
name = "public-pypi"
url = "https://pypi.org/simple"
priority = "supplemental"

[tool.poetry.dependencies]
python = "^3.12"
luna-analytics-manager = {version="*", extras=["sdk"]}
opencv-python-headless = "*"
nvidia-curand-cu11 = "*"
nvidia-cufft-cu11 = "*"
onnxruntime-gpu = "*"
pynvcodec = ">=2.6.0"

Example of agent lambda with animals video analytics:

Video analytics implementation using ONNX YOLOv3 model from the ONNX Model Zoo.

The analytics tracks the appearance of animals in video frames. Processing frame by frame, it:

  • Starts a track when at least one animal appears in the frame that the YOLOv3 model is capable of detecting

  • Ends a track when all animals disappear from the frame

Events sent via callback:

{
  "event_type": "animal_detection",
  "account_id": "557d54ec-29ad-4f3c-93b4-c9092ef12515",
  "event_create_time": "string",
  "event_end_time": "string",
  "event": {
    "stream_id": "557d54ec-29ad-4f3c-93b4-c9092ef12515",
    "track_id": "557d54ec-29ad-4f3c-93b4-c9092ef12515",
    "track_status": "started"
  }
}

At key moments when tracks begin (animal appears) and end (animal disappears), the analytics creates events and executes the configured callbacks.

The lambda_main.py module contains the list of analytics supported by the agent:
AVAILABLE_ANALYTICS = ["animal_detector_analytics"]
The analytics.py module contains analytics initialization, analytics nodes, frame and analytics contexts.
from logging import Logger
from pathlib import Path
from uuid import uuid4

import attr
import numpy as np
from cow.errors.errors import ErrorInfo
from luna_analytics_manager import FrameEnviron, Node
from luna_analytics_manager.templates.models import TrackStatus, VideoSegment
from luna_analytics_manager.templates.sdk import SDKFrameCtx
from lunavl.sdk.image_utils.image import VLImage
from onnxruntime import InferenceSession, SessionOptions
from video_analytics_helpers.containers import Overview
from video_analytics_helpers.onnxruntime_wrapper import OnnxruntimeWrapper

from .estimator import AnimalEstimator
from .models import (
    AggregatedAnimalDetection,
    AnalyticsRes,
    AnimalDetection,
    Callbacks,
    FrameRes,
    Params,
    Targets,
)

logger = Logger("luna.animal_detector")


async def initialize(dataPath: str | Path | None = None, **kwargs) -> None:
    """
    Initialize analytics context for animal detector.
    Method should be imported into __init__ and called before any other method upon analytics registration.
    sets onnxruntime session to analytics context to ease model access

    Args:
        dataPath: path to data directory, containing yolov3-10.onnx model
        https://github.com/onnx/models/tree/main/validated/vision/object_detection_segmentation/yolov3/model
        **kwargs: additional parameters
    """
    if dataPath is None:
        configPrefix = Path(__file__).absolute().parent / "data"
    elif isinstance(dataPath, str):
        configPrefix = Path(dataPath)
    else:
        configPrefix = dataPath

    runtimeSettings = kwargs.get("runtime_settings", {})
    options = SessionOptions()
    options.intra_op_num_threads = runtimeSettings.get("num_threads", 6)

    backend = (
        [("CUDAExecutionProvider", {"device_id": 0})]
        if runtimeSettings.get("device_class", "cpu") == "gpu"
        else ["CPUExecutionProvider"]
    )

    globalSession = InferenceSession(
        (configPrefix / "yolov3-10.onnx").as_posix(),
        sess_options=options,
        providers=backend,
    )
    globalSession.set_providers(backend)
    onnxWrapper = OnnxruntimeWrapper(options.intra_op_num_threads, globalSession)

    AnalyticsCtx.initialize(onnxWrapper)


@attr.dataclass(slots=True)
class FrameCtx(SDKFrameCtx):
    """Frame processing context."""

    processed: FrameRes | None = None


@attr.dataclass(slots=True)
class AnalyticsCtx:
    """Analytics context."""

    name: str = "animal_detector_analytics"
    params: Params = attr.field(factory=Params)
    callbacks: Callbacks = attr.field(factory=Callbacks)
    targets: Targets = attr.field(factory=Targets)

    aggregated: AnalyticsRes = attr.field(factory=AnalyticsRes)
    sdk: None = None
    accumulating: bool = False
    error: ErrorInfo | None = None

    @staticmethod
    def getAnalyticsFrameConcurrency():
        """Disables concurrency and allows to run node on frame by frame continuously"""
        return 1

    @classmethod
    def initialize(cls, onnxWrapper: OnnxruntimeWrapper):
        """
        Initialize analytics context for animal detector.
        sets onnxruntime session to analytics context to ease model access
        Args:
            onnxWrapper: onnxruntime wrapper instance
        """
        cls.onnxWrapper = onnxWrapper

    @staticmethod
    async def prepareFrameCtx(
        frame: FrameEnviron[np.ndarray, VLImage]
    ) -> FrameCtx:
        """Prepare frame processing context."""

        return FrameCtx(
            image=frame.frame,
            number=frame.raw.number,
            timestamp=frame.raw.timestamp,
            lastFrame=frame.raw.lastFrame,
        )


class NodeAnimal:
    """Animal detection analytics node."""

    name = "animal_detector"
    requires: list[Node] = []

    def __init__(self):
        super().__init__()
        self.track = None
        self.animalEstimator = AnimalEstimator(AnalyticsCtx.onnxWrapper)
        self.startTimestamp = 0.0

    def buildEvent(self, frameCtx: FrameCtx, trackStatus: TrackStatus) -> AnimalDetection:
        """
        Builds animal detection event with all the estimation data and enriches it with the aggregated event,
        which will be used upon farther callbacks execution
        Args:
            frameCtx: frame context
            trackStatus: track status (whether animal appears or disappears)
        """
        eventId = str(uuid4())
        aggregatedEvent = AggregatedAnimalDetection(
            videoSegment=VideoSegment(self.startTimestamp, frameCtx.timestamp),
            overview=Overview(
                timeOffset=frameCtx.timestamp,
                origin=frameCtx.image
            ),
            eventId=eventId,
            trackId=self.track,
            animalStatus = "appeared" if trackStatus == TrackStatus.started else "disappeared",
        )
        event = AnimalDetection(
            trackId=self.track,
            aggregated=aggregatedEvent,
            eventId=eventId,
            trackStatus=trackStatus,
            timeOffset=frameCtx.timestamp
        )
        return event


    def postProcessing(self, frameCtx: FrameCtx, estimationResult: bool) -> None:
        """
        Implements post-processing routine of estimation result.
        Puts the estimation result into the frame context.

        Args:
            frameCtx (FrameCtx): current frame context
            estimationResult (bool): estimation result - whether an animal detected on the frame
        """
        if frameCtx.processed is None:
            frameCtx.processed = FrameRes(events=[])
        if estimationResult and not self.track:
            self.startTimestamp = frameCtx.timestamp
            self.track = str(uuid4())
            event = self.buildEvent(frameCtx, trackStatus=TrackStatus.started)
            frameCtx.processed.events.append(event)
        elif not estimationResult and self.track is not None:
            event = self.buildEvent(frameCtx, trackStatus=TrackStatus.finished)
            frameCtx.processed.events.append(event)
            self.track = None

    async def estimate(self, frameCtx: FrameCtx) -> bool | None:
        """
        Convert image in acceptable format and estimate animal in frame.

        Args:
            frameCtx (FrameCtx): current frame context
        Returns:
            Animal detection dict in case if track started or ended, else None
        """
        image = frameCtx.image.asPillow()
        animalsDetected = await self.animalEstimator.forward(image)
        return animalsDetected

    async def run(self, frameCtx: FrameCtx, ctx: AnalyticsCtx) -> None:
        """
        Processes one frame under the frameSyncer context and returns the result of the estimation.
        Closes the track if the last frame is reached, finalises the existing event if there was any.

        Args:
            frameCtx (FrameCtx): current frame context
            ctx (AnalyticsCtx): analytics context
        """
        estimation = await self.estimate(frameCtx)
        self.postProcessing(frameCtx, estimation)
The common.py module contains the analytics target nodes, mandatory nodes and the getDocumentation function which returns the raw html analytics documentation for the user.
"""Analytics setup."""

from pathlib import Path

from .analytics import NodeAnimal
from .models import Params, Targets


def targetNodes(targets: Targets):
    """
    Defines target nodes according to configuration parameters.

    Args:
        targets: Analytics targets
    """
    yield from tuple()


def mandatoryNodes(params: Params, targets: Targets):
    """
    Defines mandatory nodes.

    Args:
        params: Analytics parameters
        targets: Analytics targets
    """

    yield from (NodeAnimal,)


def getDocumentation() -> str:
    """Get analytics documentation"""
    with open(Path(__file__).parent / "spec.html") as f:
        return f.read()
The estimator.py module contains the animal estimator responsible for image preprocessing, inference and postprocessing of the neural model results.
from enum import Enum, auto
from pathlib import Path
from typing import Iterable

import numpy as np
from PIL import Image
from video_analytics_helpers.onnxruntime_wrapper import OnnxruntimeWrapper


class KnownAnimals(Enum):
    """Class of known animals that might be detected by the model."""

    bird = auto()
    cat = auto()
    dog = auto()
    horse = auto()
    sheep = auto()
    cow = auto()
    elephant = auto()
    bear = auto()
    zebra = auto()
    giraffe = auto()


class AnimalEstimator:
    """Animal detector model."""

    def __init__(self, onnxWrapper: OnnxruntimeWrapper):
        self.session = onnxWrapper
        self.track = None
        classesPath = Path(__file__).absolute().parent / "data" / "coco_classes.txt"
        with open(classesPath) as classesFile:
            self.labels = [line.rstrip("\n") for line in classesFile]

    @staticmethod
    def letterboxImage(image: Image, size: tuple[int, int]) -> Image:
        """
        Resize image with unchanged aspect ratio using padding

        Args:
            image (Image): The input image to be resized.
            size (tuple[int, int]): The target size for the resized image.
        Returns:
            Image: The resized image with unchanged aspect ratio.
        """
        imageWidth, imageHeight = image.size
        width, height = size
        scale = min(width / imageWidth, height / imageHeight)
        scaledWidth = int(imageWidth * scale)
        scaledHeight = int(imageHeight * scale)

        image = image.resize((scaledWidth, scaledHeight), Image.BICUBIC)
        newImage = Image.new("RGB", size, (128, 128, 128))
        newImage.paste(
            image, ((width - scaledWidth) // 2, (height - scaledHeight) // 2)
        )
        return newImage

    def preprocess(self, image: Image) -> tuple[np.ndarray, np.ndarray]:
        """
        Preprocess the image for the model format.

        Args:
            image (Image): The input image to be processed.
        Returns:
            tuple[np.ndarray, np.ndarray]: The preprocessed image data and the original image size.
        """
        modelImageSize = (416, 416)
        boxedImage = self.letterboxImage(image, modelImageSize)
        imageData = np.array(boxedImage, dtype="float32")
        imageData /= 255.0
        imageData = np.transpose(imageData, [2, 0, 1])
        imageData = np.expand_dims(imageData, 0)
        imageSize = np.array([image.size[1], image.size[0]], dtype="float32").reshape(
            1, 2
        )
        return imageData, imageSize

    def postprocess(self, indices: Iterable) -> bool:
        """
        Postprocess the model output to determine if an animal was detected.

        Args:
            indices (Iterable): The indices of the detected objects.
        Returns:
            bool: Whether any of the detected objects is an animal.
        """
        for index in indices:
            if self.labels[index[1]] in KnownAnimals._member_names_:
                return True
        return False

    async def forward(self, image: Image) -> bool:
        """
        Prepares image data for the model, forward the prepared data to the model, and postprocesses the model output,
        returning the finite state whether an animal was detected in the image.

        Args:
            image (Image): The input image to be processed.
        Returns:
            bool: Whether an animal was detected in the image.
        """

        imageData, imageSize = self.preprocess(image)
        _, _, indices = await self.session.forward(
            {"input_1": imageData, "image_shape": imageSize}
        )
        return self.postprocess(indices)

The models.py module contains analytics result models.
from typing import Any

import attr
from luna_analytics_manager.templates.sdk import SDKAnalyticsTargets
from pydantic import Field
from typing_extensions import Annotated, Literal
from video_analytics_helpers.containers import AggregatedEvent, Event as BaseEvent, Overview
from video_analytics_helpers.models import (
    EventAnalyticCallback,
    HttpAnalyticCallback,
    WSAnalyticCallback
)
from vlutils.structures.pydantic import BaseModel


@attr.dataclass(slots=True)
class AggregatedAnimalDetection(AggregatedEvent[Overview]):
    """Aggregated animal detection event result"""
    eventType: str = "animal_detection"
    animalStatus: str = "appeared"


@attr.dataclass(slots=True)
class AnimalDetection(BaseEvent[AggregatedAnimalDetection]):
    """Detection event result"""

    eventType: str = "animal_detection"


@attr.dataclass(slots=True)
class FrameRes:
    """Analytics result. Contains all the events detected on a single frame."""

    events: list[AnimalDetection]

    def asDict(self):
        """Build results in API format."""
        return {"events": [event.asDict() for event in self.events]}


@attr.dataclass(slots=True)
class AnalyticsRes:
    """Analytics aggregated result."""

    def asDict(self):
        """Build results in API format."""
        return {}


class Params(BaseModel):
    """Parameters of analytic"""

    @classmethod
    def load(cls, obj: Any, *args, **kwargs):
        return cls.model_validate(obj, *args, **kwargs) if obj is not None else cls()


class Targets(SDKAnalyticsTargets[Literal["animal_detector"]]):
    """Targets of analytic"""

    _default = {"animal_detector"}


Callbacks = Annotated[
    list[
        Annotated[
            HttpAnalyticCallback | WSAnalyticCallback | EventAnalyticCallback,
            Field(discriminator="type"),
        ]
    ],
    Field(max_length=10),
]
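
A minimal illustrative usage sketch (not part of the analytics code) of the result models defined above; it assumes the package layout shown in the file structure below:

# Illustrative only: exercising the result models defined above.
from animal_detector_analytics.models import FrameRes, Params

emptyResult = FrameRes(events=[])   # no events detected on the frame yet
print(emptyResult.asDict())         # {'events': []}

defaultParams = Params.load(None)   # falls back to a default Params() when nothing is specified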
The __init__.py module contains all the main objects and classes needed for the analytics to work.
from video_analytics_helpers.luna_wraps import (EventMetadata,
                                                LunaApiExternalParams)
from video_analytics_helpers.luna_wraps import LunaWraps as PostProcessing

from .analytics import AnalyticsCtx, FrameCtx, initialize
from .common import getDocumentation, mandatoryNodes, targetNodes
from .models import Callbacks, Params, Targets
*spec.yml*
openapi: 3.0.0
info:
  version: 'v.0.0.1'
  title: 'Animal Detector Analytics'
  description: |
     `Animal Detector Analytics` is intended for determination of:
        - animals on the frame.

     The `animal_detector_analytics` analytics decodes stream frames
     according to the <a href="#tag/animal-detector-analytics/paths/~animal_detector_analytics/post">stream creation request analytic parameters</a>
     and performs the following actions:
        - Detects animal appearance or disappearance;
        - Executes callbacks once an animal appears on the video or disappears from it.

     The schema of the data sent by `callback` is described in the stream creation request callback.

components:
  schemas:
    error:
      type: object
      properties:
        error_code:
          type: integer
          description: Error code.
        desc:
          type: string
          description: Short error description.
        detail:
          type: string
          description: Error details.
        link:
          type: string
          description: Link to the documentation website with the error description.
      required: [error_code, detail, desc, link]
      example:
        error_code: 1
        detail: internal server error
        desc: internal server error
        link: "https://docs.visionlabs.ai/info/luna/troubleshooting/errors-description/code-1"

    callback_base:
      type: object
      properties:
        enable:
          type: integer
          enum: [0, 1]
          default: 1
          description: Whether callback enabled or not.

    callback_ws:
      allOf:
        - $ref: "#/components/schemas/callback_base"
        - type: object
          properties:
            type:
              type: string
              enum: [luna-ws-notification]
              description: Event will be sent with `animal_detector` type.
      required: [type]

    callback_basic_authorization:
      type: object
      properties:
        type:
          type: string
          description: Authorization type.
          enum: [basic]
        login:
          type: string
          maxLength: 128
          description: Login.
        password:
          type: string
          maxLength: 128
          description: Password.
      required: [ type, login, password]
      description: Callback basic authorization parameters.

    callback_http:
      allOf:
        - $ref: "#/components/schemas/callback_base"
        - type: object
          properties:
            type:
              type: string
              enum: [http]
              description: Event will be sent to http url.
            url:
              type: string
              description: Callback url.
            authorization:
              $ref: '#/components/schemas/callback_basic_authorization'
            params:
              type: object
              properties:
                timeout:
                  type: integer
                  default: 60
                  description: Callback request timeout.
                content_type:
                  type: string
                  default: application/json
                  enum: [application/json]
                  description: Callback request content type.
                headers:
                  type: object
                  description: Callback request headers.
                  additionalProperties: true
              description: Callback request parameters
      required: [url, type]

    callback:
      oneOf:
        - $ref: '#/components/schemas/callback_http'
        - $ref: '#/components/schemas/callback_ws'
      discriminator:
        propertyName: type
        mapping:
          http: '#/components/schemas/callback_http'
          luna-ws-notification: '#/components/schemas/callback_ws'

    parameters:
      type: object
      description: |
        Analytic parameters for stream processing.

        Default analytic parameters will be applied for stream processing if none are specified here.
      properties:
        targets:
          type: array
          default: [animal_detector]
          items:
            type: string
            enum: [animal_detector]
          description: |
            Estimations to perform on the video.

        callbacks:
          type: array
          description: |
            Callbacks parameters.

            `http` type callback sends events to the specified url by POST request.
            `luna-ws-notification` type callback sends events to websocket using sender.

          maxItems: 10
          items:
            $ref: '#/components/schemas/callback'

        parameters:
          type: object
          description: Estimation parameters.
          nullable: true

    stream:
      type: object
      description: |
        The full stream creation schema is available in the `Luna-Video-Manager` documentation.

        The presented schema describes only the analytics parameters.
      properties:
        data:
          type: object
          description: stream data
          additionalProperties: true
        analytics:
          type: array
          description: analytics list
          items:
            type: object
            description: analytics
            properties:
              analytic_name:
                type: string
                maxLength: 36
                pattern: '^[a-zA-Z0-9_\-]{1,36}$'
                description: Analytic name.
                enum: [animal_detector_analytics]
              parameters:
                type: object
                $ref: '#/components/schemas/parameters'
            required: [analytic_name]
      additionalProperties: true
      required: [data, analytics]

    uuid:
      type: string
      format: uuid
      pattern: '^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$'
      example: "557d54ec-29ad-4f3c-93b4-c9092ef12515"

    callback_request:
      type: object
      properties:
        event_type:
          type: string
          enum: [animal_detection]
          description: Event type.
        account_id:
          allOf:
            - $ref: '#/components/schemas/uuid'
          description: Account id of event.
        event_create_time:
          type: string
          description: Event creation time.
        event_end_time:
          type: string
          description: Event end time.
        event:
          type: object
          description: Event.
          properties:
            stream_id:
              allOf:
                - $ref: '#/components/schemas/uuid'
              description: Stream ID.
            track_id:
              allOf:
                - $ref: '#/components/schemas/uuid'
              description: Track id.
              nullable: true
            track_status:
              type: string
              enum: [started, finished]
              description: Status describes whether track started or finished.

          required: [stream_id, track_id]
      required: [event_type, event, account_id, event_create_time, event_end_time]
paths:
  /animal_detector_analytics:
    post:
      tags:
        - animal detector analytics
      summary: stream creation
      description: |
        Simplified stream creation request with a detailed description of the analytics parameters.

        The full request description (except for this analytics description) is available in the `Luna-Video-Manager` documentation.
      requestBody:
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/stream'
            example:
              data:
                reference: rtsp://stream_url
                type: stream
                downloadable: false
              analytics:
                - analytic_name: animal_detector_analytics
                  parameters:
                    parameters:
                      probe_count: 1
                    callbacks:
                      - url: "http://127.0.0.1:8001"
                        type: http
                      - url: "http://127.0.0.1:8002"
                      - type: luna-ws-notification
                    targets:
                      - animal_detector
      callbacks:
        onData:
          '{$request.body.callbacks.*.url}':
            post:
              requestBody:
                description: subscription payload
                content:
                  application/json:
                    schema:
                      $ref: '#/components/schemas/callback_request'

The *spec.html* file contains the OpenAPI documentation with the analytics parameters description in HTML format.
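
The documentation does not prescribe how *spec.html* is produced; the following is one possible sketch (an assumption, not part of the analytics) that renders *spec.yml* into a self-contained HTML page using the public Redoc standalone bundle, with PyYAML assumed to be available in the build environment:

"""Hypothetical helper: render spec.html from spec.yml."""
import json
from pathlib import Path

import yaml  # PyYAML, assumed to be available in the build environment

TEMPLATE = """<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8"/>
    <title>Animal Detector Analytics</title>
  </head>
  <body>
    <div id="redoc"></div>
    <script src="https://cdn.redoc.ly/redoc/latest/bundles/redoc.standalone.js"></script>
    <script>
      Redoc.init({spec}, {{}}, document.getElementById("redoc"));
    </script>
  </body>
</html>
"""


def buildSpecHtml(specDir: Path) -> None:
    # Load the OpenAPI spec and embed it into a standalone HTML page.
    spec = yaml.safe_load((specDir / "spec.yml").read_text())
    (specDir / "spec.html").write_text(TEMPLATE.format(spec=json.dumps(spec)))


if __name__ == "__main__":
    buildSpecHtml(Path(__file__).parent)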

Example file structure of a lambda agent with the animal analytics (the case when the analytics is included into the lambda as a module):
├── animal_detector_analytics
│  ├── analytics.py
│  ├── common.py
│  ├── config.conf
│  ├── data
│  │ ├── coco_classes.txt
│  │ └── yolov3-10.onnx
│  ├── estimator.py
│  ├── __init__.py
│  ├── models.py
│  ├── spec.html
│  └── spec.yml
├── lambda_main.py
├── poetry.lock
└── pyproject.toml

In this case, the analytics dependencies must be included into the lambda dependencies. The lambda dependencies are described in the pyproject.toml file:

*pyproject.toml*
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "lambda-agent"
version = "0.0.1"
description = "lambda agent"
authors = [ "VisionLabs", ]

[[tool.poetry.source]]
name = "vlabspypi"
url = "http://pypi.visionlabs.ru/root/public/+simple"
priority = "primary"

[[tool.poetry.source]]
name = "public-pypi"
url = "https://pypi.org/simple"
priority = "supplemental"

[tool.poetry.dependencies]
python = "^3.13"
numpy = "*"
luna-lambda-tools = "*"
pillow = "*"
video_analytics_helpers = "*"
onnxruntime = "^1.22.1"