Source code for luna_handlers.redis_db.redis_context

import abc
from typing import List, Optional, Tuple

import aioredis
import ujson as json
from async_timeout import timeout as async_timeout

from classes.event import Event
from classes.schemas.event_raw import RawEvent
from crutches_on_wheels.cow.utils.log import Logger, logger
from crutches_on_wheels.cow.utils.timer import timer

# Delay handed to async_timeout() around the sentinel master-pool lookup in
# SentinelRedisContext.initConnections (presumably seconds -- confirm against async_timeout docs).
INIT_MASTER_POOL_TIMEOUT = 1


class BaseRedisContext(abc.ABC):
    """Base class for Redis context.

    Subclasses provide `initConnections` and `close`; this class implements
    publishing of event messages through `connectionsPool`.

    Attributes:
        channelName: A string, redis channel prefix
        logger: A Logger
        connectionsPool: connections pool; None until a subclass sets it
        storageTime: storage time ("UTC" or "LOCAL")
    """

    def __init__(self, channelName: str = "luna-sender", storageTime: Optional[str] = None) -> None:
        """Initialize redis context.

        Args:
            storageTime: storage time ("UTC" or "LOCAL"); falls back to "UTC" when empty
            channelName: A string, redis channel prefix
        """
        self.channelName = channelName
        self.logger = Logger("redis_db")
        self.connectionsPool: Optional[aioredis.ConnectionsPool] = None
        self.storageTime = storageTime or "UTC"

    @abc.abstractmethod
    async def initConnections(self) -> None:
        """Init connections."""

    @abc.abstractmethod
    async def close(self) -> None:
        """Close connections."""

    @timer
    async def publish(self, events: list[Event], requestId: str) -> None:
        """
        Publish events.

        Msg is published to channel 'luna-sender:{account_id}' in format: :json:object:`msg_to_luna_sender`

        The handler id, account id and event times are taken from the meta of
        the first event. Nothing is published for an empty list.

        Args:
            events: events
            requestId: request id

        Raises:
            RuntimeError: if the connections pool was not initialized
        """
        if not events:
            return
        # Fail fast before serialising the events; checking only after the
        # publish call could never catch anything `_publish` had not already raised for.
        if self.connectionsPool is None:
            raise RuntimeError("Redis context was not initialized")

        preparedEvents = [event.asDict() for event in events]
        meta = events[0].meta
        msg = {
            "handler_id": meta.handlerId,
            "Luna-Request-Id": requestId,
            "events": preparedEvents,
            "event_create_time": meta.createEventTime,
            "event_end_time": meta.endEventTime,
        }
        await self._publish(msg, meta.accountId)

    async def _publish(self, msg: dict, accountId: str) -> None:
        """
        Publish msg to redis.

        The message is dumped to json and sent to channel '{channelName}:{accountId}'.
        Any exception raised while publishing is logged and not re-raised.

        Args:
            msg: msg
            accountId: account id

        Raises:
            RuntimeError: if the connections pool was not initialized
        """
        if self.connectionsPool is None:
            raise RuntimeError("Redis context was not initialized")
        try:
            with await self.connectionsPool as redis:
                await redis.publish(f"{self.channelName}:{accountId}", json.dumps(msg, ensure_ascii=False))
        except Exception:
            # publishing is best-effort: the failure is logged and swallowed
            logger.exception("publish events to redis")

    @timer
    async def publishRawEvent(
        self,
        event: RawEvent,
        accountId: str,
        handlerId: str,
        requestId: str,
    ) -> None:
        """
        Publish a raw event.

        Msg is published to channel 'luna-sender:{account_id}' in format: :json:object:`msg_to_luna_sender`

        Args:
            event: user raw event
            accountId: account id
            handlerId: handler id
            requestId: request id

        Raises:
            RuntimeError: if the connections pool was not initialized
        """
        msg = {
            "handler_id": handlerId,
            "Luna-Request-Id": requestId,
            "events": [event.asHandlerEventDict()],
            "event_create_time": event.createTime,
            "event_end_time": event.endTime,
        }
        await self._publish(msg, accountId)
class RedisContext(BaseRedisContext):
    """
    Redis context

    Attributes:
        connectionsPool: An instance of aioredis.ConnectionsPool
        channelName: A string, redis channel prefix
        host: redis host
        port: redis port
        password: redis password
    """

    def __init__(
        self,
        host: str = "127.0.0.1",
        port: int = 6379,
        password: Optional[str] = None,
        channelName: str = "luna-sender",
        storageTime: Optional[str] = None,
    ) -> None:
        """Initialize redis context.

        Args:
            host: redis host
            port: redis port
            password: redis password; an empty value is stored as None
            channelName: A string, redis channel prefix
            storageTime: storage time ("UTC" or "LOCAL")
        """
        self.host = host
        self.port = port
        # normalise empty password to None
        self.password = password or None
        super().__init__(channelName, storageTime=storageTime)

    async def initConnections(self) -> None:
        """Init redis connections pool."""
        self.logger.info("Init connections pool for redis")
        self.connectionsPool = await aioredis.create_redis_pool(
            (self.host, self.port), password=self.password, maxsize=100
        )

    async def close(self) -> None:
        """Close connections pool.

        Does nothing besides logging when the pool was never initialized or is already closed.
        """
        self.logger.info("Close redis connections")
        if self.connectionsPool is None:
            return
        self.connectionsPool.close()
        await self.connectionsPool.wait_closed()
        self.connectionsPool = None
class SentinelRedisContext(RedisContext):
    """Redis context for installations with sentinel.

    Attributes:
        sentinels: A List of tuples (host, port), addresses of sentinels
        sentinel: An instance of Sentinel interface
        masterName: A str, name of the redis master set
        redisPassword: redis password
        channelName: A string, redis channel prefix
        connectionsPool: A aioredis.sentinel.ManagedPool, pool managed by sentinel
    """

    def __init__(
        self,
        sentinels: List[Tuple[str, int]],
        masterName: str,
        redisPassword: str,
        channelName: str = "luna-sender",
        storageTime: Optional[str] = None,
    ) -> None:
        """Initialize context.

        Args:
            sentinels: A List of tuples (host, port), addresses of sentinels
            masterName: A string, name for redis master set
            redisPassword: redis password
            channelName: A string, redis channel prefix
            storageTime: storage time ("UTC" or "LOCAL")
        """
        self.sentinels = sentinels
        self.masterName = masterName
        self.redisPassword = redisPassword
        self.sentinel: Optional[aioredis.RedisSentinel] = None
        # RedisContext.__init__ takes `host` as its first positional parameter,
        # so the channel name has to be passed by keyword; passing it
        # positionally would store it as the host and leave the channel at its default.
        super().__init__(channelName=channelName, storageTime=storageTime)

    async def initConnections(self) -> None:
        """Init sentinel and managed pool.

        Raises:
            asyncio.TimeoutError
        """
        self.logger.info("Init connections pool for redis sentinel")
        self.sentinel = await aioredis.create_sentinel(self.sentinels, password=self.redisPassword or None)
        # async form of the context manager; the plain `with` form is
        # deprecated in newer async_timeout releases
        async with async_timeout(INIT_MASTER_POOL_TIMEOUT):
            self.connectionsPool = self.sentinel.master_for(self.masterName)

    async def close(self) -> None:
        """Close sentinel pool.

        Does nothing besides logging when the sentinel was never initialized or is already closed.
        """
        self.logger.info("Close redis sentinel connections")
        if self.sentinel is None:
            return
        self.sentinel.close()
        await self.sentinel.wait_closed()
        self.sentinel = None
        self.connectionsPool = None