# -*- coding: utf-8 -*-
""" Base handler
Module realize base class for all handlers.
"""
import base64
import io
from abc import abstractmethod
from typing import List, Optional, Union, Tuple, Any, Dict, Type
import aiohttp
import binascii
from PIL import Image
from lunavl.sdk.errors.exceptions import LunaSDKException
from lunavl.sdk.estimators.body_estimators.humanwarper import HumanWarpedImage
from lunavl.sdk.estimators.face_estimators.facewarper import FaceWarpedImage
from pydantic import BaseModel
from yarl import URL
from luna3.client import Client
from luna3.common.http_objs import BoundingBox
from app.app import BaseHandlersRequestHandler, HandlersRequest
from app.global_vars.context_vars import requestIdCtx
from app.global_vars.enums import ImageType
from app.handlers.available_content_types import (
isAllowableContentType,
isAllowableRawContentType,
MAP_BASE64_TYPE_TO_DATA_TYPE,
)
from classes.functions import loadDataFromJson
from classes.monitoring import HandlersMonitoringData
from classes.multipart_processing import ImageWithBB, ImageWithFaceBB
from classes.raw_descriptor_data import RawDescriptorData
from classes.schemas.base_schema import BaseSchema
from classes.schemas.policies import Policies
from classes.schemas.verifier import VerifierPoliciesModel as VerifierPolicies
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.utils.functions import currentDateTime, convertDateTimeToCurrentFormatStr, downloadImage
from db.context import DBContext
from img_utils.utils import convertToBytesIfNeed
from sdk.sdk_loop.sdk_task import SDKDetectableImage, FaceWarp, HumanWarp, SDKTask
from sdk.sdk_loop.task_loop import SDKTaskLoop
MAP_WARPED_IMAGE = {1: FaceWarpedImage, 2: HumanWarpedImage}
[docs]class BaseHandler(BaseHandlersRequestHandler):
"""
Base handler for other handlers.
Attributes:
luna3Client (luna3.client.Client): luna3 client
dbContext (DBContext): db context
redisContext (RedisContext): redis context
"""
def __init__(self, request: HandlersRequest):
super().__init__(request)
requestIdCtx.set(self.requestId)
self.luna3Client: Client = request.luna3Client
self.dbContext = request.dbContext
self.redisContext = request.redisContext
self.accountId: Optional[str] = None
[docs] def checkLivenessEstimationLicensing(self, estimate: int):
"""
Check liveness estimation licensing
Args:
estimate: liveness estimation status
Raises:
VLException(Error.LicenseProblem) if liveness estimation disabled
"""
if not estimate:
return
if not self.app.ctx.licenseChecker.licenseState:
raise VLException(Error.LicenseProblem.format("Cannot get license information."), 403, False)
if not self.app.ctx.licenseChecker.licenseState.expirationTime.isAvailable:
raise VLException(Error.LicenseProblem.format("License expired"), 403, False)
if self.app.ctx.licenseChecker.licenseState.liveness.value is None:
raise VLException(Error.LicenseProblem.format("Liveness feature disabled"), 403, False)
if self.app.ctx.licenseChecker.licenseState.liveness.value != 2:
raise VLException(Error.LicenseProblem.format("Liveness v.2 feature disabled"), 403, False)
if self.app.ctx.licenseChecker.licenseState.livenessBalance is None:
# licensing by expiration, not by executions
return
if not self.app.ctx.licenseChecker.licenseState.livenessBalance.isAvailable:
raise VLException(Error.LicenseProblem.format("Liveness balance is exceeded"), 403, False)
if not self.app.ctx.licenseRecorder.isLivenessSynchronized():
raise VLException(Error.LicenseProblem.format("Feature execution synchronization failed"), 403, False)
[docs] def checkPolicyLicensing(self, policies: Union[Policies, VerifierPolicies]):
"""
Check handler policies licensing
Args:
policies: handler policies
"""
self.checkLivenessEstimationLicensing(policies.detectPolicy.estimateLiveness.estimate)
async def _downloadImage(self, url: Union[str, URL]) -> tuple[bytes, str]:
"""
Download image by external url
Args:
url: url
Returns:
image and content type
Raises:
VLException(Error.BadContentTypeDownloadedImage.format(url), 400, isCriticalError=False):
if a downloaded image content type is not allowable
"""
clientTimeout = aiohttp.ClientTimeout(
total=self.config.loadExternalImageTimeout.totalTimeout,
connect=self.config.loadExternalImageTimeout.connectTimeout,
sock_connect=self.config.loadExternalImageTimeout.sockConnectTimeout,
sock_read=self.config.loadExternalImageTimeout.sockReadTimeout,
)
imageBody, contentType = await downloadImage(
url=url, logger=self.logger, timeout=clientTimeout, accountId=self.accountId
)
if not isAllowableRawContentType(contentType):
raise VLException(Error.BadContentTypeDownloadedImage.format(url), 400, isCriticalError=False)
return imageBody, contentType
[docs] async def downloadFaceSample(self, sampleId: str) -> FaceWarp:
"""
Download face sample by id
Args:
sampleId: sample id
Returns:
sdk face warp
"""
bucket = self.config.faceSamplesStorage.bucket
response = await self.luna3Client.lunaFaceSamplesStore.getImage(
bucket, sampleId, self.accountId, raiseError=True
)
image = Image.open(io.BytesIO(response.body))
return FaceWarp(body=image, sampleId=sampleId)
[docs] async def downloadBodySample(self, sampleId: str) -> HumanWarp:
"""
Download body sample by id
Args:
sampleId: sample id
Returns:
sdk body warp
"""
bucket = self.config.bodySamplesStorage.bucket
response = await self.luna3Client.lunaBodySamplesStore.getImage(
bucket, sampleId, self.accountId, raiseError=True
)
image = Image.open(io.BytesIO(response.body))
return HumanWarp(body=image, sampleId=sampleId)
@staticmethod
def _getRawDataContainer(
body: bytes,
contentType: str,
imageType: Union[ImageType, None],
fileName: Optional[str] = None,
faceBoundingBoxList: Optional[List[dict]] = None,
bodyBoundingBoxList: Optional[List[dict]] = None,
sampleId: Optional[str] = None,
url: Optional[str] = None,
detectTime: Optional[str] = None,
imageOrigin: Optional[str] = None,
) -> Union[SDKDetectableImage, FaceWarp, HumanWarp, RawDescriptorData]:
"""
Get raw data container: detectable image object or raw descriptor data
Args:
body: binary data
contentType: expected data content type from request
fileName: filename
imageType: image type
sampleId: sample id for warped image
faceBoundingBoxList: list with detection rectangles
url: image source
detectTime: detection time in ISO format
Returns:
prepared raw data container
Raises:
VLException(Error.OnlyOneDetectionRectAvailable, 403, False) if there are more than 1 bounding box
VLException(Error.BadContentType, 400, False) if image content type is not allowable
VLException(Error.BoundingBoxNotAvailableForWarp, 400, False) if try to use bounding box for warp
"""
if not isAllowableContentType(contentType, allowRawDescriptors=True):
raise VLException(Error.BadContentType, 400, isCriticalError=False)
data, contentType = convertToBytesIfNeed(body, contentType)
if contentType in ("application/x-sdk-descriptor", "application/x-vl-xpk"):
return RawDescriptorData(data, mimetype=contentType, filename=fileName)
if faceBoundingBoxList and len(faceBoundingBoxList) > 1:
raise VLException(Error.OnlyOneDetectionRectAvailable, 403, isCriticalError=False)
if bodyBoundingBoxList and len(bodyBoundingBoxList) > 1:
raise VLException(Error.OnlyOneDetectionRectAvailable, 403, isCriticalError=False)
if (faceBoundingBoxList or bodyBoundingBoxList) and imageType in (ImageType.faceWarp, ImageType.humanWarp):
raise VLException(Error.BoundingBoxNotAvailableForWarp, 400, False)
if imageType == ImageType.rawImage or imageType is None:
fBoundingBoxes = [BoundingBox(**faceBoundingBoxList[0])] if faceBoundingBoxList is not None else None
hBoundingBoxes = [BoundingBox(**bodyBoundingBoxList[0])] if bodyBoundingBoxList is not None else None
image = SDKDetectableImage(
filename=fileName or "raw image",
body=data,
faceBoundingBoxes=fBoundingBoxes,
humanBoundingBoxes=hBoundingBoxes,
url=url,
detectTime=detectTime,
imageOrigin=imageOrigin,
)
return image
warpKwargs = dict(
filename=fileName or "raw warped image",
body=data,
sampleId=sampleId,
detectTime=detectTime,
imageOrigin=imageOrigin,
)
if imageType == ImageType.faceWarp:
warp = FaceWarp(**warpKwargs)
elif imageType == ImageType.humanWarp:
warp = HumanWarp(**warpKwargs)
else:
raise RuntimeError(f"Unknown imageType: {imageType}")
# check image is warp
try:
MAP_WARPED_IMAGE[imageType.value](data)
except ValueError as err:
warp.error = Error.BadWarpImage.format(err)
except LunaSDKException as e:
warp.error = e.error
return warp
async def _getImagesFromJson(
self, inputJson: dict, imageType: ImageType, defaultDetectTime: str, allowRawDescriptors: bool
) -> Union[List[SDKDetectableImage], List[FaceWarp], List[HumanWarp]]:
"""
Get images from request json
Args:
inputJson: json from request
imageType: image type
defaultDetectTime: image detection time in ISO format
allowRawDescriptors: whether raw descriptor mimetypes allowed or not
Returns:
list of prepared SDKDetectableImage or FaceWarp or HumanWarp
Raises:
VLException(Error.BadInputJson, 400, False) if failed decode descriptor
"""
try:
contentType = inputJson["mimetype"]
if not isAllowableContentType(contentType, allowRawDescriptors=allowRawDescriptors):
raise VLException(Error.BadContentType, 400, isCriticalError=False)
image = base64.b64decode(inputJson["image"])
contentType = MAP_BASE64_TYPE_TO_DATA_TYPE.get(contentType, contentType)
return [
self._getRawDataContainer(
body=image,
contentType=contentType,
imageType=imageType,
faceBoundingBoxList=inputJson.get("face_bounding_boxes"),
bodyBoundingBoxList=inputJson.get("body_bounding_boxes"),
detectTime=self.convertDetectionTimeToCurrentFormat(
inputJson.get("detect_time"), defaultDetectTime
),
imageOrigin=inputJson.get("image_origin"),
)
]
except binascii.Error:
raise VLException(Error.BadInputJson.format("image", "Failed to decode descriptor"), 400, False)
async def _getImagesFromUrls(
self, inputJson: dict, imageType: ImageType, defaultDetectTime: str
) -> Union[List[SDKDetectableImage], List[FaceWarp], List[HumanWarp]]:
"""
Get images from request's urls (list of urls in json with optional detection rectangles)
Args:
inputJson: json from request
imageType: image type
defaultDetectTime: image detection time in ISO format
Returns:
list of prepared SDKDetectableImage or FaceWarp or HumanWarp
"""
resultImages = []
for row in inputJson["urls"]:
url = row["url"]
try:
image, contentType = await self._downloadImage(url)
except VLException as e:
image = SDKDetectableImage(
filename=url,
body=b"",
faceBoundingBoxes=row.get("face_bounding_boxes"),
humanBoundingBoxes=row.get("body_bounding_boxes"),
url=url,
detectTime=self.convertDetectionTimeToCurrentFormat(row.get("detect_time"), defaultDetectTime),
imageOrigin=row.get("image_origin"),
)
image.error = e.error
resultImages.append(image)
else:
resultImages.append(
self._getRawDataContainer(
body=image,
contentType=contentType,
imageType=imageType,
faceBoundingBoxList=row.get("face_bounding_boxes"),
bodyBoundingBoxList=row.get("body_bounding_boxes"),
fileName=url,
url=url,
detectTime=self.convertDetectionTimeToCurrentFormat(row.get("detect_time"), defaultDetectTime),
imageOrigin=row.get("image_origin"),
)
)
return resultImages
async def _getImagesFromSamples(
self, inputJson: dict, imageType: ImageType, defaultDetectTime: str
) -> Union[List[SDKDetectableImage], List[FaceWarp], List[HumanWarp]]:
"""
Get images from request's samples
(list of sample ids to get from luna-image-store and optional detection rectangles)
Args:
inputJson: json from request. None for unknown
imageType: imageType
defaultDetectTime: image detection time in ISO format
Returns:
list of prepared SDKDetectableImage or FaceWarp or HumanWarp
"""
resultImages = []
if imageType == imageType.faceWarp:
storeApiClient = self.luna3Client.lunaFaceSamplesStore
bucketName = self.config.faceSamplesStorage.bucket
elif imageType == imageType.humanWarp:
storeApiClient = self.luna3Client.lunaBodySamplesStore
bucketName = self.config.bodySamplesStorage.bucket
else:
raise VLException(
error=Error.BadWarpImage.format(
"Not supported image type for samples. Valid image type one of: face or body warp"
),
statusCode=400,
isCriticalError=False,
)
responses, samples = [], []
if isinstance(inputJson["samples"][0], dict):
for sample in inputJson["samples"]:
responses.append(
await storeApiClient.getImage(
imageId=sample["sample_id"], bucketName=bucketName, accountId=self.accountId, raiseError=True
)
)
detectTime = self.convertDetectionTimeToCurrentFormat(sample.get("detect_time"), defaultDetectTime)
samples.append(
{
"sample_id": sample["sample_id"],
"detect_time": detectTime,
"image_origin": sample.get("image_origin"),
}
)
else:
for sampleId in inputJson["samples"]:
responses.append(
await storeApiClient.getImage(
imageId=sampleId, bucketName=bucketName, accountId=self.accountId, raiseError=True
)
)
samples.append({"sample_id": sampleId, "detect_time": defaultDetectTime, "image_origin": None})
for sample, response in zip(samples, responses):
resultImages.append(
self._getRawDataContainer(
body=response.body,
contentType=response.headers["Content-Type"],
imageType=imageType,
sampleId=sample["sample_id"],
fileName=sample["sample_id"],
detectTime=sample["detect_time"],
imageOrigin=sample["image_origin"],
)
)
return resultImages
[docs] @staticmethod
def loadDataFromJson(data: dict, model: Type[BaseModel]) -> Any:
"""
Load data from json with pydantic
Args:
data: input data
model: pydantic model
Returns:
initialized object
"""
return loadDataFromJson(data, model)
@property
def sdkLoop(self) -> SDKTaskLoop:
"""
Get current sdk loop.
Returns:
sdk loop from warp
"""
return self.request.app.ctx.sdkLoop
[docs] def handleMonitoringData(self, monitoringData: HandlersMonitoringData):
"""
Handle monitoring data.
Args:
monitoringData: monitoring data
"""
if not self.config.monitoring.sendData:
return
self.request.dataForMonitoring += monitoringData.request
if monitoringData.sdkUsages:
self.app.ctx.monitoring.flushPoints([monitoringData.sdkUsages])
[docs]class BaseHandlerWithMultipart(BaseHandler):
"""
Base handler class for resource with multipart requests availability
"""
[docs] @abstractmethod
async def getDataFromMultipart(
self, imageType: ImageType = ImageType.rawImage
) -> Union[
Union[
Tuple[Union[List[SDKDetectableImage], List[FaceWarp]], Union[dict, None]],
Union[List[SDKDetectableImage], List[FaceWarp]],
]
]:
"""
Get data from multipart request
Args:
imageType: image type
Returns:
list of Images or list warps and optionally dict with policies (for multipart request with policies)
"""
def _getDataFromMultipart(
self,
multipartData: Dict[str, Union[ImageWithBB, ImageWithFaceBB]],
imageType: ImageType,
allowRawDescriptors: bool = False,
) -> List[Union[SDKDetectableImage, FaceWarp, HumanWarp, RawDescriptorData]]:
"""
Get data from multipart request (list of images and optional detection rectangles, or raw descriptors)
Args:
multipartData: validated images from multipart
imageType: image type
allowRawDescriptors: whether raw descriptor mimetypes allowed or not
Returns:
list of prepared SDKDetectableImage or FaceWarp
Raises:
VLException(Error.BadContentTypeInMultipartImage, 400, isCriticalError=False): if content type of a part of
multipart request is wrong
"""
resultImages = []
defaultDetectTime = currentDateTime(self.config.storageTime)
for image in multipartData.values():
if not isAllowableContentType(image.contentType, allowRawDescriptors=allowRawDescriptors):
raise VLException(Error.BadContentTypeInMultipartImage, 400, isCriticalError=False)
faceBoundingBoxList = None
bodyBoundingBoxList = None
if image.faceBoundingBoxes:
faceBoundingBoxList = image.faceBoundingBoxes
if isinstance(image, ImageWithBB) and image.bodyBoundingBoxes:
bodyBoundingBoxList = image.bodyBoundingBoxes
resultImages.append(
self._getRawDataContainer(
body=image.body,
contentType=image.contentType,
imageType=imageType,
fileName=image.filename,
faceBoundingBoxList=faceBoundingBoxList,
bodyBoundingBoxList=bodyBoundingBoxList,
detectTime=self.convertDetectionTimeToCurrentFormat(image.detectTime, defaultDetectTime),
imageOrigin=image.imageOrigin,
)
)
return resultImages
[docs] async def getDataFromRequest(
self,
request: HandlersRequest,
validationModel: Type[BaseSchema],
imageType: Union[ImageType, None],
allowRawDescriptors: bool = False,
) -> List[Union[SDKDetectableImage, FaceWarp, HumanWarp, RawDescriptorData]]:
"""
Get images from request body to detect faces.
Args:
request: request
imageType: imageType
validationModel: validation model
allowRawDescriptors: whether raw descriptor mimetypes allowed or not
Returns:
list of Images or list warps
Raises:
VLException(Error.BadContentType, 400, isCriticalError=False): if content type of request is wrong
VLException(Error.BadMultipartInput, 400, isCriticalError=False): if failed to read multipart
"""
contentType = request.content_type
defaultDetectTime = currentDateTime(self.config.storageTime)
if isAllowableContentType(contentType, allowRawDescriptors=allowRawDescriptors):
body = request.body
estimationData = [
self._getRawDataContainer(
body=body, contentType=contentType, imageType=imageType, detectTime=defaultDetectTime
)
]
elif contentType == "application/json":
inputJson = request.json
self.loadDataFromJson(inputJson, validationModel)
if "image" in inputJson:
estimationData = await self._getImagesFromJson(
inputJson=inputJson,
imageType=imageType,
defaultDetectTime=defaultDetectTime,
allowRawDescriptors=allowRawDescriptors,
)
elif "urls" in inputJson:
estimationData = await self._getImagesFromUrls(
inputJson=inputJson, imageType=imageType, defaultDetectTime=defaultDetectTime
)
elif "samples" in inputJson:
estimationData = await self._getImagesFromSamples(
inputJson=inputJson, imageType=imageType, defaultDetectTime=defaultDetectTime
)
else:
raise RuntimeError(f"bad input json {inputJson}")
else:
raise VLException(Error.BadContentType, 400, isCriticalError=False)
return estimationData