"""
Plugin for matching by indexed lists.
"""
import asyncio
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from luna_plugins.matcher_plugins.base_plugin import BaseMatcherPlugin
from luna_plugins.matcher_plugins.base_struct import (
ErrorMatchResult,
Face,
FaceMatchResult,
IMatcher,
MatchRequest,
MatchResult,
MatchUnitType,
)
from luna_plugins.matcher_plugins.exceptions import Rematch
from tzlocal import get_localzone_name
from vlutils.descriptors.data import DescriptorType
from .matcher import MatcherClient
# module-level logger; used below to report label-sync and matching failures
logger = logging.getLogger("luna-plugins.lim-plugin")
class IndexedListMatcher(IMatcher):
    """
    Indexed list matcher class.

    Matches a reference descriptor against the faces of a list identified by the ``list_id`` candidate
    filter, delegating the actual search to :class:`MatcherClient`. The client, the request timeout, the
    scheduler and the set of acceptable labels are stored on the class (see :meth:`initialize`), so they
    are shared by every instance.
    """

    # indexed matcher client (assigned in `initialize`)
    client: MatcherClient
    # set of labels acceptable for indexed matching (synchronizing in background).
    # Starts empty so that `getMatchingCost` can be called before the first successful sync;
    # the set is only ever rebound by `syncMatchingLabels`, never mutated in place.
    matchingLabels: set[str] = set()
    # scheduler running the periodic label synchronization (assigned in `initialize`)
    scheduler: AsyncIOScheduler
    # available reference types
    availableReferenceTypes = frozenset([MatchUnitType.descriptor])
    # available candidates types
    availableCandidateTypes = frozenset([MatchUnitType.face])
    # available match descriptor types
    availableDescriptorTypes = frozenset([DescriptorType.face])
    # available matching sort order
    availableSortOrder = frozenset(["similarity"])

    def getAvailableTargets(self, origin: MatchUnitType) -> frozenset[str]:
        """Description see :func:`~IMatcher.getAvailableTargets`."""
        return frozenset(["face_id", "similarity"])

    def getMatchingCost(self, matchRequest: MatchRequest) -> float:
        """
        Description see :func:`~IMatcher.getMatchingCost`.

        Returns:
            10 when, apart from "origin" and "account_id", the only candidate filter is "list_id" and
            its value is one of the currently known matching labels; infinity otherwise
        """
        filters = matchRequest.candidate.filters
        # "origin" and "account_id" are tolerated; any other extra filter disqualifies this matcher
        if set(filters) - {"origin", "account_id"} == {"list_id"}:
            if filters["list_id"] in self.matchingLabels:
                return 10
        return float("inf")

    @classmethod
    async def syncMatchingLabels(cls):
        """
        Sync matching labels.

        Replaces `matchingLabels` with the labels read from the client. Any failure is logged and
        swallowed, leaving the previously known labels in place.
        """
        try:
            cls.matchingLabels = await cls.client.readMatchingLabels()
        except Exception as exc:
            logger.error(f"Failed to sync matching labels: {exc}")

    @classmethod
    async def initialize(cls, matcherConfig: dict):
        """
        Initialize indexed matching client.

        Creates the client, stores the request timeout and starts a scheduler job that calls
        :meth:`syncMatchingLabels` every second.

        Args:
            matcherConfig: matcher configuration; the "REDIS_URL" and "REQUEST_TIMEOUT" keys are read
        """
        cls.client = await MatcherClient.create(matcherConfig["REDIS_URL"])
        cls.timeout = matcherConfig["REQUEST_TIMEOUT"]
        cls.scheduler = AsyncIOScheduler(timezone=get_localzone_name())
        cls.scheduler.add_job(cls.syncMatchingLabels, trigger="interval", seconds=1)
        cls.scheduler.start()

    @classmethod
    async def close(cls):
        """Close matching client: shut the scheduler down first, then close the client."""
        cls.scheduler.shutdown()
        await cls.client.close()

    async def _match(self, matchingRequest: MatchRequest) -> MatchResult:
        """
        Match candidate & reference by indexed list ID.

        Args:
            matchingRequest: matching request
        Returns:
            match result; its `results` is an `ErrorMatchResult` when the reply carries an error,
            otherwise a list of `FaceMatchResult`
        """
        candidate = matchingRequest.candidate
        async with self.client.session() as conn:
            match = await conn.match(
                descriptor=matchingRequest.reference.descriptor,
                label=candidate.filters["list_id"],
                limit=candidate.limit,
                lunaAccountId=candidate.filters.get("account_id"),
                lunaRequestId=matchingRequest.requestId,
                timeout=self.timeout,
            )
        if (error := match["error"]) is not None:
            results = ErrorMatchResult(errorCode=error["error_code"], desc=error["desc"], detail=error["detail"])
        else:
            threshold = candidate.threshold
            # each reply item is indexed as (face id, similarity); a falsy threshold disables filtering
            results = [
                FaceMatchResult(Face(faceId=res[0]), similarity=res[1])
                for res in match["result"]
                if not threshold or res[1] >= threshold
            ]
        return MatchResult(matchRequest=matchingRequest, results=results)

    async def match(self, matchRequest: MatchRequest) -> MatchResult:
        """
        Description see :func:`~IMatcher.match`.

        Raises:
            Rematch: on a timeout or on any other matching failure (the original exception is chained)
        """
        try:
            return await self._match(matchRequest)
        except asyncio.exceptions.TimeoutError as exc:
            # a timed out request is reported as a missing index for the label
            raise Rematch(f"Index for matching label '{matchRequest.candidate.filters['list_id']}' not found") from exc
        except Exception as exc:
            logger.exception(f"match are failed, label {matchRequest.candidate.filters['list_id']}")
            raise Rematch(exc) from exc
class IndexedListPlugin(BaseMatcherPlugin):
    """Matcher plugin exposing :class:`IndexedListMatcher`."""

    class Meta:
        # the matcher config is pulled from the configurator
        configSource = "configurator"

    def getMatcher(self) -> IndexedListMatcher:
        """
        Build an indexed list matcher.

        Returns:
            a new `~IndexedListMatcher` instance
        """
        matcher = IndexedListMatcher()
        return matcher

    async def initialize(self) -> None:
        """
        Description see :func:`~BaseMatcherPlugin.initialize`.

        Runs the base initialization first, then initializes the matcher class with `self.matcherConfig`.
        """
        await super().initialize()
        config = self.matcherConfig
        await IndexedListMatcher.initialize(config)

    async def close(self) -> None:
        """
        Description see :func:`~BaseMatcherPlugin.close`.

        Closes the class-level resources of the matcher.
        """
        await IndexedListMatcher.close()