Source code for luna_tasks.app.handlers.results_proxy_handler
from sanic.response import HTTPResponse
from app.handlers.base_handler import BaseRequestHandler
from crutches_on_wheels.cow.enums.tasks import TaskStatus
from crutches_on_wheels.cow.errors.errors import Error, ErrorInfo
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.web.query_getters import uriParamIntGetter, uuidGetter
from crutches_on_wheels.cow.web.utils import decodeBytes, loadJson
[docs]
class TaskResultHandler(BaseRequestHandler):
"""
Task result handler.
"""
[docs]
async def get(self, taskId: str) -> HTTPResponse:
"""
Get error, see `spec`_
.. _spec:
_static/api.html#operation/getTaskResult
Returns:
response with result and "Content-Disposition" header
"""
if not self.config.additionalServicesUsage.lunaImageStore:
raise VLException(Error.LISIsDisabled, 403, False)
self.request.streamResponse = True
taskId = uriParamIntGetter(taskId, Error.PageNotFoundError)
accountId = self.getQueryParam("account_id", uuidGetter)
task = await self.dbContext.getTask(taskId=taskId, accountId=accountId)
resultId = task["result_id"]
if task["task_status"] in (TaskStatus.cancelled.value, TaskStatus.failed.value):
error = Error.ImpossibleGetTaskResult.format(taskId, task["task_status"])
raise VLException(error, 400, isCriticalError=False)
if resultId is None:
error = Error.TaskDoesNotHaveResultYet.format(taskId)
raise VLException(error, 404, isCriticalError=False)
resultResponse = await self.luna3Client.lunaTasksResultsStore.getObject(
self.config.taskResultStorage.bucket, objectId=resultId, accountId=accountId, stream=True, totalTimeout=0
)
async with resultResponse.sendRequest() as resultResp:
if resultResp.rawResponse.status == 404:
# Need to use separate error for task result, we don't want to proxy error from image store
return self.error(404, Error.TaskResultNotFound.format(taskId))
elif resultResp.rawResponse.status != 200:
errorBody = await resultResp.rawResponse.read()
errorInfo = loadJson(decodeBytes(body=errorBody))
return self.error(resultResp.rawResponse.status, ErrorInfo.fromDict(errorInfo))
extension = resultResp.contentType.split("/")[-1]
filename = f"task_{taskId}.{extension}"
resp = await self.request.respond(
content_type=resultResponse.contentType,
headers={"Content-Disposition": f"attachment; filename={filename}"},
)
async for chunk in resultResp.iter():
await resp.send(chunk)
await resp.eof()