Source code for luna_api.app.handlers.tasks_create_task_handler
import ujson
from app.handlers.base_handler import ProxyRequest, TasksServiceBaseHandler
[docs]class CreateTaskHandler(TasksServiceBaseHandler):
"""
Proxy create tasks handler. See `spec_tasks_processing`_.
.. _spec_tasks_processing:
_static/api.html#tag/tasks processing
"""
allowedMethods = ("POST",)
[docs] async def prepareRequestPost(self):
inputJson = self.request.json
headers = self.prepareHeaders()
simpleValidationSchema = {
"type": "object",
"properties": {"content": {"type": "object"}},
"required": ["content"],
}
isGcTask = self.request.path.endswith("/gc")
if isGcTask:
simpleValidationSchema["properties"]["content"]["properties"] = {
"target": {"type": "string", "enum": ["events"]}
}
simpleValidationSchema["properties"]["content"]["required"] = ["target"]
self.validateJson(inputJson, simpleValidationSchema, useJsonSchema=True)
inputJson["account_id"] = self.accountId
inputJson["content"].setdefault("filters", {})["account_id"] = self.accountId
return ProxyRequest(ujson.dumps(inputJson), headers, self.prepareQuery())