Source code for luna_backport4.app.handlers.tasks_handler

""" Handlers for working with tasks. See `spec_tasks`_

    .. _spec_tasks:
        _static/api.html#tag/tasks
"""

from typing import Any, Dict, List, Optional

from luna3.common.luna_response import LunaResponse
from luna3.common.requests import makeRequest
from sanic.response import HTTPResponse

from app.handlers.base_handler import APIProxyBaseHandler
from crutches_on_wheels.cow.enums.tasks import TaskType
from crutches_on_wheels.cow.utils.streamer import streamResponse


def updateReporterColumns(reporterColumns: List[str]) -> None:
    """
    Update event columns for report task

    The list is rewritten in place:

    * every ``"top_match"`` column is removed and the three ``top_similar_face_*``
      columns are appended to the end of the list instead;
    * every ``"face_detections"`` column is renamed to ``"extract_result"``;
    * all other columns are kept in their original relative order.

    Args:
        reporterColumns: list of reporter columns (mutated in place)
    """
    topMatchReplacement = ("top_similar_face_id", "top_similar_face_list", "top_similar_face_similarity")

    keptColumns: List[str] = []
    appendedColumns: List[str] = []
    # Build the result separately instead of popping from the list being iterated:
    # removing an element mid-iteration shifts the remaining items and makes the
    # loop skip the column that directly follows "top_match".
    for column in reporterColumns:
        if column == "top_match":
            appendedColumns.extend(topMatchReplacement)
        elif column == "face_detections":
            keptColumns.append("extract_result")
        else:
            keptColumns.append(column)

    # Slice assignment keeps the caller's list object, preserving the in-place contract.
    reporterColumns[:] = keptColumns + appendedColumns
def updateTaskFilters(inputFilters: Dict[str, Any]) -> None:
    """
    Update task filters in place: drop the account filter and rename the
    *top_matching* / *top_similar_object* filters to their *top_similar_face* names

    Args:
        inputFilters: input events filters (mutated in place)
    """
    inputFilters.pop("account_id", None)

    # (current filter name, replacement filter name), applied in this order
    renamedFilters = (
        ("top_matching_candidates_label", "top_similar_face_list"),
        ("top_similar_object_ids", "top_similar_face_ids"),
        ("top_similar_object_similarity__gte", "top_similar_face_similarity__gte"),
        ("top_similar_object_similarity__lt", "top_similar_face_similarity__lt"),
    )
    for currentName, replacementName in renamedFilters:
        if currentName in inputFilters:
            inputFilters[replacementName] = inputFilters.pop(currentName)
def convertTaskContent(inputTaskContent: Dict[str, Any], taskType: Optional[int] = None) -> Dict[str, Any]:
    """
    Convert task content to backport4 format

    The content is modified in place and the same dict is returned: the
    "descriptor" entries are dropped, the filters (including nested candidate
    and reference filters) and the report columns are renamed, and "limit" is
    removed for clustering tasks.

    Args:
        inputTaskContent: input task content
        taskType: task type
    Returns:
        the converted (same) task content dict
    """
    inputTaskContent.pop("descriptor", None)

    taskFilters = inputTaskContent.get("filters")
    if taskFilters is not None:
        updateTaskFilters(inputFilters=taskFilters)
        taskFilters.pop("descriptor", None)
        # nested filter groups get the same renaming as the top-level filters
        for nestedKey in ("candidate_filters", "reference_filters"):
            if nestedKey in taskFilters:
                updateTaskFilters(taskFilters[nestedKey])

    if "columns" in inputTaskContent:
        updateReporterColumns(inputTaskContent["columns"])

    isClustering = taskType == TaskType.clustering.value
    if isClustering:
        inputTaskContent.pop("limit", None)

    return inputTaskContent
def convertTaskResponse(task: Dict[str, Any]) -> Dict[str, Any]:
    """
    Convert task response to backport4 format

    Only the fields listed below are copied to the result; the task content is
    passed through `convertTaskContent`.

    Args:
        task: task from the api response
    Returns:
        new dict with the task in backport4 format
    """
    # fields copied verbatim, in output order, ahead of the converted content
    copiedFields = (
        "task_id",
        "create_time",
        "end_time",
        "last_update_time",
        "task_type",
        "task_status",
        "result_id",
        "count_task_parts_done",
        "count_task_parts_all",
    )
    converted = {field: task[field] for field in copiedFields}
    converted["content"] = convertTaskContent(task["content"], task["task_type"])
    converted["description"] = task["description"]
    return converted
class TasksHandler(APIProxyBaseHandler):
    """
    Proxy handler for the list of tasks.

    Resource: "/{api_version}/tasks"
    """

    allowedMethods = ("GET",)

    async def postProcessingGet(self, response: LunaResponse) -> HTTPResponse:
        """
        Convert every task of the reply with `convertTaskResponse`

        Args:
            response: response from api
        Returns:
            response in platform4 format
        """
        payload = response.json
        payload["tasks"] = list(map(convertTaskResponse, payload["tasks"]))
        return self.success(
            statusCode=response.statusCode,
            extraHeaders=response.headers,
            outputJson=payload,
        )
class TasksCountHandler(APIProxyBaseHandler):
    """
    Proxy handler for the task count; only "GET" is allowed.

    Resource: "/{api_version}/tasks/count"
    """

    allowedMethods = ("GET",)
class TaskHandler(APIProxyBaseHandler):
    """
    Proxy handler for a single task.

    Resource: "/{api_version}/tasks/{taskId}"
    """

    allowedMethods = ("GET", "HEAD", "DELETE", "PATCH")

    async def postProcessingGet(self, response: LunaResponse) -> HTTPResponse:
        """
        Convert the task of the reply with `convertTaskResponse`

        Args:
            response: response from api
        Returns:
            response in platform4 format
        """
        convertedTask: dict = convertTaskResponse(response.json)
        return self.success(
            statusCode=response.statusCode,
            extraHeaders=response.headers,
            outputJson=convertedTask,
        )
class TaskSubtaskHandler(APIProxyBaseHandler):
    """
    Proxy handler for the subtasks of a task.

    Resource: "/{api_version}/tasks/{taskId}/subtasks"
    """

    allowedMethods = ("GET",)

    async def postProcessingGet(self, response: LunaResponse) -> HTTPResponse:
        """
        Convert every subtask of the reply with `convertTaskResponse`

        Args:
            response: response from api
        Returns:
            response in platform4 format
        """
        payload: dict = response.json
        payload["subtasks"] = list(map(convertTaskResponse, payload["subtasks"]))
        return self.success(
            statusCode=response.statusCode,
            extraHeaders=response.headers,
            outputJson=payload,
        )
class TaskResultHandler(APIProxyBaseHandler):
    """
    Proxy handler for a task result.

    Resource: "/{api_version}/tasks/{taskId}/result"
    """

    allowedMethods = ("GET",)

    async def get(self, taskId):
        """
        Get task result. See `spec_get_task_result`_.

        The upstream reply is requested with `stream=True` and forwarded to the
        client through `streamResponse`.

        .. _spec_get_task_result:
            _static/api.html#operation/getTaskResult

        Args:
            taskId: task id
        """
        self.request.streamResponse = True

        targetUrl = self.prepareUrl()
        proxyRequest = await self.prepareProxyRequest()
        timeouts = self.serviceTimeouts

        upstreamRequest = await makeRequest(
            url=targetUrl,
            method=self.request.method,
            queryParams=proxyRequest.query,
            headers=proxyRequest.headers,
            asyncRequest=True,
            connectTimeout=timeouts.connectTimeout,
            sockConnectTimeout=timeouts.sockConnectTimeout,
            sockReadTimeout=timeouts.sockReadTimeout,
            session=self.session,
            totalTimeout=0,
            stream=True,
        )
        async with upstreamRequest.sendRequest() as upstreamResponse:
            await streamResponse(
                request=self.request,
                response=upstreamResponse,
                headers=upstreamResponse.headers,
            )