# Source code for luna_backport4.crutches_on_wheels.cow.monitoring.points
"""
Module contains points for monitoring.
"""
import typing
from abc import ABC, ABCMeta, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass, field
from time import perf_counter
from types import NoneType, UnionType
from typing import Dict, Optional, Union
# Influx line protocol templates keyed by python type.
# Every template is applied to a ``(field name, value)`` pair.
INFLUX_TYPES_MAP = {
    # strings are wrapped in double quotes
    str: '%s="%s"',
    # booleans are rendered through repr: True / False
    bool: "%s=%r",
    # floats use the default "%f" precision (six digits after the point)
    float: "%s=%f",
    # integers carry the trailing "i" marker
    int: "%s=%di",
}
[docs]
class InfluxFormatter(str):
    """Format any point field into inline format, picking the template by the value type"""

    def __mod__(self, other):
        """
        Render a single field.

        Args:
            other: pair of field name and field value
        Returns:
            the field formatted with the `INFLUX_TYPES_MAP` template registered for the exact value type
        Raises:
            KeyError: if the value type has no template in `INFLUX_TYPES_MAP`
        """
        fieldName, fieldValue = other
        template = INFLUX_TYPES_MAP[type(fieldValue)]
        return template % (fieldName, fieldValue)
# Shared formatter instance: applied with "%" to a (field name, value) pair, it picks the template by the value type
INFLUX_FORMATTER = InfluxFormatter()
[docs]
class MonitoringPointInfluxFormatBuilder(ABCMeta):
"""
Complement point class with explicit fields formatting function for the sake of better performance
To perform type format building target class must have 'fields' property return value annotated with TypedDict.
Target class might be configured via 'Config' class.
Available options:
extraFields: whether class should handle additional fields or not
>>> from typing import TypedDict
>>>
>>> class MonitoringFields(TypedDict):
... field1: str
... field2: int
... field3: float
... field4: bool
>>>
>>> class BasePoint(BaseMonitoringPoint, metaclass=MonitoringPointInfluxFormatBuilder):
...
... def __init__(self, fields: dict):
... self._fields = fields
...
... @property
... def tags(self):
... return {}
...
... @property
... def fields(self) -> MonitoringFields:
... return self._fields
...
>>> class TestPointNoExtra(BasePoint):
...
... class Config:
... extraFields = False
...
>>> class TestPointWithExtra(BasePoint):
... class Config:
... extraFields = True
>>>
>>>
>>> point1 = TestPointNoExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False})
>>> point2 = TestPointWithExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False, "extra": True})
>>> print(point1.convertFieldsToInfluxLineProtocol())
field1="data",field2=1i,field3=1.000000,field4=False
>>> print(point2.convertFieldsToInfluxLineProtocol())
field1="data",field2=1i,field3=1.000000,field4=False,extra=True
"""
[docs]
@classmethod
def buildInfluxFormats(mcs, annotations: dict, extraFields: bool) -> dict:
"""
Build map with influx formats for corresponding fields
Args:
annotations (dict): point fields type annotations
extraFields (bool): whether point uses extra fields or not
Returns:
dict of fields with their format
"""
influxFormats = {}
for _field, _type in annotations.items():
if isinstance(_type, typing._UnionGenericAlias | UnionType):
types = [t for t in _type.__args__ if t != NoneType]
if len(types) > 1:
raise TypeError("Too many allowed types")
_type = types[0]
influxFormats[_field] = mcs.getTypeFormat(_type, _field, extraFields)
return influxFormats
[docs]
@staticmethod
def getTypeFormat(_type: type, _field: str, extraFields: bool) -> str:
"""
Get field type format
Args:
_type (type): field type
_field (str): field name
extraFields (bool): whether point uses extra fields or not
Returns:
string format of the field
"""
try:
if extraFields:
return INFLUX_TYPES_MAP[_type]
fieldFormat = INFLUX_TYPES_MAP[_type].split("=")[-1]
return f"{_field}={fieldFormat}"
except KeyError as err:
raise TypeError(
f"Got unsupported type for monitoring point, must be one of {INFLUX_TYPES_MAP.keys()}"
) from err
[docs]
@staticmethod
def convertFieldsToInfluxLineProtocolWithExtra(point):
"""Convert point fields into influx line protocol format with extra fields"""
influxFormats = point._influxFormats
return ",".join(
(influxFormats.get(_field, INFLUX_FORMATTER) % (_field, value) for _field, value in point.fields.items())
)
[docs]
@staticmethod
def convertFieldsToInfluxLineProtocolNoExtra(point):
"""Convert point fields into influx line protocol format without extra fields"""
influxFormats = point._influxFormats
return ",".join((influxFormats[_field] % value for _field, value in point.fields.items()))
def __new__(mcs, name, bases, namespace, /, **kwargs):
point = super().__new__(mcs, name, bases, namespace, **kwargs)
fieldsType = point.fields.fget.__annotations__["return"]
if not isinstance(fieldsType, typing._TypedDictMeta):
return point
extraFields = False
if "Config" in namespace:
extraFields = namespace["Config"].extraFields
annotations = fieldsType.__annotations__
point._influxFormats = mcs.buildInfluxFormats(annotations, extraFields)
if extraFields:
point.convertFieldsToInfluxLineProtocol = mcs.convertFieldsToInfluxLineProtocolWithExtra
else:
point.convertFieldsToInfluxLineProtocol = mcs.convertFieldsToInfluxLineProtocolNoExtra
return point
[docs]
class BaseMonitoringPoint(ABC):
    """
    Abstract base of every monitoring point.

    Attributes:
        eventTime (float): event time as timestamp
    """

    #: str: point series name ("errors", "request", etc.)
    series: str
    __slots__ = ("eventTime", "series")

    def __init__(self, eventTime: float):
        self.eventTime = eventTime

    @property
    @abstractmethod
    def fields(self) -> Dict[str, Union[int, float, str]]:
        """
        Fields of the point. Fields are supposed to be non-indexed data.

        Returns:
            dict with fields.
        """

    @property
    @abstractmethod
    def tags(self) -> Dict[str, Union[int, float, str]]:
        """
        Tags of the point. Tags are supposed to be indexed data.

        Returns:
            dict with tags.
        """
[docs]
def getRoute(resource: str, method: str) -> str:
    """
    Build a request route: the request method and the request resource joined with a colon.

    Args:
        resource: request resource
        method: request method
    Returns:
        "{method}:{resource}"
    """
    return "{}:{}".format(method, resource)
[docs]
class BaseRequestMonitoringPoint(BaseMonitoringPoint):
    """
    Common base of the points bound to a request.

    Attributes:
        requestId (str): request id
        route (str): concatenation of a request method and a request resource
        service (str): service name
        statusCode (int): status code of a request response.
    """

    __slots__ = ("requestId", "route", "service", "statusCode")

    def __init__(self, requestId: str, resource: str, method: str, requestTime: float, service: str, statusCode: int):
        # the request time is stored as the event time of the point
        super().__init__(requestTime)
        self.requestId = requestId
        self.service = service
        self.statusCode = statusCode
        self.route = getRoute(resource, method)

    @property
    def tags(self) -> Dict[str, Union[int, float, str]]:
        """
        Tags of the point.

        Returns:
            new dict with following keys: "route", "service", "status_code"
        """
        return dict(route=self.route, service=self.service, status_code=self.statusCode)

    @property
    def fields(self) -> Dict[str, Union[int, float, str]]:
        """
        Fields of the point.

        Returns:
            new dict with following keys: "request_id"
        """
        return dict(request_id=self.requestId)
[docs]
class RequestMonitoringPoint(BaseRequestMonitoringPoint):
    """
    Point for monitoring all requests: carries the request execution time besides the base request data.

    Attributes:
        executionTime (float): execution time
        additionalTags (dict): additional tags which were specified for the request
        additionalFields (dict): additional fields which were specified for the request
    """

    __slots__ = ("executionTime", "additionalTags", "additionalFields")
    #: series "requests"
    series = "requests"

    def __init__(
        self,
        requestId: str,
        resource: str,
        method: str,
        executionTime: float,
        requestTime: float,
        service: str,
        statusCode: int,
        additionalTags: Optional[Dict[str, Union[str, float, int]]] = None,
        additionalFields: Optional[Dict[str, Union[str, float, int]]] = None,
    ):
        super().__init__(
            requestId=requestId,
            resource=resource,
            method=method,
            requestTime=requestTime,
            service=service,
            statusCode=statusCode,
        )
        self.executionTime = executionTime
        # a fresh dict per instance when the caller passed nothing
        self.additionalTags = {} if additionalTags is None else additionalTags
        self.additionalFields = {} if additionalFields is None else additionalFields

    @property
    def tags(self) -> Dict[str, Union[int, float, str]]:
        """
        Tags of the point.

        Returns:
            dict with base tags and additional tags (additional tags win on a key clash)
        """
        allTags = super().tags
        allTags |= self.additionalTags
        return allTags

    @property
    def fields(self) -> Dict[str, Union[int, float, str]]:
        """
        Fields of the point.

        Returns:
            dict with base fields, "execution_time" and additional fields (additional fields win on a key clash)
        """
        allFields = super().fields
        allFields["execution_time"] = self.executionTime
        allFields |= self.additionalFields
        return allFields
[docs]
class RequestErrorMonitoringPoint(BaseRequestMonitoringPoint):
    """
    Point for monitoring request errors (error codes).

    Attributes:
        errorCode (int): error code
        additionalTags (dict): additional tags which were specified for the request
        additionalFields (dict): additional fields which were specified for the request
    """

    #: series "errors"
    series = "errors"
    __slots__ = ("additionalTags", "additionalFields", "errorCode")

    def __init__(
        self,
        requestId: str,
        resource: str,
        method: str,
        errorCode: int,
        service: str,
        requestTime: float,
        statusCode: int,
        additionalTags: Optional[Dict[str, Union[str, float, int]]] = None,
        additionalFields: Optional[Dict[str, Union[str, float, int]]] = None,
    ):
        super().__init__(
            requestId=requestId,
            resource=resource,
            method=method,
            requestTime=requestTime,
            service=service,
            statusCode=statusCode,
        )
        self.errorCode = errorCode
        # a fresh dict per instance when the caller passed nothing
        self.additionalTags = {} if additionalTags is None else additionalTags
        self.additionalFields = {} if additionalFields is None else additionalFields

    @property
    def tags(self) -> Dict[str, Union[int, float, str]]:
        """
        Tags of the point.

        Returns:
            dict with base tags, "error_code" and additional tags (additional tags win on a key clash)
        """
        allTags = super().tags
        allTags["error_code"] = self.errorCode
        allTags |= self.additionalTags
        return allTags

    @property
    def fields(self) -> Dict[str, Union[int, float, str]]:
        """
        Fields of the point.

        Returns:
            dict with base fields and additional fields (additional fields win on a key clash)
        """
        allFields = super().fields
        allFields |= self.additionalFields
        return allFields
[docs]
@dataclass(slots=True)
class DataForMonitoring:
"""
Class fo storing an additional data for monitoring.
"""
# (dict): tags, indexes data
tags: dict = field(default_factory=dict) # pylint: disable-msg=invalid-field-call
# (dict): fields, non indexes data
fields: dict = field(default_factory=dict) # pylint: disable-msg=invalid-field-call
def __iadd__(self, other: "DataForMonitoring"):
"""
Sum monitoring data
Args:
other: other monitoring data
Returns:
union monitoring data
"""
self.fields.update(other.fields) # pylint: disable-msg=E1101
self.tags.update(other.tags) # pylint: disable-msg=E1101
return self
[docs]
@contextmanager
def monitorTime(monitoringData: "DataForMonitoring", fieldName: str):
    """
    Context manager for timing execution time.

    The elapsed time (difference of two `perf_counter` readings) is saved to `monitoringData.fields[fieldName]`
    when the wrapped block is left. The time is saved even if the block raises; the exception is propagated.

    Args:
        monitoringData: container for saving result
        fieldName: field name
    """
    start = perf_counter()
    try:
        yield
    finally:
        # without "finally" the measurement would be lost whenever the wrapped block fails
        monitoringData.fields[fieldName] = perf_counter() - start