Source code for luna_api.app.handlers.tasks_create_task_handler
import ujson
from vlutils.recursive_functions import goDeep
from app.handlers.base_handler import ProxyRequest, TasksServiceBaseHandler
from classes.enums import VisibilityArea
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
[docs]class CreateTaskHandler(TasksServiceBaseHandler):
    """
    Proxy create tasks handler. See `spec_tasks_processing`_.
        .. _spec_tasks_processing:
            _static/api.html#tag/tasks processing
    """
    allowedMethods = ("POST",)
[docs]    def checkEstimatorTaskMatchingFilters(self):
        """Check matching filters account id for estimator task"""
        if self.request.credentials.visibilityArea == VisibilityArea.account:
            matchPolicies = goDeep(self.request.json, ["content", "handler", "policies", "match_policy"])
            if isinstance(matchPolicies, list):
                for matchPolicy in matchPolicies:
                    candidates = goDeep(matchPolicy, ["candidates"])
                    if not isinstance(candidates, dict):
                        continue
                    accountId = candidates.get("account_id")
                    if accountId is None:
                        candidates["account_id"] = self.request.credentials.accountId
                        continue
                    if accountId != self.request.credentials.accountId:
                        raise VLException(
                            Error.ForbiddenVisibilityAreaDenied.format(
                                "task content handler matching policy candidates"
                            ),
                            403,
                            False,
                        ) 
[docs]    async def prepareRequestPost(self):
        """Prepare post request"""
        inputJson = self.request.json
        headers = self.prepareHeaders()
        headers["Luna-Account-Id"] = self.accountId
        simpleValidationSchema = {
            "type": "object",
            "properties": {"content": {"type": "object"}},
            "required": ["content"],
        }
        isGcTask = self.request.path.endswith("/gc")
        if isGcTask:
            simpleValidationSchema["properties"]["content"]["properties"] = {
                "target": {"type": "string", "enum": ["events", "faces"]}
            }
            simpleValidationSchema["properties"]["content"]["required"] = ["target"]
        self.validateJson(inputJson, simpleValidationSchema, useJsonSchema=True)
        inputJson["content"].setdefault("filters", {})
        if self.request.credentials.visibilityArea == None:
            inputJson["content"]["filters"]["account_id"] = self.accountId
        else:
            isTaskWithMatching = any(
                (self.request.path.endswith(taskType) for taskType in ("cross_match", "clustering"))
            )
            accountMissmatchException = VLException(
                Error.ForbiddenVisibilityAreaDenied.format(f"task content filters"), 403, False
            )
            if not isTaskWithMatching or self.request.credentials.visibilityArea == VisibilityArea.account:
                if (filterAccountId := inputJson["content"]["filters"].get("account_id")) is None:
                    inputJson["content"]["filters"]["account_id"] = self.accountId
                elif filterAccountId != self.accountId:
                    raise accountMissmatchException
        if self.request.path.endswith("/estimator"):
            self.checkEstimatorTaskMatchingFilters()
        return ProxyRequest(ujson.dumps(inputJson), headers, self.prepareQuery())