Source code for luna_api.app.handlers.tasks_create_task_handler

import ujson
from vlutils.recursive_functions import goDeep

from app.handlers.base_handler import ProxyRequest, TasksServiceBaseHandler
from classes.enums import VisibilityArea
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException


class CreateTaskHandler(TasksServiceBaseHandler):
    """
    Proxy create tasks handler. See `spec_tasks_processing`_.

    .. _spec_tasks_processing:
        _static/api.html#tag/tasks processing
    """

    allowedMethods = ("POST",)

    def checkEstimatorTaskMatchingFilters(self) -> None:
        """
        Check matching filters account id for estimator task.

        Does nothing unless the request credentials have the ``VisibilityArea.account`` visibility area.
        Otherwise walks ``content.handler.policies.match_policy`` of the request json and, for every
        policy whose ``candidates`` is a dict:

        * sets ``candidates["account_id"]`` to the credentials account id when it is missing or ``None``
          (the request json is modified in place);
        * rejects the request when it is set to a different account id.

        Raises:
            VLException: ``Error.ForbiddenVisibilityAreaDenied`` (raised with 403 as the second argument,
                presumably the HTTP status code) if a candidates filter names another account.
        """
        credentials = self.request.credentials
        if credentials.visibilityArea != VisibilityArea.account:
            return

        matchPolicies = goDeep(self.request.json, ["content", "handler", "policies", "match_policy"])
        if not isinstance(matchPolicies, list):
            # nothing to check: the match policies are absent or have an unexpected type
            return

        for matchPolicy in matchPolicies:
            candidates = goDeep(matchPolicy, ["candidates"])
            if not isinstance(candidates, dict):
                continue

            accountId = candidates.get("account_id")
            if accountId is None:
                candidates["account_id"] = credentials.accountId
            elif accountId != credentials.accountId:
                raise VLException(
                    Error.ForbiddenVisibilityAreaDenied.format("task content handler matching policy candidates"),
                    403,
                    False,
                )

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Prepare post request.

        Validates that the request json is an object with an object ``content`` (plus ``content.target``
        for ``/gc`` tasks and ``content.lambda_id`` for ``/lambda`` tasks), then, for every task except
        ``/lambda``, forces or verifies ``content.filters.account_id`` according to the credentials
        visibility area. The request json is modified in place.

        Returns:
            proxy request with the serialized json, the prepared headers (including ``Luna-Account-Id``)
            and the prepared query

        Raises:
            VLException: ``Error.ForbiddenVisibilityAreaDenied`` (raised with 403 as the second argument,
                presumably the HTTP status code) if the filters name another account.
        """
        inputJson = self.request.json
        headers = self.prepareHeaders()
        headers["Luna-Account-Id"] = self.accountId

        path = self.request.path
        isGcTask = path.endswith("/gc")
        isLambdaTask = path.endswith("/lambda")

        # only a shallow check is done here; NOTE(review): full validation is presumably left to the
        # proxied service - confirm
        contentSchema = {"type": "object"}
        if isGcTask:
            contentSchema["properties"] = {"target": {"type": "string", "enum": ["events", "faces"]}}
            contentSchema["required"] = ["target"]
        elif isLambdaTask:
            contentSchema["properties"] = {"lambda_id": {"type": "string"}}
            contentSchema["required"] = ["lambda_id"]
        simpleValidationSchema = {
            "type": "object",
            "properties": {"content": contentSchema},
            "required": ["content"],
        }
        self.validateJson(inputJson, simpleValidationSchema, useJsonSchema=True)

        if not isLambdaTask:
            filters = inputJson["content"].setdefault("filters", {})
            credentials = self.request.credentials

            if credentials.visibilityArea is None:
                # no visibility area: the filter account id is always overwritten
                filters["account_id"] = self.accountId
            else:
                isTaskWithMatching = path.endswith(("cross_match", "clustering"))
                if not isTaskWithMatching or credentials.visibilityArea == VisibilityArea.account:
                    filterAccountId = filters.get("account_id")
                    if filterAccountId is None:
                        filters["account_id"] = self.accountId
                    elif filterAccountId != self.accountId:
                        raise VLException(
                            Error.ForbiddenVisibilityAreaDenied.format("task content filters"), 403, False
                        )

                # the check itself is a no-op unless the visibility area is `account`
                if path.endswith("/estimator"):
                    self.checkEstimatorTaskMatchingFilters()

        return ProxyRequest(ujson.dumps(inputJson), headers, self.prepareQuery())