Source code for luna_handlers.classes.schemas.storage_policy

"""
Module contains schemas for storage policy
"""

import asyncio
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from typing import Any, List, Literal
from uuid import UUID, uuid4

from luna3.client import Client
from pydantic import Field
from vlutils.jobs.async_runner import AsyncRunner

from app.global_vars.context_vars import requestIdCtx
from classes.event import HandlerEvent as Event, Image, StreamEventDetection
from classes.schemas import types
from classes.schemas.base_schema import BaseSchema, HandlerSettings
from classes.schemas.filters import ComplexFilter
from classes.schemas.types import IntTTL
from configs.config import MAX_IMAGE_ORIGIN_EXTERNAL_URL_LENGTH
from crutches_on_wheels.cow.maps.vl_maps import ETHNIC_MAP
from crutches_on_wheels.cow.monitoring.points import DataForMonitoring, monitorTime
from crutches_on_wheels.cow.pydantic.callback_models import HttpCallback as _HttpCallback, Params as _Params
from crutches_on_wheels.cow.utils.log import logger
from redis_db.redis_context import RedisContext

# Version number used as the first path segment of the relative image/sample urls
# built in this module (e.g. "/6/images/<image id>").
CURRENT_LUNA_API_VERSION = 6


[docs] @dataclass(slots=True) class StorePolicyConfig: """Handler config that policies should apply.""" facesBucket: str bodiesBucket: str originBucket: str
class _BaseStoragePolicy(BaseSchema):
    """Base storage policy"""

    # storage policy complex filters (matching+attributes)
    filters: ComplexFilter = ComplexFilter()

    def filterEventsByFilters(self, events: list[Event]) -> list[Event]:
        """
        Select the events this policy has to process.

        Args:
            events: all events
        Returns:
            the input list itself when no filters are configured, otherwise a new list
            with only the events accepted by the filters
        """
        policyFilters = self.filters
        if policyFilters.isEmpty:
            return events
        return list(filter(policyFilters.isEventSatisfies, events))

    def isEventSatisfyFilters(self, event: Event) -> bool:
        """Check a single event against the filters; an empty filter set accepts every event"""
        if self.filters.isEmpty:
            return True
        return self.filters.isEventSatisfies(event)
class FaceSamplePolicy(_BaseStoragePolicy):
    """Face sample policy"""

    # whether to store sample
    storeSample: types.Int01 = 1
    # Objects ttl measured in days. Bucket TTL used if not set
    ttl: IntTTL | None = None

    def asDict(self) -> dict:
        """Get FaceSamplePolicy as dict with ttl"""
        payload = super().asDict()
        payload.update(ttl=self.ttl)
        return payload

    async def execute(self, events: list[Event], accountId: str, bucket: str, luna3Client: Client):
        """
        Save face samples and write the sample references into the events.

        For every detection that carries a face sample entry, a sample id is either taken from
        the source or generated anew (in which case the face warp is scheduled for upload).
        The id is appended to the event face attribute samples (when face attributes exist) and
        stored together with its url in the detection face sample. All scheduled uploads are
        awaited together at the end.

        Args:
            events: events
            accountId: account id
            bucket: bucket name
            luna3Client: client
        """
        if not self.storeSample:
            return

        sampleStore = luna3Client.lunaFaceSamplesStore
        imagesUrl = f"{sampleStore.baseUri}/buckets/{bucket}/images"
        uploads = []

        for event in self.filterEventsByFilters(events):
            for estimation, source, warp in zip(event.raw["detections"], event.sources, event.faceWarps):
                faceSample = estimation["samples"]["face"]
                if faceSample is None:
                    continue
                # nothing to reference: neither a warp to upload nor an existing sample
                if not warp and not source.sample:
                    continue

                if not isinstance(source, StreamEventDetection) and source.sample:
                    # the source already points to a stored sample
                    sampleId = str(source.sample)
                else:
                    sampleId = str(uuid4())
                    uploads.append(
                        sampleStore.putImage(
                            warp.body,
                            sampleId,
                            accountId=accountId,
                            bucketName=bucket,
                            ttl=self.ttl,
                            raiseError=True,
                            headers=warp.meta,
                        )
                    )

                faceAttributes = event.raw["face_attributes"]
                if faceAttributes:
                    faceAttributes["samples"].append(sampleId)
                faceSample["sample_id"] = sampleId
                faceSample["url"] = f"{imagesUrl}/{sampleId}"

        if uploads:
            await asyncio.gather(*uploads)
class BodySamplePolicy(_BaseStoragePolicy):
    """Body sample policy"""

    # whether to store sample
    storeSample: types.Int01 = 1
    # Objects ttl measured in days. Bucket TTL used if not set
    ttl: IntTTL | None = None

    def asDict(self) -> dict:
        """Get BodySamplePolicy as dict with ttl"""
        payload = super().asDict()
        payload.update(ttl=self.ttl)
        return payload

    async def execute(self, events: list[Event], accountId: str, bucket: str, luna3Client: Client):
        """
        Save body samples and write the sample references into the events.

        For every detection that has a warp or a source sample and carries a body sample entry,
        a sample id is either taken from the source or generated anew (in which case the body
        warp is scheduled for upload). The id is appended to the event body attribute samples
        (when body attributes exist) and stored together with its url in the detection body
        sample. All scheduled uploads are awaited together at the end.

        Args:
            events: events
            accountId: account id
            bucket: bucket name
            luna3Client: client
        """
        if not self.storeSample:
            return

        sampleStore = luna3Client.lunaBodySamplesStore
        imagesUrl = f"{sampleStore.baseUri}/buckets/{bucket}/images"
        uploads = []

        for event in self.filterEventsByFilters(events):
            for estimation, source, warp in zip(event.raw["detections"], event.sources, event.bodyWarps):
                # nothing to reference: neither a warp to upload nor an existing sample
                if not warp and not source.sample:
                    continue
                bodySample = estimation["samples"]["body"]
                if bodySample is None:
                    continue

                if not isinstance(source, StreamEventDetection) and source.sample:
                    # the source already points to a stored sample
                    sampleId = str(source.sample)
                else:
                    sampleId = str(uuid4())
                    uploads.append(
                        sampleStore.putImage(
                            warp.body,
                            sampleId,
                            accountId=accountId,
                            bucketName=bucket,
                            ttl=self.ttl,
                            raiseError=True,
                            headers=warp.meta,
                        )
                    )

                bodyAttributes = event.raw["body_attributes"]
                if bodyAttributes:
                    bodyAttributes["samples"].append(sampleId)
                bodySample["sample_id"] = sampleId
                bodySample["url"] = f"{imagesUrl}/{sampleId}"

        if uploads:
            await asyncio.gather(*uploads)
class ImageOriginPolicy(_BaseStoragePolicy):
    """Image origin policy"""

    # whether to store origin image
    storeImage: types.Int01 = 0
    # use external reference as image origin
    useExternalReferences: types.Int01 = 1
    # Objects ttl measured in days. Bucket TTL used if not set
    ttl: IntTTL | None = None

    def asDict(self) -> dict:
        """Get ImageOriginPolicy as dict with ttl"""
        result = super().asDict()
        result["ttl"] = self.ttl
        return result

    async def execute(self, events: list[Event], accountId: str, bucket: str, luna3Client: Client):
        """
        Save origin images and write the resulting `image_origin` into the event detections.

        For each detection of each event accepted by the policy filters an image origin is
        calculated. It is either an external url, a relative url of an already stored sample,
        or a relative url of an image scheduled for upload to the image-origin store.
        Detections whose image origin body is already a string are left untouched.
        All scheduled uploads are awaited together at the end.

        Args:
            events: events
            accountId: account id
            bucket: bucket name
            luna3Client: client
        Raises:
            RuntimeError: if a source or an image has an unsupported type
        Notes:
            we need to loop through images if image_origin is image because we have to upload them and save as url
        """
        if not events:
            return

        futures = []

        def uploadImage(body, headers: dict | None = None) -> str:
            """Schedule an image upload and return the relative url of the image being stored"""
            imageId = str(uuid4())
            futures.append(
                luna3Client.lunaImageOriginStore.putImage(
                    imageInBytes=body,
                    imageId=imageId,
                    accountId=accountId,
                    bucketName=bucket,
                    ttl=self.ttl,
                    headers=headers,
                    raiseError=True,
                )
            )
            return f"/{CURRENT_LUNA_API_VERSION}/images/{imageId}"

        def isValidExternalUrl(sourceUrl) -> bool:
            """Whether external references are enabled and the url is non-empty and short enough"""
            return bool(
                self.useExternalReferences and sourceUrl and len(sourceUrl) <= MAX_IMAGE_ORIGIN_EXTERNAL_URL_LENGTH
            )

        def sampleUrlOrUpload(sampleId, samplesPath: str, body, headers) -> str:
            """Reference an existing sample when allowed and known, otherwise upload the image itself"""
            if self.useExternalReferences and sampleId:
                return f"/{CURRENT_LUNA_API_VERSION}/samples/{samplesPath}/{sampleId}"
            return uploadImage(body, headers)

        def getImageOrigin4Image(imageSource: Image, estimation: dict) -> str | None:
            """Get image origin from `Image` or None if image origin existence is not supposed"""
            if imageSource.image.body is None and (
                imageSource.imageOrigin is None or imageSource.imageOrigin.body is None
            ):
                return None

            if not self.storeImage:
                # without storing, only a raw image (type 0) may still be referenced by its external url
                if imageSource.image.imageType == 0 and isValidExternalUrl(imageSource.url):
                    return imageSource.url
                return None

            if imageSource.imageOrigin:
                return uploadImage(imageSource.imageOrigin.body, imageSource.imageOrigin.meta)

            if imageSource.image.imageType == 0:
                if isValidExternalUrl(imageSource.url):
                    return imageSource.url
                return uploadImage(imageSource.image.body, imageSource.imageMeta)

            if imageSource.image.imageType == 1:
                sampleKey, samplesPath = "face", "faces"
            elif imageSource.image.imageType == 2:
                sampleKey, samplesPath = "body", "bodies"
            else:
                raise RuntimeError("Unsupported image type")

            # the image is a warp: prefer a reference to the corresponding stored sample
            sampleId = None
            if imageSource.sample:
                sampleId = str(imageSource.sample)
            elif sample := estimation["samples"][sampleKey]:
                sampleId = sample["sample_id"]
            return sampleUrlOrUpload(sampleId, samplesPath, imageSource.image.body, imageSource.imageMeta)

        def getImageOrigin4Detection(detectionSource: StreamEventDetection, estimation: dict) -> str | None:
            """Get image origin from `StreamEventDetection` or None if image origin existence is not supposed"""
            faceWarp = detectionSource.faceWarp.warp if detectionSource.faceWarp is not None else None
            bodyWarp = detectionSource.bodyWarp.warp if detectionSource.bodyWarp is not None else None
            noWarps = faceWarp is None and bodyWarp is None
            if noWarps and (detectionSource.imageOrigin is None or detectionSource.imageOrigin.body is None):
                return None

            if not self.storeImage:
                if isValidExternalUrl(detectionSource.url):
                    return detectionSource.url
                return None

            if detectionSource.imageOrigin:
                return uploadImage(detectionSource.imageOrigin.body, detectionSource.imageOrigin.meta)
            if isValidExternalUrl(detectionSource.url):
                return detectionSource.url

            if faceWarp:
                face = estimation["samples"]["face"]
                sampleId = face["sample_id"] if face else None
                return sampleUrlOrUpload(sampleId, "faces", faceWarp, detectionSource.faceWarp.meta)
            if bodyWarp:
                body = estimation["samples"]["body"]
                sampleId = body["sample_id"] if body else None
                return sampleUrlOrUpload(sampleId, "bodies", bodyWarp, detectionSource.bodyWarp.meta)
            return None

        for event in self.filterEventsByFilters(events=events):
            for estimation, source in zip(event.raw["detections"], event.sources):
                # image origin body is already a string: nothing to calculate or upload
                if source.imageOrigin and isinstance(source.imageOrigin.body, str):
                    continue
                if isinstance(source, Image):
                    calculatedImageOrigin = getImageOrigin4Image(source, estimation)
                elif isinstance(source, StreamEventDetection):
                    calculatedImageOrigin = getImageOrigin4Detection(source, estimation)
                else:
                    raise RuntimeError(f"Unsupported source type: {type(source)}")
                if calculatedImageOrigin is not None:
                    estimation["image_origin"] = calculatedImageOrigin

        if futures:
            await asyncio.gather(*futures)
class AttributeStorePolicy(_BaseStoragePolicy):
    """Attribute store policy"""

    # whether to store attribute
    storeAttribute: types.Int01 = 0
    # attribute storage ttl
    ttl: types.IntAttributeTTL = Field(default_factory=lambda: HandlerSettings.defaultAttributeTTL)

    @staticmethod
    def generateFaceAttributesKwargs(event):
        """
        Build keyword arguments describing the event face attributes.

        Basic attributes (with their samples) are included when the event has them,
        the face descriptor (with its samples) is included when the attribute has a truthy score.

        Args:
            event: event to take face attributes from
        Returns:
            dict with keyword arguments, empty if the event has no face attributes
        """
        attribute = event.raw["face_attributes"]
        attributesKwargs = {}
        if not attribute:
            return attributesKwargs

        basicAttributes = attribute.get("basic_attributes")
        if basicAttributes:
            predominantEthnicity = basicAttributes["ethnicities"]["predominant_ethnicity"]
            faceBasicAttributes = {
                "age": basicAttributes["age"],
                "gender": basicAttributes["gender"],
                "ethnicity": ETHNIC_MAP[predominantEthnicity],
            }
            attributesKwargs["basicAttributesSamples"] = attribute["samples"]
            attributesKwargs["basicAttributes"] = faceBasicAttributes

        if attribute.get("score"):
            attributesKwargs["descriptorSamples"] = attribute["samples"]
            attributesKwargs["descriptors"] = [event.faceDescriptor]
        return attributesKwargs

    async def execute(self, events: list[Event], accountId: str, luna3Client: Client) -> None:
        """
        Save attributes.

        Every filtered event that has face attributes gets a new attribute id; the id and
        its url are written into the event face attributes and a save request is scheduled.
        All scheduled requests are awaited together at the end.

        Args:
            events: events
            accountId: account id
            luna3Client: client
        """
        if not self.storeAttribute:
            return

        facesApi = luna3Client.lunaFaces
        attributesUrl = f"{facesApi.baseUri}/attributes"
        requests = []
        for event in self.filterEventsByFilters(events):
            attribute = event.raw["face_attributes"]
            if attribute is None:
                continue
            attributeKwargs = self.generateFaceAttributesKwargs(event)
            attributeId = str(uuid4())
            attribute.update(attribute_id=attributeId, url=f"{attributesUrl}/{attributeId}")
            requests.append(
                facesApi.putAttribute(
                    attributeId=attributeId,
                    accountId=accountId,
                    ttl=self.ttl,
                    raiseError=True,
                    **attributeKwargs,
                )
            )

        if requests:
            await asyncio.gather(*requests)
class LinkToListsPolicy(_BaseStoragePolicy):
    """Link to lists policy schema"""

    # id of the list that faces are linked to
    listId: UUID
class FaceStoragePolicy(_BaseStoragePolicy):
    """Face store policy"""

    # whether to store face
    storeFace: types.Int01 = 0
    # whether to set face sample as avatar
    setSampleAsAvatar: types.Int01 = 1
    # face link to lists policy list
    linkToListsPolicy: list[LinkToListsPolicy] = Field([], max_items=types.MAX_POLICY_LIST_LENGTH)

    @staticmethod
    def generateFaceAttributesKwargs(event):
        """
        Build keyword arguments describing the event face attributes.

        Basic attributes (with their samples) are included when the event has them,
        the face descriptor (with its samples) is included when the attribute has a truthy score.

        Args:
            event: event to take face attributes from
        Returns:
            dict with keyword arguments, empty if the event has no face attributes
        """
        attributesKwargs = {}
        attribute = event.raw["face_attributes"]
        if attribute and (basicAttributes := attribute.get("basic_attributes")):
            faceBasicAttributes = {
                "age": basicAttributes["age"],
                "gender": basicAttributes["gender"],
                "ethnicity": ETHNIC_MAP[basicAttributes["ethnicities"]["predominant_ethnicity"]],
            }
            attributesKwargs.update(
                basicAttributesSamples=attribute["samples"],
                basicAttributes=faceBasicAttributes,
            )
        if attribute and attribute.get("score"):
            attributesKwargs.update(
                descriptorSamples=attribute["samples"],
                descriptors=[event.faceDescriptor],
            )
        return attributesKwargs

    async def execute(self, events: list[Event], accountId: str, luna3Client: Client) -> None:
        """
        Execute face policy (with link to list policy).

        For every filtered event a face record is written to `event.raw["face"]` and a
        request creating the face is scheduled. All requests are awaited together at the end.

        Args:
            events: processing events
            accountId: account id
            luna3Client: luna3 client
        """
        if not self.storeFace:
            return

        facesApi = luna3Client.lunaFaces
        futures = []
        for event in self.filterEventsByFilters(events=events):
            faceId = str(uuid4())
            # a set removes duplicated list ids coming from several link policies
            lists = list(
                {
                    str(linkListPolicy.listId)
                    for linkListPolicy in self.linkToListsPolicy
                    if linkListPolicy.filters is None or linkListPolicy.filters.isEventSatisfies(event)
                }
            )

            avatar = ""
            if self.setSampleAsAvatar:
                # the avatar is taken from the face sample of the first detection only
                if event.raw["detections"] and (face := event.raw["detections"][0]["samples"]["face"]):
                    if face["sample_id"] is not None:
                        avatar = f"/{CURRENT_LUNA_API_VERSION}/samples/faces/{face['sample_id']}"

            event.raw["face"] = {
                "face_id": faceId,
                "url": f"{facesApi.baseUri}/faces/{faceId}",
                "event_id": event.raw["event_id"],
                "user_data": event.raw.get("user_data", ""),
                "external_id": event.raw.get("external_id", ""),
                "avatar": avatar,
                "lists": lists,
            }

            attributeKwargs = self.generateFaceAttributesKwargs(event)
            futures.append(
                facesApi.putFace(
                    faceId=faceId,
                    accountId=accountId,
                    eventId=event.raw["event_id"],
                    userData=event.raw.get("user_data"),
                    externalId=event.raw.get("external_id"),
                    listIds=lists or None,
                    avatar=avatar,
                    **attributeKwargs,
                    raiseError=True,
                )
            )

        if futures:
            await asyncio.gather(*futures)
# Callback parameters extended with the content type of the callback request.
class Params(_Params):
    # callback request content type
    contentType: Literal["application/json", "application/msgpack"] = "application/json"
class HttpCallback(_BaseStoragePolicy, _HttpCallback):
    """Notification policy callbacks"""

    # callback request parameters
    params: Params = Params()

    def getJsonBody(self, data: Any) -> dict | None:
        """
        Build the callback body from the events accepted by the callback filters.

        Description see :func:`~_HttpCallback.getJsonBody`.

        Returns:
            dict with raw events under the "events" key or None if no event passed the filters
        """
        filteredEvents = self.filterEventsByFilters(data)
        if filteredEvents:
            return {"events": [event.raw for event in filteredEvents]}
        return None

    def getRequestCredentialsKey(self) -> str:
        """Get data which a request characterization (what and where to send)"""
        keyParts = (self.url, self.authorization, self.params.headers, self.params.contentType)
        return ",".join(f"{part}" for part in keyParts)
class NotificationStoragePolicy(_BaseStoragePolicy):
    """Notification store policy"""

    # whether to send notification
    sendNotification: types.Int01 = 1

    async def execute(self, events: list[Event], accountId: str, redisContext: RedisContext) -> None:
        """
        Save notifications: publish the events accepted by the policy filters via redis context.

        Nothing is published when notifications are disabled or no event passed the filters.

        Args:
            events: events
            accountId: account id
            redisContext: redis context
        """
        if not self.sendNotification:
            return
        eventsToSend = list(filter(self.isEventSatisfyFilters, events))
        if eventsToSend:
            await redisContext.publish(eventsToSend, accountId, requestIdCtx.get())
class EventStoragePolicy(_BaseStoragePolicy):
    """Event store policy"""

    # whether to store event
    storeEvent: types.Int01 = 1
    # whether to wait events saving (response will be received only after events will be saved)
    waitSaving: types.Int01 = 1

    @classmethod
    async def onStartup(cls):
        """Init Policies: create the class-level runner used to save events without waiting"""
        cls.saveEventsAsyncRunner = AsyncRunner(100, closeTimeout=1)

    @classmethod
    async def onShutdown(cls):
        """Stop Policies: close the class-level runner"""
        await cls.saveEventsAsyncRunner.close()

    @staticmethod
    def getEventFaceAttributes(event):
        """
        Collect face attributes of an event.

        :param event: Base event.
        :return: dict with the raw basic attributes (None if absent) and the face descriptor data
            (None if the event has no face descriptor).
        """
        basicAttributes = (event.raw["face_attributes"] or {}).get("basic_attributes")
        descriptorData = None
        if event.faceDescriptor:
            descriptorData = {
                "descriptor": event.faceDescriptor.descriptor,
                "descriptor_version": event.faceDescriptor.version,
            }
        return {"basic_attributes": basicAttributes, "descriptor_data": descriptorData}

    @staticmethod
    def getEventBasicAttributes(event):
        """
        Collect body attributes of an event (despite the method name, only the body descriptor is used).

        :param event: Base event.
        :return: dict with the body descriptor data (None if the event has no body descriptor).
        """
        descriptorData = None
        if event.bodyDescriptor:
            descriptorData = {
                "descriptor": event.bodyDescriptor.descriptor,
                "descriptor_version": event.bodyDescriptor.version,
            }
        return {"descriptor_data": descriptorData}

    @staticmethod
    def processDetectionSamplesData(event):
        """
        Reduce face and body samples of every detection to the sample id and the detection rect.

        Missing or empty samples are replaced with None. The event is updated in place.

        :param event: Event to update
        """
        for detection in event["detections"]:
            samples = detection["samples"]
            for descriptorType in ("face", "body"):
                sampleData = deepcopy(samples.get(descriptorType))
                if not sampleData:
                    samples[descriptorType] = None
                    continue
                rect = sampleData["detection"].get("rect")
                samples[descriptorType] = {
                    "sample_id": sampleData["sample_id"],
                    "detection": {"rect": rect} if rect else {},
                }

    @staticmethod
    def processEventMatchCandidatesData(event):
        """
        Keep only the supported properties of faces' and events' matching candidates.

        The event is updated in place.

        :param event: Event to update
        """
        expectedFields = {
            "event": (
                "event_id",
                "handler_id",
                "source",
                "stream_id",
                "external_id",
                "user_data",
                "create_time",
            ),
            "face": ("face_id", "external_id", "user_data", "create_time"),
        }
        for match in event["matches"]:
            for candidate in match["candidates"]:
                # a candidate is a face when it has the "face" key, otherwise it is an event
                candidateType = "face" if "face" in candidate else "event"
                allowedKeys = expectedFields[candidateType]
                candidate[candidateType] = {
                    key: value for key, value in candidate[candidateType].items() if key in allowedKeys
                }

    def generateEventToSend(self, event):
        """
        Prepare the event data for saving.

        Works on a deep copy of the raw event, so the raw event itself is not modified.

        :param event: Base event.
        :return: Constructed object based on event's parameters.
        """
        faceAttributes = self.getEventFaceAttributes(event)
        bodyAttributes = self.getEventBasicAttributes(event)

        eventToSend = deepcopy(event.raw)
        eventToSend["handler_id"] = event.handlerId
        eventToSend["create_time"] = event.createTime
        eventToSend["end_time"] = event.endTime

        faceData = event.raw.get("face")
        if faceData:
            eventToSend["face"] = {"face_id": faceData["face_id"], "lists": faceData["lists"]}
        else:
            eventToSend["face"] = None

        eventToSend["face_attributes"] = {key: value for key, value in faceAttributes.items() if value is not None}
        eventToSend["body_attributes"] = {key: value for key, value in bodyAttributes.items() if value is not None}

        aggregatedAttrs = eventToSend["aggregate_estimations"]["face"]["attributes"]
        if aggregatedAttrs and "mask" in aggregatedAttrs:
            aggregatedAttrs["mask"].pop("face_occlusion")

        self.processDetectionSamplesData(eventToSend)
        self.processEventMatchCandidatesData(eventToSend)
        eventToSend.pop("url")

        # drop the keys without a value, then the optional keys whose value is empty
        eventToSend = {key: value for key, value in eventToSend.items() if value is not None}
        for optionalKey in ("matches", "location", "tags"):
            if not eventToSend.get(optionalKey):
                eventToSend.pop(optionalKey, None)
        return eventToSend

    async def execute(self, events: list[Event], accountId: str, luna3Client: Client) -> None:
        """
        Save events.

        Every event accepted by the policy filters gets its url written into the raw event and is
        converted for saving. The save request is awaited when `waitSaving` is set, otherwise it is
        handed over to the class-level runner and not awaited here. An unsuccessful reply is logged
        as a warning.

        Args:
            events: events
            accountId: account id
            luna3Client: client
        """
        if not self.storeEvent:
            return

        eventsToSend = []
        for event in events:
            if self.isEventSatisfyFilters(event):
                event.raw["url"] = f"{luna3Client.lunaEvents.baseUri}/events/{event.raw['event_id']}"
                eventsToSend.append(self.generateEventToSend(event))
        if not eventsToSend:
            return

        async def saveEvents():
            reply = await luna3Client.lunaEvents.saveEvents(
                accountId=accountId,
                events=eventsToSend,
                waitEventsSaving=bool(self.waitSaving),
                raiseError=self.waitSaving,
            )
            if reply.success:
                return
            logger.warning(
                f"Failed save events to luna-event, receive response "
                f"with status code {reply.statusCode}, body {reply.text}"
            )

        if not self.waitSaving:
            self.saveEventsAsyncRunner.runNoWait((saveEvents(),))
            return
        await saveEvents()
# Strong references to the callback tasks that are still running. The event loop keeps only
# weak references to tasks, so a fire-and-forget task that nobody references may be garbage
# collected before it is done.
_pendingCallbackTasks: set[asyncio.Task] = set()


class StoragePolicy(BaseSchema):
    """Storage policy schema"""

    # face sample storage policy
    faceSamplePolicy: FaceSamplePolicy = FaceSamplePolicy()
    # body sample storage policy
    bodySamplePolicy: BodySamplePolicy = BodySamplePolicy()
    # image origin storage policy
    imageOriginPolicy: ImageOriginPolicy = ImageOriginPolicy()
    # attribute storage policy
    attributePolicy: AttributeStorePolicy = Field(default_factory=lambda: AttributeStorePolicy())
    # face storage policy
    facePolicy: FaceStoragePolicy = FaceStoragePolicy()
    # event storage policy
    eventPolicy: EventStoragePolicy = EventStoragePolicy()
    # notification storage policy
    notificationPolicy: NotificationStoragePolicy = NotificationStoragePolicy()
    # callbacks
    callbacks: List[HttpCallback] = []

    async def execute(
        self,
        events: list[Event],
        accountId: str,
        monitoring: DataForMonitoring,
        config: StorePolicyConfig,
        luna3Client: Client,
        redisContext: RedisContext,
    ):
        """
        Execute storage policy - save objects.

        The policies run in three consecutive stages (policies of one stage run concurrently):
        samples; then image origins, attributes and faces; then events and notifications.
        Callbacks are started afterwards and are not awaited. Execution time of every policy
        is reported to the monitoring.

        Args:
            events: events to process
            accountId: account id
            monitoring: data for monitoring
            config: app config
            luna3Client: client
            redisContext: redis context
        """

        async def _faceSample() -> None:
            with monitorTime(monitoring, "face_sample_storage_policy_time"):
                await self.faceSamplePolicy.execute(events, accountId, config.facesBucket, luna3Client)

        async def _bodySample() -> None:
            with monitorTime(monitoring, "body_sample_storage_policy_time"):
                await self.bodySamplePolicy.execute(events, accountId, config.bodiesBucket, luna3Client)

        async def _originImage() -> None:
            with monitorTime(monitoring, "image_origin_storage_policy_time"):
                await self.imageOriginPolicy.execute(events, accountId, config.originBucket, luna3Client)

        async def _attribute() -> None:
            with monitorTime(monitoring, "face_attribute_storage_policy_time"):
                await self.attributePolicy.execute(events, accountId, luna3Client)

        async def _face() -> None:
            with monitorTime(monitoring, "face_storage_policy_time"):
                await self.facePolicy.execute(events, accountId, luna3Client)

        async def _event() -> None:
            with monitorTime(monitoring, "event_storage_policy_time"):
                await self.eventPolicy.execute(events, accountId, luna3Client)

        async def _notification() -> None:
            with monitorTime(monitoring, "notification_storage_policy_time"):
                await self.notificationPolicy.execute(events, accountId, redisContext)

        def _callbacks() -> None:
            """Build and execute callbacks, at most one per unique request credentials key"""
            toSends = set()
            for callback in self.callbacks:
                creds = callback.getRequestCredentialsKey()
                # check for a duplicate before building: a built but never scheduled callback
                # would be left as a never-awaited coroutine
                if creds in toSends:
                    continue
                if call := callback.buildCallback(events):
                    toSends.add(creds)
                    task = asyncio.create_task(call)
                    # keep the task alive until it is done
                    _pendingCallbackTasks.add(task)
                    task.add_done_callback(_pendingCallbackTasks.discard)

        await asyncio.gather(_faceSample(), _bodySample())
        # save attribute and face only after executing previous policies (^^^ samples and images are updated here ^^^)
        await asyncio.gather(_originImage(), _attribute(), _face())
        # save events only after executing previous policies (^^^ events are updated here ^^^)
        await asyncio.gather(_event(), _notification())
        _callbacks()