Source code for luna_handlers.redis_db.redis_context
import ujson as json
from classes.event import Event
from classes.schemas.event_raw import RawEvent
from crutches_on_wheels.cow.redis_db.redis_sender_context import (
RedisContext as _SenderRedisContext,
SentinelRedisContext as _SentinelSenderRedisContext,
)
async def _performPublish(ctx, msg: dict, accountId: str):
    """
    Serialize a message to json and publish it to the redis channel of an account.

    The target channel is ``{ctx.channelName}:{accountId}``. Any error raised while acquiring a
    connection, serializing the message or publishing it is logged through ``ctx.logger`` and is
    not propagated to the caller.

    Args:
        ctx: redis context (supplies ``connectionsPool``, ``channelName`` and ``logger``)
        msg: message to publish
        accountId: account id, used as the channel suffix
    Raises:
        RuntimeError: if the context has no connections pool
    """
    if ctx.connectionsPool is None:
        raise RuntimeError("Redis context was not initialized")
    try:
        with await ctx.connectionsPool as connection:
            # built inside the try block on purpose: a serialization failure is logged, not raised
            channel = f"{ctx.channelName}:{accountId}"
            payload = json.dumps(msg, ensure_ascii=False)
            await connection.publish(channel, payload)
    except Exception:
        ctx.logger.exception("publish events to redis")
async def _publish(ctx, events: list[Event], requestId: str) -> None:
    """
    Publish events. Msg is published to channel 'luna-sender:{account_id}' in format:
    :json:object:`msg_to_luna_sender`

    The handler id, event create/end times and the account id of the message are taken from the
    meta of the first event. An empty list of events is a no-op.

    Args:
        ctx: redis context
        events: events
        requestId: request id
    Raises:
        RuntimeError: if there are events to publish but the context has no connections pool
    """
    if not events:
        return
    # Fail fast, before serializing any event. This guard used to sit after the publish call,
    # where it could never fire usefully: `_performPublish` performs the same check first.
    if ctx.connectionsPool is None:
        raise RuntimeError("Redis context was not initialized")
    preparedEvents = [event.asDict() for event in events]
    meta = events[0].meta
    msg = {
        "handler_id": meta.handlerId,
        "Luna-Request-Id": requestId,
        "events": preparedEvents,
        "event_create_time": meta.createEventTime,
        "event_end_time": meta.endEventTime,
    }
    await _performPublish(ctx, msg, meta.accountId)
class RedisContext(_SenderRedisContext):
    """Redis context"""

    async def publish(self, events: list[Event], requestId: str) -> None:
        """
        Publish events. Msg is published to channel 'luna-sender:{account_id}' in format:
        :json:object:`msg_to_luna_sender`

        Delegates to the module-level `_publish` helper with this context.

        Args:
            events: events
            requestId: request id
        """
        return await _publish(self, events, requestId)

    async def publishRawEvent(
        self,
        event: RawEvent,
        accountId: str,
        handlerId: str,
        requestId: str,
    ) -> None:
        """
        Publish a single raw event. Msg is published to channel 'luna-sender:{account_id}' in format:
        :json:object:`msg_to_luna_sender`

        The message carries exactly one event; its create/end times are read from the event itself.

        Args:
            event: user raw event
            accountId: account id
            handlerId: handler id
            requestId: request id
        """
        payload = {
            "handler_id": handlerId,
            "Luna-Request-Id": requestId,
            "events": [event.asDict()],
            "event_create_time": event.createTime,
            "event_end_time": event.endTime,
        }
        return await _performPublish(self, payload, accountId)
class SentinelRedisContext(_SentinelSenderRedisContext):
    """Redis context for installations with sentinel."""

    # NOTE(review): unlike `RedisContext`, no `publishRawEvent` is visible on this class in the
    # reviewed section - confirm whether sentinel installations need to publish raw events.

    async def publish(self, events: list[Event], requestId: str) -> None:
        """
        Publish events. Msg is published to channel 'luna-sender:{account_id}' in format:
        :json:object:`msg_to_luna_sender`

        Delegates to the module-level `_publish` helper with this context.

        Args:
            events: events
            requestId: request id
        """
        return await _publish(self, events, requestId)