Source code for luna_handlers.app.handlers.extractor_handler
""" Extractor handler """
import asyncio
from typing import List, Optional
from sanic.response import HTTPResponse
from app.api_sdk_adaptors.extractor import APISDKExtractorAdaptor, RelatedAttributes
from app.handlers.base_handler import BaseHandler
from classes.schemas.extractor import Extractor
from crutches_on_wheels.errors.errors import Error
from crutches_on_wheels.errors.exception import VLException
from crutches_on_wheels.web.query_getters import int01Getter, float01Getter, uuidGetter
from crutches_on_wheels.monitoring.points import monitorTime
from sdk.sdk_loop.estimation_targets import SDKEstimationTargets, SDKFaceEstimationTargets
from sdk.sdk_loop.sdk_task import SDKTaskFilters, SDKTask
class ExtractorHandler(BaseHandler):
    """
    Handler that extracts attributes (gender, age, ethnicity, descriptor) from samples.

    Resource: "/{api_version}/extractor"
    """

    async def saveAttributes(self, attributesToSave: List[RelatedAttributes], ttl: Optional[str]) -> None:
        """
        Persist extracted face attributes and write the assigned id and url back onto each of them.

        Attributes are created one at a time, in the given order, for the account stored in `self.accountId`.
        An attribute whose face produces no creation kwargs is skipped, so no empty attribute is created.

        Args:
            attributesToSave: attributes to store
            ttl: time to store attribute
        Raises:
            NotImplementedError: if an attribute has no face
        """
        for related in attributesToSave:
            # Guard clause: only face attributes can be stored.
            if related.face is None:
                raise NotImplementedError("Cannot save attribute without face")

            creationKwargs = related.face._getAttributeKwargs()
            if creationKwargs:
                reply = await self.luna3Client.lunaFaces.createAttribute(
                    accountId=self.accountId, **creationKwargs, ttl=ttl, raiseError=True
                )
                # Propagate the stored attribute identity back to the in-memory result.
                related.face.sdkAttribute.attributeId = reply.json["attribute_id"]
                related.face.url = f'{self.config.facesAddress.origin}{reply.json["url"]}'

    async def post(self) -> HTTPResponse:
        """
        Extract attributes from samples. See `spec_extractor`_.

        .. _spec_extractor:
            _static/api.html#operation/extractAttributes

        The request body is the list of sample ids; it is validated against the `Extractor` schema
        before the samples are downloaded.

        Returns:
            response (status 201) with list of extracted attributes, followed by the filtered ones
        Raises:
            VLException(Error.NotSelectedAttributesForExtract, 400, isCriticalError=False): if extract_descriptor=0
                and extract_basic_attributes=0
        """
        # Query parameters are read in a fixed order; each getter may reject a malformed value.
        extractDescriptor = self.getQueryParam("extract_descriptor", int01Getter, default=1)
        extractBasicAttributes = self.getQueryParam("extract_basic_attributes", int01Getter, default=0)
        aggregateAttributes = self.getQueryParam("aggregate_attributes", int01Getter, default=0)
        scoreThreshold = self.getQueryParam("score_threshold", float01Getter, default=0)
        self.accountId = self.getQueryParam("account_id", uuidGetter, require=True)
        ttl = self.getQueryParam("ttl")

        # At least one kind of extraction has to be requested.
        if not extractBasicAttributes and not extractDescriptor:
            raise VLException(Error.NotSelectedAttributesForExtract, 400, isCriticalError=False)

        sampleIds = self.request.json
        self.loadDataFromJson({"samples": sampleIds}, Extractor)

        # Describe what the SDK has to estimate and how results are filtered.
        faceEstimations = SDKFaceEstimationTargets(
            estimateFaceDescriptor=extractDescriptor, estimateBasicAttributes=extractBasicAttributes
        )
        estimationTargets = SDKEstimationTargets(estimateHuman=0, faceEstimationTargets=faceEstimations)
        taskFilters = SDKTaskFilters(garbageScoreThreshold=scoreThreshold)

        # Download all samples concurrently; only the awaiting is timed.
        pendingDownloads = [self.downloadFaceSample(oneSampleId) for oneSampleId in sampleIds]
        with monitorTime(self.request.dataForMonitoring, "load_face_samples_time"):
            loadedSamples = await asyncio.gather(*pendingDownloads)

        sdkTask = SDKTask(
            estimationTargets, data=loadedSamples, filters=taskFilters, aggregateAttributes=aggregateAttributes
        )
        adaptor = APISDKExtractorAdaptor(accountId=self.accountId, logger=self.logger, sdkLoop=self.sdkLoop)
        extracted, monitoringData, filteredOut = await adaptor.extract(task=sdkTask)

        with monitorTime(monitoringData.request, "save_samples_time"):
            await self.saveAttributes(attributesToSave=extracted, ttl=ttl)
        self.handleMonitoringData(monitoringData)

        # Saved attributes come first in the reply, filtered ones after them.
        everyAttribute = extracted + filteredOut
        replyBody = [related.face.json for related in everyAttribute]
        return self.success(201, outputJson=replyBody)