Source code for luna_api.app.handlers.tasks_create_task_handler
import ujson
from vlutils.recursive_functions import goDeep
from app.handlers.base_handler import ProxyRequest, TasksServiceBaseHandler
from classes.enums import VisibilityArea
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
class CreateTaskHandler(TasksServiceBaseHandler):
    """
    Proxy create tasks handler. See `spec_tasks_processing`_.

    The handler validates the incoming task description, binds it to the account of the caller and
    wraps it into a proxy request.

    .. _spec_tasks_processing:
        _static/api.html#tag/tasks processing
    """

    allowedMethods = ("POST",)

    async def prepareRequestPost(self):
        """
        Prepare the proxied "create task" POST request.

        The request body must be a json object with an object `content`; `content.filters`, when
        present, must be an object as well. For paths ending with "/gc" the `content.target` field is
        required and must be "events". For paths ending with "/estimator" and credentials whose
        visibility area is `VisibilityArea.account`, matching policy candidates are bound to the
        account from the credentials. Finally `content.filters.account_id` is always overwritten with
        the handler account id.

        Returns:
            ProxyRequest built from the serialized (amended) input json, the prepared headers with
            the "Luna-Account-Id" header set and the prepared query
        Raises:
            VLException(Error.ForbiddenVisibilityAreaDenied, 403): if matching policy candidates of
                an estimator task reference another account
        """
        inputJson = self.request.json
        headers = self.prepareHeaders()
        headers["Luna-Account-Id"] = self.accountId

        # "filters" has to be an object when supplied: the account filter is written into it below and
        # any other json type would fail there with a TypeError instead of a validation error
        contentSchema = {"type": "object", "properties": {"filters": {"type": "object"}}}
        if self.request.path.endswith("/gc"):
            contentSchema["properties"]["target"] = {"type": "string", "enum": ["events"]}
            contentSchema["required"] = ["target"]
        validationSchema = {
            "type": "object",
            "properties": {"content": contentSchema},
            "required": ["content"],
        }
        self.validateJson(inputJson, validationSchema, useJsonSchema=True)

        # credentials are read only for estimator tasks, same as the short-circuit condition did before
        if self.request.path.endswith("/estimator"):
            credentials = self.request.credentials
            if credentials.visibilityArea == VisibilityArea.account:
                self._bindCandidatesToAccount(inputJson, credentials.accountId)

        # every created task is filtered by the handler account, whatever the client sent
        inputJson["content"].setdefault("filters", {})["account_id"] = self.accountId
        return ProxyRequest(ujson.dumps(inputJson), headers, self.prepareQuery())

    @staticmethod
    def _bindCandidatesToAccount(inputJson, accountId):
        """
        Bind candidates of every matching policy of an estimator task to the given account.

        Works in place on `inputJson` (the very object that is serialized afterwards), so the added
        account ids are guaranteed to reach the proxied body.

        Args:
            inputJson: validated request json
            accountId: account id taken from the request credentials
        Raises:
            VLException(Error.ForbiddenVisibilityAreaDenied, 403): if candidates already contain
                an `account_id` which differs from `accountId`
        """
        matchPolicies = goDeep(inputJson, ["content", "handler", "policies", "match_policy"])
        if not isinstance(matchPolicies, list):
            return
        for matchPolicy in matchPolicies:
            candidates = goDeep(matchPolicy, ["candidates"])
            if not isinstance(candidates, dict):
                continue
            candidatesAccountId = candidates.get("account_id")
            if candidatesAccountId is None:
                # missing (or null) account id: fill in the account of the caller
                candidates["account_id"] = accountId
            elif candidatesAccountId != accountId:
                raise VLException(
                    Error.ForbiddenVisibilityAreaDenied.format(
                        "task content handler matching policy candidates"
                    ),
                    403,
                    False,
                )