"""
Module contains schemas for storage policy
"""
import asyncio
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from typing import ClassVar, Coroutine, Dict, List, Literal
from uuid import UUID, uuid4

import aiohttp
from aiohttp import BasicAuth
from luna3.client import Client
from luna3.common.requests import RequestPayload, makeRequest
from pydantic import Field, PrivateAttr
from vlutils.jobs.async_runner import AsyncRunner

from app.global_vars.context_vars import requestIdCtx
from classes.event import HandlerEvent as Event
from classes.schemas import types
from classes.schemas.base_schema import BaseSchema, HandlerSettings
from classes.schemas.filters import ComplexFilter
from configs.config import MAX_IMAGE_ORIGIN_EXTERNAL_URL_LENGTH
from crutches_on_wheels.cow.maps.vl_maps import ETHNIC_MAP
from crutches_on_wheels.cow.monitoring.points import DataForMonitoring, monitorTime
from crutches_on_wheels.cow.pydantic.types import OptionalNotNullable, Str
from crutches_on_wheels.cow.utils.log import logger
from redis_db.redis_context import RedisContext
# API version used as the leading path segment of the relative image/sample urls built in this module
CURRENT_LUNA_API_VERSION = 6
[docs]
@dataclass(slots=True)
class StorePolicyConfig:
    """
    Handler config that policies should apply.

    Carries the bucket names that `StoragePolicy.execute` hands over to the sample and image origin policies.
    """

    # bucket passed to the face sample policy
    facesBucket: str
    # bucket passed to the body sample policy
    bodiesBucket: str
    # bucket passed to the image origin policy
    originBucket: str
class _BaseStoragePolicy(BaseSchema):
    """Common base of the storage policies: holds the policy filters and the event filtering helpers."""

    # storage policy complex filters (matching+attributes)
    filters: ComplexFilter = ComplexFilter()

    def filterEventsByFilters(self, events: list[Event]) -> list[Event]:
        """
        Select the events the policy has to process.

        Args:
            events: all events
        Returns:
            the input list itself when no filters are set, otherwise a new list with the satisfying events only
        """
        policyFilters = self.filters
        if not policyFilters.isEmpty:
            return list(filter(policyFilters.isEventSatisfies, events))
        return events

    def isEventSatisfyFilters(self, event: Event) -> bool:
        """Check a single event against the policy filters; an empty filter set accepts every event."""
        return True if self.filters.isEmpty else self.filters.isEventSatisfies(event)
[docs]
class FaceSamplePolicy(_BaseStoragePolicy):
    """Face sample policy"""

    # whether to store sample
    storeSample: types.Int01 = 1

    async def execute(self, events: list[Event], accountId: str, bucket: str, luna3Client: Client):
        """
        Store face warps as samples and write the sample references into the events.

        For every detection that has a face sample entry and either a warp or an already known source sample,
        the source sample id is reused, or a new id is generated and the warp is scheduled for upload. The id
        and its url are written into the detection sample, and the id is appended to the event face attributes
        samples when face attributes are present. All uploads are awaited together at the end.

        Args:
            events: events
            accountId: account id
            bucket: bucket name
            luna3Client: client
        """
        if not self.storeSample:
            return

        samplesStore = luna3Client.lunaFaceSamplesStore
        upload = partial(samplesStore.putImage, accountId=accountId, bucketName=bucket)
        urlPrefix = f"{samplesStore.baseUri}/buckets/{bucket}/images"
        pendingUploads = []

        for event in self.filterEventsByFilters(events):
            for detection, source, warp in zip(event.raw["detections"], event.sources, event.faceWarps):
                faceSample = detection["samples"]["face"]
                # nothing to reference: no sample entry, or neither a warp nor a known sample
                if faceSample is None or not (warp or source.sample):
                    continue

                sampleId = str(source.sample) if source.sample else ""
                if not sampleId:
                    # the sample is not stored yet: upload the warp under a fresh id
                    sampleId = str(uuid4())
                    pendingUploads.append(upload(warp.body, sampleId, raiseError=True, headers=warp.meta))

                attributes = event.raw["face_attributes"]
                if attributes:
                    attributes["samples"].append(sampleId)
                faceSample["sample_id"] = sampleId
                faceSample["url"] = f"{urlPrefix}/{sampleId}"

        if pendingUploads:
            await asyncio.gather(*pendingUploads)
[docs]
class BodySamplePolicy(_BaseStoragePolicy):
    """Body sample policy"""

    # whether to store sample
    storeSample: types.Int01 = 1

    async def execute(self, events: list[Event], accountId: str, bucket: str, luna3Client: Client):
        """
        Store body warps as samples and write the sample references into the events.

        For every detection that has either a warp or an already known source sample, and a body sample entry,
        the source sample id is reused, or a new id is generated and the warp is scheduled for upload. The id
        and its url are written into the detection sample, and the id is appended to the event body attributes
        samples when body attributes are present. All uploads are awaited together at the end.

        Args:
            events: events
            accountId: account id
            bucket: bucket name
            luna3Client: client
        """
        if not self.storeSample:
            return

        samplesStore = luna3Client.lunaBodySamplesStore
        upload = partial(samplesStore.putImage, accountId=accountId, bucketName=bucket)
        urlPrefix = f"{samplesStore.baseUri}/buckets/{bucket}/images"
        pendingUploads = []

        for event in self.filterEventsByFilters(events):
            for detection, source, warp in zip(event.raw["detections"], event.sources, event.bodyWarps):
                # neither a warp to upload nor a known sample to reference
                if not (warp or source.sample):
                    continue
                bodySample = detection["samples"]["body"]
                if bodySample is None:
                    continue

                sampleId = str(source.sample) if source.sample else ""
                if not sampleId:
                    # the sample is not stored yet: upload the warp under a fresh id
                    sampleId = str(uuid4())
                    pendingUploads.append(upload(warp.body, sampleId, raiseError=True, headers=warp.meta))

                bodyAttributes = event.raw["body_attributes"]
                if bodyAttributes:
                    bodyAttributes["samples"].append(sampleId)
                bodySample["sample_id"] = sampleId
                bodySample["url"] = f"{urlPrefix}/{sampleId}"

        if pendingUploads:
            await asyncio.gather(*pendingUploads)
[docs]
class ImageOriginPolicy(_BaseStoragePolicy):
    """Image origin policy"""

    # whether to store origin image
    storeImage: types.Int01 = 0
    # use external reference as image origin
    useExternalReferences: types.Int01 = 1

    async def execute(self, events: list[Event], accountId: str, bucket: str, luna3Client: Client):
        """
        Resolve `image_origin` for every detection and upload the images that have to be stored.

        Args:
            events: events
            accountId: account id
            bucket: bucket name
            luna3Client: client
        Raises:
            RuntimeError: if a source image has a type other than 0, 1 or 2
        Notes:
            we need to loop through images if image_origin is image because we have to upload them and save as url
        """
        if not events:
            return

        uploads = []

        def scheduleUpload(imageId, body, headers: dict = None) -> str:
            """Queue the image upload and return the relative url the image gets."""
            uploads.append(
                luna3Client.lunaImageOriginStore.putImage(
                    imageInBytes=body,
                    imageId=imageId,
                    accountId=accountId,
                    bucketName=bucket,
                    headers=headers,
                    raiseError=True,
                )
            )
            return f"/{CURRENT_LUNA_API_VERSION}/images/{imageId}"

        def externalUrlAllowed(source):
            """Truthy when external references are enabled and the source url exists and is short enough."""
            return self.useExternalReferences and source.url and len(source.url) <= MAX_IMAGE_ORIGIN_EXTERNAL_URL_LENGTH

        def warpOrigin(detection, source, imageId, sampleKey, urlSegment) -> str:
            """Reference the stored sample of a warped image when allowed and known, else upload the image."""
            sampleId = None
            if source.sample:
                sampleId = str(source.sample)
            elif sample := detection["samples"][sampleKey]:
                sampleId = sample["sample_id"]
            if self.useExternalReferences and sampleId:
                return f"/{CURRENT_LUNA_API_VERSION}/samples/{urlSegment}/{sampleId}"
            return scheduleUpload(imageId, source.image.body, source.imageMeta)

        for event in self.filterEventsByFilters(events=events):
            for detection, source in zip(event.raw["detections"], event.sources):
                imageId = str(uuid4())

                # an image origin given as a string needs no processing here
                if source.imageOrigin and type(source.imageOrigin.body) is str:
                    continue
                # no image bytes at all: neither the image nor the image origin has a body
                if source.image.body is None and (source.imageOrigin is None or source.imageOrigin.body is None):
                    continue

                if not self.storeImage:
                    # storing is off: only an external url of a raw image may become the origin
                    if source.image.imageType == 0 and externalUrlAllowed(source):
                        detection["image_origin"] = source.url
                    continue

                if source.imageOrigin:
                    imageOrigin = scheduleUpload(imageId, source.imageOrigin.body, source.imageOrigin.meta)
                elif source.image.imageType == 0:
                    if externalUrlAllowed(source):
                        imageOrigin = source.url
                    else:
                        imageOrigin = scheduleUpload(imageId, source.image.body, source.imageMeta)
                elif source.image.imageType == 1:
                    imageOrigin = warpOrigin(detection, source, imageId, "face", "faces")
                elif source.image.imageType == 2:
                    imageOrigin = warpOrigin(detection, source, imageId, "body", "bodies")
                else:
                    raise RuntimeError("Unsupported image type")
                detection["image_origin"] = imageOrigin

        await asyncio.gather(*uploads)
[docs]
class AttributeStorePolicy(_BaseStoragePolicy):
    """Attribute store policy"""

    # whether to store attribute
    storeAttribute: types.Int01 = 0
    # attribute storage ttl
    ttl: types.IntAttributeTTL = Field(default_factory=lambda: HandlerSettings.defaultAttributeTTL)

    @staticmethod
    def generateFaceAttributesKwargs(event):
        """
        Build keyword arguments for the attribute put request from the event face attributes.

        Args:
            event: event whose `raw["face_attributes"]` and `faceDescriptor` are read
        Returns:
            dict with the basic attributes and their samples (when basic attributes exist) and/or the descriptor
            and its samples (when the attribute has a truthy score); empty if there are no face attributes
        """
        attributesKwargs = {}
        attribute = event.raw["face_attributes"]
        if attribute and (basicAttributes := attribute.get("basic_attributes")):
            faceBasicAttributes = {
                "age": basicAttributes["age"],
                "gender": basicAttributes["gender"],
                "ethnicity": ETHNIC_MAP[basicAttributes["ethnicities"]["predominant_ethnicity"]],
            }
            attributesKwargs.update(
                basicAttributesSamples=attribute["samples"],
                basicAttributes=faceBasicAttributes,
            )
        if attribute and attribute.get("score"):
            attributesKwargs.update(
                descriptorSamples=attribute["samples"],
                descriptors=[event.faceDescriptor],
            )
        return attributesKwargs

    async def execute(self, events: list[Event], accountId: str, luna3Client: Client) -> None:
        """
        Save attributes.

        Every filtered event with face attributes gets a new attribute id; the id and the attribute url are
        written into the event face attributes and the attribute is put with the policy ttl.

        Args:
            events: events
            accountId: account id
            luna3Client: client
        """
        if not self.storeAttribute:
            return
        putAttribute = partial(luna3Client.lunaFaces.putAttribute, accountId=accountId, ttl=self.ttl)
        # no trailing slash here: the id is appended below with its own "/" separator
        storeUrl = f"{luna3Client.lunaFaces.baseUri}/attributes"
        futures = []
        for event in self.filterEventsByFilters(events):
            attribute = event.raw["face_attributes"]
            if attribute is None:
                continue
            attributeKwargs = self.generateFaceAttributesKwargs(event)
            attributeId = str(uuid4())
            attribute |= {"attribute_id": attributeId, "url": f"{storeUrl}/{attributeId}"}
            futures.append(putAttribute(attributeId=attributeId, **attributeKwargs, raiseError=True))
        if futures:
            await asyncio.gather(*futures)
[docs]
class LinkToListsPolicy(_BaseStoragePolicy):
    """
    Link to lists policy schema

    One entry of `FaceStoragePolicy.linkToListsPolicy`: a face created for an event is linked to `listId`
    when the event satisfies the filters of this entry.
    """

    # list id to link faces to
    listId: UUID
[docs]
class FaceStoragePolicy(_BaseStoragePolicy):
    """Face store policy"""

    # whether to store face
    storeFace: types.Int01 = 0
    # whether to set face sample as avatar
    setSampleAsAvatar: types.Int01 = 1
    # face link to lists policy list
    linkToListsPolicy: list[LinkToListsPolicy] = Field([], max_items=types.MAX_POLICY_LIST_LENGTH)

    @staticmethod
    def generateFaceAttributesKwargs(event):
        """
        Build attribute keyword arguments for the face put request from the event face attributes.

        Args:
            event: event whose `raw["face_attributes"]` and `faceDescriptor` are read
        Returns:
            dict with the basic attributes and their samples (when basic attributes exist) and/or the descriptor
            and its samples (when the attribute has a truthy score); empty if there are no face attributes
        """
        attributesKwargs = {}
        attribute = event.raw["face_attributes"]
        if attribute and (basicAttributes := attribute.get("basic_attributes")):
            faceBasicAttributes = {
                "age": basicAttributes["age"],
                "gender": basicAttributes["gender"],
                "ethnicity": ETHNIC_MAP[basicAttributes["ethnicities"]["predominant_ethnicity"]],
            }
            attributesKwargs.update(
                basicAttributesSamples=attribute["samples"],
                basicAttributes=faceBasicAttributes,
            )
        if attribute and attribute.get("score"):
            attributesKwargs.update(
                descriptorSamples=attribute["samples"],
                descriptors=[event.faceDescriptor],
            )
        return attributesKwargs

    async def execute(self, events: list[Event], accountId, luna3Client: Client) -> None:
        """
        Execute face policy (with link to list policy).

        For every filtered event a face description is written to `event.raw["face"]` and a face put request
        is scheduled; all requests are awaited together at the end.

        Args:
            events: processing events
            accountId: account id
            luna3Client: luna3 client
        """
        if not self.storeFace:
            return
        futures = []
        for event in self.filterEventsByFilters(events=events):
            faceId = str(uuid4())
            faceRes = {
                "face_id": faceId,
                # NOTE(review): built from `origin` while other policies in this file use `baseUri` - confirm
                "url": f"{luna3Client.lunaFaces.origin}/faces/{faceId}",
                "event_id": event.raw["event_id"],
                "user_data": event.raw.get("user_data", ""),
                "external_id": event.raw.get("external_id", ""),
                "avatar": "",
            }
            # unique list ids of the link policies the event satisfies (order is not defined)
            lists = list(
                {
                    str(linkListPolicy.listId)
                    for linkListPolicy in self.linkToListsPolicy
                    if linkListPolicy.filters is None or linkListPolicy.filters.isEventSatisfies(event)
                }
            )
            faceRes["lists"] = lists
            if self.setSampleAsAvatar:
                # the avatar is the face sample of the first detection, when it has a sample id
                if event.raw["detections"] and (face := event.raw["detections"][0]["samples"]["face"]):
                    if face["sample_id"] is not None:
                        faceRes["avatar"] = f"/{CURRENT_LUNA_API_VERSION}/samples/faces/{face['sample_id']}"
            event.raw["face"] = faceRes
            attributeKwargs = self.generateFaceAttributesKwargs(event)
            futures.append(
                luna3Client.lunaFaces.putFace(
                    faceId=faceId,
                    accountId=accountId,
                    eventId=event.raw["event_id"],
                    userData=event.raw.get("user_data"),
                    externalId=event.raw.get("external_id"),
                    listIds=lists or None,
                    avatar=event.raw["face"].get("avatar"),
                    **attributeKwargs,
                    raiseError=True,
                )
            )
        if futures:
            await asyncio.gather(*futures)
[docs]
class BasicAuthorization(BaseSchema):
    """Callback basic authorization"""

    # authorization type
    type: Literal["basic"]
    # authorization login
    login: Str(maxLength=128)
    # authorization password
    password: Str(maxLength=128)

    @property
    def headers(self):
        """Headers dict with a single `Authorization` entry encoded from the login and password."""
        credentials = BasicAuth(login=self.login, password=self.password)
        return {"Authorization": credentials.encode()}
[docs]
class Params(BaseSchema):
    """
    Callback request parameters

    Read by `HttpCallback`: `timeout` is passed as the request total timeout, `contentType` selects the
    msgpack or json payload builder and `headers` are the base of the request headers.
    """

    # callback request timeout
    timeout: int = OptionalNotNullable()
    # callback request content type
    contentType: Literal["application/json", "application/msgpack"] = OptionalNotNullable()
    # callback request headers
    headers: Dict[str, str] = OptionalNotNullable()
[docs]
class HttpCallback(_BaseStoragePolicy):
    """Notification policy callbacks"""

    # callback type
    type: Literal["http"]
    # callback url
    url: str
    # callback authorization parameters
    authorization: BasicAuthorization = OptionalNotNullable()
    # callback request parameters
    params: Params = Params()
    # whether callback enabled or not
    enable: types.Int01 = 1
    # callback final headers
    _headers: Dict[str, str] = PrivateAttr()

    def model_post_init(self, __context):
        """Build the final request headers: the params headers plus the authorization headers (if any)."""
        # copy the params headers: updating them in place would leak the authorization header into `params`
        self._headers = dict(self.params.headers or {})
        if self.authorization is not None:
            self._headers |= self.authorization.headers

    @classmethod
    async def onStartup(cls):
        """Init Policies: create the http session shared by all callbacks."""
        cls.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60))

    @classmethod
    async def onShutdown(cls):
        """Stop Policies: close the shared http session."""
        await cls.session.close()

    def buildCallback(self, events: list[Event]) -> Coroutine | None:
        """
        Build callback coroutine

        The request body is serialized right away; the request itself is sent when the coroutine is awaited.
        A failed reply is only logged as a warning.

        Args:
            events: events to send
        Returns:
            callback coroutine or None if callback disabled or filtered
        """
        if not self.enable:
            return None
        filteredEvents = self.filterEventsByFilters(events)
        if not filteredEvents:
            return None

        payload = {"events": [event.raw for event in filteredEvents]}
        if self.params.contentType == "application/msgpack":
            body = RequestPayload.buildMsgpack(payload)
        else:
            body = RequestPayload.buildJson(payload)

        async def sendNotification():
            reply = await makeRequest(
                url=self.url,
                method="POST",
                headers=self._headers,
                totalTimeout=self.params.timeout,
                body=body,
                session=self.session,
                asyncRequest=True,
            )
            if not reply.success:
                logger.warning(f"Notification to {self.url} failed. {reply.text}")

        return sendNotification()
[docs]
class NotificationStoragePolicy(_BaseStoragePolicy):
    """Notification store policy"""

    # whether to send notification
    sendNotification: types.Int01 = 1

    async def execute(self, events: list[Event], accountId: str, redisContext: RedisContext) -> None:
        """
        Publish the events that satisfy the policy filters through the redis context.

        Nothing is published when the policy is switched off or no event passes the filters.

        Args:
            events: events
            accountId: account id
            redisContext: redis context
        """
        if not self.sendNotification:
            return
        matchedEvents = list(filter(self.isEventSatisfyFilters, events))
        if matchedEvents:
            await redisContext.publish(matchedEvents, accountId, requestIdCtx.get())
[docs]
class EventStoragePolicy(_BaseStoragePolicy):
    """Event store policy"""

    # whether to store event
    storeEvent: types.Int01 = 1
    # whether to wait events saving (response will be received only after events will be saved)
    waitSaving: types.Int01 = 1

    @classmethod
    async def onStartup(cls):
        """Init Policies: create the runner used to save events without waiting."""
        cls.saveEventsAsyncRunner = AsyncRunner(100, closeTimeout=1)

    @classmethod
    async def onShutdown(cls):
        """Stop Policies: close the background save runner."""
        await cls.saveEventsAsyncRunner.close()

    @staticmethod
    def getEventFaceAttributes(event):
        """
        Get the face attributes of an event.

        Args:
            event: base event
        Returns:
            dict with the event basic attributes and the face descriptor data (None for a missing part)
        """
        return {
            "basic_attributes": (event.raw["face_attributes"] or {}).get("basic_attributes"),
            "descriptor_data": {
                "descriptor": event.faceDescriptor.descriptor,
                "descriptor_version": event.faceDescriptor.version,
            }
            if event.faceDescriptor
            else None,
        }

    @staticmethod
    def getEventBasicAttributes(event):
        """
        Get the body attributes of an event (despite the name, only the body descriptor is read).

        Args:
            event: base event
        Returns:
            dict with the body descriptor data (None if the event has no body descriptor)
        """
        return {
            "descriptor_data": {
                "descriptor": event.bodyDescriptor.descriptor,
                "descriptor_version": event.bodyDescriptor.version,
            }
            if event.bodyDescriptor
            else None
        }

    @staticmethod
    def processDetectionSamplesData(event):
        """
        Reduce every detection sample of the event to its sample id and detection rect (in place).

        Args:
            event: event to update
        """
        for detection in event["detections"]:
            samples = detection["samples"]
            for descriptorType in ("face", "body"):
                if sample := samples.get(descriptorType):
                    # only the rect is kept, so copying it alone is enough - no need to copy the whole sample
                    rect = deepcopy(sample["detection"].get("rect"))
                    samples[descriptorType] = {
                        "sample_id": sample["sample_id"],
                        "detection": {"rect": rect} if rect else {},
                    }
                else:
                    samples[descriptorType] = None

    @staticmethod
    def processEventMatchCandidatesData(event):
        """
        Exclude non supported properties from faces' and events' matching candidates (in place).

        Args:
            event: event to update
        """
        expectedFields = {
            "event": (
                "event_id",
                "handler_id",
                "source",
                "stream_id",
                "external_id",
                "user_data",
                "create_time",
            ),
            "face": ("face_id", "external_id", "user_data", "create_time"),
        }
        for match in event["matches"]:
            for candidate in match["candidates"]:
                # a candidate without the "face" key is treated as an event candidate
                candidateType = "face" if "face" in candidate else "event"
                candidateItems = candidate[candidateType].items()
                candidate[candidateType] = {
                    key: value for key, value in candidateItems if key in expectedFields[candidateType]
                }

    def generateEventToSend(self, event):
        """
        Prepare the event data for saving.

        Works on a deep copy of `event.raw`, so the source event is left untouched.

        Args:
            event: base event
        Returns:
            dict built from the event parameters, ready to be passed to the events saving request
        """
        faceAttributes = self.getEventFaceAttributes(event)
        bodyAttributes = self.getEventBasicAttributes(event)
        eventToSend = deepcopy(event.raw)
        eventToSend["handler_id"] = event.handlerId
        eventToSend["create_time"] = event.createTime
        eventToSend["end_time"] = event.endTime
        eventToSend["face"] = (
            {"face_id": faceData["face_id"], "lists": faceData["lists"]}
            if (faceData := event.raw.get("face"))
            else None
        )
        eventToSend["face_attributes"] = {key: value for key, value in faceAttributes.items() if value is not None}
        eventToSend["body_attributes"] = {key: value for key, value in bodyAttributes.items() if value is not None}
        if aggregatedAttrs := eventToSend["aggregate_estimations"]["face"]["attributes"]:
            # "face_occlusion" is dropped from the saved event; an empty mask or a mask without it is tolerated
            if mask := aggregatedAttrs.get("mask"):
                mask.pop("face_occlusion", None)
        self.processDetectionSamplesData(eventToSend)
        self.processEventMatchCandidatesData(eventToSend)
        # the url is set by `execute`; do not fail when the method is called for an event without it
        eventToSend.pop("url", None)
        return eventToSend

    async def execute(self, events: list[Event], accountId: str, luna3Client: Client) -> None:
        """
        Save events.

        Every event that satisfies the filters gets its url written to `event.raw["url"]` and is converted
        for saving. With `waitSaving` the save request is awaited and raises on error; otherwise it is handed
        to the background runner and a failed reply is only logged.

        Args:
            events: events
            accountId: account id
            luna3Client: client
        """
        if not self.storeEvent:
            return
        eventsToSend = []
        for event in events:
            if not self.isEventSatisfyFilters(event):
                continue
            event.raw["url"] = f"{luna3Client.lunaEvents.baseUri}/events/{event.raw['event_id']}"
            eventsToSend.append(self.generateEventToSend(event))
        if not eventsToSend:
            return

        waitSaving = bool(self.waitSaving)

        async def saveEvents():
            reply = await luna3Client.lunaEvents.saveEvents(
                accountId=accountId,
                events=eventsToSend,
                waitEventsSaving=waitSaving,
                raiseError=waitSaving,
            )
            if not reply.success:
                logger.warning(
                    f"Failed save events to luna-event, receive response "
                    f"with status code {reply.statusCode}, body {reply.text}"
                )

        if waitSaving:
            await saveEvents()
        else:
            self.saveEventsAsyncRunner.runNoWait((saveEvents(),))
[docs]
class StoragePolicy(BaseSchema):
    """Storage policy schema"""

    # face sample storage policy
    faceSamplePolicy: FaceSamplePolicy = FaceSamplePolicy()
    # body sample storage policy
    bodySamplePolicy: BodySamplePolicy = BodySamplePolicy()
    # image origin storage policy
    imageOriginPolicy: ImageOriginPolicy = ImageOriginPolicy()
    # attribute storage policy
    attributePolicy: AttributeStorePolicy = Field(default_factory=lambda: AttributeStorePolicy())
    # face storage policy
    facePolicy: FaceStoragePolicy = FaceStoragePolicy()
    # event storage policy
    eventPolicy: EventStoragePolicy = EventStoragePolicy()
    # notification storage policy
    notificationPolicy: NotificationStoragePolicy = NotificationStoragePolicy()
    # callbacks
    callbacks: List[HttpCallback] = []
    # strong references to the running callback tasks: the event loop itself keeps only weak references,
    # so a task nobody references may be garbage collected before it is done
    _callbackTasks: ClassVar[set[asyncio.Task]] = set()

    async def execute(
        self,
        events: list[Event],
        accountId: str,
        monitoring: DataForMonitoring,
        config: StorePolicyConfig,
        luna3Client: Client,
        redisContext: RedisContext,
    ):
        """
        Execute storage policy - save objects.

        The policies run in three consecutive groups (samples; image origin, attribute and face; event and
        notification), each group concurrently and under its own monitoring timer. The callbacks are started
        last as background tasks and are not awaited.

        Args:
            events: events to process
            accountId: account id
            monitoring: data for monitoring
            config: app config
            luna3Client: client
            redisContext: redis context
        """

        async def _faceSample() -> None:
            with monitorTime(monitoring, "face_sample_storage_policy_time"):
                await self.faceSamplePolicy.execute(events, accountId, config.facesBucket, luna3Client)

        async def _bodySample() -> None:
            with monitorTime(monitoring, "body_sample_storage_policy_time"):
                await self.bodySamplePolicy.execute(events, accountId, config.bodiesBucket, luna3Client)

        async def _originImage() -> None:
            with monitorTime(monitoring, "image_origin_storage_policy_time"):
                await self.imageOriginPolicy.execute(events, accountId, config.originBucket, luna3Client)

        async def _attribute() -> None:
            with monitorTime(monitoring, "face_attribute_storage_policy_time"):
                await self.attributePolicy.execute(events, accountId, luna3Client)

        async def _face() -> None:
            with monitorTime(monitoring, "face_storage_policy_time"):
                await self.facePolicy.execute(events, accountId, luna3Client)

        async def _event() -> None:
            with monitorTime(monitoring, "event_storage_policy_time"):
                await self.eventPolicy.execute(events, accountId, luna3Client)

        async def _notification() -> None:
            with monitorTime(monitoring, "notification_storage_policy_time"):
                await self.notificationPolicy.execute(events, accountId, redisContext)

        def _callbacks():
            """Build the callbacks and start them as background tasks"""
            runningTasks = type(self)._callbackTasks
            for callback in self.callbacks:
                if call := callback.buildCallback(events):
                    task = asyncio.create_task(call)
                    # hold the task until it is done, then let it go
                    runningTasks.add(task)
                    task.add_done_callback(runningTasks.discard)

        await asyncio.gather(_faceSample(), _bodySample())
        # save attribute and face only after executing previous policies (^^^ samples and images are updated here ^^^)
        await asyncio.gather(_originImage(), _attribute(), _face())
        # save events only after executing previous policies (^^^ events are updated here ^^^)
        await asyncio.gather(_event(), _notification())
        _callbacks()