Agent lambda development

Here is agent lambda development description.

More information about lambda types and differences of agent and others available at lambda types description.

More information about LUNA videoanalytics, in particular, luna-video-manager, video-agents and their interaction see its’ developers manuals.

Agent lambda requirements

Agent lambda has several requirements to addition with basic requirements:

  • Luna Video Manager available by credentials from Luna-Configurator

  • Luna Licenses available by credentials from Luna-Configurator

  • Luna Events available by credentials from Luna-Configurator; it can be disabled using ADDITIONAL_SERVICES_USAGE setting and it this case lambda should provide for work without Luna-Events usage

  • Luna Sender available by credentials from Luna-Configurator; it can be disabled using ADDITIONAL_SERVICES_USAGE setting and it this case lambda should provide for work without Luna-Sender usage

  • Luna Faces/Images Samples Store available by credentials from Luna-Configurator; it can be disabled using ADDITIONAL_SERVICES_USAGE setting and it this case lambda should provide for work without Luna-Image-Store usage

Agent lambda configuration

The agent lambda required several settings from luna-configurator, whose can be separated to several groups:

  • LUNA_LAMBDA_AGENT_UNIT_LOGGER - lambda logger settings

  • luna-services addresses and timeouts settings (for example, LUNA_EVENTS_ADDRESS and LUNA_EVENTS_TIMEOUTS will be used by lambda to make requests to luna-events service)

  • ADDITIONAL_SERVICES_USAGE setting will be used to determine which luna-services can be used by the lambda (the lambda will not check connection to disabled services and will raise an error if user try to make request to such service)

  • LUNA_LAMBDA_AGENT_UNIT_ANALYTICS_SETTINGS - lambda analytics-specific settings, for example, settings of which device must be used by one or another analytics - CPU or GPU

  • LUNA_LAMBDA_AGENT_UNIT_VIDEO_SETTINGS/LUNA_LAMBDA_AGENT_UNIT_RUNTIME_SETTINGS - lambda video processing settings

External agent

Agent can be started as external service by setting up agent creation parameters. In this regime agent will be treated as external to the main platform service and all communication with platform will be performed via luna-api service. Running agent in this regime requires luna account credentials or access token set via creation parameters. Note, that used token must be provided with specific permissions to work properly. Minimal set of permissions:

{
    "event": ["creation"],
    "image": ["creation"],
    "video_stream": ["creation"],
    "video_analytic": ["view", "creation", "modification"],
    "video_agent": ["creation", "deletion"],
}

Things to note

  • Services luna-api, luna-licenses and luna-configurator services must be available for external lambda agent

  • If some of the analytics were previously created by internal agent or external agent with other account id, agent will not start, as analytics update will fail. To overcome this analytics must be removed manually

  • Only luna-api client will be available for use in custom routes

Read luna-video-agent development documentation for more information

Agent lambda development

Lambda agent works with analytics, each lambda agent must have at least one video analytics.

It is possible to specify video analytics as separate package which will installed as lambda dependency or include analytics as module into lambda. The lambda_main.py and other configuration files will not differ in this cases.

The description below describes the case when analytics represents itself separate package, for description in the case of analytics is a module in lambda see analytics development chapter.

The agent lambda development in the easiest way consists of it’s configuration by specifying several parameters in lambda_main.py file.

Warning

The names of analytics, features, etc. listed below are fictitious and have nothing in common with the names of real analytics, etc.

The only required variable which must be specified is a list of available analytics, which also must be specified as lambda requirements, for example:

lambda_main.py
    #: list of analytics for agent lambda
    AVAILABLE_ANALYTICS = ["people"]
requirements.txt
    people_analytics==1.2.0

There are some more parameters which can also be relevant:

The ANALYTICS_LICENSE_FEATURES is a map with analytics and their licensing feature names. If analytics not required licensing, it must not be included into this map. If no one analytics must depends on licensing, this map must be empty or may not be included in lambda_main.py.

The ESTIMATORS_TO_INITIALIZE and ESTIMATORS_TARGETS are lists of SDK estimators and their targets which must initialized before analytics usage. If no one analytics not required SDK estimators usage, this lists must be empty or may not be included in lambda_main.py.

Note

In this example lambda has people_analytics package of version 1.2.0 as requirement, which represents video analytics package with analytics named people. The usage of this analytics is regulated by license feature named people_feature. It is requires the estimator from luna-sdkloop package named people and the people target estimator to be initialized before lambda starts.

lambda_main.py
    from sdk_loop.enums import Estimators, LoopEstimations

    #: list of analytics for agent lambda
    AVAILABLE_ANALYTICS = ["people"]
    #: map with analytics and their license features
    ANALYTICS_LICENSE_FEATURES = {"people": "people_feature"}
    #: list of estimators for initialization
    ESTIMATORS_TO_INITIALIZE = [Estimators.people]
    #: list of estimators targets for initialization
    ESTIMATORS_TARGETS = [LoopEstimations.people]
requirements.txt
    people_analytics==1.2.0

There are some more settings which can be used for agent lambda in lambda_main.py file:

lambda_main.py
    # count of video-agent registration attempt in *luna-video-manager*
    REGISTRATION_ATTEMPT_COUNT = 5
    # delay between registration attempts
    REGISTRATION_ATTEMPT_DELAY = 1
    # max size of feedback which can be sent to *luna-video-manager* at one time
    FEEDBACK_MAX_BATCH_COUNT = 1000
    # the code of response for ws-connections when agent shutdown
    WSCODE_GOING_AWAY = 1001
    # the name of lambda video-agent which will be registered in *luna-video-manager*
    AGENT_NAME = "luna-lambda-great-video-agent"
    # the description of lambda video-agent which will be registered in *luna-video-manager*
    AGENT_DESCRIPTION = "luna-lambda-greatest-video-agent"

The are many other possibilities to modify such lambda agent behavior, it’s interaction with analytics and so on, which can be realized in the same way as regular luna-video-agent, for more information, see luna-video-agent developers manual.

Agent video analytics development

It is possible to specify video analytics as separate package which will installed as lambda dependency or include analytics as module into lambda. The description below describes analytics module, which can then be included as a module or package.

Note

The complete for user video analytics development guide is in development and will be included in future releases.

In presented example(s) the poetry tool using for dependencies management, so the pyproject.toml file is filled by user and the poetry.lock file must be generated using poetry tool.

The example of video analytics which detect suits on video/streams and generate events if suits appears.

Suit analytics description

How it works: The analytics processes video frames through a ResNet-50 neural network to classify whether a person is wearing a suit. It supports both CPU and GPU inference, with configurable frame processing rates and region-of-interest (ROI) analysis.

When events are generated: Events are generated based on configurable policies: - Start trigger (default): Events when a suit is first detected - End trigger: Events when suit detection ends - Periodic trigger: Events at regular intervals during detection

By default event will be generated when suit is detected.


{

“account_id”: “string”, “event_create_time”: “string”, “event_end_time”: “string”, “event_type”: “suit”, “event”: {

“stream_id”: “557d54ec-29ad-4f3c-93b4-c9092ef12515”, “event_id”: “557d54ec-29ad-4f3c-93b4-c9092ef12515”, “track_id”: “557d54ec-29ad-4f3c-93b4-c9092ef12515”, “source”: “string”, “name”: “suit”, “overview”: {

“image”: “string”, “time_offset”: 1.234

}, “location”: {

“city”: “Moscow”, “area”: “Central”, “district”: “Basmanny”, “street”: “Podsosensky lane”, “house_number”: “23 bldg.3”, “geo_position”: {

“longitude”: 36.616, “latitude”: 55.752

}

}, “video_segment”: {

“start_time_offset”: 0.123, “end_time_offset”: 1.234

}

}

}

Note

The example below uses image classification model (“resnet50-v2-7.onnx”) and json file (“imagenet-simple-labels.json”) from github. To make this example works it needs to get this files from github and place into data folder of lambda agent archive.

Suit analytics example file structure itself
  ├──__init__.py
  ├──analytics.py
  ├──classes.py
  ├──common.py
  ├──models.py
  ├──spec.html
  ├──spec.yml
  └──data
     ├──resnet50-v2-7.onnx
     └──imagenet-simple-labels.json
  └──suit_nodes
     ├──__init__.py
     ├──events.py
     ├──suit_estimator.py
     └──suid_nodes.py

Note

The video analytics requires some dependencies which are available on visionlabs pypi for local run. All provided examples use visionlabs pypi specification at tool.poetry.source section of pyproject.toml file which make possible to install dependencies from the above source. It is also required HASP license provided by VisionLabs. For agent lambda creation, all required video analytics dependencies already included in base image (which is using for agent lambda building).

If it needs to build this analytics as separate package, it must include its’ dependencies for example using pyproject.toml/poetry.lock files:

pyproject.toml
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "suit-analytics"
version = "0.0.4"
description = "suit analytics"
authors = [ "VisionLabs", ]

[[tool.poetry.source]]
name = "vlabspypi"
url = "http://pypi.visionlabs.ru/root/public/+simple"
priority = "primary"

[[tool.poetry.source]]
name = "public-pypi"
url = "https://pypi.org/simple"
priority = "supplemental"

[tool.setuptools.package-data]
analytics-docs = ["spec.html"]
onnx = ["data/resnet50-v2-7.onnx"]

[tool.poetry.dependencies]
python = "^3.12"
luna-analytics-manager = {version="*", extras=["sdk"]}
opencv-python-headless = "*"
nvidia-curand-cu11 = "*"
nvidia-cufft-cu11 = "*"
onnxruntime-gpu = "*"
pynvcodec = "^2.6.0"

After dependencies resolving it must be built into a packages (for example, using poetry build).

The *__init__.py* module must contains all presented imported objects and classes to make analytics works.
__init__.py
"""Suit analytics."""

from video_analytics_helpers.luna_wraps import EventMetadata, LunaApiExternalParams, LunaWraps as PostProcessing

from .analytics import AnalyticsCtx, FrameCtx, initialize
from .common import getDocumentation, mandatoryNodes, targetNodes
from .models import Callbacks, Params, Targets
The *analytics.py* module contains analytics initialization, frame and analytics contexts.
analytics.py
"""Suit analytics."""

from logging import Logger
from pathlib import Path

import attr
import numpy as np
import onnxruntime as ort
from luna_analytics_manager.base import FrameEnviron
from luna_analytics_manager.templates.sdk import SDKAnalyticsCtx, SDKFrameCtx
from lunavl.sdk.image_utils.image import VLImage
from video_analytics_helpers.onnxruntime_wrapper import OnnxruntimeWrapper

from .classes import AnalyticsRes, FrameRes
from .models import Callbacks, Params, Targets

logger = Logger("luna.suit")


async def initialize(dataPath: str | Path | None = None, **kwargs) -> None:
    """
    Initialize analytics onnx wrapper

    Args:
        dataPath: Path to the data directory
        **kwargs: Additional keyword arguments

    Returns:
        None
    """
    if dataPath is None:
        configPrefix = Path(__file__).absolute().parent / "data"
    elif isinstance(dataPath, str):
        configPrefix = Path(dataPath)
    else:
        configPrefix = dataPath

    configPrefix / "imagenet-simple-labels.json"
    runtimeSettings = kwargs.get("runtime_settings", {})
    options = ort.SessionOptions()
    options.intra_op_num_threads = runtimeSettings.get("num_threads", 6)

    backend = (
        [("CUDAExecutionProvider", {"device_id": 0})]
        if runtimeSettings.get("device_class", "cpu") == "gpu"
        else ["CPUExecutionProvider"]
    )

    globalSession = ort.InferenceSession(
        (configPrefix / "resnet50-v2-7.onnx").as_posix(),
        sess_options=options,
        providers=backend,
    )
    globalSession.set_providers(backend)
    onnxWrapper = OnnxruntimeWrapper(options.intra_op_num_threads, globalSession)

    AnalyticsCtx.initialize(onnxWrapper)


@attr.dataclass(slots=True)
class FrameCtx(SDKFrameCtx):
    """Frame processing context."""

    processed: FrameRes | None = None


@attr.dataclass(slots=True)
class AnalyticsCtx(SDKAnalyticsCtx):
    """Analytics context."""

    name: str = "suit"
    params: Params = attr.field(factory=Params)
    targets: Targets = attr.field(factory=Targets)
    callbacks: Callbacks = attr.field(factory=Callbacks)
    aggregated: AnalyticsRes = attr.field(factory=AnalyticsRes)

    @classmethod
    def initialize(cls, onnxWrapper: OnnxruntimeWrapper) -> None:
        """Initialize analytics context"""
        cls.onnxWrapper = onnxWrapper

    async def prepareFrameCtx(self, frame: FrameEnviron[np.ndarray, VLImage]) -> FrameCtx:
        """Prepare frame processing context."""

        return FrameCtx(
            image=frame.frame,
            number=frame.raw.number,
            timestamp=frame.raw.timestamp,
            lastFrame=frame.raw.lastFrame,
        )
The *classes.py* module contains frame and analytics results containers description.
classes.py
"""Analytics structures."""

import attr

from .suit_nodes.events import SuitEvent


@attr.dataclass(slots=True)
class FrameRes:
    """Analytics result on a single frame."""

    events: list[SuitEvent]

    def asDict(self):
        """Build results in API format."""

        return {"events": [event.asDict() for event in self.events]}


@attr.dataclass(slots=True)
class AnalyticsRes:
    """Analytics aggregated result."""

    def asDict(self):
        """Build results in API format."""

        return {}
The *common.py* module contains analytics target, mandatory nodes and `getDocumentation` function which returns raw html analytics documentation for user.
common.py
"""Analytics setup."""

from pathlib import Path

from .models import Params, Targets
from .suit_nodes.suit_nodes import NodeOverview, NodeSuit

NODES = {NodeOverview, NodeSuit}
_DOCS_PATH = Path(__file__).parent / "spec.html"


def targetNodes(targets: Targets):
    """Pre-select nodes by user-defined targets."""

    yield from (node for node in NODES if node.name in targets)  # type: ignore


def mandatoryNodes(params: Params, targets: Targets):
    """Pre-select mandatory nodes according to configuration parameters."""
    yield from (NodeSuit,)


def getDocumentation() -> str:
    """Get analytics documentation"""
    with open(_DOCS_PATH) as f:
        return f.read()
The *models.py* module contains analytics parameters models.
models.py
"""Analytics models."""

from typing import Annotated, Literal

from annotated_types import Ge
from luna_analytics_manager.templates.mixins.frame_frequency_mixins import RateTime
from luna_analytics_manager.templates.models import WrapUnionError
from luna_analytics_manager.templates.sdk import SDKAnalyticsTargets
from pydantic import Field, Strict
from video_analytics_helpers.models import EventAnalyticCallback, HttpAnalyticCallback, WSAnalyticCallback
from video_analytics_helpers.params import BaseParams
from vlutils.structures.pydantic import BaseModel


class RateFrame(BaseModel):
    """Rate frame model. Sets the rate of frames to be processed."""

    period: Annotated[int, Ge(1), Strict()] = 1
    unit: Literal["frame"] = "second"


class OnetimeEventPolicyEnd(BaseModel):
    """Onetime event policy end model. Event will be triggered at the end of the track."""

    trigger: Literal["end"] = "end"


class OnetimeEventPolicyStart(BaseModel):
    """Onetime event policy start model. Event will be triggered at the start of the track."""

    trigger: Literal["start"] = "start"


class PeriodEventPolicy(BaseModel):
    """Period event policy model. Event will be triggered at the specified interval."""

    trigger: Literal["period"]
    interval: Annotated[float, Ge(0.0), Strict()]


class Params(BaseParams):
    """Params of analytic"""

    rate: Annotated[RateFrame | RateTime, WrapUnionError, Field(discriminator="unit", default_factory=RateFrame)]
    eventPolicy: Annotated[
        OnetimeEventPolicyStart | OnetimeEventPolicyEnd | PeriodEventPolicy,
        Field(discriminator="trigger", default_factory=OnetimeEventPolicyStart),
    ]


class Targets(SDKAnalyticsTargets[Literal["suit", "overview"]]):
    """Targets of analytic"""

    _default = {"suit", "overview"}


Callbacks = Annotated[
    list[Annotated[HttpAnalyticCallback | WSAnalyticCallback | EventAnalyticCallback, Field(discriminator="type")]],
    Field(max_length=10),
]
The *__init__.py* module in *suit_nodes* path is empty.
__init__.py

The *events.py* module in *suit_nodes* path contains events and track structures.
events.py
import attr
from video_analytics_helpers.containers import (
    AggregatedEvent as BaseAggregatedEvent,
    Event as BaseEvent,
    Overview,
    Track as BaseTrack,
)


@attr.dataclass(slots=True)
class Suit:
    """Event frame result."""

    name: str = ""

    def asDict(self):
        """Build results in API format."""
        return {"name": self.name}


@attr.dataclass(slots=True)
class AggregatedSuitEvent(BaseAggregatedEvent[Overview]):
    """Aggregated suit event."""

    eventType: str = "suit"
    suit: Suit | None = None

    def asDict(self):
        """Build results in API format."""

        result = super().asDict()
        if self.suit is not None:
            result.update(self.suit.asDict())
        return result


@attr.dataclass(slots=True)
class SuitEvent(BaseEvent[AggregatedSuitEvent]):
    """Suit event result."""

    eventType: str = "suit"
    suit: Suit | None = None

    def asDict(self):
        """Build event data in API format."""
        res = super().asDict()
        if self.suit is not None:
            res.update(self.suit.asDict())
        return res


class SuitTrack(BaseTrack[SuitEvent, AggregatedSuitEvent]):
    """Analytics track."""
The *suit_estimator.py* module in *suit_nodes* path contains suit estimator responsible for image preprocessing, processing and neural model results postprocessing.
suit_estimator.py
from pathlib import Path

import cv2
import numpy as np
import ujson
from video_analytics_helpers.onnxruntime_wrapper import OnnxruntimeWrapper


class SuitClassifier(object):
    """
    Suit estimator

    onnxWrapper: Wrapper around ONNX Runtime.
    crop_size: Image crop size. Defaults to 224.
    crop_type: Type of cropping strategy.

    """

    def __init__(
        self,
        onnxWrapper: OnnxruntimeWrapper,
        crop_size: int = 224,
        crop_type: str = "full_frame",
    ):
        self.onnxWrapper = onnxWrapper
        self.onnx_input_keys = [t.name for t in self.onnxWrapper.onnxSession.get_inputs()]

        self.is_new_id = dict()

        self.crop_size = crop_size
        self.crop_type = crop_type

        self.suit_id = dict()
        self._labels = None

    async def crop_and_scale(self, img_rgb):
        """Crops and resizes the input RGB image.

        Args:
            img_rgb: Input image in RGB format.

        Raises:
            Exception: If `crop_type` is not supported.

        Returns:
            numpy.ndarray: Cropped and resized RGB image.
        """
        assert img_rgb.shape[2] == 3
        img_shape = img_rgb.shape[:2]
        if self.crop_type == "center_crop":
            min_side = min(img_shape[0], img_shape[1])
            y_0, x_0 = (img_shape[0] - min_side) // 2, (img_shape[1] - min_side) // 2
            y_1, x_1 = y_0 + min_side, x_0 + min_side
            crop = img_rgb[y_0:y_1, x_0:x_1]
        elif self.crop_type == "full_frame":
            crop = img_rgb.copy()
        else:
            raise Exception(f"Unknown type of crop: {self.crop_type}")

        crop_resized = await self.onnxWrapper.runAsync(cv2.resize, crop, (self.crop_size, self.crop_size))
        return crop_resized

    async def preproc(self, img_rgb):
        """Preprocesses the image for ONNX model input.
        Args:
            img_rgb: Input RGB image.

        Returns:
            numpy.ndarray: Preprocessed image in NCHW format.
        """
        crop_prepared = await self.crop_and_scale(img_rgb)
        crop_prepared = crop_prepared.astype(np.float32) / 255.0
        crop_prepared = (crop_prepared - 0.5) / 0.5
        batch = crop_prepared.reshape((1,) + crop_prepared.shape)
        batch = batch.transpose(0, 3, 1, 2)
        return np.ascontiguousarray(batch)

    async def forward(self, img_rgb):
        """Runs a forward pass through the ONNX model.

        Args:
            img_rgb: Input RGB image, shape (H, W, 3).

        Returns:
            numpy.ndarray: Raw prediction scores for each class.
        """
        batch_numpy = await self.preproc(img_rgb)
        input_tensors_list = {self.onnx_input_keys[0]: batch_numpy}

        net_outs = await self.onnxWrapper.forward(input_tensors_list)
        return net_outs[0][0]

    def get_labels(self):
        """Loads class labels from a JSON file.

        Returns:
            numpy.ndarray: Array of labels as strings.
        """
        if self._labels is None:
            with open(Path(__file__).parent.parent / "data" / "imagenet-simple-labels.json") as f:
                data = ujson.load(f)
            self._labels = np.asarray(data)
        return self._labels

    async def run(self, img_rgb, cam_id):
        """Classifies the input image for a specific camera ID.

        Tracks camera IDs to differentiate between sources.

        Args:
            img_rgb: Input RGB image.
            cam_id: Identifier for the camera.

        Returns:
            Estimated class label.
        """
        if not cam_id in self.suit_id:
            self.suit_id[cam_id] = 0
            self.is_new_id[cam_id] = True

        preds = await self.forward(img_rgb)
        idx = np.argmax(preds)
        return self.get_labels()[idx]
The *suit_nodes.py* module in *suit_nodes* path contains suit nodes structures.
suit_nodes.py
import uuid

from luna_analytics_manager import Node
from luna_analytics_manager.sync import Syncer2 as Syncer
from luna_analytics_manager.templates.image_helpers.roi import getROIRect
from luna_analytics_manager.templates.models import EventStatus, VideoSegment
from video_analytics_helpers.frame_base_analytics import BaseFrameAnalyticsNode
from video_analytics_helpers.node_collections import NodeOverview as NodeOverviewBase

from ..analytics import AnalyticsCtx, FrameCtx
from ..classes import FrameRes
from .events import AggregatedSuitEvent, Overview, Suit, SuitEvent, SuitTrack
from .suit_estimator import SuitClassifier


class NodeSuit(
    BaseFrameAnalyticsNode[
        FrameCtx,
        AnalyticsCtx,
        SuitEvent,
        AggregatedSuitEvent,
        FrameRes,
        Suit,
        SuitTrack,
    ]
):
    """Suit Analytics node."""

    name = "suit"
    requires: list[Node] = []
    frameResCls = FrameRes
    # class for tracks
    trackCls = SuitTrack

    def __init__(self):
        super().__init__()
        self.frameSyncer = Syncer()
        self.track = None
        self.trackLength = 0

        self.suitEstimator = SuitClassifier(onnxWrapper=AnalyticsCtx.onnxWrapper)

    def buildEvent(self, suit: Suit, frameCtx: FrameCtx, ctx: AnalyticsCtx) -> tuple[SuitEvent, AggregatedSuitEvent]:
        """Generate current event and aggregated event."""

        event = SuitEvent(suit=suit, timeOffset=frameCtx.timestamp)
        aggregatedEvent = AggregatedSuitEvent(
            videoSegment=VideoSegment(frameCtx.timestamp, frameCtx.timestamp),
            suit=suit,
            overview=Overview(frameCtx.timestamp, frameCtx.image),
        )
        event.eventId = aggregatedEvent.eventId = str(uuid.uuid4())
        return event, aggregatedEvent

    def updateEvent(self, suit: Suit, frameCtx: FrameCtx, ctx: AnalyticsCtx):
        """Update current event with new frame."""

        event = self.track.currentEvent
        event.suit = suit
        event.timeOffset = frameCtx.timestamp
        event.eventStatus = EventStatus.inProcess

    def updateAggregatedEvent(self, suit: Suit, frameCtx: FrameCtx, ctx: AnalyticsCtx):
        """Update current aggregated event with new frame."""

        aggregatedEvent = self.track.currentAggregatedEvent
        if aggregatedEvent.videoSegment is None:
            aggregatedEvent.videoSegment = VideoSegment(frameCtx.timestamp, frameCtx.timestamp)
        else:
            aggregatedEvent.videoSegment.endTimeOffset = frameCtx.timestamp

        if aggregatedEvent.suit is None:
            aggregatedEvent.suit = suit

    async def estimate(self, frameCtx: FrameCtx, ctx: AnalyticsCtx) -> Suit | None:
        """
        Estimate suit on frame.
        Args:
            frameCtx: current frame context
            ctx: analytic context

        Returns:
            Detected suit label
        """
        img_rgb = frameCtx.image.asNPArray()  # check if it is really RGB format

        if ctx.params.roi:
            roi = getROIRect(frameCtx.image.rect, ctx.params.roi)
            img_rgb = img_rgb[roi.top : roi.bottom, roi.left : roi.right]

        camera_id = "0"
        detectedObj = await self.suitEstimator.run(img_rgb, camera_id)
        return Suit(name=detectedObj)

    def isTrackNeedEnd(self, suit: Suit, frameCtx: FrameCtx, ctx: AnalyticsCtx):
        """
        Determinate that event is needed to end or not after current frame processing.

        If function return `True`, track will be stopped. If function return `False` track will be started or continue.
        """
        return suit.name != "suit"


class NodeOverview(NodeOverviewBase[FrameCtx, AnalyticsCtx]):
    """Overview node."""

    requires = [NodeSuit]
The *spec.yml* file contains openapi documentation with analytics parameters description in yaml format.
spec.yml
openapi: 3.0.0
info:
  version: 'v.0.0.1'
  title: 'Suit Analytics'
  description: |
     `Suit Analytics` is intended for determination of:
        - suit_class parameter;
        

     The `suit` analytics decode stream frames taking into account `rate` parameter
     from <a href="#tag/suit-analytics/paths/~1suit_analytics/post">stream creation request analytic parameters</a>
     and performs the following actions:
        - Calculates suit class
        - For each callback check it's conditions and execute callbacks, which fit conditions.

      The schema of data sending by `callback` described as stream creation request callback.

components:
  schemas:
    error:
      type: object
      properties:
        error_code:
          type: integer
          description: Error code.
        desc:
          type: string
          description: Short error description.
        detail:
          type: string
          description: Error details.
        link:
          type: string
          description: Link to the documentation website with the error description.
      required: [error_code, detail, desc, link]
      example:
        error_code: 1
        detail: internal server error
        desc: internal server error
        link: "https://docs.visionlabs.ai/info/luna/troubleshooting/errors-description/code-1"

    int01:
      type: integer
      enum: [0,1]

    roi_int_coordinates:
      type: integer
      minimum: 0
      default: 0
      maximum: 65536
      example: 3327

    roi_float_percent:
      type: number
      format: float
      minimum: 0
      default: 0.0
      maximum: 100.0
      example: 87.4

    roi_float_percent_size:
      type: number
      format: float
      minimum: 0.00001
      maximum: 100.0
      example: 66.3

    roi_abs:
      type: object
      properties:
        x:
          $ref: '#/components/schemas/roi_int_coordinates'
        y:
          $ref: '#/components/schemas/roi_int_coordinates'
        width:
          $ref: '#/components/schemas/roi_int_coordinates'
        height:
          $ref: '#/components/schemas/roi_int_coordinates'
        mode:
          type: string
          enum: [abs]
          example: "abs"
          description: Coordinates and size are set in pixels.
      required: [x, y, width, height, mode]

    roi_percent:
      type: object
      properties:
        x:
          $ref: '#/components/schemas/roi_float_percent'
        y:
          $ref: '#/components/schemas/roi_float_percent'
        width:
          $ref: '#/components/schemas/roi_float_percent_size'
        height:
          $ref: '#/components/schemas/roi_float_percent_size'
        mode:
          type: string
          enum: [ percent ]
          example: "percent"
          description: Coordinates and size are set in percentage.
      required: [x, y, width, height, mode]

    roi:
      oneOf:
        - $ref: '#/components/schemas/roi_abs'
        - $ref: '#/components/schemas/roi_percent'
      description: |
          Region of interest on a frame. Boundaries of the area are described in `x`, `y` coordinates
          of the top left point and `width`, `height` properties
          **Region must not be any bigger than the original frame**

          The region of interest will be sent to estimator. The smaller the `roi`, the smaller the area the estimator
          will process and, accordingly, work faster.

    number01:
      type: number
      minimum: 0
      maximum: 1

    rate:
      type: object
      description: |
        Analytic rate execution determines on which frames the analytics will be launched.

        Analytic rate execution can be configured in next ways:
        - execute analytic on each Nth frame (`unit` - `frame`)
        - execute analytic on each frame corresponding to the Nth second (`unit` - `second`)
      properties:
        period:
          type: float
          minimum: 0
          default: 1
          description: Period length.
        unit:
          type: string
          default: second
          enum:
            - frame
            - second
          description: Unit for a period calculation (every `n` seconds or every `n` frames).
      required: [ unit, period ]

    callback_base:
      type: object
      properties:
        enable:
          type: integer
          enum: [0, 1]
          default: 1
          description: Whether callback enabled or not.

    callback_ws:
      allOf:
        - $ref: "#/components/schemas/callback_base"
        - type: object
          properties:
            type:
              type: string
              enum: [luna-ws-notification]
              description: Event will be sent with `suit` type.
      required: [type]

    callback_basic_authorization:
      type: object
      properties:
        type:
          type: string
          description: Authorization type.
          enum: [basic]
        login:
          type: string
          maxLength: 128
          description: Login.
        password:
          type: string
          maxLength: 128
          description: Password.
      required: [ type, login, password]
      description: Callback basic authorization parameters.

    callback_http:
      allOf:
        - $ref: "#/components/schemas/callback_base"
        - type: object
          properties:
            type:
              type: string
              enum: [http]
              description: Event will be sent to http url.
            url:
              type: string
              description: Callback url.
            authorization:
              $ref: '#/components/schemas/callback_basic_authorization'
            params:
              type: object
              properties:
                timeout:
                  type: integer
                  default: 60
                  description: Callback request timeout.
                content_type:
                  type: string
                  default: application/json
                  enum: [application/json]
                  description: Callback request content type.
                headers:
                  type: object
                  description: Callback request headers.
                  additionalProperties: true
              description: Callback request parameters
      required: [url, type]

    callback:
      oneOf:
        - $ref: '#/components/schemas/callback_http'
        - $ref: '#/components/schemas/callback_ws'
      discriminator:
        propertyName: type
        mapping:
          http: '#/components/schemas/callback_http'
          luna-ws-notification: '#/components/schemas/callback_ws'

    image_retain_policy:
      type: object
      description: |
        Image retain policy applicable when analytic `overview` target is specified,
        and configures parameters with which will image be saved.
      properties:
        mimetype:
          type: string
          enum:
            - PNG
            - JPEG
          default: JPEG
          description: Image format.
        quality:
          allOf:
            - $ref: '#/components/schemas/number01'
            - default: 1
          description: Image quality, on a scale from 0 (worst) to 1 (best). Has no effect on `PNG`.
        max_size:
          type: integer
          minimum: 0
          default: 640
          description: Image max size, in pxl. Neither the width nor the height will exceed this value.

    period_event_policy:
      type: object
      properties:
        trigger:
          type: string
          enum: [period]
        interval:
          type: float
          description: Event generation period interval.
          minimum: 0.0
          default: 1.0
      required: [trigger]

    start_event_policy:
      type: object
      properties:
        trigger:
          type: string
          enum: [start]
      required: [trigger]

    end_event_policy:
      type: object
      properties:
        trigger:
          type: string
          enum: [end]
      required: [trigger]

    parameters:
      type: object
      description: |
        Analytic parameters for stream processing.

        There are default analytic parameters which will be applied for stream processing if no one specified here.
      properties:
        targets:
          type: array
          default: [suit]
          items:
            type: string
            enum: [suit, overview]
          description: |
            Estimations to perform on the video.

            `suit` will calculate some `suit` value if specified.

            `overview` will add image to events.
        callbacks:
          type: array
          description: |
            Callbacks parameters.

            `http` type callback sends events to the specified url by POST request.

            `luna-ws-notification` type callback sends events to websocket using sender.
          maxItems: 10
          items:
            $ref: '#/components/schemas/callback'

        parameters:
          type: object
          description: Estimation parameters.
          properties:
            event_policy:
              description: |
                Event policy.
                
                When suit appears on frame, the new `track` will starts, when suit disappears, `track` will stops.
                
                - when track starts (`trigger` - `start`)
                - when track ends (`trigger` - `end`)
                - periodically while track exists (`trigger` - `period`)
              type: object
              properties:
              oneOf:
                - $ref: '#/components/schemas/period_event_policy'
                - $ref: '#/components/schemas/start_event_policy'
                - $ref: '#/components/schemas/end_event_policy'
              discriminator:
                propertyName: trigger
                mapping:
                  period: '#/components/schemas/period_event_policy'
                  start: '#/components/schemas/start_event_policy'
                  end: '#/components/schemas/end_event_policy'
              default:
                trigger: start
            image_retain_policy:
              $ref: '#/components/schemas/image_retain_policy'
            probe_count:
              description: |
                A number of consecutive suit estimations with not 'none' class. This parameter is intended to
                prevent false positives from analytics.
              type: integer
              minimum: 0
              default: 3
            roi:
              $ref: '#/components/schemas/roi'
            rate:
              allOf:
                - $ref: '#/components/schemas/rate'
                - default:
                    unit: second
                    period: 1

    stream:
      type: object
      description: |
        Full stream creation schema is available in `Luna-Video-Manager` documentation.

        The presented schema described only analytics parameters.
      properties:
        data:
          type: object
          description: stream data
          additionalProperties: true
        analytics:
          type: array
          description: analytics list
          items:
            type: object
            description: analytics
            properties:
              analytic_name:
                type: string
                maxLength: 36
                pattern: '^[a-zA-Z0-9_\-]{1,36}$'
                description: Analytic name.
                enum: [suit_analytics]
              parameters:
                type: object
                $ref: '#/components/schemas/parameters'
            required: [analytic_name]
      additionalProperties: true
      required: [data, analytics]

    string36_nullable:
      type: string
      maxLength: 36
      nullable: true

    longitude:
      type: number
      minimum: -180
      maximum: 180
      example: 36.616

    latitude:
      type: number
      minimum: -90
      maximum: 90
      example: 55.752

    geo_position:
      type: object
      nullable: true
      description: Geo position specified by geographic coordinates - longitude and latitude.
      properties:
        longitude:
          allOf:
            - $ref: '#/components/schemas/longitude'
            - description: Longitude in degrees.
        latitude:
          allOf:
            - $ref: '#/components/schemas/latitude'
            - description: Latitude in degrees.
      required: [longitude, latitude]
      example:
        longitude: 36.616
        latitude: 55.752

    uuid:
      type: string
      format: uuid
      pattern: '^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$'
      example: "557d54ec-29ad-4f3c-93b4-c9092ef12515"

    video_segment:
      type: object
      properties:
        start_time_offset:
          type: number
          example: 0.123
          description: Start video segment offset(seconds).
        end_time_offset:
          type: number
          example: 1.234
          description: Eng video segment offset(seconds).
      required: [ start_time_offset, end_time_offset ]

    callback_request:
      type: object
      properties:
        event_type:
          type: string
          enum: [suit]
          description: Event type.
        account_id:
          allOf:
            - $ref: '#/components/schemas/uuid'
          description: Account id of event.
        event_create_time:
          type: string
          description: Event creation time.
        event_end_time:
          type: string
          description: Event end time.
        event:
          type: object
          description: Event.
          properties:
            stream_id:
              allOf:
                - $ref: '#/components/schemas/uuid'
              description: Stream ID.
            event_id:
              allOf:
                - $ref: '#/components/schemas/uuid'
              description: Event ID.
            track_id:
              allOf:
                - $ref: '#/components/schemas/uuid'
              description: Track id.
              nullable: true
            video_segment:
              $ref: '#/components/schemas/video_segment'
            overview:
              type: object
              properties:
                time_offset:
                  type: number
                  description: time offset
                image:
                  type: str
                  description: link to image
              required: [time_offset]
            location:
              type: object
              properties:
                city:
                  allOf:
                    - $ref: '#/components/schemas/string36_nullable'
                  example: Moscow
                  description: City that stream belongs.
                area:
                  allOf:
                    - $ref: '#/components/schemas/string36_nullable'
                  example: Central
                  description: Area that stream belongs.
                district:
                  allOf:
                    - $ref: '#/components/schemas/string36_nullable'
                  example: Basmanny
                  description: District that stream belongs.
                street:
                  allOf:
                    - $ref: '#/components/schemas/string36_nullable'
                  example: Podsosensky lane
                  description: Street that stream belongs.
                house_number:
                  allOf:
                    - $ref: '#/components/schemas/string36_nullable'
                  example: 23 bldg.3
                  description: Street that stream belongs.
                geo_position:
                  $ref: '#/components/schemas/geo_position'
              description: |
                Stream location parameters.

                Required callback `location` target to send this value.
          required: [stream_id, event_id, suit_class, track_id]
      required: [event_type, event, account_id, event_create_time, event_end_time]
paths:
  /suit_analytics:
    post:
      tags:
        - suit analytics
      summary: stream creation
      description: |
        Stream creation superficial request with detailed analytics parameters description.
        
        Full request description (exclude for this analytics description) available at `Luna-Video-Manager` documentation.
      requestBody:
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/stream'
            example:
              data:
                reference: rtsp://stream_url
                type: stream
                downloadable: false
              analytics:
                - analytic_name: suit_analytics
                  parameters:
                    parameters:
                      probe_count: 2
                      image_retain_policy:
                        mimetype: PNG
                      roi:
                        mode: abs
                        x: 0
                        y: 0
                        width: 1000
                        height: 500
                    callbacks:
                      - url: "http://127.0.0.1:8001"
                        type: http
                      - url: "http://127.0.0.1:8002"
                        type: http
                      - type: luna-ws-notification
                    targets:
                      - suit
                      - overview
      callbacks:
        onData:
          '{$request.body.callbacks.*.url}':
            post:
              requestBody:
                description: subscription payload
                content:
                  application/json:
                    schema:
                      $ref: '#/components/schemas/callback_request'

The *spec.html* file contains openapi documentation with analytics parameters description in html format.


Agent lambda examples

  • Here is an example of agent lambda with suit video analytics:

    To include video analytics presented in previous chapter into lambda agent as package it needs the following lambda archive file structure:

    Lambda agent with suit analytics example file structure (the case when analytics includes in lambda as package)
      ├──pyproject.toml
      ├──poetry.lock
      └──lambda_main.py
    

    The lambda_main.py module:

    lambda_main.py
    AVAILABLE_ANALYTICS = ["suit_analytics"]
    

    It this case, the dependencies of lambda agent must include only analytics as dependency, not analytics dependencies:

    *pyproject.toml*
    pyproject.toml
    [build-system]
    requires = ["poetry-core"]
    build-backend = "poetry.core.masonry.api"
    
    [tool.poetry]
    name = "lambda-agent"
    version = "0.0.1"
    description = "lambda-agent"
    authors = [ "VisionLabs", ]
    
    [[tool.poetry.source]]
    name = "vlabspypi"
    url = "http://pypi.visionlabs.ru/root/public/+simple"
    priority = "primary"
    
    [[tool.poetry.source]]
    name = "public-pypi"
    url = "https://pypi.org/simple"
    priority = "supplemental"
    
    [tool.poetry.dependencies]
    python = "^3.12"
    suit-analytics = "0.0.4"
    

    To include presented analytics into lambda agent as module it needs the following lambda archive file structure:

    Lambda agent with suit analytics example file structure (the case when analytics includes in lambda as module)
      ├──pyproject.toml
      ├──poetry.lock
      ├──lambda_main.py
      └──suit_analytics
          ├──__init__.py
          ├──analytics.py
          ├──classes.py
          ├──common.py
          ├──models.py
          ├──spec.html
          ├──spec.yml
          └──data
             ├──resnet50-v2-7.onnx
             └──imagenet-simple-labels.json
          └──suit_nodes
             ├──__init__.py
             ├──events.py
             ├──suit_estimator.py
             └──suid_nodes.py
    

    In this case, the analytics dependencies must be included into lambda dependencies. The lambda dependencies described as pyproject.toml file:

    *pyproject.toml*
    pyproject.toml
    [build-system]
    requires = ["poetry-core"]
    build-backend = "poetry.core.masonry.api"
    
    [tool.poetry]
    name = "lambda agent"
    version = "0.0.1"
    description = "lambda agent"
    authors = [ "VisionLabs", ]
    
    [[tool.poetry.source]]
    name = "vlabspypi"
    url = "http://pypi.visionlabs.ru/root/public/+simple"
    priority = "primary"
    
    [[tool.poetry.source]]
    name = "public-pypi"
    url = "https://pypi.org/simple"
    priority = "supplemental"
    
    [tool.poetry.dependencies]
    python = "^3.12"
    luna-analytics-manager = {version="*", extras=["sdk"]}
    opencv-python-headless = "*"
    nvidia-curand-cu11 = "*"
    nvidia-cufft-cu11 = "*"
    onnxruntime-gpu = "*"
    pynvcodec = ">=2.6.0"