"""
Module contains schemas for match policy
"""
from copy import deepcopy
from enum import Enum
from typing import List, Optional, Union
from uuid import UUID
from luna3.client import Client
from luna3.common.http_objs import RawDescriptor
from luna3.python_matcher.match_objects import (
AttributeFilters,
Candidates,
EventFilters,
FaceFilters,
RawDescriptorReference,
)
from pydantic import conlist, validator
from typing_extensions import Literal
from vlutils.helpers import convertToSnakeCase
from vlutils.structures.pydantic import DiscriminatedUnion
from classes.event import Event
from classes.schemas import types
from classes.schemas.base_schema import BaseSchema
from classes.schemas.filters import AttributesFilters
from classes.schemas.types import MAX_ITEMS_LIST_LENGTH, OptionalNotNullable
from crutches_on_wheels.cow.errors.errors import ErrorInfo
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.maps.vl_maps import (
CLOTHING_COLOR_MAP,
EMOTION_MAP,
ETHNIC_MAP,
EVENT_TARGET_MAP,
FACE_TARGET_MAP,
LIVENESS_MAP,
MASK_MAP,
SLEEVE_LENGTH_MAP,
TEMPORARY_ATTRIBUTE_MATCH_TARGET_MAP,
)
from crutches_on_wheels.cow.utils.timer import timer
class OriginEnum(str, Enum):
    """
    Kinds of objects that match candidates can be taken from.

    Members are `str` subclasses, so each one compares equal to its plain string value.
    """

    # candidates are faces
    faces = "faces"
    # candidates are events
    events = "events"
    # candidates are (temporary) attributes
    attributes = "attributes"
class FaceMatchCandidates(BaseSchema):
    """Face match candidates"""

    # discriminator of the candidates schema: must be the literal "faces"
    origin: Literal[OriginEnum.faces.value]

    # --- identifiers ---
    # face ids (1..MAX_ITEMS_LIST_LENGTH items)
    faceIds: conlist(UUID, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # face account id
    accountId: UUID = OptionalNotNullable()
    # face external ids (1..MAX_ITEMS_LIST_LENGTH items)
    externalIds: conlist(types.Str36, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # face user data
    userData: types.Str128 = OptionalNotNullable()

    # --- range filters ("Gte" boundary is inclusive, "Lt" boundary is exclusive) ---
    # face create time, inclusive "gte" boundary
    createTimeGte: types.CustomExtendedDatetime = OptionalNotNullable()
    # face create time, exclusive "lt" boundary
    createTimeLt: types.CustomExtendedDatetime = OptionalNotNullable()
    # face id, inclusive "gte" boundary
    faceIdGte: UUID = OptionalNotNullable()
    # face id, exclusive "lt" boundary
    faceIdLt: UUID = OptionalNotNullable()

    # --- links ---
    # id of the list the face is linked to
    listId: UUID = OptionalNotNullable()
class GeoPosition(BaseSchema):
    """Geo position: longitude and latitude with deltas"""

    # origin point (both coordinates are required)
    # longitude of the origin
    originLongitude: types.FloatLongitude
    # latitude of the origin
    originLatitude: types.FloatLatitude

    # deltas around the origin (both default to 0.01)
    # longitude delta
    longitudeDelta: types.FloatGeoDelta = 0.01
    # latitude delta
    latitudeDelta: types.FloatGeoDelta = 0.01
class EventMatchCandidates(BaseSchema):
    """Event match candidates"""

    # discriminator of the candidates schema: must be the literal "events"
    origin: Literal[OriginEnum.events.value]

    # --- identifiers and time ranges ("Gte" boundary is inclusive, "Lt" boundary is exclusive) ---
    # event ids
    eventIds: conlist(UUID, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # event account id
    accountId: UUID = OptionalNotNullable()
    # event id, inclusive "gte" boundary
    eventIdGte: UUID = OptionalNotNullable()
    # event id, exclusive "lt" boundary
    eventIdLt: UUID = OptionalNotNullable()
    # event create time, inclusive "gte" boundary
    createTimeGte: types.CustomExtendedDatetime = OptionalNotNullable()
    # event create time, exclusive "lt" boundary
    createTimeLt: types.CustomExtendedDatetime = OptionalNotNullable()
    # event end time, inclusive "gte" boundary
    endTimeGte: types.CustomExtendedDatetime = OptionalNotNullable()
    # event end time, exclusive "lt" boundary
    endTimeLt: types.CustomExtendedDatetime = OptionalNotNullable()
    # event handler ids
    handlerIds: conlist(UUID, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # event external ids
    externalIds: conlist(types.Str36, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()

    # --- previous matching results of the event ---
    # event top matching candidates label
    topMatchingCandidatesLabel: types.Str36 = OptionalNotNullable()
    # event top similar object ids
    topSimilarObjectIds: conlist(UUID, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # event top similar external ids
    topSimilarExternalIds: conlist(types.Str36, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # event top similar object similarity, inclusive "gte" boundary
    topSimilarObjectSimilarityGte: types.StrictFloat01 = OptionalNotNullable()
    # event top similar object similarity, exclusive "lt" boundary
    topSimilarObjectSimilarityLt: types.StrictFloat01 = OptionalNotNullable()

    # --- face attributes ---
    # event age, inclusive "gte" boundary
    ageGte: types.IntAge = OptionalNotNullable()
    # event age, exclusive "lt" boundary
    ageLt: types.IntAge = OptionalNotNullable()
    # event gender
    gender: types.Int01 = OptionalNotNullable()
    # event emotion list (at most one item per known emotion)
    emotions: conlist(types.IntEmotions, min_items=1, max_items=len(EMOTION_MAP)) = OptionalNotNullable()
    # event mask list (at most one item per known mask state)
    masks: conlist(types.IntMasks, min_items=1, max_items=len(MASK_MAP)) = OptionalNotNullable()
    # event liveness states (at most one item per known liveness state)
    liveness: conlist(types.IntLiveness, min_items=1, max_items=len(LIVENESS_MAP)) = OptionalNotNullable()
    # event ethnic group list (at most one item per known ethnic group)
    ethnicGroups: conlist(types.IntEthnicities, min_items=1, max_items=len(ETHNIC_MAP)) = OptionalNotNullable()

    # --- links and meta information ---
    # event face ids
    faceIds: conlist(UUID, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # event user data
    userData: types.Str128 = OptionalNotNullable()
    # event sources
    sources: conlist(types.Str36, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # event tags
    tags: conlist(types.Str36, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()

    # --- location ---
    # event cities
    cities: conlist(types.Str36, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # event areas
    areas: conlist(types.Str36, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # event districts
    districts: conlist(types.Str36, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # event streets
    streets: conlist(types.Str36, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # event house numbers
    # NOTE(review): the only snake_case field name in this schema; it is part of the external
    # interface, so it is kept as is - confirm whether the naming is intentional
    house_numbers: conlist(types.Str36, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # event geo position
    geoPosition: GeoPosition = OptionalNotNullable()

    # --- tracking ---
    # event track ids
    trackIds: conlist(types.Str36, min_items=1, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()

    # --- body attributes (lists below must be non-empty and contain unique items) ---
    # apparent age, exclusive "lt" boundary
    apparentAgeLt: types.IntAge = OptionalNotNullable()
    # apparent age, inclusive "gte" boundary
    apparentAgeGte: types.IntAge = OptionalNotNullable()
    # apparent gender
    apparentGender: conlist(types.Int02, min_items=1, unique_items=True) = OptionalNotNullable()
    # headwear states
    headwearStates: conlist(types.Int02, min_items=1, unique_items=True) = OptionalNotNullable()
    # sleeve lengths, each item is one of the SLEEVE_LENGTH_MAP keys
    sleeveLengths: conlist(Literal[tuple(SLEEVE_LENGTH_MAP)], min_items=1, unique_items=True) = OptionalNotNullable()
    # upper clothing colors, each item is one of the CLOTHING_COLOR_MAP keys
    upperClothingColors: conlist(
        Literal[tuple(CLOTHING_COLOR_MAP)], min_items=1, unique_items=True
    ) = OptionalNotNullable()
    # backpack states
    backpackStates: conlist(types.Int02, min_items=1, unique_items=True) = OptionalNotNullable()
class AttributeMatchCandidates(BaseSchema):
    """Attribute match candidates"""

    # discriminator of the candidates schema: must be the literal "attributes"
    origin: Literal[OriginEnum.attributes.value]
    # temporary attribute ids
    # NOTE(review): unlike the id lists of the face/event candidates, no `min_items=1` is set here,
    # so an empty list is presumably accepted - confirm this is intended before tightening it
    attributeIds: conlist(UUID, max_items=MAX_ITEMS_LIST_LENGTH) = OptionalNotNullable()
    # temporary attribute account id
    accountId: UUID = types.OptionalNotNullable()
class MatchingDescriptor(BaseSchema):
    """Matching descriptor parameters"""

    # which descriptor is used for matching: "face" (default) or "body"
    descriptorType: Literal["face", "body"] = "face"
class EventMatchResult:
    """
    Event match result.

    Attributes:
        candidates: match candidates (the "result" entry of the match result)
        listInfo: matching list info (the "list" entry of the match result filters, None if absent)
        matchingLabel: matching label
    """

    __slots__ = ("candidates", "listInfo", "matchingLabel")

    def __init__(self, matchingLabel: str, matchResult: dict):
        """
        Init the result from a single match result.

        Args:
            matchingLabel: matching label
            matchResult: match result, a dict with the "result" and "filters" keys
        Raises:
            KeyError: if "result" or "filters" is missing in the match result
        """
        self.candidates = matchResult["result"]
        self.listInfo = matchResult["filters"].get("list")
        self.matchingLabel = matchingLabel

    def asDict(self) -> dict:
        """
        Get event match results together with the matching label as dict.

        The list info is not included. Candidates are deep-copied, so the returned dict
        can be modified without affecting this object.
        Returns:
            dict with matching candidates ("candidates") and matching label ("label")
        """
        return {"candidates": deepcopy(self.candidates), "label": self.matchingLabel}
class BaseMatchPolicy(BaseSchema):
    """Base match policy schema"""

    # matching label
    label: types.Str36 = ""
    # matching candidates, the concrete schema is chosen by the "origin" field
    candidates: DiscriminatedUnion("origin", [FaceMatchCandidates, EventMatchCandidates, AttributeMatchCandidates])
    # matching candidate descriptor parameters
    descriptor: MatchingDescriptor = MatchingDescriptor()
    # matching filters
    filters: AttributesFilters = AttributesFilters()
    # matching limit
    limit: Optional[types.IntMatchingLimit] = types.DEFAULT_MATCH_LIMIT
    # matching targets
    targets: Optional[List[str]] = types.OptionalNotNullable()
    # matching threshold
    threshold: Optional[types.StrictFloat01] = types.OptionalNotNullable()

    @validator("targets")
    def targetsValidator(cls, targets, values) -> Optional[List[str]]:
        """
        Targets validator, depends on matching candidates

        Args:
            targets: matching targets to validate
            values: already validated fields of the policy
        Returns:
            the input targets; None if candidates are not available
        Raises:
            ValueError: if any target is not allowed for the candidates origin
        """
        if targets is None:
            return targets
        if "candidates" not in values:
            # if validation fails on field (or that field is missing), it is not included in values
            return None

        candidatesType = values["candidates"].origin
        # `origin` is declared as a Literal of the enum *value*, so the map is keyed by values too
        candidateTargetMap = {
            OriginEnum.faces.value: FACE_TARGET_MAP,
            OriginEnum.events.value: EVENT_TARGET_MAP,
            OriginEnum.attributes.value: TEMPORARY_ATTRIBUTE_MATCH_TARGET_MAP,
        }[candidatesType]
        expectedTargets = list(candidateTargetMap)
        if any(target not in expectedTargets for target in targets):
            raise ValueError(
                f"Bad matching policy targets {targets}, "
                f"allowed targets for '{candidatesType}' candidates are: {expectedTargets}"
            )
        return targets

    @validator("descriptor")
    def descriptorValidator(cls, descriptor, values) -> Optional[MatchingDescriptor]:
        """
        Candidate descriptor parameters validator, depends on matching candidates

        Args:
            descriptor: matching descriptor parameters to validate
            values: already validated fields of the policy
        Returns:
            the input descriptor parameters; None if candidates are not available
        Raises:
            ValueError: if a non-face descriptor type is requested for face candidates
        """
        if "candidates" not in values:
            # if validation fails on field (or that field is missing), it is not included in values
            return None
        if values["candidates"].origin == OriginEnum.faces.value and descriptor.descriptorType != "face":
            raise ValueError(f"Bad matching policy descriptor type '{descriptor.descriptorType}' for face candidates")
        return descriptor

    @timer
    async def execute(self, events: List[Event], luna3Client: Client) -> None:
        """
        Execute match policy:
            * filter events for matching
            * match events' descriptors with policy candidates
        Results are stored in the input events.

        The matcher reply is checked for errors before any event is updated, so a failed
        matching does not leave the events partially updated.
        Args:
            events: events (references)
            luna3Client: client
        Raises:
            VLException: if the matcher reply contains an error for any match
        """
        # name of the event attribute holding the attributes of the requested descriptor type
        attributesName = f"{self.descriptor.descriptorType}Attributes"

        references = []
        for event in events:
            if not self.filters.isEventSatisfies(event):
                continue
            attributes = getattr(event, attributesName)
            if not attributes:
                continue
            descriptor = attributes.descriptor
            if not descriptor:
                continue
            if isinstance(descriptor, RawDescriptor):
                refDescriptor = descriptor
            else:
                refDescriptor = RawDescriptor(version=descriptor.model, descriptor=descriptor.asBytes)
            references.append(RawDescriptorReference(event.eventId, refDescriptor))
        if not references:
            # nothing to match
            return

        candidatesFilters = self.candidates.asDict()
        origin = candidatesFilters.pop("origin")
        if "geo_position" in candidatesFilters:
            candidatesFilters["geo_position"] = convertToSnakeCase(candidatesFilters["geo_position"])
        ObjectFilters = {"faces": FaceFilters, "events": EventFilters, "attributes": AttributeFilters}[origin]
        matchKwargs = {
            "candidates": [
                Candidates(
                    filters=ObjectFilters().initFromSnakeKwargs(candidatesFilters),
                    # "similarity" is always requested in addition to the explicitly specified targets
                    targets=self.targets + ["similarity"] if self.targets is not None else None,
                    limit=self.limit,
                    threshold=self.threshold,
                )
            ],
            "references": references,
        }
        if self.descriptor.descriptorType == "face":
            reply = await luna3Client.lunaPythonMatcher.matchFaces(**matchKwargs, raiseError=True)
        else:
            reply = await luna3Client.lunaPythonMatcher.matchBodies(**matchKwargs, raiseError=True)

        # validate the whole reply once and index it by reference id
        # (assumes event ids are hashable and compare equal to the reference ids of the reply)
        matchByReferenceId = {}
        for match in reply.json:
            for singleMatch in match["matches"]:
                err = singleMatch.get("error")
                if err is not None:
                    raise VLException(ErrorInfo.fromDict(err))
            # the first match of a reference wins
            matchByReferenceId.setdefault(match["reference"]["id"], match)

        for event in events:
            match = matchByReferenceId.get(event.eventId)
            if match is None:
                # the event was filtered out or has no descriptor
                continue
            # a single `Candidates` entry is sent, so only the first matches entry is stored
            event.matches.append(EventMatchResult(matchingLabel=self.label, matchResult=match["matches"][0]))
class MatchPolicy(BaseMatchPolicy):
    """Match policy schema"""

    # matching candidates: overrides the base field with a union of
    # face and event candidates only (attribute candidates are not listed)
    candidates: DiscriminatedUnion("origin", [FaceMatchCandidates, EventMatchCandidates])