Monitoring¶
Data for monitoring¶
The following types of events are currently monitored:
request, all HTTP requests.
error, all failed HTTP requests.
subtask execution, some messages about a process of executing a task.
subtask errors, all errors, generated during subtask processing.
subtask final, messages about the end of subtask processing.
worker requests, all messages, generated during subtask sending to workers.
worker queue, some messages about passing a task through a worker queue.
Every event is a point in a time series. A point consists of the following data:
series name (for example, requests or errors)
request start time
tags, indexed data in storage, dictionary: keys - string tag names, values - string, integer, float
fields, non-indexed data in storage, dictionary: keys - string field names, values - string, integer, float
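The four parts of a point listed above can be pictured as a small container. This is only an illustrative sketch: the `MonitoringPoint` name and the sample values are assumptions and are not part of luna-tasks.

```python
from dataclasses import dataclass, field
from typing import Dict, Union

Value = Union[str, int, float]


@dataclass
class MonitoringPoint:
    """Hypothetical container mirroring the four parts of a point."""

    series: str       # series name, e.g. "requests"
    eventTime: float  # start time of the event as a Unix timestamp
    tags: Dict[str, Value] = field(default_factory=dict)    # indexed data
    fields: Dict[str, Value] = field(default_factory=dict)  # non-indexed data


# Sample values are invented for illustration only.
point = MonitoringPoint(
    series="requests",
    eventTime=1700000000.0,
    tags={"service": "luna-tasks", "route": "GET:/version", "status_code": 200},
    fields={"request_id": "example-request-id", "execution_time": 0.012},
)
```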
‘Requests’ series. Triggered on every HTTP request. Each point contains data about the corresponding request (execution time, etc.).
tags
tag name
description
service
“luna-tasks” or “luna-tasks-worker”
route
concatenation of a request method and a request resource (GET:/version)
status_code
http status code of response
fields
fields
description
request_id
request id
execution_time
request execution time
‘Errors’ series. Triggered on every failed HTTP request. Each point contains the error_code of the luna error.
tags
tag name
description
service
“luna-tasks” or “luna-tasks-worker”
route
concatenation of a request method and a request resource (GET:/version)
status_code
http status code of response
error_code
luna error code
fields
fields
description
request_id
request id
‘Subtask errors’ series. Triggered on an error in the task processing pipeline.
tags
tag name
description
service
always “luna-tasks-worker”
task_id
task id
subtask_id
subtask id
task_type
task type (1, 2, 3, …)
error_code
luna error code
fields
fields
description
error_code
luna error code
‘subtask execution’ series. Triggered during subtask execution. Each point contains timings and counters collected while the task is processed.
tags
tag name
description
service
always “luna-tasks-worker”
task_id
task id
subtask_id
subtask id
task_type
task type (1, 2, 3, …)
fields
fields
description
save_result_time
time of saving the result to image-store (all task types)
linked_faces
approximate linked faces count (linker task)
events
removed event ids count (gc task)
events_face_descriptors
removed face descriptors count (gc task)
events_body_descriptors
removed body descriptors count (gc task)
faces
removed face ids count (gc task)
faces_face_descriptors
removed face descriptors count (gc task)
match_time
matching time (crossmatching, clustering, roc task)
clusterization_report_build_time
build report time (reporter task)
track_id_collecting
track id (clustering task)
export_build_time
export build time (exporter task)
‘subtask final’ series. Triggered at the end of subtask execution.
tags
tag name
description
service
always “luna-tasks-worker”
task_id
task id
subtask_id
subtask id
task_type
task type (1, 2, 3, …)
status
task status (1, 2, 3, 4, 5)
fields
fields
description
execution_time
subtask execution time
‘worker queue’ series. Triggered when a task is taken from the worker queue.
tags
tag name
description
service
always “luna-tasks-worker”
task_id
task id
subtask_id
subtask id
task_type
task type (1, 2, 3, …)
fields
fields
description
transport_time
time between a worker receiving the task and the start of execution
‘worker requests’ series. Triggered after sending subtask to a worker.
tags
tag name
description
service
always “luna-tasks-worker”
task_id
task id
subtask_id
subtask id
fields
fields
description
transport_time
time between the start of sending a task's subtasks and sending the subtask to the worker for execution
Additional tags or fields can be added to a point.
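As an illustration of how additional tags might be combined with the base tags of a series, here is a minimal sketch for the ‘subtask final’ series. The helper name `buildSubtaskFinalTags`, the `datacenter` tag, and the merge order (additional tags applied on top of base tags) are assumptions, not the documented behaviour.

```python
from typing import Dict, Optional, Union

Value = Union[str, int, float]


def buildSubtaskFinalTags(
    taskId: int,
    subtaskId: int,
    taskType: int,
    status: int,
    additionalTags: Optional[Dict[str, Value]] = None,
) -> Dict[str, Value]:
    """Hypothetical helper: base tags of the 'subtask final' series plus extras."""
    tags: Dict[str, Value] = {
        "service": "luna-tasks-worker",
        "task_id": taskId,
        "subtask_id": subtaskId,
        "task_type": taskType,
        "status": status,
    }
    # Assumption: additional tags are merged on top of the base ones.
    tags.update(additionalTags or {})
    return tags


tags = buildSubtaskFinalTags(10, 3, 2, 5, additionalTags={"datacenter": "dc-1"})
```

Because tags are indexed in storage, an additional tag is best reserved for values that will be used for filtering or grouping; other values fit better as fields.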
Database¶
Monitoring is implemented by sending data to an InfluxDB database. Database credentials are set in the “monitoring” section of the configuration file.
Classes¶
- class luna_tasks.monitoring.points.SubtaskErrorPoint(eventTime, subtaskId, taskId, taskType, errorCode, additionalTags=None, additionalFields=None)[source]¶
Point for subtask error monitoring.
- errorCode¶
error code
- Type:
int
- additionalTags¶
additional tags
- Type:
dict
- additionalFields¶
additional fields
- Type:
dict
- property fields: dict¶
Get point fields
- Returns:
dict with additional fields
- Return type:
dict
- series: str = 'subtask_errors'¶
series
- property tags: dict¶
Get point tags
- Returns:
dict with service as keys + additionalTags
- Return type:
dict
- class luna_tasks.monitoring.points.SubtaskExecutionPoint(eventTime, subtaskId, taskId, taskType, additionalTags=None, additionalFields=None)[source]¶
Point for monitoring subtask execution
- additionalTags¶
additional tags for point
- Type:
dict
- additionalFields¶
additional fields for point
- Type:
dict
- property fields: dict¶
Get point fields
- Returns:
dict with additional fields
- Return type:
dict
- series: str = 'subtask_execution'¶
series
- property tags: dict¶
Get point tags
- Returns:
dict with service as keys + additionalTags
- Return type:
dict
- class luna_tasks.monitoring.points.SubtaskFinalPoint(eventTime, subTaskId, taskId, taskType, executionTime, status, additionalTags=None, additionalFields=None)[source]¶
Point for monitoring subtask ending.
- additionalTags¶
additional tags for point
- Type:
dict
- additionalFields¶
additional fields for point
- Type:
dict
- executionTime¶
subtask execution time
- Type:
float
- status¶
subtask status
- Type:
int
- property fields: dict¶
Get point fields
- Returns:
dict with additional fields
- Return type:
dict
- series: str = 'subtask_final'¶
series
- property tags: dict¶
Get point tags
- Returns:
dict with service as keys + additionalTags
- Return type:
dict
- class luna_tasks.monitoring.points.SubtaskMonitoringBasePoint(eventTime, subtaskId, taskId, taskType)[source]¶
Base class for monitoring a subtask execution.
- subtaskId¶
subtask id
- Type:
int
- taskId¶
task id
- Type:
int
- taskType¶
task type
- Type:
int
- property fields: dict¶
Get point fields
- Returns:
dict with additional fields
- Return type:
dict
- classmethod initialize(serviceName)[source]¶
Initialize request point for custom service.
- Parameters:
serviceName – service name
- service: str¶
service name
- property tags: dict¶
Get point tags
- Returns:
dict with service as keys + additionalTags
- Return type:
dict
- class luna_tasks.monitoring.points.WorkerQueuePoint(eventTime, subtaskId, taskId, taskType, transportTime)[source]¶
Point for worker queue monitoring.
- transportTime¶
time between a worker receiving the task and the start of execution
- Type:
float
- property fields: dict¶
Get point fields
- Returns:
dict with additional fields
- Return type:
dict
- series: str = 'worker_queue'¶
series
- property tags: dict¶
Get point tags
- Returns:
dict with service as keys + additionalTags
- Return type:
dict
- class luna_tasks.monitoring.points.WorkerRequestPoint(eventTime, subtaskId, taskId, transportTime)[source]¶
Point for monitoring queue of sending subtasks to workers.
- taskId¶
task id
- Type:
int
- subtaskId¶
subtask id
- Type:
int
- transportTime¶
time between the start of sending a task's subtasks and sending the subtask to the worker for execution
- Type:
float
- property fields: dict¶
Get point fields
- Returns:
dict with additional fields
- Return type:
dict
- classmethod initialize(serviceName)[source]¶
Initialize request point for custom service.
- Parameters:
serviceName – service name
- series: str = 'worker_requests'¶
series
- service: str¶
service name
- property tags: dict¶
Get point tags
- Returns:
dict with service as keys + additionalTags
- Return type:
dict
Module contains points for monitoring.
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.BaseMonitoringPoint(eventTime)[source]¶
Abstract class for points
- eventTime¶
event time as timestamp
- Type:
float
- abstract property fields: Dict[str, int | float | str]¶
Get fields from point. Fields are assumed to be non-indexed data.
- Returns:
dict with fields.
- Return type:
Dict[str, Union[int, float, str]]
- abstract property tags: Dict[str, int | float | str]¶
Get tags from point. Tags are assumed to be indexed data.
- Returns:
dict with tags.
- Return type:
Dict[str, Union[int, float, str]]
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.BaseRequestMonitoringPoint(requestId, resource, method, requestTime, service, statusCode)[source]¶
Base class for point which is associated with requests.
- requestId¶
request id
- Type:
str
- route¶
concatenation of a request method and a request resource
- Type:
str
- service¶
service name
- Type:
str
- requestTime¶
a request processing start timestamp
- Type:
float
- statusCode¶
status code of a request response.
- Type:
int
- property fields: Dict[str, int | float | str]¶
Get fields
- Returns:
dict with the following keys: “request_id”
- Return type:
Dict[str, Union[int, float, str]]
- property tags: Dict[str, int | float | str]¶
Get tags
- Returns:
dict with the following keys: “route”, “service”, “status_code”
- Return type:
Dict[str, Union[int, float, str]]
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.DataForMonitoring(tags=<factory>, fields=<factory>)[source]¶
Class for storing additional data for monitoring.
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.InfluxFormatter[source]¶
Format any point field into inline format
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.MonitroingPointInfluxFormatBuilder(name, bases, namespace, /, **kwargs)[source]¶
Complement point class with explicit fields formatting function for the sake of better performance
To build the type formats, the target class must have a ‘fields’ property whose return value is annotated with a TypedDict.
Target class might be configured via ‘Config’ class. Available options:
extraFields: whether class should handle additional fields or not
>>> from typing import TypedDict
>>>
>>> class MonitoringFields(TypedDict):
...     field1: str
...     field2: int
...     field3: float
...     field4: bool
>>>
>>> class BasePoint(BaseMonitoringPoint, metaclass=MonitroingPointInfluxFormatBuilder):
...
...     def __init__(self, fields: dict):
...         self._fields = fields
...
...     @property
...     def tags(self):
...         return {}
...
...     @property
...     def fields(self) -> MonitoringFields:
...         return self._fields
...
>>> class TestPointNoExtra(BasePoint):
...
...     class Config:
...         extraFields = False
...
>>> class TestPointWithExtra(BasePoint):
...     class Config:
...         extraFields = True
>>>
>>>
>>> point1 = TestPointNoExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False})
>>> point2 = TestPointWithExtra({"field1": "data", "field2": 1, "field3": 1.0, "field4": False, "extra": True})
>>> print(point1.convertFieldsToInfluxLineProtocol())
field1="data",field2=1i,field3=1.000000,field4=False
>>> print(point2.convertFieldsToInfluxLineProtocol())
field1="data",field2=1i,field3=1.000000,field4=False,extra=True
- classmethod buildInfluxFormats(annotations, extraFields)[source]¶
Build map with influx formats for corresponding fields
- Parameters:
annotations (dict) – point fields type annotations
extraFields (bool) – whether point uses extra fields or not
- Returns:
dict of fields with their format
- Return type:
dict
- static convertFieldsToInfluxLineProtocolNoExtra(point)[source]¶
Convert point fields into influx line protocol format without extra fields
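The idea of mapping each field type to a format can be sketched without the metaclass. The snippet below is not the builder's implementation; `FORMATTERS` and `formatFields` are hypothetical names, and the formats were chosen only to match the output shown in the doctest for this class.

```python
from typing import Any, Callable, Dict

# Formatters chosen to reproduce the doctest output of the builder class;
# the real builder may generate its formats differently.
FORMATTERS: Dict[type, Callable[[Any], str]] = {
    str: lambda value: f'"{value}"',
    bool: lambda value: str(value),
    int: lambda value: f"{value}i",
    float: lambda value: f"{value:.6f}",
}


def formatFields(fields: Dict[str, Any]) -> str:
    """Hypothetical helper: render fields as a comma-separated key=value string."""
    parts = []
    for name, value in fields.items():
        # The exact type is looked up, so True/False never reach the int formatter.
        formatter = FORMATTERS[type(value)]
        parts.append(f"{name}={formatter(value)}")
    return ",".join(parts)


print(formatFields({"field1": "data", "field2": 1, "field3": 1.0, "field4": False}))
```

Resolving the formatter from a known type annotation once, rather than inspecting every value at send time, is presumably where the performance benefit mentioned above comes from.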
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.RequestErrorMonitoringPoint(requestId, resource, method, errorCode, service, requestTime, statusCode, additionalTags=None, additionalFields=None)[source]¶
Request error monitoring point, intended for monitoring request errors (error codes).
- errorCode¶
error code
- Type:
int
- additionalTags¶
additional tags which were specified for the request
- Type:
dict
- additionalFields¶
additional fields which were specified for the request
- Type:
dict
- property fields: Dict[str, int | float | str]¶
Get fields.
- Returns:
dict with base fields and additional fields
- Return type:
Dict[str, Union[int, float, str]]
- series: str = 'errors'¶
series “errors”
- property tags: Dict[str, int | float | str]¶
Get tags.
- Returns:
dict with base tags, “error_code” and additional tags
- Return type:
Dict[str, Union[int, float, str]]
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.RequestMonitoringPoint(requestId, resource, method, executionTime, requestTime, service, statusCode, additionalTags=None, additionalFields=None)[source]¶
Request monitoring point, intended for monitoring all requests and measuring request time, etc.
- executionTime¶
execution time
- Type:
float
- additionalTags¶
additional tags which were specified for the request
- Type:
dict
- additionalFields¶
additional fields which were specified for the request
- Type:
dict
- property fields: Dict[str, int | float | str]¶
Get fields.
- Returns:
dict with base fields, “execution_time” and additional fields
- Return type:
Dict[str, Union[int, float, str]]
- series: str = 'requests'¶
series “requests”
- property tags: Dict[str, int | float | str]¶
Get tags.
- Returns:
dict with base tags and additional tags
- Return type:
Dict[str, Union[int, float, str]]
- luna_tasks.crutches_on_wheels.cow.monitoring.points.getRoute(resource, method)[source]¶
Get a request route: the concatenation of a request method and a request resource.
- Parameters:
resource – resource
method – method
- Returns:
“{method}:{resource}”
- Return type:
str
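A minimal sketch of such a route helper, based only on the format documented here (for example, GET:/version). It is a stand-in for illustration, not the library's source.

```python
def getRoute(resource: str, method: str) -> str:
    """Sketch: join the request method and resource with a colon."""
    return f"{method}:{resource}"


route = getRoute("/version", "GET")
```

Note the argument order: the resource comes first in the signature, while the method comes first in the resulting string.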
- luna_tasks.crutches_on_wheels.cow.monitoring.points.monitorTime(monitoringData, fieldName)[source]¶
Context manager for timing execution time.
- Parameters:
monitoringData – container for saving result
fieldName – field name
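A timing context manager of this kind could look roughly as follows. This is a sketch under assumptions: the `DataForMonitoring` stand-in only mirrors the tags/fields constructor documented above, and storing the elapsed time in seconds in the `fields` dictionary is a guess rather than confirmed behaviour.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Dict, Iterator, Union

Value = Union[int, float, str]


@dataclass
class DataForMonitoring:
    """Stand-in for the documented container (tags and fields dictionaries)."""

    tags: Dict[str, Value] = field(default_factory=dict)
    fields: Dict[str, Value] = field(default_factory=dict)


@contextmanager
def monitorTime(monitoringData: DataForMonitoring, fieldName: str) -> Iterator[None]:
    """Measure the wrapped block and save the elapsed time under fieldName."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Assumption: the result is stored in seconds in the fields dictionary.
        monitoringData.fields[fieldName] = time.perf_counter() - start


data = DataForMonitoring()
with monitorTime(data, "match_time"):
    sum(range(1000))  # placeholder for the work being timed
```

The `finally` clause records the time even when the wrapped block raises, so a failed step still contributes a measurement.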
Module implements the base class for monitoring.
- class luna_tasks.crutches_on_wheels.cow.monitoring.manager.LunaMonitoringManager(settings, pluginManager)[source]¶
Monitoring manager. Sends data to the monitoring storage and monitoring plugins.
- settings¶
monitoring storage settings
- class luna_tasks.crutches_on_wheels.cow.monitoring.manager.MonitoringSettings(*args, **kwargs)[source]¶
Monitoring settings protocol
Module contains classes for sending data to influx monitoring.
- class luna_tasks.crutches_on_wheels.cow.monitoring.influx_adapter.BaseMonitoringAdapter(settings, flushingPeriod)[source]¶
Base monitoring adapter.
- backgroundScheduler¶
runner for periodic flushing monitoring points
- Type:
AsyncIOScheduler
- _buffer¶
list of buffered points waiting to be sent to influx
- Type:
- flushingPeriod¶
period of flushing points (in seconds)
- Type:
float
- _influxSettings¶
current influx settings
- Type:
- _job¶
sending monitoring data job
- Type:
Job
- addPointsToBuffer(points)[source]¶
Add points to buffer.
- Parameters:
points – points
- Return type:
None
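The buffer-then-flush pattern described by these attributes can be sketched as below. `PointBuffer` and `flush` are hypothetical names, points are represented as plain strings for brevity, and the periodic scheduling performed by the background scheduler is left out.

```python
from typing import List


class PointBuffer:
    """Hypothetical buffer: collects points until the next flush."""

    def __init__(self) -> None:
        self._buffer: List[str] = []

    def addPointsToBuffer(self, points: List[str]) -> None:
        """Append points; nothing is sent at this moment."""
        self._buffer.extend(points)

    def flush(self) -> List[str]:
        """Return all pending points and start a fresh buffer."""
        # Swap the list out so points added during sending are kept for the next flush.
        pending, self._buffer = self._buffer, []
        return pending


pointBuffer = PointBuffer()
pointBuffer.addPointsToBuffer(["point-1", "point-2"])
pointBuffer.addPointsToBuffer(["point-3"])
pending = pointBuffer.flush()
```

Buffering trades a short delay, bounded by the flushing period, for fewer write requests to the storage.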
- static convertFieldsToInfluxLineProtocol(fields)[source]¶
Convert field value to influx line protocol format
- Parameters:
fields – dict with values to convert
- Returns:
line protocol string
- Return type:
str
- generatePointStr(point)[source]¶
Generate string from point
- Parameters:
point – point
- Returns:
influx line protocol string
- Return type:
str
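For orientation, an InfluxDB line protocol record has the shape `measurement,tag=value field=value timestamp`. The sketch below builds such a string from the parts of a point. It is not the adapter's implementation: the standalone function signature, the helper names, the six-digit float format, and the nanosecond timestamp precision are all assumptions.

```python
from typing import Dict, Union

Value = Union[int, float, str]


def escapeTag(value: Value) -> str:
    """Escape commas, equals signs and spaces, which are special in tags."""
    return str(value).replace(",", r"\,").replace("=", r"\=").replace(" ", r"\ ")


def formatField(value: Value) -> str:
    """Render a field value: quoted string, integer with 'i' suffix, or float."""
    if isinstance(value, str):
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'
    if isinstance(value, bool):
        return str(value)  # checked before int because bool is a subclass of int
    if isinstance(value, int):
        return f"{value}i"
    return f"{value:.6f}"


def generatePointStr(
    series: str, eventTime: float, tags: Dict[str, Value], fields: Dict[str, Value]
) -> str:
    """Hypothetical standalone version: build one line protocol record."""
    tagStr = ",".join(f"{escapeTag(k)}={escapeTag(v)}" for k, v in tags.items())
    fieldStr = ",".join(f"{k}={formatField(v)}" for k, v in fields.items())
    head = f"{series},{tagStr}" if tagStr else series
    # Assumption: the timestamp is written in nanoseconds.
    return f"{head} {fieldStr} {int(eventTime * 1_000_000_000)}"


line = generatePointStr(
    "requests",
    1.5,
    {"service": "luna-tasks", "route": "GET:/version", "status_code": 200},
    {"request_id": "abc", "execution_time": 0.25},
)
```

Tag values are always written unquoted and stored as strings, which is why only field values carry type markers such as quotes or the `i` suffix.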