Matching plugin example

"""
Plugin for matching by indexed lists.
"""

import asyncio
import logging

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from luna_plugins.matcher_plugins.base_plugin import BaseMatcherPlugin
from luna_plugins.matcher_plugins.base_struct import (
    ErrorMatchResult,
    Face,
    FaceMatchResult,
    IMatcher,
    MatchRequest,
    MatchResult,
    MatchUnitType,
)
from luna_plugins.matcher_plugins.exceptions import Rematch
from tzlocal import get_localzone_name
from vlutils.descriptors.data import DescriptorType

from .matcher import MatcherClient

logger = logging.getLogger("luna-plugins.lim-plugin")


class IndexedListMatcher(IMatcher):
    """
    Indexed list matcher class.
    """

    # indexed matcher client
    client: MatcherClient
    # list of labels acceptable for indexed matching (synchronizing in background)
    matchingLabels: set[str]

    # available reference types
    availableReferenceTypes = frozenset([MatchUnitType.descriptor])
    # available candidates types
    availableCandidateTypes = frozenset([MatchUnitType.face])
    # available match descriptor types
    availableDescriptorTypes = frozenset([DescriptorType.face])
    # available matching sort order
    availableSortOrder = frozenset(["similarity"])

    def getAvailableTargets(self, origin: MatchUnitType) -> frozenset[str]:
        """Description see :func:`~IMatcher.getAvailableTargets`."""
        return frozenset(["face_id", "similarity"])

    def getMatchingCost(self, matchRequest: MatchRequest) -> float:
        """Description see :func:`~IMatcher.getMatchingCost`."""
        if set(matchRequest.candidate.filters) - {"origin", "account_id"} == {"list_id"}:
            if matchRequest.candidate.filters["list_id"] in self.matchingLabels:
                return 10
        return float("inf")

    @classmethod
    async def syncMatchingLabels(cls):
        """Sync matching labels."""
        try:
            cls.matchingLabels = await cls.client.readMatchingLabels()
        except Exception as exc:
            logger.error(f"Failed to sync matching labels: {exc}")

    @classmethod
    async def initialize(cls, matcherConfig: dict):
        """Initialize indexed matching client."""
        cls.client = await MatcherClient.create(matcherConfig["REDIS_URL"])
        cls.timeout = matcherConfig["REQUEST_TIMEOUT"]
        cls.scheduler = AsyncIOScheduler(timezone=get_localzone_name())
        cls.scheduler.add_job(cls.syncMatchingLabels, trigger="interval", seconds=1)
        cls.scheduler.start()

    @classmethod
    async def close(cls):
        """Close matching client."""
        cls.scheduler.shutdown()
        await cls.client.close()

    async def _match(self, matchingRequest: MatchRequest) -> MatchResult:
        """
        Match candidate & reference by indexed list ID.

        Args:
            matchingRequest: matching request

        Returns:
            match result
        """
        async with self.client.session() as conn:
            match = await conn.match(
                descriptor=matchingRequest.reference.descriptor,
                label=matchingRequest.candidate.filters["list_id"],
                limit=matchingRequest.candidate.limit,
                lunaAccountId=matchingRequest.candidate.filters.get("account_id"),
                lunaRequestId=matchingRequest.requestId,
                timeout=self.timeout,
            )

        if (error := match["error"]) is not None:
            results = ErrorMatchResult(errorCode=error["error_code"], desc=error["desc"], detail=error["detail"])
        else:
            if threshold := matchingRequest.candidate.threshold:
                filtered = filter(lambda x: x[1] >= threshold, match["result"])
            else:
                filtered = match["result"]
            results = [FaceMatchResult(Face(faceId=res[0]), similarity=res[1]) for res in filtered]

        return MatchResult(matchRequest=matchingRequest, results=results)

    async def match(self, matchRequest: MatchRequest) -> MatchResult:
        """Description see :func:`~IMatcher.match`."""

        try:
            return await self._match(matchRequest)
        except asyncio.exceptions.TimeoutError:
            raise Rematch(f"Index for matching label '{matchRequest.candidate.filters['list_id']}' not found")
        except Exception as exc:
            logger.exception(f"match are failed, label {matchRequest.candidate.filters['list_id']}")
            raise Rematch(exc) from exc


class IndexedListPlugin(BaseMatcherPlugin):
    """Indexed list plugin class."""

    class Meta:
        # pulling matcher config from configurator
        configSource = "configurator"

    def getMatcher(self) -> IndexedListMatcher:
        """
        Get indexed list matcher.

        Returns:
            class `~IndexedListMatcher`
        """
        return IndexedListMatcher()

    async def initialize(self) -> None:
        """Description see :func:`~BaseMatcherPlugin.initialize`."""
        await super().initialize()
        await IndexedListMatcher.initialize(self.matcherConfig)

    async def close(self) -> None:
        """Description see :func:`~BaseMatcherPlugin.close`."""
        await IndexedListMatcher.close()