Source code for luna_handlers.app.handlers.verifier_handler

""" Handler for a certain verifier. """
from sanic.response import HTTPResponse

from app.handlers.base_handler import BaseHandler
from classes.query_schemas.verifiers import VerifierQueries
from classes.schemas.verifier import VerifierModel
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException


[docs]class VerifierHandler(BaseHandler): """ Handler for work with verifier. Resource: "/{api_version}/verifiers/{verifierId}" """
[docs] async def put(self, verifierId: str) -> HTTPResponse: """ Replace the verifier. See `spec_put_verifier`_. .. _spec_put_verifier: _static/api.html#operation/putVerifier Args: verifierId: verifier id Raises: VLException(Error.BadContentType, 400, isCriticalError=False) if content type is upsupported VLException(Error.VerifierNotFound, 404, isCriticalError=False) if verifier not found Returns: response with status code 200 and verifier current verion """ if self.request.content_type != "application/json": raise VLException(Error.BadContentType, 400, isCriticalError=False) newVerifier = self.loadDataFromJson(self.request.json, VerifierModel) verifierVersion = await self.dbContext.putVerifier( verifierId=verifierId, policies=newVerifier.policies.asDict(), description=newVerifier.description, accountId=self.getAccountIdFromHeader(), ) if not verifierVersion: raise VLException(Error.VerifierNotFound.format(verifierId), 404, isCriticalError=False) return self.success(200, outputJson={"version": verifierVersion})
[docs] async def get(self, verifierId: str) -> HTTPResponse: """ Get verifier by id. See `spec_get_verifier`_. .. _spec_get_verifier: _static/api.html#operation/getVerifier Args: verifierId: verifier id Returns: response with status code 200 and verifier in body """ queries: VerifierQueries = self.loadDataFromQuery(VerifierQueries) verifier = await self.dbContext.getVerifier(verifierId, accountId=queries.accountId) return self.success(200, outputJson=verifier)
[docs] async def head(self, verifierId: str) -> HTTPResponse: # pylint: disable-msg=W0221 """ Check verifier existence by id. See `spec_head_verifier`_. .. _spec_head_verifier: _static/api.html#operation/checkVerifier Args: verifierId: verifier id Raises: VLException(Error.VerifierNotFound, 404, isCriticalError=False) if verifier not found Returns: response with status code 200 """ queries: VerifierQueries = self.loadDataFromQuery(VerifierQueries) if not await self.dbContext.checkVerifier(verifierId, accountId=queries.accountId): raise VLException(Error.VerifierNotFound.format(verifierId), 404, isCriticalError=False) return self.success(200)
[docs] async def delete(self, verifierId: str) -> HTTPResponse: # pylint: disable-msg=W0221 """ Remove verifier by id. See `spec_remove_verifier`_. .. _spec_remove_verifier: _static/api.html#operation/removeVerifier Args: verifierId: verifier id Raises: VLException(Error.VerifierNotFound, 404, isCriticalError=False) if verifier not found Returns: response with status code 204 """ queries: VerifierQueries = self.loadDataFromQuery(VerifierQueries) if not await self.dbContext.deleteVerifier(verifierId, accountId=queries.accountId): raise VLException(Error.VerifierNotFound.format(verifierId), 404, isCriticalError=False) return self.success(204)
[docs]class HandlerVerifyRaw(BaseHandler): """ Handler for raw verifying reference descriptors vs candidates descriptors Resource: "/{api_version}/verifiers/{verifierId}/raw" """ async def post(self, verifierId: str) -> HTTPResponse: # pylint: disable-msg=W0221 queries: VerifierQueries = self.loadDataFromQuery(VerifierQueries) handler = await self.dbContext.getVerifier(verifierId=verifierId, accountId=queries.accountId) verifyThreshold = handler["policies"]["verification_threshold"] resp = await self.luna3Client.lunaPythonMatcher.makeRequest( f"{self.luna3Client.lunaPythonMatcher.baseUri}/matcher/raw", "POST", body=self.request.body, raiseError=True, asyncRequest=True, headers={"Content-Type": self.request.content_type}, ) matches = resp.json for referenceMatches in matches["matches"]: for match in referenceMatches["matches"]: match["status"] = match["similarity"] > verifyThreshold return self.success(200, outputJson=matches)