Source code for luna_python_matcher.app.handlers.matcher_body_handler
""" Matcher handler. """
from typing import Type
from sanic.response import HTTPResponse
from vlutils.descriptors.data import DescriptorType
from app.handlers.base_handler import BaseMatcherHandler
from app_common.handlers.schemas import BodyMatch
from classes import enums
from classes.candidate_batch import CandidateBatch
from classes.enums import MatchSource
from classes.filters import FaceFilters, EventFilters
from classes.match_processing import MatchLPMProcessor
from classes.python_matcher_match_tools import EventsDBFaceTool, PythonMatcherMatchTool
from crutches_on_wheels.web.query_getters import uuidGetter
CANDIDATE_FILTERS_MAP = {MatchSource.faces.value: FaceFilters, MatchSource.events.value: EventFilters}
[docs]class BodyMatcherHandler(BaseMatcherHandler):
"""
Matcher handler allows to submit tasks to a service that searches for bodies similar to a given reference by
matching them.
Resource: "/{api_version}/matcher/bodies"
"""
[docs] @staticmethod
def getPythonMatcherMatchTool(candidateBatch: CandidateBatch) -> Type[PythonMatcherMatchTool]:
"""
Get match tool.
Args:
candidateBatch: prepared candidate batch from request, not suitable for matcher match tool
Returns:
matchTool - faces db / events db match tool
"""
if candidateBatch.origin == enums.MatchSource.events:
return EventsDBFaceTool
else:
raise NotImplementedError(f"Cannot work with '{candidateBatch.origin}' as candidate.")
[docs] async def post(self) -> HTTPResponse:
"""
Match events or raw descriptors by bodies. See `spec_matcher`_.
.. _spec_matcher:
_static/api.html#operation/matching
Returns:
response with matching results
"""
inputJson: dict = self.request.json
self.validateMatchJson(inputJson, fastjsonSchema=BodyMatch.schema)
accountId = self.getQueryParam("account_id", uuidGetter)
candidates, references = await self.getMatchStructuresFromRequest(
inputJson=inputJson,
accountId=accountId,
descriptorVersion=self.config.defaultHumanDescriptorVersion,
descriptorType=DescriptorType.body
)
matching = MatchLPMProcessor(eventsDBContext=self.eventsDBContext, facesDBContext=self.facesDBContext,
attributesDBContext=self.attributesDBContext,
candidates=candidates, references=references, accountId=accountId,
storageTime=self.config.storageTime, requestId=self.request.requestId)
matchResults = await matching.match()
return self.success(200, outputJson=matchResults)