Source code for luna_admin.crutches_on_wheels.cow.monitoring.influx_adapter

"""
Module contains classes for sending monitoring data to InfluxDB.
"""

# Ignore no_pandas_warning in aioinflux
import warnings
from dataclasses import dataclass
from typing import Iterable

import aiohttp

warnings.simplefilter("ignore", UserWarning)
from aiohttp.web_exceptions import HTTPException  # pylint: disable-msg=C0413

warnings.simplefilter("default", UserWarning)

from influxdb_client.rest import ApiException  # pylint: disable-msg=C0413

from .base_adapter import BaseMonitoringAdapter  # pylint: disable-msg=C0413
from .helpers import lineProtocolFieldsGenerator  # pylint: disable-msg=C0413
from .points import BaseMonitoringPoint  # pylint: disable-msg=C0413


@dataclass
class InfluxSettings:
    """
    Container for influx 2.x settings

    Attributes:
        url: influx base url
        bucket: bucket name
        organization: workspace
        token: authentication token
    """

    url: str
    bucket: str
    organization: str
    token: str
class InfluxMonitoringAdapter(BaseMonitoringAdapter):
    """
    Influx 2.x adapter. Intended to send points to an influxdb.

    Points are converted to influx line protocol strings and kept in an
    in-memory buffer until they are written with a single HTTP request to the
    `/api/v2/write` endpoint.

    Attributes:
        bucket (str): influx bucket name
        session (aiohttp.ClientSession): http session used for write requests
    """

    _buffer: list[str]

    def __init__(self, settings: InfluxSettings, flushingPeriod: int):
        """
        Init adapter.

        Args:
            settings: influx connection settings
            flushingPeriod: passed as is to the base adapter
        """
        super().__init__(flushingPeriod=flushingPeriod)
        self._influxSettings = settings
        self._buffer = []
        self.bucket = settings.bucket
        self.session = aiohttp.ClientSession()

    def addPointsToBuffer(self, points: Iterable[BaseMonitoringPoint]) -> None:
        """
        Add points to buffer.

        Args:
            points: points
        """
        self._buffer.extend(self.generatePointStr(point) for point in points)

    def generatePointStr(self, point: BaseMonitoringPoint) -> str:
        """
        Generate string from point

        NOTE(review): measurement, tag keys and tag values are inserted as is;
        line protocol special characters (comma, space, equals sign) are not
        escaped here - confirm that points never contain them.

        Args:
            point: point
        Returns:
            influx line protocol string
        """
        # presumably eventTime is in seconds, which makes this a nanosecond timestamp
        time = int(point.eventTime * 10**9)
        tags = ",".join(f"{tag}={value}" for tag, value in point.tags.items())
        if hasattr(point, "convertFieldsToInfluxLineProtocol"):
            # a point may provide its own fields serialization
            fields = point.convertFieldsToInfluxLineProtocol()
        else:
            fields = self.convertFieldsToInfluxLineProtocol(point.fields)
        # a point without tags must not leave a dangling comma after the measurement
        head = f"{point.series},{tags}" if tags else point.series
        return f"{head} {fields} {time}"

    @staticmethod
    def convertFieldsToInfluxLineProtocol(fields: dict) -> str:
        """
        Convert field value to influx line protocol format

        Args:
            fields: dict with values to convert
        Returns:
            line protocol string
        """
        return ",".join(lineProtocolFieldsGenerator(fields))

    async def _writePoints(self, points):
        """
        Write points.

        Transport errors and error statuses are logged as warnings and are not raised.

        Args:
            points: line protocol strings
        Returns:
            influx response (also for an error status), None if the request itself failed
        """
        import asyncio  # pylint: disable-msg=C0415

        headers = {
            "Content-Type": "text/plain",
            "Authorization": f"Token {self._influxSettings.token}",
        }
        params = {"bucket": self.bucket, "org": self._influxSettings.organization}
        data = "\n".join(points).encode()
        try:
            resp = await self.session.post(
                f"{self._influxSettings.url}/api/v2/write", data=data, headers=headers, params=params
            )
            if resp.status >= 400:
                # the body is cached by the response, a caller is still able to read it afterwards
                details = await resp.text(errors="replace")
                self.logger.warning(
                    f"failed write monitoring buffering points, message: {resp.status} {details}"
                )
            return resp
        except (ApiException, HTTPException, aiohttp.ClientError, asyncio.TimeoutError) as exc:
            # the client session raises ClientError/TimeoutError, the first two are kept for compatibility
            self.logger.warning(f"failed write monitoring buffering points, message: {exc}")
            return None

    def clearBuffer(self) -> list[str]:
        """
        Clear monitoring buffer

        Returns:
            points which were in the buffer
        """
        bufferedPoints, self._buffer = self._buffer, []
        return bufferedPoints

    async def stopMonitoring(self) -> None:
        """
        Stop monitoring (cancel all request and stop getting new).
        """
        try:
            await super().stopMonitoring()
        finally:
            # the session must be closed even if stopping of the base adapter failed
            await self.session.close()