Source code for luna_handlers.app.handlers.verifier_handler
""" Handler for a certain verifier. """
from sanic.response import HTTPResponse
from app.handlers.base_handler import BaseHandler
from classes.query_schemas.verifiers import VerifierQueries
from classes.schemas.verifier import VerifierModel
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
[docs]class VerifierHandler(BaseHandler):
"""
Handler for work with verifier.
Resource: "/{api_version}/verifiers/{verifierId}"
"""
[docs] async def put(self, verifierId: str) -> HTTPResponse:
"""
Replace the verifier. See `spec_put_verifier`_.
.. _spec_put_verifier:
_static/api.html#operation/putVerifier
Args:
verifierId: verifier id
Raises:
VLException(Error.BadContentType, 400, isCriticalError=False) if content type is upsupported
VLException(Error.VerifierNotFound, 404, isCriticalError=False) if verifier not found
Returns:
response with status code 200 and verifier current verion
"""
if self.request.content_type != "application/json":
raise VLException(Error.BadContentType, 400, isCriticalError=False)
newVerifier = self.loadDataFromJson(self.request.json, VerifierModel)
verifierVersion = await self.dbContext.putVerifier(
verifierId=verifierId,
policies=newVerifier.policies.asDict(),
description=newVerifier.description,
accountId=self.getAccountIdFromHeader(),
)
if not verifierVersion:
raise VLException(Error.VerifierNotFound.format(verifierId), 404, isCriticalError=False)
return self.success(200, outputJson={"version": verifierVersion})
[docs] async def get(self, verifierId: str) -> HTTPResponse:
"""
Get verifier by id. See `spec_get_verifier`_.
.. _spec_get_verifier:
_static/api.html#operation/getVerifier
Args:
verifierId: verifier id
Returns:
response with status code 200 and verifier in body
"""
queries: VerifierQueries = self.loadDataFromQuery(VerifierQueries)
verifier = await self.dbContext.getVerifier(verifierId, accountId=queries.accountId)
return self.success(200, outputJson=verifier)
[docs] async def head(self, verifierId: str) -> HTTPResponse: # pylint: disable-msg=W0221
"""
Check verifier existence by id. See `spec_head_verifier`_.
.. _spec_head_verifier:
_static/api.html#operation/checkVerifier
Args:
verifierId: verifier id
Raises:
VLException(Error.VerifierNotFound, 404, isCriticalError=False) if verifier not found
Returns:
response with status code 200
"""
queries: VerifierQueries = self.loadDataFromQuery(VerifierQueries)
if not await self.dbContext.checkVerifier(verifierId, accountId=queries.accountId):
raise VLException(Error.VerifierNotFound.format(verifierId), 404, isCriticalError=False)
return self.success(200)
[docs] async def delete(self, verifierId: str) -> HTTPResponse: # pylint: disable-msg=W0221
"""
Remove verifier by id. See `spec_remove_verifier`_.
.. _spec_remove_verifier:
_static/api.html#operation/removeVerifier
Args:
verifierId: verifier id
Raises:
VLException(Error.VerifierNotFound, 404, isCriticalError=False) if verifier not found
Returns:
response with status code 204
"""
queries: VerifierQueries = self.loadDataFromQuery(VerifierQueries)
if not await self.dbContext.deleteVerifier(verifierId, accountId=queries.accountId):
raise VLException(Error.VerifierNotFound.format(verifierId), 404, isCriticalError=False)
return self.success(204)
[docs]class HandlerVerifyRaw(BaseHandler):
"""
Handler for raw verifying reference descriptors vs candidates descriptors
Resource: "/{api_version}/verifiers/{verifierId}/raw"
"""
async def post(self, verifierId: str) -> HTTPResponse: # pylint: disable-msg=W0221
queries: VerifierQueries = self.loadDataFromQuery(VerifierQueries)
handler = await self.dbContext.getVerifier(verifierId=verifierId, accountId=queries.accountId)
verifyThreshold = handler["policies"]["verification_threshold"]
resp = await self.luna3Client.lunaPythonMatcher.makeRequest(
f"{self.luna3Client.lunaPythonMatcher.baseUri}/matcher/raw",
"POST",
body=self.request.body,
raiseError=True,
asyncRequest=True,
headers={"Content-Type": self.request.content_type},
)
matches = resp.json
for referenceMatches in matches["matches"]:
for match in referenceMatches["matches"]:
match["status"] = match["similarity"] > verifyThreshold
return self.success(200, outputJson=matches)