Source code for luna_api.app.handlers.tasks_create_task_handler
import ujson
from luna_auth.classes.enums import VisibilityArea
from vlutils.recursive_functions import goDeep
from app.handlers.base_handler import ProxyRequest, TasksServiceBaseHandler
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
[docs]
class CreateTaskHandler(TasksServiceBaseHandler):
    """
    Proxy create tasks handler. See `spec_tasks_processing`_.
        .. _spec_tasks_processing:
            _static/api.html#tag/tasks processing
    """
    allowedMethods = ("POST",)
[docs]
    def checkEstimatorTaskMatchingFilters(self):
        """Check matching filters account id for estimator task"""
        if self.request.lunaApiCredentials.visibilityArea == VisibilityArea.account:
            matchPolicies = goDeep(self.request.json, ["content", "handler", "policies", "match_policy"])
            if isinstance(matchPolicies, list):
                for matchPolicy in matchPolicies:
                    candidates = goDeep(matchPolicy, ["candidates"])
                    if not isinstance(candidates, dict):
                        continue
                    accountId = candidates.get("account_id")
                    if accountId is None:
                        candidates["account_id"] = self.request.lunaApiCredentials.accountId
                        continue
                    if accountId != self.request.lunaApiCredentials.accountId:
                        raise VLException(
                            Error.ForbiddenVisibilityAreaDenied.format(
                                "task content handler matching policy candidates"
                            ),
                            403,
                            False,
                        ) 
[docs]
    async def prepareRequestPost(self):
        """Prepare post request"""
        inputJson = self.request.json
        headers = self.prepareHeaders()
        headers["Luna-Account-Id"] = self.accountId
        simpleValidationSchema = {
            "type": "object",
            "properties": {"content": {"type": "object"}},
            "required": ["content"],
        }
        isGcTask = self.request.path.endswith("/gc")
        isLambdaTask = self.request.path.endswith("/lambda")
        if isGcTask:
            simpleValidationSchema["properties"]["content"]["properties"] = {
                "target": {"type": "string", "enum": ["events", "faces", "general_events"]}
            }
            simpleValidationSchema["properties"]["content"]["required"] = ["target"]
        elif isLambdaTask:
            simpleValidationSchema["properties"]["content"]["properties"] = {"lambda_id": {"type": "string"}}
            simpleValidationSchema["properties"]["content"]["required"] = ["lambda_id"]
        self.validateJson(inputJson, simpleValidationSchema, useJsonSchema=True)
        if not isLambdaTask:
            inputJson["content"].setdefault("filters", {})
            if self.request.lunaApiCredentials.visibilityArea == None:
                inputJson["content"]["filters"]["account_id"] = self.accountId
            else:
                isTaskWithMatching = any(
                    (self.request.path.endswith(taskType) for taskType in ("cross_match", "clustering"))
                )
                accountMissmatchException = VLException(
                    Error.ForbiddenVisibilityAreaDenied.format(f"task content filters"), 403, False
                )
                if not isTaskWithMatching or self.request.lunaApiCredentials.visibilityArea == VisibilityArea.account:
                    if (filterAccountId := inputJson["content"]["filters"].get("account_id")) is None:
                        inputJson["content"]["filters"]["account_id"] = self.accountId
                    elif filterAccountId != self.accountId:
                        raise accountMissmatchException
            if self.request.path.endswith("/estimator"):
                self.checkEstimatorTaskMatchingFilters()
        return ProxyRequest(ujson.dumps(inputJson), headers, self.prepareQuery())