Source code for luna_tasks.crutches_on_wheels.monitoring.influx_adapter

"""
Module contains classes for sending data to influx monitoring.
"""
import asyncio

# Ignore no_pandas_warning in aioinflux
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import partial
from typing import Optional, List, Iterable

from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from tzlocal import get_localzone_name

warnings.simplefilter("ignore", UserWarning)
from aiohttp.web_exceptions import HTTPException  # pylint: disable-msg=C0413

warnings.simplefilter("default", UserWarning)

from influxdb_client import InfluxDBClient  # pylint: disable-msg=C0413
from influxdb_client.client.write_api import SYNCHRONOUS, WriteApi  # pylint: disable-msg=C0413
from influxdb_client.rest import ApiException  # pylint: disable-msg=C0413

from .points import BaseRequestMonitoringPoint, BaseMonitoringPoint  # pylint: disable-msg=C0413
from ..utils.log import Logger  # pylint: disable-msg=C0413


@dataclass
class InfluxSettings:
    """Container for influx 2.x connection settings.

    Attributes:
        url: influx base url
        bucket: bucket name
        organization: workspace (organization) name
        token: authentication token
        ssl: whether to use ssl when connecting to influx
    """

    url: str
    bucket: str
    organization: str
    token: str
    ssl: bool
class BaseMonitoringAdapter(ABC):
    """
    Base monitoring adapter.

    Buffers monitoring points and periodically flushes them to influx using a
    background scheduler job.

    Attributes:
        client (InfluxDBClient): influx client
        backgroundScheduler (AsyncIOScheduler): runner for periodic flushing monitoring points
        _buffer (List[BaseRequestMonitoringPoint]): list of buffering points which is waiting sending to influx
        flushingPeriod (float): period of flushing points (in seconds)
        logger (Logger): logger
        _influxSettings (InfluxSettings): current influx settings
        _job (Optional[Job]): sending monitoring data job; None until `initializeScheduler` is called
    """

    def __init__(self, settings: InfluxSettings, flushingPeriod: int):
        """
        Args:
            settings: influx settings
            flushingPeriod: period of flushing points to influx, in seconds
        """
        self.client = self.getClient(settings)
        self._influxSettings = settings
        self._buffer: List[BaseRequestMonitoringPoint] = []
        self.flushingPeriod = flushingPeriod
        self.logger = Logger()
        self.backgroundScheduler = AsyncIOScheduler(timezone=get_localzone_name())
        self._job: Optional[Job] = None

    @staticmethod
    @abstractmethod
    def getClient(settings: InfluxSettings):
        """
        Prepare influx client.

        Args:
            settings: influx connection settings
        """

    @abstractmethod
    async def _writePoints(self, points) -> None:
        """
        Write points to influx.

        Args:
            points: points prepared by `convertPointToDict`
        """

    def initializeScheduler(self) -> None:
        """
        Start the loop for sending data from the buffer to monitoring.
        """
        self.logger.info("start sending data to influx")
        self._job = self.backgroundScheduler.add_job(
            self._flushPointsFromBufferToInflux, trigger="interval", seconds=self.flushingPeriod
        )
        self.backgroundScheduler.start()

    def stopScheduler(self) -> None:
        """
        Stop monitoring.
        """
        if self.backgroundScheduler.running:
            self.backgroundScheduler.shutdown()
            self.logger.info("stop sending data to influx")

    def updateFlushingPeriod(self, newPeriod: int) -> None:
        """
        Update flushing period.

        Args:
            newPeriod: new period, in seconds
        """
        # Fix: `_job` is None until `initializeScheduler` is called; previously
        # this line raised AttributeError in that case. Now the period is
        # stored either way and the running job is rescheduled only if present.
        if self._job is not None:
            self._job.reschedule(trigger="interval", seconds=newPeriod)
        self.flushingPeriod = newPeriod

    @staticmethod
    def convertPointToDict(point: BaseMonitoringPoint) -> dict:
        """
        Convert point to influx client point.

        Args:
            point: point

        Returns:
            dict:
                - 'time' - timestamp in nanoseconds
                - 'measurement' - time series
                - 'tags' - dict of tags (values are str)
                - 'fields' - dict of fields
        """
        influxPoint = {
            # eventTime is in seconds; influx expects nanosecond precision
            "time": int(point.eventTime * 10 ** 9),
            "measurement": point.series,
            "tags": {tag: str(value) for tag, value in point.tags.items()},
            "fields": point.fields,
        }
        return influxPoint

    def addPointsToBuffer(self, points: Iterable[BaseMonitoringPoint]) -> None:
        """
        Add points to buffer.

        Args:
            points: points
        """
        self._buffer.extend(self.convertPointToDict(point) for point in points)

    async def _flushPointsFromBufferToInflux(self) -> None:
        """
        Send request to influx.

        The buffer is swapped out before awaiting the write so points added
        concurrently while the write is in flight are not lost.
        """
        try:
            if self._buffer:
                copyPoints = self._buffer[:]
                self._buffer = []
                await self._writePoints(copyPoints)
                self.logger.debug(f"send {len(copyPoints)} points to monitoring")
        except HTTPException as e:
            self.logger.warning(f"failed write monitoring buffering points, message: {e}")
        except Exception as e:  # pylint: disable=broad-except
            # periodic job boundary: never let an unexpected error kill the scheduler
            self.logger.error(f"failed write monitoring point, message: {e}")
class InfluxMonitoringAdapter(BaseMonitoringAdapter):
    """
    Influx 2.x adapter. Suspended to send points to an influxdb.

    Attributes:
        client (InfluxDBClient): influx 2.x client
        bucket (str): influx bucket name
    """

    def __init__(self, settings: InfluxSettings, flushingPeriod: int):
        """
        Args:
            settings: influx 2.x settings
            flushingPeriod: period of flushing points to influx, in seconds
        """
        super().__init__(settings, flushingPeriod=flushingPeriod)
        self.bucket = settings.bucket
        self._write_api = self.client.write_api(write_options=SYNCHRONOUS)

    @staticmethod
    def getClient(settings: InfluxSettings) -> InfluxDBClient:
        """
        Initialize influx 2.x client.

        Args:
            settings: influx 2.x settings

        Returns:
            influx 2.x client (the write api is built from it in `__init__`)
        """
        # Fix: the return annotation previously claimed `WriteApi`, but this
        # method returns the client itself; the write api is created by the
        # caller via `client.write_api(...)`.
        return InfluxDBClient(
            url=settings.url, token=settings.token, org=settings.organization, verify_ssl=settings.ssl
        )

    async def _writePoints(self, points) -> None:
        """
        Write points.

        Args:
            points: converted points to write
        """
        try:
            # InfluxDB v2 write api in ASYNCHRONOUS mode is still blocking for some reasons.
            # So use SYNCHRONOUS mode write and run it in the default loop's executor
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, partial(self._write_api.write, bucket=self.bucket, record=points))
        except ApiException as e:
            self.logger.warning(f"failed write monitoring buffering points, message: {e}")

    def initializeMonitoring(self) -> None:
        """
        Initialize monitoring: start the periodic flushing scheduler.
        """
        self.initializeScheduler()

    def stopMonitoring(self) -> None:
        """
        Stop monitoring (cancel all request and stop getting new).
        """
        self.stopScheduler()
        self._write_api.close()