Source code for luna_handlers.sdk.sdk_loop.human_extractor
"""
Module contains a functional for batch estimation descriptors and basic attributes
"""
from itertools import chain
from typing import List, Tuple
from lunavl.sdk.estimators.body_estimators.humanwarper import HumanWarpedImage
from .base_extractor import BaseExtractorSubTask, BaseExtractorState, BaseExtractor
from .crutches_on_wheels.errors.errors import Error
from .sdk_task import SDKTask, tasksTimeMonitoring, HumanAttributes
from .settings import HumanExtractorSettings
[docs]class HumanExtractorState(BaseExtractorState):
"""
Extractor state, process local. Final class.
State contains:
- logger for worker
- instance of basic attributes estimator
- instance of FaceEngine
- maps a descriptor version to instance of extractor
"""
[docs]class HumanExtractor(BaseExtractor):
"""
Human extractor
"""
#: state class
_state = HumanExtractorState
[docs]def getSubTasksForTasks(tasks: List[SDKTask]) -> Tuple[List[BaseExtractorSubTask], List[BaseExtractorSubTask]]:
"""
Separate tasks on sub tasks.
Args:
tasks: tasks
Returns:
tuple sub tasks without aggregation and with
"""
subTasksWithoutAggregation = []
subTasksWithAggregation = []
for task in tasks:
aggregate = task.aggregateAttributes
if not aggregate or len(task.humanWarps) == 1:
for estimation in chain(*(image.estimations for image in task.images if not image.error)):
if not estimation.body:
continue
warp = estimation.body.warp
try:
subtask = HumanExtractorSubTask(
attributes=HumanAttributes(warps=[warp]),
warps=[HumanWarpedImage(warp.body)],
descriptorVersion=task.toEstimation.humanEstimationTargets.estimateHumanDescriptor,
taskId=task.taskId,
estimationId=estimation.id,
)
subTasksWithoutAggregation.append(subtask)
except ValueError as e:
warp.error = Error.BadWarpImage.format(e)
continue
else:
try:
subtask = HumanExtractorSubTask(
attributes=HumanAttributes(warps=task.humanWarps),
warps=[HumanWarpedImage(warp.body) for warp in task.humanWarps],
descriptorVersion=task.toEstimation.humanEstimationTargets.estimateHumanDescriptor,
taskId=task.taskId,
)
except ValueError as e:
task.error = Error.BadWarpImage.format(e)
else:
subTasksWithAggregation.append(subtask)
return subTasksWithoutAggregation, subTasksWithAggregation
[docs]def extract(tasks: List[SDKTask]):
"""
Extract human body attributes attributes.
Args:
tasks: tasks
Returns:
processed tasks
"""
extractor = HumanExtractor()
extractor.logger.info(f"got {len(tasks)} tasks")
subTasksWithoutAggregation, subTasksWithAggregation = getSubTasksForTasks(tasks)
if subTasksWithoutAggregation:
with tasksTimeMonitoring(fieldName="humanDescriptorExtractTime", tasks=tasks):
extractor.batchExtractDescriptors(subTasksWithoutAggregation)
if subTasksWithAggregation:
with tasksTimeMonitoring(fieldName="humanDescriptorExtractTime", tasks=tasks):
extractor.batchExtractDescriptorsWithAggregation(subTasksWithAggregation)
for task in tasks:
for subTask in subTasksWithoutAggregation + subTasksWithAggregation:
if task.taskId == subTask.taskId:
if task.aggregateAttributes:
task.aggregatedEstimations.extraction.body = subTask.attributes
else:
task.updateEstimationsWithExtractionResult(subTask)
extractor.logger.info(f"performed {len(tasks)} tasks")
return tasks
[docs]def initWorker(settings: HumanExtractorSettings):
"""
Initialize extractor worker. Init logger, initialize FSDK, create basic attributes estimator and descriptor
extractors.
Args:
settings: detector settings
"""
HumanExtractorState.initialize("luna-handlers-h-extractor", settings=settings)
HumanExtractorState().logger.info("human extractor worker is initialized")