""" Matcher Proxy handler. """
import asyncio
import uuid
from typing import Any, Dict, List, Optional, Coroutine, Awaitable, Type, Tuple
from luna3.common.luna_response import LunaResponse
from luna3.common.requests import makeRequest, RequestPayload
from sanic.response import HTTPResponse
from vlutils.descriptors.data import DescriptorType
from app_common.handlers.schemas import FaceMatch
from app_proxy.handlers.base_handler import ProxyBaseHandler
from classes.candidate_batch import CandidateBatch
from classes.enums import CandidateMatchTarget
from classes.match_processing_proxy import (
CoreIndexedMatchProcessor,
areFiltersSuitableForMatcher
)
from classes.reference import Reference
from crutches_on_wheels.web.query_getters import uuidGetter
[docs]class FacesMatcherProxyHandler(ProxyBaseHandler):
"""
Matcher proxy handler.
Resource: "/{api_version}/matcher/faces"
"""
[docs] def areCandidatesSuitableForIndexedMatcher(self, candidateBatch: CandidateBatch) -> bool:
"""
Check suitability to match using core indexed matcher
Candidate batch is suitable for core indexed matcher with the following conditions:
- indexed core cluster available (lpm use luna-indexed-core matcher)
- *INDEXED_MATCHER* is turned on (see config section named *ADDITIONAL_SERVICES_USAGE*)
- candidates batch origin is *faces*
- candidates filters are *list_id* and *account_id*
- *list_id* from candidates filters is indexed list (see *Luna-Index-Manager* documentation)
Returns:
True if filters are suitable to match using core indexed matcher else False
"""
if not self.app.ctx.matcherClusters.isIndexedCoreMatcherAvailable():
return False
if not self.config.additionalServicesUsage.indexedMatcher:
return False
if not areFiltersSuitableForMatcher(candidateBatch) or not self.app.ctx.indexedLists.isListIndexed(
candidateBatch.filters.listId
):
return False
return True
def _matchByPythonMatcher(self, json: Dict[str, Any], accountId: Optional[str] = None
) -> Coroutine[None, None, LunaResponse]:
"""
Proxy realisation for `PythonMatcherApi.match`.
See:
>>> from luna3.python_matcher.python_matcher import PythonMatcherApi
>>> PythonMatcherApi.match
Returns:
Python Matcher response
"""
headers = self.luna3Client.lunaPythonMatcher.getRequestIdAsDict()
payload = RequestPayload.buildMsgpack(json, headers)
return makeRequest(
"{}/matcher/faces".format(self.luna3Client.lunaPythonMatcher.baseUri),
"POST",
session=self.luna3Client.lunaPythonMatcher.session,
queryParams={"account_id": accountId},
body=payload,
raiseError=True,
asyncRequest=self.luna3Client.lunaPythonMatcher.getAsyncMode(),
)
async def _getCoreMatchResults(
self,
matchProcessor: Type[CoreIndexedMatchProcessor],
candidateBatches: List[CandidateBatch],
references: List[Reference],
accountId: Optional[str] = None
):
"""
Get match results from core matcher
Args:
matchProcessor: core or indexed core matching processor
candidateBatches: candidate batch list
references: reference list
accountId: account id
Returns:
Match results from core/indexed core matcher
"""
if candidateBatches:
matching = matchProcessor(facesDBContext=self.facesDBContext, candidates=candidateBatches,
references=references, accountId=accountId,
luna3Client=self.luna3Client)
return await matching.match()
return []
[docs] async def getCoreIndexedMatchResults(
self,
candidateBatches: List[CandidateBatch],
references: List[Reference],
accountId: Optional[str] = None
):
"""
Get match results from indexed core matcher
Args:
candidateBatches: candidate batch list
references: reference list
accountId: account id
Returns:
Match results from indexed core matcher
"""
return await self._getCoreMatchResults(
matchProcessor=CoreIndexedMatchProcessor,
candidateBatches=candidateBatches,
references=references,
accountId=accountId
)
[docs] async def getPythonMatcherMatchResults(self, referenceList: List[dict], candidateFilters: List[dict],
references: List[Reference], accountId: Optional[str] = None):
"""
Get matcher results from python matcher
Args:
candidateFilters: candidate filters list
referenceList: references from json
references: reference list
accountId: account id
Returns:
Match results from python matcher
"""
if candidateFilters:
preparedReferences: List[dict] = referenceList
for referenceIdx, referenceDict in enumerate(referenceList):
isRawReference = referenceList[referenceIdx]["type"] in ("raw_descriptor", "sdk_descriptor", "xpk_file")
if isRawReference:
referenceId = referenceDict["id"]
referenceList[referenceIdx] = [ref for ref in references if ref.referenceId == referenceId][
0].asRawDescriptor()
newJson = dict(references=preparedReferences, candidates=candidateFilters)
return (await self._matchByPythonMatcher(accountId=accountId, json=newJson)).json
return []
[docs] @staticmethod
def getMergedResults(coreIndexedMatchResults: List[dict],
pythonMatcherMatchResults: List[dict],
referenceCount: int, candidateMatchTargets: List[CandidateMatchTarget]):
"""
Get merged match result
Args:
coreIndexedMatchResults: match results from core indexed matcher
pythonMatcherMatchResults: matcher results from python matcher
referenceCount: reference count
candidateMatchTargets: candidates match targets
Returns:
Merged match results
"""
referenceSource = coreIndexedMatchResults or pythonMatcherMatchResults
mergedResults = [dict(reference=referenceSource[index]['reference'], matches=[]) for index in
range(referenceCount)]
for mainIndex in range(referenceCount):
coreResIndex, coreIndexedResIndex, pythonMatcherResIndex = 0, 0, 0
for resultSource in candidateMatchTargets:
if resultSource == CandidateMatchTarget.coreIndexed:
candidateBatchResult = coreIndexedMatchResults[mainIndex]["matches"][coreIndexedResIndex]
coreIndexedResIndex += 1
else:
candidateBatchResult = pythonMatcherMatchResults[mainIndex]["matches"][pythonMatcherResIndex]
pythonMatcherResIndex += 1
mergedResults[mainIndex]['matches'].append(candidateBatchResult)
return mergedResults
[docs] @staticmethod
def prepareRawDescriptorReferences(references: List[Reference]) -> Tuple[List[Reference], dict]:
"""
Prepare raw references. For each reference:
- check reference id is uuid, if not:
- save input reference (real id) id to map
- set reference id as random uuid (fake id)
Args:
references: prepared references from request
Returns:
map, where keys are replaced random uuids and values are input reference ids
"""
referenceIdMap = {}
for reference in references:
try:
uuid.UUID(reference.referenceId)
except ValueError:
newUuid = str(uuid.uuid4())
referenceIdMap[newUuid], reference.referenceId = reference.referenceId, newUuid
return references, referenceIdMap
[docs] @staticmethod
def prepareRawDescriptorInReply(matchResults: List[dict], referenceIdMap: Dict[str, str]) -> List[dict]:
"""
Replace raw descriptors' fake ids to real
Args:
matchResults: match results with fake uuids for raw descriptors
referenceIdMap: map with reference fake ids(uuids) and real(str) ids
Returns:
match results with real reference ids
"""
for matchRes in matchResults:
if matchRes["reference"]["id"] in referenceIdMap:
matchRes["reference"]["id"] = referenceIdMap[matchRes["reference"]["id"]]
return matchResults
[docs] async def post(self) -> HTTPResponse:
"""
Match events, faces and attributes by faces. See `spec_matcher`_.
.. _spec_matcher:
_static/api.html#operation/matching
Returns:
response with matching results
"""
inputJson: dict = self.request.json
self.validateMatchJson(inputJson, fastjsonSchema=FaceMatch.schema)
accountId = self.getQueryParam("account_id", uuidGetter, default=None)
preparedCandidates, preparedReferences = await self.getMatchStructuresFromRequest(
inputJson=inputJson,
accountId=accountId,
descriptorVersion=self.config.defaultFaceDescriptorVersion,
descriptorType=DescriptorType.face
)
preparedReferences, fakeToRealReferenceIdMap = self.prepareRawDescriptorReferences(preparedReferences)
indexedCoreMatcherCandidates: List[CandidateBatch] = []
pythonMatcherCandidatesFilters: List[dict] = []
candidateMatchTargets: List[CandidateMatchTarget] = []
for candidateBatchIndex, candidateBatchDict in enumerate(inputJson['candidates']):
if self.areCandidatesSuitableForIndexedMatcher(preparedCandidates[candidateBatchIndex]):
indexedCoreMatcherCandidates.append(preparedCandidates[candidateBatchIndex])
candidateMatchTargets.append(CandidateMatchTarget.coreIndexed)
else:
pythonMatcherCandidatesFilters.append(candidateBatchDict)
candidateMatchTargets.append(CandidateMatchTarget.pythonMatcher)
coreIndexedMatchResults, pythonMatherMatchResults = await asyncio.gather(
self.getCoreIndexedMatchResults(
candidateBatches=indexedCoreMatcherCandidates, references=preparedReferences, accountId=accountId),
self.getPythonMatcherMatchResults(
referenceList=inputJson["references"], references=preparedReferences, accountId=accountId,
candidateFilters=pythonMatcherCandidatesFilters)
)
unpreparedResult = self.getMergedResults(
coreIndexedMatchResults=coreIndexedMatchResults,
pythonMatcherMatchResults=pythonMatherMatchResults,
referenceCount=len(preparedReferences),
candidateMatchTargets=candidateMatchTargets
)
resultJson = self.prepareRawDescriptorInReply(unpreparedResult, fakeToRealReferenceIdMap)
return self.success(200, outputJson=resultJson, contentType="application/json")
[docs]class UnwantedFacesMatcherProxyHandler(FacesMatcherProxyHandler):
"""Unwanted matcher proxy handler."""
[docs] def post(self) -> Awaitable[HTTPResponse]:
"""
Print warning also.
"""
self.logger.warning("Resource `/1/matcher` is deprecated. Use `/1/matcher/faces` instead.")
return super().post()