Source code for luna_backport4.app.handlers.tasks_handler
""" Handler to work with a tasks. See `spec_tasks`_
.. _spec_tasks:
_static/api.html#tag/tasks
"""
from typing import Any, Dict, List, Optional
from luna3.common.luna_response import LunaResponse
from luna3.common.requests import makeRequest
from sanic.response import HTTPResponse
from app.handlers.base_handler import APIProxyBaseHandler
from crutches_on_wheels.cow.enums.tasks import TaskType
from crutches_on_wheels.cow.utils.streamer import streamResponse
[docs]def updateReporterColumns(reporterColumns: List[str]) -> None:
"""
Update event columns for report task
Args:
reporterColumns: list of reporter columns
"""
for idx, column in enumerate(reporterColumns):
if column == "top_match":
reporterColumns.pop(idx)
reporterColumns.extend(("top_similar_face_id", "top_similar_face_list", "top_similar_face_similarity"))
elif column == "face_detections":
reporterColumns[idx] = "extract_result"
[docs]def updateTaskFilters(inputFilters: Dict[str, Any]) -> None:
"""
Update task filters, replace *top_matching* filters
Args:
inputFilters: input events filters
"""
if "account_id" in inputFilters:
del inputFilters["account_id"]
if "top_matching_candidates_label" in inputFilters:
inputFilters["top_similar_face_list"] = inputFilters.pop("top_matching_candidates_label")
if "top_similar_object_ids" in inputFilters:
inputFilters["top_similar_face_ids"] = inputFilters.pop("top_similar_object_ids")
if "top_similar_object_similarity__gte" in inputFilters:
inputFilters["top_similar_face_similarity__gte"] = inputFilters.pop("top_similar_object_similarity__gte")
if "top_similar_object_similarity__lt" in inputFilters:
inputFilters["top_similar_face_similarity__lt"] = inputFilters.pop("top_similar_object_similarity__lt")
[docs]def updateTaskContent(inputTaskContent: Dict[str, Any], taskType: Optional[int] = None) -> None:
"""
Update task content
Args:
inputTaskContent: input task content
taskType: task type
"""
inputTaskContent.pop("descriptor", None)
filters = inputTaskContent.get("filters")
if filters is not None:
updateTaskFilters(inputFilters=filters)
filters.pop("descriptor", None)
if "candidate_filters" in filters:
updateTaskFilters(filters["candidate_filters"])
if "reference_filters" in filters:
updateTaskFilters(filters["reference_filters"])
if "columns" in inputTaskContent:
updateReporterColumns(inputTaskContent["columns"])
if taskType == TaskType.clustering.value:
inputTaskContent.pop("limit", None)
[docs]class TasksHandler(APIProxyBaseHandler):
"""
Handler for work with tasks.
Resource: "/{api_version}/tasks"
"""
allowedMethods = ("GET",)
[docs] async def postProcessingGet(self, response: LunaResponse) -> HTTPResponse:
"""
Remove account_id and descriptor type from response
Args:
response: response from api
Returns:
response in platform4 format
"""
outputJson = response.json
for task in outputJson["tasks"]:
del task["account_id"]
updateTaskContent(task["content"], task["task_type"])
return self.success(statusCode=response.statusCode, extraHeaders=response.headers, outputJson=outputJson)
[docs]class TasksCountHandler(APIProxyBaseHandler):
"""
Handler for work with task count.
Resource: "/{api_version}/tasks/count"
"""
allowedMethods = ("GET",)
[docs]class TaskHandler(APIProxyBaseHandler):
"""
Handler for work with task.
Resource: "/{api_version}/tasks/{taskId}"
"""
allowedMethods = ("GET", "HEAD", "DELETE", "PATCH")
[docs] async def postProcessingGet(self, response: LunaResponse) -> HTTPResponse:
"""
Remove account_id and descriptor type from response
Args:
response: response from api
Returns:
response in platform4 format
"""
task: dict = response.json
del task["account_id"]
updateTaskContent(task["content"], task["task_type"])
return self.success(statusCode=response.statusCode, extraHeaders=response.headers, outputJson=task)
[docs]class TaskSubtaskHandler(APIProxyBaseHandler):
"""
Handler for work with task.
Resource: "/{api_version}/tasks/{taskId}/subtasks"
"""
allowedMethods = ("GET",)
[docs] async def postProcessingGet(self, response: LunaResponse) -> HTTPResponse:
"""
Remove descriptor type from response
Args:
response: response from api
Returns:
response in platform4 format
"""
outputJson: dict = response.json
for subTask in outputJson["subtasks"]:
updateTaskContent(subTask["content"])
return self.success(statusCode=response.statusCode, extraHeaders=response.headers, outputJson=outputJson)
[docs]class TaskResultHandler(APIProxyBaseHandler):
"""
Handler for work with task.
Resource: "/{api_version}/tasks/{taskId}/result"
"""
allowedMethods = ("GET",)
[docs] async def get(self, taskId):
"""
Get task result. See `spec_get_task_result`_.
.. spec_get_task_result:
_static/api.html#operation/getTaskResult
Args:
taskId: task id
"""
self.request.streamResponse = True
url = self.prepareUrl()
preparedRequest = await self.prepareProxyRequest()
reply = await makeRequest(
url=url,
method=self.request.method,
queryParams=preparedRequest.query,
headers=preparedRequest.headers,
asyncRequest=True,
connectTimeout=self.serviceTimeouts.connectTimeout,
sockConnectTimeout=self.serviceTimeouts.sockConnectTimeout,
sockReadTimeout=self.serviceTimeouts.sockReadTimeout,
session=self.session,
totalTimeout=0,
stream=True,
)
async with reply.sendRequest() as apiResp:
await streamResponse(request=self.request, response=apiResp, headers=apiResp.headers)