Source code for luna_handlers.classes.schemas.base_schema

"""
Module contains base schema for other complex pydantic models and simple schemas to use in other places
"""
from functools import cached_property
from typing import ClassVar

from vlutils.structures.pydantic import BaseModel as _BaseModel

from configs.configs.configs.settings.classes import HandlersLimitsSettings, RedisKeyStorageSetting


[docs]class BaseSchema(_BaseModel): """Base schema model"""
[docs] class Config: """Pydantic model config""" keep_untouched = (cached_property,)
[docs]class HandlerSettings: """Container class for service settings.""" faceDescriptorVersion: int = 0 bodyDescriptorVersion: int = 0 livenessRealThreshold: float = 0.0 receivedImagesLimit: int = 1 rawEventDetectionsLimit: int = 1 rawEventArraysLimit: int = 1 resultCandidateLimit: int = 1 defaultAttributeTTL: int = 1 maxAttributeTTL: int = 1
[docs] @classmethod def initialize( cls, faceDescriptorVersion: int, bodyDescriptorVersion: int, livenessRealThreshold: float, handlersLimitsSettings: HandlersLimitsSettings, attributeStorageSettings: RedisKeyStorageSetting, ): """ Initialize settings for schemas. Args: faceDescriptorVersion: default face descriptor version bodyDescriptorVersion: default body descriptor version livenessRealThreshold: default liveness threshold setting handlersLimitsSettings: luna handlers limits settings attributeStorageSettings: attribute storage settings """ cls.faceDescriptorVersion = faceDescriptorVersion cls.bodyDescriptorVersion = bodyDescriptorVersion cls.livenessRealThreshold = livenessRealThreshold cls.receivedImagesLimit = handlersLimitsSettings.receivedImagesLimit cls.rawEventDetectionsLimit = handlersLimitsSettings.rawEventDetectionsLimit cls.rawEventArraysLimit = handlersLimitsSettings.rawEventArraysLimit cls.resultCandidateLimit = handlersLimitsSettings.resultCandidateLimit cls.defaultAttributeTTL = attributeStorageSettings.defaultTTL cls.maxAttributeTTL = attributeStorageSettings.maxTTL
[docs]class SchemaUpdaterMixin: """Extended base schema model.""" # handler configs, initialized from application settings: ClassVar[HandlerSettings] = HandlerSettings # list of inherited classes from the current one __inheritors__: list = [] def __init_subclass__(cls, **kwargs): cls.__inheritors__.append(cls)
[docs] @classmethod def update(cls): """Update inherit schema.""" raise NotImplementedError
[docs] @classmethod def updateSchemas(cls): """Run settings update for inherited schemas.""" for obj in cls.__inheritors__: obj.update()