Source code for luna_tasks.monitoring.points

from typing import Optional

from crutches_on_wheels.cow.enums.tasks import SubTaskStatus, TaskType
from crutches_on_wheels.cow.monitoring.points import BaseMonitoringPoint


class SubtaskMonitoringBasePoint(BaseMonitoringPoint):
    """
    Base class for monitoring a subtask execution.

    Attributes:
        subTaskId (int): subtask id (the stored attribute)
        subtaskId (int): read/write alias of ``subTaskId``
        taskId (int): task id
        taskType (int): ``value`` of the ``TaskType`` member passed to the constructor
    """

    #: service name
    service: str
    # "subtaskId" used to be declared here as a second slot that was never assigned, so reading
    # it raised AttributeError; it is now served by the property alias below.
    __slots__ = ("taskId", "taskType", "subTaskId")

    def __init__(self, eventTime: float, subtaskId: int, taskId: int, taskType: TaskType):
        """
        Init point.

        Args:
            eventTime: event time, forwarded to the base point constructor
            subtaskId: subtask id
            taskId: task id
            taskType: task type; its ``value`` is stored on the point
        """
        super().__init__(eventTime)
        self.taskId = taskId
        self.subTaskId = subtaskId
        self.taskType = taskType.value

    @property
    def subtaskId(self) -> int:
        """
        Get subtask id (alias of ``subTaskId``).

        Returns:
            subtask id
        """
        return self.subTaskId

    @subtaskId.setter
    def subtaskId(self, value: int) -> None:
        """
        Set subtask id (alias of ``subTaskId``).

        Args:
            value: new subtask id
        """
        self.subTaskId = value

    @classmethod
    def initialize(cls, serviceName: str):
        """
        Initialize request point for custom service.

        Args:
            serviceName: service name
        """
        cls.service = serviceName

    @property
    def tags(self) -> dict:
        """
        Get point tags

        Returns:
            new dict with the service name, task id, subtask id and task type
        """
        baseTags = {
            "service": self.service,
            "task_id": self.taskId,
            "subtask_id": self.subTaskId,
            "task_type": self.taskType,
        }
        return baseTags

    @property
    def fields(self) -> dict:
        """
        Get point fields

        Returns:
            new empty dict; subclasses add their own fields to it
        """
        return {}
class SubtaskExecutionPoint(SubtaskMonitoringBasePoint):
    """
    Monitoring point describing a subtask execution.

    Attributes:
        additionalTags (dict): extra tags merged into the point tags
        additionalFields (dict): extra fields merged into the point fields
    """

    #: series
    series = "subtask_execution"
    __slots__ = ("additionalTags", "additionalFields")

    def __init__(
        self,
        eventTime: float,
        subtaskId: int,
        taskId: int,
        taskType: TaskType,
        additionalTags: Optional[dict] = None,
        additionalFields: Optional[dict] = None,
    ):
        """
        Init point.

        Args:
            eventTime: event time
            subtaskId: subtask id
            taskId: task id
            taskType: task type
            additionalTags: extra tags; a new empty dict is used when omitted
            additionalFields: extra fields; a new empty dict is used when omitted
        """
        super().__init__(eventTime, subtaskId, taskId, taskType)
        # Only None is replaced; an explicitly passed empty dict is stored as is.
        if additionalTags is None:
            additionalTags = {}
        if additionalFields is None:
            additionalFields = {}
        self.additionalTags = additionalTags
        self.additionalFields = additionalFields

    @property
    def tags(self) -> dict:
        """
        Get point tags

        Returns:
            parent tags updated with additionalTags
        """
        pointTags = super().tags
        pointTags.update(self.additionalTags)
        return pointTags

    @property
    def fields(self) -> dict:
        """
        Get point fields

        Returns:
            parent fields updated with additionalFields
        """
        pointFields = super().fields
        pointFields.update(self.additionalFields)
        return pointFields
class SubtaskFinalPoint(SubtaskExecutionPoint):
    """
    Point for monitoring subtask ending.

    Attributes:
        additionalTags (dict): additional tags for point (stored by the parent class)
        additionalFields (dict): additional fields for point (stored by the parent class)
        executionTime (float): subtask execution time
        status (int): ``value`` of the ``SubTaskStatus`` member passed to the constructor
    """

    #: series
    series = "subtask_final"
    # Only the attributes introduced here are listed: "additionalTags" and "additionalFields" are
    # already slots of SubtaskExecutionPoint, and redeclaring them would shadow the parent slots.
    __slots__ = ("executionTime", "status")

    def __init__(
        self,
        eventTime: float,
        subTaskId: int,
        taskId: int,
        taskType: TaskType,
        executionTime: float,
        status: SubTaskStatus,
        additionalTags: Optional[dict] = None,
        additionalFields: Optional[dict] = None,
    ):
        """
        Init point.

        Args:
            eventTime: event time
            subTaskId: subtask id
            taskId: task id
            taskType: task type
            executionTime: subtask execution time
            status: subtask status; its ``value`` is stored on the point
            additionalTags: additional tags for point
            additionalFields: additional fields for point
        """
        super().__init__(
            eventTime, subTaskId, taskId, taskType, additionalFields=additionalFields, additionalTags=additionalTags
        )
        self.executionTime = executionTime
        self.status = status.value

    @property
    def tags(self) -> dict:
        """
        Get point tags

        Returns:
            parent tags (which already include additionalTags) plus the "status" tag
        """
        baseTags = super().tags
        # additionalTags were merged by the parent; set the key directly so that a "status" entry
        # or a non-string key in additionalTags cannot raise TypeError via ** unpacking.
        baseTags["status"] = self.status
        return baseTags

    @property
    def fields(self) -> dict:
        """
        Get point fields

        Returns:
            parent fields (which already include additionalFields) plus the "execution_time" field
        """
        baseFields = super().fields
        # Same reasoning as in ``tags``: the point's own value is written last and directly.
        baseFields["execution_time"] = self.executionTime
        return baseFields
class WorkerQueuePoint(SubtaskMonitoringBasePoint):
    """
    Monitoring point for a worker queue.

    Attributes:
        transportTime (float): time between a worker receiving the task and starting its execution
    """

    #: series
    series = "worker_queue"
    __slots__ = ("transportTime",)

    def __init__(self, eventTime: float, subtaskId: int, taskId: int, taskType: TaskType, transportTime: float):
        """
        Init point.

        Args:
            eventTime: event time
            subtaskId: subtask id
            taskId: task id
            taskType: task type
            transportTime: time between receiving the task and starting its execution
        """
        super().__init__(eventTime, subtaskId, taskId, taskType)
        self.transportTime = transportTime

    @property
    def tags(self) -> dict:
        """
        Get point tags

        Returns:
            parent tags, unchanged
        """
        inheritedTags = super().tags
        return inheritedTags

    @property
    def fields(self) -> dict:
        """
        Get point fields

        Returns:
            parent fields plus the "transport_time" field
        """
        pointFields = super().fields
        pointFields["transport_time"] = self.transportTime
        return pointFields
class SubtaskErrorPoint(SubtaskMonitoringBasePoint):
    """
    Point for sub task error monitoring.

    Attributes:
        errorCode (int): error code
        additionalTags (dict): additional tags
        additionalFields (dict): additional fields
    """

    #: series
    series = "subtask_errors"
    __slots__ = ("additionalTags", "additionalFields", "errorCode")

    def __init__(
        self,
        eventTime: float,
        subtaskId: int,
        taskId: int,
        taskType: TaskType,
        errorCode: int,
        additionalTags: Optional[dict] = None,
        additionalFields: Optional[dict] = None,
    ):
        """
        Init point.

        Args:
            eventTime: event time
            subtaskId: subtask id
            taskId: task id
            taskType: task type
            errorCode: error code
            additionalTags: additional tags; a new empty dict is used when omitted
            additionalFields: additional fields; a new empty dict is used when omitted
        """
        super().__init__(eventTime, subtaskId, taskId, taskType)
        self.additionalTags = additionalTags if additionalTags is not None else {}
        self.additionalFields = additionalFields if additionalFields is not None else {}
        self.errorCode = errorCode

    @property
    def tags(self) -> dict:
        """
        Get point tags

        Returns:
            parent tags updated with additionalTags, plus the "error_code" tag
        """
        baseTags = super().tags
        # Merge as a mapping (same as SubtaskExecutionPoint) instead of ** unpacking, which raised
        # TypeError for an "error_code" entry or a non-string key in additionalTags.
        baseTags.update(self.additionalTags)
        baseTags["error_code"] = self.errorCode
        return baseTags

    @property
    def fields(self) -> dict:
        """
        Get point fields

        Returns:
            parent fields updated with additionalFields, plus the "error_code" field
        """
        baseFields = super().fields
        # Same merge strategy as in ``tags``; the point's own error code is written last.
        baseFields.update(self.additionalFields)
        baseFields["error_code"] = self.errorCode
        return baseFields
class WorkerRequestPoint(BaseMonitoringPoint):
    """
    Monitoring point for the queue of subtasks being sent to workers.

    Attributes:
        taskId (int): task id
        subtaskId (int): subtask id
        transportTime (float): time between the start of sending a task's subtasks and sending this
            subtask to the worker (wording reconstructed from the original note; confirm with callers)
    """

    #: series
    series: str = "worker_requests"
    #: service name
    service: str
    __slots__ = ("transportTime", "taskId", "subtaskId")

    def __init__(self, eventTime: float, subtaskId: int, taskId: int, transportTime: float):
        """
        Init point.

        Args:
            eventTime: event time, forwarded to the base point constructor
            subtaskId: subtask id
            taskId: task id
            transportTime: transport time reported in the point fields
        """
        super().__init__(eventTime)
        self.transportTime = transportTime
        self.subtaskId = subtaskId
        self.taskId = taskId

    @classmethod
    def initialize(cls, serviceName: str):
        """
        Bind the point class to a service.

        Args:
            serviceName: service name stored on the class and reported in the point tags
        """
        cls.service = serviceName

    @property
    def tags(self) -> dict:
        """
        Get point tags

        Returns:
            new dict with the service name, task id and subtask id
        """
        pointTags = {"service": self.service}
        pointTags["task_id"] = self.taskId
        pointTags["subtask_id"] = self.subtaskId
        return pointTags

    @property
    def fields(self) -> dict:
        """
        Get point fields

        Returns:
            new dict with the "transport_time" field
        """
        return {"transport_time": self.transportTime}