"""
Module contains schemas for storage policy
"""
import asyncio
from typing import Optional, List, Union
from uuid import UUID
from app.global_vars.context_vars import requestIdCtx
from luna3.client import Client
from luna3.common.luna_response import LunaResponse
from luna3.image_store.image_store import StoreApi
from pydantic import Field
from classes.monitoring import HandlersMonitoringData as DataForMonitoring
from redis_db.redis_context import RedisContext
from vlutils.jobs.async_runner import AsyncRunner
from classes.event import Event
from classes.luna3event import eventAsLuna3
from classes.schemas import types
from classes.schemas.base_schema import BaseSchema, HandlerSettings
from classes.schemas.filters import ComplexFilter
from configs.config import MAX_IMAGE_ORIGIN_EXTERNAL_URL_LENGTH
from crutches_on_wheels.monitoring.points import monitorTime
from crutches_on_wheels.utils.log import Logger
from sdk.sdk_loop.sdk_task import SDKWarp, SDKDetectableImage, FaceWarp, HumanWarp
# current api version: used in this module as the leading path segment of the relative
# image origin, sample and avatar locations (e.g. "/6/samples/faces/<sample id>")
CURRENT_LUNA_API_VERSION = 6
async def saveSamples(warpsToSave: List[SDKWarp], bucket: str, accountId: str, storeApiClient: StoreApi) -> List[str]:
    """
    Save warps in LIS.

    Warps which are already marked as saved are not uploaded again. After the uploads are done
    every given warp is marked as saved.

    Args:
        warpsToSave: SDK warps to save
        bucket: bucket name
        accountId: account id
        storeApiClient: image-store client
    Returns:
        sample ids of all given warps (in the input order, already saved warps included)
    """
    # one upload request per warp which has not been saved yet
    uploads = [
        storeApiClient.putImage(
            imageInBytes=pendingWarp.asBytes(),
            imageId=pendingWarp.sampleId,
            accountId=accountId,
            bucketName=bucket,
            raiseError=True,
        )
        for pendingWarp in warpsToSave
        if not pendingWarp.isSaved
    ]
    await asyncio.gather(*uploads)

    sampleIds = []
    for storedWarp in warpsToSave:
        storedWarp.isSaved = True
        sampleIds.append(storedWarp.sampleId)
    return sampleIds
class _BaseStoragePolicy(BaseSchema):
    """Base storage policy: keeps the policy filters and the helpers which apply them to events"""

    # storage policy complex filters (matching+attributes)
    filters: ComplexFilter = ComplexFilter()

    def filterEventsByFilters(self, events: List[Event]) -> List[Event]:
        """
        Select the events which have to be processed by the policy.

        Args:
            events: all events
        Returns:
            the given list itself if the filters are empty, otherwise a new list
            with the events which satisfy the filters
        """
        policyFilters = self.filters
        if policyFilters.isEmpty:
            return events
        return list(filter(policyFilters.isEventSatisfies, events))

    def isEventSatisfyFilters(self, event: Event) -> bool:
        """Check whether the event satisfies the policy filters (empty filters accept any event)"""
        if self.filters.isEmpty:
            return True
        return self.filters.isEventSatisfies(event)
class FaceSamplePolicy(_BaseStoragePolicy):
    """Face sample policy"""

    # whether to store sample
    storeSample: types.Int01 = 1

    async def execute(self, events: List[Event], bucket: str, accountId: str, luna3Client: Client) -> None:
        """
        Save face samples and set the sample url of every processed face detection.

        Args:
            events: events
            bucket: bucket name
            accountId: account id
            luna3Client: client
        """
        if not self.storeSample:
            return

        faceWarps = []
        for event in self.filterEventsByFilters(events):
            for detection in event.detections:
                face = detection.detection.face
                if face is None:
                    # detection without a face has no face sample
                    continue
                warp = face.sdkEstimation.warp
                faceWarps.append(warp)
                face.url = f"{luna3Client.lunaFaceSamplesStore.baseUri}/buckets/{bucket}/images/{warp.sampleId}"

        await saveSamples(
            warpsToSave=faceWarps,
            bucket=bucket,
            accountId=accountId,
            storeApiClient=luna3Client.lunaFaceSamplesStore,
        )
class BodySamplePolicy(_BaseStoragePolicy):
    """Body sample policy"""

    # whether to store sample
    storeSample: types.Int01 = 1

    async def execute(self, events: List[Event], bucket: str, accountId: str, luna3Client: Client) -> None:
        """
        Save body samples and set the sample url of every processed body detection.

        Args:
            events: events
            bucket: bucket name
            accountId: account id
            luna3Client: client
        """
        if not self.storeSample:
            return

        bodyWarps = []
        for event in self.filterEventsByFilters(events=events):
            for detection in event.detections:
                body = detection.detection.body
                if body is None:
                    # detection without a body has no body sample
                    continue
                warp = body.sdkEstimation.warp
                bodyWarps.append(warp)
                body.url = f"{luna3Client.lunaBodySamplesStore.baseUri}/buckets/{bucket}/images/{warp.sampleId}"

        await saveSamples(
            warpsToSave=bodyWarps,
            bucket=bucket,
            accountId=accountId,
            storeApiClient=luna3Client.lunaBodySamplesStore,
        )
class ImageOriginPolicy(_BaseStoragePolicy):
    """Image origin policy"""

    # whether to store origin image
    storeImage: types.Int01 = 0
    # use external reference as image origin
    useExternalReferences: types.Int01 = 1

    async def execute(
        self,
        events: List[Event],
        bucket: str,
        accountId: str,
        luna3Client: Client,
        originImages: List[Union[SDKDetectableImage, FaceWarp, HumanWarp]],
    ) -> None:
        """
        Save origin images and set `imageOrigin` of the processed detections.

        For every detection of the filtered events the image origin is either an external
        reference (image url or an already saved sample) or a location of the image uploaded
        to the image origin store. An image referenced by several detections is uploaded once.

        Args:
            events: events
            bucket: bucket name
            accountId: account id
            luna3Client: client
            originImages: list of input images
        """
        if not self.storeImage:
            return

        imagesMap = {image.id: image for image in originImages}
        futures = []
        # ids of the images whose upload is already scheduled, several detections (events) may share one image
        scheduledImageIds = set()

        def addImageToUpload(detection, image) -> None:
            """Schedule the image upload (once per image id) and point the detection to the stored image"""
            imageId = detection.image.id
            if imageId not in scheduledImageIds:
                scheduledImageIds.add(imageId)
                # the body is taken only for a new upload, so a shared image is not serialized repeatedly
                imageBody = image.body if isinstance(image, SDKDetectableImage) else image.asBytes()
                futures.append(
                    luna3Client.lunaImageOriginStore.putImage(
                        imageInBytes=imageBody,
                        imageId=imageId,
                        accountId=accountId,
                        bucketName=bucket,
                        raiseError=True,
                    )
                )
            detection.imageOrigin = f"/{CURRENT_LUNA_API_VERSION}/images/{imageId}"

        for event in self.filterEventsByFilters(events=events):
            for detection in event.detections:
                img = imagesMap[detection.image.id]
                if img.imageOrigin:
                    # the input image already has an origin, nothing to upload or to set here
                    continue
                if isinstance(img, SDKDetectableImage):
                    if self.useExternalReferences and img.url and len(img.url) <= MAX_IMAGE_ORIGIN_EXTERNAL_URL_LENGTH:
                        detection.imageOrigin = img.url
                    else:
                        addImageToUpload(detection, img)
                elif isinstance(img, FaceWarp):
                    if self.useExternalReferences and detection.detection.face.sdkEstimation.warp.isSaved:
                        # sample was saved or sample_id in input request
                        detection.imageOrigin = f"/{CURRENT_LUNA_API_VERSION}/samples/faces/{img.sampleId}"
                    else:
                        addImageToUpload(detection, img)
                else:
                    # remaining input image kind is treated as a body warp
                    if self.useExternalReferences and detection.detection.body.sdkEstimation.warp.isSaved:
                        # sample was saved or sample_id in input request
                        detection.imageOrigin = f"/{CURRENT_LUNA_API_VERSION}/samples/bodies/{img.sampleId}"
                    else:
                        addImageToUpload(detection, img)

        await asyncio.gather(*futures)
class AttributeStorePolicy(_BaseStoragePolicy):
    """Attribute store policy"""

    # whether to store attribute
    storeAttribute: types.Int01 = 0
    # attribute storage ttl
    ttl: types.IntAttributeTTL = Field(default_factory=lambda: HandlerSettings.defaultAttributeTTL)

    async def execute(self, events: List[Event], accountId: str, luna3Client: Client) -> None:
        """
        Save face attributes of the events and set the attribute id and url from the replies.

        Args:
            events: events
            accountId: account id
            luna3Client: client
        """
        if not self.storeAttribute:
            return

        # only the events which pass the filters and have face attributes are processed
        eventsWithAttributes = [
            event for event in self.filterEventsByFilters(events) if event.faceAttributes is not None
        ]
        requests = []
        for event in eventsWithAttributes:
            attributeKwargs = event.faceAttributes._getAttributeKwargs()
            requests.append(
                luna3Client.lunaFaces.createAttribute(
                    accountId=accountId, ttl=self.ttl, **attributeKwargs, raiseError=True
                )
            )

        replies: List[LunaResponse] = await asyncio.gather(*requests)
        for event, reply in zip(eventsWithAttributes, replies):
            payload = reply.json
            event.faceAttributes.sdkAttribute.attributeId = payload["attribute_id"]
            event.faceAttributes.url = f'{luna3Client.lunaFaces.origin}{payload["url"]}'
class LinkToListsPolicy(_BaseStoragePolicy):
    """Link to lists policy schema: a list id together with the inherited policy filters"""

    # id of the list to link faces to
    listId: UUID
class FaceStoragePolicy(_BaseStoragePolicy):
    """Face store policy"""

    # whether to store face
    storeFace: types.Int01 = 0
    # whether to set face sample as avatar
    setSampleAsAvatar: types.Int01 = 1
    # face link to lists policy list
    linkToListsPolicy: List[LinkToListsPolicy] = Field([], max_items=types.MAX_POLICY_LIST_LENGTH)

    async def execute(
        self,
        events: List[Event],
        accountId: str,
        luna3Client: Client,
        userData: str = "",
        externalId: Optional[str] = None,
    ) -> None:
        """
        Execute face policy (with link to list policy).

        A face is created for every event which satisfies the policy filters. The processed
        events get their linked lists, avatar, face id and face url set.

        Args:
            events: processing events
            accountId: account id
            luna3Client: luna3 client
            userData: user data for all faces
            externalId: external id for all faces
        """
        if not self.storeFace:
            return

        futures, eventsToUpdate = [], []
        for event in self.filterEventsByFilters(events=events):
            # unique ids of the lists whose link policy accepts the event
            lists = list(
                {
                    str(linkListPolicy.listId)
                    for linkListPolicy in self.linkToListsPolicy
                    if linkListPolicy.filters is None
                    or isinstance(linkListPolicy.filters, dict)
                    or linkListPolicy.filters.isEventSatisfies(event)
                }
            )
            faceDetection = (
                event.detections[0].detection.face.asDict(
                    detectLandmarks68=event.detections[0].detectLandmarks68,
                    estimationTargets=event.detections[0].estimationTargets,
                )
                if event.detections and event.detections[0].detection.face
                else None
            )
            avatar = ""
            if faceDetection:
                # only the first detection is known to have a face here: skip detections without
                # a face instead of failing on them while looking for the first saved face sample
                firstWarp = next(
                    (
                        detection.detection.face.sdkEstimation.warp
                        for detection in event.detections
                        if detection.detection.face is not None
                        and detection.detection.face.sdkEstimation.warp.isSaved
                    ),
                    None,
                )
                if self.setSampleAsAvatar and firstWarp is not None:
                    avatar = f"/{CURRENT_LUNA_API_VERSION}/samples/faces/{firstWarp.sampleId}"
            event.linkedLists = lists
            event.avatar = avatar
            kwargs = event.faceAttributes._getAttributeKwargs() if event.faceAttributes is not None else {}
            eventsToUpdate.append(event)
            futures.append(
                luna3Client.lunaFaces.createFace(
                    accountId=accountId,
                    **kwargs,
                    eventId=event.eventId,
                    userData=userData,
                    externalId=externalId,
                    listIds=lists or None,
                    avatar=avatar,
                    raiseError=True,
                )
            )

        responses: List[LunaResponse] = await asyncio.gather(*futures)
        for event, response in zip(eventsToUpdate, responses):
            event.faceId = response.json["face_id"]
            event.faceUrl = f'{luna3Client.lunaFaces.origin}{response.json["url"]}'
class NotificationStoragePolicy(_BaseStoragePolicy):
    """Notification store policy"""

    # whether to send notification
    sendNotification: types.Int01 = 1

    async def execute(
        self,
        events: list[Event],
        logger: Logger,
        handlerId: str,
        createEventTime: str,
        endEventTime: str,
        accountId: str,
        redisContext: RedisContext,
        lunaSenderUsage: bool,
    ) -> None:
        """
        Publish the events which satisfy the policy filters via the redis context.

        Args:
            events: events
            logger: logger
            handlerId: handler id
            createEventTime: event creation time
            endEventTime: event end time
            accountId: account id
            redisContext: redis context
            lunaSenderUsage: use or not luna sender
        """
        if not (lunaSenderUsage and self.sendNotification):
            return

        eventsToSend = list(filter(self.isEventSatisfyFilters, events))
        if eventsToSend:
            await redisContext.publish(
                logger, eventsToSend, handlerId, accountId, requestIdCtx.get(), createEventTime, endEventTime
            )
class EventStoragePolicy(_BaseStoragePolicy):
    """Event store policy"""

    # whether to store event
    storeEvent: types.Int01 = 1
    # whether to wait events saving (response will be received only after events will be saved)
    waitSaving: types.Int01 = 1

    @classmethod
    async def onStartup(cls):
        """Init policies: create the class-level runner used to save events without waiting"""
        cls.saveEventsAsyncRunner = AsyncRunner(100, closeTimeout=1)

    @classmethod
    async def onShutdown(cls):
        """Stop policies: close the class-level runner"""
        await cls.saveEventsAsyncRunner.close()

    async def execute(
        self,
        logger: Logger,
        events: List[Event],
        luna3Client: Client,
        createEventTime: str,
        endEventTime: str,
        accountId: str,
        handlerId: str,
        lunaEventsUsage: bool,
    ) -> None:
        """
        Save events and set the event url of every event (None for the events which are not saved).

        Args:
            logger: logger
            events: events
            luna3Client: client
            createEventTime: event creation time
            endEventTime: event end time
            accountId: account id
            handlerId: handler id
            lunaEventsUsage: use or not luna events
        """
        # saving is possible only if luna events is in use and the policy stores events
        storingEnabled = lunaEventsUsage and self.storeEvent

        eventsToSend = []
        for event in events:
            if storingEnabled and self.isEventSatisfyFilters(event):
                event.eventUrl = f"{luna3Client.lunaEvents.baseUri}/events/{event.eventId}"
                eventsToSend.append(eventAsLuna3(event, accountId, createEventTime, endEventTime, handlerId))
            else:
                event.eventUrl = None

        if not eventsToSend:
            return

        async def saveEvents():
            """Send the collected events, a failed reply is only logged"""
            reply = await luna3Client.lunaEvents.saveEvents(
                eventsToSend, waitEventsSaving=bool(self.waitSaving), raiseError=self.waitSaving
            )
            if reply.success:
                return
            logger.warning(
                f"Failed save events to luna-event, receive response "
                f"with status code {reply.statusCode}, body {reply.text}"
            )

        if not self.waitSaving:
            # the coroutine is handed over to the runner and is not awaited here
            self.saveEventsAsyncRunner.runNoWait((saveEvents(),))
            return
        await saveEvents()
class StoragePolicy(BaseSchema):
    """Storage policy schema"""

    # face sample storage policy
    faceSamplePolicy: FaceSamplePolicy = FaceSamplePolicy()
    # body sample storage policy
    bodySamplePolicy: BodySamplePolicy = BodySamplePolicy()
    # image origin storage policy
    imageOriginPolicy: ImageOriginPolicy = ImageOriginPolicy()
    # attribute storage policy
    attributePolicy: AttributeStorePolicy = Field(default_factory=lambda: AttributeStorePolicy())
    # face storage policy
    facePolicy: FaceStoragePolicy = FaceStoragePolicy()
    # event storage policy
    eventPolicy: EventStoragePolicy = EventStoragePolicy()
    # notification storage policy
    notificationPolicy: NotificationStoragePolicy = NotificationStoragePolicy()

    async def execute(
        self,
        events: List[Event],
        accountId: str,
        luna3Client: Client,
        originImages: List[Union[SDKDetectableImage, FaceWarp, HumanWarp]],
        userData,
        externalId,
        logger: Logger,
        createEventTime: str,
        endEventTime: str,
        handlerId: str,
        facesBucket: str,
        bodiesBucket: str,
        originBucket: str,
        lunaEventsUsage: bool,
        redisContext: RedisContext,
        lunaSenderUsage: bool,
    ) -> DataForMonitoring:
        """
        Execute storage policy - save objects.

        The nested policies are executed in stages: samples first, then attributes, faces and
        origin images, then events and finally notifications.

        Args:
            events: events to process
            accountId: account id
            luna3Client: client
            originImages: data with origin images
            userData: user data
            externalId: external id
            logger: logger
            createEventTime: event creation time
            endEventTime: event end time
            handlerId: handler id
            facesBucket: faces samples bucket
            bodiesBucket: bodies samples bucket
            originBucket: origin image bucket
            lunaEventsUsage: luna events usage
            redisContext: redis context
            lunaSenderUsage: luna sender usage
        Returns:
            monitoring data
        """
        monitoringData = DataForMonitoring()

        async def _monitored(pointName: str, policyRun) -> None:
            """Await the policy coroutine inside a `monitorTime` context with the given point name"""
            with monitorTime(monitoringData.request, pointName):
                await policyRun

        # stage 1: face and body samples
        await asyncio.gather(
            _monitored(
                "face_sample_storage_policy_time",
                self.faceSamplePolicy.execute(events, facesBucket, accountId, luna3Client),
            ),
            _monitored(
                "body_sample_storage_policy_time",
                self.bodySamplePolicy.execute(events, bodiesBucket, accountId, luna3Client),
            ),
        )
        # stage 2: attributes, faces and origin images, executed only after the samples are processed
        await asyncio.gather(
            _monitored(
                "face_attribute_storage_policy_time",
                self.attributePolicy.execute(events, accountId, luna3Client),
            ),
            _monitored(
                "face_storage_policy_time",
                self.facePolicy.execute(events, accountId, luna3Client, userData, externalId),
            ),
            _monitored(
                "image_origin_storage_policy_time",
                self.imageOriginPolicy.execute(
                    events, originBucket, accountId=accountId, luna3Client=luna3Client, originImages=originImages
                ),
            ),
        )
        # stage 3: events, saved only after the previous policies have updated them
        await _monitored(
            "event_storage_policy_time",
            self.eventPolicy.execute(
                logger, events, luna3Client, createEventTime, endEventTime, accountId, handlerId, lunaEventsUsage
            ),
        )
        # stage 4: notifications, sent only after the event policy has been executed
        await _monitored(
            "notification_storage_policy_time",
            self.notificationPolicy.execute(
                events, logger, handlerId, createEventTime, endEventTime, accountId, redisContext, lunaSenderUsage
            ),
        )
        return monitoringData