Source code for luna_handlers.sdk.sdk_loop.task_loop

"""
Module contains task loop
"""
import asyncio
import os
from asyncio import CancelledError, Future
from collections import namedtuple
from time import time
from typing import Optional, Dict

from logbook import Logger
from vlutils.jobs.async_runner import AsyncRunner
from vlutils.structures.switch import switch

from .crutches_on_wheels.errors.errors import Error
from .crutches_on_wheels.errors.exception import VLException
from .enums import Stages
from .estimation_targets import (
    SDKEstimationTargets,
    SDKFaceEstimationTargets,
    SDKHumanEstimationTargets,
    SDKLivenessEstimationPolicy,
)
from .face_extractor import extract as extractFace
from .face_extractor import initWorker as initFaceExtractorWorker
from .face_detector import detect as detectFaces
from .face_detector import initWorker as initFaceDetectorWorker
from .human_detector import detect as detectHumans
from .human_detector import initWorker as initHumanDetectorWorker
from .human_extractor import extract as extractHuman
from .human_extractor import initWorker as initHumanExtractorWorker

from .face_warp_estimator import estimate
from .face_warp_estimator import initWorker as initWarpEstimatorWorker
from .queues.queue import NamedQueue
from .sdk_stages import AsyncStageHandler
from .sdk_task import SDKTask, SDKDetectableImage, FaceWarp, HumanWarp
from .settings import SDKLoopSettings
from .utils.logger import getLogger

TaskData = namedtuple("TaskData", ("image", "target"))


[docs]class SDKTaskLoop: """ Loop for processing sdk task (extract attributes, detect faces, estiimate attributes) Attributes: _faceExtractorQueue (NamedQueue): queue for extract tasks face attributes _faceDetectorQueue (NamedQueue): queue for detect face tasks _faceEstimatorQueue (NamedQueue): queue for estimate warp attributes tasks _humanDetectorQueue (NamedQueue): queue for detect face tasks _humanExtractorQueue (NamedQueue): queue for extract tasks human attributes _resultsQueue (NamedQueue): queue for tasks results _running (bool): loop status _results (Dict[str, Future]): map a task id to a future which waiting the task result logger (Logger): loop logger loopScheduler (AsyncRunner): Scheduler which contains all running coroutines (loop, queue workers) """ def __init__(self, settings: SDKLoopSettings): self._resultsQueue: NamedQueue = NamedQueue(name="results") self._running: bool = False self._results: Dict[str, Future] = {} self.logger: Logger = getLogger(template="sdk loop") self.loopScheduler: AsyncRunner = None self.settings = settings self.routingMap: dict = {} if Stages.humanDetector in settings.availableStages: self._humanDetectorQueue: NamedQueue = NamedQueue(name="humanDetector") self.humanDetectorHandler: AsyncStageHandler = AsyncStageHandler( stage=Stages.humanDetector, executor=detectHumans, inputQueue=self._humanDetectorQueue, optimalStageBatchSize=settings.humanDetectorStageSettings.optimalBatchSize, workerCount=settings.humanDetectorStageSettings.listenerCount, poolSize=settings.humanDetectorStageSettings.workerCount, workerSettings=self.settings.humanDetectorSettings, workerInitializer=initHumanDetectorWorker, ) self.routingMap[Stages.humanDetector] = self._humanDetectorQueue else: self._humanDetectorQueue = None self.humanDetectorHandler = None if Stages.humanExtractor in settings.availableStages: self._humanExtractorQueue: NamedQueue = NamedQueue(name="humanExtractor") self.humanExtractorHandler: AsyncStageHandler = AsyncStageHandler( stage=Stages.humanExtractor, executor=extractHuman, inputQueue=self._humanExtractorQueue, optimalStageBatchSize=settings.humanExtractorStageSettings.optimalBatchSize, workerCount=settings.humanExtractorStageSettings.listenerCount, poolSize=settings.humanExtractorStageSettings.workerCount, workerSettings=self.settings.humanExtractorSettings, workerInitializer=initHumanExtractorWorker, ) self.routingMap[Stages.humanExtractor] = self._humanExtractorQueue else: self._humanExtractorQueue = None self.humanExtractorHandler = None if Stages.faceDetector in settings.availableStages: self._faceDetectorQueue: NamedQueue = NamedQueue(name="faceDetector") self.faceDetectorHandler: AsyncStageHandler = AsyncStageHandler( stage=Stages.faceDetector, executor=detectFaces, inputQueue=self._faceDetectorQueue, optimalStageBatchSize=settings.faceDetectorStageSettings.optimalBatchSize, workerCount=settings.faceDetectorStageSettings.listenerCount, poolSize=settings.faceDetectorStageSettings.workerCount, workerSettings=self.settings.faceDetectorSettings, workerInitializer=initFaceDetectorWorker, ) self.routingMap[Stages.faceDetector] = self._faceDetectorQueue else: self._faceDetectorQueue = None self.faceDetectorHandler = None if Stages.faceExtractor in settings.availableStages: self._faceExtractorQueue: NamedQueue = NamedQueue(name="faceExtractor") self.faceExtractorHandler: AsyncStageHandler = AsyncStageHandler( stage=Stages.faceExtractor, executor=extractFace, inputQueue=self._faceExtractorQueue, optimalStageBatchSize=settings.faceExtractorStageSettings.optimalBatchSize, workerCount=settings.faceExtractorStageSettings.listenerCount, poolSize=settings.faceExtractorStageSettings.workerCount, workerSettings=self.settings.faceExtractorSettings, workerInitializer=initFaceExtractorWorker, ) self.routingMap[Stages.faceExtractor] = self._faceExtractorQueue else: self._faceExtractorQueue = None self.faceExtractorHandler = None if Stages.faceEstimator in settings.availableStages: self._faceEstimatorQueue: NamedQueue = NamedQueue(name="faceWarpEstimator") self.faceEstimatorHandler: AsyncStageHandler = AsyncStageHandler( stage=Stages.faceEstimator, executor=estimate, inputQueue=self._faceEstimatorQueue, optimalStageBatchSize=settings.faceWarpEstimationStageSettings.optimalBatchSize, workerCount=settings.faceWarpEstimationStageSettings.listenerCount, poolSize=settings.faceWarpEstimationStageSettings.workerCount, workerSettings=self.settings.faceWarpEstimatorSettings, workerInitializer=initWarpEstimatorWorker, ) self.routingMap[Stages.faceEstimator] = self._faceEstimatorQueue else: self._faceEstimatorQueue = None self.faceEstimatorHandler = None
[docs] async def initialize(self): """ Initialize loop. Start coroutine with infinite loop collect results loop, start task workers """ self.logger.info("start sdk loop") self.loopScheduler = AsyncRunner(1, closeTimeout=1) for handler in ( self.faceDetectorHandler, self.faceExtractorHandler, self.faceEstimatorHandler, self.humanDetectorHandler, self.humanExtractorHandler, ): if handler is not None: handler.run(resultQueue=self._resultsQueue, routingQueueMap=self.routingMap) self._running = True self.loopScheduler.runNoWait((self.collectResultLoop(),)) self.logger.info("sdk loop started")
[docs] def checkStages(self, task: SDKTask) -> None: """ Check detector/extractor/estimator using in task's pipeline and its availability Args: task: task Raises: ForbiddenProcess if one of pipeline stages needs service which turned off """ for stage in task.pipeline: if stage not in self.settings.availableStages: raise VLException(Error.ForbiddenProcess.format(stage.value), 403, False)
[docs] async def validate(self): """ Validate SDK loop. Start SDK tasks to validate SDK services (detector, extractor, estimator). Raises: RuntimeError indicating the stage that failed to validate if one of the tasks returns an error """ self.logger.info("validate sdk loop") imagePath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data", "warp.jpg") with open(imagePath, "rb") as file: image = file.read() faceDetectorTargets = SDKFaceEstimationTargets( estimateGaze=1, estimateHeadPose=1, estimateEyesAttributes=1, estimateLiveness=SDKLivenessEstimationPolicy(estimate=1, qualityThreshold=0), ) faceDetectionTargets = SDKEstimationTargets(estimateHuman=0, faceEstimationTargets=faceDetectorTargets) faceExtractorTargets = SDKFaceEstimationTargets(estimateBasicAttributes=1, estimateFaceDescriptor=1) faceExtractionTargets = SDKEstimationTargets(estimateHuman=0, faceEstimationTargets=faceExtractorTargets) faceWarpEstimatorTargets = SDKFaceEstimationTargets( estimateQuality=1, estimateMouthAttributes=1, estimateEmotions=1, estimateMask=1, estimateGlasses=1, ) faceWarpEstimationTargets = SDKEstimationTargets( estimateHuman=0, faceEstimationTargets=faceWarpEstimatorTargets ) humanDetectionTargets = SDKEstimationTargets( estimateFace=0, humanEstimationTargets=SDKHumanEstimationTargets(estimateHumanDescriptor=0) ) humanExtractionTargets = SDKEstimationTargets( estimateFace=0, humanEstimationTargets=SDKHumanEstimationTargets(estimateHumanDescriptor=1) ) stagesMap = { Stages.faceDetector: TaskData(SDKDetectableImage(filename="raw", body=image), faceDetectionTargets), Stages.faceExtractor: TaskData(FaceWarp(filename="face warp", body=image), faceExtractionTargets), Stages.faceEstimator: TaskData(FaceWarp(filename="face warp", body=image), faceWarpEstimationTargets), Stages.humanDetector: TaskData(SDKDetectableImage(filename="raw", body=image), humanDetectionTargets), Stages.humanExtractor: TaskData(HumanWarp(filename="human warp", body=image), humanExtractionTargets), } tasks = [] for stage in self.settings.availableStages: taskData = stagesMap[stage] tasks.append( self.processTask(task=SDKTask(toEstimation=taskData.target, data=[taskData.image]), logger=self.logger) ) for result in await asyncio.gather(*tasks): if result.error: raise RuntimeError(f"Failed to validate the sdk loop. Error: {result.error.asDict()}") self.logger.info("sdk loop validated")
[docs] async def collectResultLoop(self): """ Infinity loop for collect results from the results queue """ while self._running: task: SDKTask = await self._resultsQueue.get() now = time() task.monitoringData.executionTime.value = now - task.startExecutionTime task.monitoringData.resultsQueueTransportTime.value = now - task.lastCheckpointTime task.lastCheckpointTime = now if task.taskId in self._results: future: Future = self._results.pop(task.taskId) if future.done(): # case of cancellation pass else: future.set_result(task) self._resultsQueue.task_done()
[docs] async def processTask(self, task: SDKTask, logger: Optional[Logger] = None) -> SDKTask: """ Process a sdk task. Args: task: task logger: logger Returns: task with results Raises: RuntimeError: if ".pipeline" return unknown stage. """ self.checkStages(task) if not logger: logger = self.logger future = asyncio.get_running_loop().create_future() self._results[task.taskId] = future task.startExecutionTime = time() task.lastCheckpointTime = time() for case in switch(task.pipeline[0]): if case(Stages.humanDetector): self._humanDetectorQueue.put_nowait(task) logger.info(f"put sdk task {task.taskId} to human detector queue") break if case(Stages.humanExtractor): self._humanExtractorQueue.put_nowait(task) logger.info(f"put sdk task {task.taskId} to human extractor queue") break if case(Stages.faceDetector): self._faceDetectorQueue.put_nowait(task) logger.info(f"put sdk task {task.taskId} to face detector queue") break if case(Stages.faceEstimator): self._faceEstimatorQueue.put_nowait(task) logger.info(f"put sdk task {task.taskId} to face estimator queue") break if case(Stages.faceExtractor): self._faceExtractorQueue.put_nowait(task) logger.info(f"put sdk task {task.taskId} to face extractor queue") break if case(): raise RuntimeError("Unknown stage") else: raise RuntimeError("Unknown stage") try: result = await future logger.info(f"sdk task {task.taskId} done") return result except CancelledError: logger.info(f"sdk task {task.taskId} was cancelled") if task.taskId in self._results: self._results.pop(task.taskId).cancel() raise
[docs] async def close(self): """ Close loop """ self.logger.info("stop loop") self._running = False for handler in ( self.faceDetectorHandler, self.faceEstimatorHandler, self.faceExtractorHandler, self.humanDetectorHandler, self.humanExtractorHandler, ): if handler is not None: await handler.stop() self.logger.info(f"stopped sdk {handler.stage.name} handler") await self.loopScheduler.close() self.logger.info("stopped sdk loop")