Source code for luna_api.app.handlers.tasks_create_task_handler

import ujson

from app.handlers.base_handler import ProxyRequest, TasksServiceBaseHandler


[docs]class CreateTaskHandler(TasksServiceBaseHandler): """ Proxy create tasks handler. See `spec_tasks_processing`_. .. _spec_tasks_processing: _static/api.html#tag/tasks processing """ allowedMethods = ("POST",)
[docs] async def prepareRequestPost(self): inputJson = self.request.json headers = self.prepareHeaders() simpleValidationSchema = { "type": "object", "properties": {"content": {"type": "object"}}, "required": ["content"], } isGcTask = self.request.path.endswith("/gc") if isGcTask: simpleValidationSchema["properties"]["content"]["properties"] = { "target": {"type": "string", "enum": ["events"]} } simpleValidationSchema["properties"]["content"]["required"] = ["target"] self.validateJson(inputJson, simpleValidationSchema, useJsonSchema=True) inputJson["account_id"] = self.accountId inputJson["content"].setdefault("filters", {})["account_id"] = self.accountId return ProxyRequest(ujson.dumps(inputJson), headers, self.prepareQuery())