Source code for luna_python_matcher.app.handlers.matcher_raw_handler

""" Matcher Raw handler. """

from typing import List, Tuple

from sanic.response import HTTPResponse
from vlutils.descriptors.containers import sdkDescriptorDecode
from vlutils.descriptors.data import DescriptorsEnum
from vlutils.descriptors.match import match
from vlutils.descriptors.xpk_reader import readXPKFromBinary

from app.handlers.base_handler import BaseMatcherHandler
from app_common.handlers.schemas import RawMatch
from classes.enums import BinaryReferenceType
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException


class MatcherRawHandler(BaseMatcherHandler):
    """
    Matcher raw handler. Calculates similarity between input descriptors.

    Resource: "/{api_version}/matcher/raw"
    """

    # Lookup table: descriptor version -> value of the corresponding ``DescriptorsEnum`` member.
    # The looked-up value is used below for its ``length`` attribute.
    descriptorTypeByVersion = {
        descriptorType.value.version: descriptorType.value for descriptorType in DescriptorsEnum
    }

    async def getMatchPayload(self) -> Tuple[List[Tuple], List[Tuple]]:
        """
        Get candidate and reference match structures.

        The request json is validated against ``RawMatch.schema``; every candidate and reference
        is then decoded into an ``(id, (version, descriptor))`` tuple.

        Raises:
            VLException(Error.BadSdkDescriptor) if provided sdk descriptor is corrupted
            VLException(Error.BadInputXpk) if provided xpk file is corrupted
            VLException(Error.UnknownDescriptorVersion) if provided descriptor has unknown version
            VLException(Error.InvalidDescriptorLength) if descriptor length is incorrect
            VLException(Error.DifferentVersionsNotAllowed) if descriptors of several versions are provided
            RuntimeError if an item has an unexpected binary reference type
            Errors raised by ``validateJson`` (defined outside this class) are propagated as is.
        Returns:
            candidate and reference match structures
        """
        # Versions of all successfully decoded descriptors (candidates and references together).
        # Filled as a side effect of ``getDescriptor``.
        seenVersions = set()

        def getDescriptor(item: dict) -> Tuple[int, bytes]:
            """
            Select appropriate decoding algorithm based on discriminator ``item["type"]``.

            Args:
                item: A dictionary with descriptor data
            Returns:
                Tuple of version and descriptor.
            """
            itemType = item["type"]
            if itemType == BinaryReferenceType.rawDescriptor.value:
                version = item["data"]["version"]
                binaryDescriptor = self.convertFromBase64ToBytesIfNeed(item["data"]["descriptor"])
            elif itemType == BinaryReferenceType.sdkDescriptor.value:
                sdkDescriptor = self.convertFromBase64ToBytesIfNeed(item["data"])
                try:
                    version, binaryDescriptor = sdkDescriptorDecode(sdkDescriptor)
                except (ValueError, SyntaxError) as exc:
                    raise VLException(Error.BadSdkDescriptor, 400, isCriticalError=False) from exc
            elif itemType == BinaryReferenceType.xpkFile.value:
                try:
                    xpk = readXPKFromBinary(self.convertFromBase64ToBytesIfNeed(item["data"]))
                except ValueError as exc:
                    raise VLException(Error.BadInputXpk, 400, isCriticalError=False) from exc
                version = xpk["Descriptor"]["version"]
                binaryDescriptor = xpk["Descriptor"]["raw_descriptor"]
            else:
                # NOTE(review): presumably unreachable after schema validation - confirm against RawMatch
                raise RuntimeError(f"error binary reference type: {item['type']}")

            try:
                descriptorType = self.descriptorTypeByVersion[version]
            except KeyError as exc:
                raise VLException(
                    Error.UnknownDescriptorVersion.format(version), 400, isCriticalError=False
                ) from exc

            if descriptorType.length != len(binaryDescriptor):
                raise VLException(
                    Error.InvalidDescriptorLength.format(len(binaryDescriptor)), 400, isCriticalError=False
                )

            seenVersions.add(version)
            return version, binaryDescriptor

        inputJson: dict = self.request.json
        self.validateJson(inputJson, RawMatch.schema, useJsonSchema=False)

        candidates = [(item["id"], getDescriptor(item)) for item in inputJson["candidates"]]
        references = [(item["id"], getDescriptor(item)) for item in inputJson["references"]]

        if len(seenVersions) > 1:
            # Rendered as a tuple of sorted versions, e.g. "(54, 56)".
            versions = str(tuple(sorted(seenVersions)))
            error = Error.DifferentVersionsNotAllowed.format(versions)
            raise VLException(error, 400, isCriticalError=False)

        return candidates, references

    async def post(self) -> HTTPResponse:
        """
        Match each candidate descriptor with each reference descriptor. See `spec_matcher_raw`_.

        .. _spec_matcher_raw:
            _static/api.html#operation/matchingRaw

        Returns:
            response with matching results
        """
        candidates, references = await self.getMatchPayload()

        results: List[dict] = []
        for referenceId, (descriptorVersion, descriptor) in references:
            # ``getMatchPayload`` rejects payloads with more than one descriptor version,
            # so the reference version is also the version of every candidate.
            matches: List[Tuple[str, float]] = [
                (candidateId, match(descriptor, candidateDescriptor, descriptorVersion))
                for candidateId, (_, candidateDescriptor) in candidates
            ]
            # Highest similarity first.
            matches.sort(key=lambda item: item[1], reverse=True)
            results.append(
                {
                    "reference_id": referenceId,
                    "matches": [
                        {"candidate_id": candidateId, "similarity": similarity}
                        for candidateId, similarity in matches
                    ],
                }
            )

        return self.success(201, outputJson={"matches": results})