Source code for luna_backport4.app.handlers.tasks_create_task_handler

from typing import List

import ujson

from app.handlers.base_handler import APIProxyBaseHandler
from app.handlers.schemas import (
    CLUSTERING_TASK_SCHEMA,
    CROSS_MATCH_TASK_SCHEMA,
    LINKER_TASK_SCHEMA,
    REPORTER_TASK_SCHEMA,
)
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.web.base_proxy_handler_class import ProxyRequest


[docs] def updateEventFilters(inputFilters: dict): """ Update event filters, replace *top_matching* filters Args: inputFilters: input events filters """ if "top_similar_face_list" in inputFilters: inputFilters["top_matching_candidates_label"] = inputFilters.pop("top_similar_face_list") if "top_similar_face_ids" in inputFilters: inputFilters["top_similar_object_ids"] = inputFilters.pop("top_similar_face_ids") if "top_similar_face_similarity__gte" in inputFilters: inputFilters["top_similar_object_similarity__gte"] = inputFilters.pop("top_similar_face_similarity__gte") if "top_similar_face_similarity__lt" in inputFilters: inputFilters["top_similar_object_similarity__lt"] = inputFilters.pop("top_similar_face_similarity__lt")
[docs] class CrossMatchingTaskHandler(APIProxyBaseHandler): """ Handler for work with cross match task. See `spec_cross_match`_. .. _spec_cross_match: _static/api.html#operation/createCrossMatchTask Resource: "/{api_version}/tasks/cross_match" """ allowedMethods = ("POST",)
[docs] async def prepareRequestPost(self) -> ProxyRequest: """ Create cross matching proxy request in platform5 format. Update events filters. Returns: request in platform5 format """ body = self.request.json self.validateJson(body, CROSS_MATCH_TASK_SCHEMA, useJsonSchema=False) if body["content"]["filters"]["reference_type"] == "events": updateEventFilters(body["content"]["filters"]["reference_filters"]) if body["content"]["filters"]["candidate_type"] == "events": updateEventFilters(body["content"]["filters"]["candidate_filters"]) return ProxyRequest(headers=self.prepareHeaders(), body=ujson.dumps(body), query=self.prepareQuery())
[docs] class ROCTaskHandler(APIProxyBaseHandler): """ Handler for work with roc task. See `spec_roc`_. .. _spec_roc: _static/api.html#operation/createROCTask Resource: "/{api_version}/tasks/roc" """ allowedMethods = ("POST",)
[docs] class LinkerTaskHandler(APIProxyBaseHandler): """ Handler for work with linker task. See `spec_linker`_. .. _spec_linker: _static/api.html#operation/createLinkerTask Resource: "/{api_version}/tasks/linker" """ allowedMethods = ("POST",)
[docs] async def prepareRequestPost(self): """ Create linker proxy request in platform5 format. Update events filters. Returns: request in platform5 format """ body = self.request.json self.validateJson(body, LINKER_TASK_SCHEMA, useJsonSchema=False) if body["content"]["objects_type"] == "events": if "filters" in body["content"]: updateEventFilters(body["content"]["filters"]) return ProxyRequest(headers=self.prepareHeaders(), body=ujson.dumps(body), query=self.prepareQuery())
[docs] class ClusteringTaskHandler(APIProxyBaseHandler): """ Handler for work with clustering task. See `spec_clustering`_. .. _spec_clustering: _static/api.html#operation/createClusteringTask Resource: "/{api_version}/tasks/clustering" """ allowedMethods = ("POST",)
[docs] async def prepareRequestPost(self): """ Create clustering proxy request in platform5 format. Update events filters. Returns: request in platform5 format """ body = self.request.json self.validateJson(body, CLUSTERING_TASK_SCHEMA, useJsonSchema=False) if body["content"]["objects_type"] == "events": if "filters" in body["content"]: updateEventFilters(body["content"]["filters"]) return ProxyRequest(headers=self.prepareHeaders(), body=ujson.dumps(body), query=self.prepareQuery())
[docs] class GCTaskHandler(APIProxyBaseHandler): """ Handler for work with gc task. Resource: "/{api_version}/tasks/gc" """ allowedMethods = ("POST",)
[docs] async def prepareRequestPost(self): raise VLException(statusCode=400, isCriticalError=False, error=Error.GCAttributeNotAvailable)
[docs] class ReporterTaskHandler(APIProxyBaseHandler): """ Handler for work with reporter task.See `spec_reporter`_. .. _spec_reporter: _static/api.html#operation/createReporterTask Resource: "/{api_version}/tasks/reporter" """ allowedMethods = ("POST",)
[docs] async def prepareRequestPost(self): """ Create reporter proxy request in platform5 format. Update report columns. Returns: request in platform5 format """ body = self.request.json self.validateJson(body, REPORTER_TASK_SCHEMA, useJsonSchema=False) if "columns" in body["content"]: columns: List[str] = body["content"]["columns"] indexToInsert = len(columns) + 1 for column in ("top_similar_face_id", "top_similar_face_list", "top_similar_face_similarity"): while column in columns: index = columns.index(column) columns.pop(index) indexToInsert = min(index, indexToInsert) if indexToInsert != len(columns) + 1: columns.insert(indexToInsert, "top_match") if "extract_result" in columns: index = columns.index("extract_result") columns[index] = "face_detections" if "attribute_id" in columns: # for api compatibility columns.remove("attribute_id") return ProxyRequest(headers=self.prepareHeaders(), body=ujson.dumps(body), query=self.prepareQuery())