Source code for luna_tasks.app.handlers.results_proxy_handler

from sanic.response import HTTPResponse

from app.handlers.base_handler import BaseRequestHandler
from crutches_on_wheels.cow.enums.tasks import TaskStatus
from crutches_on_wheels.cow.errors.errors import Error, ErrorInfo
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.web.query_getters import uriParamIntGetter, uuidGetter
from crutches_on_wheels.cow.web.utils import decodeBytes, loadJson


[docs] class TaskResultHandler(BaseRequestHandler): """ Task result handler. """
[docs] async def get(self, taskId: str) -> HTTPResponse: """ Get error, see `spec`_ .. _spec: _static/api.html#operation/getTaskResult Returns: response with result and "Content-Disposition" header """ if not self.config.additionalServicesUsage.lunaImageStore: raise VLException(Error.LISIsDisabled, 403, False) self.request.streamResponse = True taskId = uriParamIntGetter(taskId, Error.PageNotFoundError) accountId = self.getQueryParam("account_id", uuidGetter) task = await self.dbContext.getTask(taskId=taskId, accountId=accountId) resultId = task["result_id"] if task["task_status"] in (TaskStatus.cancelled.value, TaskStatus.failed.value): error = Error.ImpossibleGetTaskResult.format(taskId, task["task_status"]) raise VLException(error, 400, isCriticalError=False) if resultId is None: error = Error.TaskDoesNotHaveResultYet.format(taskId) raise VLException(error, 404, isCriticalError=False) resultResponse = await self.luna3Client.lunaTasksResultsStore.getObject( self.config.taskResultStorage.bucket, objectId=resultId, accountId=accountId, stream=True, totalTimeout=0 ) async with resultResponse.sendRequest() as resultResp: if resultResp.rawResponse.status == 404: # Need to use separate error for task result, we don't want to proxy error from image store return self.error(404, Error.TaskResultNotFound.format(taskId)) elif resultResp.rawResponse.status != 200: errorBody = await resultResp.rawResponse.read() errorInfo = loadJson(decodeBytes(body=errorBody)) return self.error(resultResp.rawResponse.status, ErrorInfo.fromDict(errorInfo)) extension = resultResp.contentType.split("/")[-1] filename = f"task_{taskId}.{extension}" resp = await self.request.respond( content_type=resultResponse.contentType, headers={"Content-Disposition": f"attachment; filename={filename}"}, ) async for chunk in resultResp.iter(): await resp.send(chunk) await resp.eof()