"""
Module contains sdk task
"""
import dataclasses
import io
from contextlib import contextmanager
from enum import Enum
from itertools import chain
from time import perf_counter as perfTime
from time import time
from typing import List, Union, Optional, Iterable, Dict, Any
from uuid import uuid4
from PIL import Image
from PIL.Image import Image as PilImage
from lunavl.sdk.errors.errors import ErrorInfo as SDKErrorInfo
from lunavl.sdk.estimators.face_estimators.head_pose import HeadPose
from lunavl.sdk.estimators.face_estimators.livenessv1 import LivenessPrediction, LivenessV1
from lunavl.sdk.estimators.face_estimators.mask import Mask, MaskState
from numpy import ndarray
from vlutils.structures.dataclasses import dataclass
from .crutches_on_wheels.enums.attributes import Liveness
from .crutches_on_wheels.errors.errors import ErrorInfo, Error
from .enums import Stages, MultifacePolicy
from .estimation_targets import SDKEstimationTargets
from .liveness_predictor import LivenessPredicted
from .monitoring import TaskMonitoringData, MonitoringField
class TaskDataSource(Enum):
    """
    Kind of input data an sdk task is built from.
    """

    #: source images
    images = 1
    #: face warped images
    faceWarps = 2
    #: human body warped images
    humanWarps = 3
class SDKTaskFilters:
    """
    Task filters.

    Attributes:
        yawThreshold (Optional[int]): filter by yaw head angle for detection ([0...180])
        pitchThreshold (Optional[int]): filter by pitch head angle for detection ([0...180])
        rollThreshold (Optional[int]): filter by roll head angle for detection ([0...180])
        maskStates (Optional[List[MaskState]]): filter by medical mask states for detection
        livenessStates (Optional[List[LivenessPrediction]]): filter by liveness states for detection
        garbageScoreThreshold (Optional[float]): filter by garbage score for descriptor ([0...1])
    """

    __slots__ = (
        "yawThreshold",
        "rollThreshold",
        "pitchThreshold",
        "maskStates",
        "livenessStates",
        "garbageScoreThreshold",
    )

    def __init__(
        self,
        yawThreshold: Optional[int] = None,
        rollThreshold: Optional[int] = None,
        pitchThreshold: Optional[int] = None,
        livenessStates: Optional[List[LivenessPrediction]] = None,
        maskStates: Optional[List[MaskState]] = None,
        garbageScoreThreshold: Optional[float] = None,
    ):
        self.yawThreshold: Optional[int] = yawThreshold
        self.rollThreshold: Optional[int] = rollThreshold
        self.pitchThreshold: Optional[int] = pitchThreshold
        self.maskStates: Optional[List[MaskState]] = maskStates
        self.livenessStates: Optional[List[LivenessPrediction]] = livenessStates
        self.garbageScoreThreshold: Optional[float] = garbageScoreThreshold

    def __repr__(self):
        return "SDKTaskFilters({})".format({key: getattr(self, key) for key in self.__slots__})

    def needFilterByAngles(self) -> bool:
        """
        Need or not filter by head angles.

        Returns:
            True if any of the angle thresholds is not None.
        """
        return any((self.yawThreshold is not None, self.pitchThreshold is not None, self.rollThreshold is not None))

    def checkFilterByAngles(self, headPose: HeadPose) -> dict:
        """
        Check filter by angles. An angle is filtered when its absolute value exceeds the threshold.

        Args:
            headPose: estimated head pose
        Returns:
            dict in api format::

                {
                    "is_filtered": <bool>,
                    "filter_reasons": [<reasons>]
                }
        """
        res = {"is_filtered": False, "filter_reasons": []}
        if not self.needFilterByAngles():
            return res
        if self.yawThreshold is not None and self.yawThreshold < abs(headPose.yaw):
            res["is_filtered"] = True
            res["filter_reasons"].append(
                {"filter_name": "yaw_threshold", "object_value": headPose.yaw, "threshold_value": self.yawThreshold}
            )
        if self.pitchThreshold is not None and self.pitchThreshold < abs(headPose.pitch):
            res["is_filtered"] = True
            res["filter_reasons"].append(
                {
                    "filter_name": "pitch_threshold",
                    "object_value": headPose.pitch,
                    "threshold_value": self.pitchThreshold,
                }
            )
        if self.rollThreshold is not None and self.rollThreshold < abs(headPose.roll):
            res["is_filtered"] = True
            res["filter_reasons"].append(
                {"filter_name": "roll_threshold", "object_value": headPose.roll, "threshold_value": self.rollThreshold}
            )
        return res

    def checkFilterByMaskState(self, mask: Mask) -> dict:
        """
        Check filter by medical mask state. Filtered when the predominant mask state is not in `maskStates`.

        Args:
            mask: medical mask estimation
        Returns:
            dict in api format::

                {
                    "is_filtered": <bool>,
                    "filter_reasons": [<reasons>]
                }
        """
        res = {"is_filtered": False, "filter_reasons": []}
        maskState = mask.predominateMask
        if self.maskStates and maskState not in self.maskStates:
            res["is_filtered"] = True
            res["filter_reasons"].append(
                {
                    "filter_name": "mask_states",
                    "object_value": maskState.value,
                    "threshold_value": [state.value for state in self.maskStates],
                }
            )
        return res

    def checkFilterByLiveness(self, liveness: Union[LivenessV1, LivenessPredicted]) -> dict:
        """
        Check filter by liveness. Filtered when the prediction is not in `livenessStates`.

        Args:
            liveness: liveness estimation
        Returns:
            dict in api format::

                {
                    "is_filtered": <bool>,
                    "filter_reasons": [<reasons>]
                }
        """
        res = {"is_filtered": False, "filter_reasons": []}
        if self.livenessStates and liveness.prediction not in self.livenessStates:
            res["is_filtered"] = True
            res["filter_reasons"].append(
                {
                    "filter_name": "liveness_states",
                    # sdk prediction values are mapped to api values through the Liveness enum
                    "object_value": Liveness[liveness.prediction.value].value,
                    "threshold_value": [Liveness[state.value].value for state in self.livenessStates],
                }
            )
        return res

    def checkFilterByGS(self, gs: float) -> dict:  # pylint: disable-msg=C0103
        """
        Check garbage score filter. Filtered when the score is below the threshold.

        Args:
            gs: estimated garbage score
        Returns:
            dict in api format ("filter_reasons" is present only if the score is filtered)::

                {
                    "is_filtered": <bool>,
                    "filter_reasons": [<reasons>]
                }
        """
        # the threshold defaults to None; comparing a float with None would raise TypeError,
        # so an unset threshold means that nothing is filtered
        if self.garbageScoreThreshold is None:
            return {"is_filtered": False}
        isFiltered = gs < self.garbageScoreThreshold
        res = {"is_filtered": isFiltered}
        if isFiltered:
            res["filter_reasons"] = [
                {"filter_name": "score_threshold", "object_value": gs, "threshold_value": self.garbageScoreThreshold}
            ]
        return res
class WarpAttributes:
    """
    Warp attributes.

    Attributes:
        emotions (Optional[dict]): dict with emotions from lunavl
        mouthState (Optional[dict]): dict with mouth state from lunavl
        warpQuality (Optional[dict]): dict with quality state from lunavl
        mask (Optional[dict]): dict with mask state from lunavl
        glasses (Optional[dict]): dict with glasses state from lunavl
    """

    __slots__ = ("emotions", "mouthState", "warpQuality", "mask", "glasses")

    def __init__(self):
        self.emotions: Optional[dict] = None
        self.mouthState: Optional[dict] = None
        self.warpQuality: Optional[dict] = None
        self.mask: Optional[dict] = None
        self.glasses: Optional[dict] = None

    def asDict(self) -> Dict[str, dict]:
        """
        Convert to dict, skipping attributes which were not estimated (are None).

        Returns:
            dict in api format (every key is optional)::

                {
                    "emotions": <self.emotions>,
                    "mouth_attributes": <self.mouthState with "score" set to 1.0>,
                    "quality": <self.warpQuality>,
                    "mask": <self.mask>,
                    "glasses": <self.glasses>
                }
        """
        res = {}
        if self.emotions is not None:
            res["emotions"] = self.emotions
        if self.mouthState is not None:
            # fsdk v.5.0.0.0 does not calculate mouth attributes score.
            # The score is put into a copy so that the conversion does not modify the stored estimation.
            res["mouth_attributes"] = {**self.mouthState, "score": 1.0}
        if self.warpQuality is not None:
            res["quality"] = self.warpQuality
        if self.mask is not None:
            res["mask"] = self.mask
        if self.glasses is not None:
            res["glasses"] = self.glasses
        return res
class SDKWarp:
    """
    SDK Warp.

    The "attributes" slot is declared but is not initialized by the constructor.

    Attributes:
        body (Union[bytes, bytearray, ndarray]): warp body, binary data or image array
        outerSampleId (Optional[str]): id of sample if warps is obtained from the sample storage
        filename (Optional[str]): warp source filename
        detectTime (Optional[str]): detection time in ISO format
        id (str): warp id, generated for every new warp
        isFiltered (bool): warp is filtered or not in a task processing (by garbage score)
        imageId (Optional[str]): image id which is a source for warp
        error (Optional[Union[ErrorInfo, SDKErrorInfo]]): error which is occurred during task processing
        isSaved (bool): whether sample is saved in luna-image-store (True when an outer sample id is given)
        imageOrigin (Optional[str]): detection image origin
    """

    __slots__ = (
        "body",
        "outerSampleId",
        "filename",
        "id",
        "isFiltered",
        "attributes",
        "imageId",
        "error",
        "isSaved",
        "detectTime",
        "imageOrigin",
    )

    def __init__(
        self,
        body: Union[bytes, bytearray, ndarray],
        sampleId: Optional[str] = None,
        filename: Optional[str] = None,
        imageId: Optional[str] = None,
        detectTime: Optional[str] = None,
        imageOrigin: Optional[str] = None,
    ):
        # generated state
        self.id: str = str(uuid4())  # pylint: disable-msg=C0103
        self.isFiltered: bool = False
        self.error: Optional[Union[ErrorInfo, SDKErrorInfo]] = None
        # state taken from the arguments
        self.body: Union[bytes, bytearray, ndarray] = body
        self.filename: Optional[str] = filename
        self.imageId: Optional[str] = imageId
        self.detectTime: Optional[str] = detectTime
        self.imageOrigin: Optional[str] = imageOrigin
        self.outerSampleId: Optional[str] = sampleId
        self.isSaved = sampleId is not None

    @property
    def sampleId(self) -> str:
        """
        Get sample id of warp.

        Returns:
            outerSampleId if it is set (not None and not empty) otherwise self.id
        """
        return self.outerSampleId or self.id

    def asBytes(self) -> bytes:
        """
        Get warped image as bytes. An array body is encoded as JPEG.

        Raises:
            RuntimeError: if body has incorrect type
        Returns:
            image as bytes
        """
        body = self.body
        if isinstance(body, (bytes, bytearray)):
            return bytes(body)
        if isinstance(body, ndarray):
            with io.BytesIO() as encoded:
                Image.fromarray(body).save(encoded, format="JPEG")
                return encoded.getvalue()
        raise RuntimeError(f"Bad warped image type: {type(self.body)}")

    def asPillow(self) -> PilImage:
        """
        Get image data as pillow object.

        Raises:
            RuntimeError: if body has incorrect type
        Returns:
            PIL image
        """
        body = self.body
        if isinstance(body, (bytes, bytearray)):
            return Image.open(io.BytesIO(bytes(body)))
        if isinstance(body, ndarray):
            return Image.fromarray(body)
        raise RuntimeError(f"Bad warped image type: {type(self.body)}")
class FaceWarp(SDKWarp):
    """
    Warp of a face. Adds nothing to SDKWarp, the type itself marks the warp kind.
    """
class HumanWarp(SDKWarp):
    """
    Warp of a human body. Adds nothing to SDKWarp, the type itself marks the warp kind.
    """
class FaceAttributes:
    """
    Face attributes: descriptor, gender, ethnicity, age.

    Attributes:
        basicAttributes (Optional[dict]): dict with basic attributes from lunavl
        descriptor (Optional[dict]): dict with descriptor from lunavl
        warps (List[SDKWarp]): list of warps
        filtered (bool): attributes is filtered or not in a task processing (by garbage score)
        attributeId (Optional[str]): attribute id
    """

    __slots__ = ("basicAttributes", "descriptor", "warps", "filtered", "attributeId")

    def __init__(
        self,
        warps: List[SDKWarp],
        basicAttributes: Optional[dict] = None,
        descriptor: Optional[dict] = None,
        attributeId: Optional[str] = None,
    ):
        self.warps: List[SDKWarp] = warps
        self.attributeId: Optional[str] = attributeId
        self.descriptor: Optional[dict] = descriptor
        self.basicAttributes: Optional[dict] = basicAttributes
        # new attributes are never filtered, the flag is changed later from outside
        self.filtered: bool = False
class HumanAttributes:
    """
    Human body attributes: descriptor.

    Attributes:
        descriptor (Optional[dict]): dict with descriptor from lunavl
        warps (List[HumanWarp]): warp list
        filtered (bool): attributes is filtered or not in a task processing (by garbage score)
        attributeId (Optional[str]): attribute id
    """

    __slots__ = ("descriptor", "warps", "filtered", "attributeId")

    def __init__(
        self,
        warps: List[HumanWarp],
        descriptor: Optional[dict] = None,
        attributeId: Optional[str] = None,
    ):
        self.warps: List[HumanWarp] = warps
        self.attributeId: Optional[str] = attributeId
        self.descriptor: Optional[dict] = descriptor
        # new attributes are never filtered, the flag is changed later from outside
        self.filtered: bool = False
class BoundingBox:
    """
    Bounding box. All values are stored as floats.

    Attributes:
        width (float): width
        height (float): height
        x (float): x coordinate of the top left corner
        y (float): y coordinate of the top left corner
    """

    __slots__ = ("x", "y", "width", "height")

    def __init__(self, width: int, height: int, x: int, y: int):
        self.width: float = float(width)
        self.height: float = float(height)
        self.x: float = float(x)  # pylint: disable-msg=C0103
        self.y: float = float(y)  # pylint: disable-msg=C0103
class SDKDetectableImage:
    """
    Image for detect faces and human bodies.

    Attributes:
        id (str): image id, generated for every new image
        body (bytes): image body (a pillow image passed to the constructor is converted to bytes)
        faceBoundingBoxes (Optional[List[BoundingBox]]): list face detections
        humanBoundingBoxes (Optional[List[BoundingBox]]): list human body detections
        filename (Optional[str]): image filename (meta data)
        error (Optional[SDKErrorInfo]): error which is occurred during task processing
        url (Optional[str]): image source
        detectTime (Optional[str]): detection time in ISO format
        imageOrigin (Optional[str]): detection image origin
    """

    __slots__ = (
        "id",
        "body",
        "faceBoundingBoxes",
        "humanBoundingBoxes",
        "filename",
        "error",
        "detectTime",
        "url",
        "imageOrigin",
    )

    def __init__(
        self,
        body: Union[bytes, bytearray, PilImage],
        filename: Optional[str] = None,
        faceBoundingBoxes: Optional[List[BoundingBox]] = None,
        humanBoundingBoxes: Optional[List[BoundingBox]] = None,
        url: Optional[str] = None,
        detectTime: Optional[str] = None,
        imageOrigin: Optional[str] = None,
    ):
        self.id: str = str(uuid4())  # pylint: disable-msg=C0103
        self.error: Optional[SDKErrorInfo] = None
        self.body: bytes = self.asBytes(body)
        self.filename: Optional[str] = filename
        self.url: Optional[str] = url
        self.detectTime: Optional[str] = detectTime
        self.imageOrigin: Optional[str] = imageOrigin
        self.faceBoundingBoxes: Optional[List[BoundingBox]] = faceBoundingBoxes
        self.humanBoundingBoxes: Optional[List[BoundingBox]] = humanBoundingBoxes

    @staticmethod
    def asBytes(
        imageData: Union[bytes, bytearray, PilImage], imageFormat: Optional[str] = None, exif: Optional[bytes] = None
    ) -> bytes:
        """
        Get image data as bytes. Convert Pil image to bytes if needed.

        Args:
            imageData: binary data or pillow object
            imageFormat: image format for saving a pillow object, the format of the object itself is used if not set
            exif: exif data which is passed to the pillow save call if it is not None
        Returns:
            image data as bytes
        """
        if not isinstance(imageData, PilImage):
            return bytes(imageData)
        saveOptions = {"format": imageFormat or imageData.format}
        if exif is not None:
            # the exif argument is passed only if there is something to write
            saveOptions["exif"] = exif
        with io.BytesIO() as outputData:
            imageData.save(outputData, **saveOptions)
            return outputData.getvalue()

    def asPillow(self) -> Optional[PilImage]:
        """
        Get image data as pillow object.

        Returns:
            PIL image or None if pillow fails to open the body (the error is stored in self.error)
        """
        try:
            pilImage = Image.open(io.BytesIO(self.body))
        except OSError:
            self.error = Error.InvalidType.format("Unsupported type")
            return None
        return pilImage

    def exifTranspose(self) -> None:
        """
        Based on genuine function from pillow
        https://pillow.readthedocs.io/en/latest/_modules/PIL/ImageOps.html#exif_transpose

        The only difference is that manipulation is being done inplace, allowing us to
        leverage internal caching of exif data.

        Nothing is changed if the body can not be opened or the orientation tag requires no transposition.

        NOTE: Transposed image might have incorrect values in particular tags. For example,
        orientation, length, width (the exif data is written back as it was read).
        """
        pilImage = self.asPillow()
        if not pilImage:
            return
        exif = pilImage.getexif()
        # exif orientation tag (0x0112) value -> pillow transposition
        transpositions = {
            2: Image.FLIP_LEFT_RIGHT,
            3: Image.ROTATE_180,
            4: Image.FLIP_TOP_BOTTOM,
            5: Image.TRANSPOSE,
            6: Image.ROTATE_270,
            7: Image.TRANSVERSE,
            8: Image.ROTATE_90,
        }
        method = transpositions.get(exif.get(0x0112))
        if method is None:
            return
        transposedImage = pilImage.transpose(method)
        self.body = self.asBytes(transposedImage, imageFormat=pilImage.format, exif=exif)
@dataclass(withSlots=True)
class BaseEstimation:
    """
    Common part of a face and a human body estimation result.
    """

    # (SDKWarp): warp which the estimation belongs to
    warp: SDKWarp
    # (Optional[dict]): bounding rectangle of the detection
    rect: Optional[dict] = None
    # (Optional[dict]): attributes estimated for the warp
    attributes: Optional[dict] = None
    # (Optional[Union[HumanAttributes, FaceAttributes]]): attributes extracted from the warp
    extractedAttributes: Optional[Union[HumanAttributes, FaceAttributes]] = None
    # (Optional[dict]): estimation of image visual properties
    quality: Optional[dict] = None
    # (Optional[dict]): filter result for the estimation
    filter: Optional[dict] = None
@dataclass(withSlots=True)
class HumanEstimation(BaseEstimation):
    """
    Human body estimation result.
    """

    # (Optional[dict]): 17 landmarks of the body detection
    landmarks17: Optional[dict] = None
    # (Optional[float]): score of the estimation
    score: Optional[float] = None
@dataclass(withSlots=True)
class FaceEstimation(BaseEstimation):
    """
    Face estimation result.
    """

    # (Optional[dict]): 5 landmarks of the face detection
    landmarks5: Optional[dict] = None
    # (Optional[dict]): 68 landmarks of the face detection
    landmarks68: Optional[dict] = None
@dataclass(withSlots=True)
class SDKEstimation:
    """
    Container for sdk estimation result.

    Attributes:
        id: unique estimation ID, always generated after init (a value passed to the constructor is replaced)
    """

    # (str): unique estimation ID
    id: str = None  # pylint: disable-msg=C0103
    # (Optional[HumanEstimation]): estimation of a human body
    body: Optional[HumanEstimation] = None
    # (Optional[FaceEstimation]): estimation of a face
    face: Optional[FaceEstimation] = None

    def __post_init__(self):
        """ Generate the unique id of the container. """
        self.id = str(uuid4())

    @property
    def errors(self) -> List[ErrorInfo]:
        """
        Estimation errors.

        Returns:
            warp errors of the face and the body estimations (in this order), estimations without error are skipped
        """
        warpErrors = []
        for estimation in (self.face, self.body):
            if estimation and estimation.warp.error:
                warpErrors.append(estimation.warp.error)
        return warpErrors
class ImageEstimation:
    """
    Container for image estimation result.

    Attributes:
        image: source image or warp
        filename: source image filename
        estimations: image estimations
    """

    def __init__(self, image: Union[SDKDetectableImage, HumanWarp, FaceWarp]):
        """
        Init container with source image.

        An image starts without estimations, a warp starts with one estimation which wraps the warp.

        Args:
            image: source image
        Raises:
            RuntimeError: if the source has unsupported type
        """
        if isinstance(image, SDKDetectableImage):
            estimations: List[SDKEstimation] = []
        elif isinstance(image, FaceWarp):
            estimations = [SDKEstimation(face=FaceEstimation(warp=image))]
        elif isinstance(image, HumanWarp):
            estimations = [SDKEstimation(body=HumanEstimation(warp=image))]
        else:
            raise RuntimeError("Unsupported source image type.")
        self.estimations: List[SDKEstimation] = estimations
        self.image = image
        self.filename = self.image.filename

    @property
    def errors(self) -> List[ErrorInfo]:
        """
        Estimation errors.

        Returns:
            the image error (if any) followed by errors of all estimations
        """
        collected = [self.image.error] if self.image.error else []
        for estimation in self.estimations:
            collected.extend(estimation.errors)
        return collected

    @property
    def error(self) -> Optional[ErrorInfo]:
        """
        Estimation error.

        Returns:
            first estimation error if any errors occur, otherwise None
        """
        allErrors = self.errors
        return allErrors[0] if allErrors else None
@dataclass(withSlots=True)
class AggregatedDetectionFaceAttributes:
    """
    Aggregated result of face detection.
    """

    # (List[SDKWarp]): warps which the aggregation is performed on
    warps: List[SDKWarp]
    # (dict): liveness prediction and estimation data
    liveness: dict
    # (bool): attributes is filtered or not in a task processing
    filtered: bool
@dataclass(withSlots=True)
class AggregatedDetectionHumanAttributes:
    """
    Aggregated result of human body detection. Declares no fields.
    """
@dataclass(withSlots=True)
class AggregatedFaceWarpAttributes:
    """
    Aggregated result of face warp attributes.
    """

    # (Optional[Dict[str, Any]]): predominant emotion and estimation data
    emotions: Optional[Dict[str, Any]] = None
    # (Optional[Dict[str, Any]]): predominant mask and estimation data
    mask: Optional[Dict[str, Any]] = None
@dataclass(withSlots=True)
class AggregatedHumanWarpAttributes:
    """
    Aggregated result of human body warp attributes. Declares no fields.
    """
@dataclass(withSlots=True)
class AggregatedDetectionEstimations:
    """
    Aggregated results of the detection.
    """

    # (Optional[AggregatedDetectionFaceAttributes]): aggregated face detection attributes
    face: Optional[AggregatedDetectionFaceAttributes] = None
    # (Optional[AggregatedDetectionHumanAttributes]): aggregated human body detection attributes
    body: Optional[AggregatedDetectionHumanAttributes] = None
@dataclass(withSlots=True)
class AggregatedWarpEstimations:
    """
    Aggregated results of the warp estimation.
    """

    # (Optional[AggregatedFaceWarpAttributes]): aggregated face warp attributes
    face: Optional[AggregatedFaceWarpAttributes] = None
    # (Optional[AggregatedHumanWarpAttributes]): aggregated human body warp attributes
    body: Optional[AggregatedHumanWarpAttributes] = None
@dataclass(withSlots=True)
class AggregatedEstimations:
    """
    Aggregated results of a task, one container per stage. Every container is created empty by default.
    """

    # (AggregatedDetectionEstimations): aggregated results of the detector stage
    detection: AggregatedDetectionEstimations = dataclasses.field(default_factory=AggregatedDetectionEstimations)
    # (AggregatedExtractionEstimations): aggregated results of the extractor stage
    # NOTE(review): AggregatedExtractionEstimations is neither defined nor imported in the visible part
    # of the module - confirm where it comes from
    extraction: AggregatedExtractionEstimations = dataclasses.field(default_factory=AggregatedExtractionEstimations)
    # (AggregatedWarpEstimations): aggregated results of the warp estimator stage
    estimation: AggregatedWarpEstimations = dataclasses.field(default_factory=AggregatedWarpEstimations)
@dataclass(withSlots=True)
class FilteredEstimation:
    """
    Filtered estimation of an image.
    """

    # (str): filename of the image
    filename: str
    # (Union[HumanEstimation, FaceEstimation]): estimation which is filtered
    estimation: Union[HumanEstimation, FaceEstimation]

    @property
    def filterReasons(self):
        """
        Estimation filter reasons ("filter_reasons" of the estimation filter).
        """
        estimationFilter = self.estimation.filter
        return estimationFilter["filter_reasons"]
class SDKTask:
    """
    Task for execution in the sdk loop.

    The "isFiltered" slot is declared but is not initialized by the constructor.

    Attributes:
        createTime (float): task create time, timestamp
        taskId (int): unique task id (value of the class level counter at the creation moment)
        source (TaskDataSource): source of data for task, determined by the type of the first data element
        toEstimation (SDKEstimationTargets): set of estimations for task
        aggregateAttributes (bool): aggregate face attributes to one from input data or not
        _pipeline (List[Stages]): list of stages for task
        filters (SDKTaskFilters): filters for processing estimation values
        error (Optional[Union[ErrorInfo, SDKErrorInfo]]): error which is occurred during task processing
        multifacePolicy (MultifacePolicy): multiple face detection policy
        monitoringData (TaskMonitoringData): task processing monitoring data
        startExecutionTime: start execution time
        lastCheckpointTime: last checkpoint time
        images (List[ImageEstimation]): task image processing results
        aggregatedEstimations (AggregatedEstimations): task aggregated estimations
        filteredEstimations(List[FilteredEstimation]): task filtered estimations
    Raises:
        RuntimeError: if input data is not correct
    """

    #: generator for task id
    _taskCounter = 0

    __slots__ = (
        "createTime",
        "taskId",
        "source",
        "toEstimation",
        "aggregateAttributes",
        "_pipeline",
        "isFiltered",
        "filters",
        "multifacePolicy",
        "error",
        "monitoringData",
        "startExecutionTime",
        "lastCheckpointTime",
        "images",
        "aggregatedEstimations",
        "filteredEstimations",
    )

    def __init__(
        self,
        toEstimation: SDKEstimationTargets,
        data: Union[List[SDKDetectableImage], List[HumanWarp], List[FaceWarp]],
        filters: Optional[SDKTaskFilters] = None,
        aggregateAttributes: bool = False,
        multifacePolicy: MultifacePolicy = MultifacePolicy.allowed,
    ):
        self.createTime = time()
        SDKTask._taskCounter += 1
        self.taskId = SDKTask._taskCounter
        self.images: List[ImageEstimation] = []
        self.aggregatedEstimations: AggregatedEstimations = AggregatedEstimations()
        self.filteredEstimations: List[FilteredEstimation] = []

        if not data:
            raise RuntimeError("Empty source for sdk task")
        # only the first element is inspected to determine the source type
        if isinstance(data[0], SDKDetectableImage):
            self.source: TaskDataSource = TaskDataSource.images
        elif isinstance(data[0], FaceWarp):
            self.source = TaskDataSource.faceWarps
        elif isinstance(data[0], HumanWarp):
            self.source = TaskDataSource.humanWarps
        else:
            raise RuntimeError("sdk task does not assume simultaneously images and warps source of task")
        self.images = [ImageEstimation(image) for image in data]

        self.filters: SDKTaskFilters = filters or SDKTaskFilters()
        # a filter needs the corresponding estimation, so the estimation is switched on
        # in the passed targets object even if it was not requested
        if self.filters.needFilterByAngles():
            toEstimation.faceEstimationTargets.estimateHeadPose = 1
        if self.filters.livenessStates:
            toEstimation.faceEstimationTargets.estimateLiveness.estimate = 1
        if self.filters.maskStates:
            toEstimation.faceEstimationTargets.estimateMask = 1

        self.toEstimation: SDKEstimationTargets = toEstimation
        self.aggregateAttributes: bool = aggregateAttributes
        # getPipeline reads self.source and self.toEstimation, so it is called after they are set
        self._pipeline: List[Stages] = self.getPipeline()
        self.multifacePolicy: MultifacePolicy = multifacePolicy
        self.error: Union[ErrorInfo, SDKErrorInfo, None] = None
        self.monitoringData: TaskMonitoringData = TaskMonitoringData()
        self.startExecutionTime = None
        self.lastCheckpointTime = None

    @property
    def pipeline(self) -> List[Stages]:
        """
        Getter for pipe line.

        Returns:
            order list of stages
        """
        return self._pipeline

    def checkMultiFacesRule(self):
        """
        Check task multiple face detection.

        If an image with several face detections is found, the error of the task is set.

        Returns:
            True if no image has more than one face detection else False
        """
        for image in self.images:
            faceDetections = [estimation.face for estimation in image.estimations if estimation.face]
            if len(faceDetections) > 1:
                bBoxes = [detection.rect for detection in faceDetections]
                self.error = Error.MultipleFaces.format(image.filename, bBoxes)
                return False
        return True

    @property
    def faceWarps(self) -> List[FaceWarp]:
        """ Task face warps. """
        return [
            estimation.face.warp for image in self.images for estimation in image.estimations if estimation.face
        ]

    @property
    def humanWarps(self) -> List[HumanWarp]:
        """ Task human body warps. """
        return [
            estimation.body.warp for image in self.images for estimation in image.estimations if estimation.body
        ]

    def getNextStage(self, currentStage: Stages) -> Union[Stages, None]:
        """
        Get next stage of the task.

        Args:
            currentStage: current stage
        Returns:
            next stage or None (task need put to results queue)
        """
        # nothing to process if every image has an error
        if all((image.errors for image in self.images)):
            return None
        nextStageIndex = self.pipeline.index(currentStage) + 1
        if len(self.pipeline) == nextStageIndex:
            return None
        nextStage = self.pipeline[nextStageIndex]
        if nextStage == Stages.faceExtractor:
            if self.aggregateAttributes:
                if not self.checkMultiFacesRule():
                    return None
            if all((sample.isFiltered for sample in self.faceWarps)):
                # there is nothing to extract, the face extractor stage is skipped
                nextStageIndex += 1
                if len(self.pipeline) == nextStageIndex:
                    return None
                nextStage = self.pipeline[nextStageIndex]
        if nextStage == Stages.humanExtractor:
            if all((sample.isFiltered for sample in self.humanWarps)):
                return None
        return nextStage

    def getObjectCountForStage(self, stage: Stages) -> int:
        """
        Get count objects (images or warps) for task processing for stage.

        Args:
            stage: stage
        Returns:
            count images if stage is a detector (face or human), human warps count for the human extractor,
            otherwise face warps count
        """
        # detectors of both kinds process images, not warps
        if stage in (Stages.faceDetector, Stages.humanDetector):
            return len(self.images)
        if stage == Stages.humanExtractor:
            return len(self.humanWarps)
        return len(self.faceWarps)

    def getPipeline(self) -> List[Stages]:
        """
        Get task pipeline based on estimations.

        Face stages go first, human body stages follow. Detector stages are added only for the images source.

        Returns:
            order list of stages list
        """
        pipeline = []
        if self.toEstimation.estimateFace and (self.source != TaskDataSource.humanWarps):
            faceEstimationTargets = self.toEstimation.faceEstimationTargets
            if self.source == TaskDataSource.images:
                pipeline.append(Stages.faceDetector)
            if any(
                (
                    faceEstimationTargets.estimateQuality,
                    faceEstimationTargets.estimateEmotions,
                    faceEstimationTargets.estimateMouthAttributes,
                    faceEstimationTargets.estimateMask,
                    faceEstimationTargets.estimateGlasses,
                )
            ):
                pipeline.append(Stages.faceEstimator)
            if any((faceEstimationTargets.estimateBasicAttributes, faceEstimationTargets.estimateFaceDescriptor)):
                pipeline.append(Stages.faceExtractor)
        if self.toEstimation.estimateHuman and (self.source != TaskDataSource.faceWarps):
            humanEstimationTargets = self.toEstimation.humanEstimationTargets
            if self.source == TaskDataSource.images:
                pipeline.append(Stages.humanDetector)
            if humanEstimationTargets.estimateHumanDescriptor:
                pipeline.append(Stages.humanExtractor)
        return pipeline
@contextmanager
def tasksTimeMonitoring(fieldName, tasks: Iterable[SDKTask]):
    """
    Context manager for updating execution time of tasks.

    The time spent inside the "with" block is written to the monitoring field of every task.
    Nothing is written if the block raises an exception.

    Args:
        fieldName: monitoring field name (attribute name of task monitoring data)
        tasks: tasks
    """
    startedAt = perfTime()
    yield
    elapsed = perfTime() - startedAt
    for task in tasks:
        monitoringField: MonitoringField = getattr(task.monitoringData, fieldName)
        monitoringField.value = elapsed