"""
Module containing the SDK task loop.
"""
import asyncio
import os
from asyncio import CancelledError, Future
from collections import namedtuple
from time import time
from typing import Optional, Dict
from logbook import Logger
from vlutils.jobs.async_runner import AsyncRunner
from vlutils.structures.switch import switch
from .crutches_on_wheels.errors.errors import Error
from .crutches_on_wheels.errors.exception import VLException
from .enums import Stages
from .estimation_targets import (
SDKEstimationTargets,
SDKFaceEstimationTargets,
SDKHumanEstimationTargets,
SDKLivenessEstimationPolicy,
)
from .face_extractor import extract as extractFace
from .face_extractor import initWorker as initFaceExtractorWorker
from .face_detector import detect as detectFaces
from .face_detector import initWorker as initFaceDetectorWorker
from .human_detector import detect as detectHumans
from .human_detector import initWorker as initHumanDetectorWorker
from .human_extractor import extract as extractHuman
from .human_extractor import initWorker as initHumanExtractorWorker
from .face_warp_estimator import estimate
from .face_warp_estimator import initWorker as initWarpEstimatorWorker
from .queues.queue import NamedQueue
from .sdk_stages import AsyncStageHandler
from .sdk_task import SDKTask, SDKDetectableImage, FaceWarp, HumanWarp
from .settings import SDKLoopSettings
from .utils.logger import getLogger
# Pairs a sample image with the estimation targets to request for it.
TaskData = namedtuple("TaskData", ["image", "target"])
class SDKTaskLoop:
    """
    Loop for processing sdk tasks (detect faces and humans, extract descriptors, estimate attributes).

    Every stage listed in ``settings.availableStages`` gets its own input queue and stage handler.
    For a stage that is not available both the queue attribute and the handler attribute are ``None``.

    Attributes:
        _faceExtractorQueue (Optional[NamedQueue]): queue for face attributes extraction tasks
        _faceDetectorQueue (Optional[NamedQueue]): queue for face detection tasks
        _faceEstimatorQueue (Optional[NamedQueue]): queue for face warp attributes estimation tasks
        _humanDetectorQueue (Optional[NamedQueue]): queue for human detection tasks
        _humanExtractorQueue (Optional[NamedQueue]): queue for human attributes extraction tasks
        faceExtractorHandler (Optional[AsyncStageHandler]): handler of the face extractor stage
        faceDetectorHandler (Optional[AsyncStageHandler]): handler of the face detector stage
        faceEstimatorHandler (Optional[AsyncStageHandler]): handler of the face estimator stage
        humanDetectorHandler (Optional[AsyncStageHandler]): handler of the human detector stage
        humanExtractorHandler (Optional[AsyncStageHandler]): handler of the human extractor stage
        _resultsQueue (NamedQueue): queue for tasks results
        _running (bool): loop status
        _results (Dict[str, Future]): map a task id to a future which is waiting for the task result
        _queueLabels (dict): map an available stage to the human readable queue name used in log messages
        routingMap (dict): map an available stage to its input queue
        settings (SDKLoopSettings): loop settings
        logger (Logger): loop logger
        loopScheduler (Optional[AsyncRunner]): scheduler which runs the results collecting coroutine,
            None until `initialize` is called
    """

    def __init__(self, settings: SDKLoopSettings):
        """
        Create queues and stage handlers for all available stages. Nothing is started here.

        Args:
            settings: loop settings
        """
        self._resultsQueue: NamedQueue = NamedQueue(name="results")
        self._running: bool = False
        self._results: Dict[str, Future] = {}
        self.logger: Logger = getLogger(template="sdk loop")
        self.loopScheduler: Optional[AsyncRunner] = None
        self.settings = settings
        self.routingMap: dict = {}
        self._queueLabels: dict = {}

        self._humanDetectorQueue, self.humanDetectorHandler = self._registerStage(
            stage=Stages.humanDetector,
            queueName="humanDetector",
            queueLabel="human detector",
            executor=detectHumans,
            stageSettingsName="humanDetectorStageSettings",
            workerSettingsName="humanDetectorSettings",
            workerInitializer=initHumanDetectorWorker,
        )
        self._humanExtractorQueue, self.humanExtractorHandler = self._registerStage(
            stage=Stages.humanExtractor,
            queueName="humanExtractor",
            queueLabel="human extractor",
            executor=extractHuman,
            stageSettingsName="humanExtractorStageSettings",
            workerSettingsName="humanExtractorSettings",
            workerInitializer=initHumanExtractorWorker,
        )
        self._faceDetectorQueue, self.faceDetectorHandler = self._registerStage(
            stage=Stages.faceDetector,
            queueName="faceDetector",
            queueLabel="face detector",
            executor=detectFaces,
            stageSettingsName="faceDetectorStageSettings",
            workerSettingsName="faceDetectorSettings",
            workerInitializer=initFaceDetectorWorker,
        )
        self._faceExtractorQueue, self.faceExtractorHandler = self._registerStage(
            stage=Stages.faceExtractor,
            queueName="faceExtractor",
            queueLabel="face extractor",
            executor=extractFace,
            stageSettingsName="faceExtractorStageSettings",
            workerSettingsName="faceExtractorSettings",
            workerInitializer=initFaceExtractorWorker,
        )
        self._faceEstimatorQueue, self.faceEstimatorHandler = self._registerStage(
            stage=Stages.faceEstimator,
            queueName="faceWarpEstimator",
            queueLabel="face estimator",
            executor=estimate,
            stageSettingsName="faceWarpEstimationStageSettings",
            workerSettingsName="faceWarpEstimatorSettings",
            workerInitializer=initWarpEstimatorWorker,
        )

    def _registerStage(
        self,
        stage: Stages,
        queueName: str,
        queueLabel: str,
        executor,
        stageSettingsName: str,
        workerSettingsName: str,
        workerInitializer,
    ) -> tuple:
        """
        Create an input queue and a handler for a stage if the stage is available.

        Args:
            stage: stage to register
            queueName: name of the stage input queue
            queueLabel: human readable queue name for log messages
            executor: function which is passed to the handler as the stage executor
            stageSettingsName: name of the settings attribute with batch size, listener and worker counts
            workerSettingsName: name of the settings attribute which is passed to the handler as worker settings
            workerInitializer: function which is passed to the handler as the worker initializer
        Returns:
            (queue, handler) for an available stage, (None, None) otherwise
        """
        if stage not in self.settings.availableStages:
            # per-stage settings are read only for available stages
            return None, None
        queue = NamedQueue(name=queueName)
        stageSettings = getattr(self.settings, stageSettingsName)
        handler = AsyncStageHandler(
            stage=stage,
            executor=executor,
            inputQueue=queue,
            optimalStageBatchSize=stageSettings.optimalBatchSize,
            # the naming differs on purpose: settings "listenerCount" is the handler "workerCount",
            # settings "workerCount" is the handler "poolSize"
            workerCount=stageSettings.listenerCount,
            poolSize=stageSettings.workerCount,
            workerSettings=getattr(self.settings, workerSettingsName),
            workerInitializer=workerInitializer,
        )
        self.routingMap[stage] = queue
        self._queueLabels[stage] = queueLabel
        return queue, handler

    async def initialize(self):
        """
        Initialize loop.

        Run handlers of all available stages and start the coroutine which collects results.
        """
        self.logger.info("start sdk loop")
        self.loopScheduler = AsyncRunner(1, closeTimeout=1)
        for handler in (
            self.faceDetectorHandler,
            self.faceExtractorHandler,
            self.faceEstimatorHandler,
            self.humanDetectorHandler,
            self.humanExtractorHandler,
        ):
            if handler is not None:
                handler.run(resultQueue=self._resultsQueue, routingQueueMap=self.routingMap)
        # the flag must be set before the collecting coroutine starts, otherwise its loop exits at once
        self._running = True
        self.loopScheduler.runNoWait((self.collectResultLoop(),))
        self.logger.info("sdk loop started")

    def checkStages(self, task: SDKTask) -> None:
        """
        Check that every stage of the task pipeline is available.

        Args:
            task: task
        Raises:
            VLException(Error.ForbiddenProcess, 403): if one of the pipeline stages is not available
        """
        for stage in task.pipeline:
            if stage not in self.settings.availableStages:
                raise VLException(Error.ForbiddenProcess.format(stage.value), 403, False)

    async def validate(self):
        """
        Validate SDK loop.

        Process one task per available stage. All tasks use the same bundled image ("data/warp.jpg"
        next to this module) and are processed concurrently.

        Raises:
            RuntimeError: with the error of the first task (in the available stages order) which has an error
        """
        self.logger.info("validate sdk loop")
        imagePath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data", "warp.jpg")
        with open(imagePath, "rb") as file:
            image = file.read()

        detectorFaceTargets = SDKFaceEstimationTargets(
            estimateGaze=1,
            estimateHeadPose=1,
            estimateEyesAttributes=1,
            estimateLiveness=SDKLivenessEstimationPolicy(estimate=1, qualityThreshold=0),
        )
        faceDetectionTargets = SDKEstimationTargets(estimateHuman=0, faceEstimationTargets=detectorFaceTargets)

        extractorFaceTargets = SDKFaceEstimationTargets(estimateBasicAttributes=1, estimateFaceDescriptor=1)
        faceExtractionTargets = SDKEstimationTargets(estimateHuman=0, faceEstimationTargets=extractorFaceTargets)

        estimatorFaceTargets = SDKFaceEstimationTargets(
            estimateQuality=1, estimateMouthAttributes=1, estimateEmotions=1, estimateMask=1, estimateGlasses=1,
        )
        faceWarpEstimationTargets = SDKEstimationTargets(estimateHuman=0, faceEstimationTargets=estimatorFaceTargets)

        humanDetectionTargets = SDKEstimationTargets(
            estimateFace=0, humanEstimationTargets=SDKHumanEstimationTargets(estimateHumanDescriptor=0)
        )
        humanExtractionTargets = SDKEstimationTargets(
            estimateFace=0, humanEstimationTargets=SDKHumanEstimationTargets(estimateHumanDescriptor=1)
        )

        stagesMap = {
            Stages.faceDetector: TaskData(SDKDetectableImage(filename="raw", body=image), faceDetectionTargets),
            Stages.faceExtractor: TaskData(FaceWarp(filename="face warp", body=image), faceExtractionTargets),
            Stages.faceEstimator: TaskData(FaceWarp(filename="face warp", body=image), faceWarpEstimationTargets),
            Stages.humanDetector: TaskData(SDKDetectableImage(filename="raw", body=image), humanDetectionTargets),
            Stages.humanExtractor: TaskData(HumanWarp(filename="human warp", body=image), humanExtractionTargets),
        }
        probes = (stagesMap[stage] for stage in self.settings.availableStages)
        tasks = [
            self.processTask(task=SDKTask(toEstimation=probe.target, data=[probe.image]), logger=self.logger)
            for probe in probes
        ]
        for result in await asyncio.gather(*tasks):
            if result.error:
                raise RuntimeError(f"Failed to validate the sdk loop. Error: {result.error.asDict()}")
        self.logger.info("sdk loop validated")

    async def collectResultLoop(self):
        """
        Loop which collects processed tasks from the results queue while the loop is running.

        For every task the monitoring times are updated and the task is set as the result of the future
        registered for its id (if there is one).
        """
        while self._running:
            task: SDKTask = await self._resultsQueue.get()
            now = time()
            task.monitoringData.executionTime.value = now - task.startExecutionTime
            task.monitoringData.resultsQueueTransportTime.value = now - task.lastCheckpointTime
            task.lastCheckpointTime = now
            future = self._results.pop(task.taskId, None)
            # an already done future is the case of cancellation, there is nobody to notify
            if future is not None and not future.done():
                future.set_result(task)
            self._resultsQueue.task_done()

    async def processTask(self, task: SDKTask, logger: Optional[Logger] = None) -> SDKTask:
        """
        Process a sdk task.

        Put the task to the input queue of the first stage of its pipeline and wait until the task
        comes back through the results queue.

        Args:
            task: task
            logger: logger, the loop logger is used by default
        Returns:
            task with results
        Raises:
            VLException: if one of the pipeline stages is not available (see `checkStages`)
            RuntimeError: if there is no queue for the first stage of the pipeline
            CancelledError: if waiting for the result was cancelled
        """
        self.checkStages(task)
        logger = logger or self.logger
        taskId = task.taskId
        future = asyncio.get_running_loop().create_future()
        self._results[taskId] = future
        try:
            now = time()
            task.startExecutionTime = now
            task.lastCheckpointTime = now
            firstStage = task.pipeline[0]
            # routingMap contains queues of all registered stages
            queue = self.routingMap.get(firstStage)
            if queue is None:
                raise RuntimeError("Unknown stage")
            queue.put_nowait(task)
            logger.info(f"put sdk task {taskId} to {self._queueLabels[firstStage]} queue")
            result = await future
            logger.info(f"sdk task {taskId} done")
            return result
        except CancelledError:
            logger.info(f"sdk task {taskId} was cancelled")
            # no-op if the future is already done
            future.cancel()
            raise
        finally:
            # never leave the future registered (routing failure, cancellation);
            # after a normal completion the future has been already removed by the collecting loop
            if self._results.get(taskId) is future:
                del self._results[taskId]

    async def close(self):
        """
        Close loop.

        Stop handlers of all available stages and close the scheduler (if the loop was initialized).
        Futures which are still waiting for results are not touched here.
        """
        self.logger.info("stop loop")
        self._running = False
        for handler in (
            self.faceDetectorHandler,
            self.faceEstimatorHandler,
            self.faceExtractorHandler,
            self.humanDetectorHandler,
            self.humanExtractorHandler,
        ):
            if handler is None:
                continue
            await handler.stop()
            self.logger.info(f"stopped sdk {handler.stage.name} handler")
        # the scheduler does not exist if `initialize` was not called
        if self.loopScheduler is not None:
            await self.loopScheduler.close()
        self.logger.info("stopped sdk loop")