Source code for luna_handlers.classes.schemas.base_schema
"""
Module contains base schema for other complex pydantic models and simple schemas to use in other places
"""
from functools import cached_property
from typing import ClassVar
from vlutils.structures.pydantic import BaseModel as _BaseModel
from configs.configs.configs.settings.classes import HandlersLimitsSettings, RedisKeyStorageSetting
[docs]class BaseSchema(_BaseModel):
"""Base schema model"""
[docs] class Config:
"""Pydantic model config"""
keep_untouched = (cached_property,)
[docs]class HandlerSettings:
"""Container class for service settings."""
faceDescriptorVersion: int = 0
bodyDescriptorVersion: int = 0
livenessRealThreshold: float = 0.0
receivedImagesLimit: int = 1
rawEventDetectionsLimit: int = 1
rawEventArraysLimit: int = 1
resultCandidateLimit: int = 1
defaultAttributeTTL: int = 1
maxAttributeTTL: int = 1
[docs] @classmethod
def initialize(
cls,
faceDescriptorVersion: int,
bodyDescriptorVersion: int,
livenessRealThreshold: float,
handlersLimitsSettings: HandlersLimitsSettings,
attributeStorageSettings: RedisKeyStorageSetting,
):
"""
Initialize settings for schemas.
Args:
faceDescriptorVersion: default face descriptor version
bodyDescriptorVersion: default body descriptor version
livenessRealThreshold: default liveness threshold setting
handlersLimitsSettings: luna handlers limits settings
attributeStorageSettings: attribute storage settings
"""
cls.faceDescriptorVersion = faceDescriptorVersion
cls.bodyDescriptorVersion = bodyDescriptorVersion
cls.livenessRealThreshold = livenessRealThreshold
cls.receivedImagesLimit = handlersLimitsSettings.receivedImagesLimit
cls.rawEventDetectionsLimit = handlersLimitsSettings.rawEventDetectionsLimit
cls.rawEventArraysLimit = handlersLimitsSettings.rawEventArraysLimit
cls.resultCandidateLimit = handlersLimitsSettings.resultCandidateLimit
cls.defaultAttributeTTL = attributeStorageSettings.defaultTTL
cls.maxAttributeTTL = attributeStorageSettings.maxTTL
[docs]class SchemaUpdaterMixin:
"""Extended base schema model."""
# handler configs, initialized from application
settings: ClassVar[HandlerSettings] = HandlerSettings
# list of inherited classes from the current one
__inheritors__: list = []
def __init_subclass__(cls, **kwargs):
cls.__inheritors__.append(cls)
[docs] @classmethod
def update(cls):
"""Update inherit schema."""
raise NotImplementedError
[docs] @classmethod
def updateSchemas(cls):
"""Run settings update for inherited schemas."""
for obj in cls.__inheritors__:
obj.update()