Agent interaction

The agent represents handler that can decode and execute analytics on stream provided by Luna-Video-Manager.

Agent interaction assumed:

  • Optionally analytics registration. Analytics can be provided by agents (Luna-Video-Agent or other agents services) or administrators using creates analytics request. Independently of analytic registration source, before agent starts, the analytics must be registered in Luna-Video-Manager, see analytics chapter for more information

  • Agent registration (by making agent registration request): agent must provide its name, listening port, api version, list of available analytic names and maximum number of streams available to it for simultaneous processing, optionally some description can be provided too

    As the registration confirmation, the agent will receive agent_id, which must be used in further requests, it also receive the refresh_period for making get agent streams request (see next step for usage description). It also receive the alive_period, which is necessary for usage in case when connection to Luna-Video-Manager will be refused (see further description).

    If agent parameters such as list of available analytic names or maximum number of streams changed during agent existence, the agent must stop all streams processing, remove its registration and make registration request one more time.

    Agent port and api version will be used by Luna-Video-Manager for making websocket subscription to receive data from agent.

  • The agent must make periodically requests to get streams with period received in previous step, otherwise agent will be excluded from list of active agents and Luna-Video-Manager will not distribute streams to this agent (see agents downgrade for details). Response of this requests will contains streams which processing must be started including stream id, version and other meta information, which must be stored for internal usage and feedback sending.

  • The agent must provide an opportunity to receive analytics results in real time by supporting websocket subscription on the following route: <agent_host>:<agent_port>/<agent_api_version>/ws?stream_id=<stream_id>&account_id=<account_id>, where:

    • <agent_api_version> is an natural integer

    • <stream_id> and <account_id> is stream id and account id respectively

    The information received from analytics for each analyzed frame must be sent to all actual subscribers as soon as it possible.

  • If agent failed to make requests described above (for example, if connection refused) for more than alive_period, all streams processing must be stopped. After that, agent can stop itself or continue attempt to establish connection to Luna-Video-Manager. If connection will be established, agent will be returned to list of agents, which available for streams processing.

  • The once agent receives stream for processing it must start stream processing without any delay. Otherwise, it is not possible to Luna-Video-Manager for properly distribute streams to agents. It is not expected that stream with the same id and version appears in one request. If a situation arises in which stream which processing must be stopped hits the agent, it must be ignored.

  • Agent must send feedback with streams processing information periodically for each stream or all stream in one request (period received in response of get streams request as feedback_frequency parameter), otherwise stream will be marked as failed, see streams downgrade for details.

    Feedback must contains:

    • stream id, version and stream processing status

    • feedback generation time in format RFC 3339

    • feedback error text

    Feedback must be send in several cases:

    • Agent receive new stream for processing, validate parameters and concludes that stream processing is not possible (for example, specified stream reference url is not available). In this case Luna-Video-Manager assumes that agent send feedback with stream failed status and agent will not process such stream.

    • Agent stream processing happens normally. In this case Luna-Video-Manager assumes that agent send feedback with stream in_progress status. It is allowed to send no error or other positive message as feedback error.

    • Agent receive stream which processing must be stopped (for example due to user stop stream processing by making stream patch request). In this case Luna-Video-Manager assumes that agent send feedback with stream stopped status and stop stream processing.

    • During stream processing fatal (fatal means that stream processing is not longer possible) exception occurred (for example, stream suddenly became unavailable). In this case Luna-Video-Manager assumes that agent send feedback with failed stream status and stop stream processing.

    • During stream processing or callback sending an non-fatal unexpected exception occurred (for example, callback url is unavailable). In this case Luna-Video-Manager assumes that agent send feedback with stream in_progress status and error contains text with exception description and stream processing will proceed.

    • Stream ends (more applicable for video file, but rtsp-stream also can ends). In this case Luna-Video-Manager assumes that agent send feedback with done status and stop stream processing.

    • All other significant information also can be send to Luna-Video-Manager for better understanding of stream processing progress.

    Agent will receive response from feedback request which will contain actual information of what it must to with actual streams - continue or stop processing. If agent received information contains that stream with specified id/version must be stopped - agent must stop its processing.

  • Agent must remove it’s registration from Luna-Video-Manager by making agent deletion request when agent usage is no longer expected. Before agent registration will be removed, it must stops all streams processing and send relevant feedback, otherwise streams processing will be downgraded (see the description above).

The service administrator also can get information and, in some extreme cases, manage agents using provided requests.

Agent description

The example of agent logic in form of python-like pseudocode, where

is presented below:

SUBSCRIBERS: dict[str, set] = dict()
STREAMS_IN_PROGRESS = set()

class WSHandler:

    async def get(request, wsConnection):
        # websocket handler
        streamId = request.queries.get("stream_id")
        accountId = request.queries.get("account_id")
        global SUBSCRIBERS
        SUBSCRIBERS[streamId].add(wsConnection)
        while wsConnection.alive and streamId in streamId in STREAMS_IN_PROGRESS:
            ...
        SUBSCRIBERS.remove(wsConnection)

class WebServer:
    ...
    routes = [
    ...,
    "/<api_version>/ws", WSHandler
    ]

class Feedback:
    ...

class Agent:

    myAnalytics: list
    tasks: dict
    feedbackList: list

    def makeRequest(**kwargs):
        """Make HTTP-request"""
        ...

    def registerMyAnalytics(self):
        """Register available analytics"""
        for myAnalytic in self.myAnalytics:
            reply = analyticRegistrationRequest(myAnalytic)
            if reply.status_code not in (200, 409):
                raise ValueError("Failed to register analytic")

    def registerMyself(self):
        """Register agent itself"""
        reply = agentRegistrationRequest()
        if not reply.isGood:
            raise ValueError("Failed to register agent")

    def executeAnalytic(self, analytic, frame):
        """Execute analytic on frame"""

    async def processStream(self, stream):
        """Process stream: decode each frame and execute analytic, then - send feedback and execute callbacks"""
        global SUBSCRIBERS
        global STREAMS_IN_PROGRESS
        for frame in decoder(stream):
            result = []
            for myAnalytic in self.myAnalytics:
                data = self.executeAnalytic(myAnalytic, frame)
                # send any data to ws subscribers
                for ws in SUBSCRIBERS[streamId]:
                    ws.send(data)
                if not data.isGood:
                    self.makeRequest("bad feedback", stream)
                if data.isVeryBad:
                    self.makeRequest("fail", stream)
                    return
                result.append(data)
            self.feedbackList.append(Feedback(stream))
            self.makeRequest(stream["callbacks"], result)
        STREAMS_IN_PROGRESS.remove(stream["stream_id"])

    async def sendFeedback():
        toSend = [feedback.prepare() for feedback in self.feedbackList]
        replyJson = await feedbackRequest(body=toSend)
        for row in replyJson.items():
            if row["action"] == "stop":
                del self.tasks[row["stream_id"]]

    async def run(self):
        """Main cycle"""
        global STREAMS_IN_PROGRESS
        self.registerMyAnalytics()
        self.registerMe()
        while True:
            streamsToStart = getStreamsRequest()
            for stream in streamsToStart:
                STREAMS_IN_PROGRESS.add(stream["stream_id"])
                self.tasks[stream["stream_id"]] = asyncio.create_task(self.processStream(stream))
            await self.sendFeedback
            await asyncio.sleep(...)


if __name__ == "__main__":
    asyncio.run(Agent().run())