Source code for luna_admin.crutches_on_wheels.monitoring.influx_adapter
"""
Module contains classes for sending a data to an influx monitoring.
"""
import asyncio
# Ignore no_pandas_warning in aioinflux
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import partial
from typing import Optional, List, Iterable
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from tzlocal import get_localzone_name
warnings.simplefilter("ignore", UserWarning)
from aiohttp.web_exceptions import HTTPException # pylint: disable-msg=C0413
warnings.simplefilter("default", UserWarning)
from influxdb_client import InfluxDBClient # pylint: disable-msg=C0413
from influxdb_client.client.write_api import SYNCHRONOUS, WriteApi # pylint: disable-msg=C0413
from influxdb_client.rest import ApiException # pylint: disable-msg=C0413
from .points import BaseRequestMonitoringPoint, BaseMonitoringPoint # pylint: disable-msg=C0413
from ..utils.log import Logger # pylint: disable-msg=C0413
[docs]@dataclass
class InfluxSettings:
"""Container for influx 2.x settings"""
# influx base url
url: str
# bucket name
bucket: str
# workspace
organization: str
# authentication token
token: str
# use or not ssl for connecting to influx
ssl: bool
[docs]class BaseMonitoringAdapter(ABC):
"""
Base monitoring adapter.
Attributes:
client (InfluxDBClient): influx client
backgroundScheduler (AsyncIOScheduler): runner for periodic flushing monitoring points
_buffer (List[BaseRequestMonitoringPoint]): list of buffering points which is waiting sending to influx
flushingPeriod (float): period of flushing points (in seconds)
logger (Logger): logger
_influxSettings (InfluxSettings): current influx settings
_job (Job): sending monitoring data job
"""
def __init__(self, settings: InfluxSettings, flushingPeriod: int):
self.client = self.getClient(settings)
self._influxSettings = settings
self._buffer: List[BaseRequestMonitoringPoint] = []
self.flushingPeriod = flushingPeriod
self.logger = Logger()
self.backgroundScheduler = AsyncIOScheduler(timezone=get_localzone_name())
self._job: Optional[Job] = None
[docs] @staticmethod
@abstractmethod
def getClient(settings: InfluxSettings):
"""
Prepare influx client.
"""
@abstractmethod
async def _writePoints(self, points):
"""
Write points.
Args:
points
"""
[docs] def initializeScheduler(self) -> None:
"""
Start the loop for sending data from the buffer to monitoring.
"""
self.logger.info("start sending data to influx")
self._job = self.backgroundScheduler.add_job(
self._flushPointsFromBufferToInflux, trigger="interval", seconds=self.flushingPeriod
)
self.backgroundScheduler.start()
[docs] def stopScheduler(self) -> None:
"""
Stop monitoring.
"""
if self.backgroundScheduler.running:
self.backgroundScheduler.shutdown()
self.logger.info("stop sending data to influx")
[docs] def updateFlushingPeriod(self, newPeriod: int):
"""
Update flushing period
Args:
newPeriod: new period
"""
self._job.reschedule(trigger="interval", seconds=newPeriod)
self.flushingPeriod = newPeriod
[docs] @staticmethod
def convertPointToDict(point: BaseMonitoringPoint) -> dict:
"""
Convert point to influx client point
Args:
point: point
Returns:
dict:
- 'time' - timestamp in nanoseconds
- 'measurement' - time series
- 'tags' - dict of tags (values are str)
- 'fields' - dict of fields
"""
influxPoint = {
"time": int(point.eventTime * 10 ** 9),
"measurement": point.series,
"tags": {tag: str(value) for tag, value in point.tags.items()},
"fields": point.fields,
}
return influxPoint
[docs] def addPointsToBuffer(self, points: Iterable[BaseMonitoringPoint]) -> None:
"""
Add points to buffer.
Args:
points: points
"""
self._buffer.extend((self.convertPointToDict(point) for point in points))
async def _flushPointsFromBufferToInflux(self) -> None:
"""
Send request to influx
"""
try:
if self._buffer:
copyPoints = self._buffer[:]
self._buffer = []
await self._writePoints(copyPoints)
self.logger.debug(f"send {len(copyPoints)} points to monitoring")
except HTTPException as e:
self.logger.warning(f"failed write monitoring buffering points, message: {e}")
except Exception as e:
self.logger.error(f"failed write monitoring point, message: {e}")
[docs]class InfluxMonitoringAdapter(BaseMonitoringAdapter):
"""
Influx 2.x adaptor. Suspended to send points to an influxdb
Attributes:
client (InfluxDBClient): influx 2.x client
bucket (str): influx bucket name
"""
def __init__(self, settings: InfluxSettings, flushingPeriod: int):
super().__init__(settings, flushingPeriod=flushingPeriod)
self.bucket = settings.bucket
self._write_api = self.client.write_api(write_options=SYNCHRONOUS)
[docs] @staticmethod
def getClient(settings: InfluxSettings) -> WriteApi:
"""
Initialize influx 2.x client.
Args:
settings: influx 2.x settings
Returns:
influx 2.0 write api
"""
return InfluxDBClient(
url=settings.url, token=settings.token, org=settings.organization, verify_ssl=settings.ssl
)
async def _writePoints(self, points):
"""
Write points.
Args:
points
"""
try:
# InfluxDB v2 write api in ASYNCHRONOUS mode is still blocking for some reasons.
# So use SYNCHRONOUS mode write and run it in the default loop's executor
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, partial(self._write_api.write, bucket=self.bucket, record=points))
except ApiException as e:
self.logger.warning(f"failed write monitoring buffering points, message: {e}")
[docs] def initializeMonitoring(self) -> None:
"""
Initialize monitoring.
"""
self.initializeScheduler()
[docs] def stopMonitoring(self) -> None:
"""
Stop monitoring (cancel all request and stop getting new).
"""
self.stopScheduler()
self._write_api.close()