Agent lambda development
Here is agent lambda development description.
More information about lambda types and differences of agent and others available at lambda types description.
More information about LUNA videoanalytics, in particular, luna-video-manager, video-agents and their interaction see its’ developers manuals.
Agent lambda requirements
Agent lambda has several requirements to addition with basic requirements:
Luna Video Manager available by credentials from Luna-Configurator
Luna Licenses available by credentials from Luna-Configurator
Luna Events available by credentials from Luna-Configurator; it can be disabled using ADDITIONAL_SERVICES_USAGE setting and it this case lambda should provide for work without Luna-Events usage
Luna Sender available by credentials from Luna-Configurator; it can be disabled using ADDITIONAL_SERVICES_USAGE setting and it this case lambda should provide for work without Luna-Sender usage
Luna Faces/Images Samples Store available by credentials from Luna-Configurator; it can be disabled using ADDITIONAL_SERVICES_USAGE setting and it this case lambda should provide for work without Luna-Image-Store usage
Agent lambda configuration
The agent lambda required several settings from luna-configurator, whose can be separated to several groups:
LUNA_LAMBDA_AGENT_UNIT_LOGGER - lambda logger settings
luna-services addresses and timeouts settings (for example, LUNA_EVENTS_ADDRESS and LUNA_EVENTS_TIMEOUTS will be used by lambda to make requests to luna-events service)
ADDITIONAL_SERVICES_USAGE setting will be used to determine which luna-services can be used by the lambda (the lambda will not check connection to disabled services and will raise an error if user try to make request to such service)
LUNA_LAMBDA_AGENT_UNIT_ANALYTICS_SETTINGS - lambda analytics-specific settings, for example, settings of which device must be used by one or another analytics - CPU or GPU
LUNA_LAMBDA_AGENT_UNIT_VIDEO_SETTINGS/LUNA_LAMBDA_AGENT_UNIT_RUNTIME_SETTINGS - lambda video processing settings
External agent
Agent can be started as external service by setting up agent creation parameters. In this regime agent will be treated as external to the main platform service and all communication with platform will be performed via luna-api service. Running agent in this regime requires luna account credentials or access token set via creation parameters. Note, that used token must be provided with specific permissions to work properly. Minimal set of permissions:
{
"event": ["creation"],
"image": ["creation"],
"video_stream": ["creation"],
"video_analytic": ["view", "creation", "modification"],
"video_agent": ["creation", "deletion"],
}
Things to note
Services luna-api, luna-licenses and luna-configurator services must be available for external lambda agent
If some of the analytics were previously created by internal agent or external agent with other account id, agent will not start, as analytics update will fail. To overcome this analytics must be removed manually
Only luna-api client will be available for use in custom routes
Read luna-video-agent development documentation for more information
Agent lambda development
Lambda agent works with analytics, each lambda agent must have at least one video analytics.
It is possible to specify video analytics as separate package which will installed as lambda dependency or include analytics as module into lambda. The lambda_main.py and other configuration files will not differ in this cases.
The description below describes the case when analytics represents itself separate package, for description in the case of analytics is a module in lambda see analytics development chapter.
The agent lambda development in the easiest way consists of it’s configuration by specifying several parameters in lambda_main.py file.
Warning
The names of analytics, features, etc. listed below are fictitious and have nothing in common with the names of real analytics, etc.
The only required variable which must be specified is a list of available analytics, which also must be specified as lambda requirements, for example:
#: list of analytics for agent lambda
AVAILABLE_ANALYTICS = ["people"]
people_analytics==1.2.0
There are some more parameters which can also be relevant:
The ANALYTICS_LICENSE_FEATURES is a map with analytics and their licensing feature names. If analytics not required licensing, it must not be included into this map. If no one analytics must depends on licensing, this map must be empty or may not be included in lambda_main.py.
The ESTIMATORS_TO_INITIALIZE and ESTIMATORS_TARGETS are lists of SDK estimators and their targets which must initialized before analytics usage. If no one analytics not required SDK estimators usage, this lists must be empty or may not be included in lambda_main.py.
Note
In this example lambda has people_analytics package of version 1.2.0 as requirement, which represents video analytics package with analytics named people. The usage of this analytics is regulated by license feature named people_feature. It is requires the estimator from luna-sdkloop package named people and the people target estimator to be initialized before lambda starts.
from sdk_loop.enums import Estimators, LoopEstimations
#: list of analytics for agent lambda
AVAILABLE_ANALYTICS = ["people"]
#: map with analytics and their license features
ANALYTICS_LICENSE_FEATURES = {"people": "people_feature"}
#: list of estimators for initialization
ESTIMATORS_TO_INITIALIZE = [Estimators.people]
#: list of estimators targets for initialization
ESTIMATORS_TARGETS = [LoopEstimations.people]
people_analytics==1.2.0
There are some more settings which can be used for agent lambda in lambda_main.py file:
# count of video-agent registration attempt in *luna-video-manager*
REGISTRATION_ATTEMPT_COUNT = 5
# delay between registration attempts
REGISTRATION_ATTEMPT_DELAY = 1
# max size of feedback which can be sent to *luna-video-manager* at one time
FEEDBACK_MAX_BATCH_COUNT = 1000
# the code of response for ws-connections when agent shutdown
WSCODE_GOING_AWAY = 1001
# the name of lambda video-agent which will be registered in *luna-video-manager*
AGENT_NAME = "luna-lambda-great-video-agent"
# the description of lambda video-agent which will be registered in *luna-video-manager*
AGENT_DESCRIPTION = "luna-lambda-greatest-video-agent"
The are many other possibilities to modify such lambda agent behavior, it’s interaction with analytics and so on, which can be realized in the same way as regular luna-video-agent, for more information, see luna-video-agent developers manual.
Agent video analytics development
It is possible to specify video analytics as separate package which will installed as lambda dependency or include analytics as module into lambda. The description below describes analytics module, which can then be included as a module or package.
Note
The complete for user video analytics development guide is in development and will be included in future releases.
In presented example(s) the poetry tool using for dependencies management, so the pyproject.toml file is filled by user and the poetry.lock file must be generated using poetry tool.
The example of video analytics which detect suits on video/streams and generate events if suits appears.
Suit analytics description
How it works: The analytics processes video frames through a ResNet-50 neural network to classify whether a person is wearing a suit. It supports both CPU and GPU inference, with configurable frame processing rates and region-of-interest (ROI) analysis.
When events are generated: Events are generated based on configurable policies: - Start trigger (default): Events when a suit is first detected - End trigger: Events when suit detection ends - Periodic trigger: Events at regular intervals during detection
By default event will be generated when suit is detected.
- {
“account_id”: “string”, “event_create_time”: “string”, “event_end_time”: “string”, “event_type”: “suit”, “event”: {
“stream_id”: “557d54ec-29ad-4f3c-93b4-c9092ef12515”, “event_id”: “557d54ec-29ad-4f3c-93b4-c9092ef12515”, “track_id”: “557d54ec-29ad-4f3c-93b4-c9092ef12515”, “source”: “string”, “name”: “suit”, “overview”: {
“image”: “string”, “time_offset”: 1.234
}, “location”: {
“city”: “Moscow”, “area”: “Central”, “district”: “Basmanny”, “street”: “Podsosensky lane”, “house_number”: “23 bldg.3”, “geo_position”: {
“longitude”: 36.616, “latitude”: 55.752
}
}, “video_segment”: {
“start_time_offset”: 0.123, “end_time_offset”: 1.234
}
}
}
Note
The example below uses image classification model (“resnet50-v2-7.onnx”) and json file (“imagenet-simple-labels.json”) from github. To make this example works it needs to get this files from github and place into data folder of lambda agent archive.
├──__init__.py
├──analytics.py
├──classes.py
├──common.py
├──models.py
├──spec.html
├──spec.yml
└──data
├──resnet50-v2-7.onnx
└──imagenet-simple-labels.json
└──suit_nodes
├──__init__.py
├──events.py
├──suit_estimator.py
└──suid_nodes.py
Note
The video analytics requires some dependencies which are available on visionlabs pypi for local run. All provided examples use visionlabs pypi specification at tool.poetry.source section of pyproject.toml file which make possible to install dependencies from the above source. It is also required HASP license provided by VisionLabs. For agent lambda creation, all required video analytics dependencies already included in base image (which is using for agent lambda building).
If it needs to build this analytics as separate package, it must include its’ dependencies for example using pyproject.toml/poetry.lock files:
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "suit-analytics"
version = "0.0.4"
description = "suit analytics"
authors = [ "VisionLabs", ]
[[tool.poetry.source]]
name = "vlabspypi"
url = "http://pypi.visionlabs.ru/root/public/+simple"
priority = "primary"
[[tool.poetry.source]]
name = "public-pypi"
url = "https://pypi.org/simple"
priority = "supplemental"
[tool.setuptools.package-data]
analytics-docs = ["spec.html"]
onnx = ["data/resnet50-v2-7.onnx"]
[tool.poetry.dependencies]
python = "^3.12"
luna-analytics-manager = {version="*", extras=["sdk"]}
opencv-python-headless = "*"
nvidia-curand-cu11 = "*"
nvidia-cufft-cu11 = "*"
onnxruntime-gpu = "*"
pynvcodec = "^2.6.0"
After dependencies resolving it must be built into a packages (for example, using poetry build).
The *__init__.py* module must contains all presented imported objects and classes to make analytics works.
"""Suit analytics."""
from video_analytics_helpers.luna_wraps import EventMetadata, LunaApiExternalParams, LunaWraps as PostProcessing
from .analytics import AnalyticsCtx, FrameCtx, initialize
from .common import getDocumentation, mandatoryNodes, targetNodes
from .models import Callbacks, Params, Targets
The *analytics.py* module contains analytics initialization, frame and analytics contexts.
"""Suit analytics."""
from logging import Logger
from pathlib import Path
import attr
import numpy as np
import onnxruntime as ort
from luna_analytics_manager.base import FrameEnviron
from luna_analytics_manager.templates.sdk import SDKAnalyticsCtx, SDKFrameCtx
from lunavl.sdk.image_utils.image import VLImage
from video_analytics_helpers.onnxruntime_wrapper import OnnxruntimeWrapper
from .classes import AnalyticsRes, FrameRes
from .models import Callbacks, Params, Targets
logger = Logger("luna.suit")
async def initialize(dataPath: str | Path | None = None, **kwargs) -> None:
"""
Initialize analytics onnx wrapper
Args:
dataPath: Path to the data directory
**kwargs: Additional keyword arguments
Returns:
None
"""
if dataPath is None:
configPrefix = Path(__file__).absolute().parent / "data"
elif isinstance(dataPath, str):
configPrefix = Path(dataPath)
else:
configPrefix = dataPath
configPrefix / "imagenet-simple-labels.json"
runtimeSettings = kwargs.get("runtime_settings", {})
options = ort.SessionOptions()
options.intra_op_num_threads = runtimeSettings.get("num_threads", 6)
backend = (
[("CUDAExecutionProvider", {"device_id": 0})]
if runtimeSettings.get("device_class", "cpu") == "gpu"
else ["CPUExecutionProvider"]
)
globalSession = ort.InferenceSession(
(configPrefix / "resnet50-v2-7.onnx").as_posix(),
sess_options=options,
providers=backend,
)
globalSession.set_providers(backend)
onnxWrapper = OnnxruntimeWrapper(options.intra_op_num_threads, globalSession)
AnalyticsCtx.initialize(onnxWrapper)
@attr.dataclass(slots=True)
class FrameCtx(SDKFrameCtx):
"""Frame processing context."""
processed: FrameRes | None = None
@attr.dataclass(slots=True)
class AnalyticsCtx(SDKAnalyticsCtx):
"""Analytics context."""
name: str = "suit"
params: Params = attr.field(factory=Params)
targets: Targets = attr.field(factory=Targets)
callbacks: Callbacks = attr.field(factory=Callbacks)
aggregated: AnalyticsRes = attr.field(factory=AnalyticsRes)
@classmethod
def initialize(cls, onnxWrapper: OnnxruntimeWrapper) -> None:
"""Initialize analytics context"""
cls.onnxWrapper = onnxWrapper
async def prepareFrameCtx(self, frame: FrameEnviron[np.ndarray, VLImage]) -> FrameCtx:
"""Prepare frame processing context."""
return FrameCtx(
image=frame.frame,
number=frame.raw.number,
timestamp=frame.raw.timestamp,
lastFrame=frame.raw.lastFrame,
)
The *classes.py* module contains frame and analytics results containers description.
"""Analytics structures."""
import attr
from .suit_nodes.events import SuitEvent
@attr.dataclass(slots=True)
class FrameRes:
"""Analytics result on a single frame."""
events: list[SuitEvent]
def asDict(self):
"""Build results in API format."""
return {"events": [event.asDict() for event in self.events]}
@attr.dataclass(slots=True)
class AnalyticsRes:
"""Analytics aggregated result."""
def asDict(self):
"""Build results in API format."""
return {}
The *common.py* module contains analytics target, mandatory nodes and `getDocumentation` function which returns raw html analytics documentation for user.
"""Analytics setup."""
from pathlib import Path
from .models import Params, Targets
from .suit_nodes.suit_nodes import NodeOverview, NodeSuit
NODES = {NodeOverview, NodeSuit}
_DOCS_PATH = Path(__file__).parent / "spec.html"
def targetNodes(targets: Targets):
"""Pre-select nodes by user-defined targets."""
yield from (node for node in NODES if node.name in targets) # type: ignore
def mandatoryNodes(params: Params, targets: Targets):
"""Pre-select mandatory nodes according to configuration parameters."""
yield from (NodeSuit,)
def getDocumentation() -> str:
"""Get analytics documentation"""
with open(_DOCS_PATH) as f:
return f.read()
The *models.py* module contains analytics parameters models.
"""Analytics models."""
from typing import Annotated, Literal
from annotated_types import Ge
from luna_analytics_manager.templates.mixins.frame_frequency_mixins import RateTime
from luna_analytics_manager.templates.models import WrapUnionError
from luna_analytics_manager.templates.sdk import SDKAnalyticsTargets
from pydantic import Field, Strict
from video_analytics_helpers.models import EventAnalyticCallback, HttpAnalyticCallback, WSAnalyticCallback
from video_analytics_helpers.params import BaseParams
from vlutils.structures.pydantic import BaseModel
class RateFrame(BaseModel):
"""Rate frame model. Sets the rate of frames to be processed."""
period: Annotated[int, Ge(1), Strict()] = 1
unit: Literal["frame"] = "second"
class OnetimeEventPolicyEnd(BaseModel):
"""Onetime event policy end model. Event will be triggered at the end of the track."""
trigger: Literal["end"] = "end"
class OnetimeEventPolicyStart(BaseModel):
"""Onetime event policy start model. Event will be triggered at the start of the track."""
trigger: Literal["start"] = "start"
class PeriodEventPolicy(BaseModel):
"""Period event policy model. Event will be triggered at the specified interval."""
trigger: Literal["period"]
interval: Annotated[float, Ge(0.0), Strict()]
class Params(BaseParams):
"""Params of analytic"""
rate: Annotated[RateFrame | RateTime, WrapUnionError, Field(discriminator="unit", default_factory=RateFrame)]
eventPolicy: Annotated[
OnetimeEventPolicyStart | OnetimeEventPolicyEnd | PeriodEventPolicy,
Field(discriminator="trigger", default_factory=OnetimeEventPolicyStart),
]
class Targets(SDKAnalyticsTargets[Literal["suit", "overview"]]):
"""Targets of analytic"""
_default = {"suit", "overview"}
Callbacks = Annotated[
list[Annotated[HttpAnalyticCallback | WSAnalyticCallback | EventAnalyticCallback, Field(discriminator="type")]],
Field(max_length=10),
]
The *events.py* module in *suit_nodes* path contains events and track structures.
import attr
from video_analytics_helpers.containers import (
AggregatedEvent as BaseAggregatedEvent,
Event as BaseEvent,
Overview,
Track as BaseTrack,
)
@attr.dataclass(slots=True)
class Suit:
"""Event frame result."""
name: str = ""
def asDict(self):
"""Build results in API format."""
return {"name": self.name}
@attr.dataclass(slots=True)
class AggregatedSuitEvent(BaseAggregatedEvent[Overview]):
"""Aggregated suit event."""
eventType: str = "suit"
suit: Suit | None = None
def asDict(self):
"""Build results in API format."""
result = super().asDict()
if self.suit is not None:
result.update(self.suit.asDict())
return result
@attr.dataclass(slots=True)
class SuitEvent(BaseEvent[AggregatedSuitEvent]):
"""Suit event result."""
eventType: str = "suit"
suit: Suit | None = None
def asDict(self):
"""Build event data in API format."""
res = super().asDict()
if self.suit is not None:
res.update(self.suit.asDict())
return res
class SuitTrack(BaseTrack[SuitEvent, AggregatedSuitEvent]):
"""Analytics track."""
The *suit_estimator.py* module in *suit_nodes* path contains suit estimator responsible for image preprocessing, processing and neural model results postprocessing.
from pathlib import Path
import cv2
import numpy as np
import ujson
from video_analytics_helpers.onnxruntime_wrapper import OnnxruntimeWrapper
class SuitClassifier(object):
"""
Suit estimator
onnxWrapper: Wrapper around ONNX Runtime.
crop_size: Image crop size. Defaults to 224.
crop_type: Type of cropping strategy.
"""
def __init__(
self,
onnxWrapper: OnnxruntimeWrapper,
crop_size: int = 224,
crop_type: str = "full_frame",
):
self.onnxWrapper = onnxWrapper
self.onnx_input_keys = [t.name for t in self.onnxWrapper.onnxSession.get_inputs()]
self.is_new_id = dict()
self.crop_size = crop_size
self.crop_type = crop_type
self.suit_id = dict()
self._labels = None
async def crop_and_scale(self, img_rgb):
"""Crops and resizes the input RGB image.
Args:
img_rgb: Input image in RGB format.
Raises:
Exception: If `crop_type` is not supported.
Returns:
numpy.ndarray: Cropped and resized RGB image.
"""
assert img_rgb.shape[2] == 3
img_shape = img_rgb.shape[:2]
if self.crop_type == "center_crop":
min_side = min(img_shape[0], img_shape[1])
y_0, x_0 = (img_shape[0] - min_side) // 2, (img_shape[1] - min_side) // 2
y_1, x_1 = y_0 + min_side, x_0 + min_side
crop = img_rgb[y_0:y_1, x_0:x_1]
elif self.crop_type == "full_frame":
crop = img_rgb.copy()
else:
raise Exception(f"Unknown type of crop: {self.crop_type}")
crop_resized = await self.onnxWrapper.runAsync(cv2.resize, crop, (self.crop_size, self.crop_size))
return crop_resized
async def preproc(self, img_rgb):
"""Preprocesses the image for ONNX model input.
Args:
img_rgb: Input RGB image.
Returns:
numpy.ndarray: Preprocessed image in NCHW format.
"""
crop_prepared = await self.crop_and_scale(img_rgb)
crop_prepared = crop_prepared.astype(np.float32) / 255.0
crop_prepared = (crop_prepared - 0.5) / 0.5
batch = crop_prepared.reshape((1,) + crop_prepared.shape)
batch = batch.transpose(0, 3, 1, 2)
return np.ascontiguousarray(batch)
async def forward(self, img_rgb):
"""Runs a forward pass through the ONNX model.
Args:
img_rgb: Input RGB image, shape (H, W, 3).
Returns:
numpy.ndarray: Raw prediction scores for each class.
"""
batch_numpy = await self.preproc(img_rgb)
input_tensors_list = {self.onnx_input_keys[0]: batch_numpy}
net_outs = await self.onnxWrapper.forward(input_tensors_list)
return net_outs[0][0]
def get_labels(self):
"""Loads class labels from a JSON file.
Returns:
numpy.ndarray: Array of labels as strings.
"""
if self._labels is None:
with open(Path(__file__).parent.parent / "data" / "imagenet-simple-labels.json") as f:
data = ujson.load(f)
self._labels = np.asarray(data)
return self._labels
async def run(self, img_rgb, cam_id):
"""Classifies the input image for a specific camera ID.
Tracks camera IDs to differentiate between sources.
Args:
img_rgb: Input RGB image.
cam_id: Identifier for the camera.
Returns:
Estimated class label.
"""
if not cam_id in self.suit_id:
self.suit_id[cam_id] = 0
self.is_new_id[cam_id] = True
preds = await self.forward(img_rgb)
idx = np.argmax(preds)
return self.get_labels()[idx]
The *suit_nodes.py* module in *suit_nodes* path contains suit nodes structures.
import uuid
from luna_analytics_manager import Node
from luna_analytics_manager.sync import Syncer2 as Syncer
from luna_analytics_manager.templates.image_helpers.roi import getROIRect
from luna_analytics_manager.templates.models import EventStatus, VideoSegment
from video_analytics_helpers.frame_base_analytics import BaseFrameAnalyticsNode
from video_analytics_helpers.node_collections import NodeOverview as NodeOverviewBase
from ..analytics import AnalyticsCtx, FrameCtx
from ..classes import FrameRes
from .events import AggregatedSuitEvent, Overview, Suit, SuitEvent, SuitTrack
from .suit_estimator import SuitClassifier
class NodeSuit(
BaseFrameAnalyticsNode[
FrameCtx,
AnalyticsCtx,
SuitEvent,
AggregatedSuitEvent,
FrameRes,
Suit,
SuitTrack,
]
):
"""Suit Analytics node."""
name = "suit"
requires: list[Node] = []
frameResCls = FrameRes
# class for tracks
trackCls = SuitTrack
def __init__(self):
super().__init__()
self.frameSyncer = Syncer()
self.track = None
self.trackLength = 0
self.suitEstimator = SuitClassifier(onnxWrapper=AnalyticsCtx.onnxWrapper)
def buildEvent(self, suit: Suit, frameCtx: FrameCtx, ctx: AnalyticsCtx) -> tuple[SuitEvent, AggregatedSuitEvent]:
"""Generate current event and aggregated event."""
event = SuitEvent(suit=suit, timeOffset=frameCtx.timestamp)
aggregatedEvent = AggregatedSuitEvent(
videoSegment=VideoSegment(frameCtx.timestamp, frameCtx.timestamp),
suit=suit,
overview=Overview(frameCtx.timestamp, frameCtx.image),
)
event.eventId = aggregatedEvent.eventId = str(uuid.uuid4())
return event, aggregatedEvent
def updateEvent(self, suit: Suit, frameCtx: FrameCtx, ctx: AnalyticsCtx):
"""Update current event with new frame."""
event = self.track.currentEvent
event.suit = suit
event.timeOffset = frameCtx.timestamp
event.eventStatus = EventStatus.inProcess
def updateAggregatedEvent(self, suit: Suit, frameCtx: FrameCtx, ctx: AnalyticsCtx):
"""Update current aggregated event with new frame."""
aggregatedEvent = self.track.currentAggregatedEvent
if aggregatedEvent.videoSegment is None:
aggregatedEvent.videoSegment = VideoSegment(frameCtx.timestamp, frameCtx.timestamp)
else:
aggregatedEvent.videoSegment.endTimeOffset = frameCtx.timestamp
if aggregatedEvent.suit is None:
aggregatedEvent.suit = suit
async def estimate(self, frameCtx: FrameCtx, ctx: AnalyticsCtx) -> Suit | None:
"""
Estimate suit on frame.
Args:
frameCtx: current frame context
ctx: analytic context
Returns:
Detected suit label
"""
img_rgb = frameCtx.image.asNPArray() # check if it is really RGB format
if ctx.params.roi:
roi = getROIRect(frameCtx.image.rect, ctx.params.roi)
img_rgb = img_rgb[roi.top : roi.bottom, roi.left : roi.right]
camera_id = "0"
detectedObj = await self.suitEstimator.run(img_rgb, camera_id)
return Suit(name=detectedObj)
def isTrackNeedEnd(self, suit: Suit, frameCtx: FrameCtx, ctx: AnalyticsCtx):
"""
Determinate that event is needed to end or not after current frame processing.
If function return `True`, track will be stopped. If function return `False` track will be started or continue.
"""
return suit.name != "suit"
class NodeOverview(NodeOverviewBase[FrameCtx, AnalyticsCtx]):
"""Overview node."""
requires = [NodeSuit]
The *spec.yml* file contains openapi documentation with analytics parameters description in yaml format.
openapi: 3.0.0
info:
version: 'v.0.0.1'
title: 'Suit Analytics'
description: |
`Suit Analytics` is intended for determination of:
- suit_class parameter;
The `suit` analytics decode stream frames taking into account `rate` parameter
from <a href="#tag/suit-analytics/paths/~1suit_analytics/post">stream creation request analytic parameters</a>
and performs the following actions:
- Calculates suit class
- For each callback check it's conditions and execute callbacks, which fit conditions.
The schema of data sending by `callback` described as stream creation request callback.
components:
schemas:
error:
type: object
properties:
error_code:
type: integer
description: Error code.
desc:
type: string
description: Short error description.
detail:
type: string
description: Error details.
link:
type: string
description: Link to the documentation website with the error description.
required: [error_code, detail, desc, link]
example:
error_code: 1
detail: internal server error
desc: internal server error
link: "https://docs.visionlabs.ai/info/luna/troubleshooting/errors-description/code-1"
int01:
type: integer
enum: [0,1]
roi_int_coordinates:
type: integer
minimum: 0
default: 0
maximum: 65536
example: 3327
roi_float_percent:
type: number
format: float
minimum: 0
default: 0.0
maximum: 100.0
example: 87.4
roi_float_percent_size:
type: number
format: float
minimum: 0.00001
maximum: 100.0
example: 66.3
roi_abs:
type: object
properties:
x:
$ref: '#/components/schemas/roi_int_coordinates'
y:
$ref: '#/components/schemas/roi_int_coordinates'
width:
$ref: '#/components/schemas/roi_int_coordinates'
height:
$ref: '#/components/schemas/roi_int_coordinates'
mode:
type: string
enum: [abs]
example: "abs"
description: Coordinates and size are set in pixels.
required: [x, y, width, height, mode]
roi_percent:
type: object
properties:
x:
$ref: '#/components/schemas/roi_float_percent'
y:
$ref: '#/components/schemas/roi_float_percent'
width:
$ref: '#/components/schemas/roi_float_percent_size'
height:
$ref: '#/components/schemas/roi_float_percent_size'
mode:
type: string
enum: [ percent ]
example: "percent"
description: Coordinates and size are set in percentage.
required: [x, y, width, height, mode]
roi:
oneOf:
- $ref: '#/components/schemas/roi_abs'
- $ref: '#/components/schemas/roi_percent'
description: |
Region of interest on a frame. Boundaries of the area are described in `x`, `y` coordinates
of the top left point and `width`, `height` properties
**Region must not be any bigger than the original frame**
The region of interest will be sent to estimator. The smaller the `roi`, the smaller the area the estimator
will process and, accordingly, work faster.
number01:
type: number
minimum: 0
maximum: 1
rate:
type: object
description: |
Analytic rate execution determines on which frames the analytics will be launched.
Analytic rate execution can be configured in next ways:
- execute analytic on each Nth frame (`unit` - `frame`)
- execute analytic on each frame corresponding to the Nth second (`unit` - `second`)
properties:
period:
type: float
minimum: 0
default: 1
description: Period length.
unit:
type: string
default: second
enum:
- frame
- second
description: Unit for a period calculation (every `n` seconds or every `n` frames).
required: [ unit, period ]
callback_base:
type: object
properties:
enable:
type: integer
enum: [0, 1]
default: 1
description: Whether callback enabled or not.
callback_ws:
allOf:
- $ref: "#/components/schemas/callback_base"
- type: object
properties:
type:
type: string
enum: [luna-ws-notification]
description: Event will be sent with `suit` type.
required: [type]
callback_basic_authorization:
type: object
properties:
type:
type: string
description: Authorization type.
enum: [basic]
login:
type: string
maxLength: 128
description: Login.
password:
type: string
maxLength: 128
description: Password.
required: [ type, login, password]
description: Callback basic authorization parameters.
callback_http:
allOf:
- $ref: "#/components/schemas/callback_base"
- type: object
properties:
type:
type: string
enum: [http]
description: Event will be sent to http url.
url:
type: string
description: Callback url.
authorization:
$ref: '#/components/schemas/callback_basic_authorization'
params:
type: object
properties:
timeout:
type: integer
default: 60
description: Callback request timeout.
content_type:
type: string
default: application/json
enum: [application/json]
description: Callback request content type.
headers:
type: object
description: Callback request headers.
additionalProperties: true
description: Callback request parameters
required: [url, type]
callback:
oneOf:
- $ref: '#/components/schemas/callback_http'
- $ref: '#/components/schemas/callback_ws'
discriminator:
propertyName: type
mapping:
http: '#/components/schemas/callback_http'
luna-ws-notification: '#/components/schemas/callback_ws'
image_retain_policy:
type: object
description: |
Image retain policy applicable when analytic `overview` target is specified,
and configures parameters with which will image be saved.
properties:
mimetype:
type: string
enum:
- PNG
- JPEG
default: JPEG
description: Image format.
quality:
allOf:
- $ref: '#/components/schemas/number01'
- default: 1
description: Image quality, on a scale from 0 (worst) to 1 (best). Has no effect on `PNG`.
max_size:
type: integer
minimum: 0
default: 640
description: Image max size, in pxl. Neither the width nor the height will exceed this value.
period_event_policy:
type: object
properties:
trigger:
type: string
enum: [period]
interval:
type: float
description: Event generation period interval.
minimum: 0.0
default: 1.0
required: [trigger]
start_event_policy:
type: object
properties:
trigger:
type: string
enum: [start]
required: [trigger]
end_event_policy:
type: object
properties:
trigger:
type: string
enum: [end]
required: [trigger]
parameters:
type: object
description: |
Analytic parameters for stream processing.
There are default analytic parameters which will be applied for stream processing if no one specified here.
properties:
targets:
type: array
default: [suit]
items:
type: string
enum: [suit, overview]
description: |
Estimations to perform on the video.
`suit` will calculate some `suit` value if specified.
`overview` will add image to events.
callbacks:
type: array
description: |
Callbacks parameters.
`http` type callback sends events to the specified url by POST request.
`luna-ws-notification` type callback sends events to websocket using sender.
maxItems: 10
items:
$ref: '#/components/schemas/callback'
parameters:
type: object
description: Estimation parameters.
properties:
event_policy:
description: |
Event policy.
When suit appears on frame, the new `track` will starts, when suit disappears, `track` will stops.
- when track starts (`trigger` - `start`)
- when track ends (`trigger` - `end`)
- periodically while track exists (`trigger` - `period`)
type: object
properties:
oneOf:
- $ref: '#/components/schemas/period_event_policy'
- $ref: '#/components/schemas/start_event_policy'
- $ref: '#/components/schemas/end_event_policy'
discriminator:
propertyName: trigger
mapping:
period: '#/components/schemas/period_event_policy'
start: '#/components/schemas/start_event_policy'
end: '#/components/schemas/end_event_policy'
default:
trigger: start
image_retain_policy:
$ref: '#/components/schemas/image_retain_policy'
probe_count:
description: |
A number of consecutive suit estimations with not 'none' class. This parameter is intended to
prevent false positives from analytics.
type: integer
minimum: 0
default: 3
roi:
$ref: '#/components/schemas/roi'
rate:
allOf:
- $ref: '#/components/schemas/rate'
- default:
unit: second
period: 1
stream:
type: object
description: |
Full stream creation schema is available in `Luna-Video-Manager` documentation.
The presented schema described only analytics parameters.
properties:
data:
type: object
description: stream data
additionalProperties: true
analytics:
type: array
description: analytics list
items:
type: object
description: analytics
properties:
analytic_name:
type: string
maxLength: 36
pattern: '^[a-zA-Z0-9_\-]{1,36}$'
description: Analytic name.
enum: [suit_analytics]
parameters:
type: object
$ref: '#/components/schemas/parameters'
required: [analytic_name]
additionalProperties: true
required: [data, analytics]
string36_nullable:
type: string
maxLength: 36
nullable: true
longitude:
type: number
minimum: -180
maximum: 180
example: 36.616
latitude:
type: number
minimum: -90
maximum: 90
example: 55.752
geo_position:
type: object
nullable: true
description: Geo position specified by geographic coordinates - longitude and latitude.
properties:
longitude:
allOf:
- $ref: '#/components/schemas/longitude'
- description: Longitude in degrees.
latitude:
allOf:
- $ref: '#/components/schemas/latitude'
- description: Latitude in degrees.
required: [longitude, latitude]
example:
longitude: 36.616
latitude: 55.752
uuid:
type: string
format: uuid
pattern: '^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$'
example: "557d54ec-29ad-4f3c-93b4-c9092ef12515"
video_segment:
type: object
properties:
start_time_offset:
type: number
example: 0.123
description: Start video segment offset(seconds).
end_time_offset:
type: number
example: 1.234
description: Eng video segment offset(seconds).
required: [ start_time_offset, end_time_offset ]
callback_request:
type: object
properties:
event_type:
type: string
enum: [suit]
description: Event type.
account_id:
allOf:
- $ref: '#/components/schemas/uuid'
description: Account id of event.
event_create_time:
type: string
description: Event creation time.
event_end_time:
type: string
description: Event end time.
event:
type: object
description: Event.
properties:
stream_id:
allOf:
- $ref: '#/components/schemas/uuid'
description: Stream ID.
event_id:
allOf:
- $ref: '#/components/schemas/uuid'
description: Event ID.
track_id:
allOf:
- $ref: '#/components/schemas/uuid'
description: Track id.
nullable: true
video_segment:
$ref: '#/components/schemas/video_segment'
overview:
type: object
properties:
time_offset:
type: number
description: time offset
image:
type: str
description: link to image
required: [time_offset]
location:
type: object
properties:
city:
allOf:
- $ref: '#/components/schemas/string36_nullable'
example: Moscow
description: City that stream belongs.
area:
allOf:
- $ref: '#/components/schemas/string36_nullable'
example: Central
description: Area that stream belongs.
district:
allOf:
- $ref: '#/components/schemas/string36_nullable'
example: Basmanny
description: District that stream belongs.
street:
allOf:
- $ref: '#/components/schemas/string36_nullable'
example: Podsosensky lane
description: Street that stream belongs.
house_number:
allOf:
- $ref: '#/components/schemas/string36_nullable'
example: 23 bldg.3
description: Street that stream belongs.
geo_position:
$ref: '#/components/schemas/geo_position'
description: |
Stream location parameters.
Required callback `location` target to send this value.
required: [stream_id, event_id, suit_class, track_id]
required: [event_type, event, account_id, event_create_time, event_end_time]
paths:
/suit_analytics:
post:
tags:
- suit analytics
summary: stream creation
description: |
Stream creation superficial request with detailed analytics parameters description.
Full request description (exclude for this analytics description) available at `Luna-Video-Manager` documentation.
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/stream'
example:
data:
reference: rtsp://stream_url
type: stream
downloadable: false
analytics:
- analytic_name: suit_analytics
parameters:
parameters:
probe_count: 2
image_retain_policy:
mimetype: PNG
roi:
mode: abs
x: 0
y: 0
width: 1000
height: 500
callbacks:
- url: "http://127.0.0.1:8001"
type: http
- url: "http://127.0.0.1:8002"
type: http
- type: luna-ws-notification
targets:
- suit
- overview
callbacks:
onData:
'{$request.body.callbacks.*.url}':
post:
requestBody:
description: subscription payload
content:
application/json:
schema:
$ref: '#/components/schemas/callback_request'
Agent lambda examples
Here is an example of agent lambda with suit video analytics:
To include video analytics presented in previous chapter into lambda agent as package it needs the following lambda archive file structure:
Lambda agent with suit analytics example file structure (the case when analytics includes in lambda as package)├──pyproject.toml ├──poetry.lock └──lambda_main.py
The lambda_main.py module:
lambda_main.pyAVAILABLE_ANALYTICS = ["suit_analytics"]
It this case, the dependencies of lambda agent must include only analytics as dependency, not analytics dependencies:
*pyproject.toml*
pyproject.toml[build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.poetry] name = "lambda-agent" version = "0.0.1" description = "lambda-agent" authors = [ "VisionLabs", ] [[tool.poetry.source]] name = "vlabspypi" url = "http://pypi.visionlabs.ru/root/public/+simple" priority = "primary" [[tool.poetry.source]] name = "public-pypi" url = "https://pypi.org/simple" priority = "supplemental" [tool.poetry.dependencies] python = "^3.12" suit-analytics = "0.0.4"
To include presented analytics into lambda agent as module it needs the following lambda archive file structure:
Lambda agent with suit analytics example file structure (the case when analytics includes in lambda as module)├──pyproject.toml ├──poetry.lock ├──lambda_main.py └──suit_analytics ├──__init__.py ├──analytics.py ├──classes.py ├──common.py ├──models.py ├──spec.html ├──spec.yml └──data ├──resnet50-v2-7.onnx └──imagenet-simple-labels.json └──suit_nodes ├──__init__.py ├──events.py ├──suit_estimator.py └──suid_nodes.py
In this case, the analytics dependencies must be included into lambda dependencies. The lambda dependencies described as pyproject.toml file:
*pyproject.toml*
pyproject.toml[build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.poetry] name = "lambda agent" version = "0.0.1" description = "lambda agent" authors = [ "VisionLabs", ] [[tool.poetry.source]] name = "vlabspypi" url = "http://pypi.visionlabs.ru/root/public/+simple" priority = "primary" [[tool.poetry.source]] name = "public-pypi" url = "https://pypi.org/simple" priority = "supplemental" [tool.poetry.dependencies] python = "^3.12" luna-analytics-manager = {version="*", extras=["sdk"]} opencv-python-headless = "*" nvidia-curand-cu11 = "*" nvidia-cufft-cu11 = "*" onnxruntime-gpu = "*" pynvcodec = ">=2.6.0"