Monitoring

Data for monitoring

Now we monitor several types of events:

  • request, all HTTP requests.

  • error, all failed HTTP requests.

  • subtask execution, some messages about a process of executing a task.

  • subtask errors, all errors, generated during subtask processing.

  • subtask final, messages about the end of subtask processing.

  • worker requests, all messages, generated during subtask sending to workers.

  • worker queue, some messages about a transportation task through a worker queue.

Every event is a point in the time series. The point is represented as a union of the following data:

  • series name (now requests and errors)

  • start request time

  • tags, indexed data in storage, dictionary: keys - string tag names, values - string, integer, float

  • fields, non-indexed data in storage, dictionary: keys - string tag names, values - string, integer, float

‘Requests’ series. Triggered on every HTTP request. Each point contains data about the corresponding request (execution time and etc).

  • tags

    tag name

    description

    service

    “luna-tasks” or “luna-tasks-worker”

    route

    concatenation of a request method and a request resource (GET:/version)

    status_code

    http status code of response

  • fields

    fields

    description

    request_id

    request id

    execution_time

    request execution time

‘Errors’ series. Triggered on failed HTTP request. Each point contains error_code of luna error.

  • tags

    tag name

    description

    service

    “luna-tasks” or “luna-tasks-worker”

    route

    concatenation of a request method and a request resource (GET:/version)

    status_code

    http status code of response

    error_code

    luna error code

  • fields

    fields

    description

    request_id

    request id

‘Subtask errors’ series. Triggered on an error in a pipeline of a task processing.

  • tags

    tag name

    description

    service

    always “luna-tasks-worker”

    task_id

    task id

    subtask_id

    subtask id

    task_type

    task type (1, 2, 3, …)

    error_code

    luna error code

  • fields

    fields

    description

    error_code

    luna error code

‘subtask execution’ series. Triggered on an error in a pipeline of a task processing.

  • tags

    tag name

    description

    service

    always “luna-tasks-worker”

    task_id

    task id

    subtask_id

    subtask id

    task_type

    task type (1, 2, 3, …)

  • fields

    fields

    description

    save_result_time

    time of saving result in image-store (all task type)

    linked_faces

    approximate linked faces count (linker task)

    extracted_descriptors

    extracted descriptors count (additional extract task)

    events

    removed event ids count (gc task)

    match_time

    matching time (crossmatching task)

    clusterization_report_build_time

    build report time (reporter task)

‘subtask final’ series. Triggered on an end of subtask execution.

  • tags

    tag name

    description

    service

    always “luna-tasks-worker”

    task_id

    task id

    subtask_id

    subtask id

    task_type

    task type (1, 2, 3, …)

  • fields

    fields

    description

    execution_time

    execution subtask time

‘worker queue’ series. Triggered on a getting task from the worker queue.

  • tags

    tag name

    description

    service

    always “luna-tasks-worker”

    task_id

    task id

    subtask_id

    subtask id

    task_type

    task type (1, 2, 3, …)

  • fields

    fields

    description

    transport_time

    time between receiving task of a worker and start execution

‘worker requests’ series. Triggered after sending subtask to a worker.

  • tags

    tag name

    description

    service

    always “luna-tasks-worker”

    task_id

    task id

    subtask_id

    subtask id

  • fields

    fields

    description

    transport_time

    initialize sending a task subtasks and send subtask to thr worker execution

It can be to add additional tags or fields.

Database

Monitoring is implemented as data sending to an influx database. You can set up your database credentials in configuration file in section “monitoring”.

Classes

class luna_tasks.monitoring.points.SubtaskErrorPoint(eventTime, subtaskId, taskId, taskType, errorCode, additionalTags=None, additionalFields=None)[source]

Point for sub task error monitoring.

errorCode

error code

Type

int

additionalTags

additional tags

Type

dict

additionalFields

additional fields

Type

dict

property fields: dict

Get point fields

Returns

dict with additional fields

Return type

dict

series: str = 'subtask_errors'

series

property tags: dict

Get point tags

Returns

dict with service as keys + additionalTags

Return type

dict

class luna_tasks.monitoring.points.SubtaskExecutionPoint(eventTime, subtaskId, taskId, taskType, additionalTags=None, additionalFields=None)[source]

Point for monitoring subtask execution

additionalTags

additional tags for point

Type

dict

additionalFields

additional fields for point

Type

dict

property fields: dict

Get point fields

Returns

dict with additional fields

Return type

dict

series: str = 'subtask_execution'

series

property tags: dict

Get point tags

Returns

dict with service as keys + additionalTags

Return type

dict

class luna_tasks.monitoring.points.SubtaskFinalPoint(eventTime, subTaskId, taskId, taskType, executionTime, status, additionalTags=None, additionalFields=None)[source]

Point for monitoring subtask ending.

additionalTags

additional tags for point

Type

dict

additionalFields

additional fields for point

Type

dict

executionTime

subtask execution time

Type

float

status

subtask status

Type

int

property fields: dict

Get point fields

Returns

dict with additional fields

Return type

dict

series: str = 'subtask_final'

series

property tags: dict

Get point tags

Returns

dict with service as keys + additionalTags

Return type

dict

class luna_tasks.monitoring.points.SubtaskMonitoringBasePoint(eventTime, subtaskId, taskId, taskType)[source]

Base class for monitoring an subtask execution .. attribute:: subtaskId

subtask id

type

int

taskId

task id

Type

int

taskType

task type

Type

int

property fields: dict

Get point fields

Returns

dict with additional fields

Return type

dict

classmethod initialize(serviceName)[source]

Initialize request point for custom service. :param serviceName: service name

service: str

service name

property tags: dict

Get point tags

Returns

dict with service as keys + additionalTags

Return type

dict

class luna_tasks.monitoring.points.WorkerQueuePoint(eventTime, subtaskId, taskId, taskType, transportTime)[source]

Point for a worker queue monitoring .. attribute:: transportTime

time between receiving task of a worker and start execution

type

float

property fields: dict

Get point fields

Returns

dict with additional fields

Return type

dict

series: str = 'worker_queue'

series

property tags: dict

Get point tags

Returns

dict with service as keys + additionalTags

Return type

dict

class luna_tasks.monitoring.points.WorkerRequestPoint(eventTime, subtaskId, taskId, transportTime)[source]

Point for monitoring queue of sending subtasks to workers.

taskId

task id

Type

int

subtaskId

subtask id

Type

int

transportTime

initialize sending a task subtasks and send subtask to thr worker execution

Type

float

property fields: dict

Get point fields

Returns

dict with additional fields

Return type

dict

classmethod initialize(serviceName)[source]

Initialize request point for custom service. :param serviceName: service name

series: str = 'worker_requests'

series

service: str

service name

property tags: dict

Get point tags

Returns

dict with service as keys + additionalTags

Return type

dict

Module contains points for monitoring’s.

class luna_tasks.crutches_on_wheels.monitoring.points.BaseMonitoringPoint(eventTime)[source]

Abstract class for points

eventTime

event time as timestamp

Type

float

abstract property fields: Dict[str, Union[int, float, str]]

Get tags from point. We supposed that fields are not indexing data

Returns

dict with fields.

Return type

Dict[str, Union[int, float, str]]

abstract property tags: Dict[str, Union[int, float, str]]

Get tags from point. We supposed that tags are indexing data

Returns

dict with tags.

Return type

Dict[str, Union[int, float, str]]

class luna_tasks.crutches_on_wheels.monitoring.points.BaseRequestMonitoringPoint(requestId, resource, method, requestTime, service, statusCode)[source]

Base class for point which is associated with requests.

requestId

request id

Type

str

route

concatenation of a request method and a request resource

Type

str

service

service name

Type

str

requestTime

a request processing start timestamp

Type

float

statusCode

status code of a request response.

Type

int

property fields: Dict[str, Union[int, float, str]]

Get fields

Returns

“request_id”

Return type

dict with following keys

Return type

Dict[str, Union[int, float, str]]

property tags: Dict[str, Union[int, float, str]]

Get tags

Returns

“route”, “service”, “status_code”

Return type

dict with following keys

Return type

Dict[str, Union[int, float, str]]

class luna_tasks.crutches_on_wheels.monitoring.points.DataForMonitoring(tags=<factory>, fields=<factory>)[source]

Class fo storing an additional data for monitoring.

class luna_tasks.crutches_on_wheels.monitoring.points.RequestErrorMonitoringPoint(requestId, resource, method, errorCode, service, requestTime, statusCode, additionalTags=None, additionalFields=None)[source]

Request monitoring point is suspended for monitoring requests errors (error codes)

errorCode

error code

Type

int

additionalTags

additional tags which was specified for the request

Type

dict

additionalFields

additional fields which was specified for the request

Type

dict

property fields: Dict[str, Union[int, float, str]]

Get fields.

Returns

dict with base fields and additional tags

Return type

Dict[str, Union[int, float, str]]

series: str = 'errors'

series “errors”

property tags: Dict[str, Union[int, float, str]]

Get tags.

Returns

dict with base tags, “error_code” and additional tags

Return type

Dict[str, Union[int, float, str]]

class luna_tasks.crutches_on_wheels.monitoring.points.RequestMonitoringPoint(requestId, resource, method, executionTime, requestTime, service, statusCode, additionalTags=None, additionalFields=None)[source]

Request monitoring point is suspended for monitoring all requests and measure a request time and etc.

executionTime

execution time

Type

float

additionalTags

additional tags which was specified for the request

Type

dict

additionalFields

additional fields which was specified for the request

Type

dict

property fields: Dict[str, Union[int, float, str]]

Get fields.

Returns

dict with base fields, “execution_time” and additional tags

Return type

Dict[str, Union[int, float, str]]

series: str = 'requests'

series “request”

property tags: Dict[str, Union[int, float, str]]

Get tags.

Returns

dict with base tags and additional tags

Return type

Dict[str, Union[int, float, str]]

luna_tasks.crutches_on_wheels.monitoring.points.getRoute(resource, method)[source]

Get a request route, concatenation of a request method and a request resource :param resource: resource :param method: method

Returns

{resource}”

Return type

“{method}

Return type

str

luna_tasks.crutches_on_wheels.monitoring.points.monitorTime(monitoringData, fieldName)[source]

Context manager for timing execution time.

Parameters
  • monitoringData – container for saving result

  • fieldName – field name

Module implement base class for monitoring

class luna_tasks.crutches_on_wheels.monitoring.base_monitoring.BaseLunaMonitoring[source]

Base class for monitoring

abstract flushPoints(points)[source]

Flush point to monitoring.

Parameters

points – point

Return type

None

class luna_tasks.crutches_on_wheels.monitoring.base_monitoring.LunaRequestInfluxMonitoring(credentials, host='localhost', port=8086, ssl=False, flushingPeriod=1)[source]

Class for sending data which is associated with request to influx .. attribute:: settings

influxdb settings

type

InfluxSettings

flushingPeriod

period of flushing points (in seconds)

Type

int

flushPoints(points)[source]

Flush point to influx.

Parameters

points – point

Return type

None

initializeMonitoring()[source]

Initialize monitoring

Return type

None

static prepareSettings(credentials, host, port, ssl)[source]

Prepare influxdb settings :param credentials: database credentials :param host: influx host :param port: influx port :param ssl: use or not ssl for connecting to influx

Returns

influxdb settings container

Return type

InfluxSettings

async stopMonitoring()[source]

Stop monitoring.

Return type

None

Module contains classes for sending a data to an influx monitoring.

class luna_tasks.crutches_on_wheels.monitoring.influx_adapter.BaseMonitoringAdapter(settings, flushingPeriod)[source]

Base monitoring adapter.

client

influx client

Type

InfluxDBClient

backgroundScheduler

runner for periodic flushing monitoring points

Type

AsyncIOScheduler

_buffer

list of buffering points which is waiting sending to influx

Type

List[BaseRequestMonitoringPoint]

flushingPeriod

period of flushing points (in seconds)

Type

float

logger

logger

Type

Logger

_influxSettings

current influx settings

Type

InfluxSettings

_job

sending monitoring data job

Type

Job

addPointsToBuffer(points)[source]

Add points to buffer.

Parameters

points – points

Return type

None

static convertPointToDict(point)[source]

Convert point to influx client point :param point: point

Returns

  • ‘time’ - timestamp in nanoseconds

  • ’measurement’ - time series

  • ’tags’ - dict of tags (values are str)

  • ’fields’ - dict of fields

Return type

dict

Return type

dict

abstract static getClient(settings)[source]

Prepare influx client.

initializeScheduler()[source]

Start the loop for sending data from the buffer to monitoring.

Return type

None

stopScheduler()[source]

Stop monitoring.

Return type

None

updateFlushingPeriod(newPeriod)[source]

Update flushing period :param newPeriod: new period

class luna_tasks.crutches_on_wheels.monitoring.influx_adapter.InfluxMonitoringAdapter(settings, flushingPeriod)[source]

Influx 2.x adaptor. Suspended to send points to an influxdb

client

influx 2.x client

Type

InfluxDBClient

bucket

influx bucket name

Type

str

static getClient(settings)[source]

Initialize influx 2.x client. :param settings: influx 2.x settings

Returns

influx 2.0 write api

Return type

WriteApi

initializeMonitoring()[source]

Initialize monitoring.

Return type

None

stopMonitoring()[source]

Stop monitoring (cancel all request and stop getting new).

Return type

None

class luna_tasks.crutches_on_wheels.monitoring.influx_adapter.InfluxSettings(url, bucket, organization, token, ssl)[source]

Container for influx 2.x settings