Source code for luna_configurator.crutches_on_wheels.cow.monitoring.influx_adapter
"""
Module contains classes for sending data to influx monitoring.
"""
# Ignore no_pandas_warning in aioinflux
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterable, List, Optional
import aiohttp
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from tzlocal import get_localzone_name
warnings.simplefilter("ignore", UserWarning)
from aiohttp.web_exceptions import HTTPException # pylint: disable-msg=C0413
warnings.simplefilter("default", UserWarning)
from influxdb_client.rest import ApiException # pylint: disable-msg=C0413
from ..utils.log import Logger # pylint: disable-msg=C0413
from .helpers import lineProtocolFieldsGenerator # pylint: disable-msg=C0413
from .points import BaseMonitoringPoint, BaseRequestMonitoringPoint # pylint: disable-msg=C0413
[docs]
@dataclass
class InfluxSettings:
    """
    Container for influx 2.x settings.

    Attributes:
        url: influx base url
        bucket: bucket name
        organization: workspace
        token: authentication token
    """

    url: str
    bucket: str
    organization: str
    token: str
[docs]
class BaseMonitoringAdapter(ABC):
    """
    Base monitoring adapter.

    Serializes monitoring points to influx line protocol strings, keeps them in a buffer and periodically
    flushes the buffer through `_writePoints`, which concrete adapters must implement.

    Attributes:
        backgroundScheduler (AsyncIOScheduler): runner for periodic flushing monitoring points
        _buffer (List[str]): line protocol strings which are waiting to be sent to influx
        flushingPeriod (float): period of flushing points (in seconds)
        logger (Logger): logger
        _influxSettings (InfluxSettings): current influx settings
        _job (Optional[Job]): sending monitoring data job, None until `initializeScheduler` is called
    """

    def __init__(self, settings: InfluxSettings, flushingPeriod: int):
        self._influxSettings = settings
        # the buffer holds already serialized points (see `addPointsToBuffer`), not point objects
        self._buffer: List[str] = []
        self.flushingPeriod = flushingPeriod
        self.logger = Logger()
        self.backgroundScheduler = AsyncIOScheduler(timezone=get_localzone_name())
        self._job: Optional[Job] = None

    @abstractmethod
    async def _writePoints(self, points):
        """
        Write points.

        Args:
            points: list of influx line protocol strings to send
        """

    def initializeScheduler(self) -> None:
        """
        Start the loop for sending data from the buffer to monitoring.
        """
        self.logger.info("start sending data to influx")
        self._job = self.backgroundScheduler.add_job(
            self._flushPointsFromBufferToInflux,
            trigger="interval",
            seconds=self.flushingPeriod,
        )
        self.backgroundScheduler.start()

    def stopScheduler(self) -> None:
        """
        Stop monitoring.
        """
        if self.backgroundScheduler.running:
            self.backgroundScheduler.shutdown()
        # NOTE(review): the message is logged whether or not the scheduler was running - confirm this is intended
        self.logger.info("stop sending data to influx")

    def updateFlushingPeriod(self, newPeriod: int):
        """
        Update flushing period.

        If the scheduler has not been initialized yet there is no job to reschedule; the new period is only
        stored and is picked up by `initializeScheduler`.

        Args:
            newPeriod: new period (in seconds)
        """
        if self._job is not None:
            self._job.reschedule(trigger="interval", seconds=newPeriod)
        self.flushingPeriod = newPeriod

    def addPointsToBuffer(self, points: Iterable[BaseMonitoringPoint]) -> None:
        """
        Serialize points to line protocol strings and add them to the buffer.

        Args:
            points: points
        """
        self._buffer.extend(self.generatePointStr(point) for point in points)

    def generatePointStr(self, point: BaseMonitoringPoint) -> str:
        """
        Generate string from point.

        Args:
            point: point
        Returns:
            influx line protocol string
        """
        # presumably eventTime is a unix timestamp in seconds, converted here to integer nanoseconds
        timestamp = int(point.eventTime * 10**9)
        # measurement and tags form a single comma separated key; joining them together avoids a dangling
        # comma (invalid line protocol) when the point has no tags
        # NOTE(review): tag keys/values are written as is; confirm callers never pass spaces, commas or "="
        key = ",".join([point.series, *(f"{tag}={value}" for tag, value in point.tags.items())])
        # a point may provide its own fields serialization
        if hasattr(point, "convertFieldsToInfluxLineProtocol"):
            fields = point.convertFieldsToInfluxLineProtocol()
        else:
            fields = self.convertFieldsToInfluxLineProtocol(point.fields)
        return f"{key} {fields} {timestamp}"

    @staticmethod
    def convertFieldsToInfluxLineProtocol(fields: dict) -> str:
        """
        Convert field value to influx line protocol format.

        Args:
            fields: dict with values to convert
        Returns:
            line protocol string
        """
        return ",".join(lineProtocolFieldsGenerator(fields))

    async def _flushPointsFromBufferToInflux(self) -> None:
        """
        Send buffered points to influx.

        Never raises: every failure is logged, because the method is run as a scheduler job. The buffer is
        detached before the write, so points of a failed write are dropped and not retried.
        """
        try:
            if self._buffer:
                # swap the buffer first so points added while the write is awaited go to the next flush
                points, self._buffer = self._buffer, []
                await self._writePoints(points)
                self.logger.debug(f"send {len(points)} points to monitoring")
        except HTTPException as exc:
            self.logger.warning(f"failed write monitoring buffering points, message: {exc}")
        except Exception as exc:  # pylint: disable-msg=broad-except
            self.logger.error(f"failed write monitoring point, message: {exc}")
[docs]
class InfluxMonitoringAdapter(BaseMonitoringAdapter):
    """
    Influx 2.x adapter. Intended to send points to an influxdb.

    Attributes:
        bucket (str): influx bucket name
        session (aiohttp.ClientSession): http session used for all write requests, closed in `stopMonitoring`
    """

    def __init__(self, settings: InfluxSettings, flushingPeriod: int):
        super().__init__(settings, flushingPeriod=flushingPeriod)
        self.bucket = settings.bucket
        self.session = aiohttp.ClientSession()

    async def _writePoints(self, points):
        """
        Write points with a single POST request to the influx 2.x write endpoint.

        A response with an error status is logged as a warning instead of being silently ignored.

        Args:
            points: list of influx line protocol strings
        Returns:
            response of the write request (its connection is already released), None if `ApiException` was caught
        """
        headers = {
            "Content-Type": "text/plain",
            "Authorization": f"Token {self._influxSettings.token}",
        }
        params = {"bucket": self.bucket, "org": self._influxSettings.organization}
        # tolerate a base url configured with a trailing slash
        url = f"{self._influxSettings.url.rstrip('/')}/api/v2/write"
        payload = "\n".join(points).encode()
        try:
            # the context manager releases the connection even when the response body is not consumed
            async with self.session.post(url, data=payload, headers=headers, params=params) as response:
                if response.status >= 400:
                    details = await response.text(errors="replace")
                    self.logger.warning(
                        f"failed write monitoring buffering points, status: {response.status}, message: {details}"
                    )
                return response
        except ApiException as exc:
            # NOTE(review): ApiException belongs to influxdb_client, which this method does not call; the handler
            # is kept for backward compatibility - confirm whether aiohttp.ClientError was meant
            self.logger.warning(f"failed write monitoring buffering points, message: {exc}")
            return None

    def initializeMonitoring(self) -> None:
        """
        Initialize monitoring.
        """
        self.initializeScheduler()

    async def stopMonitoring(self) -> None:
        """
        Stop monitoring: stop the flushing scheduler and close the http session.
        """
        self.stopScheduler()
        await self.session.close()