Source code for luna_handlers.sdk.sdk_loop.base_extractor

"""
Base module for extractors
"""
import os
from typing import List, Union, Dict, Optional, Type, TypeVar, Generic

from lunavl.sdk.errors.errors import LunaVLError, ErrorInfo
from lunavl.sdk.errors.exceptions import LunaSDKException
from lunavl.sdk.estimators.body_estimators.human_descriptor import HumanDescriptorEstimator
from lunavl.sdk.estimators.body_estimators.humanwarper import HumanWarpedImage
from lunavl.sdk.estimators.face_estimators.face_descriptor import FaceDescriptorEstimator
from lunavl.sdk.estimators.face_estimators.facewarper import FaceWarpedImage
from lunavl.sdk.faceengine.setting_provider import FaceEngineSettingsProvider, RuntimeSettingsProvider
from vlutils.descriptors.data import DescriptorsEnum, DescriptorType

from .crutches_on_wheels.utils.log import Logger
from .sdk_task import FaceAttributes, HumanAttributes
from .settings import FaceExtractorSettings, HumanExtractorSettings, RuntimeSettings
from .utils.worker_state import State
from .utils.recipes import grouper

#: generic type for extractor state; the bound is given as the forward reference "BaseExtractorState",
#: so the type variable is restricted to that class and its subclasses
ExtractorState = TypeVar("ExtractorState", bound="BaseExtractorState")


class BaseExtractorState(State):
    """
    Extractor state, process local.

    The state keeps (as class-level attributes, hence shared inside one process):
        - the FaceEngine instance created by `initialize`
        - the extractor settings
        - a cache that maps a descriptor version to an already created descriptor estimator
    """

    #: global face engine, process local
    _faceEngine: "VLFaceEngine" = None
    #: extractor settings, process local
    _settings: Union[FaceExtractorSettings, HumanExtractorSettings]
    #: a global map descriptor version to extractor, process local
    _extractors: Union[Dict[int, FaceDescriptorEstimator], Dict[int, HumanDescriptorEstimator]] = {}

    @property
    def faceEngine(self) -> "VLFaceEngine":
        """
        Get instance of FaceEngine.

        Returns:
            the engine stored in `_faceEngine` (None until `initialize` has created it)
        """
        return self._faceEngine

    @classmethod
    def initialize(cls, workerName: str, settings: Union[HumanExtractorSettings, FaceExtractorSettings]) -> bool:
        """
        Initialize the state: run the parent initialization and create the FaceEngine.

        Args:
            workerName: worker name
            settings: settings for worker
        Returns:
            False if the parent `initialize` returned a falsy value (nothing else is done in that case),
            True once the FaceEngine has been created
        """
        firstCall = super().initialize(workerName, settings)
        if not firstCall:
            return False

        engineConf = FaceEngineSettingsProvider()
        runtimeConf = RuntimeSettingsProvider()

        # copy the runtime options of the worker into the SDK runtime configuration
        workerRuntime: RuntimeSettings = cls._settings.runtimeSettings
        for option in ("deviceClass", "numThreads", "numComputeStreams"):
            setattr(runtimeConf.runtimeSettings, option, getattr(workerRuntime, option))

        engineConf.descriptorFactorySettings.model = cls._settings.descriptorVersion

        # the environment variable is set before the engine module is imported below
        if workerRuntime.deviceClass.value == "cpu":
            os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

        from lunavl.sdk.faceengine.engine import VLFaceEngine  # pylint: disable-msg=C0415

        cls._faceEngine = VLFaceEngine(faceEngineConf=engineConf, runtimeConf=runtimeConf)
        return True

    def getExtractor(
        self, descriptorVersion: Optional[int] = None
    ) -> Union[FaceDescriptorEstimator, HumanDescriptorEstimator]:
        """
        Get extractor for the corresponding descriptor version.

        A missing extractor is created with the FaceEngine and cached in `_extractors`. When
        `descriptorVersion` is None or 1, the version from the settings is used instead.
        NOTE(review): the sub task documentation in this module mentions -1 as the "default" marker
        while this method checks for 1 - confirm which value callers really pass.

        Args:
            descriptorVersion: descriptor version.
        Returns:
            extractor
        Raises:
            RuntimeError: if no member of `DescriptorsEnum` has the requested version
        """
        useDefaultVersion = descriptorVersion is None or descriptorVersion == 1
        if useDefaultVersion:
            descriptorVersion = self.settings.descriptorVersion

        cache = self._extractors
        if descriptorVersion in cache:
            return cache[descriptorVersion]

        # first enum member whose version matches, None if there is no such member
        knownDescriptor = next(
            (member.value for member in DescriptorsEnum if member.value.version == descriptorVersion), None
        )
        if knownDescriptor is None:
            raise RuntimeError(f"Unknown descriptor version {descriptorVersion}")

        engine = self.faceEngine
        if knownDescriptor.type == DescriptorType.face:
            createEstimator = engine.createFaceDescriptorEstimator
        else:
            createEstimator = engine.createHumanDescriptorEstimator
        cache[descriptorVersion] = createEstimator(descriptorVersion)
        return cache[descriptorVersion]
class BaseExtractorSubTask:
    """
    Extractor sub task.

    Attributes:
        warps (List[WarpedImage]): list of warps used to extract (aggregated) attributes
        attributes (Attributes): result attributes
        taskId (int): original task id
        estimationId (Optional[str]): unique estimation id, None if it was not provided
        estimationDescriptorVersion (int): 0 means do not estimate descriptor, any other value requests
            an estimation. NOTE(review): earlier documentation listed -1 as "default version" - confirm
            against the code which resolves the version.
        error (Optional[ErrorInfo]): error occurred during task execution, None right after creation
    """

    # "estimateBasicAttributes" is reserved as a slot but is not assigned by `__init__`
    __slots__ = (
        "warps",
        "attributes",
        "taskId",
        "estimationId",
        "estimateBasicAttributes",
        "estimationDescriptorVersion",
        "error",
    )

    def __init__(
        self,
        attributes: Union[FaceAttributes, HumanAttributes],
        warps: Union[List[FaceWarpedImage], List[HumanWarpedImage]],
        taskId: int,
        estimationId: Optional[str] = None,
        descriptorVersion: int = 0,
    ):
        """
        Store the sub task input; the sub task starts without an error.

        Args:
            attributes: container for the estimation results
            warps: warps to process
            taskId: original task id
            estimationId: unique estimation id
            descriptorVersion: requested descriptor version, 0 - do not estimate a descriptor
        """
        # identification of the sub task
        self.taskId: int = taskId
        self.estimationId: Optional[str] = estimationId
        # input and output containers
        self.warps: Union[List[FaceWarpedImage], List[HumanWarpedImage]] = warps
        self.attributes: Union[FaceAttributes, HumanAttributes] = attributes
        # what to estimate and the execution outcome
        self.estimationDescriptorVersion = descriptorVersion
        self.error: Optional[ErrorInfo] = None

    def needEstimateDescriptor(self) -> bool:
        """
        Need or not estimate descriptor.

        Returns:
            True if estimationDescriptorVersion differs from 0
        """
        descriptorDisabled = self.estimationDescriptorVersion == 0
        return not descriptorDisabled
class BaseExtractor(Generic[ExtractorState]):
    """
    Base extractor class.

    The class extracts descriptors for sub tasks with the descriptor estimators provided by the
    extractor state (`_state`).
    """

    # state class
    _state: Type[ExtractorState]

    @property
    def logger(self) -> Logger:
        """
        Get Logger.

        Returns:
            logger from state
        """
        return self.state.logger

    @property
    def state(self) -> ExtractorState:
        """
        Get extractor state.

        Returns:
            result of calling the state class (`_state()`)
        """
        return self._state()

    @staticmethod
    def _getDescriptorVersionSubTaskMap(subTasks: List[BaseExtractorSubTask]) -> dict:
        """
        Group sub tasks by requested descriptor version.

        Sub tasks which do not need a descriptor (`needEstimateDescriptor()` is False) are skipped.

        Args:
            subTasks: sub tasks
        Returns:
            dict: descriptor version -> list of sub tasks (in the input order)
        """
        subTaskDescriptorVersionMap = {}
        for subTask in subTasks:
            if subTask.needEstimateDescriptor():
                subTaskDescriptorVersionMap.setdefault(subTask.estimationDescriptorVersion, []).append(subTask)
        return subTaskDescriptorVersionMap

    def batchExtractDescriptors(self, subTasks: List[BaseExtractorSubTask]) -> None:
        """
        Batch extract descriptors for sub tasks with one warp. Batches are chunked to reduce FSDK memory
        consumption.

        Only the first warp of every sub task is used. If the estimation of a chunk fails with
        `BatchedInternalError` or `ValidationFailed`, the per-warp errors from the exception context are
        stored in `subTask.error` of the failed sub tasks and the estimation is repeated for the remaining
        sub tasks only. A failure of the repeated estimation is not handled here.

        Args:
            subTasks: sub tasks
        Raises:
            LunaSDKException: if the estimation fails with any other error code
        """
        recoverableErrorCodes = (
            LunaVLError.BatchedInternalError.errorCode,
            LunaVLError.ValidationFailed.errorCode,
        )
        subTaskDescriptorVersionMap = self._getDescriptorVersionSubTaskMap(subTasks)
        for descriptorVersion, subTaskList in subTaskDescriptorVersionMap.items():
            vlExtractor = self.state.getExtractor(descriptorVersion=descriptorVersion)
            for subTaskChunk in grouper(subTaskList, self.state.settings.optimalBatchSize):
                # the chunk is iterated several times, so materialize it once
                estimatedSubTasks = list(subTaskChunk)
                try:
                    batch, _ = vlExtractor.estimateDescriptorsBatch(
                        [subTask.warps[0] for subTask in estimatedSubTasks]
                    )
                except LunaSDKException as exc:
                    if exc.error.errorCode not in recoverableErrorCodes:
                        raise
                    # split the chunk: failed sub tasks get their error, the others are estimated again
                    subTasksForSecondTry = []
                    for subTask, error in zip(estimatedSubTasks, exc.context):
                        if LunaVLError.Ok.errorCode == error.errorCode:
                            subTasksForSecondTry.append(subTask)
                        else:
                            subTask.error = error
                    # descriptors must be assigned only to the sub tasks which were really estimated
                    estimatedSubTasks = subTasksForSecondTry
                    batch = []
                    if subTasksForSecondTry:
                        # the estimator returns a pair (batch, aggregated descriptor), only the batch is needed
                        batch, _ = vlExtractor.estimateDescriptorsBatch(
                            [subTask.warps[0] for subTask in subTasksForSecondTry]
                        )
                for subTask, descriptor in zip(estimatedSubTasks, batch):
                    subTask.attributes.descriptor = descriptor.asDict()

    def batchExtractDescriptorsWithAggregation(self, subTasks: List[BaseExtractorSubTask]) -> None:
        """
        Batch extract descriptors for sub tasks with several warps.

        Every sub task is processed separately: all its warps are estimated with `aggregate=True` and the
        aggregated descriptor is stored in the sub task attributes. An SDK error is stored in
        `subTask.error` and does not interrupt the processing of the other sub tasks.

        Args:
            subTasks: sub tasks
        """
        subTaskDescriptorVersionMap = self._getDescriptorVersionSubTaskMap(subTasks)
        for descriptorVersion, subTasksByDescriptorVersion in subTaskDescriptorVersionMap.items():
            vlExtractor = self.state.getExtractor(descriptorVersion)
            for subTask in subTasksByDescriptorVersion:
                try:
                    _, aggregateDescriptor = vlExtractor.estimateDescriptorsBatch(subTask.warps, aggregate=True)
                    subTask.attributes.descriptor = aggregateDescriptor.asDict()
                except LunaSDKException as e:
                    subTask.error = e.error