Source code for luna_handlers.classes.schemas.storage_policy

"""
Module contains schemas for storage policy
"""
import asyncio
from typing import Optional, List, Union
from uuid import UUID

from app.global_vars.context_vars import requestIdCtx
from luna3.client import Client
from luna3.common.luna_response import LunaResponse
from luna3.image_store.image_store import StoreApi
from pydantic import Field

from classes.monitoring import HandlersMonitoringData as DataForMonitoring
from redis_db.redis_context import RedisContext
from vlutils.jobs.async_runner import AsyncRunner

from classes.event import Event
from classes.luna3event import eventAsLuna3
from classes.schemas import types
from classes.schemas.base_schema import BaseSchema, HandlerSettings
from classes.schemas.filters import ComplexFilter
from configs.config import MAX_IMAGE_ORIGIN_EXTERNAL_URL_LENGTH
from crutches_on_wheels.monitoring.points import monitorTime
from crutches_on_wheels.utils.log import Logger
from sdk.sdk_loop.sdk_task import SDKWarp, SDKDetectableImage, FaceWarp, HumanWarp

# current api version; used in this module as the prefix of the relative urls
# written to events (face avatar, sample references and stored image origins)
CURRENT_LUNA_API_VERSION = 6


async def saveSamples(warpsToSave: List[SDKWarp], bucket: str, accountId: str, storeApiClient: StoreApi) -> List[str]:
    """
    Upload not yet saved warps to the image store and mark every given warp as saved.

    Args:
        warpsToSave: SDK warps to save
        bucket: bucket name
        accountId: account id
        storeApiClient: image-store client
    Returns:
        sample ids of all given warps (warps which were already saved before the call are included)
    """
    uploads = [
        storeApiClient.putImage(
            imageInBytes=warp.asBytes(),
            imageId=warp.sampleId,
            accountId=accountId,
            bucketName=bucket,
            raiseError=True,
        )
        for warp in warpsToSave
        if not warp.isSaved
    ]
    # warps are marked as saved only after all uploads are awaited
    await asyncio.gather(*uploads)

    sampleIds = []
    for warp in warpsToSave:
        warp.isSaved = True
        sampleIds.append(warp.sampleId)
    return sampleIds
class _BaseStoragePolicy(BaseSchema):
    """Base storage policy: keeps the policy filters and helpers applying them to events"""

    # storage policy complex filters (matching+attributes)
    filters: ComplexFilter = ComplexFilter()

    def filterEventsByFilters(self, events: List[Event]) -> List[Event]:
        """
        Select events for the further processing.

        Args:
            events: all events
        Returns:
            the input list itself if the filters are empty, otherwise a new list with events satisfying the filters
        """
        policyFilters = self.filters
        if policyFilters.isEmpty:
            return events
        return list(filter(policyFilters.isEventSatisfies, events))

    def isEventSatisfyFilters(self, event: Event) -> bool:
        """
        Check whether an event satisfies the policy filters.

        Args:
            event: event to check
        Returns:
            True if the filters are empty, otherwise the result of the filters check
        """
        if self.filters.isEmpty:
            return True
        return self.filters.isEventSatisfies(event)
class FaceSamplePolicy(_BaseStoragePolicy):
    """Face sample policy"""

    # whether to store sample
    storeSample: types.Int01 = 1

    async def execute(self, events: List[Event], bucket: str, accountId: str, luna3Client: Client) -> None:
        """
        Save face samples of the events which satisfy the policy filters and set the face sample urls.

        Args:
            events: events
            bucket: bucket name
            accountId: account id
            luna3Client: client
        """
        if not self.storeSample:
            return

        faceWarps = []
        for event in self.filterEventsByFilters(events):
            for detection in event.detections:
                face = detection.detection.face
                if face is None:
                    # nothing to save for a detection without a face
                    continue
                faceWarps.append(face.sdkEstimation.warp)
                face.url = (
                    f"{luna3Client.lunaFaceSamplesStore.baseUri}/buckets/{bucket}"
                    f"/images/{face.sdkEstimation.warp.sampleId}"
                )

        await saveSamples(
            warpsToSave=faceWarps,
            bucket=bucket,
            accountId=accountId,
            storeApiClient=luna3Client.lunaFaceSamplesStore,
        )
class BodySamplePolicy(_BaseStoragePolicy):
    """Body sample policy"""

    # whether to store sample
    storeSample: types.Int01 = 1

    async def execute(self, events: List[Event], bucket: str, accountId: str, luna3Client: Client) -> None:
        """
        Save body samples of the events which satisfy the policy filters and set the body sample urls.

        Args:
            events: events
            bucket: bucket name
            accountId: account id
            luna3Client: client
        """
        if not self.storeSample:
            return

        bodyWarps = []
        for event in self.filterEventsByFilters(events=events):
            for detection in event.detections:
                body = detection.detection.body
                if body is None:
                    # nothing to save for a detection without a body
                    continue
                bodyWarps.append(body.sdkEstimation.warp)
                body.url = (
                    f"{luna3Client.lunaBodySamplesStore.baseUri}/buckets/{bucket}"
                    f"/images/{body.sdkEstimation.warp.sampleId}"
                )

        await saveSamples(
            warpsToSave=bodyWarps,
            bucket=bucket,
            accountId=accountId,
            storeApiClient=luna3Client.lunaBodySamplesStore,
        )
class ImageOriginPolicy(_BaseStoragePolicy):
    """Image origin policy"""

    # whether to store origin image
    storeImage: types.Int01 = 0
    # use external reference as image origin
    useExternalReferences: types.Int01 = 1

    async def execute(
        self,
        events: List[Event],
        bucket: str,
        accountId: str,
        luna3Client: Client,
        originImages: List[Union[SDKDetectableImage, FaceWarp, HumanWarp]],
    ) -> None:
        """
        Save origin images and set `imageOrigin` of the detections.

        For every detection of the events which satisfy the policy filters the image origin is either
        an external reference (input image url or an already saved sample, if `useExternalReferences` is on)
        or a reference to the image uploaded to the image origin store. An image referenced by several
        detections is uploaded only once.

        Args:
            events: events
            bucket: bucket name
            accountId: account id
            luna3Client: client
            originImages: list of input images
        """
        if not self.storeImage:
            return

        imagesMap = {image.id: image for image in originImages}
        futures = []
        # ids of images whose upload is already scheduled
        scheduledImageIds = set()

        def addImageToUpload(detection, image) -> None:
            """Schedule the image upload (once per image id) and reference the stored image in the detection."""
            imageId = detection.image.id
            if imageId not in scheduledImageIds:
                scheduledImageIds.add(imageId)
                body = image.body if isinstance(image, SDKDetectableImage) else image.asBytes()
                futures.append(
                    luna3Client.lunaImageOriginStore.putImage(
                        imageInBytes=body,
                        imageId=imageId,
                        accountId=accountId,
                        bucketName=bucket,
                        raiseError=True,
                    )
                )
            detection.imageOrigin = f"/{CURRENT_LUNA_API_VERSION}/images/{imageId}"

        for event in self.filterEventsByFilters(events=events):
            for detection in event.detections:
                img = imagesMap[detection.image.id]
                if img.imageOrigin:
                    # the input image already has an image origin
                    continue
                if isinstance(img, SDKDetectableImage):
                    if self.useExternalReferences and img.url and len(img.url) <= MAX_IMAGE_ORIGIN_EXTERNAL_URL_LENGTH:
                        detection.imageOrigin = img.url
                    else:
                        addImageToUpload(detection, img)
                elif isinstance(img, FaceWarp):
                    if self.useExternalReferences and detection.detection.face.sdkEstimation.warp.isSaved:
                        # sample was saved or sample_id in input request
                        detection.imageOrigin = f"/{CURRENT_LUNA_API_VERSION}/samples/faces/{img.sampleId}"
                    else:
                        addImageToUpload(detection, img)
                else:
                    if self.useExternalReferences and detection.detection.body.sdkEstimation.warp.isSaved:
                        # sample was saved or sample_id in input request
                        detection.imageOrigin = f"/{CURRENT_LUNA_API_VERSION}/samples/bodies/{img.sampleId}"
                    else:
                        addImageToUpload(detection, img)

        await asyncio.gather(*futures)
class AttributeStorePolicy(_BaseStoragePolicy):
    """Attribute store policy"""

    # whether to store attribute
    storeAttribute: types.Int01 = 0
    # attribute storage ttl
    ttl: types.IntAttributeTTL = Field(default_factory=lambda: HandlerSettings.defaultAttributeTTL)

    async def execute(self, events: List[Event], accountId: str, luna3Client: Client) -> None:
        """
        Save face attributes of the events which satisfy the policy filters and
        set the attribute id and url received in the responses.

        Args:
            events: events
            accountId: account id
            luna3Client: client
        """
        if not self.storeAttribute:
            return

        # events without face attributes have nothing to save
        eventsWithAttributes = [
            event for event in self.filterEventsByFilters(events) if event.faceAttributes is not None
        ]
        requests = [
            luna3Client.lunaFaces.createAttribute(
                accountId=accountId,
                ttl=self.ttl,
                **event.faceAttributes._getAttributeKwargs(),
                raiseError=True,
            )
            for event in eventsWithAttributes
        ]
        replies: List[LunaResponse] = await asyncio.gather(*requests)

        for event, reply in zip(eventsWithAttributes, replies):
            attributes = event.faceAttributes
            attributes.sdkAttribute.attributeId = reply.json["attribute_id"]
            attributes.url = f'{luna3Client.lunaFaces.origin}{reply.json["url"]}'
class LinkToListsPolicy(_BaseStoragePolicy):
    """Link to lists policy schema: a list id with filters inherited from the base storage policy"""

    # list id to link faces to
    listId: UUID
class FaceStoragePolicy(_BaseStoragePolicy):
    """Face store policy"""

    # whether to store face
    storeFace: types.Int01 = 0
    # whether to set face sample as avatar
    setSampleAsAvatar: types.Int01 = 1
    # face link to lists policy list
    linkToListsPolicy: List[LinkToListsPolicy] = Field([], max_items=types.MAX_POLICY_LIST_LENGTH)

    async def execute(
        self,
        events: List[Event],
        accountId: str,
        luna3Client: Client,
        userData: str = "",
        externalId: Optional[str] = None,
    ) -> None:
        """
        Execute face policy (with link to list policy).

        For every event which satisfies the policy filters a face is created; the linked lists, avatar,
        face id and face url are written back to the event.

        Args:
            events: processing events
            accountId: account id
            luna3Client: luna3 client
            userData: user data for all faces
            externalId: external for all faces
        """
        if not self.storeFace:
            return

        futures, eventsToUpdate = [], []
        for event in self.filterEventsByFilters(events=events):
            # unique ids of the lists whose link policy filters are absent or satisfied by the event
            lists = list(
                {
                    str(linkListPolicy.listId)
                    for linkListPolicy in self.linkToListsPolicy
                    if linkListPolicy.filters is None
                    or isinstance(linkListPolicy.filters, dict)
                    or linkListPolicy.filters.isEventSatisfies(event)
                }
            )
            # NOTE: only the truthiness of the result is used below
            faceDetection = (
                event.detections[0].detection.face.asDict(
                    detectLandmarks68=event.detections[0].detectLandmarks68,
                    estimationTargets=event.detections[0].estimationTargets,
                )
                if event.detections and event.detections[0].detection.face
                else None
            )
            avatar = ""
            if faceDetection:
                firstWarp = next(
                    (
                        detection.detection.face.sdkEstimation.warp
                        for detection in event.detections
                        # only the first detection is known to have a face; skip detections without one
                        if detection.detection.face is not None
                        and detection.detection.face.sdkEstimation.warp.isSaved
                    ),
                    None,
                )
                if self.setSampleAsAvatar and firstWarp is not None:
                    avatar = f"/{CURRENT_LUNA_API_VERSION}/samples/faces/{firstWarp.sampleId}"
            event.linkedLists = lists
            event.avatar = avatar

            kwargs = event.faceAttributes._getAttributeKwargs() if event.faceAttributes is not None else {}
            eventsToUpdate.append(event)
            futures.append(
                luna3Client.lunaFaces.createFace(
                    accountId=accountId,
                    **kwargs,
                    eventId=event.eventId,
                    userData=userData,
                    externalId=externalId,
                    listIds=lists or None,
                    avatar=avatar,
                    raiseError=True,
                )
            )

        responses: List[LunaResponse] = await asyncio.gather(*futures)
        for event, response in zip(eventsToUpdate, responses):
            event.faceId = response.json["face_id"]
            event.faceUrl = f'{luna3Client.lunaFaces.origin}{response.json["url"]}'
class NotificationStoragePolicy(_BaseStoragePolicy):
    """Notification store policy"""

    # whether to send notification
    sendNotification: types.Int01 = 1

    async def execute(
        self,
        events: list[Event],
        logger: Logger,
        handlerId: str,
        createEventTime: str,
        endEventTime: str,
        accountId: str,
        redisContext: RedisContext,
        lunaSenderUsage: bool,
    ) -> None:
        """
        Publish events which satisfy the policy filters via the redis context.

        Nothing is published if luna sender is not used, notifications are disabled
        or no event satisfies the filters.

        Args:
            events: events
            logger: logger
            handlerId: handler id
            createEventTime: event creation time
            endEventTime: event end time
            accountId: account id
            redisContext: redis context
            lunaSenderUsage: use or not luna sender
        """
        if not (lunaSenderUsage and self.sendNotification):
            return

        eventsToSend = list(filter(self.isEventSatisfyFilters, events))
        if eventsToSend:
            await redisContext.publish(
                logger, eventsToSend, handlerId, accountId, requestIdCtx.get(), createEventTime, endEventTime
            )
class EventStoragePolicy(_BaseStoragePolicy):
    """Event store policy"""

    # whether to store event
    storeEvent: types.Int01 = 1
    # whether to wait events saving (response will be received only after events will be saved)
    waitSaving: types.Int01 = 1

    @classmethod
    async def onStartup(cls):
        """
        Init Policies: create the class-wide runner which `execute` uses when events saving is not awaited
        """
        cls.saveEventsAsyncRunner = AsyncRunner(100, closeTimeout=1)

    @classmethod
    async def onShutdown(cls):
        """
        Stop Policies: close the runner created on startup
        """
        await cls.saveEventsAsyncRunner.close()

    async def execute(
        self,
        logger: Logger,
        events: List[Event],
        luna3Client: Client,
        createEventTime: str,
        endEventTime: str,
        accountId: str,
        handlerId: str,
        lunaEventsUsage: bool,
    ) -> None:
        """
        Save events.

        The event url is set for every event which has to be saved and reset to None for all other events.

        Args:
            logger: logger
            events: events
            luna3Client: client
            createEventTime: event creation time
            endEventTime: event end time
            accountId: account id
            handlerId: handler id
            lunaEventsUsage: use or not luna events
        """
        storingEnabled = lunaEventsUsage and self.storeEvent
        eventsToSend = []
        for event in events:
            if storingEnabled and self.isEventSatisfyFilters(event):
                event.eventUrl = f"{luna3Client.lunaEvents.baseUri}/events/{event.eventId}"
                eventsToSend.append(eventAsLuna3(event, accountId, createEventTime, endEventTime, handlerId))
            else:
                # the event will not be saved, so there is no url to refer to
                event.eventUrl = None

        if not eventsToSend:
            return

        async def saveEvents():
            """Send collected events to luna-events, an unsuccessful reply is only logged"""
            reply = await luna3Client.lunaEvents.saveEvents(
                eventsToSend, waitEventsSaving=bool(self.waitSaving), raiseError=self.waitSaving
            )
            if reply.success:
                return
            logger.warning(
                f"Failed save events to luna-event, receive response "
                f"with status code {reply.statusCode}, body {reply.text}"
            )

        if self.waitSaving:
            await saveEvents()
        else:
            # hand the coroutine over to the runner without awaiting it here
            self.saveEventsAsyncRunner.runNoWait((saveEvents(),))
class StoragePolicy(BaseSchema):
    """Storage policy schema"""

    # face sample storage policy
    faceSamplePolicy: FaceSamplePolicy = FaceSamplePolicy()
    # body sample storage policy
    bodySamplePolicy: BodySamplePolicy = BodySamplePolicy()
    # image origin storage policy
    imageOriginPolicy: ImageOriginPolicy = ImageOriginPolicy()
    # attribute storage policy
    attributePolicy: AttributeStorePolicy = Field(default_factory=lambda: AttributeStorePolicy())
    # face storage policy
    facePolicy: FaceStoragePolicy = FaceStoragePolicy()
    # event storage policy
    eventPolicy: EventStoragePolicy = EventStoragePolicy()
    # notification storage policy
    notificationPolicy: NotificationStoragePolicy = NotificationStoragePolicy()

    async def execute(
        self,
        events: List[Event],
        accountId: str,
        luna3Client: Client,
        originImages: List[Union[SDKDetectableImage, FaceWarp, HumanWarp]],
        userData,
        externalId,
        logger: Logger,
        createEventTime: str,
        endEventTime: str,
        handlerId: str,
        facesBucket: str,
        bodiesBucket: str,
        originBucket: str,
        lunaEventsUsage: bool,
        redisContext: RedisContext,
        lunaSenderUsage: bool,
    ) -> DataForMonitoring:
        """
        Execute storage policy - save objects.

        Policies are executed in stages: samples, then attributes, faces and origin images,
        then events and finally notifications. Execution time of every policy is collected
        into the returned monitoring data.

        Args:
            events: events to process
            accountId: account id
            luna3Client: client
            originImages: data with origin images
            userData: user data
            externalId: external id
            logger: logger
            createEventTime: event creation time
            endEventTime: event end time
            handlerId: handler id
            facesBucket: faces samples bucket
            bodiesBucket: bodies samples bucket
            originBucket: origin image bucket
            lunaEventsUsage: luna events usage
            redisContext: redis context
            lunaSenderUsage: luna sender usage
        Returns:
            monitoring data
        """
        monitoringData = DataForMonitoring()

        async def _faceSample() -> None:
            with monitorTime(monitoringData.request, "face_sample_storage_policy_time"):
                await self.faceSamplePolicy.execute(
                    events=events, bucket=facesBucket, accountId=accountId, luna3Client=luna3Client
                )

        async def _bodySample() -> None:
            with monitorTime(monitoringData.request, "body_sample_storage_policy_time"):
                await self.bodySamplePolicy.execute(
                    events=events, bucket=bodiesBucket, accountId=accountId, luna3Client=luna3Client
                )

        async def _originImage() -> None:
            with monitorTime(monitoringData.request, "image_origin_storage_policy_time"):
                await self.imageOriginPolicy.execute(
                    events=events,
                    bucket=originBucket,
                    accountId=accountId,
                    luna3Client=luna3Client,
                    originImages=originImages,
                )

        async def _attribute() -> None:
            with monitorTime(monitoringData.request, "face_attribute_storage_policy_time"):
                await self.attributePolicy.execute(events=events, accountId=accountId, luna3Client=luna3Client)

        async def _face() -> None:
            with monitorTime(monitoringData.request, "face_storage_policy_time"):
                await self.facePolicy.execute(
                    events=events,
                    accountId=accountId,
                    luna3Client=luna3Client,
                    userData=userData,
                    externalId=externalId,
                )

        async def _event() -> None:
            with monitorTime(monitoringData.request, "event_storage_policy_time"):
                await self.eventPolicy.execute(
                    logger=logger,
                    events=events,
                    luna3Client=luna3Client,
                    createEventTime=createEventTime,
                    endEventTime=endEventTime,
                    accountId=accountId,
                    handlerId=handlerId,
                    lunaEventsUsage=lunaEventsUsage,
                )

        async def _notification() -> None:
            with monitorTime(monitoringData.request, "notification_storage_policy_time"):
                await self.notificationPolicy.execute(
                    events=events,
                    logger=logger,
                    handlerId=handlerId,
                    createEventTime=createEventTime,
                    endEventTime=endEventTime,
                    accountId=accountId,
                    redisContext=redisContext,
                    lunaSenderUsage=lunaSenderUsage,
                )

        # samples go first: the sample policies mark warps as saved and set sample urls
        await asyncio.gather(_faceSample(), _bodySample())
        # attributes, faces and origin images are saved only after the samples (they check whether warps are saved)
        await asyncio.gather(_attribute(), _face(), _originImage())
        # save events only after executing previous policies (events are updated by them)
        await _event()
        # send notifications last, after the event policy has updated the events
        await _notification()
        return monitoringData