Source code for luna_backport4.app.handlers.tasks_handler

""" Handlers for working with tasks. See `spec_tasks`_

    .. _spec_tasks:
        _static/api.html#tag/tasks
"""
from typing import Any, Dict, List, Optional

from luna3.common.luna_response import LunaResponse
from luna3.common.requests import makeRequest
from sanic.response import HTTPResponse

from app.handlers.base_handler import APIProxyBaseHandler
from crutches_on_wheels.cow.enums.tasks import TaskType
from crutches_on_wheels.cow.utils.streamer import streamResponse


def updateReporterColumns(reporterColumns: List[str]) -> None:
    """
    Update event columns for report task.

    The list is modified in place:

    * every *top_match* column is removed and the columns *top_similar_face_id*,
      *top_similar_face_list* and *top_similar_face_similarity* are appended to the
      end of the list instead;
    * every *face_detections* column is replaced with *extract_result*.

    Args:
        reporterColumns: list of reporter columns
    """
    topMatchReplacement = ("top_similar_face_id", "top_similar_face_list", "top_similar_face_similarity")
    keptColumns: List[str] = []
    appendedColumns: List[str] = []
    for column in reporterColumns:
        if column == "top_match":
            appendedColumns.extend(topMatchReplacement)
        elif column == "face_detections":
            keptColumns.append("extract_result")
        else:
            keptColumns.append(column)
    # The result is built separately and written back through a slice: removing items
    # from the list while iterating over it shifts the remaining items and makes the
    # loop skip the column that follows a removed "top_match".
    reporterColumns[:] = keptColumns + appendedColumns
def updateTaskFilters(inputFilters: Dict[str, Any]) -> None:
    """
    Convert task filters in place: drop *account_id* and rename the *top_matching* /
    *top_similar_object* filters to their *top_similar_face* counterparts.

    Args:
        inputFilters: input events filters
    """
    inputFilters.pop("account_id", None)

    # (current filter name, replacement filter name), processed in this order
    renamedFilters = (
        ("top_matching_candidates_label", "top_similar_face_list"),
        ("top_similar_object_ids", "top_similar_face_ids"),
        ("top_similar_object_similarity__gte", "top_similar_face_similarity__gte"),
        ("top_similar_object_similarity__lt", "top_similar_face_similarity__lt"),
    )
    for currentName, replacementName in renamedFilters:
        if currentName in inputFilters:
            inputFilters[replacementName] = inputFilters.pop(currentName)
def updateTaskContent(inputTaskContent: Dict[str, Any], taskType: Optional[int] = None) -> None:
    """
    Convert task content in place.

    Removes *descriptor* from the content and from its filters, converts the filters
    (including nested *candidate_filters* / *reference_filters*), converts report
    columns and, for a clustering task, removes *limit*.

    Args:
        inputTaskContent: input task content
        taskType: task type
    """
    inputTaskContent.pop("descriptor", None)

    eventFilters = inputTaskContent.get("filters")
    if eventFilters is not None:
        updateTaskFilters(inputFilters=eventFilters)
        eventFilters.pop("descriptor", None)
        # nested filter groups are converted the same way as the top-level filters
        for nestedKey in ("candidate_filters", "reference_filters"):
            if nestedKey in eventFilters:
                updateTaskFilters(eventFilters[nestedKey])

    if "columns" in inputTaskContent:
        updateReporterColumns(inputTaskContent["columns"])

    if taskType == TaskType.clustering.value:
        inputTaskContent.pop("limit", None)
class TasksHandler(APIProxyBaseHandler):
    """
    Proxy handler for the task list.

    Resource: "/{api_version}/tasks"
    """

    allowedMethods = ("GET",)

    async def postProcessingGet(self, response: LunaResponse) -> HTTPResponse:
        """
        Remove account_id from every task of the response and convert each task content

        Args:
            response: response from api
        Returns:
            response in platform4 format
        """
        payload = response.json
        for taskItem in payload["tasks"]:
            del taskItem["account_id"]
            updateTaskContent(taskItem["content"], taskItem["task_type"])
        return self.success(
            statusCode=response.statusCode,
            extraHeaders=response.headers,
            outputJson=payload,
        )
class TasksCountHandler(APIProxyBaseHandler):
    """
    Proxy handler for the task count.

    Resource: "/{api_version}/tasks/count"
    """

    allowedMethods = ("GET",)
class TaskHandler(APIProxyBaseHandler):
    """
    Proxy handler for a single task.

    Resource: "/{api_version}/tasks/{taskId}"
    """

    allowedMethods = ("GET", "HEAD", "DELETE", "PATCH")

    async def postProcessingGet(self, response: LunaResponse) -> HTTPResponse:
        """
        Remove account_id from the task and convert the task content

        Args:
            response: response from api
        Returns:
            response in platform4 format
        """
        taskJson: dict = response.json
        del taskJson["account_id"]
        updateTaskContent(taskJson["content"], taskJson["task_type"])
        return self.success(
            statusCode=response.statusCode,
            extraHeaders=response.headers,
            outputJson=taskJson,
        )
class TaskSubtaskHandler(APIProxyBaseHandler):
    """
    Proxy handler for the subtasks of a task.

    Resource: "/{api_version}/tasks/{taskId}/subtasks"
    """

    allowedMethods = ("GET",)

    async def postProcessingGet(self, response: LunaResponse) -> HTTPResponse:
        """
        Convert the content of every subtask of the response

        Args:
            response: response from api
        Returns:
            response in platform4 format
        """
        payload: dict = response.json
        for subtaskItem in payload["subtasks"]:
            # no task type is passed, so only the type-independent conversion is applied
            updateTaskContent(subtaskItem["content"])
        return self.success(
            statusCode=response.statusCode,
            extraHeaders=response.headers,
            outputJson=payload,
        )
class TaskResultHandler(APIProxyBaseHandler):
    """
    Proxy handler for a task result.

    Resource: "/{api_version}/tasks/{taskId}/result"
    """

    allowedMethods = ("GET",)

    async def get(self, taskId):
        """
        Get task result. See `spec_get_task_result`_.

        .. _spec_get_task_result:
            _static/api.html#operation/getTaskResult

        Args:
            taskId: task id
        """
        # mark the request as streamed before the proxy request is prepared
        self.request.streamResponse = True

        targetUrl = self.prepareUrl()
        proxyRequest = await self.prepareProxyRequest()
        requestOptions = dict(
            url=targetUrl,
            method=self.request.method,
            queryParams=proxyRequest.query,
            headers=proxyRequest.headers,
            asyncRequest=True,
            connectTimeout=self.serviceTimeouts.connectTimeout,
            sockConnectTimeout=self.serviceTimeouts.sockConnectTimeout,
            sockReadTimeout=self.serviceTimeouts.sockReadTimeout,
            session=self.session,
            totalTimeout=0,
            stream=True,
        )
        upstreamReply = await makeRequest(**requestOptions)

        # forward the upstream body to the client together with the upstream headers
        async with upstreamReply.sendRequest() as upstreamResponse:
            await streamResponse(
                request=self.request,
                response=upstreamResponse,
                headers=upstreamResponse.headers,
            )