Source code for cow.monitoring.points
"""
Module contains points for monitoring.
"""
import typing
from abc import ABC, ABCMeta, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass, field
from time import perf_counter
from types import NoneType, UnionType
from typing import Dict, Optional, Union
INFLUX_TYPES_MAP = {
    str: '%s="%s"',
    bool: "%s=%r",
    float: "%s=%f",
    int: "%s=%di",
}
[docs]
class InfluxFormatter(str):
    """Format any point filed into inline format"""
    def __mod__(self, other):
        monitoringField, value = other
        return INFLUX_TYPES_MAP[type(value)] % (monitoringField, value)
INFLUX_FORMATTER = InfluxFormatter()
[docs]
class MonitoringPointInfluxFormatBuilder(ABCMeta):
    """
    Complement point class with explicit fields formatting function for the sake of better performance
    To perform type format building target class must have 'fields' property return value annotated with TypedDict.
    Target class might be configured via 'Config' class.
    Available options:
        extraFields: whether class should handle additional fields or not
    >>> from typing import TypedDict
    >>>
    >>> class MonitoringFields(TypedDict):
    ...     field1: str
    ...     field2: int
    ...     field3: float
    ...     field4: bool
    >>>
    >>> class BasePoint(BaseMonitoringPoint, metaclass=MonitoringPointInfluxFormatBuilder):
    ...
    ...     def __init__(self, fields: dict):
    ...         self._fields = fields
    ...
    ...     @property
    ...     def tags(self):
    ...         return {}
    ...
    ...     @property
    ...     def fields(self) -> MonitoringFields:
    ...         return self._fields
    ...
    >>> class TestPointNoExtra(BasePoint):
    ...
    ...     class Config:
    ...         extraFields = False
    ...
    >>> class TestPointWithExtra(BasePoint):
    ...     class Config:
    ...         extraFields = True
    >>>
    >>>
    >>> point1 = TestPointNoExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False})
    >>> point2 = TestPointWithExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False, "extra": True})
    >>> print(point1.convertFieldsToInfluxLineProtocol())
    field1="data",field2=1i,field3=1.000000,field4=False
    >>> print(point2.convertFieldsToInfluxLineProtocol())
    field1="data",field2=1i,field3=1.000000,field4=False,extra=True
    """
[docs]
    @classmethod
    def buildInfluxFormats(mcs, annotations: dict, extraFields: bool) -> dict:
        """
        Build map with influx formats for corresponding fields
        Args:
            annotations (dict): point fields type annotations
            extraFields (bool): whether point uses extra fields or not
        Returns:
            dict of fields with their format
        """
        influxFormats = {}
        for _field, _type in annotations.items():
            if isinstance(_type, typing._UnionGenericAlias | UnionType):
                types = [t for t in _type.__args__ if t != NoneType]
                if len(types) > 1:
                    raise TypeError("Too many allowed types")
                _type = types[0]
            influxFormats[_field] = mcs.getTypeFormat(_type, _field, extraFields)
        return influxFormats
[docs]
    @staticmethod
    def getTypeFormat(_type: type, _field: str, extraFields: bool) -> str:
        """
        Get field type format
        Args:
            _type (type): field type
            _field (str): field name
            extraFields (bool): whether point uses extra fields or not
        Returns:
            string format of the field
        """
        try:
            if extraFields:
                return INFLUX_TYPES_MAP[_type]
            fieldFormat = INFLUX_TYPES_MAP[_type].split("=")[-1]
            return f"{_field}={fieldFormat}"
        except KeyError as err:
            raise TypeError(
                f"Got unsupported type for monitoring point, must be one of {INFLUX_TYPES_MAP.keys()}"
            ) from err
[docs]
    @staticmethod
    def convertFieldsToInfluxLineProtocolWithExtra(point):
        """Convert point fields into influx line protocol format with extra fields"""
        influxFormats = point._influxFormats
        return ",".join(
            (influxFormats.get(_field, INFLUX_FORMATTER) % (_field, value) for _field, value in point.fields.items())
        )
[docs]
    @staticmethod
    def convertFieldsToInfluxLineProtocolNoExtra(point):
        """Convert point fields into influx line protocol format without extra fields"""
        influxFormats = point._influxFormats
        return ",".join((influxFormats[_field] % value for _field, value in point.fields.items()))
    def __new__(mcs, name, bases, namespace, /, **kwargs):
        point = super().__new__(mcs, name, bases, namespace, **kwargs)
        fieldsType = point.fields.fget.__annotations__["return"]
        if not isinstance(fieldsType, typing._TypedDictMeta):
            return point
        extraFields = False
        if "Config" in namespace:
            extraFields = namespace["Config"].extraFields
        annotations = fieldsType.__annotations__
        point._influxFormats = mcs.buildInfluxFormats(annotations, extraFields)
        if extraFields:
            point.convertFieldsToInfluxLineProtocol = mcs.convertFieldsToInfluxLineProtocolWithExtra
        else:
            point.convertFieldsToInfluxLineProtocol = mcs.convertFieldsToInfluxLineProtocolNoExtra
        return point
[docs]
class BaseMonitoringPoint(ABC):
    """
    Abstract class for points
    Attributes:
        eventTime (float): event time as timestamp
    """
    __slots__ = ("eventTime", "series")
    # str: point series name ("errors", "request", etc.)
    series: str
    def __init__(self, eventTime: float):
        self.eventTime = eventTime
    @property
    @abstractmethod
    def tags(self) -> Dict[str, Union[int, float, str]]:
        """
        Get tags from point. We supposed that tags are indexing data
        Returns:
            dict with tags.
        """
    @property
    @abstractmethod
    def fields(self) -> Dict[str, Union[int, float, str]]:
        """
        Get fields from point. We supposed that fields are not indexing data
        Returns:
            dict with fields.
        """
[docs]
def getRoute(resource: str, method: str) -> str:
    """
    Get a request route, concatenation of a request method and a request resource
    Args:
        resource: resource
        method: method
    Returns:
        "{method}:{resource}"
    """
    return f"{method}:{resource}"
[docs]
class BaseRequestMonitoringPoint(BaseMonitoringPoint):
    """
    Base class for point which is associated with requests.
    Attributes:
        requestId (str): request id
        route (str): concatenation of a request method and a request resource
        service (str): service name
        statusCode (int): status code of a request response.
    """
    __slots__ = ("requestId", "route", "service", "statusCode")
    def __init__(self, requestId: str, resource: str, method: str, requestTime: float, service: str, statusCode: int):
        super().__init__(requestTime)
        self.route = getRoute(resource, method)
        self.requestId = requestId
        self.service = service
        self.statusCode = statusCode
    @property
    def tags(self) -> Dict[str, Union[int, float, str]]:
        """
        Get tags
        Returns:
            dict with following keys: "route", "service", "status_code"
        """
        return {"route": self.route, "service": self.service, "status_code": self.statusCode}
    @property
    def fields(self) -> Dict[str, Union[int, float, str]]:
        """
        Get fields
        Returns:
            dict with following keys: "request_id"
        """
        return {"request_id": self.requestId}
[docs]
class RequestMonitoringPoint(BaseRequestMonitoringPoint):
    """
    Request monitoring point is suspended for monitoring all requests and measure a request time etc.
    Attributes:
        executionTime (float): execution time
        additionalTags (dict): additional tags which was specified for the request
        additionalFields (dict): additional fields which was specified for the request
    """
    #: series "request"
    series = "requests"
    __slots__ = ("executionTime", "additionalTags", "additionalFields")
    def __init__(
        self,
        requestId: str,
        resource: str,
        method: str,
        executionTime: float,
        requestTime: float,
        service: str,
        statusCode: int,
        additionalTags: Optional[Dict[str, Union[str, float, int]]] = None,
        additionalFields: Optional[Dict[str, Union[str, float, int]]] = None,
    ):
        super().__init__(
            requestId=requestId,
            requestTime=requestTime,
            service=service,
            resource=resource,
            method=method,
            statusCode=statusCode,
        )
        self.executionTime = executionTime
        self.additionalTags = additionalTags if additionalTags is not None else {}
        self.additionalFields = additionalFields if additionalFields is not None else {}
    @property
    def tags(self) -> Dict[str, Union[int, float, str]]:
        """
        Get tags.
        Returns:
            dict with base tags and additional tags
        """
        baseTags = super().tags
        baseTags.update(self.additionalTags)
        return baseTags
    @property
    def fields(self) -> Dict[str, Union[int, float, str]]:
        """
        Get fields.
        Returns:
            dict with base fields, "execution_time" and additional tags
        """
        baseFields = super().fields
        baseFields["execution_time"] = self.executionTime
        baseFields.update(self.additionalFields)
        return baseFields
[docs]
class RequestErrorMonitoringPoint(BaseRequestMonitoringPoint):
    """
    Request monitoring point is suspended for monitoring requests errors (error codes)
    Attributes:
        errorCode (int): error code
        additionalTags (dict): additional tags which was specified for the request
        additionalFields (dict): additional fields which was specified for the request
    """
    __slots__ = ("additionalTags", "additionalFields", "errorCode")
    #: series "errors"
    series = "errors"
    def __init__(
        self,
        requestId: str,
        resource: str,
        method: str,
        errorCode: int,
        service: str,
        requestTime: float,
        statusCode: int,
        additionalTags: Optional[Dict[str, Union[str, float, int]]] = None,
        additionalFields: Optional[Dict[str, Union[str, float, int]]] = None,
    ):
        super().__init__(
            requestId=requestId,
            requestTime=requestTime,
            service=service,
            resource=resource,
            method=method,
            statusCode=statusCode,
        )
        self.errorCode = errorCode
        self.additionalTags = additionalTags if additionalTags is not None else {}
        self.additionalFields = additionalFields if additionalFields is not None else {}
    @property
    def tags(self) -> Dict[str, Union[int, float, str]]:
        """
        Get tags.
        Returns:
            dict with base tags, "error_code" and  additional tags
        """
        baseTags = super().tags
        baseTags["error_code"] = self.errorCode
        baseTags.update(self.additionalTags)
        return baseTags
    @property
    def fields(self) -> Dict[str, Union[int, float, str]]:
        """
        Get fields.
        Returns:
            dict with base fields and additional tags
        """
        baseFields = super().fields
        baseFields.update(self.additionalFields)
        return baseFields
[docs]
@dataclass(slots=True)
class DataForMonitoring:
    """
    Class fo storing an additional data for monitoring.
    """
    # (dict): tags, indexes data
    tags: dict = field(default_factory=dict)  # pylint: disable-msg=invalid-field-call
    # (dict): fields, non indexes data
    fields: dict = field(default_factory=dict)  # pylint: disable-msg=invalid-field-call
    def __iadd__(self, other: "DataForMonitoring"):
        """
        Sum monitoring data
        Args:
            other: other monitoring data
        Returns:
            union monitoring data
        """
        self.fields.update(other.fields)  # pylint: disable-msg=E1101
        self.tags.update(other.tags)  # pylint: disable-msg=E1101
        return self
[docs]
@contextmanager
def monitorTime(monitoringData: DataForMonitoring, fieldName: str):
    """
    Context manager for timing  execution time.
    Args:
        monitoringData: container for saving result
        fieldName: field name
    """
    start = perf_counter()
    yield
    monitoringData.fields[fieldName] = perf_counter() - start