Source code for luna_handlers.classes.schemas.filters

"""
Module contains schemas for filters
"""

from pydantic import conlist
from stringcase import snakecase

from classes.event import HandlerEvent as Event
from classes.schemas import types
from classes.schemas.base_schema import BaseSchema
from classes.schemas.types import MAX_MATCH_FILTER_LIST_LENGTH
from crutches_on_wheels.cow.maps.vl_maps import DEEPFAKE_MAP, ETHNIC_MAP, LIVENESS_MAP
from crutches_on_wheels.cow.pydantic.types import OptionalNotNullable


class MatchFilter(BaseSchema):
    """Similarity range filter bound to a single match label"""

    # label of the matching this filter applies to
    label: types.Str36
    # lower similarity bound, inclusive
    similarityGte: types.StrictFloat01 = 0.0
    # upper similarity bound, inclusive
    similarityLte: types.StrictFloat01 = 1.0

    @property
    def isEmpty(self) -> bool:
        """
        Tell whether this filter restricts nothing.

        Returns:
            True if both similarity bounds still hold their default values (0 and 1)
        """
        lowerBoundSet = self.similarityGte != 0
        upperBoundSet = self.similarityLte != 1
        return not (lowerBoundSet or upperBoundSet)
class AttributesFilters(BaseSchema):
    """Filters by face attributes and by aggregated liveness/deepfake estimations"""

    # allowed ethnic groups
    ethnicities: conlist(types.IntEthnicities, min_length=1, max_length=len(ETHNIC_MAP)) = OptionalNotNullable()
    # lower age bound, inclusive
    ageGte: types.IntAge = OptionalNotNullable()
    # upper age bound, exclusive
    ageLt: types.IntAge = OptionalNotNullable()
    # gender
    gender: types.Int01 = OptionalNotNullable()
    # allowed liveness predictions
    liveness: conlist(types.IntLiveness, min_length=1, max_length=len(LIVENESS_MAP)) = OptionalNotNullable()
    # allowed deepfake predictions
    deepfake: conlist(types.IntDeepfake, min_length=1, max_length=len(DEEPFAKE_MAP)) = OptionalNotNullable()

    @property
    def isEmpty(self) -> bool:
        """
        Tell whether there is nothing to filter by.

        Returns:
            True if every filter field is None
        """
        allFilters = (self.ethnicities, self.ageLt, self.ageGte, self.gender, self.liveness, self.deepfake)
        return all(value is None for value in allFilters)

    def isEventSatisfies(self, event: Event) -> bool:
        """
        Check an event against the face attribute, liveness and deepfake filters.

        Args:
            event: event to check; its `raw` mapping is inspected
        Returns:
            True if the event passes every filter that is set, otherwise False
        """
        if self.isEmpty:
            return True

        basicFilters = (self.ethnicities, self.ageLt, self.ageGte, self.gender)
        if any(value is not None for value in basicFilters):
            faceAttributes = event.raw["face_attributes"]
            if faceAttributes is None:
                # a basic attribute filter is set but the event carries no face attributes
                return False
            basic = faceAttributes["basic_attributes"]
            if self.gender is not None and self.gender != basic["gender"]:
                return False
            # ageLt is an exclusive upper bound, ageGte an inclusive lower bound
            if self.ageLt is not None and self.ageLt <= basic["age"]:
                return False
            if self.ageGte is not None and self.ageGte > basic["age"]:
                return False
            if self.ethnicities is not None:
                predominant = basic["ethnicities"]["predominant_ethnicity"]
                if ETHNIC_MAP[snakecase(predominant)] not in self.ethnicities:
                    return False

        # liveness and deepfake are read the same way from the aggregated face estimations
        aggregatedFilters = (
            (self.liveness, "liveness", LIVENESS_MAP),
            (self.deepfake, "deepfake", DEEPFAKE_MAP),
        )
        for allowed, estimationName, predictionMap in aggregatedFilters:
            if allowed is None:
                continue
            estimations = event.raw.get("aggregate_estimations")
            face = estimations["face"] if estimations else None
            estimation = face["attributes"].get(estimationName) if face else None
            if estimation is None:
                # the filter is set but the event has no such estimation
                return False
            if predictionMap[estimation["prediction"]] not in allowed:
                return False
        return True
class ComplexFilter(AttributesFilters):
    """Complex filter: attribute filters plus an optional list of match filters"""

    # match filters, each bound to a matching label
    match: conlist(MatchFilter, min_length=1, max_length=MAX_MATCH_FILTER_LIST_LENGTH) = OptionalNotNullable()

    @property
    def isEmpty(self) -> bool:
        """
        Tell whether there is nothing to filter by.

        Returns:
            True if the attribute filters are empty and every match filter (if any) is empty
        """
        if not super().isEmpty:
            return False
        if self.match is None:
            return True
        return all(matchFilter.isEmpty for matchFilter in self.match)

    def isEventSatisfies(self, event: Event) -> bool:
        """
        Check an event against the attribute filters and then the match filters.

        Args:
            event: event to check; its `raw` mapping is inspected
        Returns:
            True if the event passes all filters, otherwise False
        """
        attributesPassed = super().isEventSatisfies(event)
        if not attributesPassed:
            return False
        if self.match is None:
            return True

        for rule in self.match:
            if rule.isEmpty:
                # default similarity range, nothing to check for this label
                continue
            # only the first match result carrying the rule's label is considered
            labelled = next((item for item in event.raw["matches"] if item["label"] == rule.label), None)
            if labelled is None:
                # matching with label exists in handler, but not in event:
                # event was not matched by it because of one of match policy filters
                return False
            candidates = labelled["candidates"]
            if not candidates:
                # the matching produced no candidates
                return False
            # the similarity of the first candidate must lie inside the inclusive range
            if not (rule.similarityGte <= candidates[0]["similarity"] <= rule.similarityLte):
                return False
        # all filters passed
        return True