Source code for cow.monitoring.clickhouse_adapter

"""Clickhouse monitoring adaptor"""

from dataclasses import dataclass, field
from typing import Iterable, List

import aiohttp
import ujson
from aiochclient import ChClient, ChClientError

from .base_adapter import BaseMonitoringAdapter
from .points import BaseMonitoringPoint  # pylint: disable-msg=C0413

# Insert statement template; ``{table}`` is filled with the series name in ``_writePoints``.
INSERT_QUERY: str = "INSERT INTO {table} FORMAT JSONEachRow"
# Formatted insert query per series name. Defined at module level, so the cache is shared by
# every adaptor instance in the process.
SERIES_CACHE: dict[str, str] = {}


[docs] @dataclass class ClickhouseSettings: """Clickhouse monitoring settings""" url: str user: str password: str | None database: str
class ClickhouseMonitoringAdaptor(BaseMonitoringAdapter):
    """
    Clickhouse adaptor. Supposed to send points to a clickhouse

    Points are buffered per series and written with ``INSERT ... FORMAT JSONEachRow`` queries;
    the series name is used as the target table name.
    """

    def __init__(self, settings: ClickhouseSettings, flushingPeriod: int):
        """
        Init adaptor.

        Args:
            settings: clickhouse connection settings
            flushingPeriod: flushing period, forwarded unchanged to the base adapter
        """
        super().__init__(flushingPeriod=flushingPeriod)
        self._settings = settings
        # series name -> records waiting to be written
        self._buffer: dict[str, list[dict]] = {}
        # NOTE(review): aiohttp recommends creating ClientSession while an event loop is running;
        # this works only if the adaptor is constructed from async code - confirm with callers.
        self.session = aiohttp.ClientSession()
        # created in initializeMonitoring
        self.client: None | ChClient = None
        # series buffered since the last write that have no cached insert query yet
        self.newSeries: list[str] = []

    async def initializeMonitoring(self) -> None:
        """
        Initialize monitoring.

        Creates the clickhouse client on top of the adaptor http session, then runs the base
        adapter initialization.
        """
        self.client = ChClient(
            self.session,
            url=self._settings.url,
            password=self._settings.password,
            user=self._settings.user,
            # NOTE(review): presumably needed because "data" is stored in a JSON/Object column -
            # confirm against the table schema
            allow_experimental_object_type=1,
            database=self._settings.database,
            json=ujson,
        )
        await super().initializeMonitoring()

    async def _writePoints(self, points: dict[str, list[dict]]):
        """
        Write points.

        Every series is inserted with its own error handling, so a failed insert for one series
        does not prevent the remaining series from being written.

        Args:
            points: monitoring points to write
        """
        # register insert queries for the series collected by addPointsToBuffer
        pendingSeries, self.newSeries = self.newSeries, []
        for series in pendingSeries:
            SERIES_CACHE[series] = INSERT_QUERY.format(table=series)

        for series, seriesPoints in points.items():
            if not seriesPoints:
                # nothing to insert for this series
                continue
            query = SERIES_CACHE.get(series)
            if query is None:
                # series was not registered through addPointsToBuffer; build the query on demand
                query = SERIES_CACHE[series] = INSERT_QUERY.format(table=series)
            try:
                await self.client.execute(query, *seriesPoints)
            except ChClientError as exc:
                # best effort: points of the failed series are dropped
                # NOTE(review): aiohttp.ClientError (e.g. connection failures) is not caught here and
                # propagates to the caller - confirm the base adapter handles it
                self.logger.warning(f"failed write monitoring buffering points, message: {exc}")

    def addPointsToBuffer(self, points: Iterable[BaseMonitoringPoint]) -> None:
        """
        Add points to the buffer.

        Args:
            points: points
        """
        for point in points:
            self._buffer.setdefault(point.series, []).append(self.generateRecord(point))
            # remember series without a cached insert query; _writePoints registers them
            if point.series not in SERIES_CACHE and point.series not in self.newSeries:
                self.newSeries.append(point.series)

    def clearBuffer(self) -> dict[str, list[dict]]:
        """
        Clear monitoring buffer

        Returns:
            records buffered so far, grouped by series; the adaptor buffer is empty afterwards
        """
        # swap in a fresh dict instead of copying the old one
        drained, self._buffer = self._buffer, {}
        return drained

    @staticmethod
    def generateRecord(point: BaseMonitoringPoint) -> dict:
        """
        Generate monitoring record from point

        Args:
            point: point
        Returns:
            monitoring record: "time" is the event time rounded to 6 digits as a string, "data" is
            the point tags merged with the point fields (fields win on equal keys)
        """
        return {
            "time": str(round(point.eventTime, 6)),
            "data": {**point.tags, **point.fields},
        }

    async def stopMonitoring(self) -> None:
        """
        Stop monitoring (cancel all requests and stop getting new).

        The http session is closed even if stopping the base adapter fails.
        """
        try:
            await super().stopMonitoring()
        finally:
            await self.session.close()