from typing import Optional
from crutches_on_wheels.cow.enums.tasks import SubTaskStatus, TaskType
from crutches_on_wheels.cow.monitoring.points import BaseMonitoringPoint
[docs]class SubtaskMonitoringBasePoint(BaseMonitoringPoint):
"""
Base class for monitoring an subtask execution
Attributes:
subtaskId (int): subtask id
taskId (int): task id
taskType (int): task type
"""
#: service name
service: str
__slots__ = ("subtaskId", "taskId", "taskType", "subTaskId")
def __init__(self, eventTime: float, subtaskId: int, taskId: int, taskType: TaskType):
super().__init__(eventTime)
self.taskId = taskId
self.subTaskId = subtaskId
self.taskType = taskType.value
[docs] @classmethod
def initialize(cls, serviceName: str):
"""
Initialize request point for custom service.
Args:
serviceName: service name
"""
cls.service = serviceName
@property
def tags(self) -> dict:
"""
Get point tags
Returns:
dict with service as keys + additionalTags
"""
baseTags = {
"service": self.service,
"task_id": self.taskId,
"subtask_id": self.subTaskId,
"task_type": self.taskType,
}
return baseTags
@property
def fields(self) -> dict:
"""
Get point fields
Returns:
dict with additional fields
"""
return {}
[docs]class SubtaskExecutionPoint(SubtaskMonitoringBasePoint):
"""
Point for monitoring subtask execution
Attributes:
additionalTags (dict): additional tags for point
additionalFields (dict): additional fields for point
"""
#: series
series = "subtask_execution"
__slots__ = ("additionalTags", "additionalFields")
def __init__(
self,
eventTime: float,
subtaskId: int,
taskId: int,
taskType: TaskType,
additionalTags: Optional[dict] = None,
additionalFields: Optional[dict] = None,
):
super().__init__(eventTime, subtaskId, taskId, taskType)
self.additionalTags = additionalTags if additionalTags is not None else {}
self.additionalFields = additionalFields if additionalFields is not None else {}
@property
def tags(self) -> dict:
"""
Get point tags
Returns:
dict with service as keys + additionalTags
"""
baseTags = super().tags
baseTags.update(self.additionalTags)
return baseTags
@property
def fields(self) -> dict:
"""
Get point fields
Returns:
dict with additional fields
"""
baseFields = super().fields
baseFields.update(self.additionalFields)
return baseFields
[docs]class SubtaskFinalPoint(SubtaskExecutionPoint):
"""
Point for monitoring subtask ending.
Attributes:
additionalTags (dict): additional tags for point
additionalFields (dict): additional fields for point
executionTime (float): subtask execution time
status (int): subtask status
"""
#: series
series = "subtask_final"
__slots__ = ("additionalTags", "additionalFields", "executionTime", "status")
def __init__(
self,
eventTime: float,
subTaskId: int,
taskId: int,
taskType: TaskType,
executionTime: float,
status: SubTaskStatus,
additionalTags: Optional[dict] = None,
additionalFields: Optional[dict] = None,
):
super().__init__(
eventTime, subTaskId, taskId, taskType, additionalFields=additionalFields, additionalTags=additionalTags
)
self.executionTime = executionTime
self.status = status.value
@property
def tags(self) -> dict:
"""
Get point tags
Returns:
dict with service as keys + additionalTags
"""
baseTags = super().tags
baseTags.update(**self.additionalTags, **{"status": self.status})
return baseTags
@property
def fields(self) -> dict:
"""
Get point fields
Returns:
dict with additional fields
"""
baseFields = super().fields
baseFields.update(**{"execution_time": self.executionTime}, **self.additionalFields)
return baseFields
[docs]class WorkerQueuePoint(SubtaskMonitoringBasePoint):
"""
Point for a worker queue monitoring
Attributes:
transportTime (float): time between receiving task of a worker and start execution
"""
#: series
series = "worker_queue"
__slots__ = ("transportTime",)
def __init__(self, eventTime: float, subtaskId: int, taskId: int, taskType: TaskType, transportTime: float):
super().__init__(eventTime, subtaskId, taskId, taskType)
self.transportTime = transportTime
@property
def tags(self) -> dict:
"""
Get point tags
Returns:
dict with service as keys + additionalTags
"""
return super().tags
@property
def fields(self) -> dict:
"""
Get point fields
Returns:
dict with additional fields
"""
baseFields = super().fields
baseFields.update({"transport_time": self.transportTime})
return baseFields
[docs]class SubtaskErrorPoint(SubtaskMonitoringBasePoint):
"""
Point for sub task error monitoring.
Attributes:
errorCode (int): error code
additionalTags (dict): additional tags
additionalFields (dict): additional fields
"""
#: series
series = "subtask_errors"
__slots__ = ("additionalTags", "additionalFields", "errorCode")
def __init__(
self,
eventTime: float,
subtaskId: int,
taskId: int,
taskType: TaskType,
errorCode: int,
additionalTags: Optional[dict] = None,
additionalFields: Optional[dict] = None,
):
super().__init__(eventTime, subtaskId, taskId, taskType)
self.additionalTags = additionalTags if additionalTags is not None else {}
self.additionalFields = additionalFields if additionalFields is not None else {}
self.errorCode = errorCode
@property
def tags(self) -> dict:
"""
Get point tags
Returns:
dict with service as keys + additionalTags
"""
baseTags = super().tags
baseTags.update(**self.additionalTags, error_code=self.errorCode)
return baseTags
@property
def fields(self) -> dict:
"""
Get point fields
Returns:
dict with additional fields
"""
baseFields = super().fields
baseFields.update(**self.additionalFields, error_code=self.errorCode)
return baseFields
[docs]class WorkerRequestPoint(BaseMonitoringPoint):
"""
Point for monitoring queue of sending subtasks to workers.
Attributes:
taskId (int): task id
subtaskId (int): subtask id
transportTime (float): initialize sending a task subtasks and send subtask to thr worker execution
"""
#: series
series: str = "worker_requests"
#: service name
service: str
__slots__ = ("transportTime", "taskId", "subtaskId")
def __init__(self, eventTime: float, subtaskId: int, taskId: int, transportTime: float):
super().__init__(eventTime)
self.taskId = taskId
self.subtaskId = subtaskId
self.transportTime = transportTime
[docs] @classmethod
def initialize(cls, serviceName: str):
"""
Initialize request point for custom service.
Args:
serviceName: service name
"""
cls.service = serviceName
@property
def tags(self) -> dict:
"""
Get point tags
Returns:
dict with service as keys + additionalTags
"""
return {"service": self.service, "task_id": self.taskId, "subtask_id": self.subtaskId}
@property
def fields(self) -> dict:
"""
Get point fields
Returns:
dict with additional fields
"""
baseFields = {"transport_time": self.transportTime}
return baseFields