import abc
from typing import List, Optional, Tuple
import aioredis
import ujson as json
from async_timeout import timeout as async_timeout
from classes.event import Event
from classes.schemas.event_raw import RawEvent
from crutches_on_wheels.cow.utils.log import Logger, logger
from crutches_on_wheels.cow.utils.timer import timer
INIT_MASTER_POOL_TIMEOUT = 1
[docs]class BaseRedisContext(abc.ABC):
"""Base class for Redis context.
Attributes:
channelName: A string, redis channel prefix
logger: A Logger
storageTime: storage time ("UTC" or "LOCAL")
"""
def __init__(self, channelName: str = "luna-sender", storageTime: Optional[str] = None) -> None:
"""Initialize redis context.
Args:
storageTime: storage time ("UTC" or "LOCAL")
channelName: A string, redis channel prefix
"""
self.channelName = channelName
self.logger = Logger("redis_db")
self.connectionsPool: Optional[aioredis.ConnectionsPool] = None
self.storageTime = storageTime or "UTC"
[docs] @abc.abstractmethod
async def initConnections(self) -> None:
"""Init connections."""
[docs] @abc.abstractmethod
async def close(self) -> None:
"""Close connections."""
[docs] @timer
async def publish(self, events: list[Event], requestId: str) -> None:
"""
Publish events. Msg is published to channel 'luna-sender:{account_id}' in format:
:json:object:`msg_to_luna_sender`
Args:
events: events
requestId: request id
"""
if len(events) == 0:
return
preparedEvents = [event.asDict() for event in events]
meta = events[0].meta
msg = {
"handler_id": meta.handlerId,
"Luna-Request-Id": requestId,
"events": preparedEvents,
"event_create_time": meta.createEventTime,
"event_end_time": meta.endEventTime,
}
await self._publish(msg, meta.accountId)
if self.connectionsPool is None:
raise RuntimeError("Redis context was not initialized")
async def _publish(self, msg: dict, accountId: str):
"""
Publish msg with to redis
Args:
msg: msg
accountId: account id
"""
if self.connectionsPool is None:
raise RuntimeError("Redis context was not initialized")
try:
with await self.connectionsPool as redis:
await redis.publish(f"{self.channelName}:{accountId}", json.dumps(msg, ensure_ascii=False))
except Exception:
logger.exception("publish events to redis")
[docs] @timer
async def publishRawEvent(
self,
event: RawEvent,
accountId: str,
handlerId: str,
requestId: str,
) -> None:
"""
Publish events. Msg is published to channel 'luna-sender:{account_id}' in format:
:json:object:`msg_to_luna_sender`
Args:
event: user raw event
accountId: account id
handlerId: handler id
requestId: request id
"""
msg = {
"handler_id": handlerId,
"Luna-Request-Id": requestId,
"events": [event.asHandlerEventDict()],
"event_create_time": event.createTime,
"event_end_time": event.endTime,
}
await self._publish(msg, accountId)
[docs]class RedisContext(BaseRedisContext):
"""
Redis context
Attributes:
connectionsPool: An instance of aioredis.ConnectionsPool
channelName: A string, redis channel prefix
host: redis host
port: redis port
password: redis password
"""
def __init__(
self,
host="127.0.0.1",
port=6379,
password: Optional[bool] = None,
channelName: str = "luna-sender",
storageTime: Optional[str] = None,
) -> None:
"""Initialize redis context.
Args:
host: redis host
port: redis port
password: redis password
channelName: A string, redis channel prefix
storageTime: storage time ("UTC" or "LOCAL")
"""
self.host = host
self.port = port
self.password = password or None
super().__init__(channelName, storageTime=storageTime)
[docs] async def initConnections(self) -> None:
"""Init redis connections pool."""
self.logger.info("Init connections pool for redis")
self.connectionsPool = await aioredis.create_redis_pool(
(self.host, self.port), password=self.password, maxsize=100
)
[docs] async def close(self) -> None:
"""Close connections pool."""
self.logger.info("Close redis connections")
if self.connectionsPool is None:
return
self.connectionsPool.close()
await self.connectionsPool.wait_closed()
self.connectionsPool = None
[docs]class SentinelRedisContext(RedisContext):
"""Redis context for installations with sentinel.
Attributes:
sentinels: A List of tuples (host, port), addresses of sentinels
sentinel: An instance of Sentinel interface
masterName: A str, name of the redis master set
channelName: A string, redis channel prefix
connectionsPool: A aioredis.sentinel.ManagedPool, pool managed by sentinel
"""
def __init__(
self,
sentinels: List[Tuple[str, int]],
masterName: str,
redisPassword: str,
channelName: str = "luna-sender",
storageTime: Optional[str] = None,
) -> None:
"""Initialize context.
Args:
sentinels: A List of tuples (host, port), addresses of sentinels
masterName: A string, name for redis master set
redisPassword: redis password
channelName: A string, redis channel prefix
storageTime: storage time ("UTC" or "LOCAL")
"""
self.sentinels = sentinels
self.masterName = masterName
self.redisPassword = redisPassword
self.sentinel: Optional[aioredis.RedisSentinel] = None
super().__init__(channelName, storageTime=storageTime)
[docs] async def initConnections(self) -> None:
"""Init sentinel and managed pool.
Raises:
asyncio.TimeoutError
"""
self.logger.info("Init connections pool for redis sentinel")
self.sentinel = await aioredis.create_sentinel(self.sentinels, password=self.redisPassword or None)
with async_timeout(INIT_MASTER_POOL_TIMEOUT):
self.connectionsPool = self.sentinel.master_for(self.masterName)
[docs] async def close(self) -> None:
"""Close sentinel pool."""
self.logger.info("Close redis sentinel connections")
if self.sentinel is None:
return
self.sentinel.close()
await self.sentinel.wait_closed()
self.sentinel = None
self.connectionsPool = None