Source code for luna_api.app.handlers.events_handler

""" Get events and get events stats handlers. """
from sanic.response import HTTPResponse

from app.handlers.base_handler import EventServiceBaseHandler
from app.handlers.schemas import schemas
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException


class EventHandler(EventServiceBaseHandler):
    """
    Handler for getting an event. See `spec_event`_.

    .. _spec_event:
        _static/api.html#tag/events

    Resource: "/{api_version}/events"

    NOTE(review): the resource path documented here is the same as the one documented for
    ``EventsHandler``; confirm whether this handler is actually routed to a single-event path.
    NOTE(review): "GET" and "HEAD" are allowed, but no ``get``/``head`` method is defined in this
    class as seen here - presumably they are provided elsewhere; confirm.
    """

    allowedMethods = ("GET", "HEAD")

    def checkTokenPermissions(self) -> None:
        """
        Check token permissions for the "event" object using the default check.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        permissionObject = "event"
        self.checkTokenPermissionsDefault(objectName=permissionObject)
class EventsHandler(EventServiceBaseHandler):
    """
    Handler for getting events. See `spec_events`_.

    .. _spec_events:
        _static/api.html#tag/events

    Resource: "/{api_version}/events"
    """

    allowedMethods = ("GET",)

    def checkTokenPermissions(self) -> None:
        """
        Check token permissions for the "event" object using the default check.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.
        """
        permissionObject = "event"
        self.checkTokenPermissionsDefault(objectName=permissionObject)

    async def get(self) -> HTTPResponse:
        """
        Forward the incoming GET request to luna-events. See `spec_get_events`_.

        .. _spec_get_events:
            _static/api.html#operation/getEvents

        Returns:
            response from luna-events service
        """
        serviceResponse = await self.makeRequestToService()
        return serviceResponse
class EventsStatsHandler(EventServiceBaseHandler):
    """
    Handler for getting events statistics.

    Resource: "/{api_version}/events/stats"
    """

    allowedMethods = ("POST",)

    def checkTokenPermissions(self) -> None:
        """
        Require the "view" permission on events for POST requests.

        Description see :func:`~BaseRequestHandler.checkTokenPermissions`.

        Raises:
            VLException(Error.ForbiddenByToken, 403): the credentials carry permissions, the request
                method is POST and "view" is not among the event permissions
        """
        permissions = self.request.credentials.permissions
        if permissions is None:
            # credentials without a permission set are not restricted here
            return
        if self.request.method != "POST":
            return

        # statistics are requested with POST, yet the permission checked is "view"
        action = "view"
        if action not in permissions.event:
            raise VLException(Error.ForbiddenByToken.format(action, "event"), 403, False)

    async def post(self) -> HTTPResponse:
        """
        Request events statistics from luna-events with POST method. See `spec_event_stats`_.

        .. _spec_event_stats:
            _static/api.html#operation/getEventStats

        The request body is validated against ``schemas.EVENT_STATISTICS_SCHEMA``. When
        ``self.accountId`` is truthy, an equality filter on ``account_id`` is appended to the
        "filters" list of the body (the list is created if the key is absent) before the request
        is sent.

        Returns:
            response with status code 200 and the json received from luna-events service
        """
        self.checkTokenPermissions()

        statsQuery = self.request.json
        self.validateJson(statsQuery, schemas.EVENT_STATISTICS_SCHEMA, useJsonSchema=False)

        if self.accountId:
            ownerFilter = {"column": "account_id", "operator": "eq", "value": self.accountId}
            # the request body is extended in place with the account filter
            statsQuery.setdefault("filters", []).append(ownerFilter)

        statsReply = await self.luna3Client.lunaEvents.getEventsStats(query=statsQuery, raiseError=True)
        return self.success(200, outputJson=statsReply.json)