Source code for luna_api.app.handlers.events_handler

""" Get events and get events stats handlers. """

from sanic.response import HTTPResponse

from app.handlers.base_handler import EventServiceBaseHandler
from app.handlers.schemas import schemas


class EventHandler(EventServiceBaseHandler):
    """
    Handler for the event resource.

    See `spec_event`_.

    .. _spec_event:
        _static/api.html#tag/events

    Resource: "/{api_version}/events"
    """

    # NOTE(review): the resource path documented above is the same one documented
    # for the events collection handler; presumably a single-event route was
    # intended -- confirm against the routing table.
    # NOTE(review): no GET/HEAD methods are defined on this class itself;
    # presumably they are provided by EventServiceBaseHandler -- confirm.
    allowedMethods = ("GET", "HEAD")
class EventsHandler(EventServiceBaseHandler):
    """
    Handler for the events collection.

    See `spec_events`_.

    .. _spec_events:
        _static/api.html#tag/events

    Resource: "/{api_version}/events"
    """

    allowedMethods = ("GET",)

    async def get(self) -> HTTPResponse:
        """
        Forward the incoming GET request to luna-events. See `spec_get_events`_.

        .. _spec_get_events:
            _static/api.html#operation/getEvents

        Returns:
            response from luna-events service
        """
        # The request is relayed via the inherited helper; nothing is altered here.
        serviceResponse = await self.makeRequestToService()
        return serviceResponse
class EventsStatsHandler(EventServiceBaseHandler):
    """
    Handler for events statistics.

    Resource: "/{api_version}/events/stats"
    """

    allowedMethods = ("POST",)

    async def post(self) -> HTTPResponse:
        """
        Request events statistics from luna-events with POST. See `spec_event_stats`_.

        The request body is validated against ``schemas.EVENT_STATISTICS_SCHEMA``.
        When the handler has a truthy ``accountId``, an equality filter on
        ``account_id`` is appended to the query's ``filters`` list (created if
        absent) before the query is sent.

        .. _spec_event_stats:
            _static/api.html#operation/getEventStats

        Returns:
            response from luna-events service
        """
        statsQuery = self.request.json
        # NOTE(review): presumably validateJson aborts the request on an invalid
        # body -- confirm in the base handler.
        self.validateJson(statsQuery, schemas.EVENT_STATISTICS_SCHEMA, useJsonSchema=False)

        if self.accountId:
            # Restrict the statistics to the current account.
            ownerFilter = dict(column="account_id", operator="eq", value=self.accountId)
            statsQuery.setdefault("filters", []).append(ownerFilter)

        statsReply = await self.luna3Client.lunaEvents.getEventsStats(query=statsQuery, raiseError=True)
        return self.success(200, outputJson=statsReply.json)