Source code for luna_backport4.app.handlers.tasks_create_task_handler
from typing import List
import ujson
from app.handlers.base_handler import APIProxyBaseHandler
from app.handlers.schemas import (
CLUSTERING_TASK_SCHEMA,
CROSS_MATCH_TASK_SCHEMA,
LINKER_TASK_SCHEMA,
REPORTER_TASK_SCHEMA,
)
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.web.base_proxy_handler_class import ProxyRequest
[docs]
def updateEventFilters(inputFilters: dict):
    """
    Rename legacy *top_similar_face* event filters in place.

    Every legacy key that is present is popped and its value is stored again under the
    replacement key. Keys that are not listed below are left untouched.

    Args:
        inputFilters: input events filters, modified in place
    """
    # (legacy key, replacement key); processed in this order
    renamedKeys = (
        ("top_similar_face_list", "top_matching_candidates_label"),
        ("top_similar_face_ids", "top_similar_object_ids"),
        ("top_similar_face_similarity__gte", "top_similar_object_similarity__gte"),
        ("top_similar_face_similarity__lt", "top_similar_object_similarity__lt"),
    )
    for legacyKey, currentKey in renamedKeys:
        if legacyKey not in inputFilters:
            continue
        inputFilters[currentKey] = inputFilters.pop(legacyKey)
[docs]
class CrossMatchingTaskHandler(APIProxyBaseHandler):
    """
    Proxy handler for creating cross match tasks. See `spec_cross_match`_.

    .. _spec_cross_match:
        _static/api.html#operation/createCrossMatchTask

    Resource: "/{api_version}/tasks/cross_match"
    """

    allowedMethods = ("POST",)

    async def prepareRequestPost(self) -> ProxyRequest:
        """
        Build the cross matching proxy request in platform5 format.

        The request body is validated, then the filters of each side (reference, candidate)
        whose type is "events" are renamed in place.

        Returns:
            request in platform5 format
        """
        payload = self.request.json
        self.validateJson(payload, CROSS_MATCH_TASK_SCHEMA, useJsonSchema=False)

        matchFilters = payload["content"]["filters"]
        # (type key, filters key) for each side of the cross match, reference first
        sides = (
            ("reference_type", "reference_filters"),
            ("candidate_type", "candidate_filters"),
        )
        for typeKey, filtersKey in sides:
            if matchFilters[typeKey] == "events":
                updateEventFilters(matchFilters[filtersKey])

        return ProxyRequest(headers=self.prepareHeaders(), body=ujson.dumps(payload), query=self.prepareQuery())
[docs]
class ROCTaskHandler(APIProxyBaseHandler):
    """
    Proxy handler for creating roc tasks. See `spec_roc`_.

    Only the POST method is allowed; no request preparation is overridden here.

    .. _spec_roc:
        _static/api.html#operation/createROCTask

    Resource: "/{api_version}/tasks/roc"
    """

    allowedMethods = ("POST",)
[docs]
class LinkerTaskHandler(APIProxyBaseHandler):
    """
    Proxy handler for creating linker tasks. See `spec_linker`_.

    .. _spec_linker:
        _static/api.html#operation/createLinkerTask

    Resource: "/{api_version}/tasks/linker"
    """

    allowedMethods = ("POST",)

    async def prepareRequestPost(self):
        """
        Build the linker proxy request in platform5 format.

        The request body is validated; when the task works on events and carries filters,
        those filters are renamed in place.

        Returns:
            request in platform5 format
        """
        payload = self.request.json
        self.validateJson(payload, LINKER_TASK_SCHEMA, useJsonSchema=False)

        taskContent = payload["content"]
        if taskContent["objects_type"] == "events" and "filters" in taskContent:
            updateEventFilters(taskContent["filters"])

        return ProxyRequest(headers=self.prepareHeaders(), body=ujson.dumps(payload), query=self.prepareQuery())
[docs]
class ClusteringTaskHandler(APIProxyBaseHandler):
    """
    Proxy handler for creating clustering tasks. See `spec_clustering`_.

    .. _spec_clustering:
        _static/api.html#operation/createClusteringTask

    Resource: "/{api_version}/tasks/clustering"
    """

    allowedMethods = ("POST",)

    async def prepareRequestPost(self):
        """
        Build the clustering proxy request in platform5 format.

        The request body is validated; event filters, if any, are renamed in place when the
        task clusters events.

        Returns:
            request in platform5 format
        """
        requestBody = self.request.json
        self.validateJson(requestBody, CLUSTERING_TASK_SCHEMA, useJsonSchema=False)

        content = requestBody["content"]
        clustersEvents = content["objects_type"] == "events"
        if clustersEvents and "filters" in content:
            updateEventFilters(content["filters"])

        return ProxyRequest(
            headers=self.prepareHeaders(),
            body=ujson.dumps(requestBody),
            query=self.prepareQuery(),
        )
[docs]
class GCTaskHandler(APIProxyBaseHandler):
    """
    Handler for the gc task resource.

    Resource: "/{api_version}/tasks/gc"
    """

    allowedMethods = ("POST",)

    async def prepareRequestPost(self):
        """
        Reject the request: no proxy request is ever built by this handler.

        Raises:
            VLException: always, with status code 400 and Error.GCAttributeNotAvailable
        """
        notAvailable = VLException(statusCode=400, isCriticalError=False, error=Error.GCAttributeNotAvailable)
        raise notAvailable
[docs]
class ReporterTaskHandler(APIProxyBaseHandler):
    """
    Proxy handler for creating reporter tasks. See `spec_reporter`_.

    .. _spec_reporter:
        _static/api.html#operation/createReporterTask

    Resource: "/{api_version}/tasks/reporter"
    """

    allowedMethods = ("POST",)

    async def prepareRequestPost(self):
        """
        Build the reporter proxy request in platform5 format.

        The request body is validated and, when report columns are given, the column list is
        rewritten in place:

        * every "top_similar_face_id", "top_similar_face_list" and "top_similar_face_similarity"
          entry is removed and a single "top_match" column is inserted where the first of them stood;
        * the first "extract_result" entry is replaced with "face_detections";
        * the first "attribute_id" entry is removed.

        Returns:
            request in platform5 format
        """
        payload = self.request.json
        self.validateJson(payload, REPORTER_TASK_SCHEMA, useJsonSchema=False)

        if "columns" in payload["content"]:
            reportColumns: List[str] = payload["content"]["columns"]
            # a tuple keeps membership checks equality based, like the list lookups they replace
            legacyColumns = ("top_similar_face_id", "top_similar_face_list", "top_similar_face_similarity")

            # position of the first legacy column; everything in front of it is kept,
            # so the index is still valid after the legacy columns are filtered out
            firstLegacyIndex = next(
                (position for position, name in enumerate(reportColumns) if name in legacyColumns),
                None,
            )
            if firstLegacyIndex is not None:
                reportColumns[:] = [name for name in reportColumns if name not in legacyColumns]
                reportColumns.insert(firstLegacyIndex, "top_match")

            if "extract_result" in reportColumns:
                reportColumns[reportColumns.index("extract_result")] = "face_detections"

            if "attribute_id" in reportColumns:
                # for api compatibility
                reportColumns.remove("attribute_id")

        return ProxyRequest(headers=self.prepareHeaders(), body=ujson.dumps(payload), query=self.prepareQuery())