# Source code for luna_python_matcher.app.handlers.matcher_raw_handler
""" Matcher Raw handler. """
from typing import List, Tuple
from sanic.response import HTTPResponse
from vlutils.descriptors.containers import sdkDescriptorDecode
from vlutils.descriptors.data import DescriptorsEnum
from vlutils.descriptors.match import match
from vlutils.descriptors.xpk_reader import readXPKFromBinary
from app.handlers.base_handler import BaseMatcherHandler
from app_common.handlers.schemas import RawMatch
from classes.enums import BinaryReferenceType
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
class MatcherRawHandler(BaseMatcherHandler):
    """
    Matcher raw handler allows to do similarity calculations for input descriptors.

    Resource: "/{api_version}/matcher/raw"
    """

    # Lookup table "descriptor version -> descriptor type", built once from every member of `DescriptorsEnum`.
    descriptorTypeByVersion = {descriptorType.value.version: descriptorType.value for descriptorType in DescriptorsEnum}

    async def getMatchPayload(self) -> Tuple[List[Tuple], List[Tuple]]:
        """
        Get candidate and reference match structures.

        The request json is validated against `RawMatch.schema`, then every candidate and every reference is
        decoded to a `(version, descriptor)` pair. Candidates are decoded before references, so the first
        invalid candidate is reported before any invalid reference.

        Raises:
            VLException(Error.BadSdkDescriptor) if provided sdk descriptor cannot be decoded
            VLException(Error.BadInputXpk) if provided xpk file cannot be read
            VLException(Error.UnknownDescriptorVersion) if descriptor version is not in `descriptorTypeByVersion`
            VLException(Error.InvalidDescriptorLength) if descriptor length differs from the length of its type
            VLException(Error.DifferentVersionsNotAllowed) if input descriptors have more than one version
            RuntimeError if item type is not one of the known binary reference types
            NOTE(review): `validateJson` presumably raises on a schema violation - confirm in the base handler
        Returns:
            candidate and reference match structures: two lists of `(id, (version, descriptor))` tuples
        """
        # Versions of all successfully decoded descriptors; filled by `getDescriptor` as a side effect.
        seenVersions = set()

        def getDescriptor(item: dict) -> Tuple[int, bytes]:
            """Selects appropriate decoding algorithm based on discriminator item["type"]

            Args:
                item: A dictionary with descriptor data
            Returns:
                Tuple of version and descriptor.
            """
            if item["type"] == BinaryReferenceType.rawDescriptor.value:
                version = item["data"]["version"]
                binaryDescriptor = self.convertFromBase64ToBytesIfNeed(item["data"]["descriptor"])
            elif item["type"] == BinaryReferenceType.sdkDescriptor.value:
                sdkDescriptor = self.convertFromBase64ToBytesIfNeed(item["data"])
                try:
                    version, binaryDescriptor = sdkDescriptorDecode(sdkDescriptor)
                except (ValueError, SyntaxError) as exc:
                    # keep the decoder error as the explicit cause of the client-facing error
                    raise VLException(Error.BadSdkDescriptor, 400, isCriticalError=False) from exc
            elif item["type"] == BinaryReferenceType.xpkFile.value:
                try:
                    xpk = readXPKFromBinary(self.convertFromBase64ToBytesIfNeed(item["data"]))
                except ValueError as exc:
                    raise VLException(Error.BadInputXpk, 400, isCriticalError=False) from exc
                version = xpk["Descriptor"]["version"]
                binaryDescriptor = xpk["Descriptor"]["raw_descriptor"]
            else:
                raise RuntimeError(f"error binary reference type: {item['type']}")

            try:
                descriptorType = self.descriptorTypeByVersion[version]
            except KeyError as exc:
                raise VLException(Error.UnknownDescriptorVersion.format(version), 400, isCriticalError=False) from exc

            if descriptorType.length != len(binaryDescriptor):
                raise VLException(
                    Error.InvalidDescriptorLength.format(len(binaryDescriptor)), 400, isCriticalError=False
                )

            # only descriptors that passed all the checks take part in the "single version" check
            seenVersions.add(version)
            return version, binaryDescriptor

        inputJson: dict = self.request.json
        self.validateJson(inputJson, RawMatch.schema, useJsonSchema=False)

        candidates = [(item["id"], getDescriptor(item)) for item in inputJson["candidates"]]
        references = [(item["id"], getDescriptor(item)) for item in inputJson["references"]]

        if len(seenVersions) > 1:
            # the versions are rendered as a sorted tuple literal, e.g. "(54, 56)"
            error = Error.DifferentVersionsNotAllowed.format(f"{*sorted(seenVersions),}")
            raise VLException(error, 400, isCriticalError=False)
        return candidates, references

    async def post(self) -> HTTPResponse:
        """
        Match each candidate descriptor with each reference descriptor. See `spec_matcher_raw`_.

        For every reference the matches are sorted by similarity in descending order; the sort is stable, so
        candidates with equal similarity keep their input order.

        .. _spec_matcher_raw:
            _static/api.html#operation/matchingRaw

        Returns:
            response with matching results
        """
        candidates, references = await self.getMatchPayload()

        results: List[dict] = []
        for referenceId, (descriptorVersion, descriptor) in references:
            # the reference version is used for every pair: `getMatchPayload` allows a single version only
            matches: List[Tuple[str, float]] = [
                (candidateId, match(descriptor, candidateDescriptor, descriptorVersion))
                for candidateId, (_, candidateDescriptor) in candidates
            ]
            matches.sort(key=lambda item: item[1], reverse=True)
            matchInfo = {
                "reference_id": referenceId,
                "matches": [
                    {"candidate_id": candidateId, "similarity": similarity} for candidateId, similarity in matches
                ],
            }
            results.append(matchInfo)
        return self.success(201, outputJson={"matches": results})