Source code for luna_handlers.sdk.sdk_loop.monitoring

"""
Loop task monitoring module.
"""
from typing import Any, Optional

from .crutches_on_wheels.monitoring.points import DataForMonitoring


class MonitoringField:
    """
    Single named value that can be reported to monitoring.

    Attributes:
        fieldName (str): name of the field in monitoring
        value: value of the field, None by default
    """

    __slots__ = ("fieldName", "value")

    def __init__(self, fieldName: str, value: Optional[Any] = None):
        self.fieldName: str = fieldName
        self.value: Any = value

    def __repr__(self) -> str:
        """
        Build a short textual form of the field.

        Returns:
            "<self.fieldName>: <self.value>"
        """
        return "{}: {}".format(self.fieldName, self.value)
class TaskMonitoringData:
    """
    Container with all monitoring fields of one sdk task.

    Every attribute is a MonitoringField created with an empty (None) value; the
    descriptions below say what the field value is meant to hold (presumably a
    duration as float - confirm against the code that fills the fields).

    Attributes:
        executionTime: task execution time
        resultsQueueTransportTime: task transport time in results queue
        faceDetectorQueueTransportTime: task transport time in face detector queue
        humanDetectorQueueTransportTime: task transport time in human detector queue
        faceExtractorQueueTransportTime: task transport time in face extractor queue
        faceWarpEstimatorQueueTransportTime: task transport time in warp estimator queue
        humanExtractorQueueTransportTime: task transport time in human extractor queue
        faceDetectorStageTime: face detector stage time
        humanDetectorStageTime: human detector stage time
        faceExtractorStageTime: face extractor stage time
        humanExtractorStageTime: human extractor stage time
        faceWarpEstimatorStageTime: warp estimator stage time
        faceDetectTime: face detection time
        humanDetectTime: human detection time
        faceDescriptorExtractTime: face descriptor extract time
        humanDescriptorExtractTime: human descriptor extract time
        basicAttributesExtractTime: basic attributes extract time
        gazeEstimationTime: gaze direction estimation time
        headPoseEstimationTime: head pose estimation time
        eyesEstimationTime: eyes attributes estimation time
        agsEstimationTime: ags estimation time
        warpQualityEstimationTime: warp quality estimation time
        emotionsEstimationTime: emotions estimation time
        mouthStateEstimationTime: mouth state estimation time
        maskEstimationTime: mask estimation time
        glassesEstimationTime: glasses estimation time
        livenessEstimationTime: liveness estimation time
    """

    # NOTE: the order of the slots defines the order of the fields produced by `toDataForMonitoring`
    __slots__ = (
        "executionTime",
        "resultsQueueTransportTime",
        "faceDetectorQueueTransportTime",
        "humanDetectorQueueTransportTime",
        "faceExtractorQueueTransportTime",
        "faceWarpEstimatorQueueTransportTime",
        "humanExtractorQueueTransportTime",
        "faceDetectorStageTime",
        "humanDetectorStageTime",
        "faceExtractorStageTime",
        "humanExtractorStageTime",
        "faceWarpEstimatorStageTime",
        "faceDetectTime",
        "humanDetectTime",
        "faceDescriptorExtractTime",
        "humanDescriptorExtractTime",
        "basicAttributesExtractTime",
        "gazeEstimationTime",
        "headPoseEstimationTime",
        "eyesEstimationTime",
        "agsEstimationTime",
        "warpQualityEstimationTime",
        "emotionsEstimationTime",
        "mouthStateEstimationTime",
        "maskEstimationTime",
        "glassesEstimationTime",
        "livenessEstimationTime",
    )

    def __init__(self):
        # whole task
        self.executionTime = MonitoringField("task_execution_time")

        # time spent in queues between stages
        self.resultsQueueTransportTime = MonitoringField("results_queue_transport_time")
        self.faceDetectorQueueTransportTime = MonitoringField("face_detector_queue_transport_time")
        self.humanDetectorQueueTransportTime = MonitoringField("human_detector_queue_transport_time")
        self.faceExtractorQueueTransportTime = MonitoringField("face_extractor_queue_transport_time")
        self.faceWarpEstimatorQueueTransportTime = MonitoringField(
            "face_warp_attributes_estimator_queue_transport_time"
        )
        self.humanExtractorQueueTransportTime = MonitoringField("human_extractor_queue_transport_time")

        # time of whole stages
        self.faceDetectorStageTime = MonitoringField("face_detector_stage_time")
        self.humanDetectorStageTime = MonitoringField("human_detector_stage_time")
        self.faceExtractorStageTime = MonitoringField("face_extractor_stage_time")
        self.humanExtractorStageTime = MonitoringField("human_extractor_stage_time")
        self.faceWarpEstimatorStageTime = MonitoringField("face_warp_estimator_stage_time")

        # detection and extraction
        self.faceDetectTime = MonitoringField("faces_detect_time")
        self.humanDetectTime = MonitoringField("humans_detect_time")
        self.faceDescriptorExtractTime = MonitoringField("face_descriptors_extract_time")
        self.humanDescriptorExtractTime = MonitoringField("human_descriptors_extract_time")
        self.basicAttributesExtractTime = MonitoringField("basic_attributes_extract_time")

        # separate estimations
        self.gazeEstimationTime = MonitoringField("gaze_direction_estimation_time")
        self.headPoseEstimationTime = MonitoringField("head_pose_estimation_time")
        self.eyesEstimationTime = MonitoringField("eyes_attributes_estimation_time")
        self.agsEstimationTime = MonitoringField("ags_estimation_time")
        self.warpQualityEstimationTime = MonitoringField("quality_estimation_time")
        self.emotionsEstimationTime = MonitoringField("emotions_estimation_time")
        self.mouthStateEstimationTime = MonitoringField("mouth_attributes_estimation_time")
        self.maskEstimationTime = MonitoringField("mask_estimation_time")
        self.glassesEstimationTime = MonitoringField("glasses_estimation_time")
        self.livenessEstimationTime = MonitoringField("liveness_estimation_time")

    def toDataForMonitoring(self) -> DataForMonitoring:
        """
        Collect the filled fields into the DataForMonitoring format.

        Fields are visited in `__slots__` order; a field whose value is None is skipped.

        Returns:
            DataForMonitoring built from a "field name -> value" mapping of the non-empty fields
        """
        slotFields = (getattr(self, slotName) for slotName in self.__slots__)
        filled = {item.fieldName: item.value for item in slotFields if item.value is not None}
        return DataForMonitoring(fields=filled)