Source code for luna_tasks.crutches_on_wheels.monitoring.points

"""
Module contains points for monitoring's.
"""
from abc import ABC, abstractmethod
from contextlib import contextmanager
from dataclasses import field
from time import perf_counter
from typing import Dict, Optional, Union

from vlutils.structures.dataclasses import dataclass


[docs]class BaseMonitoringPoint(ABC): """ Abstract class for points Attributes: eventTime (float): event time as timestamp """ # str: point series name ("errors", "request", etc.) series: str __slots__ = ("eventTime",) def __init__(self, eventTime: float): self.eventTime = eventTime @property @abstractmethod def tags(self) -> Dict[str, Union[int, float, str]]: """ Get tags from point. We supposed that tags are indexing data Returns: dict with tags. """ @property @abstractmethod def fields(self) -> Dict[str, Union[int, float, str]]: """ Get tags from point. We supposed that fields are not indexing data Returns: dict with fields. """
[docs]def getRoute(resource: str, method: str) -> str: """ Get a request route, concatenation of a request method and a request resource Args: resource: resource method: method Returns: "{method}:{resource}" """ return f"{method}:{resource}"
[docs]class BaseRequestMonitoringPoint(BaseMonitoringPoint): """ Base class for point which is associated with requests. Attributes: requestId (str): request id route (str): concatenation of a request method and a request resource service (str): service name requestTime (float): a request processing start timestamp statusCode (int): status code of a request response. """ __slots__ = ("requestId", "route", "service", "eventTime", "statusCode") def __init__(self, requestId: str, resource: str, method: str, requestTime: float, service: str, statusCode: int): super().__init__(requestTime) self.route = getRoute(resource, method) self.requestId = requestId self.service = service self.statusCode = statusCode @property def tags(self) -> Dict[str, Union[int, float, str]]: """ Get tags Returns: dict with following keys: "route", "service", "status_code" """ return {"route": self.route, "service": self.service, "status_code": self.statusCode} @property def fields(self) -> Dict[str, Union[int, float, str]]: """ Get fields Returns: dict with following keys: "request_id" """ return {"request_id": self.requestId}
[docs]class RequestMonitoringPoint(BaseRequestMonitoringPoint): """ Request monitoring point is suspended for monitoring all requests and measure a request time and etc. Attributes: executionTime (float): execution time additionalTags (dict): additional tags which was specified for the request additionalFields (dict): additional fields which was specified for the request """ #: series "request" series = "requests" __slots__ = ("executionTime", "additionalTags", "additionalFields") def __init__( self, requestId: str, resource: str, method: str, executionTime: float, requestTime: float, service: str, statusCode: int, additionalTags: Optional[Dict[str, Union[str, float, int]]] = None, additionalFields: Optional[Dict[str, Union[str, float, int]]] = None, ): super().__init__( requestId=requestId, requestTime=requestTime, service=service, resource=resource, method=method, statusCode=statusCode, ) self.executionTime = executionTime self.additionalTags = additionalTags if additionalTags is not None else {} self.additionalFields = additionalFields if additionalFields is not None else {} @property def tags(self) -> Dict[str, Union[int, float, str]]: """ Get tags. Returns: dict with base tags and additional tags """ baseTags = super().tags baseTags.update(self.additionalTags) return baseTags @property def fields(self) -> Dict[str, Union[int, float, str]]: """ Get fields. Returns: dict with base fields, "execution_time" and additional tags """ baseFields = super().fields baseFields["execution_time"] = self.executionTime baseFields.update(self.additionalFields) return baseFields
[docs]class RequestErrorMonitoringPoint(BaseRequestMonitoringPoint): """ Request monitoring point is suspended for monitoring requests errors (error codes) Attributes: errorCode (int): error code additionalTags (dict): additional tags which was specified for the request additionalFields (dict): additional fields which was specified for the request """ __slots__ = ("additionalTags", "additionalFields", "errorCode") #: series "errors" series = "errors" def __init__( self, requestId: str, resource: str, method: str, errorCode: int, service: str, requestTime: float, statusCode: int, additionalTags: Optional[Dict[str, Union[str, float, int]]] = None, additionalFields: Optional[Dict[str, Union[str, float, int]]] = None, ): super().__init__( requestId=requestId, requestTime=requestTime, service=service, resource=resource, method=method, statusCode=statusCode, ) self.errorCode = errorCode self.additionalTags = additionalTags if additionalTags is not None else {} self.additionalFields = additionalFields if additionalFields is not None else {} @property def tags(self) -> Dict[str, Union[int, float, str]]: """ Get tags. Returns: dict with base tags, "error_code" and additional tags """ baseTags = super().tags baseTags["error_code"] = self.errorCode baseTags.update(self.additionalTags) return baseTags @property def fields(self) -> Dict[str, Union[int, float, str]]: """ Get fields. Returns: dict with base fields and additional tags """ baseFields = super().fields baseFields.update(self.additionalFields) return baseFields
[docs]@dataclass(withSlots=True) class DataForMonitoring: """ Class fo storing an additional data for monitoring. """ # (dict): tags, indexes data tags: dict = field(default_factory=dict) # (dict): fields, non indexes data fields: dict = field(default_factory=dict) def __iadd__(self, other: "DataForMonitoring"): """ Sum monitoring data Args: other: other monitoring data Returns: union monitoring data """ self.fields.update(other.fields) # pylint: disable-msg=E1101 self.tags.update(other.tags) # pylint: disable-msg=E1101 return self
[docs]@contextmanager def monitorTime(monitoringData: DataForMonitoring, fieldName: str): """ Context manager for timing execution time. Args: monitoringData: container for saving result fieldName: field name """ start = perf_counter() yield monitoringData.fields[fieldName] = perf_counter() - start