Source code for luna_handlers.sdk.sdk_loop.base_extractor
"""
Base module for extractors
"""
import os
from typing import List, Union, Dict, Optional, Type, TypeVar, Generic
from lunavl.sdk.errors.errors import LunaVLError, ErrorInfo
from lunavl.sdk.errors.exceptions import LunaSDKException
from lunavl.sdk.estimators.body_estimators.human_descriptor import HumanDescriptorEstimator
from lunavl.sdk.estimators.body_estimators.humanwarper import HumanWarpedImage
from lunavl.sdk.estimators.face_estimators.face_descriptor import FaceDescriptorEstimator
from lunavl.sdk.estimators.face_estimators.facewarper import FaceWarpedImage
from lunavl.sdk.faceengine.setting_provider import FaceEngineSettingsProvider, RuntimeSettingsProvider
from vlutils.descriptors.data import DescriptorsEnum, DescriptorType
from .crutches_on_wheels.utils.log import Logger
from .sdk_task import FaceAttributes, HumanAttributes
from .settings import FaceExtractorSettings, HumanExtractorSettings, RuntimeSettings
from .utils.worker_state import State
from .utils.recipes import grouper
#: Generic type variable for the extractor state. It is bound (by forward reference) to
#: ``BaseExtractorState``, so it stands for that class or any of its subclasses.
ExtractorState = TypeVar("ExtractorState", bound="BaseExtractorState")
class BaseExtractorState(State):
    """
    Process-local state used by extractor workers.

    In addition to whatever the parent ``State`` keeps, this class stores:
        - the FaceEngine instance created by `initialize`
        - a cache which maps a descriptor version to the estimator created for that version
    """

    #: global face engine, process local
    _faceEngine: "VLFaceEngine" = None
    #: extractor settings, process local
    _settings: Union[FaceExtractorSettings, HumanExtractorSettings]
    #: a global map descriptor version to extractor, process local
    _extractors: Union[Dict[int, FaceDescriptorEstimator], Dict[int, HumanDescriptorEstimator]] = {}

    @property
    def faceEngine(self) -> "VLFaceEngine":
        """
        FaceEngine instance stored by `initialize`.

        Returns:
            value of `_faceEngine` (None until `initialize` has created the engine)
        """
        return self._faceEngine

    @classmethod
    def initialize(cls, workerName: str, settings: Union[HumanExtractorSettings, FaceExtractorSettings]) -> bool:
        """
        Initialize the state: configure and create the FaceEngine.

        The parent `initialize` is called first; when it returns a false value nothing else is done.

        Args:
            workerName: worker name
            settings: settings for worker
        Returns:
            False if the parent `initialize` returned a false value (documented upstream as
            "not the first call for the process"), True after the engine has been created
        """
        isFirstCall = super().initialize(workerName, settings)
        if not isFirstCall:
            return False

        engineConf = FaceEngineSettingsProvider()
        runtimeConf = RuntimeSettingsProvider()

        # copy the runtime options of the worker into the SDK runtime configuration
        workerRuntime: RuntimeSettings = cls._settings.runtimeSettings
        for optionName in ("deviceClass", "numThreads", "numComputeStreams"):
            setattr(runtimeConf.runtimeSettings, optionName, getattr(workerRuntime, optionName))

        engineConf.descriptorFactorySettings.model = cls._settings.descriptorVersion

        # The variable is set before the engine module is imported below.
        # NOTE(review): presumably this hides CUDA devices from the SDK in cpu mode - confirm.
        if cls._settings.runtimeSettings.deviceClass.value == "cpu":
            os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

        from lunavl.sdk.faceengine.engine import VLFaceEngine  # pylint: disable-msg=C0415

        cls._faceEngine = VLFaceEngine(faceEngineConf=engineConf, runtimeConf=runtimeConf)
        return True

    def getExtractor(
        self, descriptorVersion: Optional[int] = None
    ) -> Union[FaceDescriptorEstimator, HumanDescriptorEstimator]:
        """
        Get the descriptor estimator for a descriptor version, creating and caching it on first use.

        ``None`` and ``1`` both select the version taken from the settings (`settings.descriptorVersion`).
        The requested version is looked up in `DescriptorsEnum`: a face descriptor gets a face descriptor
        estimator, any other descriptor type gets a human descriptor estimator.

        Args:
            descriptorVersion: descriptor version.
        Returns:
            extractor
        Raises:
            RuntimeError: if no member of `DescriptorsEnum` has the requested version
        """
        if descriptorVersion is None or descriptorVersion == 1:
            descriptorVersion = self.settings.descriptorVersion

        # fast path, the estimator was already created
        if descriptorVersion in self._extractors:
            return self._extractors[descriptorVersion]

        knownDescriptor = next(
            (candidate for candidate in DescriptorsEnum if candidate.value.version == descriptorVersion), None
        )
        if knownDescriptor is None:
            raise RuntimeError(f"Unknown descriptor version {descriptorVersion}")

        if knownDescriptor.value.type == DescriptorType.face:
            createEstimator = self.faceEngine.createFaceDescriptorEstimator
        else:
            createEstimator = self.faceEngine.createHumanDescriptorEstimator

        extractor = createEstimator(descriptorVersion)
        self._extractors[descriptorVersion] = extractor
        return extractor
class BaseExtractorSubTask:
    """
    Single unit of extraction work.

    Attributes:
        warps (List[WarpedImage]): warps the attributes are extracted from
        attributes (Attributes): container which receives the extraction results
        taskId (int): original task id
        estimationId (str): unique estimation id
        estimationDescriptorVersion (int): 0 means do not estimate descriptor, any other value requests
            a descriptor. NOTE(review): the previous documentation named -1 as the "default version" marker,
            while `BaseExtractorState.getExtractor` treats None and 1 as default - confirm which is right.
        error (LunaVLError): error occurred during task execution, None until one is recorded
    """

    # NOTE(review): "estimateBasicAttributes" is reserved here but never assigned in `__init__`;
    # reading it before some other code sets it raises AttributeError.
    __slots__ = (
        "warps",
        "attributes",
        "taskId",
        "estimationId",
        "estimateBasicAttributes",
        "estimationDescriptorVersion",
        "error",
    )

    def __init__(
        self,
        attributes: Union[FaceAttributes, HumanAttributes],
        warps: Union[List[FaceWarpedImage], List[HumanWarpedImage]],
        taskId: int,
        estimationId: Optional[str] = None,
        descriptorVersion: int = 0,
    ):
        # identity of the sub task
        self.taskId: int = taskId
        self.estimationId: str = estimationId
        # input data and the container for results
        self.warps: Union[List[FaceWarpedImage], List[HumanWarpedImage]] = warps
        self.attributes: Union[FaceAttributes, HumanAttributes] = attributes
        # what to estimate and the outcome
        self.estimationDescriptorVersion = descriptorVersion
        self.error: Union[ErrorInfo, None] = None

    def needEstimateDescriptor(self) -> bool:
        """
        Tell whether a descriptor has to be estimated for this sub task.

        Returns:
            False if estimationDescriptorVersion is equal to 0, True otherwise
        """
        skipDescriptor = self.estimationDescriptorVersion == 0
        return not skipDescriptor
class BaseExtractor(Generic[ExtractorState]):
    """
    Base extractor class.

    Concrete extractors provide the state class in `_state`; descriptor estimators, settings and the
    logger are all taken from that state.
    """

    # state class
    _state: Type[ExtractorState]

    @property
    def logger(self) -> Logger:
        """
        Get Logger.

        Returns:
            logger from state
        """
        return self.state.logger

    @property
    def state(self) -> ExtractorState:
        """
        Get extractor state.

        Returns:
            result of calling the state class `_state` (the call is made on every access)
        """
        return self._state()

    @staticmethod
    def _getDescriptorVersionSubTaskMap(subTasks: List[BaseExtractorSubTask]) -> dict:
        """
        Group sub tasks which require a descriptor by the requested descriptor version.

        Sub tasks for which `needEstimateDescriptor` returns False are skipped.

        Args:
            subTasks: sub tasks
        Returns:
            dict: descriptor version -> list of sub tasks (in the input order)
        """
        subTaskDescriptorVersionMap = {}
        for subTask in subTasks:
            if subTask.needEstimateDescriptor():
                subTaskDescriptorVersionMap.setdefault(subTask.estimationDescriptorVersion, []).append(subTask)
        return subTaskDescriptorVersionMap

    def batchExtractDescriptors(self, subTasks: List[BaseExtractorSubTask]) -> None:
        """
        Batch extract descriptors for sub tasks with one warp (only the first warp of a sub task is used).

        Sub tasks are grouped by descriptor version and processed in chunks of
        `settings.optimalBatchSize` (chunking is intended to reduce FSDK memory consumption).

        If the estimation of a chunk fails with `BatchedInternalError` or `ValidationFailed`, the per-item
        errors from the exception context are stored in `error` of the failed sub tasks and the
        estimation is repeated once for the remaining sub tasks only. Descriptors are written to
        `attributes.descriptor` of exactly those sub tasks whose warps were estimated.

        Args:
            subTasks: sub tasks
        Raises:
            LunaSDKException: if the estimation fails with any other error code, or if the second
                attempt fails
        """
        subTaskDescriptorVersionMap = self._getDescriptorVersionSubTaskMap(subTasks)
        for descriptorVersion, subTaskList in subTaskDescriptorVersionMap.items():
            vlExtractor = self.state.getExtractor(descriptorVersion=descriptorVersion)
            for subTaskChunk in grouper(subTaskList, self.state.settings.optimalBatchSize):
                # sub tasks whose warps produced the descriptors in `batch`, in the same order
                estimatedSubTasks = subTaskChunk
                try:
                    batch, _ = vlExtractor.estimateDescriptorsBatch([subTask.warps[0] for subTask in subTaskChunk])
                except LunaSDKException as exc:
                    if exc.error.errorCode not in (
                        LunaVLError.BatchedInternalError.errorCode,
                        LunaVLError.ValidationFailed.errorCode,
                    ):
                        raise
                    # the context holds one error per warp of the chunk; keep only the valid sub tasks
                    estimatedSubTasks = []
                    for subTask, error in zip(subTaskChunk, exc.context):
                        if LunaVLError.Ok.errorCode == error.errorCode:
                            estimatedSubTasks.append(subTask)
                        else:
                            subTask.error = error
                    batch = []
                    if estimatedSubTasks:
                        # the estimator returns a pair (batch, aggregated descriptor), same as above
                        batch, _ = vlExtractor.estimateDescriptorsBatch(
                            [subTask.warps[0] for subTask in estimatedSubTasks]
                        )
                # pair descriptors with the sub tasks they were estimated for, not with the whole chunk
                for subTask, descriptor in zip(estimatedSubTasks, batch):
                    subTask.attributes.descriptor = descriptor.asDict()

    def batchExtractDescriptorsWithAggregation(self, subTasks: List[BaseExtractorSubTask]) -> None:
        """
        Batch extract descriptors for sub tasks with several warps.

        For every sub task which requires a descriptor, all its warps are estimated in one call with
        aggregation enabled and the aggregated descriptor is written to `attributes.descriptor`.
        An SDK failure is stored in `error` of the sub task; the other sub tasks are still processed.

        Args:
            subTasks: sub tasks
        """
        subTaskDescriptorVersionMap = self._getDescriptorVersionSubTaskMap(subTasks)
        for descriptorVersion, subTasksByDescriptorVersion in subTaskDescriptorVersionMap.items():
            vlExtractor = self.state.getExtractor(descriptorVersion)
            for subTask in subTasksByDescriptorVersion:
                try:
                    _, aggregateDescriptor = vlExtractor.estimateDescriptorsBatch(subTask.warps, aggregate=True)
                    subTask.attributes.descriptor = aggregateDescriptor.asDict()
                except LunaSDKException as e:
                    subTask.error = e.error