Source code for luna_handlers.sdk.sdk_loop.human_extractor

"""
Module contains functionality for batch estimation of descriptors and basic attributes
"""
from itertools import chain
from typing import List, Tuple

from lunavl.sdk.estimators.body_estimators.humanwarper import HumanWarpedImage

from .base_extractor import BaseExtractorSubTask, BaseExtractorState, BaseExtractor
from .crutches_on_wheels.errors.errors import Error
from .sdk_task import SDKTask, tasksTimeMonitoring, HumanAttributes
from .settings import HumanExtractorSettings


class HumanExtractorState(BaseExtractorState):
    """
    Process-local state of the human extractor. Final class.

    The state holds:
        - a logger for the worker
        - an instance of the basic attributes estimator
        - an instance of FaceEngine
        - a mapping from a descriptor version to an extractor instance
    """
class HumanExtractorSubTask(BaseExtractorSubTask):
    """
    Sub task of the human extractor.

    Instances are built by `getSubTasksForTasks` from the human warps of a task.
    """
class HumanExtractor(BaseExtractor):
    """
    Human extractor.

    Binds the extractor to `HumanExtractorState` through the `_state` class attribute.
    """

    #: class of the state used by this extractor
    _state = HumanExtractorState
def getSubTasksForTasks(tasks: List[SDKTask]) -> Tuple[List[BaseExtractorSubTask], List[BaseExtractorSubTask]]:
    """
    Split tasks into extraction sub tasks.

    A task which does not request attribute aggregation, or which has exactly one human warp, produces one
    sub task per estimation that has a body (estimations of images with an error are skipped). Every other
    task produces a single sub task built from all human warps of the task.

    A `ValueError` raised while a sub task is being built is not propagated: it is stored as
    `Error.BadWarpImage` on the warp (per-estimation sub task) or on the task (aggregated sub task) and no
    sub task is created for it.

    Args:
        tasks: tasks
    Returns:
        tuple: sub tasks without aggregation, sub tasks with aggregation
    """
    plainSubTasks = []
    aggregatedSubTasks = []

    for task in tasks:
        if task.aggregateAttributes and len(task.humanWarps) != 1:
            # aggregation over several warps: one sub task for the whole task
            try:
                attributes = HumanAttributes(warps=task.humanWarps)
                warpedImages = [HumanWarpedImage(humanWarp.body) for humanWarp in task.humanWarps]
                descriptorVersion = task.toEstimation.humanEstimationTargets.estimateHumanDescriptor
                aggregatedSubTask = HumanExtractorSubTask(
                    attributes=attributes,
                    warps=warpedImages,
                    descriptorVersion=descriptorVersion,
                    taskId=task.taskId,
                )
            except ValueError as exc:
                task.error = Error.BadWarpImage.format(exc)
                continue
            aggregatedSubTasks.append(aggregatedSubTask)
            continue

        # no aggregation (or a single warp): one sub task per estimation with a body
        estimationGroups = [image.estimations for image in task.images if not image.error]
        for estimation in chain.from_iterable(estimationGroups):
            if not estimation.body:
                continue
            warp = estimation.body.warp
            try:
                attributes = HumanAttributes(warps=[warp])
                warpedImages = [HumanWarpedImage(warp.body)]
                descriptorVersion = task.toEstimation.humanEstimationTargets.estimateHumanDescriptor
                plainSubTask = HumanExtractorSubTask(
                    attributes=attributes,
                    warps=warpedImages,
                    descriptorVersion=descriptorVersion,
                    taskId=task.taskId,
                    estimationId=estimation.id,
                )
            except ValueError as exc:
                # a bad warp is reported on the warp itself; other estimations are still processed
                warp.error = Error.BadWarpImage.format(exc)
            else:
                plainSubTasks.append(plainSubTask)

    return plainSubTasks, aggregatedSubTasks
def extract(tasks: List[SDKTask]):
    """
    Extract human body attributes.

    The tasks are split into sub tasks with and without aggregation, each non-empty group is passed to the
    corresponding batch method of the extractor (under `tasksTimeMonitoring` with the
    "humanDescriptorExtractTime" field), and the sub task results are then written back to the tasks
    with a matching task id.

    Args:
        tasks: tasks
    Returns:
        processed tasks
    """
    extractor = HumanExtractor()
    extractor.logger.info(f"got {len(tasks)} tasks")

    plainSubTasks, aggregatedSubTasks = getSubTasksForTasks(tasks)

    if plainSubTasks:
        with tasksTimeMonitoring(fieldName="humanDescriptorExtractTime", tasks=tasks):
            extractor.batchExtractDescriptors(plainSubTasks)

    if aggregatedSubTasks:
        with tasksTimeMonitoring(fieldName="humanDescriptorExtractTime", tasks=tasks):
            extractor.batchExtractDescriptorsWithAggregation(aggregatedSubTasks)

    # sub tasks without aggregation go first, then the aggregated ones
    allSubTasks = plainSubTasks + aggregatedSubTasks
    for task in tasks:
        ownSubTasks = [candidate for candidate in allSubTasks if task.taskId == candidate.taskId]
        for subTask in ownSubTasks:
            if task.aggregateAttributes:
                task.aggregatedEstimations.extraction.body = subTask.attributes
            else:
                task.updateEstimationsWithExtractionResult(subTask)

    extractor.logger.info(f"performed {len(tasks)} tasks")
    return tasks
def initWorker(settings: HumanExtractorSettings):
    """
    Initialize the human extractor worker.

    Initializes `HumanExtractorState` under the name "luna-handlers-h-extractor" and logs that the worker
    is ready. According to the original documentation the initialization sets up the logger, FSDK, the
    basic attributes estimator and the descriptor extractors (done inside `HumanExtractorState.initialize`,
    which is not part of this module).

    Args:
        settings: human extractor settings
    """
    HumanExtractorState.initialize("luna-handlers-h-extractor", settings=settings)
    state = HumanExtractorState()
    state.logger.info("human extractor worker is initialized")