Source code for luna_api.app.handlers.events_handler
""" Get events and get events stats handlers. """
from sanic.response import HTTPResponse
from app.handlers.base_handler import EventServiceBaseHandler
from app.handlers.schemas import schemas
[docs]class EventHandler(EventServiceBaseHandler):
"""
Get event. See `spec_event`_.
.. _spec_event:
_static/api.html#tag/events
Resource: "/{api_version}/events"
"""
allowedMethods = ("GET", "HEAD")
[docs]class EventsHandler(EventServiceBaseHandler):
"""
Get events. See `spec_events`_.
.. _spec_events:
_static/api.html#tag/events
Resource: "/{api_version}/events"
"""
allowedMethods = ("GET",)
[docs] async def get(self) -> HTTPResponse:
"""
Proxy request with GET method to luna-events. See `spec_get_events`_.
.. _spec_get_events:
_static/api.html#operation/getEvents
Returns:
response from luna-events service
"""
return await self.makeRequestToService()
[docs]class EventsStatsHandler(EventServiceBaseHandler):
"""
Get events stats.
Resource: "/{api_version}/events/stats"
"""
allowedMethods = ("POST",)
[docs] async def post(self) -> HTTPResponse:
"""
Send request with POST method to luna-events. See `spec_event_stats`_.
.. _spec_event_stats:
_static/api.html#operation/getEventStats
Returns:
response from luna-events service
"""
query = self.request.json
self.validateJson(query, schemas.EVENT_STATISTICS_SCHEMA, useJsonSchema=False)
if self.accountId:
accountIdFilter = {
"column": "account_id",
"operator": "eq",
"value": self.accountId,
}
query.setdefault("filters", [])
query["filters"].append(accountIdFilter)
eventStats = await self.luna3Client.lunaEvents.getEventsStats(query=query, raiseError=True)
return self.success(200, outputJson=eventStats.json)