Source code for luna_licenses.crutches_on_wheels.cow.monitoring.influx_adapter

"""
Module contains classes for sending data to influx monitoring.
"""

# Ignore no_pandas_warning in aioinflux
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterable, List, Optional

import aiohttp
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from tzlocal import get_localzone_name

warnings.simplefilter("ignore", UserWarning)
from aiohttp.web_exceptions import HTTPException  # pylint: disable-msg=C0413

warnings.simplefilter("default", UserWarning)

from influxdb_client.rest import ApiException  # pylint: disable-msg=C0413

from ..utils.log import Logger  # pylint: disable-msg=C0413
from .helpers import lineProtocolFieldsGenerator  # pylint: disable-msg=C0413
from .points import BaseMonitoringPoint, BaseRequestMonitoringPoint  # pylint: disable-msg=C0413


@dataclass
class InfluxSettings:
    """
    Container for influx 2.x settings.

    Attributes:
        url: influx base url
        bucket: bucket name
        organization: workspace
        token: authentication token
    """

    url: str
    bucket: str
    organization: str
    token: str
class BaseMonitoringAdapter(ABC):
    """
    Base monitoring adapter.

    Points are rendered to influx line protocol strings when they are added to the buffer;
    a background job periodically hands the buffered strings to `_writePoints`, which
    concrete adapters implement.

    Attributes:
        backgroundScheduler (AsyncIOScheduler): runner for periodic flushing monitoring points
        _buffer (List[str]): line protocol strings which are waiting to be sent to influx
        flushingPeriod (int): period of flushing points (in seconds)
        logger (Logger): logger
        _influxSettings (InfluxSettings): current influx settings
        _job (Optional[Job]): sending monitoring data job, None until the scheduler is initialized
    """

    def __init__(self, settings: InfluxSettings, flushingPeriod: int):
        """
        Args:
            settings: influx settings
            flushingPeriod: period of flushing points (in seconds)
        """
        self._influxSettings = settings
        # the buffer stores ready-to-send strings (see `addPointsToBuffer`), not point objects
        self._buffer: List[str] = []
        self.flushingPeriod = flushingPeriod
        self.logger = Logger()
        self.backgroundScheduler = AsyncIOScheduler(timezone=get_localzone_name())
        self._job: Optional[Job] = None

    @abstractmethod
    async def _writePoints(self, points):
        """
        Write points.

        Args:
            points: line protocol strings taken from the buffer
        """

    def initializeScheduler(self) -> None:
        """
        Start the loop for sending data from the buffer to monitoring.
        """
        self.logger.info("start sending data to influx")
        self._job = self.backgroundScheduler.add_job(
            self._flushPointsFromBufferToInflux,
            trigger="interval",
            seconds=self.flushingPeriod,
        )
        self.backgroundScheduler.start()

    def stopScheduler(self) -> None:
        """
        Stop monitoring.
        """
        if self.backgroundScheduler.running:
            self.backgroundScheduler.shutdown()
        self.logger.info("stop sending data to influx")

    def updateFlushingPeriod(self, newPeriod: int):
        """
        Update flushing period

        Args:
            newPeriod: new period (in seconds)
        """
        # the job exists only after `initializeScheduler`; before that it is enough to store
        # the period, `initializeScheduler` reads it when it creates the job
        if self._job is not None:
            self._job.reschedule(trigger="interval", seconds=newPeriod)
        self.flushingPeriod = newPeriod

    def addPointsToBuffer(self, points: Iterable[BaseMonitoringPoint]) -> None:
        """
        Add points to buffer.

        Args:
            points: points, each one is converted to a line protocol string before buffering
        """
        self._buffer.extend(self.generatePointStr(point) for point in points)

    def generatePointStr(self, point: BaseMonitoringPoint) -> str:
        """
        Generate string from point

        Args:
            point: point
        Returns:
            influx line protocol string
        """
        # `eventTime` is multiplied by 10**9 to get the integer timestamp written to the line
        timestamp = int(point.eventTime * 10**9)
        tags = ",".join(f"{tag}={value}" for tag, value in point.tags.items())
        # a point may supply its own field serialization, otherwise the adapter's one is used
        if hasattr(point, "convertFieldsToInfluxLineProtocol"):
            fields = point.convertFieldsToInfluxLineProtocol()
        else:
            fields = self.convertFieldsToInfluxLineProtocol(point.fields)
        # without tags the measurement must not be followed by a comma, a dangling "," is
        # not valid line protocol
        key = f"{point.series},{tags}" if tags else point.series
        return f"{key} {fields} {timestamp}"

    @staticmethod
    def convertFieldsToInfluxLineProtocol(fields: dict) -> str:
        """
        Convert field value to influx line protocol format

        Args:
            fields: dict with values to convert
        Returns:
            line protocol string
        """
        return ",".join(lineProtocolFieldsGenerator(fields))

    async def _flushPointsFromBufferToInflux(self) -> None:
        """
        Send request to influx

        Buffered points are taken out of the buffer before the write is awaited. If the write
        fails the failure is logged and these points are not put back.
        """
        if not self._buffer:
            return
        # detach the current batch first so that points added while the write is awaited
        # land in the fresh buffer and are sent by the next flush
        pendingPoints, self._buffer = self._buffer, []
        try:
            await self._writePoints(pendingPoints)
            self.logger.debug(f"send {len(pendingPoints)} points to monitoring")
        except (HTTPException, aiohttp.ClientError) as exc:
            # `aiohttp.ClientError` is what the aiohttp client raises on transport failures;
            # `HTTPException` alone is a server-side class
            self.logger.warning(f"failed write monitoring buffering points, message: {exc}")
        except Exception as exc:  # pylint: disable-msg=broad-except
            self.logger.error(f"failed write monitoring point, message: {exc}")
class InfluxMonitoringAdapter(BaseMonitoringAdapter):
    """
    Influx 2.x adapter. Sends buffered points to an influxdb through its http write endpoint.

    Attributes:
        bucket (str): influx bucket name
        session (aiohttp.ClientSession): http session used for the write requests
    """

    def __init__(self, settings: InfluxSettings, flushingPeriod: int):
        """
        Args:
            settings: influx settings
            flushingPeriod: period of flushing points (in seconds)
        """
        super().__init__(settings, flushingPeriod=flushingPeriod)
        self.bucket = settings.bucket
        self.session = aiohttp.ClientSession()

    async def _writePoints(self, points):
        """
        Write points.

        Args:
            points: line protocol strings, sent newline-separated in a single request
        Returns:
            response of the write request; its body is already read and the connection released
        """
        headers = {
            "Content-Type": "text/plain",
            "Authorization": f"Token {self._influxSettings.token}",
        }
        params = {"bucket": self.bucket, "org": self._influxSettings.organization}
        # tolerate a base url configured with a trailing slash
        writeUrl = f"{self._influxSettings.url.rstrip('/')}/api/v2/write"
        payload = "\n".join(points).encode()

        # the context manager releases the connection on exit; the body is read inside it so
        # that it stays available on the returned response
        async with self.session.post(writeUrl, data=payload, headers=headers, params=params) as resp:
            body = await resp.read()
        if resp.status >= 400:
            # aiohttp does not raise on error statuses by default, report them explicitly
            self.logger.warning(
                "failed write monitoring buffering points, message: "
                f"influx responded with status {resp.status}: {body.decode(errors='replace')}"
            )
        return resp

    def initializeMonitoring(self) -> None:
        """
        Initialize monitoring.
        """
        self.initializeScheduler()

    async def stopMonitoring(self) -> None:
        """
        Stop monitoring (cancel all request and stop getting new).
        """
        self.stopScheduler()
        await self.session.close()