Source code for luna_handlers.app.handlers.extractor_handler

""" Extractor handler """
import asyncio
from typing import List, Optional

from sanic.response import HTTPResponse

from app.api_sdk_adaptors.extractor import APISDKExtractorAdaptor, RelatedAttributes
from app.handlers.base_handler import BaseHandler
from classes.schemas.extractor import Extractor
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.web.query_getters import int01Getter, float01Getter, uuidGetter
from crutches_on_wheels.monitoring.points import monitorTime
from sdk.sdk_loop.estimation_targets import SDKEstimationTargets, SDKFaceEstimationTargets
from sdk.sdk_loop.sdk_task import SDKTaskFilters, SDKTask


class ExtractorHandler(BaseHandler):
    """
    Extract attributes such as gender, age, ethnicity, descriptor from samples.

    Resource: "/{api_version}/extractor"
    """

    async def saveAttributes(self, attributesToSave: List[RelatedAttributes], ttl: Optional[str]) -> None:
        """
        Save attributes.

        Every non-empty face attribute is created through the luna-faces client for the account stored in
        `self.accountId`; the attribute id and url taken from the reply are written back onto the attribute.

        Args:
            attributesToSave: attributes
            ttl: time to store attribute
        Raises:
            NotImplementedError: if an attribute has no face part
        """
        for attribute in attributesToSave:
            if attribute.face is None:
                raise NotImplementedError("Cannot save attribute without face")

            kwargs = attribute.face._getAttributeKwargs()
            if not kwargs:
                continue  # no need to create empty attribute

            reply = await self.luna3Client.lunaFaces.createAttribute(
                accountId=self.accountId, **kwargs, ttl=ttl, raiseError=True
            )
            attribute.face.sdkAttribute.attributeId = reply.json["attribute_id"]
            attribute.face.url = f'{self.config.facesAddress.origin}{reply.json["url"]}'

    async def post(self) -> HTTPResponse:
        """
        Extract attributes from samples. See `spec_extractor`_.

        .. _spec_extractor:
            _static/api.html#operation/extractAttributes

        Returns:
            response with list of extracted attributes
        Raises:
            VLException(Error.NotSelectedAttributesForExtract, 400, isCriticalError=False): if extract_descriptor=0
                and extract_basic_attributes=0
        """
        # Query parameters are read in a fixed order; a getter failure surfaces before the checks below.
        extractDescriptor = self.getQueryParam("extract_descriptor", int01Getter, default=1)
        extractBasicAttributes = self.getQueryParam("extract_basic_attributes", int01Getter, default=0)
        aggregateAttributes = self.getQueryParam("aggregate_attributes", int01Getter, default=0)
        scoreThreshold = self.getQueryParam("score_threshold", float01Getter, default=0)
        self.accountId = self.getQueryParam("account_id", uuidGetter, require=True)
        ttl = self.getQueryParam("ttl")

        # At least one extraction target has to be requested.
        if not (extractBasicAttributes or extractDescriptor):
            raise VLException(Error.NotSelectedAttributesForExtract, 400, isCriticalError=False)

        # The request body is the list of sample ids; it is checked against the Extractor schema.
        sampleIds = self.request.json
        self.loadDataFromJson({"samples": sampleIds}, Extractor)

        faceTargets = SDKFaceEstimationTargets(
            estimateFaceDescriptor=extractDescriptor, estimateBasicAttributes=extractBasicAttributes
        )
        toEstimate = SDKEstimationTargets(estimateHuman=0, faceEstimationTargets=faceTargets)
        filters = SDKTaskFilters(garbageScoreThreshold=scoreThreshold)

        # Download all samples concurrently, timing the whole batch.
        futures = [self.downloadFaceSample(sampleId) for sampleId in sampleIds]
        with monitorTime(self.request.dataForMonitoring, "load_face_samples_time"):
            samples = await asyncio.gather(*futures)

        task = SDKTask(toEstimate, data=samples, filters=filters, aggregateAttributes=aggregateAttributes)
        extractor = APISDKExtractorAdaptor(accountId=self.accountId, logger=self.logger, sdkLoop=self.sdkLoop)
        niceAttributes, monitoringData, filteredAttributes = await extractor.extract(task=task)

        # Only `niceAttributes` are saved; `filteredAttributes` are just reported in the response.
        with monitorTime(monitoringData.request, "save_samples_time"):
            await self.saveAttributes(attributesToSave=niceAttributes, ttl=ttl)
        self.handleMonitoringData(monitoringData)

        # The response lists the saved attributes first, followed by the filtered ones.
        return self.success(201, outputJson=[attribute.face.json for attribute in niceAttributes + filteredAttributes])