"""Clickhouse monitoring adaptor"""
from dataclasses import dataclass
from typing import Iterable, List
import aiohttp
import ujson
from aiochclient import ChClient, ChClientError
from .base_adapter import BaseMonitoringAdapter
from .points import BaseMonitoringPoint # pylint: disable-msg=C0413
# Insert statement template; `table` is filled in with the series name.
INSERT_QUERY = "INSERT INTO {table} FORMAT JSONEachRow"
# Module-wide cache shared by all adaptors: series name -> formatted insert query.
SERIES_CACHE: dict[str, str] = {}
[docs]
@dataclass
class ClickhouseSettings:
"""Clickhouse monitoring settings"""
url: str
user: str
password: str | None
database: str
[docs]
class ClickhouseMonitoringAdaptor(BaseMonitoringAdapter):
    """
    Clickhouse adaptor. Buffers monitoring points and sends them to a clickhouse.

    Points are grouped by series; the series name is substituted as the table name
    of the insert query used to write the records of that series.
    """

    def __init__(self, settings: ClickhouseSettings, flushingPeriod: int):
        """
        Init adaptor.

        Args:
            settings: clickhouse connection settings
            flushingPeriod: flushing period, passed through to the base adapter
        """
        super().__init__(flushingPeriod=flushingPeriod)
        self._settings = settings
        # series name -> records waiting to be written
        self._buffer: dict[str, list[dict]] = {}
        # NOTE(review): the session is created in a synchronous constructor; aiohttp
        # recommends creating sessions inside a running event loop - confirm callers do so.
        self.session = aiohttp.ClientSession()
        # created in initializeMonitoring
        self.client: None | ChClient = None
        # series seen in addPointsToBuffer that have no cached insert query yet
        self.newSeries: List[str] = []

    async def initializeMonitoring(self) -> None:
        """
        Initialize monitoring.

        Creates the clickhouse client on top of the adaptor session, then runs the base initialization.
        """
        self.client = ChClient(
            self.session,
            url=self._settings.url,
            password=self._settings.password,
            user=self._settings.user,
            allow_experimental_object_type=1,
            database=self._settings.database,
            json=ujson,
        )
        await super().initializeMonitoring()

    async def _writePoints(self, points: dict[str, list[dict]]):
        """
        Write points.

        Every series is written with its own insert. A clickhouse client error for one series
        is logged as a warning and does not prevent the remaining series from being written.

        Args:
            points: monitoring points to write (series name -> records)
        """
        # drain the series registered since the previous write and cache their insert queries
        newSeries, self.newSeries = self.newSeries, []
        for series in newSeries:
            SERIES_CACHE[series] = INSERT_QUERY.format(table=series)

        for series, seriesPoints in points.items():
            query = SERIES_CACHE.get(series)
            if query is None:
                # series was not registered through addPointsToBuffer, build its query on demand
                query = SERIES_CACHE[series] = INSERT_QUERY.format(table=series)
            try:
                await self.client.execute(query, *seriesPoints)
            except ChClientError as exc:
                self.logger.warning(f"failed write monitoring buffering points, message: {exc}")

    def addPointsToBuffer(self, points: Iterable[BaseMonitoringPoint]) -> None:
        """
        Add points to the buffer.

        Series without a cached insert query are remembered in `newSeries` for the next write.

        Args:
            points: points
        """
        for point in points:
            series = point.series
            self._buffer.setdefault(series, []).append(self.generateRecord(point))
            if series not in SERIES_CACHE and series not in self.newSeries:
                self.newSeries.append(series)

    def clearBuffer(self) -> dict[str, list[dict]]:
        """
        Clear monitoring buffer.

        Returns:
            points which were buffered before the call (series name -> records)
        """
        # hand the current dict to the caller and start a fresh one, no copy is needed
        bufferedPoints, self._buffer = self._buffer, {}
        return bufferedPoints

    @staticmethod
    def generateRecord(point: BaseMonitoringPoint) -> dict:
        """
        Generate monitoring record from point

        Args:
            point: point
        Returns:
            monitoring record: event time rounded to 6 digits as a string and merged tags and fields
        """
        return {
            "time": str(round(point.eventTime, 6)),
            # on a key collision the field value overrides the tag value
            "data": {**point.tags, **point.fields},
        }

    async def stopMonitoring(self) -> None:
        """
        Stop monitoring (cancel all requests and stop getting new).

        The http session is closed even if stopping the base adapter fails.
        """
        try:
            await super().stopMonitoring()
        finally:
            await self.session.close()