Monitoring¶
Data for monitoring¶
Now we monitor several types of events:
request, all HTTP requests.
error, all failed HTTP requests.
subtask execution, some messages about a process of executing a task.
subtask errors, all errors, generated during subtask processing.
subtask final, messages about the end of subtask processing.
worker requests, all messages, generated during subtask sending to workers.
worker queue, some messages about a transportation task through a worker queue.
Every event is a point in the time series. The point is represented as a union of the following data:
series name (now requests and errors)
start request time
tags, indexed data in storage, dictionary: keys - string tag names, values - string, integer, float
fields, non-indexed data in storage, dictionary: keys - string tag names, values - string, integer, float
‘Requests’ series. Triggered on every HTTP request. Each point contains data about the corresponding request (execution time and etc).
tags
tag name
description
service
“luna-tasks” or “luna-tasks-worker”
route
concatenation of a request method and a request resource (GET:/version)
status_code
http status code of response
fields
fields
description
request_id
request id
execution_time
request execution time
‘Errors’ series. Triggered on failed HTTP request. Each point contains error_code of luna error.
tags
tag name
description
service
“luna-tasks” or “luna-tasks-worker”
route
concatenation of a request method and a request resource (GET:/version)
status_code
http status code of response
error_code
luna error code
fields
fields
description
request_id
request id
‘Subtask errors’ series. Triggered on an error in a pipeline of a task processing.
tags
tag name
description
service
always “luna-tasks-worker”
task_id
task id
subtask_id
subtask id
task_type
task type (1, 2, 3, …)
error_code
luna error code
fields
fields
description
error_code
luna error code
‘subtask execution’ series. Triggered on an error in a pipeline of a task processing.
tags
tag name
description
service
always “luna-tasks-worker”
task_id
task id
subtask_id
subtask id
task_type
task type (1, 2, 3, …)
fields
fields
description
save_result_time
time of saving result in image-store (all task type)
linked_faces
approximate linked faces count (linker task)
extracted_descriptors
extracted descriptors count (additional extract task)
events
removed event ids count (gc task)
match_time
matching time (crossmatching task)
clusterization_report_build_time
build report time (reporter task)
‘subtask final’ series. Triggered on an end of subtask execution.
tags
tag name
description
service
always “luna-tasks-worker”
task_id
task id
subtask_id
subtask id
task_type
task type (1, 2, 3, …)
fields
fields
description
execution_time
execution subtask time
‘worker queue’ series. Triggered on a getting task from the worker queue.
tags
tag name
description
service
always “luna-tasks-worker”
task_id
task id
subtask_id
subtask id
task_type
task type (1, 2, 3, …)
fields
fields
description
transport_time
time between receiving task of a worker and start execution
‘worker requests’ series. Triggered after sending subtask to a worker.
tags
tag name
description
service
always “luna-tasks-worker”
task_id
task id
subtask_id
subtask id
fields
fields
description
transport_time
initialize sending a task subtasks and send subtask to thr worker execution
It can be to add additional tags or fields.
Database¶
Monitoring is implemented as data sending to an influx database. You can set up your database credentials in configuration file in section “monitoring”.
Classes¶
- class luna_tasks.monitoring.points.SubtaskErrorPoint(eventTime, subtaskId, taskId, taskType, errorCode, additionalTags=None, additionalFields=None)[source]¶
Point for sub task error monitoring.
- errorCode¶
error code
- Type:
int
- additionalTags¶
additional tags
- Type:
dict
- additionalFields¶
additional fields
- Type:
dict
- property fields: dict¶
Get point fields
- Returns:
dict with additional fields
- Return type:
dict
- series: str = 'subtask_errors'¶
series
- property tags: dict¶
Get point tags
- Returns:
dict with service as keys + additionalTags
- Return type:
dict
- class luna_tasks.monitoring.points.SubtaskExecutionPoint(eventTime, subtaskId, taskId, taskType, additionalTags=None, additionalFields=None)[source]¶
Point for monitoring subtask execution
- additionalTags¶
additional tags for point
- Type:
dict
- additionalFields¶
additional fields for point
- Type:
dict
- property fields: dict¶
Get point fields
- Returns:
dict with additional fields
- Return type:
dict
- series: str = 'subtask_execution'¶
series
- property tags: dict¶
Get point tags
- Returns:
dict with service as keys + additionalTags
- Return type:
dict
- class luna_tasks.monitoring.points.SubtaskFinalPoint(eventTime, subTaskId, taskId, taskType, executionTime, status, additionalTags=None, additionalFields=None)[source]¶
Point for monitoring subtask ending.
- additionalTags¶
additional tags for point
- Type:
dict
- additionalFields¶
additional fields for point
- Type:
dict
- executionTime¶
subtask execution time
- Type:
float
- status¶
subtask status
- Type:
int
- property fields: dict¶
Get point fields
- Returns:
dict with additional fields
- Return type:
dict
- series: str = 'subtask_final'¶
series
- property tags: dict¶
Get point tags
- Returns:
dict with service as keys + additionalTags
- Return type:
dict
- class luna_tasks.monitoring.points.SubtaskMonitoringBasePoint(eventTime, subtaskId, taskId, taskType)[source]¶
Base class for monitoring an subtask execution .. attribute:: subtaskId
subtask id
- type:
int
- taskId¶
task id
- Type:
int
- taskType¶
task type
- Type:
int
- property fields: dict¶
Get point fields
- Returns:
dict with additional fields
- Return type:
dict
- classmethod initialize(serviceName)[source]¶
Initialize request point for custom service. :param serviceName: service name
- service: str¶
service name
- property tags: dict¶
Get point tags
- Returns:
dict with service as keys + additionalTags
- Return type:
dict
- class luna_tasks.monitoring.points.WorkerQueuePoint(eventTime, subtaskId, taskId, taskType, transportTime)[source]¶
Point for a worker queue monitoring .. attribute:: transportTime
time between receiving task of a worker and start execution
- type:
float
- property fields: dict¶
Get point fields
- Returns:
dict with additional fields
- Return type:
dict
- series: str = 'worker_queue'¶
series
- property tags: dict¶
Get point tags
- Returns:
dict with service as keys + additionalTags
- Return type:
dict
- class luna_tasks.monitoring.points.WorkerRequestPoint(eventTime, subtaskId, taskId, transportTime)[source]¶
Point for monitoring queue of sending subtasks to workers.
- taskId¶
task id
- Type:
int
- subtaskId¶
subtask id
- Type:
int
- transportTime¶
initialize sending a task subtasks and send subtask to thr worker execution
- Type:
float
- property fields: dict¶
Get point fields
- Returns:
dict with additional fields
- Return type:
dict
- classmethod initialize(serviceName)[source]¶
Initialize request point for custom service. :param serviceName: service name
- series: str = 'worker_requests'¶
series
- service: str¶
service name
- property tags: dict¶
Get point tags
- Returns:
dict with service as keys + additionalTags
- Return type:
dict
Module contains points for monitoring’s.
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.BaseMonitoringPoint(eventTime)[source]¶
Abstract class for points
- eventTime¶
event time as timestamp
- Type:
float
- abstract property fields: Dict[str, int | float | str]¶
Get tags from point. We supposed that fields are not indexing data
- Returns:
dict with fields.
- Return type:
Dict
[str
,Union
[int
,float
,str
]]
- abstract property tags: Dict[str, int | float | str]¶
Get tags from point. We supposed that tags are indexing data
- Returns:
dict with tags.
- Return type:
Dict
[str
,Union
[int
,float
,str
]]
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.BaseRequestMonitoringPoint(requestId, resource, method, requestTime, service, statusCode)[source]¶
Base class for point which is associated with requests.
- requestId¶
request id
- Type:
str
- route¶
concatenation of a request method and a request resource
- Type:
str
- service¶
service name
- Type:
str
- requestTime¶
a request processing start timestamp
- Type:
float
- statusCode¶
status code of a request response.
- Type:
int
- property fields: Dict[str, int | float | str]¶
Get fields
- Returns:
“request_id”
- Return type:
dict with following keys
- Return type:
Dict
[str
,Union
[int
,float
,str
]]
- property tags: Dict[str, int | float | str]¶
Get tags
- Returns:
“route”, “service”, “status_code”
- Return type:
dict with following keys
- Return type:
Dict
[str
,Union
[int
,float
,str
]]
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.DataForMonitoring(tags=<factory>, fields=<factory>)[source]¶
Class fo storing an additional data for monitoring.
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.RequestErrorMonitoringPoint(requestId, resource, method, errorCode, service, requestTime, statusCode, additionalTags=None, additionalFields=None)[source]¶
Request monitoring point is suspended for monitoring requests errors (error codes)
- errorCode¶
error code
- Type:
int
- additionalTags¶
additional tags which was specified for the request
- Type:
dict
- additionalFields¶
additional fields which was specified for the request
- Type:
dict
- property fields: Dict[str, int | float | str]¶
Get fields.
- Returns:
dict with base fields and additional tags
- Return type:
Dict
[str
,Union
[int
,float
,str
]]
- series: str = 'errors'¶
series “errors”
- property tags: Dict[str, int | float | str]¶
Get tags.
- Returns:
dict with base tags, “error_code” and additional tags
- Return type:
Dict
[str
,Union
[int
,float
,str
]]
- class luna_tasks.crutches_on_wheels.cow.monitoring.points.RequestMonitoringPoint(requestId, resource, method, executionTime, requestTime, service, statusCode, additionalTags=None, additionalFields=None)[source]¶
Request monitoring point is suspended for monitoring all requests and measure a request time and etc.
- executionTime¶
execution time
- Type:
float
- additionalTags¶
additional tags which was specified for the request
- Type:
dict
- additionalFields¶
additional fields which was specified for the request
- Type:
dict
- property fields: Dict[str, int | float | str]¶
Get fields.
- Returns:
dict with base fields, “execution_time” and additional tags
- Return type:
Dict
[str
,Union
[int
,float
,str
]]
- series: str = 'requests'¶
series “request”
- property tags: Dict[str, int | float | str]¶
Get tags.
- Returns:
dict with base tags and additional tags
- Return type:
Dict
[str
,Union
[int
,float
,str
]]
- luna_tasks.crutches_on_wheels.cow.monitoring.points.getRoute(resource, method)[source]¶
Get a request route, concatenation of a request method and a request resource :param resource: resource :param method: method
- Returns:
{resource}”
- Return type:
“{method}
- Return type:
str
- luna_tasks.crutches_on_wheels.cow.monitoring.points.monitorTime(monitoringData, fieldName)[source]¶
Context manager for timing execution time.
- Parameters:
monitoringData – container for saving result
fieldName – field name
Module implement base class for monitoring
- class luna_tasks.crutches_on_wheels.cow.monitoring.base_monitoring.BaseLunaMonitoring[source]¶
Base class for monitoring
- class luna_tasks.crutches_on_wheels.cow.monitoring.base_monitoring.LunaRequestInfluxMonitoring(credentials, host='localhost', port=8086, ssl=False, flushingPeriod=1)[source]¶
Class for sending data which is associated with request to influx .. attribute:: settings
influxdb settings
- type:
InfluxSettings
- flushingPeriod¶
period of flushing points (in seconds)
- Type:
int
Module contains classes for sending a data to an influx monitoring.
- class luna_tasks.crutches_on_wheels.cow.monitoring.influx_adapter.BaseMonitoringAdapter(settings, flushingPeriod)[source]¶
Base monitoring adapter.
- backgroundScheduler¶
runner for periodic flushing monitoring points
- Type:
AsyncIOScheduler
- _buffer¶
list of buffering points which is waiting sending to influx
- Type:
- flushingPeriod¶
period of flushing points (in seconds)
- Type:
float
- _influxSettings¶
current influx settings
- Type:
- _job¶
sending monitoring data job
- Type:
Job
- addPointsToBuffer(points)[source]¶
Add points to buffer.
- Parameters:
points – points
- Return type:
None
- static convertFieldsToInfluxLineProtocol(fields)[source]¶
Convert field value to influx line protocol format
- Parameters:
fields – dict with values to convert
- Retruns:
line protocol string
- Return type:
str
- generatePointStr(point)[source]¶
Generate string from point
- Parameters:
point – point
- Returns:
influx line protocol string
- Return type:
str