Source code for luna_api.app.handlers.events_handler
""" Get events and get events stats handlers. """
from sanic.response import HTTPResponse
from app.handlers.base_handler import EventServiceBaseHandler
from app.handlers.schemas import schemas
[docs]class EventHandler(EventServiceBaseHandler):
    """
    Get event. See `spec_event`_.
        .. _spec_event:
            _static/api.html#tag/events
    Resource: "/{api_version}/events"
    """
    allowedMethods = ("GET", "HEAD") 
[docs]class EventsHandler(EventServiceBaseHandler):
    """
    Get events. See `spec_events`_.
        .. _spec_events:
            _static/api.html#tag/events
    Resource: "/{api_version}/events"
    """
    allowedMethods = ("GET",)
[docs]    async def get(self) -> HTTPResponse:
        """
        Proxy request with GET method to luna-events. See `spec_get_events`_.
        .. _spec_get_events:
            _static/api.html#operation/getEvents
        Returns:
            response from luna-events service
        """
        return await self.makeRequestToService()  
[docs]class EventsStatsHandler(EventServiceBaseHandler):
    """
    Get events stats.
    Resource: "/{api_version}/events/stats"
    """
    allowedMethods = ("POST",)
[docs]    async def post(self) -> HTTPResponse:
        """
        Send request with POST method to luna-events. See `spec_event_stats`_.
        .. _spec_event_stats:
            _static/api.html#operation/getEventStats
        Returns:
            response from luna-events service
        """
        query = self.request.json
        self.validateJson(query, schemas.EVENT_STATISTICS_SCHEMA, useJsonSchema=False)
        if self.accountId:
            accountIdFilter = {
                "column": "account_id",
                "operator": "eq",
                "value": self.accountId,
            }
            query.setdefault("filters", [])
            query["filters"].append(accountIdFilter)
        eventStats = await self.luna3Client.lunaEvents.getEventsStats(query=query, raiseError=True)
        return self.success(200, outputJson=eventStats.json)