Agent interaction

Foreword

The video-agent represents handler that can decode and execute analytics on stream provided by Luna-Video-Manager.

The interaction between the Luna-Video-Manager and the video agent is facilitated via HTTP.

Full functionality requires the video-agent to meet a number of requirements, which are detailed in this chapter by provided primitive example including a description of its implementation and limitations compared to a fully-fledged solution.

This primitive example is available as zip-archive, for link, preparation and execution description, see this paragraph.

Analytics registration

A video agent is intended to process streams via analytics. To read more about what analytics is it and why is it needed, see analytics chapter.

The Luna-Video-Manager should be aware which agent with which analytics can works, but Luna-Video-Manager doesn’t know about any analytics in general, so each video-agent must register its analytics before start work as video-agent.

To register analytics it is need to make analytics registration request with actual parameters.

Note that only analytics_name parameter is required, but all other parameters are not useless: for example, validation_schema is used by Luna-Video-Manager to validate analytics parameters from user request before it gets to the video-agent, documentation can be viewed to user could correctly specify analytics parameters, etc.

What if analytics already registered using external request?

It is possible that analytics registered by external request, but to make sure that agent analytics and analytics registered in Luna-Video-Manager are equal and not just that they have the same names, it is highly recommended to make video-agent register analytics itself.

What if analytics already exists

It is possible when video-agent restarts or more than one video-agent register the same analytics to receive response with 409 status code and already exists text from Luna-Video-Manager.

It is recommended to replace such analytics during the video-agent startup using analytics replace request to make sure the latest version of analytics with actual parameters is registered in Luna-Video-Manager.

Additional notes

The Luna-Video-Manager allows to:

what makes possible to make custom algorythm of analytics registration in different cases.

The provided primitive agent example contains analytics registration example here.

Agent registration

A video-agent is intended for work with Luna-Video-Manager.

To make Luna-Video-Manager knows how many and what video-agents exists, which analytics can they process and so on, each video-agent must be registered in Luna-Video-Manager.

Note

The agent registration is the only one required step which must be done before stream processing.

To register agent it is need to make agent registration request with actual parameters.

Note that not all parameters in this request are required, but optional parameters are not useless: for example, description is used by user to get some additional information about video-agent and agent_groups parameter is allow to make Luna-Video-Manager specify only stream linked to some groups to agent (more description here), etc.

If agent parameters such as list of available analytic names or maximum number of streams changed during agent existence, the agent must stop all streams processing, remove its registration (using delete agent request) and make registration request one more time.

Agent port and api version will be used by Luna-Video-Manager for making websocket subscription to receive data from agent.

The parameters from Luna-Video-Manager reply are also valuable and must be taken into account by video-agent

agent_id

An id of agent generated by Luna-Video-Manager which must be used by video-agent in future requests to Luna-Video-Manager

refresh_period

A video agent must repeat the get streams for processing request at least this period, otherwise the video-agent will be excluded from list of active video-agents and Luna-Video-Manager will not distribute streams to this agent (see agents downgrade for details)

alive_period

The period which agent must works autonomously without connection to Luna-Video-Manager. It does not mean that video-agent might work without agent registration.

The provided primitive agent example contains agent registration example here.

Get streams for processing

The video-agent must decode and execute analytics on stream provided by Luna-Video-Manager.

To get streams which must be processed by video-agent it must periodically makes get streams for processing request (see agent registration and agents downgrade for related information).

Is it required to correctly specify the agent_id received during agent registration in request.

The reply can be empty what will mentions that there is no new streams for processing are appointed by Luna-Video-Manager to the video-agent.

When reply is not empty, it will contains list of streams with much data, including stream reference, type, parameters for video-decoder and many other parameters, each of them must be proceed in the right place.

The once agent receives stream for processing it must start stream processing and start feedback sending without any delay. Otherwise, it is not possible to Luna-Video-Manager for properly distribute streams to agents. It is not expected that stream with the same id and version appears in one request.

See streams distribution chapter, streams downgrade chapter and agents downgrade chapter for related information.

Streams parameters and its’ appointments

Parameters stream_id, version and analytic_index (from analytics) must be used in feedback request to Luna-Video-Manager.

Parameter feedback_frequency specified the maximum delay of feedback for stream must be sent. See feedback sending paragraph for related information.

Parameter version must be used by agent to replace stream with the same id which is in process, cause version increase means that stream has been replaced by user and can contains all new parameters instead of stream_id.

Parameters from data section must be used by video-decoder (and optionally, if video downloads).

Parameters from analytics section contains list of analytics and its’ parameters which must process the stream.

Parameters from notification_policy are intended to specify when and what callbacks must be triggered by video-agent when stream change it’s processing status.

Other parameters such as source, location and other must be used in analytics callbacks and websocket notifications.

The provided primitive agent example contains streams getting example here.

The provided primitive agent example contains streams processing example here.

Feedback sending

The video-agent must periodically (for details, see parameters description in streams getting paragraph) send feedback for all streams which are processed by video-agent to make Luna-Video-Manager knows actual statuses of stream processing. If Luna-Video-Manager not receive feedback in time streams processing will not happens as expected, see streams downgrade chapter for details.

To send feedback, the video-agent must use send feedback request including:

  • stream_id, status - stream id and actual stream processing status

  • version - actual stream version

  • time - feedback generation time in format RFC 3339

  • error - text with error or message denoting that no errors occurred during stream processing

  • analytics indexed - the same indexes as idx from analytics in get streams reply

  • analytics_intervals - actual analytics working intervals versions

The reply on this requests from Luna-Video-Manager are also contains useful information which must by taken into account by video-agent (for each stream):

  • stream_id and version - for comparison with current stream id and it’s version: if they are not the same, it means that stream version was updated and currently processing stream must be stopped

  • action - action which video-agent must apply to stream - continue it’s processing or stop

  • optional new_analytics_intervals which contains new analytics working intervals and its’ versions: if this field appears in feedback request reply, the new analytics working intervals must be immediately applied

    For more information about analytics working intervals see this paragraph.

Other situations when feedback must be sent independently of feedback sending period
  • Agent receive new stream for processing, validate parameters and concludes that stream processing is not possible (for example, specified stream reference url is not available). In this case Luna-Video-Manager assumes that agent send feedback with stream failed status and agent will not process such stream.

  • Agent receive stream which processing must be stopped (for example due to user stop stream processing by making stream patch request). In this case Luna-Video-Manager assumes that agent send feedback with stream stopped status and stop stream processing.

  • During stream processing fatal (fatal means that stream processing is not longer possible) exception occurred (for example, stream suddenly became unavailable). In this case Luna-Video-Manager assumes that agent send feedback with failed stream status and stop stream processing.

  • During stream processing or callback sending an non-fatal unexpected exception occurred (for example, callback url is unavailable). In this case Luna-Video-Manager assumes that agent send feedback with stream in_progress status and error contains text with exception description and stream processing will continue.

  • Analytics processing is not currently running as scheduled.

  • Stream ends (more applicable for video file, but rtsp-stream also can ends). In this case Luna-Video-Manager assumes that agent send feedback with done status and stop stream processing.

  • All other significant information also can be send to Luna-Video-Manager for better understanding of stream processing progress.

The provided primitive agent example contains streams processing example here.

The provided primitive agent example contains feedback processing example here.

Analytics working intervals

There is a possibility that user specify some intervals when each analytics must work and when must not works.

Within framework of Luna-Video-Manager and video-agent interaction such intervals named as analytics working intervals and analytics which must now work at the current moment is paused. If all stream analytics must not work at the moment so stream status is paused.

The video-agent receives analytics working intervals during getting streams for processing and it is possible that these intervals can be updated, removed or firstly appears in reply during feedback processing. Independently of when and how analytics working intervals received, the video-agent must execute analytics according to actual intervals.

Case when no analytics must work according to working intervals

It is possible to continue video decoding in this case, but it is not recommended. It is recommended to stop video decoding if all analytics are paused and continue only when at least one analytics must works.

Feedback on paused analytics

When stream processing is paused (no one analytics must works according to working intervals) or at least one of analytics is paused, video-agent must send feedback with corresponding status to make Luna-Video-Manager knows that analytics processing works as expected and will continue in expected moment. See feedback processing for related information.

The provided primitive agent example contains streams processing example here.

The provided primitive agent example contains working intervals processing example here.

Websocket subscription

During stream processing it is possible the need to get information about each analytics processing results arise.

For this case video-agent must provide an opportunity to get all this information using websocket subscription:

  • it must /<api_version>/ws url available for ws subscription (where api_version is positive integer number which means api version of video-agent)

  • it must support required query parameters: stream_id and analytic_index where stream_id is id of stream of interest and the analytic_index is the same index as the one from analytics in get streams reply

  • it must support optional query parameter account_id which must works as filter and not allow to receive data for stream with another account_id

The provided primitive agent example contains websocket handler example here.

The provided primitive agent example contains websocket data provider example here.

The provided primitive agent example contains script which make ws subscription example here.

Graceful shutdown

It is not required, but highly recommended to graceful shutdown of video-agent when it stops to make data in Luna-Video-Manager consistent and works faster.

It is recommended to make 2 actions before video-agent stops:

The provided primitive agent example contains agent graceful shutdown example here.

Primitive agent example explanation

In this paragraph presented primitive agent example structure with explanation (for each module) of differences between the example and fully-fledged video-agent solution.

This examples demonstrates how video-agent works:

  • register analytics and agent itself

  • get streams for processing

  • process streams using decoder and fake analytics

  • process analytics working intervals

  • imitates feedback and callbacks sending

  • send websocket events

This primitive example is available as zip-archive, for link, preparation and execution description, see this paragraph.

Warning

It is critically not recommended to use this or similar solutions in production or development, this solution is highly suboptimal.

This example is only intended for demonstration and research of Luna-Video-Manager and video-agent interaction rules.

Agent example file structure
 ├──agent.py
 ├──analytics.py
 ├──cli_args.py
 ├──decoder.py
 ├──feedback.py
 ├──intervals_resolver.py
 ├──run.py
 ├──stream_processing.py
 ├──structs_n_consts.py
 ├──ws_handler.py
 ├──ws_provider.py
 └──ws_subscribe.py
(agent.py) Module which contains agent-example high-level, initialize and shutdown methods.

The fully-fledged video-agent may be similar to this part of example except for one case when stream must be replaced with stream which has the same id but higher version (triggered by stream replace request).

This module is related to the following processes described above:
Code source.
"""
Module contains agent-example high-level, initialize and shutdown methods


The tasks of this module:

    - provide function that will `register analytics <#anchor-analytics-registration>`_

    - provide function that will `register agent <#anchor-agent-registration>`_

    - provide function that will periodically `get streams for processing <#anchor-get-streams>`_ taking
      into account parameters received in reply of agent registration

    - provide function that will add new stream for processing

    - provide function that `gracefully shutdown video-agent <#anchor-shutdown>`_

"""

import asyncio
import time

import aiohttp
from stream_processing import processStream
from structs_n_consts import ACCOUNT_ID, VIDEO_MANAGER_API_URL
from ws_provider import WSProvider


class Agent:
    """Agent-example main class"""

    # list of analytics which agent-example supports
    myAnalytics: list[str] = ["fake_analytics"]

    def __init__(self, port: int):
        """Initialize agent variables for future usage"""
        self.port = port
        self.session: aiohttp.ClientSession | None = None
        self.agentId: str | None = None
        self.refreshPeriod: int | None = None
        self.alivePeriod: int | None = None
        self.tasks: dict[str, asyncio.Task] = {}
        self.wsProvider: WSProvider = WSProvider()

    async def makeRequest(self, uri: str, method: str = "POST", raiseForStatus: bool = True, json: dict | None = None):
        """Make HTTP-request with minimalistic wrap using 1 aiohttp session which is already initialized"""
        return await getattr(self.session, method.lower())(
            url=f"{VIDEO_MANAGER_API_URL}{uri}",
            json=json,
            headers={"Luna-Account-Id": ACCOUNT_ID},
            raise_for_status=raiseForStatus,
        )

    async def registerMyAnalytics(self):
        """Register analytics which agent-example supports"""
        for analyticsName in self.myAnalytics:
            reply = await self.makeRequest(
                uri="/analytics", json={"analytic_name": analyticsName}, raiseForStatus=False
            )
            if reply.status > 300 and reply.status != 409:
                replyJson = await reply.json()
                raise ValueError("Failed to register analytic" + str(replyJson))
        print("Success: agent analytics registration")

    async def registerAgent(self):
        """Register agent itself"""
        reply = await self.makeRequest(
            uri="/agents",
            json={
                "name": "my-agent",
                "max_stream_count": 2,
                "port": self.port,
                "api_version": 2,
                "analytic_names": self.myAnalytics,
            },
            raiseForStatus=False,
        )
        replyJson = await reply.json()
        if reply.status >= 300:
            raise ValueError("Failed to register agent: " + str(replyJson))
        self.agentId = replyJson["agent_id"]
        self.refreshPeriod = replyJson["refresh_period"]
        self.alivePeriod = replyJson["alive_period"]
        print("Success: agent registration")

    async def run(self):
        """
        Main agent execution point:
        - initialize aiohttp session for future usage
        - register analytics, agent
        - execute main stream processing cycle
        """
        self.session = aiohttp.ClientSession()
        await self.registerMyAnalytics()
        await self.registerAgent()
        replyFailTime: float | None = None

        while True:
            if replyFailTime is not None and time.time() - replyFailTime >= self.alivePeriod:
                print(f"Fail to connect manager for {self.alivePeriod} seconds, shut down")
                break
            await asyncio.sleep(self.refreshPeriod)
            reply = await self.makeRequest(uri=f"/agents/{self.agentId}/streams", method="POST", raiseForStatus=False)
            replyJson = await reply.json()
            if replyFailTime is None and reply.status >= 300:
                replyFailTime = time.time()
            else:
                replyFailTime = None

            for streamAsDict in replyJson["streams"]:
                self.tasks[streamAsDict["stream_id"]] = asyncio.create_task(
                    processStream(
                        streamAsDict=streamAsDict,
                        wsProvider=self.wsProvider,
                        agentId=self.agentId,
                        session=self.session,
                    )
                )

    async def runAsTask(self, app):
        """Execute agent main cycle as asyncio-task"""
        asyncio.create_task(self.run())

    async def close(self, app):
        """
        Shutdown agent:
        - Stop all active stream processing tasks
        - Close all ws connections
        - Remove agent from video-manager making http-request
        - Close aiohttp session
        """
        for activeTask in self.tasks.values():
            if not activeTask.cancelled() and not activeTask.done():
                activeTask.cancel()

        await self.wsProvider.close()
        if self.agentId is not None:
            await self.makeRequest(uri=f"/agents/{self.agentId}", method="delete")
        if self.session is not None:
            await self.session.close()
        print("Agent has been stopped")
(analytics.py) Module which contains agent-example fake analytics.

The fully-fledged video-agent cannot be similar to this part of example.

Code source.
"""
Module contains agent-example fake analytics.

The tasks of this module:

    - provide fake analytics which will take any parameters and not assume cpu/gpu resource and returns some fake results

"""

from uuid import uuid4


class FakeAnalytics:
    """
    Fake analytics, which is intended only as example without real processing of anything
    """

    def __init__(self, analytics: list[dict]):
        """Initialize interval variables for fake analytics"""
        self.analytics = analytics
        self.counter = 0
        self.enabled = True

    async def events(self, frame):
        """
        Fake events generator, works every second
        - If not enabled, it will sleep until is will be enabled
        - If enabled returns some fake `events` according to the primitive algorithm
        """
        while True:
            if self.enabled:
                self.counter += 1
                if self.counter == 20:
                    print("Last fake event")
                    yield {"event": {"event_id": str(uuid4())}, "value": 100500}
                elif self.counter == 1:
                    print("First fake event")
                    yield {"event": {"event_id": str(uuid4())}, "value": 0}
                elif self.counter % 5 == 0:
                    print(f"Fake event number {self.counter // 5 + 1}")
                    yield {"event": {"event_id": str(uuid4())}, "value": self.counter // 5}
                elif self.counter > 20:
                    break
(cli_args.py) Module which contains agent-example cli arguments parser.

The fully-fledged video-agent scarcely cannot be similar to this part of example cause fully-fledged video-agent is a complex solution that required a lot of settings.

Code source.
"""
Module contains agent-example cli arguments parser.

The tasks of this module:

    - provide command line arguments parser with only one available parameter

"""

from argparse import ArgumentParser


def getCliArgs():
    """Get parsed cli args: `port`"""
    parser = ArgumentParser()
    parser.add_argument("--port", type=int, default=5555, help="agent port", dest="port")
    return parser.parse_args()
(decoder.py) Module which contains agent-example decoder.

The fully-fledged video-agent cannot be similar to this part of example cause it does not process any decoding parameters specified in data section (see get streams for processing for related information).

Code source.
"""
Module contains agent-example decoder.

The tasks of this module:

    - provide video-decoder which will take any parameters and returns frames

"""

import cv2


class Decoder:
    """Decoder, which is intended only as example"""

    def __init__(self, url: str, **kwargs):
        """Fake initialization"""
        self.url = url
        self.cap = cv2.VideoCapture(url)
        self.kwargs = kwargs

    async def nextFrame(self):
        """Async generator which yields frames from source"""
        try:
            while True:
                _, frame = self.cap.read()
                if frame is None:
                    break
                yield frame
        finally:
            self.cap.release()
(feedback.py) Module which contains agent-example feedback processing functions.

The fully-fledged video-agent cannot be similar to this part of example due to several reasons, but provides basic functionality which allows this example to works as real video-agent:

  • error field does not describe real error

  • last_processed_frame_time is fake

  • analytic_indexes hardcoded and not assumes any other values

This module is related to the following processes described above:
Code source.
"""
Module contains agent-example feedback processing functions.

The tasks of this module:

    - provide function which `periodically send feedback <#anchor-feedback>`_ to Luna-Video-Manager

"""

import asyncio
import datetime
import time

import aiohttp
from structs_n_consts import VIDEO_MANAGER_API_URL, StreamStatus


class Feedback:
    """
    Primitive feedback sender which implements only basic login
    """

    def __init__(self, streamAsDict: dict, session: aiohttp.ClientSession, agentId: str):
        """Initialize internal variables for future usage"""
        self.streamAsDict = streamAsDict
        self.session = session
        self.status = StreamStatus.in_progress
        self.agentId = agentId

    async def _send(self):
        """
        Send feedback to video-manager, log, return reply from video-manager
        Warnings:
            - is valid for video-manager
            - is sending useless information as `error`
            - fake `time` value (last processed frame time)
            - `[0]` analytics index, which is not applicable for splittable streams or streams with > 1 analytics
        """
        reply = await self.session.post(
            url=f"{VIDEO_MANAGER_API_URL}/agents/{self.agentId}/streams/feedback",
            json={
                "streams": [
                    {
                        "stream_id": self.streamAsDict["stream_id"],
                        "status": self.status.value,
                        "error": "error" if self.status == StreamStatus.failed else "no error",
                        "version": self.streamAsDict["version"],
                        "time": str(datetime.datetime.now(datetime.UTC).replace(tzinfo=None)),
                        "last_processed_frame_time": float(time.time()),
                        "analytic_indexes": [0],
                    }
                ]
            },
        )
        print("feedback has been sent")
        return reply

    async def run(self):
        """
        Endless feedback which works every `feedback_frequency`(received from video-manager)
        sending cycle which sends feedback until (one of):
        - video-manager reply with `stop` signal for current stream (feedback with confirmation also must be sent)
        - stream is done or failed what is due to agent stream processing algorythm
        """
        while True:
            reply = await self._send()
            replyJson = await reply.json()
            if replyJson["streams"] and replyJson["streams"][0]["action"] == "stop":
                self.status = StreamStatus.stop
                await self._send()
                break
            if self.status in (StreamStatus.done, StreamStatus.failed):
                break
            await asyncio.sleep(self.streamAsDict["feedback_frequency"])
(intervals_solver.py) Module which contains agent-example working intervals resolver.

The fully-fledged video-agent cannot be similar to this part of example due to several reasons, but provides basic functionality which allows this example to works as real video-agent:

  • it has no possibilities to process working intervals for all stream analytics - it is suboptimal

  • it has no possibilities to wait and execute next calculation only in time when next analytics state must switch

Note

The algorythm of calculation is not the most suboptimal algorythm in this example and can be used in fully-fledged solution with/without minimalistic improvements.

This module is related to the following processes described above:
Code source.
"""
Module contains agent-example working intervals resolver.

The tasks of this module:

    - function which calculates should analytics works at the moment or not

"""

import datetime


def isAnalyticsMustWorks(intervals: list[int]):
    """
    Check whether analytics must work at the moment based on current time and analytics working
    intervals received from video-manager
    """
    if not intervals:
        return True

    curTime = int(datetime.datetime.now(datetime.UTC).replace(tzinfo=None).timestamp())

    firstStart = intervals[0]
    if firstStart > curTime:
        return False

    lastEnd = intervals[-1]
    if lastEnd <= curTime:
        return False

    mustWorks = False
    for idx, point in enumerate(intervals):
        if curTime < point:
            mustWorks = idx % 2 == 1
            break
        if curTime == point:
            mustWorks = idx % 2 == 0
            break

    return mustWorks
(run.py) Module which contains agent-example execution functions.

The fully-fledged video-agent might be similar to this part of example except for it must provide a lot of other features.

Code source.
"""
Module contains agent-example execution functions.

The tasks of this module:

    - create aiohttp web-application with 1 route, startup and shutdown actions and execute it

"""

from agent import Agent
from aiohttp import web
from cli_args import getCliArgs
from ws_handler import ws_handler


def main():
    """
    - Parse cli-args
    - Create aiohttp web application server with 1 route
    - Enable startup and shutdown actions (see `agent` module for details)
    - Run web application
    """
    args = getCliArgs()
    webApp = web.Application()
    webApp.add_routes([web.get("/2/ws", ws_handler)])
    webApp.agent = Agent(port=args.port)
    webApp.on_startup.append(webApp.agent.runAsTask)
    webApp.on_shutdown.append(webApp.agent.close)

    web.run_app(webApp, port=args.port)


if __name__ == "__main__":
    main()
(stream_processing.py) Module which contains agent-example stream processing functions.

The fully-fledged video-agent cannot be similar to this part of example due to several reasons, but provides basic functionality which allows this example to works as real video-agent:

  • it does not provide and processing of notification_policy, but it might be similar to logs in some cases (for example - stream processing start, finish)

  • it does not provide possibility of analytics callbacks execution

  • there is no processing of stream replacement (triggered by stream replace request)

  • there is not processing of case when all analytics are paused so decoder works always

This module is related to the following processes described above:
Code source.
"""
Module contains agent-example stream processing functions.

The tasks of this module:

    - covers the entire stream process from start to finish

    - log (print in the example) stream processing start, error, finish, callback sending, etc

    - launch processing in a coordinated manner taking into account analytics working intervals

    - send decoded frames to analytics and process results

"""

import asyncio

import aiohttp
from analytics import FakeAnalytics
from decoder import Decoder
from feedback import Feedback
from intervals_solver import isAnalyticsMustWorks
from structs_n_consts import StreamStatus
from ws_provider import WSProvider


async def processStream(streamAsDict: dict, session: aiohttp.ClientSession, agentId: str, wsProvider: WSProvider):
    """
    Process stream:
    - Create fake decoder and analytics; feedback sender and add stream to ws provider
    - while decoder return frames, check whether analytics must be executed due to its working intervals
    - if analytics must not work send corresponding feedback and log
    - if analytics must work fake `execute callbacks`, log and send events to all ws subscribers
    - if any exception occurred, fail stream and remove it from ws provider
    - if stream processing is done (no more frames from decoder), finish stream processing with corresponding feedback
    """
    print(f"Start processing stream: {streamAsDict['stream_id']}")
    decoder = Decoder(url=streamAsDict["data"]["reference"], **streamAsDict["data"])
    analyticsManager = FakeAnalytics(streamAsDict["analytics"])
    feedbackSender = Feedback(streamAsDict=streamAsDict, session=session, agentId=agentId)
    feedbackTask = asyncio.create_task(feedbackSender.run())
    wsProvider.addStream(stream=streamAsDict)

    try:
        async for frame in decoder.nextFrame():
            if feedbackTask.cancelled() or feedbackTask.done():
                print("Stream processing has been stopped due to manager reply")
                return
            if not isAnalyticsMustWorks(intervals=streamAsDict["analytics"][0]["intervals"]["intervals"]):
                feedbackSender.status = StreamStatus.paused
                analyticsManager.enabled = False
                print("Analytics processing is paused due to its scheduling")
                continue
            else:
                analyticsManager.enabled = True
                feedbackSender.status = StreamStatus.in_progress
                await asyncio.sleep(1)
            async for event in analyticsManager.events(frame):
                wsProvider.wsBroadcast(stream_id=streamAsDict["stream_id"], analyticsResults=event)
                print(f"Callbacks for event have been sent! Event: {event}")

    except Exception as exc:
        print(f"Exception occurred during stream processing: {str(exc)}")
        feedbackSender.status = StreamStatus.failed
        wsProvider.removeStream(streamId=streamAsDict["stream_id"])

    else:
        print(f"Finish processing stream: {streamAsDict['stream_id']}")
        feedbackSender.status = StreamStatus.done
        wsProvider.finishStream(streamId=streamAsDict["stream_id"])
(structs_n_consts.py) Module which contains agent-example auxiliary structs and constants.
Code source.
"""
Module contains agent-example auxiliary structs and constants.

This is auxilary module which only contains some useful structs and constants.

"""

from enum import IntEnum

# account id for agent registration (required for video-manager)
ACCOUNT_ID = "3ee93f1d-198c-4096-9674-bd42a30c969f"
# video-manager base url
VIDEO_MANAGER_API_URL = "http://127.0.0.1:5230/2"


class StreamStatus(IntEnum):
    """
    Enum for stream status.
    """

    pending = 0
    in_progress = 1
    done = 2
    restart = 3
    failed = 4
    stop = 5
    # special status for splittable stream with different sub statuses
    unknown = 6
    handler_lost = 7
    paused = 8
(ws_handler.py) Module contains agent-example websocket handler.

The fully-fledged video-agent cannot be similar to this part of example due to several reasons, but provides basic functionality which allows this example to works as real video-agent:

  • it does not validate query parameters

  • there is no error processing which can reply to with a user specific error

This module is related to the following processes described above:
Code source.
"""
Module contains agent-example websocket handler.

The tasks of this module:

    - to provide websocket subscription possibility

    - filter requests by comparison of specified query parameters and available streams

    - log errors

"""

import asyncio

from aiohttp import web
from websockets import ConnectionClosed


async def ws_handler(request):
    """
    Handler intended for processing websocket requests:
    - if required stream not exists or doesn't match account id & analytics index - raise an exception
    - otherwise, add subscriber to ws provider and send an event every second to subscriber
    - finish when subscriber close connection or stream removed from ws provider (due to it is done or failed)
    """
    ws = web.WebSocketResponse()

    wsProvider = request.app.agent.wsProvider
    streamId = request.query.get("stream_id")
    accountId = request.query.get("account_id")
    analyticIndex = int(request.query.get("analytic_index"))

    if not wsProvider.contains(streamId=streamId, accountId=accountId, analyticIndex=analyticIndex):
        raise ValueError(f"Stream with id {streamId} not found")
    await ws.prepare(request=request)
    wsProvider.addSubscriber(streamId=streamId, wsConnection=ws, analyticIndex=analyticIndex)
    try:
        while True:
            await asyncio.sleep(1)
            if not wsProvider.contains(streamId=streamId, analyticIndex=analyticIndex):
                break
    except ConnectionClosed as e:
        print(f"WS connection closed with code {e.code}: {e.reason}")
    except asyncio.CancelledError:
        print("Client disconnected session")
    except Exception:
        print("keepalive ping failed", exc_info=True)
    finally:
        wsProvider.removeSubscriber(streamId=streamId, wsConnection=ws, analyticIndex=analyticIndex)

    return ws
(ws_provider.py) Module which contains agent-example websocket provider.

The fully-fledged video-agent might be similar to this part of example.

This module is related to the following processes described above:
Code source.
"""
Module contains agent-example websocket provider.

The tasks of this module:

    - to provide possibility of comparison streams, its analytics and websocket subscribers

    - to provide possibilities of stream addition and removing

    - to provide possibilities of websocket subscription addition and removing

    - to execute sending required messages for all required websocket subscriptions

"""

import asyncio


class WSStreamMeta:
    """WS stream meta info"""

    __slots__ = ("accountId", "analyticIndexes")

    def __init__(self, analyticIndexes: list[int], accountId: str | None = None):
        self.accountId = accountId
        self.analyticIndexes = analyticIndexes


class WSProvider:
    """
    WS data provider which is intended to provide events from one or several streams to one or several subscribers
    Also it need check stream account id and analytics index affiliation before add subscribed (see `contains` method)
    """

    # stream id to connections map
    _websockets: dict[str, dict[int, set["WSConnection"]]]
    # stream id to account id map
    _streamsMeta: dict[str, WSStreamMeta]

    def __init__(self):
        self._websockets = {}
        self._streamsMeta = {}

    def contains(self, streamId: str, analyticIndex: int, accountId: str | None = None):
        """Check whether ws provider has knowledge about stream with specified id/account id"""
        if streamId not in self._streamsMeta:
            return False
        match = True
        if accountId is not None:
            match &= self._streamsMeta[streamId].accountId == accountId
        match &= analyticIndex in self._streamsMeta[streamId].analyticIndexes
        return match

    def wsBroadcast(
        self,
        stream_id: str,
        streamError: str | None = None,
        analyticsError: dict | None = None,
        analyticsResults: dict | list[dict] | None = None,
    ) -> None:
        """Broadcast message to all subscribers"""
        item2Send: dict
        if streamError is not None:
            item2Send = {"stream_status": "error", "error": streamError}
        elif analyticsError is not None:
            item2Send = {
                "stream_status": "in_progress",
                "error": None,
                "analytics_results": analyticsError,
            }
        elif analyticsResults is not None:
            item2Send = {
                "stream_status": "in_progress",
                "error": None,
                "analytics_results": analyticsResults,
            }
        else:
            raise NotImplementedError

        for analyticIdx, connections in self._websockets[stream_id].items():
            for connection in connections:
                asyncio.create_task(connection.send_json(item2Send))

    def addStream(self, stream: dict):
        """Add stream"""
        analyticIndexes = [row["idx"] for row in stream["analytics"]]
        streamId = stream["stream_id"]
        if streamId not in self._websockets:
            self._websockets[streamId] = {}
        for analyticIdx in analyticIndexes:
            if analyticIdx not in self._websockets[streamId]:
                self._websockets[streamId][analyticIdx] = set()
        if streamId not in self._streamsMeta:
            self._streamsMeta[streamId] = WSStreamMeta(accountId=stream["account_id"], analyticIndexes=analyticIndexes)

    def removeStream(self, streamId: str):
        """Remove stream"""
        if streamId in self._websockets:
            for analyticIdx, connections in self._websockets[streamId].items():
                for connection in connections:
                    asyncio.create_task(connection.send_json({"stream_status": "error", "error": "processing stopped"}))
        self._websockets.pop(streamId, None)
        self._streamsMeta.pop(streamId, None)

    def finishStream(self, streamId: str):
        """Remove stream from pool"""
        if streamId in self._websockets:
            for analyticIdx, connections in self._websockets[streamId].items():
                for connection in connections:
                    asyncio.create_task(connection.send_json({"stream_status": "finished", "error": None}))
        self._websockets.pop(streamId, None)
        self._streamsMeta.pop(streamId, None)

    def addSubscriber(self, streamId: str, analyticIndex: int, wsConnection):
        """Add subscriber for stream"""
        if streamId not in self._websockets:
            self._websockets[streamId] = {}
        if analyticIndex not in self._websockets[streamId]:
            self._websockets[streamId][analyticIndex] = set()
        self._websockets[streamId][analyticIndex].add(wsConnection)

    def removeSubscriber(self, streamId: str, analyticIndex: int, wsConnection):
        """Remove subscriber for stream"""
        if streamId in self._websockets:
            if analyticIndex in self._websockets[streamId]:
                self._websockets[streamId][analyticIndex].remove(wsConnection)

    async def close(self):
        coros = []
        for setOfAnalytics in self._websockets.values():
            for wsSet in setOfAnalytics.values():
                for ws in wsSet:
                    coros.append(ws.close(code=1001, reason="Server shutdown"))
        self._websockets = {}
        await asyncio.gather(*coros)
(ws_subscribe.py) Module which contains simple websocket subscription script example.

This module is not part of video-agent example, but just shows how can websocket subscription can be implemented.

Code source.
"""
Module contains simple websocket subscription script example.
Warnings:
    Ensure stream is currently being processing otherwise it will raise an exception
"""

import asyncio

import websockets
from websockets import ConnectionClosedOK

WS_RESP = None
STREAM_ID = "<REPLACE_WITH_STREAM_ID_OF_INTEREST>"


async def main():
    global WS_RESP
    uri = f"ws://127.0.0.1:5555/2/ws?stream_id={STREAM_ID}&analytic_index=0"
    headers = {}
    try:
        async with websockets.connect(uri, additional_headers=headers) as websocket:
            while True:
                WS_RESP = await websocket.recv()
                print(WS_RESP)
    except ConnectionClosedOK:
        print("Events receiving complete")


if __name__ == "__main__":
    asyncio.run(main())

Primitive agent example execution

Warning

It is critically not recommended to use this or similar solutions in production or development, this solution is highly suboptimal.

This example is only intended for demonstration and research of Luna-Video-Manager and video-agent interaction rules.

The primitive agent example

Source code is available for download here

Preparation and execution.

To prepare this example, it needs Luna-Video-Manager is available and some requirements to be installed, for example, this way:

unzip agent-example.zip
cd agent_example
python3.13 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

To run this example

python run.py --port=5555

Where 5555 is default port and it is allow to not specify it or use another port.