Monitoring

Data for monitoring

The following types of events are monitored:

  • request, all http and ws requests.

  • error, all failed http and ws requests.

  • ws requests, all messages sent over websockets (ws) to clients.

  • subscription error, all errors generated while processing redis messages or while establishing the subscription to redis.

  • redis messages, all messages from redis.

Every event is a point in a time series. A point is represented as a combination of the following data:

  • series name (for example, requests or errors)

  • start request time

  • tags, indexed data in storage, dictionary: keys - string tag names, values - string, integer, float

  • fields, non-indexed data in storage, dictionary: keys - string field names, values - string, integer, float
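As an illustration of the structure listed above, a point can be pictured as a small record. This is only a sketch: the `Point` dataclass and the sample values are hypothetical and are not part of the service code.

```python
from dataclasses import dataclass, field
from typing import Dict, Union

Value = Union[str, int, float]


@dataclass
class Point:
    """Hypothetical container mirroring the four parts of a monitoring point."""

    series: str   # series name, e.g. "requests"
    time: float   # start request time as a Unix timestamp
    tags: Dict[str, Value] = field(default_factory=dict)    # indexed data
    fields: Dict[str, Value] = field(default_factory=dict)  # non-indexed data


# Sample values are made up for illustration.
point = Point(
    series="requests",
    time=1700000000.0,
    tags={"service": "luna-sender", "route": "GET:/version", "status_code": 200},
    fields={"request_id": "example-request-id", "execution_time": 0.012},
)
```

Tags are the values one would filter or group by in the storage, while fields carry the measured values.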

‘Requests’ series. Triggered on every http or websocket request. Each point contains data about the corresponding request (execution time, etc.).

  • tags

    tag name       description
    service        always “luna-sender”
    route          concatenation of a request method and a request resource (GET:/version)
    status_code    http status code of response

  • fields

    field name        description
    request_id        request id
    execution_time    request execution time

‘Errors’ series. Triggered on every failed http or websocket request. Each point contains the error_code of the luna error.

  • tags

    tag name       description
    service        always “luna-sender”
    route          concatenation of a request method and a request resource (GET:/version)
    status_code    http status code of response
    error_code     luna error code

  • fields

    field name    description
    request_id    request id

‘Subscription errors’ series. Triggered on an error in the pipeline that processes messages from redis or in the redis subscription. Each point contains the error_code of the luna error.

  • tags

    tag name      description
    service       always “luna-sender”
    error_code    luna error code

  • fields, none at present

‘ws requests’ series. Triggered on every request sent over ws to a client. Each point contains the processing time.

  • tags

    tag name      description
    service       always “luna-sender”
    account_id    account id or none

  • fields

    field name        description
    transport_time    time between receiving a message from redis and finishing sending the request to a client
    request_id        request id

Additional tags or fields can be added to ‘subscription_error’ points.

Database

Monitoring is implemented by sending data to an influx database. You can set up your database credentials in the “monitoring” section of the configuration file.
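InfluxDB accepts points in its line protocol, where each point is one line of the form `series,tags fields timestamp`. The following simplified sketch shows how a ‘requests’ point might look in that form. The `to_line` helper is hypothetical, it omits the escaping of special characters that the protocol requires, and it is not the formatter used by the service.

```python
def to_line(series, tags, fields, timestamp_ns):
    """Render one point as an InfluxDB line protocol string (no escaping)."""
    # Tags are written sorted by key, which InfluxDB recommends.
    tag_part = ",".join(f"{key}={value}" for key, value in sorted(tags.items()))

    def fmt(value):
        # bool is checked before int because bool is a subclass of int
        if isinstance(value, bool):
            return "true" if value else "false"
        if isinstance(value, int):
            return f"{value}i"   # integers carry an "i" suffix
        if isinstance(value, float):
            return repr(value)
        return f'"{value}"'      # strings are double-quoted

    field_part = ",".join(f"{key}={fmt(value)}" for key, value in fields.items())
    return f"{series},{tag_part} {field_part} {timestamp_ns}"


# Sample values are made up for illustration.
line = to_line(
    "requests",
    {"service": "luna-sender", "route": "GET:/version", "status_code": 200},
    {"request_id": "abc", "execution_time": 0.5},
    1700000000000000000,
)
```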

Classes

Points. For monitoring events in the redis context

class luna_sender.app.monitoring.points.BaseSenderMonitoringPoint(eventTime)[source]

Base point for Sender messages monitoring

classmethod initialize(serviceName)[source]

Initialize

Parameters:

serviceName – service name

service: str

service name

class luna_sender.app.monitoring.points.ErrorPoint(eventTime, errorCode, additionalTags=None, additionalFields=None)[source]

Point for monitoring errors of processing messages from redis or connecting to redis

additionalTags

additional tags for point

Type:

dict

errorCode

error code

Type:

str

additionalFields

additional fields for point

Type:

dict

property fields: dict

Get point fields

Returns:

dict with additional fields

Return type:

dict

series: str = 'subscription_error'

series

property tags: dict

Get point tags

Returns:

dict with service as keys + additionalTags

Return type:

dict

class luna_sender.app.monitoring.points.SubscribeRedisPoint(requestId, chanel, eventTime)[source]

Point for monitoring receiving message from redis

requestId

request id from luna

Type:

str

chanel

redis channel (‘{channel name}:{account id}’)

Type:

str

property fields: dict

Get point fields

Returns:

dict with request id as key

Return type:

dict

series: str = 'redis_message'

series

property tags: dict

Get point tags

Returns:

dict with chanel and service as keys

Return type:

dict

class luna_sender.app.monitoring.points.WSRequestPoint(eventTime, requestId, transportTime, accountId)[source]

Point for monitoring sending message to ws

accountId

account id

Type:

str

requestId

request id from luna

Type:

str

transportTime

time between getting message from redis and sending it to a ws (seconds)

Type:

float

property fields: dict

Get point fields

Returns:

dict with request id and transport time as keys

Return type:

dict

series: str = 'ws_requests'

series

property tags: dict

Get point tags

Returns:

dict with account id and service as keys

Return type:

dict

Module contains points for monitoring.

class luna_sender.crutches_on_wheels.cow.monitoring.points.BaseMonitoringPoint(eventTime)[source]

Abstract class for points

eventTime

event time as timestamp

Type:

float

abstract property fields: Dict[str, int | float | str]

Get fields from point. Fields are assumed to be non-indexed data.

Returns:

dict with fields.

Return type:

Dict[str, Union[int, float, str]]

abstract property tags: Dict[str, int | float | str]

Get tags from point. Tags are assumed to be indexed data.

Returns:

dict with tags.

Return type:

Dict[str, Union[int, float, str]]
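The contract described above (an event time plus abstract `tags` and `fields` properties) can be sketched with the standard `abc` module. The classes below are hypothetical stand-ins written for illustration, not the library source. The subclass is modeled on the ‘ws_requests’ series described earlier.

```python
from abc import ABC, abstractmethod
from typing import Dict, Union

Value = Union[int, float, str]


class SketchPoint(ABC):
    """Hypothetical stand-in for an abstract point with tags and fields."""

    series: str

    def __init__(self, eventTime: float):
        self.eventTime = eventTime  # event time as a timestamp

    @property
    @abstractmethod
    def tags(self) -> Dict[str, Value]:
        """Indexed data."""

    @property
    @abstractmethod
    def fields(self) -> Dict[str, Value]:
        """Non-indexed data."""


class SketchWSPoint(SketchPoint):
    """Hypothetical concrete point for the 'ws_requests' series."""

    series = "ws_requests"

    def __init__(self, eventTime, requestId, transportTime, accountId):
        super().__init__(eventTime)
        self.requestId = requestId
        self.transportTime = transportTime
        self.accountId = accountId

    @property
    def tags(self):
        return {"service": "luna-sender", "account_id": self.accountId}

    @property
    def fields(self):
        return {"request_id": self.requestId, "transport_time": self.transportTime}
```

Because both properties are abstract, `SketchPoint` itself cannot be instantiated; only subclasses that define `tags` and `fields` can.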

class luna_sender.crutches_on_wheels.cow.monitoring.points.BaseRequestMonitoringPoint(requestId, resource, method, requestTime, service, statusCode)[source]

Base class for points associated with requests.

requestId

request id

Type:

str

route

concatenation of a request method and a request resource

Type:

str

service

service name

Type:

str

statusCode

status code of a request response.

Type:

int

property fields: Dict[str, int | float | str]

Get fields

Returns:

dict with following keys: “request_id”

Return type:

Dict[str, Union[int, float, str]]

property tags: Dict[str, int | float | str]

Get tags

Returns:

dict with following keys: “route”, “service”, “status_code”

Return type:

Dict[str, Union[int, float, str]]

class luna_sender.crutches_on_wheels.cow.monitoring.points.DataForMonitoring(tags=<factory>, fields=<factory>)[source]

Class for storing additional data for monitoring.

class luna_sender.crutches_on_wheels.cow.monitoring.points.InfluxFormatter[source]

Format any point field into inline format

class luna_sender.crutches_on_wheels.cow.monitoring.points.MonitoringPointInfluxFormatBuilder(name, bases, namespace, /, **kwargs)[source]

Complements a point class with an explicit fields formatting function for the sake of better performance.

For the format building to work, the target class must annotate the return value of its ‘fields’ property with a TypedDict.

The target class can be configured via a nested ‘Config’ class. Available options:

extraFields: whether class should handle additional fields or not

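>>> from typing import TypedDict
>>> from luna_sender.crutches_on_wheels.cow.monitoring.points import (
...     BaseMonitoringPoint,
...     MonitoringPointInfluxFormatBuilder,
... )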
>>>
>>> class MonitoringFields(TypedDict):
...     field1: str
...     field2: int
...     field3: float
...     field4: bool
>>>
>>> class BasePoint(BaseMonitoringPoint, metaclass=MonitoringPointInfluxFormatBuilder):
...
...     def __init__(self, fields: dict):
...         self._fields = fields
...
...     @property
...     def tags(self):
...         return {}
...
...     @property
...     def fields(self) -> MonitoringFields:
...         return self._fields
...
>>> class TestPointNoExtra(BasePoint):
...
...     class Config:
...         extraFields = False
...
>>> class TestPointWithExtra(BasePoint):
...     class Config:
...         extraFields = True
>>>
>>>
>>> point1 = TestPointNoExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False})
>>> point2 = TestPointWithExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False, "extra": True})
>>> print(point1.convertFieldsToInfluxLineProtocol())
field1="data",field2=1i,field3=1.000000,field4=False
>>> print(point2.convertFieldsToInfluxLineProtocol())
field1="data",field2=1i,field3=1.000000,field4=False,extra=True
classmethod buildInfluxFormats(annotations, extraFields)[source]

Build map with influx formats for corresponding fields

Parameters:
  • annotations (dict) – point fields type annotations

  • extraFields (bool) – whether point uses extra fields or not

Returns:

dict of fields with their format

Return type:

dict

static convertFieldsToInfluxLineProtocolNoExtra(point)[source]

Convert point fields into influx line protocol format without extra fields

static convertFieldsToInfluxLineProtocolWithExtra(point)[source]

Convert point fields into influx line protocol format with extra fields

static getTypeFormat(_type, _field, extraFields)[source]

Get field type format

Parameters:
  • _type (type) – field type

  • _field (str) – field name

  • extraFields (bool) – whether point uses extra fields or not

Returns:

string format of the field

Return type:

str
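The idea behind `buildInfluxFormats` and `getTypeFormat`, as far as it can be read from the descriptions above, is to derive a format for every annotated field once, so that converting a point is a plain string-formatting step. A minimal sketch of that idea follows. The names `TYPE_FORMATS`, `build_formats` and `convert` are hypothetical, and the templates were chosen only to reproduce the doctest output shown earlier (`"data"`, `1i`, `1.000000`).

```python
from typing import TypedDict, get_type_hints


class MonitoringFields(TypedDict):
    field1: str
    field2: int
    field3: float


# Hypothetical per-type templates matching the output format shown above.
TYPE_FORMATS = {
    str: '{name}="{value}"',
    int: "{name}={value}i",
    float: "{name}={value:f}",
}


def build_formats(fields_type):
    """Map every annotated field name to a format template, computed once."""
    return {name: TYPE_FORMATS[tp] for name, tp in get_type_hints(fields_type).items()}


def convert(fields, formats):
    """Render fields using the precomputed templates."""
    return ",".join(
        formats[name].format(name=name, value=value) for name, value in fields.items()
    )


FORMATS = build_formats(MonitoringFields)
line = convert({"field1": "data", "field2": 1, "field3": 1.0}, FORMATS)
```

In this sketch a field that is missing from the annotations raises `KeyError`; handling such extra fields is what the `extraFields` option is described as controlling in the real class.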

class luna_sender.crutches_on_wheels.cow.monitoring.points.RequestErrorMonitoringPoint(requestId, resource, method, errorCode, service, requestTime, statusCode, additionalTags=None, additionalFields=None)[source]

Request error monitoring point, intended for monitoring request errors (error codes).

errorCode

error code

Type:

int

additionalTags

additional tags which were specified for the request

Type:

dict

additionalFields

additional fields which were specified for the request

Type:

dict

property fields: Dict[str, int | float | str]

Get fields.

Returns:

dict with base fields and additional fields

Return type:

Dict[str, Union[int, float, str]]

series: str = 'errors'

series “errors”

property tags: Dict[str, int | float | str]

Get tags.

Returns:

dict with base tags, “error_code” and additional tags

Return type:

Dict[str, Union[int, float, str]]

class luna_sender.crutches_on_wheels.cow.monitoring.points.RequestMonitoringPoint(requestId, resource, method, executionTime, requestTime, service, statusCode, additionalTags=None, additionalFields=None)[source]

Request monitoring point, intended for monitoring all requests and measuring request time, etc.

executionTime

execution time

Type:

float

additionalTags

additional tags which were specified for the request

Type:

dict

additionalFields

additional fields which were specified for the request

Type:

dict

property fields: Dict[str, int | float | str]

Get fields.

Returns:

dict with base fields, “execution_time” and additional fields

Return type:

Dict[str, Union[int, float, str]]

series: str = 'requests'

series “requests”

property tags: Dict[str, int | float | str]

Get tags.

Returns:

dict with base tags and additional tags

Return type:

Dict[str, Union[int, float, str]]

luna_sender.crutches_on_wheels.cow.monitoring.points.getRoute(resource, method)[source]

Get a request route, concatenation of a request method and a request resource

Parameters:
  • resource – resource

  • method – method

Returns:

“{method}:{resource}”

Return type:

str
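Judging by the `GET:/version` example given for the route tag, the method and the resource are joined with a colon. A minimal sketch under that assumption (`get_route` is a hypothetical reimplementation, not the library source):

```python
def get_route(resource: str, method: str) -> str:
    """Join the request method and resource as "{method}:{resource}"."""
    return f"{method}:{resource}"


route = get_route("/version", "GET")
```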

luna_sender.crutches_on_wheels.cow.monitoring.points.monitorTime(monitoringData, fieldName)[source]

Context manager for timing execution time.

Parameters:
  • monitoringData – container for saving result

  • fieldName – field name
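A context manager of this kind is typically used to time a block of code and store the result under a given name. The sketch below shows the general pattern only. `SketchMonitoringData` and `monitor_time` are hypothetical, and storing the elapsed time in seconds in the `fields` dictionary is an assumption, not confirmed behavior of `monitorTime`.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field


@dataclass
class SketchMonitoringData:
    """Hypothetical container with tags and fields dictionaries."""

    tags: dict = field(default_factory=dict)
    fields: dict = field(default_factory=dict)


@contextmanager
def monitor_time(monitoring_data, field_name):
    """Store the elapsed time of the wrapped block under field_name."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Recorded even if the wrapped block raises.
        monitoring_data.fields[field_name] = time.perf_counter() - start


data = SketchMonitoringData()
with monitor_time(data, "load_time"):
    sum(range(1000))
```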

Module implements the base class for monitoring.

class luna_sender.crutches_on_wheels.cow.monitoring.manager.LunaMonitoringManager(settings, pluginManager)[source]

Monitoring manager. Sends data to the monitoring storage and monitoring plugins.

settings

monitoring storage settings

async close()[source]

Stop monitoring.

Return type:

None

flushPoints(points)[source]

Flush points to monitoring.

Parameters:

points – points

Return type:

None

async initialize()[source]

Initialize monitoring

Return type:

None

class luna_sender.crutches_on_wheels.cow.monitoring.manager.MonitoringSettings(*args, **kwargs)[source]

Monitoring settings protocol

class InfluxCredentials[source]

Monitoring credentials

Module contains classes for sending data to influx monitoring.

class luna_sender.crutches_on_wheels.cow.monitoring.influx_adapter.BaseMonitoringAdapter(settings, flushingPeriod)[source]

Base monitoring adapter.

backgroundScheduler

runner for periodic flushing monitoring points

Type:

AsyncIOScheduler

_buffer

list of buffered points waiting to be sent to influx

Type:

List[BaseRequestMonitoringPoint]

flushingPeriod

period of flushing points (in seconds)

Type:

float

logger

logger

Type:

Logger

_influxSettings

current influx settings

Type:

InfluxSettings

_job

sending monitoring data job

Type:

Job

addPointsToBuffer(points)[source]

Add points to buffer.

Parameters:

points – points

Return type:

None

static convertFieldsToInfluxLineProtocol(fields)[source]

Convert field value to influx line protocol format

Parameters:

fields – dict with values to convert

Returns:

line protocol string

Return type:

str

generatePointStr(point)[source]

Generate string from point

Parameters:

point – point

Returns:

influx line protocol string

Return type:

str

initializeScheduler()[source]

Start the loop for sending data from the buffer to monitoring.

Return type:

None

stopScheduler()[source]

Stop monitoring.

Return type:

None

updateFlushingPeriod(newPeriod)[source]

Update flushing period

Parameters:

newPeriod – new period
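The adapter described above combines a buffer with a periodic flushing job. The following sketch shows that pattern with plain `asyncio` instead of the `AsyncIOScheduler` named in the attribute list, so that it runs without third-party packages. The class `SketchBufferingAdapter` and its `sent_batches` list, which stands in for the monitoring storage, are hypothetical. Flushing the remaining points on stop is a design choice of this sketch, not confirmed behavior of the library.

```python
import asyncio


class SketchBufferingAdapter:
    """Hypothetical adapter: collects points and flushes them periodically."""

    def __init__(self, flushing_period: float):
        self.flushing_period = flushing_period  # seconds
        self._buffer = []
        self.sent_batches = []  # stands in for the monitoring storage
        self._task = None

    def add_points_to_buffer(self, points):
        self._buffer.extend(points)

    def _flush(self):
        if self._buffer:
            # Swap the buffer so points added later go into a fresh list.
            batch, self._buffer = self._buffer, []
            self.sent_batches.append(batch)

    async def _loop(self):
        while True:
            await asyncio.sleep(self.flushing_period)
            self._flush()

    def initialize_scheduler(self):
        self._task = asyncio.get_running_loop().create_task(self._loop())

    async def stop_scheduler(self):
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        self._flush()  # send whatever is left in the buffer


async def demo():
    adapter = SketchBufferingAdapter(flushing_period=0.01)
    adapter.initialize_scheduler()
    adapter.add_points_to_buffer(["point-1", "point-2"])
    await asyncio.sleep(0.05)  # give the periodic job time to run
    adapter.add_points_to_buffer(["point-3"])
    await adapter.stop_scheduler()
    return adapter.sent_batches


batches = asyncio.run(demo())
```

Buffering keeps request handling from waiting on the monitoring storage: adding a point is a list append, and the network cost is paid once per period.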

class luna_sender.crutches_on_wheels.cow.monitoring.influx_adapter.InfluxMonitoringAdapter(settings, flushingPeriod)[source]

Influx 2.x adapter. Intended for sending points to InfluxDB.

bucket

influx bucket name

Type:

str

initializeMonitoring()[source]

Initialize monitoring.

Return type:

None

async stopMonitoring()[source]

Stop monitoring (cancel all requests and stop accepting new ones).

Return type:

None

class luna_sender.crutches_on_wheels.cow.monitoring.influx_adapter.InfluxSettings(url, bucket, organization, token)[source]

Container for influx 2.x settings
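The four values (url, bucket, organization, token) are what the InfluxDB 2.x HTTP write endpoint needs. The sketch below shows how such a container could be turned into a write URL and an authentication header. `SketchInfluxSettings` and its methods are hypothetical and do not describe the attributes or methods of `InfluxSettings`. The values are placeholders.

```python
from dataclasses import dataclass
from urllib.parse import urlencode


@dataclass(frozen=True)
class SketchInfluxSettings:
    """Hypothetical container with the same four values."""

    url: str
    bucket: str
    organization: str
    token: str

    def write_url(self) -> str:
        """URL of the InfluxDB 2.x write endpoint for this bucket."""
        query = urlencode(
            {"org": self.organization, "bucket": self.bucket, "precision": "ns"}
        )
        return f"{self.url.rstrip('/')}/api/v2/write?{query}"

    def headers(self) -> dict:
        """Token authentication header used by the InfluxDB 2.x HTTP API."""
        return {"Authorization": f"Token {self.token}"}


# Placeholder values, not real credentials.
settings = SketchInfluxSettings(
    "http://127.0.0.1:8086", "luna_monitoring", "luna", "example-token"
)
```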