Source code for luna_backport4.crutches_on_wheels.cow.monitoring.points

"""
Module contains points for monitoring.
"""

import typing
from abc import ABC, ABCMeta, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass, field
from time import perf_counter
from types import NoneType, UnionType
from typing import Dict, Optional, Union

# Printf-style templates for rendering a single field, keyed by the Python type of the value.
# Each template consumes a (field name, value) pair: strings are wrapped in double quotes,
# booleans are rendered with repr, floats with "%f" and integers get a trailing "i".
# NOTE: the key order is visible in the "unsupported type" error message built from `.keys()`.
INFLUX_TYPES_MAP = {
    str: '%s="%s"',
    bool: "%s=%r",
    float: "%s=%f",
    int: "%s=%di",
}


class InfluxFormatter(str):
    """
    String subclass whose ``%`` operator renders a ``(field name, value)`` pair in inline format.

    The template is picked from ``INFLUX_TYPES_MAP`` by the exact type of the value, so the
    object can be used wherever a ready-made per-field template would be used.
    """

    def __mod__(self, other):
        """
        Render one field.

        Args:
            other: pair of field name and field value
        Returns:
            field rendered with the template registered for the value type
        Raises:
            KeyError: if the value type has no template in ``INFLUX_TYPES_MAP``
        """
        name, fieldValue = other
        template = INFLUX_TYPES_MAP[type(fieldValue)]
        return template % (name, fieldValue)
# Shared formatter instance; used as the fallback template for fields that have no
# pre-built format (see `convertFieldsToInfluxLineProtocolWithExtra`).
INFLUX_FORMATTER = InfluxFormatter()
class MonitoringPointInfluxFormatBuilder(ABCMeta):
    """
    Complement point class with explicit fields formatting function for the sake of better performance

    To perform type format building target class must have 'fields' property return value annotated with TypedDict.
    A class whose 'fields' property has no such annotation is created unchanged.

    Target class might be configured via 'Config' class declared in the class body. Available options:
        extraFields: whether class should handle additional fields or not (False when not specified)

    Fields whose value is None are left out of the formatted output: the line format has no
    representation for a missing value.

    >>> from typing import TypedDict
    >>>
    >>> class MonitoringFields(TypedDict):
    ...     field1: str
    ...     field2: int
    ...     field3: float
    ...     field4: bool
    >>>
    >>> class BasePoint(BaseMonitoringPoint, metaclass=MonitoringPointInfluxFormatBuilder):
    ...
    ...     def __init__(self, fields: dict):
    ...         self._fields = fields
    ...
    ...     @property
    ...     def tags(self):
    ...         return {}
    ...
    ...     @property
    ...     def fields(self) -> MonitoringFields:
    ...         return self._fields
    ...
    >>> class TestPointNoExtra(BasePoint):
    ...
    ...     class Config:
    ...         extraFields = False
    ...
    >>> class TestPointWithExtra(BasePoint):
    ...     class Config:
    ...         extraFields = True
    >>>
    >>>
    >>> point1 = TestPointNoExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False})
    >>> point2 = TestPointWithExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False, "extra": True})
    >>> print(point1.convertFieldsToInfluxLineProtocol())
    field1="data",field2=1i,field3=1.000000,field4=False
    >>> print(point2.convertFieldsToInfluxLineProtocol())
    field1="data",field2=1i,field3=1.000000,field4=False,extra=True
    """

    @classmethod
    def buildInfluxFormats(mcs, annotations: dict, extraFields: bool) -> dict:
        """
        Build map with influx formats for corresponding fields

        Args:
            annotations (dict): point fields type annotations
            extraFields (bool): whether point uses extra fields or not
        Returns:
            dict of fields with their format
        Raises:
            TypeError: if a field allows more than one non-None type or its type is not supported
        """
        influxFormats = {}
        for _field, _type in annotations.items():
            # covers both `Optional[X]` / `Union[X, None]` and `X | None` through the public typing API
            if typing.get_origin(_type) in (typing.Union, UnionType):
                allowedTypes = [t for t in typing.get_args(_type) if t is not NoneType]
                if len(allowedTypes) > 1:
                    raise TypeError("Too many allowed types")
                _type = allowedTypes[0]
            influxFormats[_field] = mcs.getTypeFormat(_type, _field, extraFields)
        return influxFormats

    @staticmethod
    def getTypeFormat(_type: type, _field: str, extraFields: bool) -> str:
        """
        Get field type format

        Args:
            _type (type): field type
            _field (str): field name
            extraFields (bool): whether point uses extra fields or not
        Returns:
            string format of the field: a template for a (name, value) pair if `extraFields` is set,
            otherwise a template with the field name already embedded which expects the value only
        Raises:
            TypeError: if the type has no registered format
        """
        try:
            template = INFLUX_TYPES_MAP[_type]
        except KeyError as err:
            raise TypeError(
                f"Got unsupported type for monitoring point, must be one of {INFLUX_TYPES_MAP.keys()}"
            ) from err
        if extraFields:
            return template
        valueFormat = template.split("=")[-1]
        # the name becomes a part of a %-template, so a literal percent sign has to be doubled
        escapedName = _field.replace("%", "%%")
        return f"{escapedName}={valueFormat}"

    @staticmethod
    def convertFieldsToInfluxLineProtocolWithExtra(point):
        """
        Convert point fields into influx line protocol format with extra fields

        Fields without a pre-built format are rendered according to the runtime type of the value.
        Fields whose value is None are skipped.

        Args:
            point: point instance, its class must be built by this metaclass
        Returns:
            comma separated fields
        """
        influxFormats = point._influxFormats
        return ",".join(
            influxFormats.get(_field, INFLUX_FORMATTER) % (_field, value)
            for _field, value in point.fields.items()
            if value is not None
        )

    @staticmethod
    def convertFieldsToInfluxLineProtocolNoExtra(point):
        """
        Convert point fields into influx line protocol format without extra fields

        Every field must be declared in the fields annotation. Fields whose value is None are skipped.

        Args:
            point: point instance, its class must be built by this metaclass
        Returns:
            comma separated fields
        Raises:
            KeyError: if the point returns a field which is not declared in the annotation
        """
        influxFormats = point._influxFormats
        # the value is wrapped into a tuple so that a tuple value is never unpacked as format arguments
        return ",".join(
            influxFormats[_field] % (value,) for _field, value in point.fields.items() if value is not None
        )

    def __new__(mcs, name, bases, namespace, /, **kwargs):
        """
        Create the point class and, if its 'fields' property is annotated with a TypedDict,
        attach the pre-built formats (`_influxFormats`) and `convertFieldsToInfluxLineProtocol`.
        """
        point = super().__new__(mcs, name, bases, namespace, **kwargs)

        # a missing property or a missing return annotation is handled like a non-TypedDict annotation
        fieldsGetter = getattr(getattr(point, "fields", None), "fget", None)
        fieldsType = getattr(fieldsGetter, "__annotations__", {}).get("return")
        if not typing.is_typeddict(fieldsType):
            return point

        # only a 'Config' declared in the class body is taken into account
        extraFields = getattr(namespace.get("Config"), "extraFields", False)

        point._influxFormats = mcs.buildInfluxFormats(fieldsType.__annotations__, extraFields)
        if extraFields:
            point.convertFieldsToInfluxLineProtocol = mcs.convertFieldsToInfluxLineProtocolWithExtra
        else:
            point.convertFieldsToInfluxLineProtocol = mcs.convertFieldsToInfluxLineProtocolNoExtra
        return point
class BaseMonitoringPoint(ABC):
    """
    Abstract ancestor of every monitoring point.

    Concrete points provide the `tags` and `fields` properties.

    Attributes:
        eventTime (float): event time as timestamp
    """

    # str: point series name ("errors", "request", etc.)
    series: str

    __slots__ = ("eventTime", "series")

    def __init__(self, eventTime: float):
        self.eventTime = eventTime

    @property
    @abstractmethod
    def tags(self) -> Dict[str, Union[int, float, str]]:
        """
        Tags of the point. Tags are supposed to be indexed data.

        Returns:
            dict with tags.
        """

    @property
    @abstractmethod
    def fields(self) -> Dict[str, Union[int, float, str]]:
        """
        Fields of the point. Fields are supposed to be non-indexed data.

        Returns:
            dict with fields.
        """
def getRoute(resource: str, method: str) -> str:
    """
    Build a request route: the request method followed by the request resource.

    Args:
        resource: request resource
        method: request method
    Returns:
        "{method}:{resource}"
    """
    return "{}:{}".format(method, resource)
class BaseRequestMonitoringPoint(BaseMonitoringPoint):
    """
    Common part of the points which describe a single request.

    Attributes:
        requestId (str): request id
        route (str): concatenation of a request method and a request resource
        service (str): service name
        statusCode (int): status code of a request response.
    """

    __slots__ = ("requestId", "route", "service", "statusCode")

    def __init__(self, requestId: str, resource: str, method: str, requestTime: float, service: str, statusCode: int):
        # the request time is stored by the base class as the event time
        super().__init__(requestTime)
        self.requestId = requestId
        self.service = service
        self.statusCode = statusCode
        self.route = getRoute(resource, method)

    @property
    def tags(self) -> Dict[str, Union[int, float, str]]:
        """
        Request tags.

        Returns:
            new dict with following keys: "route", "service", "status_code"
        """
        return dict(route=self.route, service=self.service, status_code=self.statusCode)

    @property
    def fields(self) -> Dict[str, Union[int, float, str]]:
        """
        Request fields.

        Returns:
            new dict with following keys: "request_id"
        """
        return dict(request_id=self.requestId)
class RequestMonitoringPoint(BaseRequestMonitoringPoint):
    """
    Point for monitoring all requests; carries the request execution time.

    Attributes:
        executionTime (float): execution time
        additionalTags (dict): additional tags which were specified for the request
        additionalFields (dict): additional fields which were specified for the request
    """

    __slots__ = ("executionTime", "additionalTags", "additionalFields")

    #: series "requests"
    series = "requests"

    def __init__(
        self,
        requestId: str,
        resource: str,
        method: str,
        executionTime: float,
        requestTime: float,
        service: str,
        statusCode: int,
        additionalTags: Optional[Dict[str, Union[str, float, int]]] = None,
        additionalFields: Optional[Dict[str, Union[str, float, int]]] = None,
    ):
        super().__init__(
            requestId=requestId,
            resource=resource,
            method=method,
            requestTime=requestTime,
            service=service,
            statusCode=statusCode,
        )
        self.executionTime = executionTime
        # a fresh dict per instance when nothing was passed
        self.additionalTags = {} if additionalTags is None else additionalTags
        self.additionalFields = {} if additionalFields is None else additionalFields

    @property
    def tags(self) -> Dict[str, Union[int, float, str]]:
        """
        Request tags.

        Returns:
            dict with base tags and additional tags (additional tags win on a key clash)
        """
        merged = super().tags
        merged.update(self.additionalTags)
        return merged

    @property
    def fields(self) -> Dict[str, Union[int, float, str]]:
        """
        Request fields.

        Returns:
            dict with base fields, "execution_time" and additional fields
            (additional fields win on a key clash)
        """
        merged = {**super().fields, "execution_time": self.executionTime}
        merged.update(self.additionalFields)
        return merged
class RequestErrorMonitoringPoint(BaseRequestMonitoringPoint):
    """
    Point for monitoring request errors (error codes).

    Attributes:
        errorCode (int): error code
        additionalTags (dict): additional tags which were specified for the request
        additionalFields (dict): additional fields which were specified for the request
    """

    #: series "errors"
    series = "errors"

    __slots__ = ("additionalTags", "additionalFields", "errorCode")

    def __init__(
        self,
        requestId: str,
        resource: str,
        method: str,
        errorCode: int,
        service: str,
        requestTime: float,
        statusCode: int,
        additionalTags: Optional[Dict[str, Union[str, float, int]]] = None,
        additionalFields: Optional[Dict[str, Union[str, float, int]]] = None,
    ):
        super().__init__(
            requestId=requestId,
            resource=resource,
            method=method,
            requestTime=requestTime,
            service=service,
            statusCode=statusCode,
        )
        self.errorCode = errorCode
        # a fresh dict per instance when nothing was passed
        self.additionalTags = {} if additionalTags is None else additionalTags
        self.additionalFields = {} if additionalFields is None else additionalFields

    @property
    def tags(self) -> Dict[str, Union[int, float, str]]:
        """
        Error tags.

        Returns:
            dict with base tags, "error_code" and additional tags (additional tags win on a key clash)
        """
        merged = {**super().tags, "error_code": self.errorCode}
        merged.update(self.additionalTags)
        return merged

    @property
    def fields(self) -> Dict[str, Union[int, float, str]]:
        """
        Error fields.

        Returns:
            dict with base fields and additional fields (additional fields win on a key clash)
        """
        merged = super().fields
        merged.update(self.additionalFields)
        return merged
[docs] @dataclass(slots=True) class DataForMonitoring: """ Class fo storing an additional data for monitoring. """ # (dict): tags, indexes data tags: dict = field(default_factory=dict) # pylint: disable-msg=invalid-field-call # (dict): fields, non indexes data fields: dict = field(default_factory=dict) # pylint: disable-msg=invalid-field-call def __iadd__(self, other: "DataForMonitoring"): """ Sum monitoring data Args: other: other monitoring data Returns: union monitoring data """ self.fields.update(other.fields) # pylint: disable-msg=E1101 self.tags.update(other.tags) # pylint: disable-msg=E1101 return self
@contextmanager
def monitorTime(monitoringData: DataForMonitoring, fieldName: str):
    """
    Context manager for timing execution time.

    The elapsed time (in seconds, measured with `perf_counter`) is saved to
    `monitoringData.fields[fieldName]` when the block is left. The measurement is saved
    even if the block raises, so a failed operation still has its duration; the exception
    itself is propagated unchanged.

    Args:
        monitoringData: container for saving result
        fieldName: field name
    """
    start = perf_counter()
    try:
        yield
    finally:
        # runs on both normal and exceptional exit of the `with` body
        monitoringData.fields[fieldName] = perf_counter() - start