Source code for luna_admin.crutches_on_wheels.cow.monitoring.influx_adapter
"""
Module contains classes for sending a data to an influx monitoring.
"""
# Ignore no_pandas_warning in aioinflux
import warnings
from dataclasses import dataclass
from typing import Iterable
import aiohttp
warnings.simplefilter("ignore", UserWarning)
from aiohttp.web_exceptions import HTTPException # pylint: disable-msg=C0413
warnings.simplefilter("default", UserWarning)
from influxdb_client.rest import ApiException # pylint: disable-msg=C0413
from .base_adapter import BaseMonitoringAdapter # pylint: disable-msg=C0413
from .helpers import lineProtocolFieldsGenerator # pylint: disable-msg=C0413
from .points import BaseMonitoringPoint # pylint: disable-msg=C0413
[docs]
@dataclass
class InfluxSettings:
"""Container for influx 2.x settings"""
# influx base url
url: str
# bucket name
bucket: str
# workspace
organization: str
# authentication token
token: str
[docs]
class InfluxMonitoringAdapter(BaseMonitoringAdapter):
"""
Influx 2.x adaptor. Suspended to send points to an influxdb
Attributes:
bucket (str): influx bucket name
"""
_buffer: list[str]
def __init__(self, settings: InfluxSettings, flushingPeriod: int):
super().__init__(flushingPeriod=flushingPeriod)
self._influxSettings = settings
self._buffer = []
self.bucket = settings.bucket
self.session = aiohttp.ClientSession()
[docs]
def addPointsToBuffer(self, points: Iterable[BaseMonitoringPoint]) -> None:
"""
Add points to buffer.
Args:
points: points
"""
self._buffer.extend((self.generatePointStr(point) for point in points))
[docs]
def generatePointStr(self, point: BaseMonitoringPoint) -> str:
"""
Generate string from point
Args:
point: point
Returns:
influx line protocol string
"""
time = int(point.eventTime * 10**9)
measurement = point.series
tags = ",".join([f"{tag}={value}" for tag, value in point.tags.items()])
if hasattr(point, "convertFieldsToInfluxLineProtocol"):
fields = point.convertFieldsToInfluxLineProtocol()
else:
fields = self.convertFieldsToInfluxLineProtocol(point.fields)
pointStr = f"{measurement},{tags} {fields} {time}"
return pointStr
[docs]
@staticmethod
def convertFieldsToInfluxLineProtocol(fields: dict) -> str:
"""
Convert field value to influx line protocol format
Args:
fields: dict with values to convert
Returns:
line protocol string
"""
return ",".join(lineProtocolFieldsGenerator(fields))
async def _writePoints(self, points):
"""
Write points.
Args:
points
"""
try:
headers = {
"Content-Type": "text/plain",
"Authorization": f"Token {self._influxSettings.token}",
}
params = {"bucket": self.bucket, "org": self._influxSettings.organization}
async def fetch(session, data):
resp = await session.post(
f"{self._influxSettings.url}/api/v2/write", data=data, headers=headers, params=params
)
return resp
data = "\n".join(points)
return await fetch(self.session, data.encode())
except (ApiException, HTTPException) as exc:
self.logger.warning(f"failed write monitoring buffering points, message: {exc}")
[docs]
def clearBuffer(self) -> list[str]:
"""Clear monitoring buffer"""
copyPoints = self._buffer.copy()
self._buffer = []
return copyPoints
[docs]
async def stopMonitoring(self) -> None:
"""
Stop monitoring (cancel all request and stop getting new).
"""
await super().stopMonitoring()
await self.session.close()