Agent interaction
Foreword
The video-agent is a handler that can decode streams provided by Luna-Video-Manager and execute analytics on them.
The interaction between Luna-Video-Manager and the video-agent is carried out over HTTP.
Full functionality requires the video-agent to meet a number of requirements. This chapter details them using a
primitive example, describing its implementation and its limitations compared to a fully-fledged solution.
This primitive example is available as a zip archive; for the link and a description of preparation and execution,
see this paragraph.
Analytics registration
A video-agent is intended to process streams via analytics.
To read more about what analytics are and why they are needed, see the analytics chapter.
Luna-Video-Manager must know which agent can work with which analytics, but it knows nothing about any analytics
by default, so each video-agent must register its analytics before it starts working as a video-agent.
To register analytics, make an analytics registration request
with actual parameters.
Note that only the analytics_name parameter is required, but the other parameters are still useful:
for example, validation_schema is used by Luna-Video-Manager to validate analytics parameters from a user request before
it reaches the video-agent, documentation can be shown to the user so that they specify analytics parameters correctly, etc.
Analytics may also be registered by an external request, but to make sure that the agent's analytics and the analytics
registered in Luna-Video-Manager are actually the same, and do not merely share a name, it is highly recommended that
the video-agent registers its analytics itself.
When a video-agent restarts, or when more than one video-agent registers the same analytics, Luna-Video-Manager may
reply with a 409 status code and an already exists text.
In this case it is recommended to replace such
analytics during video-agent startup using the analytics replace request,
to make sure that the latest version of the analytics with actual parameters is registered in Luna-Video-Manager.
Luna-Video-Manager provides several analytics requests (including the registration and replace requests mentioned
above), which makes it possible to build a custom analytics registration algorithm for different cases.
The provided primitive agent example contains an analytics registration example here.
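The startup flow described above can be sketched as a small decision helper. This is a minimal sketch rather than the example's own code: the function name and the returned labels are hypothetical, and only the handling of the 409 status comes from the text.

```python
def registration_action(status: int) -> str:
    """Map the status code of an analytics registration reply to the next step.

    Hypothetical helper: the returned labels are illustrative, not part of any API.
    """
    if status == 409:
        # analytics with this name already exist: replace them to refresh parameters
        return "replace"
    if 200 <= status < 300:
        return "registered"
    # any other status is treated as a registration failure
    return "fail"
```

A video-agent could call such a helper right after the registration request and send the analytics replace request only when it returns "replace".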
Agent registration
A video-agent is intended to work with Luna-Video-Manager.
For Luna-Video-Manager to know how many and which video-agents exist, which analytics they can process and so on, each
video-agent must be registered in Luna-Video-Manager.
Note
Agent registration is the only required step that must be done before stream processing.
To register an agent, make an agent registration request
with actual parameters.
Note that not all parameters in this request are required, but the optional parameters are still useful:
for example, description gives the user additional information about the video-agent, and the agent_groups
parameter lets Luna-Video-Manager assign to the agent only streams linked to certain groups (more details
here), etc.
If agent parameters such as the list of available analytic names or the maximum number of streams change during the
agent's lifetime, the agent must stop processing all streams, remove its registration (using the
delete agent request) and register again.
The agent port and API version are used by Luna-Video-Manager to make the websocket subscription through which it
receives data from the agent.
The parameters from the Luna-Video-Manager reply are also valuable and must be taken into account by the video-agent:
agent_id - the id of the agent generated by Luna-Video-Manager, which the video-agent must use in future
requests to Luna-Video-Manager
refresh_period - the video-agent must repeat the get streams for processing request
at least once per this period, otherwise it will be excluded from the list of active video-agents and
Luna-Video-Manager will not distribute streams to it
(see agents downgrade for details)
alive_period - the period during which the agent must work autonomously without a connection to Luna-Video-Manager.
This does not mean that the video-agent may work without agent registration.
The provided primitive agent example contains an agent registration example here.
Get streams for processing
The video-agent must decode streams provided by Luna-Video-Manager and execute analytics on them.
To get the streams it must process, the video-agent must periodically make the
get streams for processing request (see
agent registration and agents downgrade for
related information).
The agent_id received during agent registration must be specified correctly in the
request.
The reply can be empty, which means that Luna-Video-Manager has not assigned any new streams
to the video-agent.
When the reply is not empty, it contains a list of streams with a lot of data, including the stream reference, type,
parameters for the video-decoder and many other parameters, each of which must be handled in the right place.
Once the agent receives a stream for processing, it must start processing the stream and start
sending feedback without any delay. Otherwise,
Luna-Video-Manager cannot distribute streams to agents properly. A stream
with the same id and version is not expected to appear more than once in one request.
See the streams distribution chapter,
streams downgrade chapter and
agents downgrade chapter for related information.
Parameters stream_id, version and analytic_index (from analytics) must be used in the
feedback request to Luna-Video-Manager.
Parameter feedback_frequency specifies the maximum delay between feedback messages for the stream. See the
feedback sending paragraph for related information.
Parameter version must be used by the agent to replace a stream with the same id that is already being processed,
because a version increase means that the stream has been replaced by the user and all of its parameters except
stream_id may have changed.
Parameters from the data section must be used by the video-decoder (and, optionally, when the video is downloaded).
The analytics section contains the list of analytics that must process the stream, and their parameters.
Parameters from notification_policy specify when and which callbacks the video-agent must trigger
when the stream changes its processing status.
Other parameters such as source, location and others must be used in analytics callbacks and websocket notifications.
The provided primitive agent example contains a streams getting example here.
The provided primitive agent example contains a streams processing example here.
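The role of version when a stream arrives can be sketched as a small planning function. This is a sketch under stated assumptions: plan_stream_action, the running_versions mapping and the returned labels are hypothetical, and ignoring a stream whose version is not higher than the running one is an assumption, not a documented rule.

```python
def plan_stream_action(running_versions: dict[str, int], stream: dict) -> str:
    """Decide what to do with a stream received from the get streams reply.

    running_versions maps stream_id to the version currently being processed.
    """
    stream_id = stream["stream_id"]
    version = stream["version"]
    current = running_versions.get(stream_id)
    if current is None:
        # the stream is new for this agent: start processing and feedback at once
        return "start"
    if version > current:
        # the user replaced the stream: restart it with the new parameters
        return "replace"
    # assumption: the same or an older version needs no action
    return "ignore"
```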
Feedback sending
The video-agent must periodically (for details, see the parameter descriptions in the
streams getting paragraph) send feedback for all streams it is processing, so that
Luna-Video-Manager knows the actual statuses of stream processing. If Luna-Video-Manager does not receive feedback
in time, stream processing will not happen as expected; see the streams downgrade chapter for
details.
To send feedback, the video-agent must use the send feedback request,
including:
stream_id, status - stream id and actual stream processing status
version - actual stream version
time - feedback generation time in RFC 3339 format
error - text with an error, or a message denoting that no errors occurred during stream processing
analytic_indexes - the same indexes as idx from analytics in the get streams reply
analytics_intervals - actual versions of analytics working intervals
The reply to this request from Luna-Video-Manager also contains useful information that must be taken into account
by the video-agent (for each stream):
stream_id and version - for comparison with the current stream id and its version: if they are not the same, the
stream version was updated and the currently processed stream must be stopped
action - the action the video-agent must apply to the stream: continue its processing or stop
optional new_analytics_intervals - contains new analytics working intervals and their versions: if this
field appears in the feedback request reply, the new analytics working intervals must be applied immediately
For more information about analytics working intervals see this paragraph.
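The reply handling rules above can be sketched for a single stream. This is a sketch under assumptions: apply_feedback_reply and its return labels are hypothetical, the "stop" action value is taken from the primitive example, any other action is treated as "continue", and new_analytics_intervals is passed through without interpreting its structure.

```python
def apply_feedback_reply(current: dict, reply_item: dict) -> tuple[str, object]:
    """Return the decision for a stream and the new working intervals, if any."""
    same_stream = (
        reply_item.get("stream_id") == current["stream_id"]
        and reply_item.get("version") == current["version"]
    )
    if not same_stream or reply_item.get("action") == "stop":
        # either the stream version changed or the manager asked to stop
        return "stop", None
    # None means that the reply carried no new working intervals
    return "continue", reply_item.get("new_analytics_intervals")
```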
Luna-Video-Manager expects the following feedback in these cases:
The agent receives a new stream for processing, validates its parameters and concludes that stream processing is not
possible (for example, the specified stream reference url is not available). In this case Luna-Video-Manager
expects the agent to send feedback with the failed status and not to process the stream.
The agent receives a stream whose processing must be stopped (for example, because the user stopped stream processing
by making a stream patch request). In this case Luna-Video-Manager
expects the agent to send feedback with the stopped status and stop stream processing.
A fatal exception (one after which stream processing is no longer possible) occurs during stream processing
(for example, the stream suddenly becomes unavailable). In this case Luna-Video-Manager expects the agent to send
feedback with the failed status and stop stream processing.
A non-fatal unexpected exception occurs during stream processing or callback sending (for example,
the callback url is unavailable). In this case Luna-Video-Manager expects the agent to send feedback with the
in_progress status and an error containing the exception description, and stream processing continues.
Analytics processing is currently not running because of its schedule (see the analytics working intervals
paragraph for the feedback expected in this case).
The stream ends (more applicable to a video file, but an rtsp stream can also end). In this case Luna-Video-Manager
expects the agent to send feedback with the done status and stop stream processing.
Any other significant information can also be sent to Luna-Video-Manager for a better understanding of stream
processing progress.
The provided primitive agent example contains a streams processing example here.
The provided primitive agent example contains a feedback processing example here.
Analytics working intervals
The user may specify intervals during which each analytics must work and during which it must not.
Within the Luna-Video-Manager and video-agent interaction, such intervals are called analytics working intervals,
and an analytics that must not work at the current moment is paused. If none of the stream's analytics must work at
the moment, the stream status is paused.
The video-agent receives analytics working intervals when getting streams for processing;
these intervals can also be updated, removed or appear for the first time in a reply during
feedback processing. Regardless of when and how the analytics working intervals are received, the
video-agent must execute analytics according to the actual intervals.
It is possible to continue video decoding while analytics are paused, but it is not recommended. It is recommended to
stop video decoding if all analytics are paused and resume it only when at least one analytics must work.
When stream processing is paused (no analytics must work according to the working intervals) or at least one of the
analytics is paused, the video-agent must send feedback with the corresponding status so that Luna-Video-Manager knows
that analytics processing works as expected and will continue at the expected moment.
See feedback processing for related information.
The provided primitive agent example contains a streams processing example here.
The provided primitive agent example contains a working intervals processing example here.
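The relation between paused analytics, the stream status and video decoding can be sketched as follows. The function name is hypothetical, the status labels reuse the in_progress and paused names from this chapter, and per-analytics feedback is deliberately left out of this stream-level sketch.

```python
def stream_state(analytics_must_work: list[bool]) -> tuple[str, bool]:
    """Derive the stream status and whether decoding should keep running.

    analytics_must_work holds one flag per stream analytics:
    True if the analytics must work at the current moment.
    """
    any_working = any(analytics_must_work)
    # the stream is paused only when none of its analytics must work
    status = "in_progress" if any_working else "paused"
    # decoding is worth keeping only while at least one analytics works
    return status, any_working
```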
Websocket subscription
During stream processing, it may be necessary to get the processing results of each analytics.
For this purpose the video-agent must make this information available via websocket subscription:
it must make the /<api_version>/ws url available for websocket subscription (where api_version is a positive integer
denoting the API version of the video-agent)
it must support the required query parameters stream_id and analytic_index, where stream_id is the id of the stream
of interest and analytic_index is the same index as the one from analytics in the
get streams reply
it must support the optional query parameter account_id, which must work as a filter and prevent receiving data
for a stream with another account_id
The provided primitive agent example contains a websocket handler example here.
The provided primitive agent example contains a websocket data provider example here.
The provided primitive agent example contains a script that makes a websocket subscription here.
Graceful shutdown
It is not required, but highly recommended, to shut the video-agent down gracefully when it stops: this keeps data in
Luna-Video-Manager consistent and makes it work faster.
It is recommended to perform 2 actions before the video-agent stops:
The provided primitive agent example contains an agent graceful shutdown example here.
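One part of a graceful shutdown, stopping stream processing tasks before the agent removes its registration, can be sketched with asyncio. This is a sketch based on what the primitive example's close method does, not a definitive list of required actions; cancel_tasks is a hypothetical helper.

```python
import asyncio


async def cancel_tasks(tasks: list[asyncio.Task]) -> list:
    """Cancel unfinished tasks and wait until all of them have really stopped."""
    for task in tasks:
        if not task.done():
            task.cancel()
    # return_exceptions=True collects CancelledError objects instead of raising them
    return await asyncio.gather(*tasks, return_exceptions=True)
```

Waiting for the cancelled tasks, instead of only calling cancel, gives each task a chance to run its cleanup code before the HTTP session is closed.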
Primitive agent example explanation
This paragraph presents the structure of the primitive agent example and explains, for each module, the differences
between the example and a fully-fledged video-agent solution.
This example demonstrates how a video-agent works:
registers analytics and the agent itself
gets streams for processing
processes streams using a decoder and fake analytics
processes analytics working intervals
imitates feedback and callback sending
sends websocket events
This primitive example is available as a zip archive; for the link and a description of preparation and execution,
see this paragraph.
Warning
It is strongly discouraged to use this or similar solutions in production or development:
this solution is highly suboptimal.
This example is intended only for demonstrating and studying the interaction rules between Luna-Video-Manager
and the video-agent.
Agent example file structure
├──agent.py
├──analytics.py
├──cli_args.py
├──decoder.py
├──feedback.py
├──intervals_resolver.py
├──run.py
├──stream_processing.py
├──structs_n_consts.py
├──ws_handler.py
├──ws_provider.py
└──ws_subscribe.py
The fully-fledged video-agent may be similar to this part of the example (agent.py), except for the case when a stream
must be replaced with a stream that has the same id but a higher version
(triggered by the stream replace request).
"""
Module contains agent-example high-level, initialize and shutdown methods

The tasks of this module:
- provide function that will `register analytics <#anchor-analytics-registration>`_
- provide function that will `register agent <#anchor-agent-registration>`_
- provide function that will periodically `get streams for processing <#anchor-get-streams>`_ taking
  into account parameters received in reply of agent registration
- provide function that will add new stream for processing
- provide function that `gracefully shutdown video-agent <#anchor-shutdown>`_
"""
import asyncio
import time

import aiohttp
from stream_processing import processStream
from structs_n_consts import ACCOUNT_ID, VIDEO_MANAGER_API_URL
from ws_provider import WSProvider


class Agent:
    """Agent-example main class"""

    # list of analytics which agent-example supports
    myAnalytics: list[str] = ["fake_analytics"]

    def __init__(self, port: int):
        """Initialize agent variables for future usage"""
        self.port = port
        self.session: aiohttp.ClientSession | None = None
        self.agentId: str | None = None
        self.refreshPeriod: int | None = None
        self.alivePeriod: int | None = None
        self.tasks: dict[str, asyncio.Task] = {}
        self.wsProvider: WSProvider = WSProvider()

    async def makeRequest(self, uri: str, method: str = "POST", raiseForStatus: bool = True, json: dict | None = None):
        """Make HTTP-request with minimalistic wrap using 1 aiohttp session which is already initialized"""
        return await getattr(self.session, method.lower())(
            url=f"{VIDEO_MANAGER_API_URL}{uri}",
            json=json,
            headers={"Luna-Account-Id": ACCOUNT_ID},
            raise_for_status=raiseForStatus,
        )

    async def registerMyAnalytics(self):
        """Register analytics which agent-example supports"""
        for analyticsName in self.myAnalytics:
            reply = await self.makeRequest(
                uri="/analytics", json={"analytic_name": analyticsName}, raiseForStatus=False
            )
            # 409 means that the analytics is already registered, which is not an error here
            if reply.status >= 300 and reply.status != 409:
                replyJson = await reply.json()
                raise ValueError("Failed to register analytic: " + str(replyJson))
        print("Success: agent analytics registration")

    async def registerAgent(self):
        """Register agent itself"""
        reply = await self.makeRequest(
            uri="/agents",
            json={
                "name": "my-agent",
                "max_stream_count": 2,
                "port": self.port,
                "api_version": 2,
                "analytic_names": self.myAnalytics,
            },
            raiseForStatus=False,
        )
        replyJson = await reply.json()
        if reply.status >= 300:
            raise ValueError("Failed to register agent: " + str(replyJson))
        self.agentId = replyJson["agent_id"]
        self.refreshPeriod = replyJson["refresh_period"]
        self.alivePeriod = replyJson["alive_period"]
        print("Success: agent registration")

    async def run(self):
        """
        Main agent execution point:
        - initialize aiohttp session for future usage
        - register analytics, agent
        - execute main stream processing cycle
        """
        self.session = aiohttp.ClientSession()
        await self.registerMyAnalytics()
        await self.registerAgent()
        # time of the first failed request in the current series of failures
        replyFailTime: float | None = None
        while True:
            if replyFailTime is not None and time.time() - replyFailTime >= self.alivePeriod:
                print(f"Fail to connect manager for {self.alivePeriod} seconds, shut down")
                break
            await asyncio.sleep(self.refreshPeriod)
            try:
                reply = await self.makeRequest(
                    uri=f"/agents/{self.agentId}/streams", method="POST", raiseForStatus=False
                )
            except aiohttp.ClientError:
                # the manager is unreachable: treat it like a failed reply
                reply = None
            if reply is None or reply.status >= 300:
                # keep the time of the first failure, do not reset it on repeated failures
                if replyFailTime is None:
                    replyFailTime = time.time()
                continue
            replyFailTime = None
            replyJson = await reply.json()
            for streamAsDict in replyJson["streams"]:
                self.tasks[streamAsDict["stream_id"]] = asyncio.create_task(
                    processStream(
                        streamAsDict=streamAsDict,
                        wsProvider=self.wsProvider,
                        agentId=self.agentId,
                        session=self.session,
                    )
                )

    async def runAsTask(self, app):
        """Execute agent main cycle as asyncio-task"""
        asyncio.create_task(self.run())

    async def close(self, app):
        """
        Shutdown agent:
        - Stop all active stream processing tasks
        - Close all ws connections
        - Remove agent from video-manager making http-request
        - Close aiohttp session
        """
        for activeTask in self.tasks.values():
            if not activeTask.cancelled() and not activeTask.done():
                activeTask.cancel()
        await self.wsProvider.close()
        if self.agentId is not None:
            await self.makeRequest(uri=f"/agents/{self.agentId}", method="delete")
        if self.session is not None:
            await self.session.close()
        print("Agent has been stopped")
The fully-fledged video-agent cannot be similar to this part of the example (analytics.py).
"""
Module contains agent-example fake analytics.

The tasks of this module:
- provide fake analytics which take any parameters, consume no cpu/gpu resources and return some fake results
"""
from uuid import uuid4


class FakeAnalytics:
    """
    Fake analytics, which is intended only as example without real processing of anything
    """

    def __init__(self, analytics: list[dict]):
        """Initialize internal variables for fake analytics"""
        self.analytics = analytics
        self.counter = 0
        self.enabled = True

    async def events(self, frame):
        """
        Fake events generator, works every second
        - If not enabled, it will wait until it is enabled
        - If enabled returns some fake `events` according to the primitive algorithm
        """
        while True:
            if self.enabled:
                self.counter += 1
                if self.counter == 20:
                    print("Last fake event")
                    yield {"event": {"event_id": str(uuid4())}, "value": 100500}
                elif self.counter == 1:
                    print("First fake event")
                    yield {"event": {"event_id": str(uuid4())}, "value": 0}
                elif self.counter % 5 == 0:
                    print(f"Fake event number {self.counter // 5 + 1}")
                    yield {"event": {"event_id": str(uuid4())}, "value": self.counter // 5}
                elif self.counter > 20:
                    break
The fully-fledged video-agent can scarcely be similar to this part of the example (cli_args.py), because a
fully-fledged video-agent is a complex solution that requires a lot of settings.
"""
Module contains agent-example cli arguments parser.

The tasks of this module:
- provide command line arguments parser with only one available parameter
"""
from argparse import ArgumentParser


def getCliArgs():
    """Get parsed cli args: `port`"""
    parser = ArgumentParser()
    parser.add_argument("--port", type=int, default=5555, help="agent port", dest="port")
    return parser.parse_args()
The fully-fledged video-agent cannot be similar to this part of the example (decoder.py) because it does not process
any of the decoding parameters specified in the data section (see get streams for processing
for related information).
"""
Module contains agent-example decoder.

The tasks of this module:
- provide video-decoder which will take any parameters and returns frames
"""
import cv2


class Decoder:
    """Decoder, which is intended only as example"""

    def __init__(self, url: str, **kwargs):
        """Fake initialization"""
        self.url = url
        self.cap = cv2.VideoCapture(url)
        self.kwargs = kwargs

    async def nextFrame(self):
        """Async generator which yields frames from source"""
        try:
            while True:
                _, frame = self.cap.read()
                if frame is None:
                    break
                yield frame
        finally:
            self.cap.release()
The fully-fledged video-agent cannot be similar to this part of the example (feedback.py) for several reasons,
although this part provides the basic functionality that allows the example to work as a real video-agent:
the error field does not describe a real error
last_processed_frame_time is fake
analytic_indexes are hardcoded and no other values are supported
"""
Module contains agent-example feedback processing functions.

The tasks of this module:
- provide function which `periodically send feedback <#anchor-feedback>`_ to Luna-Video-Manager
"""
import asyncio
import datetime
import time

import aiohttp
from structs_n_consts import VIDEO_MANAGER_API_URL, StreamStatus


class Feedback:
    """
    Primitive feedback sender which implements only basic logic
    """

    def __init__(self, streamAsDict: dict, session: aiohttp.ClientSession, agentId: str):
        """Initialize internal variables for future usage"""
        self.streamAsDict = streamAsDict
        self.session = session
        self.status = StreamStatus.in_progress
        self.agentId = agentId

    async def _send(self):
        """
        Send feedback to video-manager, log, return reply from video-manager

        Warnings:
        - is valid for video-manager
        - is sending useless information as `error`
        - fake `time` value (last processed frame time)
        - `[0]` analytics index, which is not applicable for splittable streams or streams with > 1 analytics
        """
        reply = await self.session.post(
            url=f"{VIDEO_MANAGER_API_URL}/agents/{self.agentId}/streams/feedback",
            json={
                "streams": [
                    {
                        "stream_id": self.streamAsDict["stream_id"],
                        "status": self.status.value,
                        "error": "error" if self.status == StreamStatus.failed else "no error",
                        "version": self.streamAsDict["version"],
                        "time": str(datetime.datetime.now(datetime.UTC).replace(tzinfo=None)),
                        "last_processed_frame_time": float(time.time()),
                        "analytic_indexes": [0],
                    }
                ]
            },
        )
        print("feedback has been sent")
        return reply

    async def run(self):
        """
        Endless feedback sending cycle which works every `feedback_frequency` (received from video-manager)
        and sends feedback until (one of):
        - video-manager reply with `stop` signal for current stream (feedback with confirmation also must be sent)
        - stream is done or failed what is due to agent stream processing algorithm
        """
        while True:
            reply = await self._send()
            replyJson = await reply.json()
            if replyJson["streams"] and replyJson["streams"][0]["action"] == "stop":
                self.status = StreamStatus.stop
                await self._send()
                break
            if self.status in (StreamStatus.done, StreamStatus.failed):
                break
            await asyncio.sleep(self.streamAsDict["feedback_frequency"])
The fully-fledged video-agent cannot be similar to this part of the example (intervals_resolver.py) for several
reasons, although this part provides the basic functionality that allows the example to work as a real video-agent:
it cannot process working intervals for all stream analytics, which is suboptimal
it cannot wait and run the next calculation only at the time when the next analytics state switch is due
Note
The calculation algorithm is not the most suboptimal algorithm in this example and can be used in a
fully-fledged solution with or without minimal improvements.
"""
Module contains agent-example working intervals resolver.

The tasks of this module:
- function which calculates should analytics works at the moment or not
"""
import datetime


def isAnalyticsMustWorks(intervals: list[int]):
    """
    Check whether analytics must work at the moment based on current time and analytics working
    intervals received from video-manager
    """
    if not intervals:
        return True
    # use an aware UTC datetime: timestamp() of a naive datetime would be interpreted as local time
    curTime = int(datetime.datetime.now(datetime.UTC).timestamp())
    firstStart = intervals[0]
    if firstStart > curTime:
        return False
    lastEnd = intervals[-1]
    if lastEnd <= curTime:
        return False
    mustWorks = False
    # even indexes are interval starts, odd indexes are interval ends
    for idx, point in enumerate(intervals):
        if curTime < point:
            mustWorks = idx % 2 == 1
            break
        if curTime == point:
            mustWorks = idx % 2 == 0
            break
    return mustWorks
The fully-fledged video-agent might be similar to this part of the example (run.py), except that it must provide a lot
of other features.
"""
Module contains agent-example execution functions.

The tasks of this module:
- create aiohttp web-application with 1 route, startup and shutdown actions and execute it
"""
from agent import Agent
from aiohttp import web
from cli_args import getCliArgs
from ws_handler import ws_handler


def main():
    """
    - Parse cli-args
    - Create aiohttp web application server with 1 route
    - Enable startup and shutdown actions (see `agent` module for details)
    - Run web application
    """
    args = getCliArgs()
    webApp = web.Application()
    webApp.add_routes([web.get("/2/ws", ws_handler)])
    webApp.agent = Agent(port=args.port)
    webApp.on_startup.append(webApp.agent.runAsTask)
    webApp.on_shutdown.append(webApp.agent.close)
    web.run_app(webApp, port=args.port)


if __name__ == "__main__":
    main()
The fully-fledged video-agent cannot be similar to this part of the example (stream_processing.py) for several
reasons, although this part provides the basic functionality that allows the example to work as a real video-agent:
it does not provide any processing of notification_policy, although it might be similar to logs
in some cases (for example, stream processing start and finish)
it does not provide the possibility of executing analytics callbacks
there is no processing of stream replacement (triggered by the
stream replace request)
there is no processing of the case when all analytics are paused, so the decoder always works
"""
Module contains agent-example stream processing functions.

The tasks of this module:
- covers the entire stream process from start to finish
- log (print in the example) stream processing start, error, finish, callback sending, etc
- launch processing in a coordinated manner taking into account analytics working intervals
- send decoded frames to analytics and process results
"""
import asyncio

import aiohttp
from analytics import FakeAnalytics
from decoder import Decoder
from feedback import Feedback
from intervals_resolver import isAnalyticsMustWorks
from structs_n_consts import StreamStatus
from ws_provider import WSProvider


async def processStream(streamAsDict: dict, session: aiohttp.ClientSession, agentId: str, wsProvider: WSProvider):
    """
    Process stream:
    - Create fake decoder and analytics; feedback sender and add stream to ws provider
    - while decoder return frames, check whether analytics must be executed due to its working intervals
    - if analytics must not work send corresponding feedback and log
    - if analytics must work fake `execute callbacks`, log and send events to all ws subscribers
    - if any exception occurred, fail stream and remove it from ws provider
    - if stream processing is done (no more frames from decoder), finish stream processing with corresponding feedback
    """
    print(f"Start processing stream: {streamAsDict['stream_id']}")
    decoder = Decoder(url=streamAsDict["data"]["reference"], **streamAsDict["data"])
    analyticsManager = FakeAnalytics(streamAsDict["analytics"])
    feedbackSender = Feedback(streamAsDict=streamAsDict, session=session, agentId=agentId)
    feedbackTask = asyncio.create_task(feedbackSender.run())
    wsProvider.addStream(stream=streamAsDict)
    try:
        async for frame in decoder.nextFrame():
            if feedbackTask.cancelled() or feedbackTask.done():
                print("Stream processing has been stopped due to manager reply")
                return
            if not isAnalyticsMustWorks(intervals=streamAsDict["analytics"][0]["intervals"]["intervals"]):
                feedbackSender.status = StreamStatus.paused
                analyticsManager.enabled = False
                print("Analytics processing is paused due to its scheduling")
                continue
            else:
                analyticsManager.enabled = True
                feedbackSender.status = StreamStatus.in_progress
            await asyncio.sleep(1)
            async for event in analyticsManager.events(frame):
                wsProvider.wsBroadcast(stream_id=streamAsDict["stream_id"], analyticsResults=event)
                print(f"Callbacks for event have been sent! Event: {event}")
    except Exception as exc:
        print(f"Exception occurred during stream processing: {str(exc)}")
        feedbackSender.status = StreamStatus.failed
        wsProvider.removeStream(streamId=streamAsDict["stream_id"])
    else:
        print(f"Finish processing stream: {streamAsDict['stream_id']}")
        feedbackSender.status = StreamStatus.done
        wsProvider.finishStream(streamId=streamAsDict["stream_id"])
"""
Module contains agent-example auxiliary structs and constants.

This is an auxiliary module which only contains some useful structs and constants.
"""
from enum import IntEnum

# account id for agent registration (required for video-manager)
ACCOUNT_ID = "3ee93f1d-198c-4096-9674-bd42a30c969f"
# video-manager base url
VIDEO_MANAGER_API_URL = "http://127.0.0.1:5230/2"


class StreamStatus(IntEnum):
    """
    Enum for stream status.
    """

    pending = 0
    in_progress = 1
    done = 2
    restart = 3
    failed = 4
    stop = 5
    # special status for splittable stream with different sub statuses
    unknown = 6
    handler_lost = 7
    paused = 8
The fully-fledged video-agent cannot be similar to this part of the example (ws_handler.py) for several reasons,
although this part provides the basic functionality that allows the example to work as a real video-agent.
"""
Module contains agent-example websocket handler.

The tasks of this module:
- to provide websocket subscription possibility
- filter requests by comparison of specified query parameters and available streams
- log errors
"""
import asyncio

from aiohttp import web
from websockets import ConnectionClosed


async def ws_handler(request):
    """
    Handler intended for processing websocket requests:
    - if required stream does not exist or doesn't match account id & analytics index - raise an exception
    - otherwise, add subscriber to ws provider and keep the connection open, checking the stream every second
    - finish when subscriber closes connection or stream is removed from ws provider (because it is done or failed)
    """
    ws = web.WebSocketResponse()
    wsProvider = request.app.agent.wsProvider
    streamId = request.query.get("stream_id")
    accountId = request.query.get("account_id")
    analyticIndex = int(request.query.get("analytic_index"))
    if not wsProvider.contains(streamId=streamId, accountId=accountId, analyticIndex=analyticIndex):
        raise ValueError(f"Stream with id {streamId} not found")
    await ws.prepare(request=request)
    wsProvider.addSubscriber(streamId=streamId, wsConnection=ws, analyticIndex=analyticIndex)
    try:
        while True:
            await asyncio.sleep(1)
            if not wsProvider.contains(streamId=streamId, analyticIndex=analyticIndex):
                break
    except ConnectionClosed as e:
        print(f"WS connection closed with code {e.code}: {e.reason}")
    except asyncio.CancelledError:
        print("Client disconnected session")
    except Exception as exc:
        # print() has no `exc_info` argument, so the exception is formatted explicitly
        print(f"keepalive ping failed: {exc!r}")
    finally:
        wsProvider.removeSubscriber(streamId=streamId, wsConnection=ws, analyticIndex=analyticIndex)
    return ws
The fully-fledged video-agent might be similar to this part of the example (ws_provider.py).
"""
Module contains the agent-example websocket provider.
The tasks of this module:
- to match streams and their analytics with websocket subscribers
- to add and remove streams
- to add and remove websocket subscriptions
- to send the required messages to all relevant websocket subscriptions
"""
import asyncio


class WSStreamMeta:
    """WS stream meta info"""

    __slots__ = ("accountId", "analyticIndexes")

    def __init__(self, analyticIndexes: list[int], accountId: str | None = None):
        self.accountId = accountId
        self.analyticIndexes = analyticIndexes


class WSProvider:
    """
    WS data provider which is intended to provide events from one or several streams to one or several subscribers.
    It also has to check that the account id and analytics index belong to the stream before a subscriber is added
    (see the `contains` method).
    """

    # stream id -> analytics index -> connections map
    _websockets: dict[str, dict[int, set["WSConnection"]]]
    # stream id -> stream meta (account id and analytics indexes) map
    _streamsMeta: dict[str, WSStreamMeta]

    def __init__(self):
        self._websockets = {}
        self._streamsMeta = {}

    def contains(self, streamId: str, analyticIndex: int, accountId: str | None = None):
        """Check whether the ws provider knows a stream with the specified id, analytics index and account id"""
        if streamId not in self._streamsMeta:
            return False
        match = True
        if accountId is not None:
            match &= self._streamsMeta[streamId].accountId == accountId
        match &= analyticIndex in self._streamsMeta[streamId].analyticIndexes
        return match

    def wsBroadcast(
        self,
        stream_id: str,
        streamError: str | None = None,
        analyticsError: dict | None = None,
        analyticsResults: dict | list[dict] | None = None,
    ) -> None:
        """Broadcast message to all subscribers"""
        item2Send: dict
        if streamError is not None:
            item2Send = {"stream_status": "error", "error": streamError}
        elif analyticsError is not None:
            item2Send = {
                "stream_status": "in_progress",
                "error": None,
                "analytics_results": analyticsError,
            }
        elif analyticsResults is not None:
            item2Send = {
                "stream_status": "in_progress",
                "error": None,
                "analytics_results": analyticsResults,
            }
        else:
            raise NotImplementedError
        for connections in self._websockets[stream_id].values():
            for connection in connections:
                asyncio.create_task(connection.send_json(item2Send))

    def addStream(self, stream: dict):
        """Add stream"""
        analyticIndexes = [row["idx"] for row in stream["analytics"]]
        streamId = stream["stream_id"]
        if streamId not in self._websockets:
            self._websockets[streamId] = {}
        for analyticIdx in analyticIndexes:
            if analyticIdx not in self._websockets[streamId]:
                self._websockets[streamId][analyticIdx] = set()
        if streamId not in self._streamsMeta:
            self._streamsMeta[streamId] = WSStreamMeta(accountId=stream["account_id"], analyticIndexes=analyticIndexes)

    def removeStream(self, streamId: str):
        """Remove stream and notify its subscribers that processing has stopped"""
        if streamId in self._websockets:
            for connections in self._websockets[streamId].values():
                for connection in connections:
                    asyncio.create_task(connection.send_json({"stream_status": "error", "error": "processing stopped"}))
        self._websockets.pop(streamId, None)
        self._streamsMeta.pop(streamId, None)

    def finishStream(self, streamId: str):
        """Remove stream from pool and notify its subscribers that the stream has finished"""
        if streamId in self._websockets:
            for connections in self._websockets[streamId].values():
                for connection in connections:
                    asyncio.create_task(connection.send_json({"stream_status": "finished", "error": None}))
        self._websockets.pop(streamId, None)
        self._streamsMeta.pop(streamId, None)

    def addSubscriber(self, streamId: str, analyticIndex: int, wsConnection):
        """Add subscriber for stream"""
        if streamId not in self._websockets:
            self._websockets[streamId] = {}
        if analyticIndex not in self._websockets[streamId]:
            self._websockets[streamId][analyticIndex] = set()
        self._websockets[streamId][analyticIndex].add(wsConnection)

    def removeSubscriber(self, streamId: str, analyticIndex: int, wsConnection):
        """Remove subscriber for stream"""
        if streamId in self._websockets:
            if analyticIndex in self._websockets[streamId]:
                # discard() tolerates a connection that has already been removed
                self._websockets[streamId][analyticIndex].discard(wsConnection)

    async def close(self):
        """Close all subscriber connections"""
        coros = []
        for setOfAnalytics in self._websockets.values():
            for wsSet in setOfAnalytics.values():
                for ws in wsSet:
                    coros.append(ws.close(code=1001, reason="Server shutdown"))
        self._websockets = {}
        await asyncio.gather(*coros)
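The provider above schedules every send with `asyncio.create_task` and neither keeps a reference to the task nor checks its result. The asyncio documentation recommends saving a reference to created tasks, and a failed send to a dead connection goes unnoticed here. A fuller implementation might await the sends and drop the connections that failed. The following is a minimal sketch of that idea, not the example's implementation; `FakeConnection` and `broadcast` are hypothetical names used only for illustration.

```python
import asyncio


class FakeConnection:
    """Stand-in for a websocket connection (hypothetical, for this sketch only)."""

    def __init__(self, fail: bool = False):
        self.fail = fail
        self.sent: list[dict] = []

    async def send_json(self, data: dict) -> None:
        if self.fail:
            raise ConnectionResetError("peer is gone")
        self.sent.append(data)


async def broadcast(connections: set, message: dict) -> set:
    """Send the message to every connection and return the connections that failed."""
    targets = list(connections)
    results = await asyncio.gather(
        *(connection.send_json(message) for connection in targets),
        return_exceptions=True,  # one broken subscriber must not cancel the others
    )
    return {connection for connection, result in zip(targets, results) if isinstance(result, Exception)}


async def demo() -> tuple[int, int]:
    alive, broken = FakeConnection(), FakeConnection(fail=True)
    connections = {alive, broken}
    message = {"stream_status": "in_progress", "error": None, "analytics_results": []}
    failed = await broadcast(connections, message)
    connections -= failed  # forget subscribers that can no longer be reached
    return len(alive.sent), len(connections)


print(asyncio.run(demo()))
```

Note that this design makes broadcasting a coroutine, so the callers of `wsBroadcast` would have to await it.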
This module is not part of the video-agent example; it only shows how a websocket subscription can be implemented.
"""
Module contains a simple websocket subscription script example.
Warnings:
    Ensure the stream is currently being processed, otherwise an exception will be raised
"""
import asyncio

import websockets
from websockets import ConnectionClosedOK

WS_RESP = None
STREAM_ID = "<REPLACE_WITH_STREAM_ID_OF_INTEREST>"


async def main():
    global WS_RESP
    uri = f"ws://127.0.0.1:5555/2/ws?stream_id={STREAM_ID}&analytic_index=0"
    headers = {}
    try:
        async with websockets.connect(uri, additional_headers=headers) as websocket:
            # print every received event until the server closes the connection
            while True:
                WS_RESP = await websocket.recv()
                print(WS_RESP)
    except ConnectionClosedOK:
        print("Events receiving complete")


if __name__ == "__main__":
    asyncio.run(main())
Primitive agent example execution
Warning
It is strongly discouraged to use this or similar solutions in production or development:
this solution is highly suboptimal.
This example is intended only for demonstrating and exploring the rules of interaction between
Luna-Video-Manager and the video-agent.
The primitive agent example
Source code is available for download here
To prepare this example, Luna-Video-Manager must be available and the requirements must be installed,
for example as follows:
unzip agent-example.zip
cd agent_example
python3.13 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
To run this example:
python run.py --port=5555
Here 5555 is the default port: the option can be omitted, or another port can be specified.