Source code for luna_api.app.handlers.tasks_create_task_handler
import ujson
from vlutils.recursive_functions import goDeep
from app.handlers.base_handler import ProxyRequest, TasksServiceBaseHandler
from classes.enums import VisibilityArea
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
[docs]
class CreateTaskHandler(TasksServiceBaseHandler):
    """
    Proxy create tasks handler. See `spec_tasks_processing`_.

    The handler checks the request body, forces/verifies the account id inside the task content
    and returns a :class:`ProxyRequest` built from the (possibly amended) body.

    .. _spec_tasks_processing:
        _static/api.html#tag/tasks processing
    """

    # only POST is listed as an allowed method for this handler
    allowedMethods = ("POST",)

    def checkEstimatorTaskMatchingFilters(self):
        """
        Check matching filters account id for estimator task.

        Does nothing unless the credentials' visibility area is ``VisibilityArea.account``.
        Otherwise every entry of ``content.handler.policies.match_policy`` whose ``candidates``
        value is a dict is inspected:

        * a missing (``None``) ``account_id`` is set, in place, to the credentials' account id;
        * an ``account_id`` different from the credentials' account id is rejected.

        Raises:
            VLException: ``Error.ForbiddenVisibilityAreaDenied`` (raised with 403) when candidates
                specify an account id other than the credentials' one.
        """
        credentials = self.request.lunaApiCredentials
        if credentials.visibilityArea != VisibilityArea.account:
            return

        matchPolicies = goDeep(self.request.json, ["content", "handler", "policies", "match_policy"])
        if not isinstance(matchPolicies, list):
            return

        for matchPolicy in matchPolicies:
            candidates = goDeep(matchPolicy, ["candidates"])
            if not isinstance(candidates, dict):
                # nothing to restrict in this policy
                continue
            accountId = candidates.get("account_id")
            if accountId is None:
                # no explicit account: restrict candidates to the caller's account
                candidates["account_id"] = credentials.accountId
            elif accountId != credentials.accountId:
                raise VLException(
                    Error.ForbiddenVisibilityAreaDenied.format("task content handler matching policy candidates"),
                    403,
                    False,
                )

    async def prepareRequestPost(self):
        """
        Prepare post request.

        Steps:

        1. Set the ``Luna-Account-Id`` header to the handler's account id.
        2. Validate that the body is an object with an object ``content``; paths ending with
           ``/gc`` additionally require ``content.target`` (``"events"`` or ``"faces"``), paths
           ending with ``/lambda`` require a string ``content.lambda_id``.
        3. For every non-lambda task make sure ``content.filters`` exists and carries the proper
           ``account_id`` (see inline comments), then run the estimator matching check for paths
           ending with ``/estimator``.

        Returns:
            ProxyRequest: built from the serialized body, the prepared headers and the prepared query.

        Raises:
            VLException: ``Error.ForbiddenVisibilityAreaDenied`` (raised with 403) when the filters
                (or estimator matching candidates) specify a foreign account id.
        """
        inputJson = self.request.json
        headers = self.prepareHeaders()
        headers["Luna-Account-Id"] = self.accountId

        path = self.request.path
        isLambdaTask = path.endswith("/lambda")

        # build the task-specific part of the schema first, then embed it into the body schema
        contentSchema = {"type": "object"}
        if path.endswith("/gc"):
            contentSchema["properties"] = {"target": {"type": "string", "enum": ["events", "faces"]}}
            contentSchema["required"] = ["target"]
        elif isLambdaTask:
            contentSchema["properties"] = {"lambda_id": {"type": "string"}}
            contentSchema["required"] = ["lambda_id"]
        simpleValidationSchema = {
            "type": "object",
            "properties": {"content": contentSchema},
            "required": ["content"],
        }
        self.validateJson(inputJson, simpleValidationSchema, useJsonSchema=True)

        if not isLambdaTask:
            # NOTE(review): the schema above does not constrain ``filters``; a non-object value
            # fails on the accesses below - confirm whether it should be validated here.
            filters = inputJson["content"].setdefault("filters", {})
            visibilityArea = self.request.lunaApiCredentials.visibilityArea
            if visibilityArea is None:
                # no visibility area in credentials: the account id is always forced
                filters["account_id"] = self.accountId
            else:
                isTaskWithMatching = path.endswith(("cross_match", "clustering"))
                # matching tasks keep their filters untouched unless visibility is limited to the account
                if not isTaskWithMatching or visibilityArea == VisibilityArea.account:
                    filterAccountId = filters.get("account_id")
                    if filterAccountId is None:
                        filters["account_id"] = self.accountId
                    elif filterAccountId != self.accountId:
                        raise VLException(
                            Error.ForbiddenVisibilityAreaDenied.format("task content filters"), 403, False
                        )
                if path.endswith("/estimator"):
                    self.checkEstimatorTaskMatchingFilters()

        return ProxyRequest(ujson.dumps(inputJson), headers, self.prepareQuery())