Source code for luna_handlers.classes.schemas.storage_policy

"""
Module contains schemas for storage policy
"""
import asyncio
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from typing import Coroutine, Dict, List, Literal
from uuid import UUID, uuid4

import aiohttp
from aiohttp import BasicAuth
from luna3.client import Client
from luna3.common.requests import RequestPayload, makeRequest
from pydantic import Field, PrivateAttr
from vlutils.jobs.async_runner import AsyncRunner

from app.global_vars.context_vars import requestIdCtx
from classes.event import HandlerEvent as Event
from classes.schemas import types
from classes.schemas.base_schema import BaseSchema, HandlerSettings
from classes.schemas.filters import ComplexFilter
from configs.config import MAX_IMAGE_ORIGIN_EXTERNAL_URL_LENGTH
from crutches_on_wheels.cow.maps.vl_maps import ETHNIC_MAP
from crutches_on_wheels.cow.monitoring.points import DataForMonitoring, monitorTime
from crutches_on_wheels.cow.pydantic.types import OptionalNotNullable, Str
from crutches_on_wheels.cow.utils.log import logger
from redis_db.redis_context import RedisContext

# API version number used as the leading path segment of the relative references built in this module
# (e.g. "/6/images/<id>", "/6/samples/faces/<id>")
CURRENT_LUNA_API_VERSION = 6


[docs] @dataclass(slots=True) class StorePolicyConfig: """Handler config that policies should apply.""" facesBucket: str bodiesBucket: str originBucket: str
class _BaseStoragePolicy(BaseSchema):
    """Common base of all storage policies: holds the policy filters and the event filtering helpers."""

    # storage policy complex filters (matching+attributes)
    filters: ComplexFilter = ComplexFilter()

    def filterEventsByFilters(self, events: list[Event]) -> list[Event]:
        """
        Select the events this policy has to process.

        Args:
            events: all events
        Returns:
            the very same list when the filters are empty, otherwise a new list with the satisfying events
        """
        policyFilters = self.filters
        if not policyFilters.isEmpty:
            return list(filter(policyFilters.isEventSatisfies, events))
        return events

    def isEventSatisfyFilters(self, event: Event) -> bool:
        """
        Check a single event against the policy filters.

        Args:
            event: event to check
        Returns:
            True when the filters are empty, otherwise the result of the filter check
        """
        if self.filters.isEmpty:
            return True
        return self.filters.isEventSatisfies(event)
class FaceSamplePolicy(_BaseStoragePolicy):
    """Face sample policy"""

    # whether to store sample
    storeSample: types.Int01 = 1

    async def execute(self, events: list[Event], accountId: str, bucket: str, luna3Client: Client):
        """
        Save face samples and write the sample references into the event detections.

        A detection is skipped when it has no face sample entry, or when neither a warp nor an incoming
        sample id is available. An incoming sample id is reused as is; otherwise a new id is generated
        and the warp is queued for upload.

        Args:
            events: events
            accountId: account id
            bucket: bucket name
            luna3Client: client
        """
        if not self.storeSample:
            return

        uploadWarp = partial(luna3Client.lunaFaceSamplesStore.putImage, accountId=accountId, bucketName=bucket)
        imagesUrl = f"{luna3Client.lunaFaceSamplesStore.baseUri}/buckets/{bucket}/images"

        pendingUploads = []
        for event in self.filterEventsByFilters(events):
            for detection, source, warp in zip(event.raw["detections"], event.sources, event.faceWarps):
                faceSample = detection["samples"]["face"]
                if faceSample is None:
                    continue
                if not warp and not source.sample:
                    continue

                sampleId = str(source.sample) if source.sample else None
                if not sampleId:
                    # no incoming sample: `warp` is guaranteed to be truthy here by the guard above
                    sampleId = str(uuid4())
                    pendingUploads.append(uploadWarp(warp.body, sampleId, raiseError=True, headers=warp.meta))

                # NOTE(review): assumed to run for reused and for freshly generated sample ids alike - confirm
                attributes = event.raw["face_attributes"]
                if attributes:
                    attributes["samples"].append(sampleId)

                faceSample["sample_id"] = sampleId
                faceSample["url"] = f"{imagesUrl}/{sampleId}"

        if pendingUploads:
            await asyncio.gather(*pendingUploads)
class BodySamplePolicy(_BaseStoragePolicy):
    """Body sample policy"""

    # whether to store sample
    storeSample: types.Int01 = 1

    async def execute(self, events: list[Event], accountId: str, bucket: str, luna3Client: Client):
        """
        Save body samples and write the sample references into the event detections.

        A detection is skipped when neither a warp nor an incoming sample id is available, or when it has
        no body sample entry. An incoming sample id is reused as is; otherwise a new id is generated and
        the warp is queued for upload.

        Args:
            events: events
            accountId: account id
            bucket: bucket name
            luna3Client: client
        """
        if not self.storeSample:
            return

        uploadWarp = partial(luna3Client.lunaBodySamplesStore.putImage, accountId=accountId, bucketName=bucket)
        imagesUrl = f"{luna3Client.lunaBodySamplesStore.baseUri}/buckets/{bucket}/images"

        pendingUploads = []
        for event in self.filterEventsByFilters(events):
            for detection, source, warp in zip(event.raw["detections"], event.sources, event.bodyWarps):
                if not warp and not source.sample:
                    continue
                bodySample = detection["samples"]["body"]
                if bodySample is None:
                    continue

                sampleId = str(source.sample) if source.sample else None
                if not sampleId:
                    # no incoming sample: `warp` is guaranteed to be truthy here by the guard above
                    sampleId = str(uuid4())
                    pendingUploads.append(uploadWarp(warp.body, sampleId, raiseError=True, headers=warp.meta))

                # NOTE(review): assumed to run for reused and for freshly generated sample ids alike - confirm
                attributes = event.raw["body_attributes"]
                if attributes:
                    attributes["samples"].append(sampleId)

                bodySample["sample_id"] = sampleId
                bodySample["url"] = f"{imagesUrl}/{sampleId}"

        if pendingUploads:
            await asyncio.gather(*pendingUploads)
class ImageOriginPolicy(_BaseStoragePolicy):
    """Image origin policy"""

    # whether to store origin image
    storeImage: types.Int01 = 0
    # use external reference as image origin
    useExternalReferences: types.Int01 = 1

    async def execute(self, events: list[Event], accountId: str, bucket: str, luna3Client: Client):
        """
        Save origin images and set `image_origin` of the event detections.

        Args:
            events: events
            accountId: account id
            bucket: bucket name
            luna3Client: client
        Raises:
            RuntimeError: if the source image type is not one of 0, 1, 2 and no explicit image origin is given
        Notes:
            we need to loop through images if image_origin is image because we have to upload them and save as url
        """
        if not events:
            return

        pendingUploads = []
        # image type -> (key inside estimation["samples"], path segment of the sample reference)
        sampleKinds = {1: ("face", "faces"), 2: ("body", "bodies")}

        def queueUpload(imageId, body, headers: dict = None) -> str:
            """Queue the image upload and return the relative path the image will be available at."""
            pendingUploads.append(
                luna3Client.lunaImageOriginStore.putImage(
                    imageInBytes=body,
                    imageId=imageId,
                    accountId=accountId,
                    bucketName=bucket,
                    headers=headers,
                    raiseError=True,
                )
            )
            return f"/{CURRENT_LUNA_API_VERSION}/images/{imageId}"

        def isValidExternalUrl(source):
            """Truthy when external references are enabled and the source url is set and short enough."""
            return self.useExternalReferences and source.url and len(source.url) <= MAX_IMAGE_ORIGIN_EXTERNAL_URL_LENGTH

        for event in self.filterEventsByFilters(events=events):
            for estimation, source in zip(event.raw["detections"], event.sources):
                imageId = str(uuid4())
                explicitOrigin = source.imageOrigin

                # image origin given as a string: nothing to upload
                if explicitOrigin and type(explicitOrigin.body) is str:
                    continue
                # neither the image nor the explicit origin carries a body
                if source.image.body is None and (explicitOrigin is None or explicitOrigin.body is None):
                    continue

                if not self.storeImage:
                    # storing is off: the only thing left is to reference a raw image by its external url
                    if source.image.imageType == 0 and isValidExternalUrl(source):
                        estimation["image_origin"] = source.url
                    continue

                imageType = source.image.imageType
                if explicitOrigin:
                    imageOrigin = queueUpload(imageId, explicitOrigin.body, explicitOrigin.meta)
                elif imageType == 0:
                    if isValidExternalUrl(source):
                        imageOrigin = source.url
                    else:
                        imageOrigin = queueUpload(imageId, source.image.body, source.imageMeta)
                elif imageType in sampleKinds:
                    sampleKey, pathSegment = sampleKinds[imageType]
                    sampleId = None
                    if source.sample:
                        sampleId = str(source.sample)
                    elif sampleData := estimation["samples"][sampleKey]:
                        sampleId = sampleData["sample_id"]

                    if self.useExternalReferences and sampleId:
                        # reference the sample instead of uploading the image once more
                        imageOrigin = f"/{CURRENT_LUNA_API_VERSION}/samples/{pathSegment}/{sampleId}"
                    else:
                        imageOrigin = queueUpload(imageId, source.image.body, source.imageMeta)
                else:
                    raise RuntimeError("Unsupported image type")

                estimation["image_origin"] = imageOrigin

        await asyncio.gather(*pendingUploads)
class AttributeStorePolicy(_BaseStoragePolicy):
    """Attribute store policy"""

    # whether to store attribute
    storeAttribute: types.Int01 = 0
    # attribute storage ttl
    ttl: types.IntAttributeTTL = Field(default_factory=lambda: HandlerSettings.defaultAttributeTTL)

    @staticmethod
    def generateFaceAttributesKwargs(event):
        """
        Build keyword arguments for the attribute saving request from the event face attributes.

        Args:
            event: event whose `raw["face_attributes"]` (and face descriptor) are read
        Returns:
            dict with `basicAttributesSamples`/`basicAttributes` when basic attributes are present and
            `descriptorSamples`/`descriptors` when the attribute has a truthy `score`; empty dict otherwise
        """
        attributesKwargs = {}
        attribute = event.raw["face_attributes"]
        if attribute and (basicAttributes := attribute.get("basic_attributes")):
            faceBasicAttributes = {
                "age": basicAttributes["age"],
                "gender": basicAttributes["gender"],
                "ethnicity": ETHNIC_MAP[basicAttributes["ethnicities"]["predominant_ethnicity"]],
            }
            attributesKwargs.update(
                basicAttributesSamples=attribute["samples"],
                basicAttributes=faceBasicAttributes,
            )
        if attribute and attribute.get("score"):
            attributesKwargs.update(
                descriptorSamples=attribute["samples"],
                descriptors=[event.faceDescriptor],
            )
        return attributesKwargs

    async def execute(self, events: list[Event], accountId: str, luna3Client: Client) -> None:
        """
        Save attributes.

        For every filtered event with face attributes a new attribute id is generated, the attribute is
        queued for saving and `attribute_id`/`url` are written into the event face attributes.

        Args:
            events: events
            accountId: account id
            luna3Client: client
        """
        if not self.storeAttribute:
            return

        putAttribute = partial(luna3Client.lunaFaces.putAttribute, accountId=accountId, ttl=self.ttl)
        # no trailing slash here: the attribute id is appended with its own "/" separator below
        storeUrl = f"{luna3Client.lunaFaces.baseUri}/attributes"
        futures = []
        for event in self.filterEventsByFilters(events):
            attribute = event.raw["face_attributes"]
            if attribute is None:
                continue
            attributeKwargs = self.generateFaceAttributesKwargs(event)
            attributeId = str(uuid4())
            # in-place update on purpose: the event raw data must carry the attribute reference
            attribute |= {"attribute_id": attributeId, "url": f"{storeUrl}/{attributeId}"}
            futures.append(putAttribute(attributeId=attributeId, **attributeKwargs, raiseError=True))
        if futures:
            await asyncio.gather(*futures)
class LinkToListsPolicy(_BaseStoragePolicy):
    """Link to lists policy schema: a target list plus the filters inherited from the base policy."""

    # list id to link faces to
    listId: UUID
class FaceStoragePolicy(_BaseStoragePolicy):
    """Face store policy"""

    # whether to store face
    storeFace: types.Int01 = 0
    # whether to set face sample as avatar
    setSampleAsAvatar: types.Int01 = 1
    # face link to lists policy list
    linkToListsPolicy: list[LinkToListsPolicy] = Field([], max_items=types.MAX_POLICY_LIST_LENGTH)

    @staticmethod
    def generateFaceAttributesKwargs(event):
        """
        Build attribute keyword arguments for the face saving request from the event face attributes.

        Args:
            event: event whose `raw["face_attributes"]` (and face descriptor) are read
        Returns:
            dict with `basicAttributesSamples`/`basicAttributes` when basic attributes are present and
            `descriptorSamples`/`descriptors` when the attribute has a truthy `score`; empty dict otherwise
        """
        attributesKwargs = {}
        attribute = event.raw["face_attributes"]
        if attribute and (basicAttributes := attribute.get("basic_attributes")):
            faceBasicAttributes = {
                "age": basicAttributes["age"],
                "gender": basicAttributes["gender"],
                "ethnicity": ETHNIC_MAP[basicAttributes["ethnicities"]["predominant_ethnicity"]],
            }
            attributesKwargs.update(
                basicAttributesSamples=attribute["samples"],
                basicAttributes=faceBasicAttributes,
            )
        if attribute and attribute.get("score"):
            attributesKwargs.update(
                descriptorSamples=attribute["samples"],
                descriptors=[event.faceDescriptor],
            )
        return attributesKwargs

    async def execute(self, events: list[Event], accountId: str, luna3Client: Client) -> None:
        """
        Execute face policy (with link to list policy).

        For every filtered event a face description is stored in `event.raw["face"]` and a face saving
        request is queued; all requests are awaited together at the end.

        Args:
            events: processing events
            accountId: account id
            luna3Client: luna3 client
        """
        if not self.storeFace:
            return

        futures = []
        for event in self.filterEventsByFilters(events=events):
            faceRes = {
                "face_id": (faceId := str(uuid4())),
                "url": f"{luna3Client.lunaFaces.origin}/faces/{faceId}",
                "event_id": event.raw["event_id"],
                "user_data": event.raw.get("user_data", ""),
                "external_id": event.raw.get("external_id", ""),
                "avatar": "",
            }
            # set comprehension: the same list may be referenced by several link policies
            lists = list(
                {
                    str(linkListPolicy.listId)
                    for linkListPolicy in self.linkToListsPolicy
                    if linkListPolicy.filters is None or linkListPolicy.filters.isEventSatisfies(event)
                }
            )
            faceRes["lists"] = lists

            if self.setSampleAsAvatar:
                # the avatar is taken from the face sample of the first detection only
                if event.raw["detections"] and (face := event.raw["detections"][0]["samples"]["face"]):
                    if face["sample_id"] is not None:
                        faceRes["avatar"] = f"/{CURRENT_LUNA_API_VERSION}/samples/faces/{face['sample_id']}"
            event.raw["face"] = faceRes

            attributeKwargs = self.generateFaceAttributesKwargs(event)
            futures.append(
                luna3Client.lunaFaces.putFace(
                    faceId=faceId,
                    accountId=accountId,
                    eventId=event.raw["event_id"],
                    userData=event.raw.get("user_data"),
                    externalId=event.raw.get("external_id"),
                    listIds=lists or None,
                    avatar=faceRes["avatar"],
                    **attributeKwargs,
                    raiseError=True,
                )
            )
        if futures:
            await asyncio.gather(*futures)
class BasicAuthorization(BaseSchema):
    """Callback basic authorization"""

    # authorization type
    type: Literal["basic"]
    # authorization login
    login: Str(maxLength=128)
    # authorization password
    password: Str(maxLength=128)

    @property
    def headers(self):
        """Build the `Authorization` header dict from the login/password pair."""
        credentials = BasicAuth(login=self.login, password=self.password)
        return {"Authorization": credentials.encode()}
class Params(BaseSchema):
    """Callback request parameters (every field is optional)"""

    # callback request timeout
    timeout: int = OptionalNotNullable()
    # callback request content type
    contentType: Literal["application/json", "application/msgpack"] = OptionalNotNullable()
    # callback request headers
    headers: Dict[str, str] = OptionalNotNullable()
class HttpCallback(_BaseStoragePolicy):
    """Notification policy callbacks"""

    # callback type
    type: Literal["http"]
    # callback url
    url: str
    # callback authorization parameters
    authorization: BasicAuthorization = OptionalNotNullable()
    # callback request parameters
    params: Params = Params()
    # whether callback enabled or not
    enable: types.Int01 = 1
    # callback final headers
    _headers: Dict[str, str] = PrivateAttr()

    def model_post_init(self, __context):
        """Compose the final request headers from the user headers and the authorization headers."""
        # copy the user headers: merging the authorization into `params.headers` itself would leak the
        # credentials into the public `params` field
        self._headers = dict(self.params.headers or {})
        if self.authorization is not None:
            self._headers |= self.authorization.headers

    @classmethod
    async def onStartup(cls):
        """Init Policies: create the http session shared by all callbacks"""
        cls.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60))

    @classmethod
    async def onShutdown(cls):
        """Stop Policies: close the shared http session"""
        await cls.session.close()

    def buildCallback(self, events: list[Event]) -> Coroutine | None:
        """
        Build callback coroutine

        Args:
            events: events to send
        Returns:
            callback coroutine or None if callback disabled or filtered
        """
        if not self.enable:
            return None
        filteredEvents = self.filterEventsByFilters(events)
        if not filteredEvents:
            return None

        payload = {"events": [event.raw for event in filteredEvents]}
        if self.params.contentType == "application/msgpack":
            body = RequestPayload.buildMsgpack(payload)
        else:
            body = RequestPayload.buildJson(payload)

        async def sendNotification():
            """Send the prepared body; an unsuccessful reply is only logged."""
            reply = await makeRequest(
                url=self.url,
                method="POST",
                headers=self._headers,
                totalTimeout=self.params.timeout,
                body=body,
                session=self.session,
                asyncRequest=True,
            )
            if not reply.success:
                logger.warning(f"Notification to {self.url} failed. {reply.text}")

        return sendNotification()
class NotificationStoragePolicy(_BaseStoragePolicy):
    """Notification store policy"""

    # whether to send notification
    sendNotification: types.Int01 = 1

    async def execute(self, events: list[Event], accountId: str, redisContext: RedisContext) -> None:
        """
        Publish the events that satisfy the policy filters through the redis context.

        Nothing is published when the policy is disabled or no event passes the filters.

        Args:
            events: events
            accountId: account id
            redisContext: redis context
        """
        if self.sendNotification:
            matchedEvents = list(filter(self.isEventSatisfyFilters, events))
            if matchedEvents:
                await redisContext.publish(matchedEvents, accountId, requestIdCtx.get())
class EventStoragePolicy(_BaseStoragePolicy):
    """Event store policy"""

    # whether to store event
    storeEvent: types.Int01 = 1
    # whether to wait events saving (response will be received only after events will be saved)
    waitSaving: types.Int01 = 1

    @classmethod
    async def onStartup(cls):
        """Init Policies: create the runner used for the background (no-wait) event saving"""
        cls.saveEventsAsyncRunner = AsyncRunner(100, closeTimeout=1)

    @classmethod
    async def onShutdown(cls):
        """Stop Policies: close the background saving runner"""
        await cls.saveEventsAsyncRunner.close()

    @staticmethod
    def getEventFaceAttributes(event):
        """
        Get the face attributes of an event

        :param event: Base event.
        :return: dict with `basic_attributes` (None if absent) and `descriptor_data` built from the event
            face descriptor (None if the event has no face descriptor).
        """
        return {
            "basic_attributes": (event.raw["face_attributes"] or {}).get("basic_attributes"),
            "descriptor_data": {
                "descriptor": event.faceDescriptor.descriptor,
                "descriptor_version": event.faceDescriptor.version,
            }
            if event.faceDescriptor
            else None,
        }

    @staticmethod
    def getEventBasicAttributes(event):
        """
        Get the body attributes of an event (despite the method name, only the body descriptor is read).

        :param event: Base event.
        :return: dict with `descriptor_data` built from the event body descriptor (None if the event has
            no body descriptor).
        """
        return {
            "descriptor_data": {
                "descriptor": event.bodyDescriptor.descriptor,
                "descriptor_version": event.bodyDescriptor.version,
            }
            if event.bodyDescriptor
            else None
        }

    @staticmethod
    def processDetectionSamplesData(event):
        """
        Reduce every detection sample to its sample id and detection rect (in place).

        Falsy or missing samples are replaced with None.

        :param event: Event to update
        """
        for detection in event["detections"]:
            samples = detection["samples"]
            for descriptorType in ("face", "body"):
                # the sample is only read and then replaced as a whole, so no copy is required
                if descriptorData := samples.get(descriptorType):
                    rect = descriptorData["detection"].get("rect")
                    samples[descriptorType] = {
                        "sample_id": descriptorData["sample_id"],
                        "detection": {"rect": rect} if rect else {},
                    }
                else:
                    samples[descriptorType] = None

    @staticmethod
    def processEventMatchCandidatesData(event):
        """
        Exclude non supported properties from faces' and events' matching candidates (in place)

        :param event: Event to update
        """
        expectedFields = {
            "event": (
                "event_id",
                "handler_id",
                "source",
                "stream_id",
                "external_id",
                "user_data",
                "create_time",
            ),
            "face": ("face_id", "external_id", "user_data", "create_time"),
        }
        for match in event["matches"]:
            for candidate in match["candidates"]:
                # a candidate carries either a "face" or an "event" entry
                candidateType = "face" if "face" in candidate else "event"
                candidateItems = candidate[candidateType].items()
                candidate[candidateType] = {
                    key: value for key, value in candidateItems if key in expectedFields[candidateType]
                }

    def generateEventToSend(self, event):
        """
        Prepare the event data for saving.

        Works on a deep copy of `event.raw`, so the event itself is left untouched.

        :param event: Base event.
        :return: Constructed object based on event's parameters.
        """
        faceAttributes = self.getEventFaceAttributes(event)
        bodyAttributes = self.getEventBasicAttributes(event)

        eventToSend = deepcopy(event.raw)
        eventToSend["handler_id"] = event.handlerId
        eventToSend["create_time"] = event.createTime
        eventToSend["end_time"] = event.endTime
        eventToSend["face"] = (
            {"face_id": faceData["face_id"], "lists": faceData["lists"]}
            if (faceData := event.raw.get("face"))
            else None
        )
        eventToSend["face_attributes"] = {key: value for key, value in faceAttributes.items() if value is not None}
        eventToSend["body_attributes"] = {key: value for key, value in bodyAttributes.items() if value is not None}

        if aggregatedAttrs := eventToSend["aggregate_estimations"]["face"]["attributes"]:
            # tolerate an absent/empty mask and a mask without face occlusion instead of failing the saving
            if mask := aggregatedAttrs.get("mask"):
                mask.pop("face_occlusion", None)

        self.processDetectionSamplesData(eventToSend)
        self.processEventMatchCandidatesData(eventToSend)
        # the url is not part of the saved event; it may be absent if the caller has not set it
        eventToSend.pop("url", None)
        return eventToSend

    async def execute(self, events: list[Event], accountId: str, luna3Client: Client) -> None:
        """
        Save events.

        Every event that satisfies the filters gets its `url` set in `event.raw` and is converted for
        saving. Depending on `waitSaving` the saving is awaited or handed over to the background runner.

        Args:
            events: events
            accountId: account id
            luna3Client: client
        """
        if not self.storeEvent:
            return

        eventsToSend = []
        for event in events:
            if not self.isEventSatisfyFilters(event):
                continue
            event.raw["url"] = f"{luna3Client.lunaEvents.baseUri}/events/{event.raw['event_id']}"
            eventToSend = self.generateEventToSend(event)
            eventsToSend.append(eventToSend)
        if not eventsToSend:
            return

        shouldWait = bool(self.waitSaving)

        async def saveEvents():
            """Send the events; an unsuccessful reply is logged (errors are raised only when waiting)."""
            reply = await luna3Client.lunaEvents.saveEvents(
                accountId=accountId,
                events=eventsToSend,
                waitEventsSaving=shouldWait,
                raiseError=shouldWait,
            )
            if not reply.success:
                logger.warning(
                    f"Failed save events to luna-event, receive response "
                    f"with status code {reply.statusCode}, body {reply.text}"
                )

        if shouldWait:
            await saveEvents()
        else:
            self.saveEventsAsyncRunner.runNoWait((saveEvents(),))
# Strong references to the in-flight callback tasks. The event loop keeps only weak references to tasks,
# so a fire-and-forget task without another reference may be garbage collected before it finishes.
_CALLBACK_TASKS: set[asyncio.Task] = set()


class StoragePolicy(BaseSchema):
    """Storage policy schema"""

    # face sample storage policy
    faceSamplePolicy: FaceSamplePolicy = FaceSamplePolicy()
    # body sample storage policy
    bodySamplePolicy: BodySamplePolicy = BodySamplePolicy()
    # image origin storage policy
    imageOriginPolicy: ImageOriginPolicy = ImageOriginPolicy()
    # attribute storage policy
    attributePolicy: AttributeStorePolicy = Field(default_factory=lambda: AttributeStorePolicy())
    # face storage policy
    facePolicy: FaceStoragePolicy = FaceStoragePolicy()
    # event storage policy
    eventPolicy: EventStoragePolicy = EventStoragePolicy()
    # notification storage policy
    notificationPolicy: NotificationStoragePolicy = NotificationStoragePolicy()
    # callbacks
    callbacks: List[HttpCallback] = []

    async def execute(
        self,
        events: list[Event],
        accountId: str,
        monitoring: DataForMonitoring,
        config: StorePolicyConfig,
        luna3Client: Client,
        redisContext: RedisContext,
    ):
        """
        Execute storage policy - save objects.

        The policies run in three consecutive stages (samples; image origins, attributes and faces;
        events and notifications), the policies of one stage run concurrently. Callbacks are started
        last as background tasks and are not awaited.

        Args:
            events: events to process
            accountId: account id
            monitoring: data for monitoring
            config: app config
            luna3Client: client
            redisContext: redis context
        """

        async def _faceSample() -> None:
            with monitorTime(monitoring, "face_sample_storage_policy_time"):
                await self.faceSamplePolicy.execute(events, accountId, config.facesBucket, luna3Client)

        async def _bodySample() -> None:
            with monitorTime(monitoring, "body_sample_storage_policy_time"):
                await self.bodySamplePolicy.execute(events, accountId, config.bodiesBucket, luna3Client)

        async def _originImage() -> None:
            with monitorTime(monitoring, "image_origin_storage_policy_time"):
                await self.imageOriginPolicy.execute(events, accountId, config.originBucket, luna3Client)

        async def _attribute() -> None:
            with monitorTime(monitoring, "face_attribute_storage_policy_time"):
                await self.attributePolicy.execute(events, accountId, luna3Client)

        async def _face() -> None:
            with monitorTime(monitoring, "face_storage_policy_time"):
                await self.facePolicy.execute(events, accountId, luna3Client)

        async def _event() -> None:
            with monitorTime(monitoring, "event_storage_policy_time"):
                await self.eventPolicy.execute(events, accountId, luna3Client)

        async def _notification() -> None:
            with monitorTime(monitoring, "notification_storage_policy_time"):
                await self.notificationPolicy.execute(events, accountId, redisContext)

        def _callbacks():
            """Build and execute callbacks"""
            for callback in self.callbacks:
                if call := callback.buildCallback(events):
                    task = asyncio.create_task(call)
                    # keep the task alive until it is done, then drop the reference
                    _CALLBACK_TASKS.add(task)
                    task.add_done_callback(_CALLBACK_TASKS.discard)

        await asyncio.gather(_faceSample(), _bodySample())
        # save attribute and face only after executing previous policies (^^^ samples and images are updated here ^^^)
        await asyncio.gather(_originImage(), _attribute(), _face())
        # save events only after executing previous policies (^^^ events are updated here ^^^)
        await asyncio.gather(_event(), _notification())
        _callbacks()