"""
Module contains schemas for storage policy
"""
import asyncio
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from typing import Any, List, Literal
from uuid import UUID, uuid4
from luna3.client import Client
from pydantic import Field
from vlutils.jobs.async_runner import AsyncRunner
from app.global_vars.context_vars import requestIdCtx
from classes.event import HandlerEvent as Event, Image, StreamEventDetection
from classes.schemas import types
from classes.schemas.base_schema import BaseSchema, HandlerSettings
from classes.schemas.filters import ComplexFilter
from classes.schemas.types import IntTTL
from configs.config import MAX_IMAGE_ORIGIN_EXTERNAL_URL_LENGTH
from crutches_on_wheels.cow.maps.vl_maps import ETHNIC_MAP
from crutches_on_wheels.cow.monitoring.points import DataForMonitoring, monitorTime
from crutches_on_wheels.cow.pydantic.callback_models import HttpCallback as _HttpCallback, Params as _Params
from crutches_on_wheels.cow.utils.log import logger
from redis_db.redis_context import RedisContext
CURRENT_LUNA_API_VERSION = 6
[docs]
@dataclass(slots=True)
class StorePolicyConfig:
"""Handler config that policies should apply."""
facesBucket: str
bodiesBucket: str
originBucket: str
class _BaseStoragePolicy(BaseSchema):
"""Base storage policy"""
# storage policy complex filters (matching+attributes)
filters: ComplexFilter = ComplexFilter()
def filterEventsByFilters(self, events: list[Event]) -> list[Event]:
"""
Filter events for the further processing.
Args:
events: all events
Returns:
events without filtered ones
"""
if self.filters.isEmpty:
return events
return [event for event in events if self.filters.isEventSatisfies(event)]
def isEventSatisfyFilters(self, event: Event) -> bool:
"""Is events satisfy existing filters"""
if not self.filters.isEmpty:
return self.filters.isEventSatisfies(event)
return True
[docs]
class FaceSamplePolicy(_BaseStoragePolicy):
"""Face sample policy"""
# whether to store sample
storeSample: types.Int01 = 1
# Objects ttl measured in days. Bucket TTL used if not set
ttl: IntTTL | None = None
[docs]
def asDict(self) -> dict:
"""Get FaceSamplePolicy as dict with ttl"""
result = super().asDict()
result["ttl"] = self.ttl
return result
[docs]
async def execute(self, events: list[Event], accountId: str, bucket: str, luna3Client: Client):
"""
Save face samples.
Args:
events: events
accountId: account id
bucket: bucket name
luna3Client: client
"""
if not self.storeSample:
return
putImage = partial(
luna3Client.lunaFaceSamplesStore.putImage, accountId=accountId, bucketName=bucket, ttl=self.ttl
)
storeUrl = f"{luna3Client.lunaFaceSamplesStore.baseUri}/buckets/{bucket}/images"
warpFutures = []
for event in self.filterEventsByFilters(events):
for estimation, source, warp in zip(event.raw["detections"], event.sources, event.faceWarps):
if (faceSample := estimation["samples"]["face"]) is not None:
if not (warp or source.sample):
continue
if isinstance(source, StreamEventDetection) or not source.sample:
sampleId = str(uuid4())
warpFutures.append(putImage(warp.body, sampleId, raiseError=True, headers=warp.meta))
else:
sampleId = str(source.sample)
if faceAttributes := event.raw["face_attributes"]:
faceAttributes["samples"].append(sampleId)
faceSample["sample_id"] = sampleId
faceSample["url"] = f"{storeUrl}/{sampleId}"
if warpFutures:
await asyncio.gather(*warpFutures)
[docs]
class BodySamplePolicy(_BaseStoragePolicy):
"""Body sample policy"""
# whether to store sample
storeSample: types.Int01 = 1
# Objects ttl measured in days. Bucket TTL used if not set
ttl: IntTTL | None = None
[docs]
def asDict(self) -> dict:
"""Get BodySamplePolicy as dict with ttl"""
result = super().asDict()
result["ttl"] = self.ttl
return result
[docs]
async def execute(self, events: list[Event], accountId: str, bucket: str, luna3Client: Client):
"""
Save body samples.
Args:
events: events
accountId: account id
bucket: bucket name
luna3Client: client
"""
if not self.storeSample:
return
putImage = partial(
luna3Client.lunaBodySamplesStore.putImage, accountId=accountId, bucketName=bucket, ttl=self.ttl
)
storeUrl = f"{luna3Client.lunaBodySamplesStore.baseUri}/buckets/{bucket}/images"
warpFutures = []
for event in self.filterEventsByFilters(events):
for estimation, source, warp in zip(event.raw["detections"], event.sources, event.bodyWarps):
if not (warp or source.sample):
continue
if (bodySample := estimation["samples"]["body"]) is not None:
if isinstance(source, StreamEventDetection) or not source.sample:
sampleId = str(uuid4())
warpFutures.append(putImage(warp.body, sampleId, raiseError=True, headers=warp.meta))
else:
sampleId = str(source.sample)
if faceAttributes := event.raw["body_attributes"]:
faceAttributes["samples"].append(sampleId)
bodySample["sample_id"] = sampleId
bodySample["url"] = f"{storeUrl}/{sampleId}"
if warpFutures:
await asyncio.gather(*warpFutures)
[docs]
class ImageOriginPolicy(_BaseStoragePolicy):
"""Image origin policy"""
# whether to store origin image
storeImage: types.Int01 = 0
# use external reference as image origin
useExternalReferences: types.Int01 = 1
# Objects ttl measured in days. Bucket TTL used if not set
ttl: IntTTL | None = None
[docs]
def asDict(self) -> dict:
"""Get ImageOriginPolicy as dict with ttl"""
result = super().asDict()
result["ttl"] = self.ttl
return result
[docs]
async def execute(self, events: list[Event], accountId: str, bucket: str, luna3Client: Client):
"""
Save origin images.
Args:
events: events
accountId: account id
bucket: bucket name
luna3Client: client
Notes:
we need to loop through images if image_origin is image because we have to upload them and save as url
"""
if not events:
return
futures = []
def addImageToUpload(accountId, imageId, body, headers: dict = None):
futures.append(
luna3Client.lunaImageOriginStore.putImage(
imageInBytes=body,
imageId=imageId,
accountId=accountId,
bucketName=bucket,
ttl=self.ttl,
headers=headers,
raiseError=True,
)
)
def isValidExternalUrl(sourceUrl):
return self.useExternalReferences and sourceUrl and len(sourceUrl) <= MAX_IMAGE_ORIGIN_EXTERNAL_URL_LENGTH
def getImageOrigin4Image(imageSource: Image) -> str | None:
"""Get image origin from `Image` or None if image origin existence is not supposed"""
if imageSource.image.body is None and (
imageSource.imageOrigin is None or imageSource.imageOrigin.body is None
):
return None
if imageSource.image.imageType == 0 and not self.storeImage:
if isValidExternalUrl(imageSource.url):
estimation["image_origin"] = imageSource.url
if not self.storeImage:
return None
imageId = str(uuid4())
if imageSource.imageOrigin:
addImageToUpload(accountId, imageId, imageSource.imageOrigin.body, imageSource.imageOrigin.meta)
return f"/{CURRENT_LUNA_API_VERSION}/images/{imageId}"
elif imageSource.image.imageType == 0:
if isValidExternalUrl(imageSource.url):
return imageSource.url
addImageToUpload(accountId, imageId, imageSource.image.body, imageSource.imageMeta)
return f"/{CURRENT_LUNA_API_VERSION}/images/{imageId}"
elif imageSource.image.imageType == 1:
sampleId = None
if imageSource.sample:
sampleId = str(imageSource.sample)
elif face := estimation["samples"]["face"]:
sampleId = face["sample_id"]
if self.useExternalReferences and sampleId:
return f"/{CURRENT_LUNA_API_VERSION}/samples/faces/{sampleId}"
addImageToUpload(accountId, imageId, imageSource.image.body, imageSource.imageMeta)
return f"/{CURRENT_LUNA_API_VERSION}/images/{imageId}"
elif imageSource.image.imageType == 2:
sampleId = None
if imageSource.sample:
sampleId = str(imageSource.sample)
elif body := estimation["samples"]["body"]:
sampleId = body["sample_id"]
if self.useExternalReferences and sampleId:
return f"/{CURRENT_LUNA_API_VERSION}/samples/bodies/{sampleId}"
addImageToUpload(accountId, imageId, imageSource.image.body, imageSource.imageMeta)
return f"/{CURRENT_LUNA_API_VERSION}/images/{imageId}"
else:
raise RuntimeError("Unsupported image type")
def getImageOrigin4Detection(detectionSource: StreamEventDetection) -> str | None:
"""Get image origin from `StreamEventDetection` or None is image origin existence is not supposed"""
faceWarp = detectionSource.faceWarp.warp if detectionSource.faceWarp is not None else None
bodyWarp = detectionSource.bodyWarp.warp if detectionSource.bodyWarp is not None else None
noWarps = faceWarp is bodyWarp is None
if noWarps and (detectionSource.imageOrigin is None or detectionSource.imageOrigin.body is None):
return None
if not self.storeImage:
if isValidExternalUrl(detectionSource.url):
return detectionSource.url
return None
imageId = str(uuid4())
if detectionSource.imageOrigin:
addImageToUpload(accountId, imageId, detectionSource.imageOrigin.body, detectionSource.imageOrigin.meta)
return f"/{CURRENT_LUNA_API_VERSION}/images/{imageId}"
elif isValidExternalUrl(detectionSource.url):
return detectionSource.url
if faceWarp:
sampleId = None
if face := estimation["samples"]["face"]:
sampleId = face["sample_id"]
if self.useExternalReferences and sampleId:
return f"/{CURRENT_LUNA_API_VERSION}/samples/faces/{sampleId}"
addImageToUpload(accountId, imageId, detectionSource.faceWarp.warp, detectionSource.faceWarp.meta)
return f"/{CURRENT_LUNA_API_VERSION}/images/{imageId}"
if bodyWarp:
sampleId = None
if body := estimation["samples"]["body"]:
sampleId = body["sample_id"]
if self.useExternalReferences and sampleId:
return f"/{CURRENT_LUNA_API_VERSION}/samples/bodies/{sampleId}"
addImageToUpload(accountId, imageId, detectionSource.bodyWarp.warp, detectionSource.bodyWarp.meta)
return f"/{CURRENT_LUNA_API_VERSION}/images/{imageId}"
for event in self.filterEventsByFilters(events=events):
for estimation, source in zip(event.raw["detections"], event.sources):
if source.imageOrigin and type(source.imageOrigin.body) is str:
continue
if isinstance(source, Image):
calculatedImageOrigin = getImageOrigin4Image(source)
elif isinstance(source, StreamEventDetection):
calculatedImageOrigin = getImageOrigin4Detection(source)
else:
raise RuntimeError(f"Unsupported source type: {type(source)}")
if calculatedImageOrigin is not None:
estimation["image_origin"] = calculatedImageOrigin
await asyncio.gather(*futures)
[docs]
class AttributeStorePolicy(_BaseStoragePolicy):
"""Attribute store policy"""
# whether to store attribute
storeAttribute: types.Int01 = 0
# attribute storage ttl
ttl: types.IntAttributeTTL = Field(default_factory=lambda: HandlerSettings.defaultAttributeTTL)
@staticmethod
def generateFaceAttributesKwargs(event):
attributesKwargs = {}
attribute = event.raw["face_attributes"]
if attribute and (basicAttributes := attribute.get("basic_attributes")):
faceBasicAttributes = {
"age": basicAttributes["age"],
"gender": basicAttributes["gender"],
"ethnicity": ETHNIC_MAP[basicAttributes["ethnicities"]["predominant_ethnicity"]],
}
attributesKwargs.update(
basicAttributesSamples=attribute["samples"],
basicAttributes=faceBasicAttributes,
)
if attribute and attribute.get("score"):
attributesKwargs.update(
descriptorSamples=attribute["samples"],
descriptors=[event.faceDescriptor],
)
return attributesKwargs
[docs]
async def execute(self, events: list[Event], accountId: str, luna3Client: Client) -> None:
"""
Save attributes.
Args:
events: events
accountId: account id
luna3Client: client
"""
if not self.storeAttribute:
return
putAttribute = partial(luna3Client.lunaFaces.putAttribute, accountId=accountId, ttl=self.ttl)
storeUrl = f"{luna3Client.lunaFaces.baseUri}/attributes"
futures = []
for event in self.filterEventsByFilters(events):
attribute = event.raw["face_attributes"]
if attribute is None:
continue
attributeKwargs = self.generateFaceAttributesKwargs(event)
attributeId = str(uuid4())
attribute |= {"attribute_id": attributeId, "url": f"{storeUrl}/{attributeId}"}
futures.append(putAttribute(attributeId=attributeId, **attributeKwargs, raiseError=True))
if futures:
await asyncio.gather(*futures)
[docs]
class LinkToListsPolicy(_BaseStoragePolicy):
"""Link to lists policy schema"""
# list id to link faces to
listId: UUID
[docs]
class FaceStoragePolicy(_BaseStoragePolicy):
"""Face store policy"""
# whether to store face
storeFace: types.Int01 = 0
# whether to set face sample as avatar
setSampleAsAvatar: types.Int01 = 1
# face link to lists policy list
linkToListsPolicy: list[LinkToListsPolicy] = Field([], max_items=types.MAX_POLICY_LIST_LENGTH)
@staticmethod
def generateFaceAttributesKwargs(event):
attributesKwargs = {}
attribute = event.raw["face_attributes"]
if attribute and (basicAttributes := attribute.get("basic_attributes")):
faceBasicAttributes = {
"age": basicAttributes["age"],
"gender": basicAttributes["gender"],
"ethnicity": ETHNIC_MAP[basicAttributes["ethnicities"]["predominant_ethnicity"]],
}
attributesKwargs.update(
basicAttributesSamples=attribute["samples"],
basicAttributes=faceBasicAttributes,
)
if attribute and attribute.get("score"):
attributesKwargs.update(
descriptorSamples=attribute["samples"],
descriptors=[event.faceDescriptor],
)
return attributesKwargs
[docs]
async def execute(self, events: list[Event], accountId, luna3Client: Client) -> None:
"""
Execute face policy (with link to list policy).
Args:
events: processing events
accountId: account id
luna3Client: luna3 client
"""
if not self.storeFace:
return
futures, eventsToUpdate = [], []
for event in self.filterEventsByFilters(events=events):
faceRes = {
"face_id": (faceId := str(uuid4())),
"url": f"{luna3Client.lunaFaces.baseUri}/faces/{faceId}",
"event_id": event.raw["event_id"],
"user_data": event.raw.get("user_data", ""),
"external_id": event.raw.get("external_id", ""),
"avatar": "",
}
lists = list(
{
str(linkListPolicy.listId)
for linkListPolicy in self.linkToListsPolicy
if linkListPolicy.filters is None or linkListPolicy.filters.isEventSatisfies(event)
}
)
faceRes["lists"] = lists
if self.setSampleAsAvatar:
if event.raw["detections"] and (face := event.raw["detections"][0]["samples"]["face"]):
if face["sample_id"] is not None:
faceRes["avatar"] = f"/{CURRENT_LUNA_API_VERSION}/samples/faces/{face['sample_id']}"
event.raw["face"] = faceRes
attributeKwargs = self.generateFaceAttributesKwargs(event)
futures.append(
luna3Client.lunaFaces.putFace(
faceId=faceId,
accountId=accountId,
eventId=event.raw["event_id"],
userData=event.raw.get("user_data"),
externalId=event.raw.get("external_id"),
listIds=lists or None,
avatar=event.raw["face"].get("avatar"),
**attributeKwargs,
raiseError=True,
)
)
if futures:
await asyncio.gather(*futures)
[docs]
class Params(_Params):
# callback request content type
contentType: Literal["application/json", "application/msgpack"] = "application/json"
[docs]
class HttpCallback(_BaseStoragePolicy, _HttpCallback):
"""Notification policy callbacks"""
params: Params = Params()
[docs]
def getJsonBody(self, data: Any) -> dict | None:
"""
Description see :func:`~_HttpCallback.getJsonBody`.
"""
if not (filteredEvents := self.filterEventsByFilters(data)):
return
return {"events": [event.raw for event in filteredEvents]}
[docs]
def getRequestCredentialsKey(self) -> str:
"""Get data which a request characterization (what and where to send)"""
return f"{self.url},{self.authorization},{self.params.headers},{self.params.contentType}"
[docs]
class NotificationStoragePolicy(_BaseStoragePolicy):
"""Notification store policy"""
# whether to send notification
sendNotification: types.Int01 = 1
[docs]
async def execute(self, events: list[Event], accountId: str, redisContext: RedisContext) -> None:
"""
Save notifications
Args:
events: events
accountId: account id
redisContext: redis context
"""
if not self.sendNotification:
return
eventsToSend = [event for event in events if self.isEventSatisfyFilters(event)]
if not eventsToSend:
return
await redisContext.publish(eventsToSend, accountId, requestIdCtx.get())
[docs]
class EventStoragePolicy(_BaseStoragePolicy):
"""Event store policy"""
# whether to store event
storeEvent: types.Int01 = 1
# whether to wait events saving (response will be received only after events will be saved)
waitSaving: types.Int01 = 1
[docs]
@classmethod
async def onStartup(cls):
"""Init Policies"""
cls.saveEventsAsyncRunner = AsyncRunner(100, closeTimeout=1)
[docs]
@classmethod
async def onShutdown(cls):
"""Stop Policies"""
await cls.saveEventsAsyncRunner.close()
[docs]
@staticmethod
def getEventFaceAttributes(event):
"""
Get the face attributes of an event
:param event: Base event.
:return: Constructed object based on event's face descriptor parameters.
"""
return {
"basic_attributes": (event.raw["face_attributes"] or {}).get("basic_attributes"),
"descriptor_data": (
{
"descriptor": event.faceDescriptor.descriptor,
"descriptor_version": event.faceDescriptor.version,
}
if event.faceDescriptor
else None
),
}
[docs]
@staticmethod
def getEventBasicAttributes(event):
"""
Get the basic attributes of an event.
:param event: Base event.
:return: Constructed object based on event's body descriptor parameters.
"""
return {
"descriptor_data": (
{
"descriptor": event.bodyDescriptor.descriptor,
"descriptor_version": event.bodyDescriptor.version,
}
if event.bodyDescriptor
else None
)
}
[docs]
@staticmethod
def processDetectionSamplesData(event):
"""
Add detection rect properties to event detections samples data
:param event: Event to update
"""
for detection in event["detections"]:
samples = detection["samples"]
for descriptorType in ("face", "body"):
if descriptorData := deepcopy(samples.get(descriptorType)):
rect = descriptorData["detection"].get("rect")
samples[descriptorType] = {
"sample_id": descriptorData["sample_id"],
"detection": {"rect": rect} if rect else {},
}
else:
samples[descriptorType] = None
[docs]
@staticmethod
def processEventMatchCandidatesData(event):
"""
Exclude non supported properties from faces' and events' matching candidates
:param event: Event to update
"""
expectedFields = {
"event": (
"event_id",
"handler_id",
"source",
"stream_id",
"external_id",
"user_data",
"create_time",
),
"face": ("face_id", "external_id", "user_data", "create_time"),
}
for match in event["matches"]:
for candidate in match["candidates"]:
candidateType = "face" if "face" in candidate else "event"
candidateItems = candidate[candidateType].items()
candidate[candidateType] = {
key: value for key, value in candidateItems if key in expectedFields[candidateType]
}
[docs]
def generateEventToSend(self, event):
"""
Prepare the event data for saving.
:param event: Base event.
:return: Constructed object based on event's parameters.
"""
faceAttributes = self.getEventFaceAttributes(event)
bodyAttributes = self.getEventBasicAttributes(event)
eventToSend = deepcopy(event.raw)
eventToSend["handler_id"] = event.handlerId
eventToSend["create_time"] = event.createTime
eventToSend["end_time"] = event.endTime
eventToSend["face"] = (
{"face_id": faceData["face_id"], "lists": faceData["lists"]}
if (faceData := event.raw.get("face"))
else None
)
eventToSend["face_attributes"] = {key: value for key, value in faceAttributes.items() if value is not None}
eventToSend["body_attributes"] = {key: value for key, value in bodyAttributes.items() if value is not None}
if aggregatedAttrs := eventToSend["aggregate_estimations"]["face"]["attributes"]:
if "mask" in aggregatedAttrs:
aggregatedAttrs["mask"].pop("face_occlusion")
self.processDetectionSamplesData(eventToSend)
self.processEventMatchCandidatesData(eventToSend)
eventToSend.pop("url")
eventToSend = {key: value for key, value in eventToSend.items() if value is not None}
if not eventToSend.get("matches"):
eventToSend.pop("matches", None)
if not eventToSend.get("location"):
eventToSend.pop("location", None)
if not eventToSend.get("tags"):
eventToSend.pop("tags", None)
return eventToSend
[docs]
async def execute(self, events: list[Event], accountId: str, luna3Client: Client) -> None:
"""
Save events.
Args:
events: events
accountId: account id
luna3Client: client
"""
if not self.storeEvent:
return
eventsToSend = []
for event in events:
if not self.isEventSatisfyFilters(event):
continue
event.raw["url"] = f"{luna3Client.lunaEvents.baseUri}/events/{event.raw['event_id']}"
eventToSend = self.generateEventToSend(event)
eventsToSend.append(eventToSend)
if not eventsToSend:
return
async def saveEvents():
reply = await luna3Client.lunaEvents.saveEvents(
accountId=accountId,
events=eventsToSend,
waitEventsSaving=bool(self.waitSaving),
raiseError=self.waitSaving,
)
if not reply.success:
logger.warning(
f"Failed save events to luna-event, receive response "
f"with status code {reply.statusCode}, body {reply.text}"
)
if self.waitSaving:
await saveEvents()
else:
self.saveEventsAsyncRunner.runNoWait((saveEvents(),))
[docs]
class StoragePolicy(BaseSchema):
"""Storage policy schema"""
# face sample storage policy
faceSamplePolicy: FaceSamplePolicy = FaceSamplePolicy()
# body sample storage policy
bodySamplePolicy: BodySamplePolicy = BodySamplePolicy()
# image origin storage policy
imageOriginPolicy: ImageOriginPolicy = ImageOriginPolicy()
# attribute storage policy
attributePolicy: AttributeStorePolicy = Field(default_factory=lambda: AttributeStorePolicy())
# face storage policy
facePolicy: FaceStoragePolicy = FaceStoragePolicy()
# event storage policy
eventPolicy: EventStoragePolicy = EventStoragePolicy()
# notification storage policy
notificationPolicy: NotificationStoragePolicy = NotificationStoragePolicy()
# callbacks
callbacks: List[HttpCallback] = []
[docs]
async def execute(
self,
events: list[Event],
accountId: str,
monitoring: DataForMonitoring,
config: StorePolicyConfig,
luna3Client: Client,
redisContext: RedisContext,
):
"""
Execute storage policy - save objects.
Args:
events: events to process
accountId: account id
monitoring: data for monitoring
config: app config
luna3Client: client
redisContext: redis context
"""
async def _faceSample() -> None:
with monitorTime(monitoring, "face_sample_storage_policy_time"):
await self.faceSamplePolicy.execute(events, accountId, config.facesBucket, luna3Client)
async def _bodySample() -> None:
with monitorTime(monitoring, "body_sample_storage_policy_time"):
await self.bodySamplePolicy.execute(events, accountId, config.bodiesBucket, luna3Client)
async def _originImage() -> None:
with monitorTime(monitoring, "image_origin_storage_policy_time"):
await self.imageOriginPolicy.execute(events, accountId, config.originBucket, luna3Client)
async def _attribute() -> None:
with monitorTime(monitoring, "face_attribute_storage_policy_time"):
await self.attributePolicy.execute(events, accountId, luna3Client)
async def _face() -> None:
with monitorTime(monitoring, "face_storage_policy_time"):
await self.facePolicy.execute(events, accountId, luna3Client)
async def _event() -> None:
with monitorTime(monitoring, "event_storage_policy_time"):
await self.eventPolicy.execute(events, accountId, luna3Client)
async def _notification() -> None:
with monitorTime(monitoring, "notification_storage_policy_time"):
await self.notificationPolicy.execute(events, accountId, redisContext)
def _callbacks():
"""Build and execute callbacks"""
toSends = set()
for callback in self.callbacks:
if call := callback.buildCallback(events):
if (creds := callback.getRequestCredentialsKey()) in toSends:
continue
toSends.add(creds)
asyncio.create_task(call)
await asyncio.gather(_faceSample(), _bodySample())
# save attribute and face only after executing previous policies (^^^ samples and images are updated here ^^^)
await asyncio.gather(_originImage(), _attribute(), _face())
# save events only after executing previous policies (^^^ events are updated here ^^^)
await asyncio.gather(_event(), _notification())
_callbacks()