Source code for luna_api.app.handlers.events_handler

""" Get events and get events stats handlers. """

from sanic.response import HTTPResponse

from app.handlers.base_handler import EventServiceBaseHandler
from app.handlers.schemas import schemas


class EventHandler(EventServiceBaseHandler):
    """
    Handler for a single event.

    See `spec_event`_.

    .. _spec_event:
        _static/api.html#tag/events

    Resource: "/{api_version}/events"
    """

    # NOTE(review): the resource path above is the same one documented for the
    # events collection handler, and no GET/HEAD methods are defined in this
    # class body -- presumably they come from the base handler; confirm.
    allowedMethods = ("GET", "HEAD")
class EventsHandler(EventServiceBaseHandler):
    """
    Handler for the events collection.

    See `spec_events`_.

    .. _spec_events:
        _static/api.html#tag/events

    Resource: "/{api_version}/events"
    """

    allowedMethods = ("GET",)

    async def get(self) -> HTTPResponse:
        """
        Forward the incoming GET request to the luna-events service.

        See `spec_get_events`_.

        .. _spec_get_events:
            _static/api.html#operation/getEvents

        Returns:
            response from luna-events service
        """
        # The proxying itself is delegated to the base handler helper.
        proxiedResponse = await self.makeRequestToService()
        return proxiedResponse
class EventsStatsHandler(EventServiceBaseHandler):
    """
    Handler for events statistics.

    Resource: "/{api_version}/events/stats"
    """

    allowedMethods = ("POST",)

    async def post(self) -> HTTPResponse:
        """
        Request events statistics from luna-events with a POST request.

        The JSON body is validated against ``schemas.EVENT_STATISTICS_SCHEMA``.
        When ``self.accountId`` is set, an ``account_id`` equality filter is
        appended to the body's ``filters`` list before the request is sent.

        See `spec_event_stats`_.

        .. _spec_event_stats:
            _static/api.html#operation/getEventStats

        Returns:
            response with status 200 and the JSON returned by luna-events
        """
        payload = self.request.json
        self.validateJson(payload, schemas.EVENT_STATISTICS_SCHEMA, useJsonSchema=False)

        if self.accountId:
            # Append the account filter, creating the "filters" list when the
            # client did not send one.
            payload.setdefault("filters", []).append(
                {"column": "account_id", "operator": "eq", "value": self.accountId}
            )

        statsReply = await self.luna3Client.lunaEvents.getEventsStats(query=payload, raiseError=True)
        return self.success(200, outputJson=statsReply.json)