import contextlib
from asyncio import CancelledError
from dataclasses import dataclass
from datetime import datetime, timedelta
from itertools import chain
from typing import Any, Awaitable, Callable, Iterable, Literal, Optional, TypeVar, Union
from uuid import uuid4
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from asyncpg import (
    ExternalRoutineInvocationError,
    ForeignKeyViolationError,
    Record,
    UndefinedFunctionError,
    UniqueViolationError,
)
from sqlalchemy import Column, and_, asc, delete, desc, exists, func, insert, select, text, union_all, update
from sqlalchemy.dialects import oracle, postgresql
from sqlalchemy.exc import DatabaseError, DataError, IntegrityError
from sqlalchemy.orm import Query, aliased
from sqlalchemy.orm.util import AliasedClass
from sqlalchemy.sql.elements import BooleanClauseList, ColumnClause, TextClause, not_
from sqlalchemy.sql.operators import mod
from sqlalchemy.sql.selectable import CompoundSelect
from tzlocal import get_localzone_name
from vlutils.descriptors.containers import sdkDescriptorEncode
from vlutils.helpers import bytesToBase64, convertTimeToString
from vlutils.jobs.async_runner import AsyncRunner
from app.handlers.helpers import SearchFacesFilters, SearchListsFilters
from attributes_db.model import BasicAttributes, Descriptor, TemporaryAttributes
from configs.config import BACKGROUND_REMOVAL_BATCH_SIZE, DB_CONNECT_TIMEOUT, DELETE_REQUEST_MAX_SIZE
from configs.configs.configs.settings.classes import DBSetting
from crutches_on_wheels.cow.adaptors.connection import AbstractDBConnection
from crutches_on_wheels.cow.db.base_context import BaseDBContext
from crutches_on_wheels.cow.enums.attributes import TemporaryAttributeTargets as FaceAttributeTargets
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.utils import mixins
from crutches_on_wheels.cow.utils.check_connection import checkConnectionToDB
from crutches_on_wheels.cow.utils.db_functions import currentDBTimestamp, dbExceptionWrap as exceptionWrap
from crutches_on_wheels.cow.utils.functions import getPageCount
from crutches_on_wheels.cow.utils.healthcheck import checkSql
from crutches_on_wheels.cow.utils.log import Logger, logger
from db.faces_db_tools.models import faces_models as models
from db.faces_db_tools.models.enums import AttributeSample, SampleType
from db.faces_db_tools.models.faces_models import DB_LIST_LIMIT, LINK_SEQUENCE, Base as FacesBase, BaseMatViewsParity
from db.functions import ArrayAgg
from utils.encrypton import decryptCacheValue, encryptMessage
# Name postfixes and field names related to face attributes.
# NOTE(review): presumably appended to an attribute name (e.g. "age" + "_version");
# the usages are outside this part of the file — confirm.
SAMPLE_POSTFIX = "_samples"
OBTAINING_METHOD_POSTFIX = "_obtaining_method"
VERSION_POSTFIX = "_version"
DESCRIPTOR = "descriptor"
MIN_GEN = 0  # minimum attribute generation
# Face targets "face_id", "lists" and "account_id"; the same set is spelled as a literal in
# `DBContext.executeSearchFacesFilteredByList`, where any target outside it adds a join with the face table.
FACE_TO_LIST_TARGETS = frozenset(("face_id", "lists", "account_id"))
# Oracle index names; both stay `None` until `DBContext.setOracleIndexNames` assigns them.
indexFaceName = None
indexDescriptorName = None
# Target name -> selectable column for face searches which start from the list-face link table
# (see `DBContext.executeSearchFacesFilteredByList`).
_FACE_BY_LIST_GETTER_COLUMNS_MAP = {
    # correlated sub-select: aggregates ids of all lists the face is linked to
    "lists": select([ArrayAgg((lists := aliased(models.ListFace)).list_id)])
    .where(lists.face_id == models.ListFace.face_id)
    .label("lists"),
    "face_id": models.ListFace.face_id,
    "account_id": models.List.account_id,
    "event_id": models.Face.event_id,
    "user_data": models.Face.user_data,
    "external_id": models.Face.external_id,
    "avatar": models.Face.avatar,
    "create_time": models.Face.create_time,
}
# Target name -> selectable column for face searches which start from the face table
# (see `DBContext.executeSearchFaces`).
_FACE_BY_FACE_GETTER_COLUMNS_MAP = {
    # correlated sub-select: aggregates ids of all lists the face is linked to
    "lists": select([ArrayAgg((lists := aliased(models.ListFace)).list_id)])
    .where(and_(lists.face_id == models.Face.face_id))
    .label("lists"),
    "face_id": models.Face.face_id,
    "account_id": models.Face.account_id,
    "event_id": models.Face.event_id,
    "user_data": models.Face.user_data,
    "external_id": models.Face.external_id,
    "avatar": models.Face.avatar,
    "create_time": models.Face.create_time,
}
# NOTE(review): the three constants below look like settings of a request/count cache;
# their usages are outside this part of the file — confirm.
FACES_COUNT_CACHE_NAME = "faces_attributes_count"
# NOTE(review): an encryption key hard-coded in the source — consider loading it from configuration
# or a secret storage.
REQUESTS_ENCRYPTION_KEY = "^&*Tyghuk2v47fg90vpyuhjn2ewsdgv8]-90i"
REQUEST_CACHE_TTL_SECONDS = 10
[docs]
@dataclass
class Reference:
    """
    Reference class.

    Attributes:
        descriptorVersion: descriptor version, the only required field
        descriptor: descriptor bytes, `None` when not set
        attributeId: attribute id, `None` when not set
        faceId: face id, `None` when not set
    """

    descriptorVersion: int
    # the fields below default to `None`, so their annotations must admit `None`
    descriptor: Optional[bytes] = None
    attributeId: Optional[str] = None
    faceId: Optional[str] = None
# Raw SQL templates for getting the latest link/unlink key of a list.
# First level key is a database type ("postgres" or "oracle"), second level key is an operation
# ("link" reads `list_face`, "unlink" reads `unlink_attributes_log`).
# Each template has two `str.format` placeholders: `{listId}` and `{mod}` (the expected remainder of
# the key divided by 2); the dialects differ only in how a single row is taken (LIMIT 1 vs ROWNUM <= 1).
# NOTE(review): placeholders are substituted into the SQL text as is — callers must pass only
# validated values.
RAW_SQLS = {
    "postgres": {
        "link": """(SELECT listId, linkKey FROM (SELECT list_id AS listId, link_key AS linkKey FROM list_face WHERE 
                   list_id = '{listId}' AND mod(link_key, 2) = {mod} ORDER BY link_key DESC) as f LIMIT 1)""",
        "unlink": """(SELECT listId, unLinkKey FROM (SELECT list_id AS listId, unlink_key AS unLinkKey FROM 
                     unlink_attributes_log WHERE list_id = '{listId}' AND mod(unlink_key, 2) = {mod} ORDER BY 
                     unlink_key DESC) as f LIMIT 1)""",
    },
    "oracle": {
        "link": """(SELECT listId, linkKey FROM (SELECT list_id AS listId, link_key AS linkKey FROM list_face WHERE 
                   list_id = '{listId}' AND mod(link_key, 2) = {mod} ORDER BY link_key DESC) WHERE ROWNUM <= 1)""",
        "unlink": """(SELECT listId, unLinkKey FROM (SELECT list_id AS listId, unlink_key AS unLinkKey FROM 
                     unlink_attributes_log WHERE list_id = '{listId}' AND mod(unlink_key, 2) = {mod} ORDER BY 
                     unlink_key DESC) WHERE ROWNUM <= 1)""",
    },
}
[docs]
def getCompiledQuery(query: Union[Query, CompoundSelect], dbType: str) -> str:
    """
    Render a query to an SQL string with all bound parameters inlined (`literal_binds=True`).

    Args:
        query: sqlalchemy `Query` or a `CompoundSelect` (e.g. a result of `union_all` of several queries)
        dbType: database type; "oracle" selects the oracle dialect, any other value - the postgresql one
    Returns:
        string with compiled query
    """
    dialectModule = postgresql
    if dbType == "oracle":
        dialectModule = oracle
    dialect = dialectModule.dialect()
    # an ORM `Query` is compiled through its underlying statement, everything else is compiled directly
    compilable = query.statement if isinstance(query, Query) else query
    compiled = compilable.compile(dialect=dialect, compile_kwargs={"literal_binds": True})
    return str(compiled)
# Type variables for the value converters cached in `DBContext` (`Callable[[Y], X]`):
# Y - a value in the database format, X - the same value in the api format.
X = TypeVar("X")
Y = TypeVar("Y")
[docs]
class DBContext(BaseDBContext, mixins.Initializable):
    """
    DB context
    Attributes:
        logger: request logger
    """
    def __init__(
        self,
        dbSettings: DBSetting,
        storageTime: str,
        defaultDescriptorVersion: int,
        encryptionCtx,
        databaseNumber: int,
        useMaterialViews: bool,
    ):
        """
        Store the settings only; everything that needs I/O is created later in `initialize`.

        Args:
            dbSettings: database settings
            storageTime: storage time (compared with "UTC" in `initialize`)
            defaultDescriptorVersion: default descriptor version
            encryptionCtx: encryption context
            databaseNumber: database number for oracle golden gate (Literal[0, 1, 2])
            useMaterialViews: whether to use material views
        """
        super().__init__()

        # settings received from the caller
        self.dbSettings = dbSettings
        self.storageTime = storageTime
        self.defaultDescriptorVersion = defaultDescriptorVersion
        self.encryptionCtx = encryptionCtx
        self.databaseNumber = databaseNumber
        self.useMaterialViews = useMaterialViews

        # background machinery, created in `initialize`
        self.backgroundWorker: Optional[AsyncRunner] = None
        self.backgroundScheduler: Optional[AsyncIOScheduler] = None
        self.backgroundLogger: Optional[Logger] = None

        # ids of lists whose `last_update_time` is waiting for a deferred update
        self._listsForUpdate: set = set()

        # cached converters of values from the db format to the face/list api format, filled in `initialize`
        self._faceProcessFunctions: dict[str, Callable[[Y], X]] = {}
        self._listProcessFunctions: dict[str, Callable[[Y], X]] = {}

        # encryption hash filter, computed in `initialize`
        self._encryptionHashFilter: Optional[bytes | TextClause] = None
[docs]
    def makeOutputFaces(self, faceRows: list[Record | dict], columns: Iterable[str]) -> list[dict[str, Any]]:
        """
        Convert face rows received from the database to the form returned to a user.

        Every row is copied to a plain dict; values of the selected columns which have a registered
        converter in `_faceProcessFunctions` are replaced with the converted ones.

        Args:
            faceRows: face list from db
            columns: selected columns
        Returns:
            faces with changed fields
        """
        if not faceRows:
            return []
        selected = set(columns)
        # only converters of the columns which were really selected
        converters = {name: convert for name, convert in self._faceProcessFunctions.items() if name in selected}
        result = []
        for row in faceRows:
            face = dict(row)
            for name, convert in converters.items():
                face[name] = convert(face[name])
            result.append(face)
        return result
[docs]
    def makeOutputLists(self, lists: list[Record | dict]) -> list[dict[str, Any]]:
        """
        Convert list rows received from the database to the form returned to a user.

        Args:
            lists: list with lists
        Returns:
            lists with changed fields
        """
        converters = self._listProcessFunctions

        def toApiValue(field, value):
            """Apply the converter registered for the field, if there is one."""
            convert = converters.get(field)
            return convert(value) if convert else value

        return [{field: toApiValue(field, value) for field, value in row.items()} for row in lists]
    async def _deferredUpdateListLastUpdateTime(self):
        """
        Update `last_update_time` of every list cached in '_listsForUpdate'.

        A list id leaves the cache only after its update succeeded; a failure is logged and the id stays
        in the cache.
        """
        pending = self._listsForUpdate
        if not pending:
            return
        log = self.backgroundLogger
        log.debug(f"start to update {len(pending)} lists last update time")
        # iterate over a snapshot: the cache is modified inside the loop
        for listId in tuple(pending):
            try:
                # batch update provokes deadlocks (for several luna-faces instances)
                await self.updateListLastUpdateTime(listId)
                pending.remove(listId)
            except Exception:
                log.exception()
                continue
            log.debug(f"updated last update time for {listId}")
    def _addListsToDeferrerUpdateListLastUpdateTime(self, listIds: Iterable[str]):
        """
        Remember lists whose `last_update_time` must be refreshed by the deferred update.

        Args:
            listIds: list ids
        """
        self._listsForUpdate |= set(listIds)
[docs]
    async def initialize(self):
        """
        Async __init__.

        Initializes the db context, starts the background scheduler with the deferred update job
        (runs every second), creates the background worker and caches the value converters.
        """
        await self.__class__.initDBContext(
            dbSettings=self.dbSettings, storageTime=self.storageTime, connectTimeout=DB_CONNECT_TIMEOUT
        )
        self.backgroundLogger = Logger(template="backgroundDatabase")

        # for oracle a non-empty hash is wrapped into a HEXTORAW(...) text clause, otherwise it is used as is
        encryptionHash = self.encryptionCtx.encryptionHash
        if self.dbSettings.type == "oracle" and encryptionHash is not None:
            self._encryptionHashFilter = text(f"HEXTORAW('{encryptionHash.hex()}')")
        else:
            self._encryptionHashFilter = encryptionHash

        scheduler = AsyncIOScheduler(timezone=get_localzone_name())
        self.backgroundScheduler = scheduler
        # the worker gets a half of the adaptor sessions, but not less than one
        self.backgroundWorker = AsyncRunner(max(self.adaptor.sessionCount // 2, 1), closeTimeout=5)
        scheduler.add_job(self._deferredUpdateListLastUpdateTime, trigger="interval", seconds=1)
        scheduler.start()

        if self.dbSettings.type == "oracle":
            await self.setOracleIndexNames()

        storageTimeIsUTC = self.storageTime == "UTC"

        def timeToString(value):
            """Convert a db time to the api string."""
            return convertTimeToString(value, storageTimeIsUTC)

        def noneToEmptyString(value):
            """Replace a missing value with an empty string."""
            return "" if value is None else value

        self._faceProcessFunctions = {
            "create_time": timeToString,
            "external_id": noneToEmptyString,
            "user_data": noneToEmptyString,
            "avatar": noneToEmptyString,
            "lists": ArrayAgg.typeHandler,
        }
        self._listProcessFunctions = {
            "create_time": timeToString,
            "last_update_time": timeToString,
            "user_data": noneToEmptyString,
        }
        # whether binary columns must be selected through the `vlconvert` db function (see `getQuery4Binary`)
        self.need2ConvertBinary = await self.checkDbFunctionExistence(func.vlconvert(b"0"))
    @property
    def currentDBTimestamp(self) -> TextClause:
        """
        Current db timestamp function for the configured database type.

        Returns:
            function for computation current db timestamp
        """
        dbType = self.dbConfig.type
        return currentDBTimestamp(dbType)
[docs]
    async def close(self):
        """
        Close all database contexts.

        Teardown order:
            1. stop the background scheduler, so that the periodic deferred update job can not be started
               while the context is being closed (previously the scheduler was stopped last and the job
               could run concurrently with the final flush or against an already closed adaptor);
            2. close the background worker;
            3. flush the lists which still wait for a `last_update_time` update;
            4. close the database adaptor.
        """
        scheduler, self.backgroundScheduler = self.backgroundScheduler, None
        # a scheduler which has never been started can not be shut down and must not block the teardown
        if scheduler and scheduler.running:
            scheduler.shutdown()
        try:
            if self.backgroundWorker:
                await self.backgroundWorker.close()
                self.backgroundWorker = None
        except CancelledError:
            # best effort: a cancelled worker shutdown must not prevent the rest of the teardown
            pass
        await self._deferredUpdateListLastUpdateTime()
        if self.adaptor:
            await self.adaptor.close()
[docs]
    async def setOracleIndexNames(self):
        """
        Set index names for some requests with 'use-force-index'.

        Reads names of the `SYS_%` indexes of the 'FACE' and 'DESCRIPTOR' tables ordered by table name and
        stores them in the module-level `indexDescriptorName` and `indexFaceName`. Exactly two rows are
        expected, otherwise the unpacking fails with `ValueError`.
        """
        global indexDescriptorName, indexFaceName
        indexQuery = (
            "select index_name from all_indexes where table_name in ('FACE', 'DESCRIPTOR') and "
            "index_name like 'SYS_%' order by table_name"
        )
        async with self.adaptor.connection(logger) as connection:
            rows = await connection.fetchall(indexQuery)
            # rows are ordered by table name: 'DESCRIPTOR' goes first, 'FACE' - second
            indexDescriptorName, indexFaceName = [row[0] for row in rows]
[docs]
    async def ping(self, pingCount: int) -> bool:
        """
        Ping database. Execute 'SELECT 1' until the first success, but not more than 'pingCount' times.

        Args:
            pingCount: ping count
        Returns:
            True - if any request success execute otherwise False
        """
        for attempt in range(1, pingCount + 1):
            try:
                async with self.adaptor.connection(logger) as connection:
                    await connection.execute(select([1]))
            except Exception:
                logger.exception(f"failed db ping, try {attempt}")
                continue
            return True
        return False
[docs]
    @exceptionWrap
    async def checkDbFunctionExistence(self, saFuncCall) -> bool:
        """
        Try to call a custom function to check whether it exists in the database.

        Args:
            saFuncCall: constructed sqlalchemy function object
        Returns:
            True - if request success execute otherwise False
        """
        functionName = saFuncCall.description
        logger.info(f"Check database function '{functionName}' existence: begin")
        try:
            async with self.adaptor.connection(logger) as connection:
                await connection.fetchall(select([saFuncCall]))
        except (ExternalRoutineInvocationError, DataError):
            # these errors are not treated as an absence of the function
            isImplemented = True
        except (UndefinedFunctionError, DatabaseError):
            isImplemented = False
        else:
            isImplemented = True
        if isImplemented:
            logger.info(f"Database function '{functionName}' is implemented")
        else:
            logger.info(f"Database function '{functionName}' is not implemented")
        return isImplemented
[docs]
    def getQuery4Binary(self, column: Column):
        """
        Generate query to select binary field.

        Args:
            column: binary column
        Returns:
            the column itself or, if `need2ConvertBinary` is set, the column wrapped into the `vlconvert`
            db function and labeled with the column name
        """
        if not self.need2ConvertBinary:
            return column
        converted = func.vlconvert(column)
        return converted.label(column.description)
[docs]
    async def insertFaceAttributeData(
        self, connection: AbstractDBConnection, faceId: str, attribute: TemporaryAttributes
    ) -> None:
        """
        Insert face attribute data: one attribute row, then sample rows, then descriptor rows.

        Args:
            connection: db connection
            faceId: face id
            attribute: temporary attribute container
        """
        attributeRow = {
            "face_id": faceId,
            "descriptor_samples_generation": 0,
            # the time is stored without a timezone
            "create_time": attribute.createTime.replace(tzinfo=None),
        }
        basicAttributes = attribute.basicAttributes
        if basicAttributes:
            attributeRow.update(
                age=basicAttributes.age,
                gender=basicAttributes.gender,
                ethnicity=basicAttributes.ethnicity,
                gender_obtaining_method=1,
                gender_version=1,
                age_obtaining_method=1,
                age_version=1,
                ethnicity_obtaining_method=1,
                ethnicity_version=1,
            )

        def makeSampleRows(sampleIds, sampleType):
            """Build sample rows of one type for the face."""
            return [{"face_id": faceId, "sample_id": sampleId, "type": sampleType.value} for sampleId in sampleIds]

        sampleRows = makeSampleRows(attribute.basicAttributesSamples, SampleType.basic_attributes)
        sampleRows += makeSampleRows(attribute.descriptorSamples, SampleType.face_descriptor)
        descriptorRows = [
            {
                "descriptor_version": descriptor.version,
                "face_id": faceId,
                "descriptor": descriptor.descriptor,
                "encryption_hash": self.encryptionCtx.encryptionHash,
                "descriptor_obtaining_method": 1,
                "descriptor_generation": 0,
            }
            for descriptor in attribute.descriptors
        ]

        await connection.execute(insert(models.Attribute).values(attributeRow))
        # for oracle every row is inserted with its own statement, for other databases - one multi-row
        # statement per table (skipped when there are no rows)
        insertOneByOne = self.dbConfig.type == "oracle"
        for model, rows in ((models.Sample, sampleRows), (models.Descriptor, descriptorRows)):
            if insertOneByOne:
                for row in rows:
                    await connection.execute(insert(model).values(row))
            elif rows:
                await connection.execute(insert(model).values(rows))
[docs]
    async def createFace(
        self,
        externalFaceId: Optional[str] = None,
        listIds: Optional[set[str]] = None,
        attribute: Optional[TemporaryAttributes] = None,
        **kwargs,
    ) -> str:
        """
        Create face.

        The face is inserted in a "fast way" first. If a face with the same id already exists
        (the insert fails with an integrity/unique violation error), the "slow way" is used:
        in a new connection the existing face is deleted and the face is inserted again.
        After the face is inserted, its attribute (if any) is saved and the face is linked to the lists.

        Args:
            externalFaceId: external faceId, a new uuid4 is generated if it is not set
            listIds: luna lists
            attribute: temporary attribute container
        Keyword Args:
            event_id: reference to event which created face
            user_data: face information
            account_id: id of account, required
            external_id: external id of the face, if it has its own mapping in external system
            avatar: image url that represents the face
        Raises:
            KeyError: if `account_id` is not passed
            VLException(Error.ListsNotFound): if the count of created links differs from the count of `listIds`
        Returns:
            faceId: face id in uuid4 format
        """
        kwargs["face_id"] = faceId = str(uuid4()) if externalFaceId is None else externalFaceId
        accountId = kwargs["account_id"]
        # save the attribute and link the face to the lists using an already opened connection
        async def processFaceListsAndAttributes(openConnection):
            if attribute is not None:
                await self.insertFaceAttributeData(openConnection, faceId=faceId, attribute=attribute)
            if listIds:
                # links are created with one "insert from select" over the lists of the account
                # NOTE(review): `faceId` is interpolated into the SQL text as is; it may be a caller-supplied
                # `externalFaceId` - confirm that it is always validated before this call.
                selectLists = select([models.List.list_id, text(f"'{faceId}'"), LINK_SEQUENCE.next_value()]).where(
                    and_(models.List.account_id == accountId, models.List.list_id.in_(listIds))
                )
                insertFaceListSt = insert(models.ListFace).from_select(
                    [models.ListFace.list_id, models.ListFace.face_id, models.ListFace.link_key], selectLists
                )
                listCount = await openConnection.execute(insertFaceListSt)
                if len(listIds) != listCount:
                    # the returned row count can not be used to find the missing list
                    # (oracle returns only one updated row), so the existing lists are selected explicitly
                    selectListSt = select([models.List.list_id]).where(
                        and_(models.List.list_id.in_(listIds), models.List.account_id == accountId)
                    )
                    updatedLists = await openConnection.fetchall(selectListSt)
                    updatedLists = set(chain(*updatedLists))
                    nonExistListId = next(iter(listIds - updatedLists))
                    raise VLException(Error.ListsNotFound.format(nonExistListId), 400, False)
            # the returned value is not used by the callers below
            return faceId
        async with self.adaptor.connection(logger) as connection:
            # both statements are reused by the slow way below
            deleteSt = delete(models.Face).where(models.Face.face_id == faceId)
            insertSt = insert(models.Face).values(**kwargs)
            # fast way creating face
            try:
                await connection.execute(insertSt)
            except (IntegrityError, UniqueViolationError):
                # face with same id already exists, need remove it, go to slow way
                pass
            else:
                await processFaceListsAndAttributes(connection)
                if listIds:
                    self._addListsToDeferrerUpdateListLastUpdateTime(listIds)
                return faceId
        # slow way
        async with self.adaptor.connection(logger) as connection:
            await connection.execute(deleteSt)
            await connection.execute(insertSt)
            await processFaceListsAndAttributes(connection)
        if listIds:
            self._addListsToDeferrerUpdateListLastUpdateTime(listIds)
        return faceId
[docs]
    @exceptionWrap
    async def updateFace(self, faceId: str, accountId: Optional[str] = None, **kwargs) -> int:
        """
        Update fields of a face.

        Args:
            faceId: face id
            accountId: account id
        Keyword Args:
            user_data: face information
            event_id: reference to event which created face
            external_id: external id of the face, if it has its own mapping in external system
            avatar: image url that represents the face
        Returns:
            updated face count
        """
        async with self.adaptor.connection(logger) as connection:
            # NOTE(review): `@` is an operator of the project column type; presumably "equals, if the value
            # is set" - confirm.
            faceFilter = and_(models.Face.face_id == faceId, models.Face.account_id @ accountId)
            statement = update(models.Face).where(faceFilter).values(**kwargs)
            updatedCount = await connection.execute(statement)
            return updatedCount
[docs]
    @exceptionWrap
    async def linkFacesToList(self, listId: str, faces: list[str]) -> tuple[list[str], list[str]]:
        """
        Attach faces to list.

        `last_update_time` of all the faces is refreshed first, then every face is linked in its own
        connection. A face whose link fails with an integrity/unique/foreign key violation is reported
        as failed and does not interrupt linking of the others.

        Args:
            listId: list id
            faces: face ids
        Returns:
            list of success link faces and failed link faces.
        """
        linked = []
        failed = []
        async with self.adaptor.connection(logger) as connection:
            touchFacesSt = update(models.Face).where(models.Face.face_id.in_(faces))
            touchFacesSt = touchFacesSt.values(last_update_time=self.currentDBTimestamp)
            await connection.execute(touchFacesSt)
        self._addListsToDeferrerUpdateListLastUpdateTime((listId,))
        for faceId in faces:
            async with self.adaptor.connection(logger) as connection:
                try:
                    linkSt = insert(models.ListFace).values(
                        list_id=listId, face_id=faceId, link_key=LINK_SEQUENCE.next_value()
                    )
                    await connection.execute(linkSt)
                except (IntegrityError, UniqueViolationError, ForeignKeyViolationError):
                    failed.append(faceId)
                else:
                    linked.append(faceId)
        return linked, failed
[docs]
    @exceptionWrap
    async def unlinkFacesFromList(self, listId: str, faces: list[str], accountId: Optional[str] = None):
        """
        Unlink faces from list.

        Inside one connection: the faces are blocked, the existing links are copied to the unlink log,
        the links are deleted and `last_update_time` of the faces is refreshed. After that the list is
        scheduled for the deferred `last_update_time` update.

        Args:
            listId: list id
            faces: face ids
            accountId: account id
        """
        async with self.adaptor.connection(logger) as connection:
            await self.blockFaces(connection, [models.Face.face_id.in_(faces)])

            # 1. copy the links which are going to be removed to the unlink log
            existingLinks = select([models.ListFace.list_id, models.Face.face_id, models.ListFace.link_key]).where(
                and_(
                    models.Face.face_id.in_(faces),
                    models.Face.face_id == models.ListFace.face_id,
                    models.ListFace.list_id == listId,
                )
            )
            logColumns = [
                models.UnlinkAttributesLog.list_id,
                models.UnlinkAttributesLog.face_id,
                models.UnlinkAttributesLog.link_key,
            ]
            await connection.execute(insert(models.UnlinkAttributesLog).from_select(logColumns, existingLinks))

            # 2. remove the links
            # NOTE(review): the filter uses `models.List.account_id`, but no condition visible here ties
            # `models.List` to `models.ListFace` - confirm that the account check really restricts the delete.
            linksFilter = and_(
                models.ListFace.face_id.in_(faces),
                models.ListFace.list_id == listId,
                models.List.account_id @ accountId,
            )
            await connection.execute(delete(models.ListFace).where(linksFilter))

            # 3. refresh the last update time of the faces
            touchFacesSt = update(models.Face).where(models.Face.face_id.in_(faces))
            touchFacesSt = touchFacesSt.values(last_update_time=self.currentDBTimestamp)
            await connection.execute(touchFacesSt)
        self._addListsToDeferrerUpdateListLastUpdateTime((listId,))
[docs]
    @exceptionWrap
    async def createList(self, account_id: str, listId: str, user_data: Optional[str] = "") -> None:
        """
        Create list for account.

        Args:
            account_id: account id
            listId: list id
            user_data: user data
        Raises:
            VLException(Error.ListAlreadyExist, 409): if the insert fails with an integrity/unique violation
        """
        async with self.adaptor.connection(logger) as connection:
            newList = {"list_id": listId, "account_id": account_id, "user_data": user_data}
            statement = insert(models.List).values(**newList)
            try:
                await connection.execute(statement)
            except (IntegrityError, UniqueViolationError):
                alreadyExistError = Error.ListAlreadyExist.format(listId)
                raise VLException(alreadyExistError, 409, isCriticalError=False)
[docs]
    @staticmethod
    def prepareSearchFacesFilters(
        filters: SearchFacesFilters,
        modelsFace: Optional[AliasedClass] = None,
        modelsListFace: Optional[AliasedClass] = None,
    ) -> list[BooleanClauseList]:
        """
        Prepare search query filters on "Face" models.

        The result always has the same length: a filter which is not set is represented with `True`.

        Args:
            filters: query filters
            modelsFace: database table model for faces, `models.Face` by default
            modelsListFace: database table model for list face, `models.ListFace` by default
        Returns:
            filters for Face model
        """
        faceModel = modelsFace or models.Face
        linkModel = modelsListFace or models.ListFace
        # a filter by list requires both the join condition and the list condition
        searchInList = filters.list_id is not None
        return [
            True if filters.account_id is None else faceModel.account_id == filters.account_id,
            True if filters.user_data is None else faceModel.user_data.like("%{}%".format(filters.user_data)),
            True if filters.event_id is None else faceModel.event_id == filters.event_id,
            True if filters.create_time__lt is None else faceModel.create_time < filters.create_time__lt,
            True if filters.create_time__gte is None else faceModel.create_time >= filters.create_time__gte,
            True if filters.face_id__lt is None else faceModel.face_id < filters.face_id__lt,
            True if filters.face_id__gte is None else faceModel.face_id >= filters.face_id__gte,
            True if filters.external_ids is None else faceModel.external_id.in_(filters.external_ids),
            True if filters.face_ids is None else faceModel.face_id.in_(filters.face_ids),
            linkModel.face_id == faceModel.face_id if searchInList else True,
            linkModel.list_id == filters.list_id if searchInList else True,
        ]
[docs]
    @staticmethod
    def setOffsetAndOrderToQuery(
        query: Query,
        order: Optional[Literal["desc", "asc"]] = "desc",
        orderColumn: Optional[Column] = None,
        page: Optional[int] = 1,
        pageSize: Optional[int] = 100,
    ) -> Query:
        """
        Apply `OFFSET` and `ORDER BY` criterion to the query.
        Args:
            query: query object
            order: result sort order (ask or desc)
            orderColumn: result sort column
            page: pagination page value
            pageSize: pagination page size value, set -1 to get all faces
        Returns:
            updated query
        """
        if orderColumn is not None:
            query = query.order_by((asc if order == "asc" else desc)(orderColumn))
        if pageSize != -1:
            query = query.offset((page - 1) * pageSize).limit(pageSize)
        return query 
[docs]
    @staticmethod
    def prepareListQueryFilters(filters: SearchListsFilters) -> BooleanClauseList:
        """
        Prepare search query filters on "List" models.

        Args:
            filters: query filters
        Returns:
            filters for List select query, joined with AND
        """
        listModel = models.List
        # a filter which is not set is represented with `True`
        conditions = [
            listModel.account_id @ filters.account_id,
            True if filters.user_data is None else listModel.user_data.like("%{}%".format(filters.user_data)),
            listModel.user_data @ filters.user_data__eq,
            True if filters.create_time__lt is None else listModel.create_time < filters.create_time__lt,
            True if filters.create_time__gte is None else listModel.create_time >= filters.create_time__gte,
            (
                True
                if filters.last_update_time__lt is None
                else listModel.last_update_time < filters.last_update_time__lt
            ),
            (
                True
                if filters.last_update_time__gte is None
                else listModel.last_update_time >= filters.last_update_time__gte
            ),
            True if filters.list_id__lt is None else listModel.list_id < filters.list_id__lt,
            True if filters.list_id__gte is None else listModel.list_id >= filters.list_id__gte,
            True if filters.list_ids is None else listModel.list_id.in_(filters.list_ids),
        ]
        return and_(*conditions)
[docs]
    @exceptionWrap
    async def executeSearchFaces(
        self,
        filters: SearchFacesFilters,
        targets: Optional[list[str]],
        page: Optional[int] = 1,
        pageSize: Optional[int] = 100,
    ) -> list[dict[str, str]]:
        """
        Get faces searched by filters.

        The result is sorted by face id if a face id range filter is used, otherwise - by create time.

        Args:
            filters: raw search request filters
            targets: target Face columns' names to get info on
            page: page
            pageSize: page size
        Returns:
            faces list
        """
        selectedColumns = [_FACE_BY_FACE_GETTER_COLUMNS_MAP[target] for target in targets]
        if filters.face_id__gte or filters.face_id__lt:
            orderColumn = models.Face.face_id
        else:
            orderColumn = models.Face.create_time
        faceFilters = and_(*self.prepareSearchFacesFilters(filters))
        query = self.setOffsetAndOrderToQuery(
            query=Query(selectedColumns).filter(faceFilters),
            orderColumn=orderColumn,
            page=page,
            pageSize=pageSize,
        )
        async with self.adaptor.connection(logger) as connection:
            faceRows = await connection.fetchall(query.statement)
        return self.makeOutputFaces(faceRows, targets)
[docs]
    @exceptionWrap
    async def executeSearchFacesFilteredByList(
        self, filters: SearchFacesFilters, targets: list[str], page: Optional[int] = 1, pageSize: Optional[int] = 100
    ) -> list[dict[str, str]]:
        """
        Get faces of a list searched by filters applied to the list-face model.

        The result is sorted by face id.

        Args:
            filters: raw search request filters
            targets: target Face columns' names to get info on
            page: page
            pageSize: page size
        Returns:
            faces list
        """
        # all targets except these ones are selected from the face table and require a join with it
        faceTableIsNeeded = bool(set(targets) - {"face_id", "account_id", "lists"})
        conditions = [
            True if filters.account_id is None else models.List.account_id == filters.account_id,
            models.List.list_id == filters.list_id,
            models.ListFace.list_id == models.List.list_id,
            True if filters.face_id__lt is None else models.ListFace.face_id < filters.face_id__lt,
            True if filters.face_id__gte is None else models.ListFace.face_id >= filters.face_id__gte,
            True if filters.face_ids is None else models.ListFace.face_id.in_(filters.face_ids),
            models.Face.face_id == models.ListFace.face_id if faceTableIsNeeded else True,
        ]
        selectedColumns = [_FACE_BY_LIST_GETTER_COLUMNS_MAP[target] for target in targets]
        query = self.setOffsetAndOrderToQuery(
            query=select(selectedColumns).where(and_(*conditions)),
            orderColumn=models.ListFace.face_id,
            page=page,
            pageSize=pageSize,
        )
        async with self.adaptor.connection(logger) as connection:
            faceRows = await connection.fetchall(query)
        return self.makeOutputFaces(faceRows, targets)
[docs]
    @exceptionWrap
    async def getFaces(
        self,
        filters: SearchFacesFilters,
        targets: Optional[list[str]] = None,
        page: Optional[int] = 1,
        pageSize: Optional[int] = 100,
    ) -> list[dict[str, str]]:
        """
        Get faces, choosing between the list-face based search and the plain Face search.
        Args:
            filters: raw search request filters
            targets: target Face columns' names to get info on
            page: page number
            pageSize: page size
        Returns:
            list of faces
        """
        # Going through the list-face model is done only when the list-face filters are in use and
        # the request is bounded by a face id range; it is more profitable for lists with many faces.
        searchThroughList = filters.listFaceFiltersAreUsed() and any((filters.face_id__gte, filters.face_id__lt))
        search = self.executeSearchFacesFilteredByList if searchThroughList else self.executeSearchFaces
        return await search(filters, targets, page=page, pageSize=pageSize)
[docs]
    @exceptionWrap
    async def getNonexistentFaceId(
        self,
        requiredFaceIds: set[str],
        accountId: Optional[str] = None,
        dbConnection: Optional[AbstractDBConnection] = None,
    ) -> Union[str, None]:
        """
        Find a face id from `requiredFaceIds` that is absent in the database.
        Args:
            requiredFaceIds: set of required face ids
            accountId: account id; when given, only faces of this account count as existing
            dbConnection: current connection. Needed not to get new connection from pool
                or to do some stuff in scope of the transaction.
        Returns:
            one arbitrary missing face id, or None when every required face exists
        """
        async with contextlib.AsyncExitStack() as stack:
            # a connection is taken from the adaptor (and released on exit) only if the caller gave none
            if dbConnection is None:
                connection = await stack.enter_async_context(self.adaptor.connection(logger))
            else:
                connection = dbConnection

            accountCondition = models.Face.account_id == accountId if accountId is not None else True
            query = Query([models.Face.face_id]).filter(accountCondition, models.Face.face_id.in_(requiredFaceIds))
            rows = await connection.fetchall(query.statement)
            existFaceIds = set(chain.from_iterable(rows))

        missingFaceIds = requiredFaceIds - existFaceIds
        return next(iter(missingFaceIds), None)
[docs]
    @exceptionWrap
    async def getFacesCountByList(self, filters: SearchFacesFilters) -> int:
        """
        Count faces attached to the list given in the filters.
        Args:
            filters: raw search request filters
        Returns:
            Number of faces; 0 when an account id is given and the list does not belong to it
        """
        async with self.adaptor.connection(logger) as connection:
            if filters.account_id is not None:
                # checking the list by account before getting the faces count is more profitable
                listOfAccount = and_(
                    models.List.account_id == filters.account_id, models.List.list_id == filters.list_id
                )
                listCount = await connection.scalar(select([func.count(models.List.list_id)]).where(listOfAccount))
                if not listCount:
                    return 0

            linkConditions = [
                models.ListFace.list_id == filters.list_id,
                models.ListFace.face_id < filters.face_id__lt if filters.face_id__lt is not None else True,
                models.ListFace.face_id >= filters.face_id__gte if filters.face_id__gte is not None else True,
                models.ListFace.face_id.in_(filters.face_ids) if filters.face_ids is not None else True,
            ]
            countSt = select([func.count(models.ListFace.face_id)]).where(and_(*linkConditions))
            return await connection.scalar(countSt)
[docs]
    @exceptionWrap
    async def getCachedFaceAttributesCount(self) -> int:
        """
        Get the count of faces attributes, served from the requests cache while the cached value is fresh.
        The cache row named `FACES_COUNT_CACHE_NAME` is locked with `SELECT ... FOR UPDATE` first. If its
        value is younger than `REQUEST_CACHE_TTL_SECONDS` and decrypts to an integer, that integer is returned.
        Otherwise the attributes are counted again and the cache row is rewritten with the new value.
        Returns:
            Number of faces attributes
        """
        lockCachedCount = select([1]).where(models.RequestsCache.name == FACES_COUNT_CACHE_NAME).with_for_update()
        delta = timedelta(seconds=REQUEST_CACHE_TTL_SECONDS)
        selectActualCache = select([models.RequestsCache.value]).where(
            and_(
                models.RequestsCache.name == FACES_COUNT_CACHE_NAME,
                models.RequestsCache.created_at + delta > self.currentDBTimestamp,
            )
        )
        async with self.adaptor.connection(logger) as connection:
            facesAttributesCount = None
            # take the row lock before reading the cached value
            await connection.scalar(lockCachedCount)
            if requestedCache := await connection.scalar(selectActualCache):
                decryptedValue = decryptCacheValue(requestedCache, REQUESTS_ENCRYPTION_KEY)
                try:
                    facesAttributesCount = int(decryptedValue)
                except (TypeError, ValueError):
                    # int() raises TypeError for a non-string/non-number (e.g. None) and ValueError for a
                    # string that is not a number; in both cases fall back to recounting
                    logger.error("Unable to process decrypted cache value")
                    facesAttributesCount = None
            if facesAttributesCount is None:
                selectFaceAttrsCount = select([func.count(models.Attribute.face_id)])
                facesAttributesCount = await connection.scalar(selectFaceAttrsCount)
                updateQuery = (
                    update(models.RequestsCache)
                    .where(models.RequestsCache.name == FACES_COUNT_CACHE_NAME)
                    .values(
                        value=encryptMessage(facesAttributesCount, REQUESTS_ENCRYPTION_KEY),
                        created_at=self.currentDBTimestamp,
                    )
                )
                await connection.execute(updateQuery)
        return facesAttributesCount
[docs]
    @exceptionWrap
    async def getFacesCount(self, filters: SearchFacesFilters) -> int:
        """
        Count faces matching the filters.
        Args:
            filters: raw search request filters
        Returns:
            Number of faces
        """
        # requests that use the list-face filters are counted through the list-face model
        if filters.listFaceFiltersAreUsed():
            return await self.getFacesCountByList(filters)

        conditions = and_(*self.prepareSearchFacesFilters(filters))
        countSt = select([func.count(models.Face.face_id)]).where(conditions)
        async with self.adaptor.connection(logger) as connection:
            return await connection.scalar(countSt)
[docs]
    @exceptionWrap
    async def getFacesAttributesCount(self, accountId: Optional[str] = None) -> int:
        """
        Count face attributes, optionally only those whose face belongs to the given account.
        Args:
            accountId: account id; a falsy value means no account restriction
        Returns:
            face attribute count
        """
        filters = (
            and_(models.Face.account_id == accountId, models.Attribute.face_id == models.Face.face_id)
            if accountId
            else and_()
        )
        countSt = select([func.count(models.Attribute.face_id)]).where(filters)
        async with self.adaptor.connection(logger) as connection:
            return await connection.scalar(countSt)
[docs]
    @exceptionWrap
    async def getLists(
        self, filters: SearchListsFilters, page: Optional[int] = 1, pageSize: Optional[int] = 100
    ) -> list[dict[str, str]]:
        """
        Get one page of lists matching the filters, newest (or highest id) first.
        Args:
            filters: raw search request filters
            page: page number, starting from 1
            pageSize: page size
        Returns:
            List with dicts built by `makeOutputLists`
        """
        queryFilters = self.prepareListQueryFilters(filters)

        # a request bounded by a list id range is ordered by list id, any other one by creation time
        boundedByListId = filters.list_id__gte is not None or filters.list_id__lt is not None
        orderColumn = models.List.list_id if boundedByListId else models.List.create_time

        async with self.adaptor.connection(logger) as connection:
            skipCount = (page - 1) * pageSize
            query = Query(models.List).filter(queryFilters).order_by(orderColumn.desc())
            query = query.offset(skipCount).limit(pageSize)
            rows = await connection.fetchall(query.statement)
        return self.makeOutputLists(rows)
[docs]
    @exceptionWrap
    async def getNonexistentListId(
        self, requiredListIds: set[str], accountId: Optional[str] = None
    ) -> Union[str, None]:
        """
        Find a list id from `requiredListIds` that is absent in the database.
        Args:
            requiredListIds: set of required list ids
            accountId: account id
        Returns:
            one arbitrary missing list id, or None when every required list exists
        """
        async with self.adaptor.connection(logger) as connection:
            # NOTE(review): `@` looks like a project-specific column operator for the optional
            # account filter - confirm against the model definition
            accountCondition = models.List.account_id @ accountId
            query = Query(models.List.list_id).filter(accountCondition, models.List.list_id.in_(requiredListIds))
            rows = await connection.fetchall(query.statement)
            existListIds = set(chain.from_iterable(rows))

        missingListIds = requiredListIds - existListIds
        return next(iter(missingListIds), None)
[docs]
    @exceptionWrap
    async def getListsCount(self, filters: SearchListsFilters) -> int:
        """
        Count lists matching the filters.
        Args:
            filters: raw search request filters
        Returns:
            Count of lists
        """
        queryFilters = self.prepareListQueryFilters(filters)
        async with self.adaptor.connection(logger) as connection:
            countQuery = Query([func.count(models.List.list_id).label("count")]).filter(queryFilters)
            return await connection.scalar(countQuery.statement)
[docs]
    @exceptionWrap
    async def getListsAndKeysFromMV(self, listIds: set[str], useParity: int) -> list[dict[str, Any]]:
        """
        Get lists with last link and unlink keys from materialized views.
        The list ids are queried in batches of at most `DB_LIST_LIMIT` ids per statement.
        Args:
            listIds: list ids
            useParity: if truthy - get even and odd link/unlink keys (views `MV_LINK_0/1`, `MV_UNLINK_0/1`),
                otherwise get link/unlink keys without parity (views `MV_LINK`, `MV_UNLINK`)
        Returns:
            List of dicts with "list_id" plus "link_key_even", "link_key_odd", "unlink_key_even",
            "unlink_key_odd" when parity is used, or "link_key", "unlink_key" otherwise
        """
        listIdsList = list(listIds)
        listIdsBatches = [listIdsList[idx : idx + DB_LIST_LIMIT] for idx in range(0, len(listIdsList), DB_LIST_LIMIT)]
        if useParity:
            async with self.adaptor.connection(logger) as connection:
                selectQueryList = [
                    Query(
                        [
                            models.List.list_id,
                            models.MV_LINK_0.link_key.label("link_key_even"),
                            models.MV_LINK_1.link_key.label("link_key_odd"),
                            models.MV_UNLINK_0.unlink_key.label("unlink_key_even"),
                            models.MV_UNLINK_1.unlink_key.label("unlink_key_odd"),
                        ]
                    )
                    .join(models.MV_LINK_0, models.MV_LINK_0.list_id == models.List.list_id, full=True)
                    .join(models.MV_LINK_1, models.MV_LINK_1.list_id == models.List.list_id, full=True)
                    # every view is joined on its own list_id column
                    .join(models.MV_UNLINK_0, models.MV_UNLINK_0.list_id == models.List.list_id, full=True)
                    .join(models.MV_UNLINK_1, models.MV_UNLINK_1.list_id == models.List.list_id, full=True)
                    .filter(models.List.list_id.in_(listIdsBatch))
                    for listIdsBatch in listIdsBatches
                ]
                selectResult = []
                for selectQuery in selectQueryList:
                    selectResult.extend(await connection.fetchall(selectQuery.statement))
                result = [
                    {
                        "list_id": listId,
                        "link_key_even": linkKeyEven,
                        "link_key_odd": linkKeyOdd,
                        "unlink_key_even": unlinkKeyEven,
                        "unlink_key_odd": unlinkKeyOdd,
                    }
                    for listId, linkKeyEven, linkKeyOdd, unlinkKeyEven, unlinkKeyOdd in selectResult
                ]
        else:
            async with self.adaptor.connection(logger) as connection:
                selectQueryList = [
                    Query([models.List.list_id, models.MV_LINK.link_key, models.MV_UNLINK.unlink_key])
                    .join(models.MV_LINK, models.MV_LINK.list_id == models.List.list_id, full=True)
                    .join(models.MV_UNLINK, models.MV_UNLINK.list_id == models.List.list_id, full=True)
                    .filter(models.List.list_id.in_(listIdsBatch))
                    for listIdsBatch in listIdsBatches
                ]
                selectResult = []
                for selectQuery in selectQueryList:
                    selectResult.extend(await connection.fetchall(selectQuery.statement))
                result = [
                    {"list_id": listId, "link_key": linkKey, "unlink_key": unlinkKey}
                    for listId, linkKey, unlinkKey in selectResult
                ]
        return result
[docs]
    @exceptionWrap
    async def getListsWithKeys(self, listIds: set[str], useParity: int) -> list[dict[str, Any]]:
        """
        Get lists with last link and unlink keys from usual tables.
        Only lists that exist in the List table are returned. All lookups are done in batches
        of at most `DB_LIST_LIMIT` list ids.
        Args:
            listIds: list ids
            useParity: if truthy - get even and odd max link/unlink keys, otherwise - max link/unlink keys
        Returns:
            list of dicts with "list_id" plus "link_key_even", "unlink_key_even", "link_key_odd",
            "unlink_key_odd" when parity is used, or "link_key", "unlink_key" otherwise;
            a key is None when nothing was found for the list
        """
        requestedIds = list(listIds)
        requestedBatches = [
            requestedIds[start : start + DB_LIST_LIMIT] for start in range(0, len(requestedIds), DB_LIST_LIMIT)
        ]

        async def getKeysFromDb(
            searchLists: list[str], dbConnection, keyType: str, parity: Optional[str] = None
        ) -> dict:
            """
            Get link/unlink keys from db
            Args:
                searchLists: list ids to look the keys up for
                dbConnection: database connection
                keyType: "link" or anything else for unlink
                parity: None, "odd" or "even"
            Returns:
                dict built from the fetched (list id, key) rows
            """
            if parity is None:
                # one "top key of the list" query per list id, merged with UNION ALL
                if keyType == "link":
                    model, keyColumn = models.ListFace, models.ListFace.link_key
                else:
                    model, keyColumn = models.UnlinkAttributesLog, models.UnlinkAttributesLog.unlink_key
                perListQueries = [
                    Query([model.list_id, keyColumn])
                    .filter(model.list_id == listId)
                    .order_by(keyColumn.desc())
                    .limit(1)
                    for listId in searchLists
                ]
                return dict(await dbConnection.fetchall(union_all(*perListQueries)))

            # parity is requested: the per-list statement comes from the raw SQL templates of the current db type
            remainder = 1 if parity == "odd" else 0
            rawSql = " UNION ALL ".join(
                RAW_SQLS[self.dbConfig.type][keyType].format(listId=listId, mod=remainder) for listId in searchLists
            )
            return dict(await dbConnection.fetchall(rawSql))

        async with self.adaptor.connection(logger) as connection:
            # keep only the ids that are really present in the List table
            existsLists = []
            for requestedBatch in requestedBatches:
                existsSt = Query(models.List.list_id).filter(models.List.list_id.in_(requestedBatch)).statement
                existsLists.extend(row[0] for row in await connection.fetchall(existsSt))
            if not existsLists:
                return []

            keysByListId = {}
            for start in range(0, len(existsLists), DB_LIST_LIMIT):
                batch = existsLists[start : start + DB_LIST_LIMIT]
                if useParity:
                    oddLinkKeys = await getKeysFromDb(batch, connection, "link", "odd")
                    oddUnlinkKeys = await getKeysFromDb(batch, connection, "unlink", "odd")
                    evenLinkKeys = await getKeysFromDb(batch, connection, "link", "even")
                    evenUnlinkKeys = await getKeysFromDb(batch, connection, "unlink", "even")
                    for listId in batch:
                        keysByListId[listId] = {
                            "link_key_even": evenLinkKeys.get(listId),
                            "unlink_key_even": evenUnlinkKeys.get(listId),
                            "link_key_odd": oddLinkKeys.get(listId),
                            "unlink_key_odd": oddUnlinkKeys.get(listId),
                        }
                else:
                    linkKeys = await getKeysFromDb(batch, connection, "link")
                    unlinkKeys = await getKeysFromDb(batch, connection, "unlink")
                    for listId in batch:
                        keysByListId[listId] = {
                            "link_key": linkKeys.get(listId),
                            "unlink_key": unlinkKeys.get(listId),
                        }
            return [{"list_id": listId, **listKeys} for listId, listKeys in keysByListId.items()]
[docs]
    @exceptionWrap
    async def deleteFaces(
        self,
        faces: list[str] | None = None,
        accountId: str | None = None,
        createTimeGte: str | None = None,
        createTimeLt: str | None = None,
        userData: str | None = None,
        listId: str | None = None,
    ) -> int:
        """
        Remove faces matching the given conditions; a condition passed as None is not applied.
        The matching faces are blocked first, their list links are moved to the history log,
        then the faces are deleted. Lists affected by the moved links are handed to
        `_addListsToDeferrerUpdateListLastUpdateTime` after the connection is released.
        Args:
            accountId: faces account id
            faces: faces ids
            createTimeGte: lower bound (inclusive) of the face creation time
            createTimeLt: upper bound (exclusive) of the face creation time
            userData: substring of the face user data
            listId: list id where face exists
        Returns:
            removed face count
        """
        if listId is not None:
            inListCondition = and_(
                models.ListFace.face_id == models.Face.face_id,
                models.ListFace.list_id == listId,
            )
        else:
            inListCondition = True
        conditions = [
            models.Face.face_id.in_(faces) if faces is not None else True,
            models.Face.user_data.like(f"%{userData}%") if userData is not None else True,
            models.Face.create_time >= createTimeGte if createTimeGte is not None else True,
            models.Face.create_time < createTimeLt if createTimeLt is not None else True,
            models.Face.account_id == accountId if accountId is not None else True,
            inListCondition,
        ]

        async with self.adaptor.connection(logger) as connection:
            if faces:
                logger.debug(f"start delete {len(faces)} faces")

            _, blockedFaces = await self.blockFaces(connection, filters=conditions, returnBlockedFaces=True)

            # links of the blocked faces go to the history log before the faces themselves are removed
            updatedLists = await self._moveFacesLinksToLog(blockedFaces, connection)
            logger.debug("update history log")

            removedFaceCount = await connection.execute(delete(models.Face).where(and_(*conditions)))
            logger.debug(f"delete {removedFaceCount} faces")

        self._addListsToDeferrerUpdateListLastUpdateTime(updatedLists)
        return removedFaceCount
[docs]
    @exceptionWrap
    async def deleteFacesExtended(
        self,
        targets: list[str],
        accountId: str | None = None,
        createTimeGte: str | None = None,
        createTimeLt: str | None = None,
        userData: str | None = None,
        listId: str | None = None,
    ) -> list[dict] | None:
        """
        Remove faces and optionally report what was removed.
        With non-empty `targets`, at most `DELETE_REQUEST_MAX_SIZE` matching faces are collected,
        deleted by id, and described in the result. Without targets, all matching faces are
        deleted through `deleteFaces` and nothing is reported.
        Args:
            targets: deletion info targets ("face_id", "basic_attributes_samples", "face_descriptor_samples")
            accountId: faces account id
            createTimeGte: lower bound (inclusive) of the face creation time
            createTimeLt: upper bound (exclusive) of the face creation time
            userData: substring of the face user data
            listId: list id where face exists
        Returns:
            list with dicts for each deleted face, or None when no targets were requested.
        """
        if not targets:
            # nothing to report: no need to collect anything (or to take a connection) before deletion
            await self.deleteFaces(
                accountId=accountId,
                createTimeGte=createTimeGte,
                createTimeLt=createTimeLt,
                userData=userData,
                listId=listId,
            )
            return None

        filters = and_(
            models.Face.user_data.like("%{}%".format(userData)) if userData is not None else True,
            models.Face.create_time >= createTimeGte if createTimeGte is not None else True,
            models.Face.create_time < createTimeLt if createTimeLt is not None else True,
            models.Face.account_id == accountId if accountId is not None else True,
            (
                and_(
                    models.ListFace.face_id == models.Face.face_id,
                    models.ListFace.list_id == listId,
                )
                if listId is not None
                else True
            ),
        )

        async def collectFacesToDeletion(conn) -> tuple[list[str], list[dict]]:
            """
            Collect ids of the faces to delete and the requested targets for each of them.
            Args:
                conn: database connection
            Returns:
                face ids and the list of per-face target dicts
            """
            selectFaceIds = Query([models.Face.face_id]).filter(filters).limit(DELETE_REQUEST_MAX_SIZE)
            faceData = await conn.fetchall(selectFaceIds.statement)
            faceDict = {
                row["face_id"]: {
                    "face_id": row["face_id"],
                    "basic_attributes_samples": [],
                    "face_descriptor_samples": [],
                }
                for row in faceData
            }
            faceIds = list(faceDict)
            sampleTypes = []
            if "basic_attributes_samples" in targets:
                sampleTypes.append(models.SampleType["basic_attributes"].value)
            if "face_descriptor_samples" in targets:
                sampleTypes.append(models.SampleType["face_descriptor"].value)
            # samples are looked up only when they are requested and there is at least one face
            if sampleTypes and faceIds:
                selectQuery = Query([models.Sample.face_id, models.Sample.sample_id, models.Sample.type]).filter(
                    and_(models.Sample.face_id.in_(faceIds), models.Sample.type.in_(sampleTypes))
                )
                sampleData = await conn.fetchall(selectQuery.statement)
                for row in sampleData:
                    if row["face_id"] in faceDict:
                        if row["type"] == models.SampleType["basic_attributes"].value:
                            faceDict[row["face_id"]]["basic_attributes_samples"].append(row["sample_id"])
                        else:
                            faceDict[row["face_id"]]["face_descriptor_samples"].append(row["sample_id"])
            targetsList = []
            for value in faceDict.values():
                targetItem = {}
                if "basic_attributes_samples" in targets:
                    targetItem["basic_attributes_samples"] = value["basic_attributes_samples"]
                if "face_descriptor_samples" in targets:
                    targetItem["face_descriptor_samples"] = value["face_descriptor_samples"]
                if "face_id" in targets:
                    targetItem["face_id"] = value["face_id"]
                targetsList.append(targetItem)
            return faceIds, targetsList

        # The connection is needed for collecting only. It is released before `deleteFaces` runs,
        # because `deleteFaces` takes its own connection from the adaptor.
        async with self.adaptor.connection(logger) as connection:
            faceIds, targetsData = await collectFacesToDeletion(connection)
        await self.deleteFaces(faces=faceIds, accountId=accountId)
        return targetsData
    @exceptionWrap
    async def _deleteLists(self, lists: list[str], accountId: Optional[str] = None) -> int:
        """
        Remove lists by ids, without touching the faces linked to them.
        Args:
            accountId: lists account id
            lists: ids of the lists to remove
        Returns:
            removed list count
        Warnings:
            trigger `trg_lists_deletion_log` will insert a data to the table `ListsDeletionLog`
        """
        listsToDelete = and_(models.List.list_id.in_(lists), models.List.account_id @ accountId)
        async with self.adaptor.connection(logger) as connection:
            return await connection.execute(delete(models.List).where(listsToDelete))
[docs]
    @exceptionWrap
    async def deleteLists(self, lists: list[str], accountId: Optional[str] = None, withFaces: bool = False) -> int:
        """
        Remove lists, optionally together with the faces linked to them.
        With `withFaces` the lists are locked and deleted at once, while the faces that were linked
        to them are deleted by the background worker in batches of `BACKGROUND_REMOVAL_BATCH_SIZE`.
        Args:
            accountId: lists account id
            lists: ids of the lists to remove
            withFaces: remove lists with all faces which is contained in these lists
        Returns:
            removed list count
        """
        if not withFaces:
            return await self._deleteLists(lists, accountId)

        listsFilters = [models.List.list_id.in_(lists), models.List.account_id @ accountId]
        async with self.adaptor.connection(logger) as connection:
            # lock the lists to delete
            lockQuery = Query([models.List.list_id]).filter(and_(*listsFilters)).with_for_update()
            lockedRows = await connection.fetchall(lockQuery.statement)
            if not lockedRows:
                return 0
            lockedListIds = [lockedRow[0] for lockedRow in lockedRows]

            # remember the linked faces before the lists are removed
            linkedFacesSt = select([models.ListFace.face_id]).where(
                and_(
                    models.List.list_id.in_(lockedListIds),
                    models.List.list_id == models.ListFace.list_id,
                    models.List.account_id @ accountId,
                )
            )
            faceIdsToDeletion = [faceRow[0] for faceRow in await connection.fetchall(linkedFacesSt)]

            removedListCount = await connection.execute(delete(models.List).where(and_(*listsFilters)))

        batchSize = BACKGROUND_REMOVAL_BATCH_SIZE
        batchCount = getPageCount(len(faceIdsToDeletion), batchSize)
        coros = [
            self.deleteFaces(faces=faceIdsToDeletion[batchIdx * batchSize : (batchIdx + 1) * batchSize])
            for batchIdx in range(batchCount)
        ]
        self.backgroundWorker.runNoWait(coros)
        return removedListCount
[docs]
    @exceptionWrap
    async def updateListUserData(self, listId: str, userData: str, accountId: Optional[str] = None) -> int:
        """
        Set new user data for a list.
        Args:
            listId: list id
            userData: new user data
            accountId: account id
        Returns:
            updated list count
        """
        targetList = and_(models.List.list_id == listId, models.List.account_id @ accountId)
        async with self.adaptor.connection(logger) as connection:
            updateSt = update(models.List).where(targetList).values(user_data=userData)
            return await connection.execute(updateSt)
[docs]
    @exceptionWrap
    async def isFacesExist(self, faceIds: list[str], accountId: Optional[str] = None) -> bool:
        """
        Check that every given face exists.
        Args:
            accountId: account id
            faceIds: face ids; repeated ids are counted once
        Returns:
            True if all faces exist else false
        """
        # The query counts matching rows, so the expected number is the count of distinct ids:
        # comparing with len(faceIds) would report existing faces as missing when an id is repeated.
        uniqueFaceIds = set(faceIds)
        async with self.adaptor.connection(logger) as connection:
            query = Query([func.count(models.Face.face_id).label("count")])
            query = query.filter(models.Face.face_id.in_(uniqueFaceIds), models.Face.account_id @ accountId)
            res = await connection.fetchone(query.statement)
            faceCount = res[0]
            return faceCount == len(uniqueFaceIds)
[docs]
    @exceptionWrap
    async def isListsExist(self, listIds: set[str], accountId: Optional[str] = None) -> bool:
        """
        Check that every given list exists.
        Args:
            accountId: account id
            listIds: list ids
        Returns:
            True if all lists exist else false
        """
        async with self.adaptor.connection(logger) as connection:
            countQuery = Query([func.count(models.List.list_id).label("count")]).filter(
                models.List.list_id.in_(listIds), models.List.account_id @ accountId
            )
            countRow = await connection.fetchone(countQuery.statement)
            return countRow[0] == len(listIds)
[docs]
    @exceptionWrap
    async def getListPlusDelta(
        self,
        listId,
        linkKeyGte: Optional[int] = None,
        linkKeyLt: Optional[int] = None,
        limit: int = 10000,
        parity: Optional[int] = None,
    ) -> list[dict]:
        """
        Get attributes attached to a list, ordered by link key ascending.
        Args:
            listId: list id
            linkKeyLt: upper bound (exclusive) of link key value
            linkKeyGte: lower bound (inclusive) of link key value
            limit: maximum number of rows to return
            parity: required remainder of the link key modulo 2 (0 - even keys, 1 - odd keys)
        Returns:
            List of dicts with following keys: "attribute_id" (face id of the attribute), "link_key"
        """
        conditions = [
            models.ListFace.link_key < linkKeyLt if linkKeyLt is not None else True,
            models.ListFace.link_key >= linkKeyGte if linkKeyGte is not None else True,
            models.ListFace.list_id == listId,
            models.Attribute.face_id == models.ListFace.face_id,
            mod(models.ListFace.link_key, 2) == parity if parity is not None else True,
        ]
        async with self.adaptor.connection(logger) as connection:
            deltaQuery = (
                Query([models.Attribute.face_id, models.ListFace.link_key])
                .filter(and_(*conditions))
                .order_by(models.ListFace.link_key.asc())
                .limit(limit)
            )
            rows = await connection.fetchall(deltaQuery.statement)
            # used by matcher
            outputKeys = ("attribute_id", "link_key")
            return [dict(zip(outputKeys, row)) for row in rows]
[docs]
    @exceptionWrap
    async def getListMinusDelta(
        self,
        listId,
        unlinkKeyGte: Optional[int] = None,
        unlinkKeyLt: Optional[int] = None,
        limit: int = 10000,
        parity: Optional[int] = None,
    ) -> list[dict]:
        """
        Get the unlink history of the list (records of the unlink log with a non-null face id), ordered by unlink key.
        Args:
            listId: list id
            unlinkKeyGte: inclusive lower bound of unlink key value, not applied if None
            unlinkKeyLt: exclusive upper bound of unlink key value, not applied if None
            limit: maximum number of rows to return
            parity: required remainder of the unlink key modulo 2 (0 - even keys, 1 - odd keys), not applied if None
        Returns:
            List of dicts with following keys: "attributes_id" (filled with the face id), "link_key", "unlink_key"
        """
        logModel = models.UnlinkAttributesLog
        unlinkKey = logModel.unlink_key
        # a plain `True` stands for "no restriction" when the corresponding argument is not set
        upperBound = unlinkKey < unlinkKeyLt if unlinkKeyLt is not None else True
        lowerBound = unlinkKey >= unlinkKeyGte if unlinkKeyGte is not None else True
        parityMatch = mod(unlinkKey, 2) == parity if parity is not None else True
        condition = and_(
            upperBound,
            lowerBound,
            logModel.list_id == listId,
            parityMatch,
            logModel.face_id.isnot(None),
        )
        deltaQuery = (
            Query([logModel.face_id, logModel.link_key, unlinkKey])
            .filter(condition)
            .order_by(unlinkKey.asc())
            .limit(limit)
        )
        async with self.adaptor.connection(logger) as connection:
            rows = await connection.fetchall(deltaQuery.statement)
            resultKeys = ("attributes_id", "link_key", "unlink_key")
            return [dict(zip(resultKeys, row)) for row in rows]
[docs]
    @exceptionWrap
    async def cleanLog(self, updateTimeLt: Optional[datetime] = None) -> None:
        """
        Remove records from the unlink log table.
        Args:
            updateTimeLt: exclusive upper bound of record update time; if None no time restriction is applied
        """
        if updateTimeLt is not None:
            timeCondition = models.UnlinkAttributesLog.update_time < updateTimeLt
        else:
            # constant true condition: every record matches
            timeCondition = True
        deleteSt = delete(models.UnlinkAttributesLog).where(and_(timeCondition))
        async with self.adaptor.connection(logger) as connection:
            await connection.execute(deleteSt)
[docs]
    @exceptionWrap
    async def getFacesAttributes(
        self,
        faceIds: list[str],
        retrieveAttrs: Iterable[str],
        descriptorVersion: int,
        accountId: Optional[str],
        getDescriptorAsBytes: bool = False,
        descriptorFormat: Literal["raw", "sdk"] = "raw",
    ) -> list[dict[str, Any]]:
        """
        Retrieve attributes
        Args:
            faceIds: face ids
            retrieveAttrs: attribute targets to retrieve (any iterable, it is consumed only once)
            descriptorVersion: requested descriptor version
            accountId: account id
            getDescriptorAsBytes: whether to get descriptor as bytes otherwise as base64
            descriptorFormat: output descriptor format
        Returns:
            face attributes list (one item per face that passed the face id/account filters)
            with the following properties:
                face_id: face id
                attributes: target-specific face attributes dict
            If some requested attribute data is not found it will be filled with 'empty' value:
                None for attributes, [] for samples
        Raises:
            ValueError: if a descriptor is found and `descriptorFormat` is neither "raw" nor "sdk"
        """
        # Materialize once: the targets are tested for membership many times below,
        # a one-shot iterable (e.g. a generator) would be exhausted after the first test.
        requestedTargets = frozenset(retrieveAttrs)
        faceAttributes: dict[str, dict[str, Any]] = {}

        async with self.adaptor.connection(logger) as connection:

            async def getAttributeModelData(
                faceIds_: list[str], accountId_: Optional[str], retrieveAttrs_: Iterable[str], descriptorVersion_: int
            ) -> Iterable[dict]:
                """Get attribute data from Attribute and Descriptor tables."""
                needCreateTime = FaceAttributeTargets.createTime.value in retrieveAttrs_
                needBasicAttributes = FaceAttributeTargets.basicAttributes.value in retrieveAttrs_
                filters = [models.Face.face_id.in_(faceIds_), models.Face.account_id @ accountId_]
                selectQuery = Query([models.Face.face_id])
                if needCreateTime:
                    selectQuery = selectQuery.add_column(models.Attribute.create_time)
                if needBasicAttributes:
                    selectQuery = selectQuery.add_columns(
                        models.Attribute.age, models.Attribute.gender, models.Attribute.ethnicity
                    )
                if needCreateTime or needBasicAttributes:
                    # outer join: faces without an attribute row are still returned, with NULL columns
                    selectQuery = selectQuery.outerjoin(
                        models.Attribute, models.Attribute.face_id == models.Face.face_id
                    )
                if FaceAttributeTargets.faceDescriptor.value in retrieveAttrs_:
                    selectQuery = selectQuery.add_column(self.getQuery4Binary(models.Descriptor.descriptor))
                    selectQuery = selectQuery.outerjoin(
                        models.Descriptor,
                        and_(
                            models.Descriptor.face_id == models.Face.face_id,
                            models.Descriptor.descriptor_version == descriptorVersion_,
                            models.Descriptor.encryption_hash == self._encryptionHashFilter,
                        ),
                    )
                selectQuery = selectQuery.filter(and_(*filters))
                attributesData = await connection.fetchall(selectQuery.statement)
                responseFields = [str(field) for field in selectQuery.statement.columns]
                return [dict(zip(responseFields, row)) for row in attributesData]

            async def getSampleModelData(faceIds_: list[str], accountId_: Optional[str]) -> Iterable[dict]:
                """Get attribute data from Sample table."""
                filters = [models.Face.face_id.in_(faceIds_), models.Face.account_id @ accountId_]
                # outer join: a face without samples yields a single row with NULL sample columns
                selectQuery = (
                    Query([models.Face.face_id, models.Sample.sample_id, models.Sample.type])
                    .outerjoin(models.Sample, models.Sample.face_id == models.Face.face_id)
                    .filter(and_(*filters))
                )
                samplesData = await connection.fetchall(selectQuery.statement)
                responseFields = [str(field) for field in selectQuery.statement.columns]
                return [dict(zip(responseFields, row)) for row in samplesData]

            def updateFaceAttributesWithAttributeData(retrieveAttrs_: Iterable[str], attributes_: Iterable[dict]):
                """Update faceAttributes result dict with attribute data."""
                for attribute in attributes_:
                    attributeFaceId = attribute.pop("face_id")
                    faceData = faceAttributes[attributeFaceId] = {}
                    if FaceAttributeTargets.createTime.value in retrieveAttrs_:
                        createTime = attribute.pop(FaceAttributeTargets.createTime.value)
                        if createTime is not None:
                            createTime = convertTimeToString(createTime, self.storageTime == "UTC")
                        faceData[FaceAttributeTargets.createTime.value] = createTime
                    if FaceAttributeTargets.faceDescriptor.value in retrieveAttrs_:
                        descriptor = attribute.pop("descriptor")
                        if descriptor is None:
                            descriptorData = None
                        elif descriptorFormat == "raw":
                            descriptorData = {
                                "descriptor": descriptor if getDescriptorAsBytes else bytesToBase64(descriptor),
                                "descriptor_version": descriptorVersion,
                            }
                        elif descriptorFormat == "sdk":
                            descriptorData = sdkDescriptorEncode(descriptorVersion, descriptor)
                            if not getDescriptorAsBytes:
                                descriptorData = bytesToBase64(descriptorData)
                        else:
                            raise ValueError(f"Unknown descriptor format {descriptorFormat}")
                        faceData[FaceAttributeTargets.faceDescriptor.value] = descriptorData
                    if FaceAttributeTargets.basicAttributes.value in retrieveAttrs_:
                        # everything still left in the row belongs to basic attributes
                        basicAttributes = attribute if any(attr is not None for attr in attribute.values()) else None
                        faceData[FaceAttributeTargets.basicAttributes.value] = basicAttributes

            def updateFaceAttributesWithSampleData(retrieveAttrs_: Iterable[str], samples_: Iterable[dict]):
                """Update faceAttributes result dict with sample data."""
                sampleTargets = [
                    f"{sampleKind.name}{SAMPLE_POSTFIX}"
                    for sampleKind in (SampleType.face_descriptor, SampleType.basic_attributes)
                ]
                requestedSampleTargets = [target for target in sampleTargets if target in retrieveAttrs_]
                for sample in samples_:
                    faceData = faceAttributes.setdefault(sample["face_id"], {})
                    # every requested sample target gets at least an empty list, also when the face
                    # owns samples of the other type only
                    for target in requestedSampleTargets:
                        faceData.setdefault(target, [])
                    sampleType = sample["type"]
                    if sampleType is None:
                        # the face has no samples at all (NULL row produced by the outer join)
                        continue
                    attrType = f"{SampleType(sampleType).name}{SAMPLE_POSTFIX}"
                    if attrType in retrieveAttrs_:
                        faceData.setdefault(attrType, []).append(sample["sample_id"])

            # The attribute query runs unconditionally: apart from fetching the requested columns it
            # seeds an (initially empty) entry for every face that passed the filters, so such faces
            # are reported even if only sample targets are requested.
            attributes = await getAttributeModelData(
                faceIds_=faceIds,
                accountId_=accountId,
                retrieveAttrs_=requestedTargets,
                descriptorVersion_=descriptorVersion,
            )
            updateFaceAttributesWithAttributeData(retrieveAttrs_=requestedTargets, attributes_=attributes)

            if (
                FaceAttributeTargets.basicAttributesSamples.value in requestedTargets
                or FaceAttributeTargets.faceDescriptorSamples.value in requestedTargets
            ):
                samples = await getSampleModelData(faceIds_=faceIds, accountId_=accountId)
                updateFaceAttributesWithSampleData(retrieveAttrs_=requestedTargets, samples_=samples)

            return [{"face_id": faceId, "attributes": attribute} for faceId, attribute in faceAttributes.items()]
[docs]
    @exceptionWrap
    async def getFacesAttributeSamples(self, faceId: str, accountId: Optional[str]) -> list[str]:
        """
        Get all the attribute samples of the specified face
        Args:
            faceId: face id (a single id: it is compared with `==` and used in the error message)
            accountId: account id
        Returns:
            face attribute samples list without duplicates (order is not defined), empty if the face has no samples
        Raises:
            VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False) if face not found
        """
        async with self.adaptor.connection(logger) as connection:
            selectSt = select([models.Sample.sample_id]).where(
                and_(
                    models.Sample.face_id == faceId,
                    models.Face.account_id @ accountId,
                    models.Sample.face_id == models.Face.face_id,
                )
            )
            sampleRows = await connection.fetchall(selectSt)
            # flatten single-column rows and drop duplicated sample ids
            sampleIds = list(set(chain.from_iterable(sampleRows)))
            if not sampleIds:
                # No samples: distinguish "face without samples" from "face does not exist"
                selectSt = select([models.Face.face_id]).where(
                    and_(models.Face.face_id == faceId, models.Face.account_id @ accountId)
                )
                if await connection.scalar(selectSt) is None:
                    raise VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False)
            return sampleIds
[docs]
    @exceptionWrap
    async def putFaceAttributes(self, faceId: str, attribute: TemporaryAttributes, accountId: Optional[str]) -> None:
        """
        Replace attributes of the face: existing attribute rows are deleted and the new data is inserted.
        Links of the face to lists are moved to the log and get new link keys.
        Args:
            faceId: face id
            attribute: temporary attribute container
            accountId: account id
        Raises:
            VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False) if face not found
        """
        faceFilters = [models.Face.face_id == faceId, models.Face.account_id @ accountId]
        async with self.adaptor.connection(logger) as connection:
            # Do select for update
            lockedCount, _ = await self.blockFaces(connection, filters=faceFilters)
            if lockedCount == 0:
                raise VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False)

            # update history log: store current links and give the existing links new keys
            linkedLists = await self._moveFacesLinksToLog((faceId,), connection)
            refreshLinksSt = update(models.ListFace).where(models.ListFace.face_id == faceId)
            refreshLinksSt = refreshLinksSt.values(
                last_update_time=self.currentDBTimestamp, link_key=LINK_SEQUENCE.next_value()
            )
            await connection.execute(refreshLinksSt)

            # replace attribute data
            dropAttributeSt = delete(models.Attribute).where(models.Attribute.face_id == faceId)
            await connection.execute(dropAttributeSt)
            await self.insertFaceAttributeData(connection, faceId=faceId, attribute=attribute)
        self._addListsToDeferrerUpdateListLastUpdateTime(linkedLists)
    @exceptionWrap
    async def _updateAttributeDescriptor(
        self, connection: AbstractDBConnection, faceId: str, descriptors: list[Descriptor], generation: int
    ) -> None:
        """
        Store descriptors of the face: update the row of the same descriptor version or insert a new one.
        Nothing is done if no descriptors are given.
        Args:
            connection: connection
            faceId: face id
            descriptors: list of descriptor containers for patch
            generation: descriptor generation (written only for inserted rows)
        """
        if not descriptors:
            return

        # remove descriptors of the face whose encryption hash differs from the current filter value
        staleDescriptors = and_(
            models.Descriptor.face_id == faceId,
            models.Descriptor.encryption_hash != self._encryptionHashFilter,
        )
        await connection.execute(delete(models.Descriptor).where(staleDescriptors))

        rowsToStore = [
            (
                descriptor.version,
                {
                    "descriptor": descriptor.descriptor,
                    "encryption_hash": self.encryptionCtx.encryptionHash,
                    "descriptor_obtaining_method": 1,
                },
            )
            for descriptor in descriptors
        ]
        for version, newValues in rowsToStore:
            sameVersionRow = and_(
                models.Descriptor.face_id == faceId,
                models.Descriptor.descriptor_version == version,
            )
            updateSt = update(models.Descriptor).where(sameVersionRow).values(**newValues)
            if await connection.execute(updateSt):
                continue
            # insert full values if cannot update
            insertSt = insert(models.Descriptor).values(
                descriptor_version=version, **newValues, face_id=faceId, descriptor_generation=generation
            )
            await connection.execute(insertSt)
[docs]
    @exceptionWrap
    async def updateFaceAttributes(
        self,
        faceId: str,
        attribute: TemporaryAttributes,
        accountId: Optional[str] = None,
        forceUpdate: Optional[bool] = False,
    ) -> None:
        """
        Update attribute
        Args:
            faceId: updated attribute face id
            attribute: temporary attributes container
            accountId: account id
            forceUpdate: if set, basic attribute samples are not compared with the stored ones and
                the samples given in the attribute replace the stored ones
        Raises:
            VLException(Error.FaceSampleConflict.format(faceId), 400, False) if samples conflict
            VLException(Error.AttributesForUpdateNotFound.format(faceId), 400, False) if attribute for update not found
            VLException(Error.FaceNotFound.format(faceId), 404, False) if face not found
        """
        async with self.adaptor.connection(logger) as connection:

            async def checkSamples(
                faceId_: str, filteredSamples_: dict[int, set[str]], updDescriptorVersions_: list[int]
            ) -> None:
                """
                Check conformity of old and new sample ids.
                Args:
                    faceId_: face id
                    filteredSamples_: {<sample type>: <samples>} map
                    updDescriptorVersions_: versions of updating descriptors
                Raises:
                    VLException(Error.FaceSampleConflict.format(faceId_), 400, False) if old samples do not
                                                                                match specified samples in attribute
                """
                for sampleType in SampleType:
                    sampleTypeValue = sampleType.value
                    if sampleTypeValue not in filteredSamples_:
                        continue
                    isDescriptorSamples = sampleTypeValue == SampleType.face_descriptor.value
                    if not isDescriptorSamples and forceUpdate:
                        # basic attributes samples are not compared on force update
                        continue
                    selectSt = select([models.Sample.sample_id]).where(
                        and_(models.Sample.face_id == faceId_, models.Sample.type == sampleTypeValue)
                    )
                    existingSampleIds = set(chain.from_iterable(await connection.fetchall(selectSt)))
                    if not existingSampleIds or existingSampleIds == filteredSamples_[sampleTypeValue]:
                        continue
                    if not isDescriptorSamples:
                        raise VLException(Error.FaceSampleConflict.format(faceId_), 400, False)
                    # Descriptor samples are changing: it is a conflict only if the face keeps
                    # descriptors of versions that are not updated by this request.
                    selectSt = select([models.Descriptor.descriptor_version]).where(
                        and_(
                            models.Descriptor.face_id == faceId_,
                            not_(models.Descriptor.descriptor_version.in_(updDescriptorVersions_)),
                        )
                    )
                    notUpdatedDescriptorVersions = await connection.fetchall(selectSt)
                    if notUpdatedDescriptorVersions:
                        raise VLException(Error.FaceSampleConflict.format(faceId_), 400, False)

            async def updateAttributeModel(
                faceId_: str,
                accountId_: Optional[str],
                basicAttributes: Optional[BasicAttributes],
                isNeedToUpdGeneration: bool,
            ) -> int:
                """
                Update attribute model.
                Increase current descriptor's generation if it needs (depends on descriptor samples in attributes)
                Args:
                    faceId_: face id
                    accountId_: account id
                    basicAttributes: basic attributes container
                    isNeedToUpdGeneration: whether to increase descriptor sample generation or not
                Returns:
                    descriptor generation
                Raises:
                    VLException(Error.AttributesForUpdateNotFound.format(faceId_), 400, False) if no row was updated
                """
                if accountId_ is None:
                    filters = and_(models.Attribute.face_id == faceId_)
                else:
                    # the attribute row must belong to a face of the given account
                    filters = exists([models.Attribute.face_id]).where(
                        and_(
                            models.Face.face_id == faceId_,
                            models.Face.account_id == accountId_,
                            models.Attribute.face_id == models.Face.face_id,
                        )
                    )
                values = {
                    "descriptor_samples_generation": models.Attribute.descriptor_samples_generation
                    + int(isNeedToUpdGeneration)
                }
                if basicAttributes is not None:
                    values.update(
                        {
                            "age": basicAttributes.age,
                            "gender": basicAttributes.gender,
                            "ethnicity": basicAttributes.ethnicity,
                        }
                    )
                updateQuery = (
                    update(models.Attribute)
                    .where(filters)
                    .values(**values)
                    .returning(models.Attribute.descriptor_samples_generation)
                )
                generation = await connection.scalar(updateQuery)
                if generation is None:
                    raise VLException(Error.AttributesForUpdateNotFound.format(faceId_), 400, isCriticalError=False)
                return generation

            async def replaceSamples(faceId_: str, filteredSamples_: dict[int, set[str]]) -> None:
                """
                Replace samples
                Args:
                    faceId_: face id
                    filteredSamples_: {<sample type>: <samples>} map
                """
                # remove stored samples of every given type first, then insert the new ones
                for sampleType in filteredSamples_:
                    deleteSt = delete(models.Sample).where(
                        and_(models.Sample.type == sampleType, models.Sample.face_id == faceId_)
                    )
                    await connection.execute(deleteSt)
                for sampleType, sampleIds in filteredSamples_.items():
                    for sampleId in sampleIds:
                        query = insert(models.Sample).values(face_id=faceId_, sample_id=sampleId, type=sampleType)
                        await connection.execute(query)

            async def recreateLinks(faceId_: str, updDescriptorVersions_: list[int]) -> list[str]:
                """
                Recreate face-list links if the descriptor of the current version was changed.
                Needed for matcher if a descriptor was changed.
                Args:
                    faceId_: face id
                    updDescriptorVersions_: versions of updating descriptors
                Returns:
                    lists the face is linked to, empty list if links were not recreated
                """
                if self.defaultDescriptorVersion not in updDescriptorVersions_:
                    return []
                updateLinkKeysSt = (
                    update(models.ListFace)
                    .where(models.ListFace.face_id.in_([faceId_]))
                    .values(link_key=LINK_SEQUENCE.next_value())
                )
                linkedLists = await self._moveFacesLinksToLog((faceId_,), connection)
                await connection.execute(updateLinkKeysSt)
                return linkedLists

            # Do select for update
            lockFaceCount, _ = await self.blockFaces(
                connection, filters=[models.Face.face_id == faceId, models.Face.account_id @ accountId]
            )
            if lockFaceCount == 0:
                raise VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False)

            # Prepare map: {<sample type>: <samples>}
            filteredSamples = {}
            if len(attribute.basicAttributesSamples) > 0:
                filteredSamples[SampleType[AttributeSample.basic_attributes.value].value] = set(
                    attribute.basicAttributesSamples
                )
            if len(attribute.descriptorSamples) > 0:
                filteredSamples[SampleType[AttributeSample.face_descriptors.value].value] = set(
                    attribute.descriptorSamples
                )

            # Check no sample conflict
            updDescriptorVersions = [descr.version for descr in attribute.descriptors]
            await checkSamples(
                faceId_=faceId, filteredSamples_=filteredSamples, updDescriptorVersions_=updDescriptorVersions
            )

            # Update attribute data
            isNeedToUpdGeneration = filteredSamples.get(SampleType.face_descriptor.value) is not None
            generation = await updateAttributeModel(
                faceId_=faceId,
                accountId_=accountId,
                basicAttributes=attribute.basicAttributes,
                isNeedToUpdGeneration=isNeedToUpdGeneration,
            )
            if forceUpdate:
                await replaceSamples(faceId_=faceId, filteredSamples_=filteredSamples)
            await self._updateAttributeDescriptor(
                connection=connection, faceId=faceId, descriptors=attribute.descriptors, generation=generation
            )
            listsForUpdate = await recreateLinks(faceId_=faceId, updDescriptorVersions_=updDescriptorVersions)
        self._addListsToDeferrerUpdateListLastUpdateTime(listsForUpdate)
[docs]
    @exceptionWrap
    async def deleteFaceAttributes(self, faceId: str, accountId: Optional[str] = None) -> None:
        """
        Delete face attributes.
        Last update time of the face and of the lists the face is linked to is refreshed as well.
        Args:
            faceId: face id to remove its attributes
            accountId: account id
        Raises:
            VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False) if no face row was updated
        """
        faceModel = models.Face
        async with self.adaptor.connection(logger) as connection:
            await self.blockFaces(connection, [faceModel.face_id == faceId])
            faceCheckFilter = and_(faceModel.face_id == faceId, faceModel.account_id @ accountId)

            # touch the face; a falsy result of the update means the face was not found
            touchFaceSt = update(faceModel).where(faceCheckFilter)
            touchFaceSt = touchFaceSt.values(last_update_time=self.currentDBTimestamp)
            if not await connection.execute(touchFaceSt):
                raise VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False)

            # touch every list linked to the face
            linkedListSubquery = select([models.List.list_id]).where(
                and_(
                    models.List.list_id == models.ListFace.list_id,
                    models.ListFace.face_id == faceModel.face_id,
                    faceCheckFilter,
                )
            )
            touchListsSt = update(models.List).where(exists(linkedListSubquery))
            touchListsSt = touchListsSt.values(last_update_time=self.currentDBTimestamp)
            await connection.execute(touchListsSt)

            # remove attribute rows that belong to the face
            faceAttributeSubquery = select([models.Attribute.face_id]).where(
                and_(faceCheckFilter, models.Attribute.face_id == faceModel.face_id)
            )
            await connection.execute(delete(models.Attribute).where(exists(faceAttributeSubquery)))
[docs]
    @exceptionWrap
    async def getDescriptorsBatchByFaceIds(
        self,
        facesIds: set[str],
        accountId: Optional[str] = None,
        checkObjectExistence: bool = True,
        descriptorVersion: Optional[int] = None,
        receiveExternalId: Optional[bool] = None,
    ) -> dict[str, Union[list[bytes], int, list[str]]]:
        """
        Get descriptors batch by faces Ids.
        ! Splitting into batches is used due to enormous queries (25Mb for 100k faces) that fails Oracle DB. (LUNA-3734)
        Every query with a list of face ids built here (including the search for faces without
        a descriptor) gets at most DB_LIST_LIMIT ids.
        Args:
            facesIds: faces ids the descriptors were attached to
            accountId: account id of the faces
            checkObjectExistence: check faces existence or not
            descriptorVersion: requested descriptor version, the default descriptor version is used if None
            receiveExternalId: receive or not external ids
        Returns:
            Dict:
                descriptors: list[bytes]
                uuids: ids of the faces the descriptors belong to (same order as descriptors)
                descriptorVersion: int
                notFoundUuids: list[str], filled only if checkObjectExistence is False
                notExtractedUuids: list[str], faces without a descriptor of the requested version
                externalIds: list[str] (missing external ids are replaced with ""), None if not requested
        Raises:
            VLException(Error.FacesNotFound.format(<face id>), 400, isCriticalError=False) if checkObjectExistence
                is set and some face does not exist
        """
        if descriptorVersion is None:
            descriptorVersion = self.defaultDescriptorVersion

        def splitToBatches(ids: list[str]) -> list[list[str]]:
            """Split ids into chunks of at most DB_LIST_LIMIT items."""
            return [ids[idx : idx + DB_LIST_LIMIT] for idx in range(0, len(ids), DB_LIST_LIMIT)]

        def getNotExtractedQuery(candidateIds: list[str]) -> Query:
            """Build query for faces among candidateIds that have no descriptor of the requested version."""
            return Query([models.Face.face_id]).filter(
                and_(
                    models.Face.face_id.in_(candidateIds),
                    models.Face.account_id @ accountId,
                    ~exists(
                        select([models.Face.face_id]).where(
                            and_(
                                models.Face.face_id == models.Descriptor.face_id,
                                models.Descriptor.descriptor_version == descriptorVersion,
                            )
                        )
                    ),
                )
            )

        requestedIds = set(facesIds)
        selectFields = [self.getQuery4Binary(models.Descriptor.descriptor), models.Face.face_id]
        if receiveExternalId:
            selectFields.append(models.Face.external_id)

        # 1k batching splitting
        queries = [
            select(selectFields).where(
                and_(
                    models.Descriptor.face_id.in_(faceIdsBatch),
                    models.Descriptor.descriptor_version == descriptorVersion,
                    # if need filtration by account
                    models.Face.face_id == models.Descriptor.face_id,
                    models.Face.account_id @ accountId,
                )
            )
            for faceIdsBatch in splitToBatches(list(requestedIds))
        ]

        async with self.adaptor.connection(logger) as connection:
            unitedResults = []
            for query in queries:
                unitedResults.extend(await connection.fetchall(query))

            # 1k batching join: transpose rows to columns, empty reply gives empty columns
            if receiveExternalId:
                descriptors, replyFaceIds, externalIds = tuple(zip(*unitedResults)) or ([], [], [])
                externalIds = [externalId or "" for externalId in externalIds]
            else:
                descriptors, replyFaceIds = tuple(zip(*unitedResults)) or ([], [])
                externalIds = None

            notFoundUuids = []
            notExtractedFaceIds = []
            if len(replyFaceIds) != len(facesIds):
                # the list of faces without a reply may be as large as the request itself: batch it too
                withoutDescriptorIds = list(requestedIds.difference(replyFaceIds))
                for candidateIds in splitToBatches(withoutDescriptorIds):
                    rows = await connection.fetchall(getNotExtractedQuery(candidateIds).statement)
                    notExtractedFaceIds.extend(row[0] for row in rows)
                if checkObjectExistence:
                    firstNotFoundFace = await self.getNonexistentFaceId(facesIds, accountId, connection)
                    if firstNotFoundFace:
                        raise VLException(Error.FacesNotFound.format(firstNotFoundFace), 400, isCriticalError=False)
                else:
                    notFoundUuids = list(requestedIds.difference(notExtractedFaceIds).difference(replyFaceIds))

        result = dict(
            descriptors=list(map(bytes, descriptors)),
            uuids=replyFaceIds,
            descriptorVersion=descriptorVersion,
            notFoundUuids=notFoundUuids,
            notExtractedUuids=notExtractedFaceIds,
            externalIds=externalIds,
        )
        return result
[docs]
    @exceptionWrap
    async def getListFacesDescriptorsBatch(
        self,
        listId: str,
        linkKeyGte: int,
        limit: int,
        descriptorVersion: Optional[int] = None,
        parity: Optional[int] = None,
        receiveExternalId: Optional[bool] = None,
    ) -> dict[str, Union[list[bytes], int, list[int], list[str]]]:
        """
        Get a batch of descriptors of the faces attached to the list, ordered by link key.
        Args:
            listId: list id the descriptors were attached to
            linkKeyGte: the lower including boundary of the link key
            limit: descriptors count to return
            descriptorVersion: descriptor version, `self.defaultDescriptorVersion` is used if not set
            parity: required remainder of the link key divided by 2 (0 for even or 1 for odd link keys),
                link keys are not filtered by parity if not set
            receiveExternalId: receive or not external ids
        Returns:
            Dict:
                descriptors: descriptors of the batch
                descriptorVersion: int
                linkKeys: link keys of the batch
                uuids: face ids of the batch
                externalIds: external ids (missing ones are replaced with "") or None if they were not requested
        """
        if descriptorVersion is None:
            descriptorVersion = self.defaultDescriptorVersion
        selectFields = [
            self.getQuery4Binary(models.Descriptor.descriptor),
            models.ListFace.link_key,
            models.Descriptor.face_id,
        ]
        if receiveExternalId:
            selectFields.append(models.Face.external_id)
        query = Query(selectFields).filter(
            # the face table is joined only when external ids are requested
            models.Descriptor.face_id == models.Face.face_id if receiveExternalId else True,
            models.Descriptor.face_id == models.ListFace.face_id,
            models.Descriptor.descriptor_version == descriptorVersion,
            models.ListFace.list_id == listId,
            models.ListFace.link_key >= linkKeyGte,
            mod(models.ListFace.link_key, 2) == parity if parity is not None else True,
        )
        if parity is not None:
            query = query.order_by(
                models.ListFace.list_id, mod(models.ListFace.link_key, 2), models.ListFace.link_key.asc()
            )
        else:
            query = query.order_by(models.ListFace.link_key.asc())
        query = query.limit(limit)
        async with self.adaptor.connection(logger) as connection:
            if self.dbConfig.type == "oracle":
                global indexFaceName, indexDescriptorName
                # the hint is applied unless both index names are None
                if not (indexFaceName is indexDescriptorName is None):
                    query = query.with_hint(
                        models.ListFace,
                        f"INDEX(LIST_FACE LINK_KEY_FUNC_INDEX) INDEX(FACE {indexFaceName}) "
                        f"INDEX(DESCRIPTOR {indexDescriptorName})",
                    )
                else:
                    logger.warning("Database indexes may not using")
                queryCursor = await connection.fetchall(getCompiledQuery(query, self.dbConfig.type))
            else:
                # for postgres compiling wrong request in  mod(models.ListFace.link_key, 2) == parity
                queryCursor = await connection.fetchall(query.statement)
            # transpose rows to columns, an empty reply gives empty columns
            if receiveExternalId:
                descriptors, linkKeys, faceIds, externalIds = tuple(zip(*queryCursor)) or ([], [], [], [])
                externalIds = list(map(lambda x: x or "", externalIds))
            else:
                descriptors, linkKeys, faceIds = tuple(zip(*queryCursor)) or ([], [], [])
                externalIds = None
        result = dict(
            descriptors=descriptors,
            descriptorVersion=descriptorVersion,
            linkKeys=linkKeys,
            uuids=faceIds,
            externalIds=externalIds,
        )
        return result
    def _prepareMissingDescriptorsFilters(
        self,
        missingVersion: int,
        accountId: Optional[str] = None,
        faceIds: Optional[list[str]] = None,
        faceIdGte: Optional[str] = None,
        faceIdLt: Optional[str] = None,
    ) -> BooleanClauseList:
        """
        Prepare filters for missing descriptors query.
        An attribute matches when its face has a descriptor of `self.defaultDescriptorVersion` and has no
        descriptor of `missingVersion` whose generation is equal to the attribute samples generation.
        Args:
            missingVersion: missing descriptor version
            accountId: account id of the attributes
            faceIds: list of face ids
            faceIdGte: lower face id including boundary
            faceIdLt: upper face id excluding boundary
        Returns:
            prepared sa.and_() with filters
        """
        nestedDescriptor = aliased(models.Descriptor)  # table from the nested query need to be aliased
        return and_(
            # whether we need to reextract
            models.Attribute.face_id == models.Descriptor.face_id,
            models.Descriptor.descriptor_version == self.defaultDescriptorVersion,
            # filters
            # the face table is joined only when the account filter is set
            models.Attribute.face_id == models.Face.face_id if accountId is not None else True,
            # NOTE(review): `@` is a project-defined column operator; presumably it gives no restriction
            # when accountId is None - confirm against the model definition
            models.Face.account_id @ accountId,
            models.Attribute.face_id >= faceIdGte if faceIdGte is not None else True,
            models.Attribute.face_id < faceIdLt if faceIdLt is not None else True,
            models.Attribute.face_id.in_(faceIds) if faceIds is not None else True,
            # whether descriptor was not reextracted
            ~exists(
                select([nestedDescriptor.descriptor_version]).where(
                    and_(
                        nestedDescriptor.face_id == models.Attribute.face_id,
                        nestedDescriptor.descriptor_version == missingVersion,
                        nestedDescriptor.descriptor_generation == models.Attribute.descriptor_samples_generation,
                    )
                )
            ),
        )
[docs]
    @exceptionWrap
    async def getMissingDescriptors(
        self,
        missingVersion: int,
        accountId: Optional[str] = None,
        faceIds: Optional[list[str]] = None,
        faceIdGte: Optional[str] = None,
        faceIdLt: Optional[str] = None,
        limit: int = 1000,
    ) -> list[dict[str, Union[str, list[str]]]]:
        """
        Get missing descriptors of 'missingVersion' version:
            get attributes not having descriptor
            get samples
        Args:
            missingVersion: missing descriptor version
            accountId: account id of the attributes
            faceIds: list of face ids
            faceIdGte: lower face id including boundary
            faceIdLt: upper face id excluding boundary
            limit: maximum attributes amount to return
        Returns:
            data array ordered by face id in the format:
                {"face_id": "<face_id>", "samples": ["<sample_id>"]}
        """
        async with self.adaptor.connection(logger) as connection:
            selectSt = (
                select([models.Attribute.face_id])
                .where(self._prepareMissingDescriptorsFilters(missingVersion, accountId, faceIds, faceIdGte, faceIdLt))
                .order_by(models.Attribute.face_id.asc())
                .limit(limit)
            )
            facesRes = await connection.fetchall(selectSt)
            # {face_id: {face_id: id, samples:[]}}
            result = {faceId: {"face_id": faceId, "samples": []} for (faceId,) in facesRes}
            if not result:
                # nothing matched, do not ask the db for samples with an empty IN list
                return []
            samplesSt = select([models.Sample.face_id, models.Sample.sample_id]).where(
                and_(models.Sample.face_id.in_(list(result)), models.Sample.type == SampleType.face_descriptor.value)
            )
            samplesRes = await connection.fetchall(samplesSt)
            for faceId, sampleId in samplesRes:
                result[faceId]["samples"].append(sampleId)
            return list(result.values())
[docs]
    @exceptionWrap
    async def getMissingDescriptorsCount(
        self,
        missingVersion: int,
        accountId: Optional[str] = None,
        faceIds: Optional[list[str]] = None,
        faceIdGte: Optional[str] = None,
        faceIdLt: Optional[str] = None,
    ) -> int:
        """
        Count attributes which have no descriptor of 'missingVersion' version.
        Args:
            missingVersion: missing descriptor version
            accountId: account id of the attributes
            faceIds: list of face ids
            faceIdGte: lower face id including boundary
            faceIdLt: upper face id excluding boundary
        Returns:
            missing descriptors count
        """
        async with self.adaptor.connection(logger) as connection:
            missingFilters = self._prepareMissingDescriptorsFilters(
                missingVersion, accountId, faceIds, faceIdGte, faceIdLt
            )
            countQuery = select([func.count(models.Attribute.face_id)]).where(missingFilters)
            return await connection.scalar(countQuery)
[docs]
    @exceptionWrap
    async def getDescriptorsCount(self, accountId: Optional[str] = None) -> list[dict[str, int]]:
        """
        Count face descriptors grouped by descriptor version.
        Args:
            accountId: account id
        Returns:
            list of dicts, one per descriptor version, with keys "descriptor_version" and "descriptor_count"
        """
        async with self.adaptor.connection(logger) as connection:
            accountFilters = and_(models.Face.account_id @ accountId, models.Face.face_id == models.Descriptor.face_id)
            versionColumn = models.Descriptor.descriptor_version
            countQuery = select([versionColumn, func.count(models.Descriptor.face_id)])
            countQuery = countQuery.where(accountFilters).group_by(versionColumn)
            counts = []
            for version, count in await connection.fetchall(countQuery):
                counts.append({"descriptor_version": version, "descriptor_count": count})
            return counts
    def _prepareMissingBasicAttrsFilters(
        self,
        accountId: Optional[str] = None,
        faceIds: Optional[list[str]] = None,
        faceIdGte: Optional[str] = None,
        faceIdLt: Optional[str] = None,
    ) -> BooleanClauseList:
        """
        Prepare filters for missing basic attributes query.
        An attribute is treated as missing basic attributes when its ethnicity is NULL.
        Args:
            accountId: account id of the attributes
            faceIds: list of face ids
            faceIdGte: lower face id including boundary
            faceIdLt: upper face id excluding boundary
        Returns:
            prepared sa.and_() with filters
        """
        return and_(
            # whether we need to reextract
            models.Attribute.ethnicity.is_(None),
            # filters
            # the face table is joined only when the account filter is set
            models.Attribute.face_id == models.Face.face_id if accountId is not None else True,
            # NOTE(review): `@` is a project-defined column operator; presumably it gives no restriction
            # when accountId is None - confirm against the model definition
            models.Face.account_id @ accountId,
            models.Attribute.face_id >= faceIdGte if faceIdGte is not None else True,
            models.Attribute.face_id < faceIdLt if faceIdLt is not None else True,
            models.Attribute.face_id.in_(faceIds) if faceIds is not None else True,
        )
[docs]
    @exceptionWrap
    async def getMissingBasicAttrs(
        self,
        accountId: Optional[str] = None,
        faceIds: Optional[list[str]] = None,
        faceIdGte: Optional[str] = None,
        faceIdLt: Optional[str] = None,
        limit: int = 1000,
    ) -> list[dict[str, Union[str, list[str]]]]:
        """
        Get missing basic attributes:
            get attributes without basic attributes
            get samples
        Args:
            accountId: account id of the attributes
            faceIds: list of face ids
            faceIdGte: lower face id including boundary
            faceIdLt: upper face id excluding boundary
            limit: maximum attributes amount to return
        Returns:
            data array ordered by face id in the format:
                {"face_id": "<face_id>", "samples": ["<sample_id>"]}
        """
        async with self.adaptor.connection(logger) as connection:
            selectSt = (
                select([models.Attribute.face_id])
                .where(self._prepareMissingBasicAttrsFilters(accountId, faceIds, faceIdGte, faceIdLt))
                .order_by(models.Attribute.face_id.asc())
                .limit(limit)
            )
            facesRes = await connection.fetchall(selectSt)
            # {face_id: {face_id: id, samples:[]}}
            result = {faceId: {"face_id": faceId, "samples": []} for (faceId,) in facesRes}
            if not result:
                # nothing matched, do not ask the db for samples with an empty IN list
                return []
            samplesSt = select([models.Sample.face_id, models.Sample.sample_id]).where(
                and_(models.Sample.face_id.in_(list(result)), models.Sample.type == SampleType.face_descriptor.value)
            )
            samplesRes = await connection.fetchall(samplesSt)
            for faceId, sampleId in samplesRes:
                result[faceId]["samples"].append(sampleId)
            return list(result.values())
[docs]
    @exceptionWrap
    async def getMissingBasicAttrsCount(
        self,
        accountId: Optional[str] = None,
        faceIds: Optional[list[str]] = None,
        faceIdGte: Optional[str] = None,
        faceIdLt: Optional[str] = None,
    ) -> int:
        """
        Count attributes without basic attributes.
        Args:
            accountId: account id of the attributes
            faceIds: list of face ids
            faceIdGte: lower face id including boundary
            faceIdLt: upper face id excluding boundary
        Returns:
            missing basic attributes count
        """
        async with self.adaptor.connection(logger) as connection:
            missingFilters = self._prepareMissingBasicAttrsFilters(accountId, faceIds, faceIdGte, faceIdLt)
            countQuery = select([func.count(models.Attribute.face_id)]).where(missingFilters)
            return await connection.scalar(countQuery)
[docs]
    @exceptionWrap
    async def getBasicAttrsCount(self, accountId: Optional[str] = None) -> int:
        """
        Count attributes which have basic attributes (ethnicity is not NULL).
        Args:
            accountId: account id of the attributes
        Returns:
            basic attributes count
        """
        async with self.adaptor.connection(logger) as connection:
            extractedFilters = and_(
                models.Attribute.ethnicity.isnot(None),
                models.Face.account_id @ accountId,
                # the face table is joined only when the account filter is set
                models.Attribute.face_id == models.Face.face_id if accountId is not None else True,
            )
            countQuery = select([func.count(models.Attribute.face_id)]).where(extractedFilters)
            return await connection.scalar(countQuery)
    @staticmethod
    def _prepareDescriptorsFilters(
        descriptorVersion: int, faceIdGte: Optional[str] = None, faceIdLt: Optional[str] = None
    ) -> BooleanClauseList:
        """
        Build filters selecting descriptors of the version within the optional face id range.
        Args:
            descriptorVersion: descriptor version
            faceIdGte: lower face id including boundary
            faceIdLt: upper face id excluding boundary
        Returns:
            prepared sa.and_() with filters
        """
        faceIdColumn = models.Descriptor.face_id
        # a boundary which is not set is replaced with True, i.e. gives no restriction
        lowerBound = True if faceIdGte is None else faceIdColumn >= faceIdGte
        upperBound = True if faceIdLt is None else faceIdColumn < faceIdLt
        return and_(models.Descriptor.descriptor_version == descriptorVersion, lowerBound, upperBound)
[docs]
    @exceptionWrap
    async def deleteDescriptors(
        self, descriptorVersion: int, faceIdGte: Optional[str] = None, faceIdLt: Optional[str] = None, limit: int = 1000
    ) -> list[str]:
        """
        Delete descriptors by version:
            select up to `limit` face ids (ascending) having a descriptor of preassigned version
            delete descriptors of preassigned version of these faces
        Args:
            descriptorVersion: descriptor version
            faceIdGte: lower face id including boundary
            faceIdLt: upper face id excluding boundary
            limit: maximum attributes amount to delete
        Returns:
            list of face ids whose descriptors of preassigned version were selected for deletion
        """
        async with self.adaptor.connection(logger) as connection:
            selectSt = (
                select([models.Descriptor.face_id])
                .where(self._prepareDescriptorsFilters(descriptorVersion, faceIdGte, faceIdLt))
                .order_by(models.Descriptor.face_id.asc())
                .limit(limit)
            )
            faces = [row["face_id"] for row in await connection.fetchall(selectSt)]
            if not faces:
                # nothing to delete, do not send a DELETE with an empty IN list
                return faces
            deleteSt = delete(models.Descriptor).where(
                and_(models.Descriptor.face_id.in_(faces), models.Descriptor.descriptor_version == descriptorVersion)
            )
            await connection.execute(deleteSt)
            return faces
[docs]
    async def blockFaces(
        self, connection, filters: list[ColumnClause], returnBlockedFaces: bool = False
    ) -> tuple[int, Union[tuple[str, ...], None]]:
        """
        Lock faces rows in the db with the "select for update" mechanics.
        Args:
            connection: open connection
            filters: filters (joined with AND) selecting the faces to lock
            returnBlockedFaces: whether to return ids of the locked faces
        Returns:
            number of locked faces and tuple of their ids if `returnBlockedFaces` is set,
            number of locked faces and None otherwise
        """
        lockQuery = select([models.Face.face_id]).where(and_(*filters)).with_for_update()
        blockedFaces = None
        if self.dbConfig.type == "oracle" or returnBlockedFaces:
            # ids are fetched and counted on the client side
            lockedRows = await connection.fetchall(lockQuery)
            blockedFaces = tuple(row[0] for row in lockedRows)
            blockedFaceCount = len(blockedFaces)
        else:
            # only the number of locked rows is needed, count them in the db
            countQuery = select([func.count()], from_obj=aliased(lockQuery))
            blockedFaceCount = await connection.scalar(countQuery)
        logger.debug(f"lock {blockedFaceCount} faces")
        return blockedFaceCount, (blockedFaces if returnBlockedFaces else None)
[docs]
    async def updateListLastUpdateTime(self, listId: str):
        """
        Set last update time of the list to `self.currentDBTimestamp`.
        Args:
            listId: list id
        """
        async with self.adaptor.connection(logger) as connection:
            targetList = update(models.List).where(models.List.list_id == listId)
            touchSt = targetList.values(last_update_time=self.currentDBTimestamp)
            await connection.execute(touchSt)
    async def _moveFacesLinksToLog(
        self, faceIds: Union[list[str], tuple[str, ...]], connection: AbstractDBConnection
    ) -> list[str]:
        """
        Insert face to list links of the faces into the unlink attributes log
        Args:
            faceIds: face ids
            connection: open connection
        Returns:
            linked to faces list ids (one item per link)
        """
        linksQuery = select([models.ListFace.list_id, models.ListFace.face_id, models.ListFace.link_key]).where(
            models.ListFace.face_id.in_(faceIds)
        )
        logColumns = [
            models.UnlinkAttributesLog.list_id,
            models.UnlinkAttributesLog.face_id,
            models.UnlinkAttributesLog.link_key,
        ]
        insertSt = insert(models.UnlinkAttributesLog).from_select(logColumns, linksQuery)
        if self.dbConfig.type != "oracle":
            # list ids are taken from the RETURNING clause of the insert
            loggedRows = await connection.fetchall(insertSt.returning(models.UnlinkAttributesLog.list_id))
            return [row[0] for row in loggedRows]
        # oracle branch: list ids are selected separately and only if the insert reported a non-zero result
        insertedCount = await connection.execute(insertSt)
        if not insertedCount:
            return []
        listIdsQuery = select([models.ListFace.list_id]).where(models.ListFace.face_id.in_(faceIds))
        linkedRows = await connection.fetchall(listIdsQuery)
        return [row[0] for row in linkedRows]
[docs]
    async def getListDeletions(
        self,
        deletionTimeLt: Optional[datetime] = None,
        deletionTimeGte: Optional[datetime] = None,
        page: Optional[int] = 1,
        pageSize: Optional[int] = 100,
    ) -> list[dict[str, Any]]:
        """
        Get lists deletion logs
        Args:
            deletionTimeLt: upper excluding bound of list deletion time
            deletionTimeGte: lower including bound of list deletion time
            page: page (starts from 1)
            pageSize: page size
        Warnings:
            trigger `trg_lists_deletion_log` inserts a data for table `ListsDeletionLog`
        Returns:
            list of deletions in the reverse order of deletion of lists, each deletion is a dict with keys
            "list_id", "account_id", "create_time", "deletion_id", "deletion_time" (times are converted to strings)
        """
        query = (
            select(
                [
                    models.ListsDeletionLog.list_id,
                    models.ListsDeletionLog.account_id,
                    models.ListsDeletionLog.create_time,
                    models.ListsDeletionLog.deletion_id,
                    models.ListsDeletionLog.deletion_time,
                ]
            )
            .where(
                and_(
                    models.ListsDeletionLog.deletion_time >= deletionTimeGte if deletionTimeGte else True,
                    models.ListsDeletionLog.deletion_time < deletionTimeLt if deletionTimeLt else True,
                )
            )
            .order_by(models.ListsDeletionLog.deletion_time.desc())
            .offset((page - 1) * pageSize)
            .limit(pageSize)
        )
        # the same flag is used for every converted time
        isUtcStorage = self.storageTime == "UTC"
        async with self.adaptor.connection(logger) as connection:
            records = await connection.fetchall(query)
            listsDeletions = []
            for row in records:
                removedList = dict(row)
                removedList["create_time"] = convertTimeToString(removedList["create_time"], isUtcStorage)
                removedList["deletion_time"] = convertTimeToString(removedList["deletion_time"], isUtcStorage)
                listsDeletions.append(removedList)
            return listsDeletions
[docs]
    async def cleanListsDeletionLog(self, deletionTimeLt: datetime) -> int:
        """
        Remove records of the lists deletion log which are older than the bound
        Args:
            deletionTimeLt: upper excluding bound of list deletion time
        Returns:
            count of removed rows
        """
        outdated = models.ListsDeletionLog.deletion_time < deletionTimeLt
        cleanSt = delete(models.ListsDeletionLog).where(outdated)
        async with self.adaptor.connection(logger) as connection:
            return await connection.execute(cleanSt)
[docs]
    def getRuntimeChecks(self, _) -> list[tuple[str, Awaitable]]:
        """
        Checks for healthcheck.
        Returns:
            list with the single pair: check name "faces_db" and the result of `checkSql` for the adaptor
        """
        facesDbCheck = checkSql(self.adaptor)
        return [("faces_db", facesDbCheck)]
[docs]
    async def probe(self) -> bool:
        """
        Check connection to the faces database by the provided settings.
        If material views are used, models of the materialized views database are checked as well:
        names of `FacesBase` subclasses when `self.databaseNumber` is positive,
        names of `BaseMatViewsParity` subclasses otherwise.
        Returns:
            False if the faces database check failed, otherwise result of the last performed check
        """
        checkState = await checkConnectionToDB(dbSetting=self.dbSettings, postfix="faces", asyncCheck=True)
        if not checkState:
            return False
        if not self.useMaterialViews:
            return checkState
        baseModel = FacesBase if self.databaseNumber > 0 else BaseMatViewsParity
        modelNames = [table.__tablename__ for table in baseModel.__subclasses__()]
        return await checkConnectionToDB(
            dbSetting=self.dbSettings,
            modelNames=modelNames,
            postfix="luna-faces-materialized_views",
            asyncCheck=True,
        )