Monitoring
Data for monitoring
The following types of events are monitored:
request, all HTTP requests.
error, all failed HTTP requests.
subtask execution, messages about the process of executing a task.
subtask errors, all errors generated during subtask processing.
subtask final, messages about the end of subtask processing.
worker requests, all messages generated while sending subtasks to workers.
worker queue, messages about transporting a task through a worker queue.
Comparison of data formats for Clickhouse and InfluxDB:
InfluxDB: Each event is presented as a “point” in a time series. The structure of a point includes:
series name
start event time
tags, indexed data in storage: a dictionary whose keys are string tag names and whose values are strings, integers, or floats
fields, non-indexed data in storage: a dictionary whose keys are string field names and whose values are strings, integers, or floats
Clickhouse: In Clickhouse, the data structure resembles that of a traditional SQL table. Each event is represented as a record, where:
The `time` field contains the record’s creation timestamp;
The `data` field contains a JSON object with all the information that would otherwise be distributed across tags and fields in InfluxDB.
Important: In Clickhouse, there is no differentiation between “tags” and “fields”—all data is consolidated into a single JSON object within the data field.
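The difference between the two layouts can be sketched in a few lines of Python. This is only an illustration: the helper names `to_influx_point` and `to_clickhouse_record` are hypothetical and are not part of the service API; the tag and field values are taken from the ‘Requests’ series.

```python
import json
from datetime import datetime, timezone


def to_influx_point(series, event_time, tags, fields):
    """Hypothetical helper: an InfluxDB-style point keeps tags and fields apart."""
    return {"series": series, "time": event_time, "tags": dict(tags), "fields": dict(fields)}


def to_clickhouse_record(event_time, tags, fields):
    """Hypothetical helper: a ClickHouse-style record merges tags and fields
    into one JSON object stored in the `data` column."""
    return {"time": event_time, "data": json.dumps({**tags, **fields})}


event_time = datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp()
tags = {"service": "luna-tasks", "route": "GET:/version", "status_code": 200}
fields = {
    "request_id": "1536751345,6a5c2191-3e9b-f5a4-fc45-3abf43625c5f",
    "execution_time": 123.45,
}

point = to_influx_point("requests", event_time, tags, fields)
record = to_clickhouse_record(event_time, tags, fields)

# The same five keys end up in one flat object on the ClickHouse side.
print(sorted(json.loads(record["data"])))
```

The merge assumes tag names and field names do not collide; if they did, the field value would overwrite the tag value in this sketch.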
The structure and meaning of each monitoring series are the same for both databases. Below are examples for each series:
- ‘Requests’ series. Triggered on every request. Each point contains data about the corresponding request (execution time, etc.).
  - InfluxDB tags:
    - service: “luna-tasks” or “luna-tasks-worker”
    - route: concatenation of the request method and the request resource (GET:/version)
    - status_code: HTTP status code of the response
  - InfluxDB fields:
    - request_id: request id
    - execution_time: request execution time
  - ClickHouse JSON `data` field example: { "service": "luna-tasks", "route": "GET:/version", "status_code": 200, "request_id": "1536751345,6a5c2191-3e9b-f5a4-fc45-3abf43625c5f", "execution_time": 123.45 }
- ‘Errors’ series. Triggered on a failed request. Each point contains the error_code of the luna error.
  - InfluxDB tags:
    - service: “luna-tasks” or “luna-tasks-worker”
    - route: concatenation of the request method and the request resource (GET:/version)
    - status_code: HTTP status code of the response
    - error_code: luna error code
  - InfluxDB fields:
    - request_id: request id
  - ClickHouse JSON `data` field example: { "service": "luna-tasks", "route": "GET:/version", "status_code": 400, "error_code": 13037, "request_id": "1536751345,6a5c2191-3e9b-f5a4-fc45-3abf43625c5f" }
- ‘Subtask errors’ series. Triggered on an error in the task-processing pipeline.
  - InfluxDB tags:
    - service: always “luna-tasks-worker”
    - task_id: task id
    - subtask_id: subtask id
    - task_type: task type (1, 2, 3, …)
    - error_code: luna error code
  - InfluxDB fields:
    - error_code: luna error code
  - ClickHouse JSON `data` field example: { "service": "luna-tasks-worker", "task_id": 111, "subtask_id": 2, "task_type": 3, "error_code": 13037 }
- ‘subtask execution’ series. Triggered on an error in the task-processing pipeline.
  - InfluxDB tags:
    - service: always “luna-tasks-worker”
    - task_id: task id
    - subtask_id: subtask id
    - task_type: task type (1, 2, 3, …)
  - InfluxDB fields:
    - save_result_time: time spent saving the result to image-store (all task types)
    - linked_faces: approximate linked faces count (linker task)
    - events: removed event ids count (gc task)
    - events_face_descriptors: removed face descriptors count (gc task)
    - events_body_descriptors: removed body descriptors count (gc task)
    - faces: removed face ids count (gc task)
    - faces_face_descriptors: removed face descriptors count (gc task)
    - match_time: matching time (crossmatching, clustering, roc task)
    - clusterization_report_build_time: report build time (reporter task)
    - track_id_collecting: track id (clustering task)
    - export_build_time: export build time (exporter task)
  - ClickHouse JSON `data` field example: { "service": "luna-tasks-worker", "task_id": 111, "subtask_id": 2, "task_type": 3, "save_result_time": 0.1223, "linked_faces": 3, "events": 3, "events_face_descriptors": 3, "events_body_descriptors": 3, "faces": 3, "faces_face_descriptors": 3, "match_time": 1.234, "clusterization_report_build_time": 0.3, "track_id_collecting": 23, "export_build_time": 1.2 }
- ‘subtask final’ series. Triggered at the end of subtask execution.
  - InfluxDB tags:
    - service: always “luna-tasks-worker”
    - task_id: task id
    - subtask_id: subtask id
    - task_type: task type (1, 2, 3, …)
    - status: task status (1, 2, 3, 4, 5)
  - InfluxDB fields:
    - execution_time: subtask execution time
  - ClickHouse JSON `data` field example: { "service": "luna-tasks-worker", "task_id": 111, "subtask_id": 2, "task_type": 3, "status": 3, "execution_time": 0.12 }
- ‘worker queue’ series. Triggered when a task is taken from the worker queue.
  - InfluxDB tags:
    - service: always “luna-tasks-worker”
    - task_id: task id
    - subtask_id: subtask id
    - task_type: task type (1, 2, 3, …)
  - InfluxDB fields:
    - transport_time: time between a worker receiving the task and the start of execution
  - ClickHouse JSON `data` field example: { "service": "luna-tasks-worker", "task_id": 111, "subtask_id": 2, "task_type": 3, "transport_time": 0.12 }
- ‘worker requests’ series. Triggered after a subtask is sent to a worker.
  - InfluxDB tags:
    - service: always “luna-tasks-worker”
    - task_id: task id
    - subtask_id: subtask id
  - InfluxDB fields:
    - transport_time: time between initializing the sending of a task's subtasks and sending the subtask to the worker for execution
  - ClickHouse JSON `data` field example: { "service": "luna-tasks-worker", "task_id": 111, "subtask_id": 2, "transport_time": 0.12 }
Additional tags or fields can be added to any series.
Database
Refer to the InfluxDB and ClickHouse documentation to compare the databases and choose the one that better fits your needs. Note that ClickHouse might be the better choice for aggregation. Database credentials are set in the “monitoring” section of the configuration file.
Classes
- class tasks_tools.monitoring.points.SubtaskErrorPoint(eventTime, subtaskId, taskId, taskType, errorCode, additionalTags=None, additionalFields=None)[source]¶
- Point for subtask error monitoring. - errorCode¶
- error code - Type:
- int 
 
 - additionalTags¶
- additional tags - Type:
- dict 
 
 - additionalFields¶
- additional fields - Type:
- dict 
 
 - property fields: dict¶
- Get point fields - Returns:
- dict with additional fields 
 
 - 
series: str= 'subtask_errors'¶
- series 
 - property tags: dict¶
- Get point tags - Returns:
- dict with service as keys + additionalTags 
 
 
- class tasks_tools.monitoring.points.SubtaskExecutionPoint(eventTime, subtaskId, taskId, taskType, additionalTags=None, additionalFields=None)[source]¶
- Point for monitoring subtask execution - additionalTags¶
- additional tags for point - Type:
- dict 
 
 - additionalFields¶
- additional fields for point - Type:
- dict 
 
 - property fields: dict¶
- Get point fields - Returns:
- dict with additional fields 
 
 - 
series: str= 'subtask_execution'¶
- series 
 - property tags: dict¶
- Get point tags - Returns:
- dict with service as keys + additionalTags 
 
 
- class tasks_tools.monitoring.points.SubtaskFinalPoint(eventTime, subTaskId, taskId, taskType, executionTime, status, additionalTags=None, additionalFields=None)[source]¶
- Point for monitoring subtask ending. - additionalTags¶
- additional tags for point - Type:
- dict 
 
 - additionalFields¶
- additional fields for point - Type:
- dict 
 
 - executionTime¶
- subtask execution time - Type:
- float 
 
 - status¶
- subtask status - Type:
- int 
 
 - property fields: dict¶
- Get point fields - Returns:
- dict with additional fields 
 
 - 
series: str= 'subtask_final'¶
- series 
 - property tags: dict¶
- Get point tags - Returns:
- dict with service as keys + additionalTags 
 
 
- class tasks_tools.monitoring.points.SubtaskMonitoringBasePoint(eventTime, subtaskId, taskId, taskType)[source]¶
- Base class for monitoring subtask execution. - subtaskId
- subtask id - Type:
- int 
 
 - taskId¶
- task id - Type:
- int 
 
 - taskType¶
- task type - Type:
- int 
 
 - property fields: dict¶
- Get point fields - Returns:
- dict with additional fields 
 
 - classmethod initialize(serviceName)[source]¶
- Initialize request point for custom service. - Parameters:
- serviceName – service name 
 - 
service: str¶
- service name 
 - property tags: dict¶
- Get point tags - Returns:
- dict with service as keys + additionalTags 
 
 
- class tasks_tools.monitoring.points.WorkerQueuePoint(eventTime, subtaskId, taskId, taskType, transportTime)[source]¶
- Point for worker queue monitoring. - transportTime
- time between a worker receiving the task and the start of execution - Type:
- float 
 
 - property fields: dict¶
- Get point fields - Returns:
- dict with additional fields 
 
 - 
series: str= 'worker_queue'¶
- series 
 - property tags: dict¶
- Get point tags - Returns:
- dict with service as keys + additionalTags 
 
 
- class tasks_tools.monitoring.points.WorkerRequestPoint(eventTime, subtaskId, taskId, transportTime)[source]¶
- Point for monitoring queue of sending subtasks to workers. - taskId¶
- task id - Type:
- int 
 
 - subtaskId¶
- subtask id - Type:
- int 
 
 - transportTime¶
- time between initializing the sending of a task's subtasks and sending the subtask to the worker for execution - Type:
- float 
 
 - property fields: dict¶
- Get point fields - Returns:
- dict with additional fields 
 
 - classmethod initialize(serviceName)[source]¶
- Initialize request point for custom service. - Parameters:
- serviceName – service name 
 - 
series: str= 'worker_requests'¶
- series 
 - 
service: str¶
- service name 
 - property tags: dict¶
- Get point tags - Returns:
- dict with service as keys + additionalTags 
 
 
Module contains points for monitoring.
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.BaseMonitoringPoint(eventTime)[source]¶
- Abstract class for points - eventTime¶
- event time as timestamp - Type:
- float 
 
 - abstract property fields: Dict[str, int | float | str]¶
- Get fields from the point. Fields are assumed to be non-indexed data. - Returns:
- dict with fields. 
 
 - abstract property tags: Dict[str, int | float | str]¶
- Get tags from the point. Tags are assumed to be indexed data. - Returns:
- dict with tags. 
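
The contract described above (an event time plus abstract `tags` and `fields` properties) can be illustrated with a minimal, self-contained sketch. The classes `SketchBasePoint` and `SketchWorkerQueuePoint` are hypothetical stand-ins written for this example; they are not the library classes, and the real implementations may differ. The tag and field names follow the ‘worker queue’ series.

```python
from abc import ABC, abstractmethod
from typing import Dict, Union

Value = Union[int, float, str]


class SketchBasePoint(ABC):
    """Hypothetical stand-in for an abstract monitoring point."""

    series: str

    def __init__(self, eventTime: float):
        self.eventTime = eventTime  # event time as a timestamp

    @property
    @abstractmethod
    def tags(self) -> Dict[str, Value]:
        """Indexed data."""

    @property
    @abstractmethod
    def fields(self) -> Dict[str, Value]:
        """Non-indexed data."""


class SketchWorkerQueuePoint(SketchBasePoint):
    """Hypothetical concrete point modelled on the 'worker queue' series."""

    series = "worker_queue"

    def __init__(self, eventTime, subtaskId, taskId, taskType, transportTime):
        super().__init__(eventTime)
        self.subtaskId = subtaskId
        self.taskId = taskId
        self.taskType = taskType
        self.transportTime = transportTime

    @property
    def tags(self) -> Dict[str, Value]:
        return {
            "service": "luna-tasks-worker",
            "task_id": self.taskId,
            "subtask_id": self.subtaskId,
            "task_type": self.taskType,
        }

    @property
    def fields(self) -> Dict[str, Value]:
        return {"transport_time": self.transportTime}


point = SketchWorkerQueuePoint(
    eventTime=0.0, subtaskId=2, taskId=111, taskType=3, transportTime=0.12
)
print(point.series, point.tags, point.fields)
```

Because both properties are abstract, instantiating `SketchBasePoint` directly raises `TypeError`; only subclasses that define `tags` and `fields` can be created.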
 
 
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.BaseRequestMonitoringPoint(requestId, resource, method, requestTime, service, statusCode)[source]¶
- Base class for point which is associated with requests. - requestId¶
- request id - Type:
- str 
 
 - route¶
- concatenation of a request method and a request resource - Type:
- str 
 
 - service¶
- service name - Type:
- str 
 
 - statusCode¶
- status code of a request response. - Type:
- int 
 
 - property fields: Dict[str, int | float | str]¶
- Get fields - Returns:
- dict with the following keys: “request_id” 
- Return type:
- dict 
 
 - property tags: Dict[str, int | float | str]¶
- Get tags - Returns:
- dict with the following keys: “route”, “service”, “status_code” 
- Return type:
- dict 
 
 
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.DataForMonitoring(tags=<factory>, fields=<factory>)[source]¶
- Class for storing additional data for monitoring. 
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.InfluxFormatter[source]¶
- Format any point field into inline format 
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.MonitoringPointInfluxFormatBuilder(name, bases, namespace, /, **kwargs)[source]¶
- Complements a point class with an explicit field-formatting function for better performance. - To build the type formats, the target class must have a ‘fields’ property whose return value is annotated with a TypedDict. - The target class can be configured via a ‘Config’ class. - Available options:
- extraFields: whether the class should handle additional fields or not 
 - 
>>> from typing import TypedDict
>>>
>>> class MonitoringFields(TypedDict):
...     field1: str
...     field2: int
...     field3: float
...     field4: bool
>>>
>>> class BasePoint(BaseMonitoringPoint, metaclass=MonitoringPointInfluxFormatBuilder):
...
...     def __init__(self, fields: dict):
...         self._fields = fields
...
...     @property
...     def tags(self):
...         return {}
...
...     @property
...     def fields(self) -> MonitoringFields:
...         return self._fields
...
>>> class TestPointNoExtra(BasePoint):
...
...     class Config:
...         extraFields = False
...
>>> class TestPointWithExtra(BasePoint):
...     class Config:
...         extraFields = True
>>>
>>> point1 = TestPointNoExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False})
>>> point2 = TestPointWithExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False, "extra": True})
>>> print(point1.convertFieldsToInfluxLineProtocol())
field1="data",field2=1i,field3=1.000000,field4=False
>>> print(point2.convertFieldsToInfluxLineProtocol())
field1="data",field2=1i,field3=1.000000,field4=False,extra=True
 - classmethod buildInfluxFormats(annotations, extraFields)[source]¶
- Build map with influx formats for corresponding fields - Return type:
- dict
- Parameters:
- annotations (dict) – point fields type annotations 
- extraFields (bool) – whether point uses extra fields or not 
 
- Returns:
- dict of fields with their format 
 
 - static convertFieldsToInfluxLineProtocolNoExtra(point)[source]¶
- Convert point fields into influx line protocol format without extra fields 
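
The field formatting shown in the doctest above (quoted strings, integers with an `i` suffix, floats with six decimals, booleans printed as-is) can be reproduced with a small standalone sketch. The functions `format_field_value` and `fields_to_line_protocol` are hypothetical names chosen for this example; they check types at runtime, whereas the metaclass is described as building the formats ahead of time from the `TypedDict` annotations.

```python
def format_field_value(value):
    """Format one field value in InfluxDB line protocol style."""
    # bool must be checked before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return str(value)
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return f"{value:f}"
    if isinstance(value, str):
        # Backslashes and double quotes inside string values must be escaped.
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'
    raise TypeError(f"unsupported field type: {type(value).__name__}")


def fields_to_line_protocol(fields):
    """Join formatted fields into the field-set part of a line protocol record."""
    return ",".join(f"{key}={format_field_value(value)}" for key, value in fields.items())


line = fields_to_line_protocol({"field1": "data", "field2": 1, "field3": 1.0, "field4": False})
print(line)  # field1="data",field2=1i,field3=1.000000,field4=False
```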
 
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.RequestErrorMonitoringPoint(requestId, resource, method, errorCode, service, requestTime, statusCode, additionalTags=None, additionalFields=None)[source]¶
- Request monitoring point intended for monitoring request errors (error codes). - errorCode¶
- error code - Type:
- int 
 
 - additionalTags¶
- additional tags that were specified for the request - Type:
- dict 
 
 - additionalFields¶
- additional fields that were specified for the request - Type:
- dict 
 
 - property fields: Dict[str, int | float | str]¶
- Get fields. - Returns:
- dict with base fields and additional fields 
 
 - 
series: str= 'errors'¶
- series “errors” 
 - property tags: Dict[str, int | float | str]¶
- Get tags. - Returns:
- dict with base tags, “error_code” and additional tags 
 
 
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.RequestMonitoringPoint(requestId, resource, method, executionTime, requestTime, service, statusCode, additionalTags=None, additionalFields=None)[source]¶
- Request monitoring point intended for monitoring all requests and measuring request time, etc. - executionTime¶
- execution time - Type:
- float 
 
 - additionalTags¶
- additional tags that were specified for the request - Type:
- dict 
 
 - additionalFields¶
- additional fields that were specified for the request - Type:
- dict 
 
 - property fields: Dict[str, int | float | str]¶
- Get fields. - Returns:
- dict with base fields, “execution_time” and additional fields 
 
 - 
series: str= 'requests'¶
- series “requests” 
 - property tags: Dict[str, int | float | str]¶
- Get tags. - Returns:
- dict with base tags and additional tags 
 
 
- luna_tasks.crutches_on_wheels.cow.monitoring.points.getRoute(resource, method)[source]¶
- Get a request route: the concatenation of a request method and a request resource. - Parameters:
- resource – resource 
- method – method 
- Returns:
- “{method}:{resource}” 
- Return type:
- str
 
- luna_tasks.crutches_on_wheels.cow.monitoring.points.monitorTime(monitoringData, fieldName)[source]¶
- Context manager for timing execution time. - Parameters:
- monitoringData – container for saving result 
- fieldName – field name 
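
A timing context manager of this kind can be sketched as follows. This is an illustration under stated assumptions, not the library code: `SketchDataForMonitoring` and `sketch_monitor_time` are hypothetical names, and the sketch assumes that the measured duration (in seconds) is stored in the container's `fields` dictionary under `fieldName`.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field


@dataclass
class SketchDataForMonitoring:
    """Hypothetical container with tags and fields created by default factories."""

    tags: dict = field(default_factory=dict)
    fields: dict = field(default_factory=dict)


@contextmanager
def sketch_monitor_time(monitoringData, fieldName):
    """Measure the wall-clock time of the wrapped block.

    Assumption: the result is saved to monitoringData.fields[fieldName].
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        # Saved even if the wrapped block raises.
        monitoringData.fields[fieldName] = time.perf_counter() - start


data = SketchDataForMonitoring()
with sketch_monitor_time(data, "match_time"):
    sum(range(1000))  # stand-in for the work being timed

print(data.fields)
```

Writing the result in a `finally` clause means a duration is recorded for failed blocks as well, which is one possible design choice; the real helper may behave differently.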
 
 
Module implements the base class for monitoring.
- class luna_tasks.crutches_on_wheels.cow.monitoring.manager.LunaMonitoringManager(settings, name, pluginManager)[source]¶
- Monitoring manager. Sends data to the monitoring storage and monitoring plugins. - settings
- monitoring storage settings 
- class luna_tasks.crutches_on_wheels.cow.monitoring.manager.MonitoringSettings(*args, **kwargs)[source]¶
- Monitoring settings protocol 
Module contains classes for sending data to Influx monitoring.
- class luna_tasks.crutches_on_wheels.cow.monitoring.influx_adapter.InfluxMonitoringAdapter(settings, flushingPeriod)[source]¶
- Influx 2.x adaptor. Intended for sending points to InfluxDB. - bucket¶
- influx bucket name - Type:
- str 
 
 - addPointsToBuffer(points)[source]¶
- Add points to buffer. - Return type:
- None
- Parameters:
- points – points 
 
 - static convertFieldsToInfluxLineProtocol(fields)[source]¶
- Convert field value to influx line protocol format - Return type:
- str
- Parameters:
- fields – dict with values to convert 
- Returns:
- line protocol string 
 
 
- class luna_tasks.crutches_on_wheels.cow.monitoring.influx_adapter.InfluxSettings(url, bucket, organization, token)[source]¶
- Container for influx 2.x settings 
Clickhouse monitoring adaptor
- class luna_tasks.crutches_on_wheels.cow.monitoring.clickhouse_adapter.ClickhouseMonitoringAdaptor(settings, flushingPeriod)[source]¶
- ClickHouse adaptor. Intended for sending points to ClickHouse. - addPointsToBuffer(points)[source]¶
- Add points to the buffer. - Return type:
- None
- Parameters:
- points – points
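
Both adaptors above take a `flushingPeriod` and expose `addPointsToBuffer`, which suggests a buffer-then-flush pattern. The sketch below illustrates that pattern only; `SketchBufferedAdapter`, its `send` callback, and the `flushIfDue` method are hypothetical and do not describe how the real adaptors schedule or perform their writes.

```python
import time


class SketchBufferedAdapter:
    """Hypothetical adaptor that buffers points and sends them in batches."""

    def __init__(self, flushingPeriod, send):
        self.flushingPeriod = flushingPeriod  # seconds between flushes
        self._send = send  # callable that receives a list of points
        self._buffer = []
        self._lastFlush = time.monotonic()

    def addPointsToBuffer(self, points):
        """Add points to the buffer without sending them."""
        self._buffer.extend(points)

    def flushIfDue(self, now=None):
        """Send the buffered points if the flushing period has elapsed."""
        now = time.monotonic() if now is None else now
        if now - self._lastFlush < self.flushingPeriod or not self._buffer:
            return False
        batch, self._buffer = self._buffer, []
        self._send(batch)
        self._lastFlush = now
        return True


sent = []
adapter = SketchBufferedAdapter(flushingPeriod=1.0, send=sent.append)
adapter.addPointsToBuffer([{"series": "requests"}, {"series": "errors"}])

created = adapter._lastFlush
early = adapter.flushIfDue(now=created + 0.5)  # period not elapsed yet
due = adapter.flushIfDue(now=created + 1.5)    # period elapsed, batch is sent
print(early, due, len(sent))
```

Batching keeps the number of writes to the monitoring storage low at the cost of a delay of up to one flushing period before a point becomes visible.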