# Source code for luna_handlers.redis_db.redis_context

import ujson as json

from classes.event import Event
from classes.schemas.event_raw import RawEvent
from crutches_on_wheels.cow.redis_db.redis_sender_context import (
    RedisContext as _SenderRedisContext,
    SentinelRedisContext as _SentinelSenderRedisContext,
)


async def _performPublish(ctx, msg: dict, accountId: str):
    """
    Serialize a message to json and publish it to the redis channel of an account.

    The channel name is `{ctx.channelName}:{accountId}`. Errors raised while acquiring the
    connection, serializing the message or publishing it are logged and not re-raised.

    Args:
        ctx: redis context (provides `connectionsPool`, `channelName` and `logger`)
        msg: message to publish
        accountId: account id, used as the channel suffix
    Raises:
        RuntimeError: if the connections pool of the context is not set
    """
    pool = ctx.connectionsPool
    if pool is None:
        raise RuntimeError("Redis context was not initialized")

    try:
        with await pool as connection:
            # Built inside the `try` so that formatting/serialization failures are
            # logged the same way as publish failures.
            channel = f"{ctx.channelName}:{accountId}"
            payload = json.dumps(msg, ensure_ascii=False)
            await connection.publish(channel, payload)
    except Exception:
        ctx.logger.exception("publish events to redis")


async def _publish(ctx, events: list[Event], requestId: str) -> None:
    """
    Publish events. Msg is published to channel 'luna-sender:{account_id}' in format:
    :json:object:`msg_to_luna_sender`

    Handler id, account id and event timings are taken from the meta of the first event.
    Nothing is published when there are no events.

    Args:
        ctx: redis context
        events: events
        requestId: request id
    Raises:
        RuntimeError: if there are events to publish but the redis context has no connections pool
    """
    if not events:
        return
    # Fail fast, before serializing the events: checking the pool only after the
    # publish call could never detect anything the publish helper had not already reported.
    if ctx.connectionsPool is None:
        raise RuntimeError("Redis context was not initialized")

    preparedEvents = [event.asDict() for event in events]
    meta = events[0].meta
    msg = {
        "handler_id": meta.handlerId,
        "Luna-Request-Id": requestId,
        "events": preparedEvents,
        "event_create_time": meta.createEventTime,
        "event_end_time": meta.endEventTime,
    }

    await _performPublish(ctx, msg, meta.accountId)


class RedisContext(_SenderRedisContext):
    """Redis context"""

    async def publish(self, events: list[Event], requestId: str) -> None:
        """
        Publish events. Msg is published to channel 'luna-sender:{account_id}' in format:
        :json:object:`msg_to_luna_sender`

        Args:
            events: events
            requestId: request id
        """
        await _publish(self, events, requestId)

    async def publishRawEvent(
        self,
        event: RawEvent,
        accountId: str,
        handlerId: str,
        requestId: str,
    ) -> None:
        """
        Publish a single user raw event. Msg is published to channel
        'luna-sender:{account_id}' in format:
        :json:object:`msg_to_luna_sender`

        Args:
            event: user raw event
            accountId: account id
            handlerId: handler id
            requestId: request id
        """
        # Event timings come from the raw event itself; handler and account ids
        # are supplied by the caller.
        msg = {
            "handler_id": handlerId,
            "Luna-Request-Id": requestId,
            "events": [event.asDict()],
            "event_create_time": event.createTime,
            "event_end_time": event.endTime,
        }
        await _performPublish(self, msg, accountId)
class SentinelRedisContext(_SentinelSenderRedisContext):
    """Redis context for installations with sentinel."""

    async def publish(self, events: list[Event], requestId: str) -> None:
        """
        Publish events. Msg is published to channel 'luna-sender:{account_id}' in format:
        :json:object:`msg_to_luna_sender`

        Args:
            events: events
            requestId: request id
        """
        await _publish(self, events, requestId)