Monitoring

Data for monitoring

The following types of events are currently monitored:

  • request: all HTTP requests.

  • error: all failed HTTP requests.

  • subtask execution: messages about the process of executing a task.

  • subtask errors: all errors generated during subtask processing.

  • subtask final: messages about the end of subtask processing.

  • worker requests: all messages generated while subtasks are sent to workers.

  • worker queue: messages about the transport of a task through a worker queue.

Comparison of data formats for ClickHouse and InfluxDB:

InfluxDB: Each event is represented as a “point” in a time series. A point consists of:

  • series name

  • event start time

  • tags: indexed data in storage; a dictionary whose keys are string tag names and whose values are strings, integers, or floats

  • fields: non-indexed data in storage; a dictionary whose keys are string field names and whose values are strings, integers, or floats

Clickhouse: In Clickhouse, the data structure resembles that of a traditional SQL table. Each event is represented as a record, where:

  • The `time` field contains the record’s creation timestamp;

  • The `data` field contains a JSON object with all the information that would otherwise be distributed across tags and fields in InfluxDB.

Important: In ClickHouse, there is no distinction between “tags” and “fields”; all data is consolidated into a single JSON object in the `data` field.
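
As a minimal sketch of this difference, the snippet below merges a point's tags and fields into one JSON object, following the ClickHouse format described above. The function name `toClickhouseRecord` and the use of a float timestamp for `time` are assumptions made for illustration, not the service's actual implementation.

```python
import json


def toClickhouseRecord(eventTime: float, tags: dict, fields: dict) -> dict:
    """Build a ClickHouse-style record from InfluxDB-style tags and fields.

    Hypothetical helper: the real adaptor may serialize differently.
    """
    # Tags and fields lose their distinction: both end up in one JSON object.
    # If a key appears in both, the value from `fields` wins in this sketch.
    data = {**tags, **fields}
    return {"time": eventTime, "data": json.dumps(data)}


record = toClickhouseRecord(
    1536751345.0,
    tags={"service": "luna-tasks", "route": "GET:/version", "status_code": 200},
    fields={
        "request_id": "1536751345,6a5c2191-3e9b-f5a4-fc45-3abf43625c5f",
        "execution_time": 123.45,
    },
)
print(record["data"])
```

The same input would be written to InfluxDB as one point with three tags and two fields; here it becomes a single record whose `data` value is a JSON string.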

The structure and meaning of each monitoring series are the same for both databases. For ClickHouse, the tags and fields listed below are merged into the single JSON object in the `data` field. Examples for each series:

  • ‘Requests’ series.

    Triggered on every request. Each point contains data about the corresponding request (execution time, etc.).

    InfluxDB:

    • tags

      tag name          description
      service           “luna-tasks” or “luna-tasks-worker”
      route             concatenation of a request method and a request resource (GET:/version)
      status_code       HTTP status code of the response

    • fields

      field name        description
      request_id        request id
      execution_time    request execution time

    ClickHouse JSON `data` field Example:

    {
        "service": "luna-tasks",
        "route": "GET:/version",
        "status_code": 200,
        "request_id": "1536751345,6a5c2191-3e9b-f5a4-fc45-3abf43625c5f",
        "execution_time": 123.45
    }
    
  • ‘Errors’ series.

    Triggered on every failed request. Each point contains the error_code of the Luna error.

    InfluxDB:

    • tags

      tag name          description
      service           “luna-tasks” or “luna-tasks-worker”
      route             concatenation of a request method and a request resource (GET:/version)
      status_code       HTTP status code of the response
      error_code        Luna error code

    • fields

      field name        description
      request_id        request id

    ClickHouse JSON `data` field Example:

    {
        "service": "luna-tasks",
        "route": "GET:/version",
        "status_code": 400,
        "error_code": 13037,
        "request_id": "1536751345,6a5c2191-3e9b-f5a4-fc45-3abf43625c5f"
    }
    
  • ‘Subtask errors’ series.

    Triggered when an error occurs in the task processing pipeline.

    InfluxDB:

    • tags

      tag name          description
      service           always “luna-tasks-worker”
      task_id           task id
      subtask_id        subtask id
      task_type         task type (1, 2, 3, …)
      error_code        Luna error code

    • fields

      field name        description
      error_code        Luna error code

    ClickHouse JSON `data` field Example:

    {
        "service": "luna-tasks-worker",
        "task_id": 111,
        "subtask_id": 2,
        "task_type": 3,
        "error_code": 13037
    }
    
  • ‘Subtask execution’ series.

    Triggered during subtask processing. Each point contains data about the execution of a task.

    InfluxDB:

    • tags

      tag name                            description
      service                             always “luna-tasks-worker”
      task_id                             task id
      subtask_id                          subtask id
      task_type                           task type (1, 2, 3, …)

    • fields

      field name                          description
      save_result_time                    time of saving the result in image-store (all task types)
      linked_faces                        approximate linked faces count (linker task)
      events                              removed event ids count (gc task)
      events_face_descriptors             removed face descriptors count (gc task)
      events_body_descriptors             removed body descriptors count (gc task)
      faces                               removed face ids count (gc task)
      faces_face_descriptors              removed face descriptors count (gc task)
      match_time                          matching time (crossmatching, clustering, roc task)
      clusterization_report_build_time    report build time (reporter task)
      track_id_collecting                 track id (clustering task)
      export_build_time                   export build time (exporter task)

    ClickHouse JSON `data` field Example:

    {
        "service": "luna-tasks-worker",
        "task_id": 111,
        "subtask_id": 2,
        "task_type": 3,
        "save_result_time": 0.1223,
        "linked_faces": 3,
        "events": 3,
        "events_face_descriptors": 3,
        "events_body_descriptors": 3,
        "faces": 3,
        "faces_face_descriptors": 3,
        "match_time": 1.234,
        "clusterization_report_build_time": 0.3,
        "track_id_collecting": 23,
        "export_build_time": 1.2
    }
    
  • ‘Subtask final’ series.

    Triggered at the end of subtask execution.

    InfluxDB:

    • tags

      tag name          description
      service           always “luna-tasks-worker”
      task_id           task id
      subtask_id        subtask id
      task_type         task type (1, 2, 3, …)
      status            task status (1, 2, 3, 4, 5)

    • fields

      field name        description
      execution_time    subtask execution time

    ClickHouse JSON `data` field Example:

    {
        "service": "luna-tasks-worker",
        "task_id": 111,
        "subtask_id": 2,
        "task_type": 3,
        "status": 3,
        "execution_time": 0.12
    }
    
  • ‘Worker queue’ series.

    Triggered when a task is taken from the worker queue.

    InfluxDB:

    • tags

      tag name          description
      service           always “luna-tasks-worker”
      task_id           task id
      subtask_id        subtask id
      task_type         task type (1, 2, 3, …)

    • fields

      field name        description
      transport_time    time between a worker receiving the task and the start of its execution

    ClickHouse JSON `data` field Example:

    {
        "service": "luna-tasks-worker",
        "task_id": 111,
        "subtask_id": 2,
        "task_type": 3,
        "transport_time": 0.12
    }
    
  • ‘Worker requests’ series.

    Triggered after a subtask is sent to a worker.

    InfluxDB:

    • tags

      tag name          description
      service           always “luna-tasks-worker”
      task_id           task id
      subtask_id        subtask id

    • fields

      field name        description
      transport_time    time between starting to send a task's subtasks and sending the subtask to the worker for execution

    ClickHouse JSON `data` field Example:

    {
        "service": "luna-tasks-worker",
        "task_id": 111,
        "subtask_id": 2,
        "transport_time": 0.12
    }
    

Additional tags or fields can be added.

Database

Refer to the InfluxDB and ClickHouse documentation to compare the databases and choose the one that better fits your needs. Note that ClickHouse may be the better choice for aggregation. Database credentials are set in the “monitoring” section of the configuration file.

Classes

Module contains points for monitoring.

class luna_tasks.crutches_on_wheels.cow.monitoring.points.BaseMonitoringPoint(eventTime)[source]

Abstract class for points

eventTime

event time as timestamp

Type:

float

abstract property fields: Dict[str, int | float | str]

Get fields from the point. Fields are assumed to be non-indexed data.

Returns:

dict with fields.

abstract property tags: Dict[str, int | float | str]

Get tags from the point. Tags are assumed to be indexed data.

Returns:

dict with tags.
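
To illustrate the contract, here is a minimal sketch of a concrete point built on an abstract base with `tags` and `fields` properties. The base class below is a simplified stand-in for the documented one, and `WorkerQueuePoint`, its constructor arguments and its series name are hypothetical; the tag and field names follow the ‘worker queue’ series described earlier.

```python
from abc import ABC, abstractmethod
from typing import Dict, Union

Value = Union[int, float, str]


class BaseMonitoringPoint(ABC):
    """Simplified stand-in for the documented abstract class."""

    series: str

    def __init__(self, eventTime: float):
        self.eventTime = eventTime  # event time as a timestamp

    @property
    @abstractmethod
    def tags(self) -> Dict[str, Value]:
        """Indexed data."""

    @property
    @abstractmethod
    def fields(self) -> Dict[str, Value]:
        """Non-indexed data."""


class WorkerQueuePoint(BaseMonitoringPoint):
    """Hypothetical point modelled on the 'worker queue' series."""

    series = "worker_queue"  # assumed series name

    def __init__(self, eventTime, taskId, subtaskId, taskType, transportTime):
        super().__init__(eventTime)
        self.taskId = taskId
        self.subtaskId = subtaskId
        self.taskType = taskType
        self.transportTime = transportTime

    @property
    def tags(self) -> Dict[str, Value]:
        return {
            "service": "luna-tasks-worker",
            "task_id": self.taskId,
            "subtask_id": self.subtaskId,
            "task_type": self.taskType,
        }

    @property
    def fields(self) -> Dict[str, Value]:
        return {"transport_time": self.transportTime}


point = WorkerQueuePoint(1536751345.0, taskId=111, subtaskId=2, taskType=3, transportTime=0.12)
print(point.tags, point.fields)
```

Because both properties are abstract, the base class cannot be instantiated directly; every concrete point has to decide which of its values are indexed.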

class luna_tasks.crutches_on_wheels.cow.monitoring.points.BaseRequestMonitoringPoint(requestId, resource, method, requestTime, service, statusCode)[source]

Base class for points associated with requests.

requestId

request id

Type:

str

route

concatenation of a request method and a request resource

Type:

str

service

service name

Type:

str

statusCode

status code of a request response.

Type:

int

property fields: Dict[str, int | float | str]

Get fields

Returns:

dict with the following keys: “request_id”

property tags: Dict[str, int | float | str]

Get tags

Returns:

dict with the following keys: “route”, “service”, “status_code”

class luna_tasks.crutches_on_wheels.cow.monitoring.points.DataForMonitoring(tags=<factory>, fields=<factory>)[source]

Class for storing additional data for monitoring.

class luna_tasks.crutches_on_wheels.cow.monitoring.points.InfluxFormatter[source]

Format any point field into inline format

class luna_tasks.crutches_on_wheels.cow.monitoring.points.MonitoringPointInfluxFormatBuilder(name, bases, namespace, /, **kwargs)[source]

Complements a point class with an explicit field-formatting function for better performance.

To build the type formats, the target class must have a ‘fields’ property whose return value is annotated with a TypedDict.

The target class can be configured via a ‘Config’ class. Available options:

extraFields: whether class should handle additional fields or not

>>> from typing import TypedDict
>>>
>>> class MonitoringFields(TypedDict):
...     field1: str
...     field2: int
...     field3: float
...     field4: bool
>>>
>>> class BasePoint(BaseMonitoringPoint, metaclass=MonitoringPointInfluxFormatBuilder):
...
...     def __init__(self, fields: dict):
...         self._fields = fields
...
...     @property
...     def tags(self):
...         return {}
...
...     @property
...     def fields(self) -> MonitoringFields:
...         return self._fields
...
>>> class TestPointNoExtra(BasePoint):
...
...     class Config:
...         extraFields = False
...
>>> class TestPointWithExtra(BasePoint):
...     class Config:
...         extraFields = True
>>>
>>>
>>> point1 = TestPointNoExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False})
>>> point2 = TestPointWithExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False, "extra": True})
>>> print(point1.convertFieldsToInfluxLineProtocol())
field1="data",field2=1i,field3=1.000000,field4=False
>>> print(point2.convertFieldsToInfluxLineProtocol())
field1="data",field2=1i,field3=1.000000,field4=False,extra=True

classmethod buildInfluxFormats(annotations, extraFields)[source]

Build map with influx formats for corresponding fields

Return type:

dict

Parameters:
  • annotations (dict) – point fields type annotations

  • extraFields (bool) – whether point uses extra fields or not

Returns:

dict of fields with their format

static convertFieldsToInfluxLineProtocolNoExtra(point)[source]

Convert point fields into influx line protocol format without extra fields

static convertFieldsToInfluxLineProtocolWithExtra(point)[source]

Convert point fields into influx line protocol format with extra fields

static getTypeFormat(_type, _field, extraFields)[source]

Get field type format

Return type:

str

Parameters:
  • _type (type) – field type

  • _field (str) – field name

  • extraFields (bool) – whether point uses extra fields or not

Returns:

string format of the field
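
The doctest above suggests that each field type maps to its own line protocol format. A minimal sketch of that idea follows. `buildFormats` and `convertFields` are hypothetical stand-ins rather than the metaclass's real methods, and the templates are chosen only to reproduce the doctest output for the non-extra case.

```python
from typing import TypedDict, get_type_hints


class MonitoringFields(TypedDict):
    field1: str
    field2: int
    field3: float
    field4: bool


def buildFormats(annotations: dict) -> dict:
    """Map each field name to a format template based on its annotated type."""
    formats = {}
    for name, fieldType in annotations.items():
        if fieldType is str:
            formats[name] = name + '="{}"'  # strings are double-quoted (no escaping in this sketch)
        elif fieldType is int:
            formats[name] = name + "={}i"  # integers carry the "i" suffix
        elif fieldType is float:
            formats[name] = name + "={:.6f}"  # fixed six decimal places
        else:
            formats[name] = name + "={}"  # e.g. bool, rendered with str()
    return formats


def convertFields(fields: dict, formats: dict) -> str:
    """Join the formatted fields in the order they appear in `fields`."""
    return ",".join(formats[name].format(value) for name, value in fields.items())


# The templates are built once from the annotations and reused for every point.
formats = buildFormats(get_type_hints(MonitoringFields))
line = convertFields({"field1": "data", "field2": 1, "field3": 1.0, "field4": False}, formats)
print(line)
```

Building the templates once per class, instead of inspecting each value's type on every call, is presumably where the performance benefit mentioned above comes from.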

class luna_tasks.crutches_on_wheels.cow.monitoring.points.RequestErrorMonitoringPoint(requestId, resource, method, errorCode, service, requestTime, statusCode, additionalTags=None, additionalFields=None)[source]

Monitoring point intended for monitoring request errors (error codes).

errorCode

error code

Type:

int

additionalTags

additional tags that were specified for the request

Type:

dict

additionalFields

additional fields that were specified for the request

Type:

dict

property fields: Dict[str, int | float | str]

Get fields.

Returns:

dict with base fields and additional fields

series: str = 'errors'

series “errors”

property tags: Dict[str, int | float | str]

Get tags.

Returns:

dict with base tags, “error_code” and additional tags

class luna_tasks.crutches_on_wheels.cow.monitoring.points.RequestMonitoringPoint(requestId, resource, method, executionTime, requestTime, service, statusCode, additionalTags=None, additionalFields=None)[source]

Monitoring point intended for monitoring all requests and measuring request time, etc.

executionTime

execution time

Type:

float

additionalTags

additional tags that were specified for the request

Type:

dict

additionalFields

additional fields that were specified for the request

Type:

dict

property fields: Dict[str, int | float | str]

Get fields.

Returns:

dict with base fields, “execution_time” and additional fields

series: str = 'requests'

series “requests”

property tags: Dict[str, int | float | str]

Get tags.

Returns:

dict with base tags and additional tags

luna_tasks.crutches_on_wheels.cow.monitoring.points.getRoute(resource, method)[source]

Get a request route: the concatenation of a request method and a request resource.

Parameters:
  • resource – resource

  • method – method

Returns:

“{method}:{resource}”

Return type:

str
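
A minimal sketch of the documented behavior, assuming the route is simply the method and the resource joined by a colon, as in the “GET:/version” example earlier. This is a re-implementation for illustration, not the library source.

```python
def getRoute(resource: str, method: str) -> str:
    """Concatenate a request method and a request resource into a route."""
    return f"{method}:{resource}"


route = getRoute("/version", "GET")
print(route)
```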

luna_tasks.crutches_on_wheels.cow.monitoring.points.monitorTime(monitoringData, fieldName)[source]

Context manager for measuring execution time.

Parameters:
  • monitoringData – container for saving result

  • fieldName – field name
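
A context manager of this kind could look like the sketch below. The `DataForMonitoring` class here is a simplified stand-in with `tags` and `fields` dictionaries, and storing the elapsed seconds under `fields[fieldName]` is an assumption about where the result is saved.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field


@dataclass
class DataForMonitoring:
    """Simplified stand-in for the documented container."""

    tags: dict = field(default_factory=dict)
    fields: dict = field(default_factory=dict)


@contextmanager
def monitorTime(monitoringData: DataForMonitoring, fieldName: str):
    """Measure the wall-clock duration of the wrapped block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Assumption: the elapsed time (in seconds) is stored as a field.
        monitoringData.fields[fieldName] = time.perf_counter() - start


data = DataForMonitoring()
with monitorTime(data, "match_time"):
    sum(range(100_000))  # placeholder for the work being timed
print(data.fields)
```

Writing the result in a `finally` clause means the duration is recorded even when the timed block raises.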

Module implements the base class for monitoring.

class luna_tasks.crutches_on_wheels.cow.monitoring.manager.LunaMonitoringManager(settings, name, pluginManager)[source]

Monitoring manager. Sends data to the monitoring storage and monitoring plugins.

settings

monitoring storage settings

async close()[source]

Stop monitoring.

Return type:

None

flushPoints(points)[source]

Flush points to monitoring.

Return type:

None

Parameters:

points – points to flush

async initialize()[source]

Initialize monitoring

Return type:

None

async probe()[source]

Check that a connection to the monitoring service can be established with the current configuration. Can be used without initialization.

class luna_tasks.crutches_on_wheels.cow.monitoring.manager.MonitoringSettings(*args, **kwargs)[source]

Monitoring settings protocol

class ClickhouseCredentials[source]

Monitoring credentials

class InfluxCredentials[source]

Monitoring credentials

Module contains classes for sending data to InfluxDB monitoring.

class luna_tasks.crutches_on_wheels.cow.monitoring.influx_adapter.InfluxMonitoringAdapter(settings, flushingPeriod)[source]

Influx 2.x adaptor. Intended for sending points to InfluxDB.

bucket

influx bucket name

Type:

str

addPointsToBuffer(points)[source]

Add points to buffer.

Return type:

None

Parameters:

points – points

clearBuffer()[source]

Clear monitoring buffer

Return type:

list[str]

static convertFieldsToInfluxLineProtocol(fields)[source]

Convert field value to influx line protocol format

Return type:

str

Parameters:

fields – dict with values to convert

Returns:

line protocol string

generatePointStr(point)[source]

Generate string from point

Return type:

str

Parameters:

point – point

Returns:

influx line protocol string
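
For orientation, an InfluxDB line protocol string has the shape `measurement,tag_set field_set timestamp`. The sketch below assembles one from a series name, tags, an already formatted field string, and an event time. The function `buildLine`, the sorted tag order, and the nanosecond timestamp are assumptions for illustration; the adapter's actual output may differ.

```python
def buildLine(series: str, tags: dict, fieldsStr: str, eventTime: float) -> str:
    """Assemble a line protocol string.

    Hypothetical helper: special characters in names and values are not escaped.
    """
    tagStr = ",".join(f"{key}={value}" for key, value in sorted(tags.items()))
    # A point without tags consists of the measurement name only.
    head = f"{series},{tagStr}" if tagStr else series
    timestampNs = int(eventTime * 1e9)  # assumption: nanosecond precision
    return f"{head} {fieldsStr} {timestampNs}"


line = buildLine(
    "requests",
    {"service": "luna-tasks", "route": "GET:/version", "status_code": 200},
    "execution_time=0.120000",
    1536751345.0,
)
print(line)
```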

async stopMonitoring()[source]

Stop monitoring (cancel all requests and stop accepting new ones).

Return type:

None

class luna_tasks.crutches_on_wheels.cow.monitoring.influx_adapter.InfluxSettings(url, bucket, organization, token)[source]

Container for influx 2.x settings

Clickhouse monitoring adaptor

class luna_tasks.crutches_on_wheels.cow.monitoring.clickhouse_adapter.ClickhouseMonitoringAdaptor(settings, flushingPeriod)[source]

ClickHouse adaptor. Intended for sending points to ClickHouse.

addPointsToBuffer(points)[source]

Add points to buffer.

Return type:

None

Parameters:

points – points

clearBuffer()[source]

Clear monitoring buffer

Return type:

dict[str, list[dict]]
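
The return type `dict[str, list[dict]]` suggests that buffered records are grouped under string keys. A minimal sketch of such a buffer follows, assuming the keys are series names. The class `RecordBuffer` and its methods are hypothetical and only illustrate the add/clear cycle.

```python
from collections import defaultdict


class RecordBuffer:
    """Hypothetical in-memory buffer grouping records by series name."""

    def __init__(self) -> None:
        self._buffer = defaultdict(list)

    def add(self, series: str, record: dict) -> None:
        """Append one record to the list kept for its series."""
        self._buffer[series].append(record)

    def clear(self) -> dict[str, list[dict]]:
        """Return everything buffered so far and start with an empty buffer."""
        flushed, self._buffer = dict(self._buffer), defaultdict(list)
        return flushed


buffer = RecordBuffer()
buffer.add("requests", {"time": 1536751345.0, "data": "{}"})
buffer.add("requests", {"time": 1536751346.0, "data": "{}"})
buffer.add("errors", {"time": 1536751347.0, "data": "{}"})
flushed = buffer.clear()
print({series: len(records) for series, records in flushed.items()})
```

Swapping in a fresh buffer while returning the old one lets a periodic flush send the collected records without blocking new points from being added.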

static generateRecord(point)[source]

Generate monitoring record from point

Return type:

dict

Parameters:

point – point

Returns:

monitoring record

async initializeMonitoring()[source]

Initialize monitoring.

Return type:

None

async stopMonitoring()[source]

Stop monitoring (cancel all requests and stop accepting new ones).

Return type:

None

class luna_tasks.crutches_on_wheels.cow.monitoring.clickhouse_adapter.ClickhouseSettings(url, user, password, database)[source]

Clickhouse monitoring settings