import contextlib
from asyncio import CancelledError
from dataclasses import dataclass
from datetime import datetime, timedelta
from itertools import chain
from typing import Any, Awaitable, Callable, Iterable, Literal, Optional, TypeVar, Union
from uuid import uuid4
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from asyncpg import ForeignKeyViolationError, Record, UndefinedFunctionError, UniqueViolationError
from sqlalchemy import Column, and_, asc, delete, desc, exists, func, insert, select, text, union_all, update
from sqlalchemy.dialects import oracle, postgresql
from sqlalchemy.exc import DatabaseError, IntegrityError
from sqlalchemy.orm import Query, aliased
from sqlalchemy.orm.util import AliasedClass
from sqlalchemy.sql.elements import BooleanClauseList, ColumnClause, TextClause, not_
from sqlalchemy.sql.operators import mod
from sqlalchemy.sql.selectable import CompoundSelect
from tzlocal import get_localzone_name
from vlutils.descriptors.containers import sdkDescriptorEncode
from vlutils.helpers import bytesToBase64, convertTimeToString
from vlutils.jobs.async_runner import AsyncRunner
from app.handlers.helpers import SearchFacesFilters, SearchListsFilters
from attributes_db.model import BasicAttributes, Descriptor, TemporaryAttributes
from configs.config import BACKGROUND_REMOVAL_BATCH_SIZE, DB_CONNECT_TIMEOUT, DELETE_REQUEST_MAX_SIZE
from configs.configs.configs.settings.classes import DBSetting
from crutches_on_wheels.cow.adaptors.connection import AbstractDBConnection
from crutches_on_wheels.cow.db.base_context import BaseDBContext
from crutches_on_wheels.cow.enums.attributes import TemporaryAttributeTargets as FaceAttributeTargets
from crutches_on_wheels.cow.errors.errors import Error
from crutches_on_wheels.cow.errors.exception import VLException
from crutches_on_wheels.cow.utils import mixins
from crutches_on_wheels.cow.utils.check_connection import checkConnectionToDB
from crutches_on_wheels.cow.utils.db_functions import currentDBTimestamp, dbExceptionWrap as exceptionWrap
from crutches_on_wheels.cow.utils.functions import getPageCount
from crutches_on_wheels.cow.utils.healthcheck import checkSql
from crutches_on_wheels.cow.utils.log import Logger, logger
from db.faces_db_tools.models import faces_models as models
from db.faces_db_tools.models.enums import AttributeSample, SampleType
from db.faces_db_tools.models.faces_models import DB_LIST_LIMIT, LINK_SEQUENCE, Base as FacesBase, BaseMatViewsParity
from db.functions import ArrayAgg
from utils.encrypton import decryptCacheValue, encryptMessage
# Column-name suffixes used when building attribute field names dynamically.
SAMPLE_POSTFIX = "_samples"
OBTAINING_METHOD_POSTFIX = "_obtaining_method"
VERSION_POSTFIX = "_version"
DESCRIPTOR = "descriptor"
MIN_GEN = 0  # minimum attribute generation
# face targets resolvable from the list-face relation without joining the face table
FACE_TO_LIST_TARGETS = frozenset(("face_id", "lists", "account_id"))
# oracle index names, discovered at startup by `DBContext.setOracleIndexNames`; None until then
indexFaceName = None
indexDescriptorName = None
# target name -> selectable column for faces fetched through the list-face model
_FACE_BY_LIST_GETTER_COLUMNS_MAP = {
    # aggregate all list ids the face belongs to into a single "lists" column
    "lists": select([ArrayAgg((lists := aliased(models.ListFace)).list_id)])
    .where(lists.face_id == models.ListFace.face_id)
    .label("lists"),
    "face_id": models.ListFace.face_id,
    "account_id": models.List.account_id,
    "event_id": models.Face.event_id,
    "user_data": models.Face.user_data,
    "external_id": models.Face.external_id,
    "avatar": models.Face.avatar,
    "create_time": models.Face.create_time,
}
# target name -> selectable column for faces fetched through the face model
_FACE_BY_FACE_GETTER_COLUMNS_MAP = {
    "lists": select([ArrayAgg((lists := aliased(models.ListFace)).list_id)])
    .where(and_(lists.face_id == models.Face.face_id))
    .label("lists"),
    "face_id": models.Face.face_id,
    "account_id": models.Face.account_id,
    "event_id": models.Face.event_id,
    "user_data": models.Face.user_data,
    "external_id": models.Face.external_id,
    "avatar": models.Face.avatar,
    "create_time": models.Face.create_time,
}
# name of the cached faces-attributes-count row in the requests cache table
FACES_COUNT_CACHE_NAME = "faces_attributes_count"
# NOTE(review): hard-coded symmetric key for cache-value encryption —
# consider moving to configuration / secret storage
REQUESTS_ENCRYPTION_KEY = "^&*Tyghuk2v47fg90vpyuhjn2ewsdgv8]-90i"
REQUEST_CACHE_TTL_SECONDS = 10  # time-to-live of the cached faces count, seconds
[docs]
@dataclass
class Reference:
    """Reference to a descriptor source: raw descriptor bytes, an attribute id or a face id."""

    # version of the descriptor
    descriptorVersion: int
    # raw descriptor bytes; None when the reference points to an attribute/face instead
    descriptor: Optional[bytes] = None
    # attribute id the descriptor belongs to, if any
    attributeId: Optional[str] = None
    # face id the descriptor belongs to, if any
    faceId: Optional[str] = None
# Dict with raw sqls for getting link/unlink keys
# db type -> {"link"/"unlink": query template with {listId} and {mod} placeholders}.
# Each template fetches the greatest link/unlink key of the requested parity for one list;
# postgres and oracle variants differ only in the row-limiting syntax (LIMIT vs ROWNUM).
RAW_SQLS = {
    "postgres": {
        "link": """(SELECT listId, linkKey FROM (SELECT list_id AS listId, link_key AS linkKey FROM list_face WHERE
list_id = '{listId}' AND mod(link_key, 2) = {mod} ORDER BY link_key DESC) as f LIMIT 1)""",
        "unlink": """(SELECT listId, unLinkKey FROM (SELECT list_id AS listId, unlink_key AS unLinkKey FROM
unlink_attributes_log WHERE list_id = '{listId}' AND mod(unlink_key, 2) = {mod} ORDER BY
unlink_key DESC) as f LIMIT 1)""",
    },
    "oracle": {
        "link": """(SELECT listId, linkKey FROM (SELECT list_id AS listId, link_key AS linkKey FROM list_face WHERE
list_id = '{listId}' AND mod(link_key, 2) = {mod} ORDER BY link_key DESC) WHERE ROWNUM <= 1)""",
        "unlink": """(SELECT listId, unLinkKey FROM (SELECT list_id AS listId, unlink_key AS unLinkKey FROM
unlink_attributes_log WHERE list_id = '{listId}' AND mod(unlink_key, 2) = {mod} ORDER BY
unlink_key DESC) WHERE ROWNUM <= 1)""",
    },
}
[docs]
def getCompiledQuery(query: Union[Query, CompoundSelect], dbType: str) -> str:
    """
    Render a query to a SQL string with literal values inlined.

    Args:
        query: sqlalchemy `Query` or a `CompoundSelect` (e.g. result of `union_all`)
        dbType: database type; "oracle" selects the oracle dialect, anything else postgres
    Returns:
        the compiled SQL string
    """
    dialectModule = oracle if dbType == "oracle" else postgresql
    compileKwargs = {"dialect": dialectModule.dialect(), "compile_kwargs": {"literal_binds": True}}
    statement = query.statement if isinstance(query, Query) else query
    return str(statement.compile(**compileKwargs))
# generic type variables for the cached value-converter callables (db format -> api format)
X = TypeVar("X")
Y = TypeVar("Y")
[docs]
class DBContext(BaseDBContext, mixins.Initializable):
"""
DB context
Attributes:
logger: request logger
"""
def __init__(
    self,
    dbSettings: DBSetting,
    storageTime: str,
    defaultDescriptorVersion: int,
    encryptionCtx,
    databaseNumber: int,
    useMaterialViews: bool,
):
    """
    Synchronous part of the context construction; `initialize` completes the async part.

    Args:
        dbSettings: database settings
        storageTime: storage time designation; "UTC" enables utc time conversion for output
        defaultDescriptorVersion: default descriptor version
        encryptionCtx: descriptor encryption context
        databaseNumber: database number for oracle golden gate
        useMaterialViews: whether to use materialized views
    """
    super().__init__()
    self.dbSettings = dbSettings
    self.storageTime = storageTime
    self.defaultDescriptorVersion = defaultDescriptorVersion
    # (AsyncRunner): background task worker, created in `initialize`
    self.backgroundWorker: Optional[AsyncRunner] = None
    # (AsyncIOScheduler): background scheduler
    self.backgroundScheduler: Optional[AsyncIOScheduler] = None
    # (set): list ids set for deferred updating a last_update_time of list
    self._listsForUpdate: set = set()
    # (Logger): background task logger
    self.backgroundLogger: Optional[Logger] = None
    # cached value converters from db format to face api format
    self._faceProcessFunctions: dict[str, Callable[[Y], X]] = {}
    # cached value converters from db format to list api format
    self._listProcessFunctions: dict[str, Callable[[Y], X]] = {}
    # encryption context
    self.encryptionCtx = encryptionCtx
    # encryption hash filter, built in `initialize` (bytes for postgres, HEXTORAW clause for oracle)
    self._encryptionHashFilter: Optional[bytes | TextClause] = None
    # database number for oracle golden gate (Literal[0, 1, 2])
    self.databaseNumber = databaseNumber
    # useMaterialViews or not MaterialViews, legacy?
    self.useMaterialViews = useMaterialViews
[docs]
def makeOutputFaces(self, faceRows: list[Record | dict], columns: Iterable[str]) -> list[dict[str, Any]]:
    """
    Convert face rows from the database reply to the api output format.

    Args:
        faceRows: face rows fetched from db
        columns: selected column names
    Returns:
        faces with converted field values
    """
    if not faceRows:
        return []
    convertibleColumns = set(self._faceProcessFunctions) & set(columns)
    if not convertibleColumns:
        return [dict(row) for row in faceRows]
    output = []
    for row in faceRows:
        face = dict(row)
        for column in convertibleColumns:
            face[column] = self._faceProcessFunctions[column](face[column])
        output.append(face)
    return output
[docs]
def makeOutputLists(self, lists: list[Record | dict]) -> list[dict[str, Any]]:
    """
    Convert list rows from the database reply to the api output format.

    Args:
        lists: list rows fetched from db
    Returns:
        lists with converted field values
    """
    output = []
    for row in lists:
        converted = {
            field: converter(value) if (converter := self._listProcessFunctions.get(field)) else value
            for field, value in row.items()
        }
        output.append(converted)
    return output
async def _deferredUpdateListLastUpdateTime(self):
    """
    Deferred update of lists' last update time. Lists for updating are cached in '_listsForUpdate'.

    Runs periodically from the background scheduler (see `initialize`) and once on `close`;
    successfully updated lists are removed from the cache, failed ones stay for the next run.
    """
    if not self._listsForUpdate:
        return
    self.backgroundLogger.debug(f"start to update {len(self._listsForUpdate)} lists last update time")
    # iterate over a snapshot: the set may gain new ids concurrently and loses ids during iteration
    for listId in list(self._listsForUpdate):
        try:
            # batch update provokes deadlocks (for several luna-faces instances)
            await self.updateListLastUpdateTime(listId)
            self._listsForUpdate.remove(listId)
        except Exception:
            self.backgroundLogger.exception()
        else:
            self.backgroundLogger.debug(f"updated last update time for {listId}")
def _addListsToDeferrerUpdateListLastUpdateTime(self, listIds: Iterable[str]):
    """
    Queue lists for the deferred "last_update_time" refresh.

    Args:
        listIds: list ids to queue
    """
    for listId in listIds:
        self._listsForUpdate.add(listId)
[docs]
async def initialize(self):
    """Async __init__: open db connections, start background machinery, build value converters."""
    await self.__class__.initDBContext(
        dbSettings=self.dbSettings, storageTime=self.storageTime, connectTimeout=DB_CONNECT_TIMEOUT
    )
    self.backgroundLogger = Logger(template="backgroundDatabase")
    # oracle compares the raw encryption hash via HEXTORAW; other dbs take the bytes as-is
    self._encryptionHashFilter: Optional[bytes | TextClause] = (
        text(f"HEXTORAW('{self.encryptionCtx.encryptionHash.hex()}')")
        if self.dbSettings.type == "oracle" and self.encryptionCtx.encryptionHash is not None
        else self.encryptionCtx.encryptionHash
    )
    self.backgroundScheduler = AsyncIOScheduler(timezone=get_localzone_name())
    # background worker sized to half the session pool (at least one runner)
    self.backgroundWorker = AsyncRunner(max(self.adaptor.sessionCount // 2, 1), closeTimeout=5)
    # flush deferred list "last_update_time" refreshes every second
    self.backgroundScheduler.add_job(self._deferredUpdateListLastUpdateTime, trigger="interval", seconds=1)
    self.backgroundScheduler.start()
    if self.dbSettings.type == "oracle":
        await self.setOracleIndexNames()
    storageTimeIsUTC = self.storageTime == "UTC"
    # converters from db column values to face api output format
    self._faceProcessFunctions = {
        "create_time": lambda t: convertTimeToString(t, storageTimeIsUTC),
        "external_id": lambda s: s if s is not None else "",
        "user_data": lambda s: s if s is not None else "",
        "avatar": lambda s: s if s is not None else "",
        "lists": ArrayAgg.typeHandler,
    }
    # converters from db column values to list api output format
    self._listProcessFunctions = {
        "create_time": lambda t: convertTimeToString(t, storageTimeIsUTC),
        "last_update_time": lambda t: convertTimeToString(t, storageTimeIsUTC),
        "user_data": lambda s: s if s is not None else "",
    }
@property
def currentDBTimestamp(self) -> TextClause:
    """
    Get current db timestamp function.

    Returns:
        sqlalchemy clause computing the current timestamp on the database side
    """
    return currentDBTimestamp(self.dbConfig.type)
[docs]
async def close(self):
    """
    Close all database contexts.

    Shutdown order: stop the background worker, flush the pending list update-time
    refreshes, close the connection adaptor, then stop the scheduler.
    """
    try:
        if self.backgroundWorker:
            await self.backgroundWorker.close()
        self.backgroundWorker = None
    except CancelledError:
        # closing may be interrupted by task cancellation; continue with the rest of the shutdown
        pass
    await self._deferredUpdateListLastUpdateTime()
    if self.adaptor:
        await self.adaptor.close()
    if self.backgroundScheduler:
        self.backgroundScheduler.shutdown()
        self.backgroundScheduler = None
[docs]
async def setOracleIndexNames(self):
    """
    Set index names for some requests with 'use-force-index'.

    NOTE(review): the unpacking assumes the query returns exactly two rows and relies on
    `order by table_name` ('DESCRIPTOR' sorts before 'FACE') to map the first row to the
    descriptor index and the second to the face index — confirm for schemas with
    additional SYS_% indexes on these tables.
    """
    async with self.adaptor.connection(logger) as connection:
        indexNames = await connection.fetchall(
            "select index_name from all_indexes where table_name in ('FACE', 'DESCRIPTOR') and "
            "index_name like 'SYS_%' order by table_name"
        )
        # module-level globals, read by query builders elsewhere
        global indexDescriptorName, indexFaceName
        indexDescriptorName, indexFaceName = [indexName[0] for indexName in indexNames]
[docs]
async def ping(self, pingCount: int) -> bool:
    """
    Ping database. Execute 'SELECT 1' from one to 'pingCount' times.

    Args:
        pingCount: maximum number of ping attempts
    Returns:
        True if any attempt succeeds, otherwise False
    """

    async def attempt():
        # one 'SELECT 1' round trip on a fresh connection
        async with self.adaptor.connection(logger) as connection:
            await connection.execute(select([1]))

    for attemptNumber in range(1, pingCount + 1):
        try:
            await attempt()
        except Exception:
            logger.exception(f"failed db ping, try {attemptNumber}")
            continue
        return True
    return False
[docs]
@exceptionWrap
async def checkDbMatchFunctionExistence(
    self, saFuncCall: ColumnClause, comparator: Callable[[Any], bool], *, name: str
) -> bool:
    """
    Call a custom function that should exist in the database and validate its result.

    Args:
        saFuncCall: constructed sqlalchemy function object
        comparator: predicate checking the `saFuncCall` result
        name: function name, used for logging only
    Returns:
        True if the function exists and returns a correct result, otherwise False
    """
    logger.debug(f"Check database function '{name}' correctness: begin")
    try:
        async with self.adaptor.connection(logger) as connection:
            dbRes = await connection.fetchall(select([saFuncCall]))
    except (DatabaseError, UndefinedFunctionError):
        logger.debug(f"Database function '{name}' is not implemented.")
        return False
    if not comparator(dbRes):
        logger.debug(f"Check database function '{name}' correctness: function returns incorrect result '{dbRes}'.")
        return False
    logger.debug(f"Check database function '{name}' correctness: success")
    return True
[docs]
async def insertFaceAttributeData(
    self, connection: AbstractDBConnection, faceId: str, attribute: TemporaryAttributes
) -> None:
    """
    Insert face attribute data: the attribute row, its samples and its descriptors.

    Args:
        connection: db connection (the caller owns the transaction scope)
        faceId: face id
        attribute: temporary attribute container
    """
    insertToAttributesValue = {
        "face_id": faceId,
        "descriptor_samples_generation": 0,
        # the db column stores a naive timestamp
        "create_time": attribute.createTime.replace(tzinfo=None),
    }
    if attribute.basicAttributes:
        insertToAttributesValue["age"] = attribute.basicAttributes.age
        insertToAttributesValue["gender"] = attribute.basicAttributes.gender
        insertToAttributesValue["ethnicity"] = attribute.basicAttributes.ethnicity
        # obtaining method / version are fixed to 1 for basic attributes
        insertToAttributesValue["gender_obtaining_method"] = 1
        insertToAttributesValue["gender_version"] = 1
        insertToAttributesValue["age_obtaining_method"] = 1
        insertToAttributesValue["age_version"] = 1
        insertToAttributesValue["ethnicity_obtaining_method"] = 1
        insertToAttributesValue["ethnicity_version"] = 1
    # sample rows: basic-attribute samples followed by descriptor samples
    insertToSamplesValues = [
        {"face_id": faceId, "sample_id": sampleId, "type": SampleType.basic_attributes.value}
        for sampleId in attribute.basicAttributesSamples
    ]
    insertToSamplesValues += [
        {"face_id": faceId, "sample_id": sampleId, "type": SampleType.face_descriptor.value}
        for sampleId in attribute.descriptorSamples
    ]
    insertDescriptorValues = [
        {
            "descriptor_version": descriptor.version,
            "face_id": faceId,
            "descriptor": descriptor.descriptor,
            "encryption_hash": self.encryptionCtx.encryptionHash,
            "descriptor_obtaining_method": 1,
            "descriptor_generation": 0,
        }
        for descriptor in attribute.descriptors
    ]
    await connection.execute(insert(models.Attribute).values(insertToAttributesValue))
    if self.dbConfig.type == "oracle":
        # row-by-row inserts for oracle — presumably multi-row VALUES is not supported
        # by this adaptor/dialect combination; verify before unifying with the branch below
        for value in insertToSamplesValues:
            await connection.execute(insert(models.Sample).values(value))
        for value in insertDescriptorValues:
            await connection.execute(insert(models.Descriptor).values(value))
    else:
        if insertToSamplesValues:
            await connection.execute(insert(models.Sample).values(insertToSamplesValues))
        if insertDescriptorValues:
            await connection.execute(insert(models.Descriptor).values(insertDescriptorValues))
[docs]
async def createFace(
    self,
    externalFaceId: Optional[str] = None,
    listIds: Optional[set[str]] = None,
    attribute: Optional[TemporaryAttributes] = None,
    **kwargs,
) -> str:
    """
    Create face.

    Tries a plain insert first ("fast way"); on an id collision falls back to
    delete-then-insert ("slow way"), replacing the existing face.

    Args:
        externalFaceId: external faceId; a new uuid4 is generated when None
        listIds: luna lists to attach the new face to
        attribute: temporary attribute container
    Keyword Args:
        event_id: reference to event which created face
        user_data: face information
        account_id: id of account, required
        external_id: external id of the face, if it has its own mapping in external system
        avatar: image url that represents the face
    Returns:
        faceId: face id in uuid4 format
    """
    kwargs["face_id"] = faceId = str(uuid4()) if externalFaceId is None else externalFaceId
    accountId = kwargs["account_id"]

    async def processFaceListsAndAttributes(openConnection):
        # insert attribute data and attach the face to the requested lists on the same connection
        if attribute is not None:
            await self.insertFaceAttributeData(openConnection, faceId=faceId, attribute=attribute)
        if listIds:
            # attach only to lists owned by the account; link keys come from the shared sequence
            selectLists = select([models.List.list_id, text(f"'{faceId}'"), LINK_SEQUENCE.next_value()]).where(
                and_(models.List.account_id == accountId, models.List.list_id.in_(listIds))
            )
            insertFaceListSt = insert(models.ListFace).from_select(
                [models.ListFace.list_id, models.ListFace.face_id, models.ListFace.link_key], selectLists
            )
            listCount = await openConnection.execute(insertFaceListSt)
            if len(listIds) != listCount:
                # stupid oracle return only one updated row
                selectListSt = select([models.List.list_id]).where(
                    and_(models.List.list_id.in_(listIds), models.List.account_id == accountId)
                )
                updatedLists = await openConnection.fetchall(selectListSt)
                updatedLists = set(chain(*updatedLists))
                nonExistListId = next(iter(listIds - updatedLists))
                raise VLException(Error.ListsNotFound.format(nonExistListId), 400, False)
        return faceId

    async with self.adaptor.connection(logger) as connection:
        deleteSt = delete(models.Face).where(models.Face.face_id == faceId)
        insertSt = insert(models.Face).values(**kwargs)
        # fast way creating face
        try:
            await connection.execute(insertSt)
        except (IntegrityError, UniqueViolationError):
            # face with same id already exists, need remove it, go to slow way
            pass
        else:
            await processFaceListsAndAttributes(connection)
            if listIds:
                self._addListsToDeferrerUpdateListLastUpdateTime(listIds)
            return faceId
    # slow way
    async with self.adaptor.connection(logger) as connection:
        await connection.execute(deleteSt)
        await connection.execute(insertSt)
        await processFaceListsAndAttributes(connection)
        if listIds:
            self._addListsToDeferrerUpdateListLastUpdateTime(listIds)
        return faceId
[docs]
@exceptionWrap
async def updateFace(self, faceId: str, accountId: Optional[str] = None, **kwargs) -> int:
    """
    Update face.

    Args:
        faceId: face id
        accountId: account id
    Keyword Args:
        user_data: face information
        event_id: reference to event which created face
        external_id: external id of the face, if it has its own mapping in external system
        avatar: image url that represents the face
    Returns:
        updated face count
    """
    async with self.adaptor.connection(logger) as connection:
        updateFaceSt = (
            update(models.Face)
            # NOTE(review): "@" is a project-specific operator overload on model columns —
            # presumably "equals value, or no-op filter when value is None"; see the models module
            .where(and_(models.Face.face_id == faceId, models.Face.account_id @ accountId))
            .values(**kwargs)
        )
        return await connection.execute(updateFaceSt)
[docs]
@exceptionWrap
async def linkFacesToList(self, listId: str, faces: list[str]) -> tuple[list[str], list[str]]:
    """
    Attach faces to list.

    Args:
        listId: list id
        faces: face ids
    Raises:
        IntegrityError
        Exception
    Returns:
        list of success link faces and failed link faces.
    """
    success = []
    error = []

    async def linkFace(faceId, conn):
        # insert one list-face link; a constraint violation means a nonexistent face
        # or an already-existing link — record it as failed instead of raising
        try:
            updateFaceListSt = insert(models.ListFace).values(
                list_id=listId, face_id=faceId, link_key=LINK_SEQUENCE.next_value()
            )
            await conn.execute(updateFaceListSt)
        except (IntegrityError, UniqueViolationError, ForeignKeyViolationError):
            error.append(faceId)
            return False
        else:
            success.append(faceId)
            return True

    async with self.adaptor.connection(logger) as connection:
        # bump the faces' last_update_time up front, before creating the links
        updateFaceSt = (
            update(models.Face)
            .where(models.Face.face_id.in_(faces))
            .values(last_update_time=self.currentDBTimestamp)
        )
        await connection.execute(updateFaceSt)
    self._addListsToDeferrerUpdateListLastUpdateTime((listId,))
    # NOTE(review): each link is executed on its own connection — presumably so a failed
    # insert does not abort a shared transaction; confirm against the adaptor's semantics
    for face in faces:
        async with self.adaptor.connection(logger) as connection:
            await linkFace(face, connection)
    return success, error
[docs]
@exceptionWrap
async def unlinkFacesFromList(self, listId: str, faces: list[str], accountId: Optional[str] = None):
    """
    Unlink faces from list.

    Records the removed links in the unlink log, then deletes them and bumps the
    faces' last update time, all within one connection scope.

    Args:
        listId: list id
        faces: face ids
        accountId: account id
    """

    async def updateHistoryLog():
        # copy (list id, face id, link key) of the links being removed into the unlink log
        querySelect = select([models.ListFace.list_id, models.Face.face_id, models.ListFace.link_key]).where(
            and_(
                models.Face.face_id.in_(faces),
                models.Face.face_id == models.ListFace.face_id,
                models.ListFace.list_id == listId,
            )
        )
        insertSt = insert(models.UnlinkAttributesLog).from_select(
            [
                models.UnlinkAttributesLog.list_id,
                models.UnlinkAttributesLog.face_id,
                models.UnlinkAttributesLog.link_key,
            ],
            querySelect,
        )
        await connection.execute(insertSt)

    async def unlinkFaces():
        # delete the links and bump the faces' last_update_time
        # NOTE(review): "@" is a project-specific column operator — presumably the account
        # filter is skipped when accountId is None; see the models module
        deleteFacesSt = delete(models.ListFace).where(
            and_(
                models.ListFace.face_id.in_(faces),
                models.ListFace.list_id == listId,
                models.List.account_id @ accountId,
            )
        )
        await connection.execute(deleteFacesSt)
        updateFaceSt = (
            update(models.Face)
            .where(models.Face.face_id.in_(faces))
            .values(last_update_time=self.currentDBTimestamp)
        )
        await connection.execute(updateFaceSt)

    async with self.adaptor.connection(logger) as connection:
        # `blockFaces` is applied first — presumably it locks the face rows so the log
        # insertion and the deletion see a consistent state; see its definition
        await self.blockFaces(connection, [models.Face.face_id.in_(faces)])
        await updateHistoryLog()
        await unlinkFaces()
        self._addListsToDeferrerUpdateListLastUpdateTime((listId,))
[docs]
@exceptionWrap
async def createList(self, account_id: str, listId: str, user_data: Optional[str] = "") -> None:
    """
    Create list for account.

    Args:
        account_id: account id
        user_data: user data
        listId: list id
    Raises:
        VLException(Error.ListAlreadyExist, 409): if a list with the same id already exists
    """
    createListSt = insert(models.List).values(list_id=listId, account_id=account_id, user_data=user_data)
    async with self.adaptor.connection(logger) as connection:
        try:
            await connection.execute(createListSt)
        except (IntegrityError, UniqueViolationError):
            raise VLException(Error.ListAlreadyExist.format(listId), 409, isCriticalError=False)
[docs]
@staticmethod
def prepareSearchFacesFilters(
    filters: SearchFacesFilters,
    modelsFace: Optional[AliasedClass] = None,
    modelsListFace: Optional[AliasedClass] = None,
) -> list[BooleanClauseList]:
    """
    Prepare search query filters on "Face" models.

    Inactive filters are represented as `True` placeholders.

    Args:
        filters: query filters
        modelsFace: database table model for faces
        modelsListFace: database table model for list face
    Returns:
        filters for Face model
    """
    face = modelsFace or models.Face
    listFace = modelsListFace or models.ListFace
    flt = filters
    return [
        True if flt.account_id is None else face.account_id == flt.account_id,
        True if flt.user_data is None else face.user_data.like(f"%{flt.user_data}%"),
        True if flt.event_id is None else face.event_id == flt.event_id,
        True if flt.create_time__lt is None else face.create_time < flt.create_time__lt,
        True if flt.create_time__gte is None else face.create_time >= flt.create_time__gte,
        True if flt.face_id__lt is None else face.face_id < flt.face_id__lt,
        True if flt.face_id__gte is None else face.face_id >= flt.face_id__gte,
        True if flt.external_ids is None else face.external_id.in_(flt.external_ids),
        True if flt.face_ids is None else face.face_id.in_(flt.face_ids),
        True if flt.list_id is None else listFace.face_id == face.face_id,
        True if flt.list_id is None else listFace.list_id == flt.list_id,
    ]
[docs]
@staticmethod
def setOffsetAndOrderToQuery(
    query: Query,
    order: Optional[Literal["desc", "asc"]] = "desc",
    orderColumn: Optional[Column] = None,
    page: Optional[int] = 1,
    pageSize: Optional[int] = 100,
) -> Query:
    """
    Apply `ORDER BY`, `OFFSET` and `LIMIT` criteria to the query.

    Args:
        query: query object
        order: result sort order ("asc" or "desc")
        orderColumn: result sort column; no ordering is applied when None
        page: pagination page value
        pageSize: pagination page size value, set -1 to get all faces
    Returns:
        updated query
    """
    if orderColumn is not None:
        direction = asc if order == "asc" else desc
        query = query.order_by(direction(orderColumn))
    if pageSize == -1:
        return query
    return query.offset((page - 1) * pageSize).limit(pageSize)
[docs]
@staticmethod
def prepareListQueryFilters(filters: SearchListsFilters) -> BooleanClauseList:
    """
    Prepare search query filters on "List" models.

    Inactive filters are represented as `True` placeholders; "@" is the project's
    column operator used for the account/user_data equality filters.

    Args:
        filters: query filters
    Returns:
        filters for List select query
    """
    lists = models.List
    flt = filters
    sqlFilters = [
        lists.account_id @ flt.account_id,
        True if flt.user_data is None else lists.user_data.like(f"%{flt.user_data}%"),
        lists.user_data @ flt.user_data__eq,
        True if flt.create_time__lt is None else lists.create_time < flt.create_time__lt,
        True if flt.create_time__gte is None else lists.create_time >= flt.create_time__gte,
        True if flt.last_update_time__lt is None else lists.last_update_time < flt.last_update_time__lt,
        True if flt.last_update_time__gte is None else lists.last_update_time >= flt.last_update_time__gte,
        True if flt.list_id__lt is None else lists.list_id < flt.list_id__lt,
        True if flt.list_id__gte is None else lists.list_id >= flt.list_id__gte,
        True if flt.list_ids is None else lists.list_id.in_(flt.list_ids),
    ]
    return and_(*sqlFilters)
[docs]
@exceptionWrap
async def executeSearchFaces(
    self,
    filters: SearchFacesFilters,
    targets: Optional[list[str]],
    page: Optional[int] = 1,
    pageSize: Optional[int] = 100,
) -> list[dict[str, str]]:
    """
    Get faces searched by filters.

    Args:
        filters: raw search request filters
        targets: target Face columns' names to get info on
        page: page
        pageSize: page size
    Returns:
        faces list
    """
    selectedColumns = [_FACE_BY_FACE_GETTER_COLUMNS_MAP[target] for target in targets]
    # order by face id when an id range filter is in play, otherwise by creation time
    if any((filters.face_id__gte, filters.face_id__lt)):
        orderColumn = models.Face.face_id
    else:
        orderColumn = models.Face.create_time
    sqlFilters = self.prepareSearchFacesFilters(filters)
    query = self.setOffsetAndOrderToQuery(
        query=Query(selectedColumns).filter(and_(*sqlFilters)),
        orderColumn=orderColumn,
        page=page,
        pageSize=pageSize,
    )
    async with self.adaptor.connection(logger) as connection:
        faceRows = await connection.fetchall(query.statement)
    return self.makeOutputFaces(faceRows, targets)
[docs]
@exceptionWrap
async def executeSearchFacesFilteredByList(
    self, filters: SearchFacesFilters, targets: list[str], page: Optional[int] = 1, pageSize: Optional[int] = 100
) -> list[dict[str, str]]:
    """
    Get faces searched by filters associated with face-list model.

    More profitable than `executeSearchFaces` for large lists (see `getFaces`),
    since filtering and ordering run on the list-face relation.

    Args:
        filters: raw search request filters
        targets: target Face columns' names to get info on
        page: page
        pageSize: page size
    Returns:
        faces list
    """
    queryFilters = and_(
        models.List.account_id == filters.account_id if filters.account_id is not None else True,
        models.List.list_id == filters.list_id,
        models.ListFace.list_id == models.List.list_id,
        models.ListFace.face_id < filters.face_id__lt if filters.face_id__lt is not None else True,
        models.ListFace.face_id >= filters.face_id__gte if filters.face_id__gte is not None else True,
        models.ListFace.face_id.in_(filters.face_ids) if filters.face_ids is not None else True,
        (
            # join the face table only when a requested target actually needs its columns
            models.Face.face_id == models.ListFace.face_id
            if set(targets) - {"face_id", "account_id", "lists"}
            else True
        ),
    )
    query = select([_FACE_BY_LIST_GETTER_COLUMNS_MAP[target] for target in targets]).where(queryFilters)
    query = self.setOffsetAndOrderToQuery(
        query=query, orderColumn=models.ListFace.face_id, page=page, pageSize=pageSize
    )
    async with self.adaptor.connection(logger) as connection:
        faceRows = await connection.fetchall(query)
    return self.makeOutputFaces(faceRows, targets)
[docs]
@exceptionWrap
async def getFaces(
    self,
    filters: SearchFacesFilters,
    targets: Optional[list[str]] = None,
    page: Optional[int] = 1,
    pageSize: Optional[int] = 100,
) -> list[dict[str, str]]:
    """
    Get faces.

    Dispatches to the list-face based search when list filters and face id range
    filters are both in play (more profitable with a large count of faces in the
    list), otherwise to the generic face search.

    Args:
        filters: raw search request filters
        targets: target Face columns' names to get info on
        page: page
        pageSize: page size
    Returns:
        list of faces
    """
    searchByList = filters.listFaceFiltersAreUsed() and any((filters.face_id__gte, filters.face_id__lt))
    if searchByList:
        return await self.executeSearchFacesFilteredByList(filters, targets, page=page, pageSize=pageSize)
    return await self.executeSearchFaces(filters, targets, page=page, pageSize=pageSize)
[docs]
@exceptionWrap
async def getNonexistentFaceId(
    self,
    requiredFaceIds: set[str],
    accountId: Optional[str] = None,
    dbConnection: Optional[AbstractDBConnection] = None,
) -> Union[str, None]:
    """
    Get one of requiredFaceIds that does not exist.

    Args:
        requiredFaceIds: set of required face ids
        accountId: account id
        dbConnection: current connection. Needed not to get new connection from pool
            or to do some stuff in scope of the transaction.
    Returns:
        Non existing face id or None when all exist
    """
    query = Query([models.Face.face_id]).filter(
        models.Face.account_id == accountId if accountId is not None else True,
        models.Face.face_id.in_(requiredFaceIds),
    )
    async with contextlib.AsyncExitStack() as stack:
        if dbConnection is not None:
            connection = dbConnection
        else:
            # acquire a pooled connection only when the caller did not supply one
            connection = await stack.enter_async_context(self.adaptor.connection(logger))
        rows = await connection.fetchall(query.statement)
    existingFaceIds = set(chain(*rows))
    missingFaceIds = requiredFaceIds - existingFaceIds
    if not missingFaceIds:
        return None
    return next(iter(missingFaceIds))
[docs]
@exceptionWrap
async def getFacesCountByList(self, filters: SearchFacesFilters) -> int:
    """
    Get count of faces filtered by list id.

    Args:
        filters: raw search request filters
    Returns:
        Number of faces; 0 if the list does not belong to the filtered account
    """
    async with self.adaptor.connection(logger) as connection:
        if filters.account_id is not None:
            # checking the list by account before getting the faces count is more profitable
            listCountSt = select([func.count(models.List.list_id)]).where(
                and_(models.List.account_id == filters.account_id, models.List.list_id == filters.list_id)
            )
            if not await connection.scalar(listCountSt):
                return 0
        selectSt = select([func.count(models.ListFace.face_id)]).where(
            and_(
                models.ListFace.list_id == filters.list_id,
                models.ListFace.face_id < filters.face_id__lt if filters.face_id__lt is not None else True,
                models.ListFace.face_id >= filters.face_id__gte if filters.face_id__gte is not None else True,
                models.ListFace.face_id.in_(filters.face_ids) if filters.face_ids is not None else True,
            )
        )
        return await connection.scalar(selectSt)
[docs]
@exceptionWrap
async def getCachedFaceAttributesCount(self) -> int:
    """
    Get count of faces attributes, cached in the database for `REQUEST_CACHE_TTL_SECONDS`.

    A `SELECT ... FOR UPDATE` lock on the cache row serializes concurrent refreshes
    so only one request recomputes an expired count at a time.

    Returns:
        Number of faces attributes
    """
    lockCachedCount = select([1]).where(models.RequestsCache.name == FACES_COUNT_CACHE_NAME).with_for_update()
    delta = timedelta(seconds=REQUEST_CACHE_TTL_SECONDS)
    # the cache row is valid while it is younger than the ttl
    selectActualCache = select([models.RequestsCache.value]).where(
        and_(
            models.RequestsCache.name == FACES_COUNT_CACHE_NAME,
            models.RequestsCache.created_at + delta > self.currentDBTimestamp,
        )
    )
    async with self.adaptor.connection(logger) as connection:
        facesAttributesCount = None
        # lock the cache row: serializes concurrent cache refreshes
        await connection.scalar(lockCachedCount)
        if requestedCache := await connection.scalar(selectActualCache):
            facesAttributesCount = decryptCacheValue(requestedCache, REQUESTS_ENCRYPTION_KEY)
            try:
                facesAttributesCount = int(facesAttributesCount)
            # TypeError: decryption yielded None; ValueError: decrypted text is not a number.
            # (the original caught only TypeError, letting a corrupted-but-decryptable
            # cache value raise instead of falling back to recomputation)
            except (TypeError, ValueError):
                logger.error("Unable to process decrypted cache value")
                facesAttributesCount = None
        if facesAttributesCount is None:
            # cache miss or unusable cache value: recompute and refresh the cache row
            selectFaceAttrsCount = select([func.count(models.Attribute.face_id)])
            facesAttributesCount = await connection.scalar(selectFaceAttrsCount)
            updateQuery = (
                update(models.RequestsCache)
                .where(models.RequestsCache.name == FACES_COUNT_CACHE_NAME)
                .values(
                    value=encryptMessage(facesAttributesCount, REQUESTS_ENCRYPTION_KEY),
                    created_at=self.currentDBTimestamp,
                )
            )
            await connection.execute(updateQuery)
        return facesAttributesCount
[docs]
@exceptionWrap
async def getFacesCount(self, filters: SearchFacesFilters) -> int:
    """
    Get count of faces.

    Args:
        filters: raw search request filters
    Returns:
        Number of faces
    """
    # list-based filters are counted on the list-face relation
    if filters.listFaceFiltersAreUsed():
        return await self.getFacesCountByList(filters)
    countSt = select([func.count(models.Face.face_id)]).where(and_(*self.prepareSearchFacesFilters(filters)))
    async with self.adaptor.connection(logger) as connection:
        return await connection.scalar(countSt)
[docs]
@exceptionWrap
async def getFacesAttributesCount(self, accountId: Optional[str] = None) -> int:
    """
    Return face attribute count.

    Args:
        accountId: account id; when falsy, attributes of all accounts are counted
    Returns:
        face attribute count
    """
    if not accountId:
        sqlFilters = and_()
    else:
        sqlFilters = and_(models.Face.account_id == accountId, models.Attribute.face_id == models.Face.face_id)
    countSt = select([func.count(models.Attribute.face_id)]).where(sqlFilters)
    async with self.adaptor.connection(logger) as connection:
        return await connection.scalar(countSt)
[docs]
@exceptionWrap
async def getLists(
    self, filters: SearchListsFilters, page: Optional[int] = 1, pageSize: Optional[int] = 100
) -> list[dict[str, str]]:
    """
    Get lists by filters.

    Args:
        filters: raw search request filters
        page: page
        pageSize: page size
    Returns:
        List with dicts
    """
    # order by list id when an id range filter is in play, otherwise by creation time
    if filters.list_id__gte is not None or filters.list_id__lt is not None:
        orderColumn = models.List.list_id
    else:
        orderColumn = models.List.create_time
    query = (
        Query(models.List)
        .filter(self.prepareListQueryFilters(filters))
        .order_by(orderColumn.desc())
        .offset((page - 1) * pageSize)
        .limit(pageSize)
    )
    async with self.adaptor.connection(logger) as connection:
        listRows = await connection.fetchall(query.statement)
    return self.makeOutputLists(listRows)
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def getNonexistentListId(
    self, requiredListIds: set[str], accountId: Optional[str] = None
) -> Union[str, None]:
    """
    Get one of requiredListIds that does not exist.

    Args:
        requiredListIds: set of required list ids
        accountId: account id
    Returns:
        a non-existing list id, or None when every id exists
    """
    query = Query(models.List.list_id).filter(
        models.List.account_id @ accountId, models.List.list_id.in_(requiredListIds)
    )
    async with self.adaptor.connection(logger) as connection:
        rows = await connection.fetchall(query.statement)
    # Rows are one-element tuples holding the list id.
    foundListIds = {row[0] for row in rows}
    missingListIds = requiredListIds - foundListIds
    if missingListIds:
        return next(iter(missingListIds))
    return None
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def getListsCount(self, filters: SearchListsFilters) -> int:
    """
    Count lists matching the filters.

    Args:
        filters: raw search request filters
    Returns:
        Count of lists
    """
    countQuery = Query([func.count(models.List.list_id).label("count")]).filter(
        self.prepareListQueryFilters(filters)
    )
    async with self.adaptor.connection(logger) as connection:
        return await connection.scalar(countQuery.statement)
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def getListsAndKeysFromMV(self, listIds: set[str], useParity: int) -> list[dict[str, Any]]:
    """
    Get lists with last link and unlink keys from materialized views.

    Args:
        listIds: list ids
        useParity: if 1 - get even and odd max link/unlink keys else max link/unlink keys
    Returns:
        List with list ids and its link/unlink keys
    """
    listIdsList = list(listIds)
    # Batch the ids so each IN() clause stays within the DB limit.
    listIdsBatches = [listIdsList[idx : idx + DB_LIST_LIMIT] for idx in range(0, len(listIdsList), DB_LIST_LIMIT)]
    if useParity:
        async with self.adaptor.connection(logger) as connection:
            selectQueryList = [
                Query(
                    [
                        models.List.list_id,
                        models.MV_LINK_0.link_key.label("link_key_even"),
                        models.MV_LINK_1.link_key.label("link_key_odd"),
                        models.MV_UNLINK_0.unlink_key.label("unlink_key_even"),
                        models.MV_UNLINK_1.unlink_key.label("unlink_key_odd"),
                    ]
                )
                .join(models.MV_LINK_0, models.MV_LINK_0.list_id == models.List.list_id, full=True)
                .join(models.MV_LINK_1, models.MV_LINK_1.list_id == models.List.list_id, full=True)
                # Bugfix: MV_UNLINK_0 was joined on MV_UNLINK_1.list_id (copy-paste),
                # so the even-unlink view was not actually matched against the list.
                .join(models.MV_UNLINK_0, models.MV_UNLINK_0.list_id == models.List.list_id, full=True)
                .join(models.MV_UNLINK_1, models.MV_UNLINK_1.list_id == models.List.list_id, full=True)
                .filter(models.List.list_id.in_(listIdsBatch))
                for listIdsBatch in listIdsBatches
            ]
            selectResult = []
            for selectQuery in selectQueryList:
                selectResult.extend(await connection.fetchall(selectQuery.statement))
        result = [
            {
                "list_id": listId,
                "link_key_even": linkKeyEven,
                "link_key_odd": linkKeyOdd,
                "unlink_key_even": unlinkKeyEven,
                "unlink_key_odd": unlinkKeyOdd,
            }
            for listId, linkKeyEven, linkKeyOdd, unlinkKeyEven, unlinkKeyOdd in selectResult
        ]
    else:
        async with self.adaptor.connection(logger) as connection:
            selectQueryList = [
                Query([models.List.list_id, models.MV_LINK.link_key, models.MV_UNLINK.unlink_key])
                .join(models.MV_LINK, models.MV_LINK.list_id == models.List.list_id, full=True)
                .join(models.MV_UNLINK, models.MV_UNLINK.list_id == models.List.list_id, full=True)
                .filter(models.List.list_id.in_(listIdsBatch))
                for listIdsBatch in listIdsBatches
            ]
            selectResult = []
            for selectQuery in selectQueryList:
                selectResult.extend(await connection.fetchall(selectQuery.statement))
        result = [
            {"list_id": listId, "link_key": linkKey, "unlink_key": unlinkKey}
            for listId, linkKey, unlinkKey in selectResult
        ]
    return result
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def getListsWithKeys(self, listIds: set[str], useParity: int) -> list[dict[str, Any]]:
    """
    Get lists with last link and unlink keys from usual tables
    (as opposed to the materialized-view variant ``getListsAndKeysFromMV``).

    Args:
        listIds: list ids
        useParity: if 1 - get even and odd max link/unlink keys, if 0 - max link/unlink keys
    Returns:
        list with list ids and its link/unlink keys
    """
    listIdsList = list(listIds)
    # Batch ids so each IN() clause stays within the DB limit.
    listIdsBatches = [listIdsList[idx : idx + DB_LIST_LIMIT] for idx in range(0, len(listIdsList), DB_LIST_LIMIT)]

    async def getKeysFromDb(
        searchLists: list[str], dbConnection, keyType: str, parity: Optional[str] = None
    ) -> dict:
        """
        Get link/unlink keys from db.

        Args:
            searchLists: list of searching lists
            dbConnection: database connection
            keyType: "link" or "unlink"
            parity: None, "odd" or "even"; "odd" selects keys with ``key % 2 == 1``
        Returns:
            dict mapping list id -> max key
        """
        if parity is not None:
            # NOTE(review): raw SQL is assembled via str.format. The list ids fed in
            # here are read back from the List table below, not taken directly from
            # the request — confirm RAW_SQLS quoting regardless.
            rawSql = " UNION ALL ".join(
                (
                    RAW_SQLS[self.dbConfig.type][keyType].format(listId=listId, mod=1 if parity == "odd" else 0)
                    for listId in searchLists
                )
            )
            res = await dbConnection.fetchall(rawSql)
            return dict(res)
        else:
            queries = []
            for listId in searchLists:
                if keyType == "link":
                    selectParams = [models.ListFace.list_id, models.ListFace.link_key]
                    basicFilters = models.ListFace.list_id == listId
                    key = models.ListFace.link_key
                else:
                    selectParams = [models.UnlinkAttributesLog.list_id, models.UnlinkAttributesLog.unlink_key]
                    basicFilters = models.UnlinkAttributesLog.list_id == listId
                    key = models.UnlinkAttributesLog.unlink_key
                # One "top-1 by key desc" select per list id, glued with UNION ALL.
                query = Query(selectParams).filter(basicFilters).order_by(key.desc()).limit(1)
                queries.append(query)
            linkKeyAndLists = await dbConnection.fetchall(union_all(*queries))
            return dict(linkKeyAndLists)

    async with self.adaptor.connection(logger) as connection:
        # Keep only ids that actually exist in the List table.
        existsListSts = [
            Query(models.List.list_id).filter(models.List.list_id.in_(listIdsBatch)).statement
            for listIdsBatch in listIdsBatches
        ]
        res = []
        for existListSt in existsListSts:
            res.extend(await connection.fetchall(existListSt))
        existsLists = [listId[0] for listId in res]
        existsListsBatches = [
            existsLists[idx : idx + DB_LIST_LIMIT] for idx in range(0, len(existsLists), DB_LIST_LIMIT)
        ]
        if not existsListsBatches:
            return []
        result = dict()
        for existsListsBatch in existsListsBatches:
            if useParity:
                # NOTE(review): the "...Even..." names below actually hold ODD-parity
                # keys (parity="odd" -> mod=1) and vice versa. The values are consumed
                # swapped again when building the dict ("link_key_odd" reads the
                # "Even" map), so the two mix-ups cancel out and the output is
                # correct — the local names are merely misleading.
                mapListIdMaxEvenLinkKey = await getKeysFromDb(existsListsBatch, connection, "link", "odd")
                mapListIdMaxEvenUnlinkKey = await getKeysFromDb(existsListsBatch, connection, "unlink", "odd")
                mapListIdMaxOddLinkKey = await getKeysFromDb(existsListsBatch, connection, "link", "even")
                mapListIdMaxOddUnlinkKey = await getKeysFromDb(existsListsBatch, connection, "unlink", "even")
                result.update(
                    dict(
                        zip(
                            existsListsBatch,
                            [
                                {
                                    "link_key_even": mapListIdMaxOddLinkKey.get(listId, None),
                                    "unlink_key_even": mapListIdMaxOddUnlinkKey.get(listId, None),
                                    "link_key_odd": mapListIdMaxEvenLinkKey.get(listId, None),
                                    "unlink_key_odd": mapListIdMaxEvenUnlinkKey.get(listId, None),
                                }
                                for listId in existsListsBatch
                            ],
                        )
                    )
                )
            else:
                mapListIdMaxLinkKey = await getKeysFromDb(existsListsBatch, connection, "link")
                mapListIdMaxUnlinkKey = await getKeysFromDb(existsListsBatch, connection, "unlink")
                result.update(
                    dict(
                        zip(
                            existsListsBatch,
                            [
                                {
                                    "link_key": mapListIdMaxLinkKey.get(listId, None),
                                    "unlink_key": mapListIdMaxUnlinkKey.get(listId, None),
                                }
                                for listId in existsListsBatch
                            ],
                        )
                    )
                )
    return [{"list_id": listId, **listData} for listId, listData in result.items()]
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def deleteFaces(
    self,
    faces: list[str] | None = None,
    accountId: str | None = None,
    createTimeGte: str | None = None,
    createTimeLt: str | None = None,
    userData: str | None = None,
    listId: str | None = None,
) -> int:
    """
    Remove faces matching the given filters.

    Args:
        accountId: faces account id
        faces: faces ids
        createTimeGte: creation time greater
        createTimeLt: create time lower
        userData: user data (substring match)
        listId: list id where face exists
    Returns:
        removed face count
    """
    # `True` entries are no-op placeholders that `and_` collapses away.
    filters = [
        models.Face.face_id.in_(faces) if faces is not None else True,
        models.Face.user_data.like("%{}%".format(userData)) if userData is not None else True,
        models.Face.create_time >= createTimeGte if createTimeGte is not None else True,
        models.Face.create_time < createTimeLt if createTimeLt is not None else True,
        models.Face.account_id == accountId if accountId is not None else True,
        (
            and_(
                models.ListFace.face_id == models.Face.face_id,
                models.ListFace.list_id == listId,
            )
            if listId is not None
            else True
        ),
    ]
    async with self.adaptor.connection(logger) as connection:

        async def updateHistoryLog(facesForRemove):
            # Preserve face->list link history before the rows are removed.
            linkedLists = await self._moveFacesLinksToLog(facesForRemove, connection)
            logger.debug("update history log")  # fixed: was an f-string with no placeholders
            return linkedLists

        if faces:
            logger.debug(f"start delete {len(faces)} faces")
        # Lock the target faces first to serialize with concurrent writers.
        _, blockedFaces = await self.blockFaces(connection, filters=filters, returnBlockedFaces=True)
        updatedLists = await updateHistoryLog(blockedFaces)
        deleteFacesSt = delete(models.Face).where(and_(*filters))
        removedFaceCount = await connection.execute(deleteFacesSt)
        logger.debug(f"delete {removedFaceCount} faces")
        self._addListsToDeferrerUpdateListLastUpdateTime(updatedLists)
        return removedFaceCount
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def deleteFacesExtended(
    self,
    targets: list[str],
    accountId: str | None = None,
    createTimeGte: str | None = None,
    createTimeLt: str | None = None,
    userData: str | None = None,
    listId: str | None = None,
) -> list[dict] | None:
    """
    Remove faces.

    Args:
        targets: deletion info targets; any of "face_id",
            "basic_attributes_samples", "face_descriptor_samples"
        accountId: faces account id
        createTimeGte: creation time greater
        createTimeLt: create time lower
        userData: user data
        listId: list id where face exists
    Returns:
        list with dicts for each deleted face when targets are requested,
        otherwise None.
    """
    # `True` placeholders disable unused filters inside `and_`.
    filters = and_(
        models.Face.user_data.like("%{}%".format(userData)) if userData is not None else True,
        models.Face.create_time >= createTimeGte if createTimeGte is not None else True,
        models.Face.create_time < createTimeLt if createTimeLt is not None else True,
        models.Face.account_id == accountId if accountId is not None else True,
        (
            and_(
                models.ListFace.face_id == models.Face.face_id,
                models.ListFace.list_id == listId,
            )
            if listId is not None
            else True
        ),
    )

    # Return annotation fixed: this helper returns a (faceIds, targets) pair,
    # it was previously annotated as Iterable[dict].
    async def collectFacesToDeletion(conn) -> tuple[list[str], list[dict]]:
        """Get targets from Sample table."""
        # Cap the batch; callers can repeat the request for more.
        selectFaceIds = Query([models.Face.face_id]).filter(filters).limit(DELETE_REQUEST_MAX_SIZE)
        faceData = await conn.fetchall(selectFaceIds.statement)
        faceDict = {
            row["face_id"]: {
                "face_id": row["face_id"],
                "basic_attributes_samples": [],
                "face_descriptor_samples": [],
            }
            for row in faceData
        }
        faceIds = list(faceDict.keys())
        sampleTypes = []
        if "basic_attributes_samples" in targets:
            sampleTypes.append(models.SampleType["basic_attributes"].value)
        if "face_descriptor_samples" in targets:
            sampleTypes.append(models.SampleType["face_descriptor"].value)
        if sampleTypes:
            selectQuery = Query([models.Sample.face_id, models.Sample.sample_id, models.Sample.type]).filter(
                and_(models.Sample.face_id.in_(faceIds), models.Sample.type.in_(sampleTypes))
            )
            sampleData = await conn.fetchall(selectQuery.statement)
            for row in sampleData:
                if row["face_id"] in faceDict:
                    if row["type"] == models.SampleType["basic_attributes"].value:
                        faceDict[row["face_id"]]["basic_attributes_samples"].append(row["sample_id"])
                    else:
                        faceDict[row["face_id"]]["face_descriptor_samples"].append(row["sample_id"])
        # Project each face entry down to only the requested target keys.
        targetsList = []
        for value in faceDict.values():
            targetItem = {}
            if "basic_attributes_samples" in targets:
                targetItem["basic_attributes_samples"] = value["basic_attributes_samples"]
            if "face_descriptor_samples" in targets:
                targetItem["face_descriptor_samples"] = value["face_descriptor_samples"]
            if "face_id" in targets:
                targetItem["face_id"] = value["face_id"]
            targetsList.append(targetItem)
        return faceIds, targetsList

    async with self.adaptor.connection(logger) as connection:
        if targets:
            faceIds, targetsData = await collectFacesToDeletion(connection)
            # The remaining filters were already applied while collecting the ids,
            # so deletion only needs the explicit face ids + account.
            await self.deleteFaces(faces=faceIds, accountId=accountId)
            return targetsData
    await self.deleteFaces(
        accountId=accountId,
        createTimeGte=createTimeGte,
        createTimeLt=createTimeLt,
        userData=userData,
        listId=listId,
    )
    return None
@exceptionWrap
async def _deleteLists(self, lists: list[str], accountId: Optional[str] = None) -> int:
    """
    Remove lists.

    Args:
        accountId: lists account id
        lists: list ids to remove
    Returns:
        removed list count
    Warnings:
        trigger `trg_lists_deletion_log` will insert a data to the table `ListsDeletionLog`
    """
    deleteSt = delete(models.List).where(
        and_(models.List.list_id.in_(lists), models.List.account_id @ accountId)
    )
    async with self.adaptor.connection(logger) as connection:
        return await connection.execute(deleteSt)
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def deleteLists(self, lists: list[str], accountId: Optional[str] = None, withFaces: bool = False) -> int:
    """
    Remove lists.

    Args:
        accountId: lists account id
        lists: lists
        withFaces: remove lists with all faces which is contained in these lists
    Returns:
        removed list count
    """
    if not withFaces:
        return await self._deleteLists(lists, accountId)
    listsFilters = [models.List.list_id.in_(lists), models.List.account_id @ accountId]
    async with self.adaptor.connection(logger) as connection:
        # Lock the target lists to serialize with concurrent deletions.
        lockQuery = Query([models.List.list_id]).filter(and_(*listsFilters)).with_for_update()
        lockedRows = await connection.fetchall(lockQuery.statement)
        if not lockedRows:
            return 0
        lockedListIds = [lockedRow[0] for lockedRow in lockedRows]
        # Collect the member faces before the lists disappear.
        selectFacesSt = select([models.ListFace.face_id]).where(
            and_(
                models.List.list_id.in_(lockedListIds),
                models.List.list_id == models.ListFace.list_id,
                models.List.account_id @ accountId,
            )
        )
        faceIdsToDeletion = [faceRow[0] for faceRow in await connection.fetchall(selectFacesSt)]
        removedListCount = await connection.execute(delete(models.List).where(and_(*listsFilters)))
        # Face removal is chunked and off-loaded to the background worker.
        batchSize = BACKGROUND_REMOVAL_BATCH_SIZE
        deletionCoros = [
            self.deleteFaces(faces=faceIdsToDeletion[batch * batchSize : (batch + 1) * batchSize])
            for batch in range(getPageCount(len(faceIdsToDeletion), batchSize))
        ]
        self.backgroundWorker.runNoWait(deletionCoros)
        return removedListCount
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def updateListUserData(self, listId: str, userData: str, accountId: Optional[str] = None) -> int:
    """
    Update user data of a list.

    Args:
        listId: list id
        userData: new user data
        accountId: account id
    Returns:
        updated list count
    """
    updateSt = (
        update(models.List)
        .values(user_data=userData)
        .where(and_(models.List.list_id == listId, models.List.account_id @ accountId))
    )
    async with self.adaptor.connection(logger) as connection:
        return await connection.execute(updateSt)
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def isFacesExist(self, faceIds: list[str], accountId: Optional[str] = None) -> bool:
    """
    Check whether all given faces exist.

    Args:
        accountId: account id
        faceIds: face ids; expected to be unique — duplicates make the count
            comparison below return False even when every face exists
    Returns:
        True if all faces exist else False
    """
    countQuery = Query([func.count(models.Face.face_id).label("count")]).filter(
        models.Face.face_id.in_(faceIds), models.Face.account_id @ accountId
    )
    async with self.adaptor.connection(logger) as connection:
        res = await connection.fetchone(countQuery.statement)
    faceCount = res[0]
    # Idiom fix: return the comparison directly instead of `True if ... else False`.
    return faceCount == len(faceIds)
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def isListsExist(self, listIds: set[str], accountId: Optional[str] = None) -> bool:
    """
    Check whether every list from `listIds` exists.

    Args:
        accountId: account id
        listIds: list ids
    Returns:
        True if all lists exist else false
    """
    countQuery = Query([func.count(models.List.list_id).label("count")]).filter(
        models.List.list_id.in_(listIds), models.List.account_id @ accountId
    )
    async with self.adaptor.connection(logger) as connection:
        row = await connection.fetchone(countQuery.statement)
    # `listIds` is a set, so matching count means every id was found.
    return row[0] == len(listIds)
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def getListPlusDelta(
    self,
    listId: str,
    linkKeyGte: Optional[int] = None,
    linkKeyLt: Optional[int] = None,
    limit: int = 10000,
    parity: Optional[int] = None,
) -> list[dict]:
    """
    Get attach attributes to lists.

    Args:
        listId: list id
        linkKeyLt: upper bound of link key value
        linkKeyGte: lower bound of link key value
        limit: limit
        parity: 0 for even or 1 for odd link keys to search for
            (the filter below is ``link_key % 2 == parity``)
    Returns:
        List of dicts with following keys: "attribute_id", "link_key",
        ordered by link_key ascending.
    """
    async with self.adaptor.connection(logger) as connection:
        # `True` placeholders disable the optional bounds inside `and_`.
        filters = and_(
            models.ListFace.link_key < linkKeyLt if linkKeyLt is not None else True,
            models.ListFace.link_key >= linkKeyGte if linkKeyGte is not None else True,
            models.ListFace.list_id == listId,
            models.Attribute.face_id == models.ListFace.face_id,
            mod(models.ListFace.link_key, 2) == parity if parity is not None else True,
        )
        query = Query([models.Attribute.face_id, models.ListFace.link_key])
        query = query.filter(filters).order_by(models.ListFace.link_key.asc()).limit(limit)
        attributesRows = await connection.fetchall(query.statement)
        # used by matcher — the face id is deliberately exposed as "attribute_id"
        res = [dict(zip(("attribute_id", "link_key"), attributesRow)) for attributesRow in attributesRows]
        return res
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def getListMinusDelta(
    self,
    listId: str,
    unlinkKeyGte: Optional[int] = None,
    unlinkKeyLt: Optional[int] = None,
    limit: int = 10000,
    parity: Optional[int] = None,
) -> list[dict]:
    """
    Get history of detach attribute to lists.

    Args:
        listId: list id
        unlinkKeyLt: upper bound of unlink key value
        unlinkKeyGte: lower bound of unlink key value
        limit: limit
        parity: 0 for even or 1 for odd unlink keys to search for
            (the filter below is ``unlink_key % 2 == parity``)
    Returns:
        List of dicts with following keys: "attributes_id", "link_key",
        "unlink_key", ordered by unlink_key ascending.
    """
    async with self.adaptor.connection(logger) as connection:
        filters = and_(
            models.UnlinkAttributesLog.unlink_key < unlinkKeyLt if unlinkKeyLt is not None else True,
            models.UnlinkAttributesLog.unlink_key >= unlinkKeyGte if unlinkKeyGte is not None else True,
            models.UnlinkAttributesLog.list_id == listId,
            mod(models.UnlinkAttributesLog.unlink_key, 2) == parity if parity is not None else True,
            # `!= None` is intentional: SQLAlchemy renders it as IS NOT NULL;
            # do not "fix" it to `is not None`.
            models.UnlinkAttributesLog.face_id != None,
        )
        query = Query(
            [
                models.UnlinkAttributesLog.face_id,
                models.UnlinkAttributesLog.link_key,
                models.UnlinkAttributesLog.unlink_key,
            ]
        )
        query = query.filter(filters).order_by(models.UnlinkAttributesLog.unlink_key.asc()).limit(limit)
        attributesRows = await connection.fetchall(query.statement)
        # The face id is exposed under the "attributes_id" key.
        res = [
            dict(zip(("attributes_id", "link_key", "unlink_key"), attributesRow))
            for attributesRow in attributesRows
        ]
        return res
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def cleanLog(self, updateTimeLt: Optional[datetime] = None) -> None:
    """
    Remove notes from unlink tables.

    Args:
        updateTimeLt: upper bound of update time; None removes every note
    """
    if updateTimeLt is not None:
        condition = models.UnlinkAttributesLog.update_time < updateTimeLt
    else:
        condition = True
    cleanLogSt = delete(models.UnlinkAttributesLog).where(and_(condition))
    async with self.adaptor.connection(logger) as connection:
        await connection.execute(cleanLogSt)
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def getFacesAttributes(
    self,
    faceIds: list[str],
    retrieveAttrs: Iterable[str],
    descriptorVersion: int,
    accountId: Optional[str],
    getDescriptorAsBytes: bool = False,
    descriptorFormat: Literal["raw", "sdk"] = "raw",
) -> list[dict[str, Any]]:
    """
    Retrieve attributes.

    Args:
        faceIds: face ids
        retrieveAttrs: attribute targets to retrieve
        descriptorVersion: requested descriptor version
        accountId: account id
        getDescriptorAsBytes: whether to get descriptor as bytes otherwise as base64
        descriptorFormat: output descriptor format
    Returns:
        face attributes list with items with the following properties:
            face_id: face id
            attributes: target-specific face attributes dict
        If some attribute data is not found it will be filled with 'empty' value:
        None for attributes, [] for samples
    """
    faceAttributes = {}
    async with self.adaptor.connection(logger) as connection:

        async def getAttributeModelData(
            faceIds_: list[str], accountId_: Optional[str], retrieveAttrs_: Iterable[str], descriptorVersion_: int
        ) -> Iterable[dict]:
            """Get attribute data from Attribute and Descriptor tables."""
            filters = [models.Face.face_id.in_(faceIds_), models.Face.account_id @ accountId_]
            selectQuery = Query([models.Face.face_id])
            if (
                FaceAttributeTargets.createTime.value in retrieveAttrs_
                or FaceAttributeTargets.basicAttributes.value in retrieveAttrs_
            ):
                if FaceAttributeTargets.createTime.value in retrieveAttrs_:
                    selectQuery = selectQuery.add_column(models.Attribute.create_time)
                if FaceAttributeTargets.basicAttributes.value in retrieveAttrs_:
                    selectQuery = selectQuery.add_columns(
                        models.Attribute.age, models.Attribute.gender, models.Attribute.ethnicity
                    )
                selectQuery = selectQuery.outerjoin(
                    models.Attribute, models.Attribute.face_id == models.Face.face_id
                )
            # Bugfix: this check previously read the outer `retrieveAttrs` instead of
            # the `retrieveAttrs_` parameter (same value at the single call site,
            # but inconsistent and fragile on reuse).
            if FaceAttributeTargets.faceDescriptor.value in retrieveAttrs_:
                selectQuery = selectQuery.add_column(models.Descriptor.descriptor)
                selectQuery = selectQuery.outerjoin(
                    models.Descriptor,
                    and_(
                        models.Descriptor.face_id == models.Face.face_id,
                        models.Descriptor.descriptor_version == descriptorVersion_,
                        models.Descriptor.encryption_hash == self._encryptionHashFilter,
                    ),
                )
            selectQuery = selectQuery.filter(and_(*filters))
            attributesData = await connection.fetchall(selectQuery.statement)
            responseFields = [str(field) for field in selectQuery.statement.columns]
            attributes_ = [{name: data for name, data in zip(responseFields, row)} for row in attributesData]
            return attributes_

        async def getSampleModelData(faceIds_: list[str], accountId_: Optional[str]) -> Iterable[dict]:
            """Get attribute data from Sample table."""
            filters = [models.Face.face_id.in_(faceIds_), models.Face.account_id @ accountId_]
            selectQuery = (
                Query([models.Face.face_id, models.Sample.sample_id, models.Sample.type])
                .outerjoin(models.Sample, models.Sample.face_id == models.Face.face_id)
                .filter(and_(*filters))
            )
            samplesData = await connection.fetchall(selectQuery.statement)
            responseFields = [str(field) for field in selectQuery.statement.columns]
            samples_ = [{name: data for name, data in zip(responseFields, row)} for row in samplesData]
            return samples_

        def updateFaceAttributesWithAttributeData(retrieveAttrs_: Iterable[str], attributes_: Iterable[dict]):
            """Update faceAttributes result dict with attribute data."""
            for attribute in attributes_:
                attributeFaceId = attribute.pop("face_id")
                faceAttributes[attributeFaceId] = {}
                if FaceAttributeTargets.createTime.value in retrieveAttrs_:
                    createTime = attribute.pop(FaceAttributeTargets.createTime.value)
                    if createTime is not None:
                        createTime = convertTimeToString(createTime, self.storageTime == "UTC")
                    faceAttributes[attributeFaceId][FaceAttributeTargets.createTime.value] = createTime
                if FaceAttributeTargets.faceDescriptor.value in retrieveAttrs_:
                    descriptor = attribute.pop("descriptor")
                    if descriptor is not None:
                        if descriptorFormat == "raw":
                            descriptorData = {
                                "descriptor": descriptor if getDescriptorAsBytes else bytesToBase64(descriptor),
                                "descriptor_version": descriptorVersion,
                            }
                        elif descriptorFormat == "sdk":
                            descriptorData = sdkDescriptorEncode(descriptorVersion, descriptor)
                            if not getDescriptorAsBytes:
                                descriptorData = bytesToBase64(descriptorData)
                        else:
                            raise ValueError(f"Unknown descriptor format {descriptorFormat}")
                    else:
                        descriptorData = None
                    faceAttributes[attributeFaceId][FaceAttributeTargets.faceDescriptor.value] = descriptorData
                if FaceAttributeTargets.basicAttributes.value in retrieveAttrs_:
                    # After the pops above only age/gender/ethnicity remain in `attribute`.
                    basicAttributes = attribute if any(attr is not None for attr in attribute.values()) else None
                    faceAttributes[attributeFaceId][FaceAttributeTargets.basicAttributes.value] = basicAttributes

        def updateFaceAttributesWithSampleData(retrieveAttrs_: Iterable[str], samples_: Iterable[dict]):
            """Update faceAttributes result dict with sample data."""
            for sample in samples_:
                sampleType = sample["type"]
                if sampleType is None:
                    # Outer join produced no sample row: report empty sample lists.
                    for attr in (SampleType.face_descriptor, SampleType.basic_attributes):
                        attrType = f"{attr.name}{SAMPLE_POSTFIX}"
                        if attrType in retrieveAttrs_:
                            faceAttributes.setdefault(sample["face_id"], {})[attrType] = []
                else:
                    attrType = f"{SampleType(sampleType).name}{SAMPLE_POSTFIX}"
                    if attrType in retrieveAttrs_:
                        faceAttributes.setdefault(sample["face_id"], {}).setdefault(attrType, []).append(
                            sample["sample_id"]
                        )

        # Bugfix: this gate used `| set(retrieveAttrs)` — a union is always truthy,
        # so the attribute/descriptor query ran even when none of these targets
        # were requested. Intersection (`&`) runs it only when actually needed.
        if {
            FaceAttributeTargets.createTime.value,
            FaceAttributeTargets.basicAttributes.value,
            FaceAttributeTargets.faceDescriptor.value,
        } & set(retrieveAttrs):
            attributes = await getAttributeModelData(
                faceIds_=faceIds,
                accountId_=accountId,
                retrieveAttrs_=retrieveAttrs,
                descriptorVersion_=descriptorVersion,
            )
            updateFaceAttributesWithAttributeData(retrieveAttrs_=retrieveAttrs, attributes_=attributes)
        if (
            FaceAttributeTargets.basicAttributesSamples.value in retrieveAttrs
            or FaceAttributeTargets.faceDescriptorSamples.value in retrieveAttrs
        ):
            samples = await getSampleModelData(faceIds_=faceIds, accountId_=accountId)
            updateFaceAttributesWithSampleData(retrieveAttrs_=retrieveAttrs, samples_=samples)
    return [{"face_id": faceId, "attributes": attribute} for faceId, attribute in faceAttributes.items()]
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def getFacesAttributeSamples(self, faceId: str, accountId: Optional[str]) -> list[str]:
    """
    Get all the attribute samples of the specified face.

    Note: the parameter annotation was fixed from ``list[str]`` to ``str`` —
    the query below compares it as a single face id.

    Args:
        faceId: face id
        accountId: account id
    Returns:
        face attribute samples list
    Raises:
        VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False) if face not found
    """
    async with self.adaptor.connection(logger) as connection:
        selectSt = select([models.Sample.sample_id]).where(
            and_(
                models.Sample.face_id == faceId,
                models.Face.account_id @ accountId,
                models.Sample.face_id == models.Face.face_id,
            )
        )
        # Rows are one-element tuples; chain flattens, set deduplicates.
        sampleIds = list(set(chain(*await connection.fetchall(selectSt))))
        if not sampleIds:
            # Check the face existence: no samples may mean either "face exists with
            # no samples" (return []) or "no such face" (raise 404).
            selectSt = select([models.Face.face_id]).where(
                and_(models.Face.face_id == faceId, models.Face.account_id @ accountId)
            )
            if await connection.scalar(selectSt) is None:
                raise VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False)
        return sampleIds
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def putFaceAttributes(self, faceId: str, attribute: TemporaryAttributes, accountId: Optional[str]) -> None:
    """
    Create attributes with exception wrap.

    Replaces any existing attribute row of the face and refreshes its list
    links so downstream consumers see the change.

    Args:
        faceId: face id
        attribute: temporary attribute container
        accountId: account id
    Raises:
        VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False) if face not found
    """
    async with self.adaptor.connection(logger) as connection:

        async def updateHistoryLog():
            # Move current links to the unlink log, then re-issue link keys for
            # all lists the face belongs to.
            linkedLists = await self._moveFacesLinksToLog((faceId,), connection)
            updateFaceListSt = (
                update(models.ListFace)
                .where(models.ListFace.face_id == faceId)
                .values(last_update_time=self.currentDBTimestamp, link_key=LINK_SEQUENCE.next_value())
            )
            await connection.execute(updateFaceListSt)
            return linkedLists

        # Do select for update: lock the face row (also validates existence/account).
        lockFaceCount, _ = await self.blockFaces(
            connection, filters=[models.Face.face_id == faceId, models.Face.account_id @ accountId]
        )
        if lockFaceCount == 0:
            raise VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False)
        linkedLists = await updateHistoryLog()
        # Replace, not merge: the old attribute row is dropped before the insert.
        await connection.execute(delete(models.Attribute).where(models.Attribute.face_id == faceId))
        await self.insertFaceAttributeData(connection, faceId=faceId, attribute=attribute)
        self._addListsToDeferrerUpdateListLastUpdateTime(linkedLists)
@exceptionWrap
async def _updateAttributeDescriptor(
    self, connection: AbstractDBConnection, faceId: str, descriptors: list[Descriptor], generation: int
) -> None:
    """
    Partially update the face's descriptors.

    Args:
        connection: connection
        faceId: face id
        descriptors: list of descriptor containers for patch
        generation: descriptor generation
    """
    if not descriptors:
        return
    # Drop descriptors encrypted under a different key before writing new data.
    purgeSt = delete(models.Descriptor).where(
        and_(models.Descriptor.face_id == faceId, models.Descriptor.encryption_hash != self._encryptionHashFilter)
    )
    await connection.execute(purgeSt)
    for descriptor in descriptors:
        version = descriptor.version
        values = {
            "descriptor": descriptor.descriptor,
            "encryption_hash": self.encryptionCtx.encryptionHash,
            "descriptor_obtaining_method": 1,
        }
        updateSt = (
            update(models.Descriptor)
            .where(
                and_(
                    models.Descriptor.face_id == faceId,
                    models.Descriptor.descriptor_version == version,
                )
            )
            .values(**values)
        )
        updatedCount = await connection.execute(updateSt)
        if not updatedCount:
            # No row of this version yet — insert a fresh one instead.
            insertDescriptorSt = insert(models.Descriptor).values(
                descriptor_version=version,
                face_id=faceId,
                descriptor_generation=generation,
                **values,
            )
            await connection.execute(insertDescriptorSt)
# [docs]  (Sphinx "view source" artifact from doc extraction)
@exceptionWrap
async def updateFaceAttributes(
self,
faceId: str,
attribute: TemporaryAttributes,
accountId: Optional[str] = None,
forceUpdate: Optional[bool] = False,
) -> None:
"""
Update attribute
Args:
faceId: updated attribute face id
attribute: temporary attributes container
accountId: account id
forceUpdate: whether not to compare existing samples and new ones
Raises:
VLException(Error.FaceSampleConflict.format(faceId), 400, False) if samples conflict
VLException(Error.AttributesForUpdateNotFound.format(faceId), 400, False) if attribute for update not found
VLException(Error.FaceNotFound.format(faceId), 404, False) if face not found
"""
async with self.adaptor.connection(logger) as connection:
async def checkSamples(
faceId_: str, filteredSamples_: dict[int, set[str]], updDescriptorVersions_: list[int]
) -> None:
"""
Check conformity of old and new sample ids.
Args:
faceId_: face id
filteredSamples_: {<sample type>: <samples>} map
updDescriptorVersions_: versions of updating descriptors
Raises:
VLException(Error.FaceSampleConflict.format(attributeId), 400, False) if old samples do not
match specified samples in attribute
"""
isNeedToCheckSamples = not forceUpdate
for sampleTypeToCheck in SampleType:
sampleTypeToCheck = sampleTypeToCheck.value
if sampleTypeToCheck not in filteredSamples_:
continue
newSamplesSet = filteredSamples_[sampleTypeToCheck]
selectSt = select([models.Sample.sample_id]).where(
and_(models.Sample.face_id == faceId_, models.Sample.type == sampleTypeToCheck)
)
if sampleTypeToCheck == SampleType.face_descriptor.value:
# (if descriptor samples seems to be updated and descriptors not fully updated)
dbReply = await connection.fetchall(selectSt)
existingSampleIds = set(chain(*dbReply))
if existingSampleIds and existingSampleIds != newSamplesSet:
# mb replace all descriptors
selectSt = select([models.Descriptor.descriptor_version]).where(
and_(
models.Descriptor.face_id == faceId_,
not_(models.Descriptor.descriptor_version.in_(updDescriptorVersions_)),
)
)
notUpdatedDescriptorVersions = await connection.fetchall(selectSt)
if notUpdatedDescriptorVersions:
raise VLException(Error.FaceSampleConflict.format(faceId_), 400, False)
else:
# basic attributes samples
if not isNeedToCheckSamples:
continue
dbReply = await connection.fetchall(selectSt)
existingSampleIds = set(chain(*dbReply))
if existingSampleIds and existingSampleIds != newSamplesSet:
raise VLException(Error.FaceSampleConflict.format(faceId_), 400, False)
async def updateAttributeModel(
faceId_: str,
accountId_: Optional[str],
basicAttributes: Optional[BasicAttributes],
isNeedToUpdGeneration: bool,
) -> int:
"""
Update attribute model.
Increase current descriptor's generation if it needs (depends on descriptor samples in attributes)
Args:
faceId_: face id
accountId_: account id
basicAttributes: basic attributes container
isNeedToUpdGeneration: whether to increase descriptor sample generation or not
Returns:
descriptor generation
"""
if accountId is None:
filters = and_(models.Attribute.face_id == faceId_)
else:
filters = exists([models.Attribute.face_id]).where(
and_(
models.Face.face_id == faceId_,
models.Face.account_id == accountId_,
models.Attribute.face_id == models.Face.face_id,
)
)
values = {
"descriptor_samples_generation": models.Attribute.descriptor_samples_generation
+ int(isNeedToUpdGeneration)
}
if basicAttributes is not None:
values.update(
{
"age": basicAttributes.age,
"gender": basicAttributes.gender,
"ethnicity": basicAttributes.ethnicity,
}
)
updateQuery = (
update(models.Attribute)
.where(filters)
.values(**values)
.returning(models.Attribute.descriptor_samples_generation)
)
generation = await connection.scalar(updateQuery)
if generation is None:
raise VLException(Error.AttributesForUpdateNotFound.format(faceId_), 400, isCriticalError=False)
return generation
async def replaceSamples(faceId_: str, filteredSamples_: dict[int, set[str]]) -> None:
    """
    Replace the face's samples with the provided ones.

    Args:
        faceId_: face id
        filteredSamples_: {<sample type>: <samples>} map
    """
    # phase 1: remove every affected sample type first, phase 2: insert the
    # replacements — presumably two passes avoid unique-key clashes when a
    # sample id moves between types (TODO confirm against the Sample schema)
    for affectedType in filteredSamples_:
        removeSt = delete(models.Sample).where(
            and_(models.Sample.type == affectedType, models.Sample.face_id == faceId_)
        )
        await connection.execute(removeSt)
    for affectedType, newSampleIds in filteredSamples_.items():
        for newSampleId in newSampleIds:
            insertSt = insert(models.Sample).values(face_id=faceId_, sample_id=newSampleId, type=affectedType)
            await connection.execute(insertSt)
async def recreateLinks(faceId_: str, updDescriptorVersions_: list[int]) -> list[str]:
    """
    Recreate face-list links if the descriptor of the current version was changed.
    Needed for matcher if a descriptor was changed.

    Args:
        faceId_: face id
        updDescriptorVersions_: versions of updating descriptors
    Returns:
        ids of the lists the face is linked to (empty list if the default-version
        descriptor was not among the updated ones)
    """
    needToUpdate = self.defaultDescriptorVersion in updDescriptorVersions_
    if not needToUpdate:
        return []
    updateLinkKeysSt = (
        update(models.ListFace)
        .where(models.ListFace.face_id.in_([faceId_]))
        .values(link_key=LINK_SEQUENCE.next_value())
    )
    # BUGFIX: pass this function's `faceId_` parameter rather than the enclosing
    # scope's `faceId` (identical at the current call site, but a latent trap).
    linkedLists = await self._moveFacesLinksToLog((faceId_,), connection)
    await connection.execute(updateLinkKeysSt)
    return linkedLists
# Do select for update
lockFaceCount, _ = await self.blockFaces(
connection, filters=[models.Face.face_id == faceId, models.Face.account_id @ accountId]
)
if lockFaceCount == 0:
raise VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False)
# Prepare map: {<sample type>: <samples>}
filteredSamples = {}
if len(attribute.basicAttributesSamples) > 0:
filteredSamples[SampleType[AttributeSample.basic_attributes.value].value] = set(
attribute.basicAttributesSamples
)
if len(attribute.descriptorSamples) > 0:
filteredSamples[SampleType[AttributeSample.face_descriptors.value].value] = set(
attribute.descriptorSamples
)
# Check no sample conflict
updDescriptorVersions = [descr.version for descr in attribute.descriptors]
await checkSamples(
faceId_=faceId, filteredSamples_=filteredSamples, updDescriptorVersions_=updDescriptorVersions
)
# Update attribute data
isNeedToUpdGeneration = filteredSamples.get(SampleType.face_descriptor.value) is not None
generation = await updateAttributeModel(
faceId_=faceId,
accountId_=accountId,
basicAttributes=attribute.basicAttributes,
isNeedToUpdGeneration=isNeedToUpdGeneration,
)
if forceUpdate:
await replaceSamples(faceId_=faceId, filteredSamples_=filteredSamples)
await self._updateAttributeDescriptor(
connection=connection, faceId=faceId, descriptors=attribute.descriptors, generation=generation
)
listsForUpdate = await recreateLinks(faceId_=faceId, updDescriptorVersions_=updDescriptorVersions)
self._addListsToDeferrerUpdateListLastUpdateTime(listsForUpdate)
[docs]
@exceptionWrap
async def deleteFaceAttributes(self, faceId: str, accountId: Optional[str] = None) -> None:
    """
    Delete face attributes.
    Args:
        faceId: face id to remove its attributes
        accountId: account id
    Raises:
        VLException(Error.FaceNotFound, 404): if no face matched the id/account filters
    """
    async with self.adaptor.connection(logger) as connection:
        # take a select-for-update lock on the face row before touching dependent rows
        await self.blockFaces(connection, [models.Face.face_id == faceId])
        # `@` on account_id is a project-specific operator overload (account filter);
        # NOTE(review): presumably a no-op filter when accountId is None — confirm in models
        faceCheckFilter = and_(models.Face.face_id == faceId, models.Face.account_id @ accountId)
        # bump the face's last_update_time; the affected-row count doubles as an existence check
        updateFaceSt = update(models.Face).where(faceCheckFilter).values(last_update_time=self.currentDBTimestamp)
        faceCount = await connection.execute(updateFaceSt)
        if not faceCount:
            raise VLException(Error.FaceNotFound.format(faceId), 404, isCriticalError=False)
        # touch every list containing this face so list consumers notice the change
        updateListSt = (
            update(models.List)
            .where(
                exists(
                    select([models.List.list_id]).where(
                        and_(
                            models.List.list_id == models.ListFace.list_id,
                            models.ListFace.face_id == models.Face.face_id,
                            faceCheckFilter,
                        )
                    )
                )
            )
            .values(last_update_time=self.currentDBTimestamp)
        )
        await connection.execute(updateListSt)
        # finally drop the attribute row itself; account scoping re-applied via EXISTS
        deleteAttributeSt = delete(models.Attribute).where(
            exists(
                select([models.Attribute.face_id]).where(
                    and_(faceCheckFilter, models.Attribute.face_id == models.Face.face_id)
                )
            )
        )
        await connection.execute(deleteAttributeSt)
[docs]
@exceptionWrap
async def getDescriptorsBatchByFaceIds(
    self,
    facesIds: set[str],
    accountId: Optional[str] = None,
    checkObjectExistence: bool = True,
    descriptorVersion: Optional[int] = None,
    receiveExternalId: Optional[bool] = None,
) -> dict[str, Union[list[bytes], int, list[str]]]:
    """
    Get descriptors batch by faces Ids.

    ! Splitting into batches is used due to enormous queries (25Mb for 100k faces) that fails Oracle DB. (LUNA-3734)

    Args:
        facesIds: faces ids the descriptors were attached to
        accountId: account id of the faces
        checkObjectExistence: check faces existence or not
        descriptorVersion: requested descriptor version
        receiveExternalId: receive or not external ids
    Returns:
        Dict:
            descriptors: list[bytes]
            descriptorVersion: int
            uuids: list[str]
            notFoundUuids: list[str]
            notExtractedUuids: list[str]
            externalIds: list[str]
    Raises:
        VLException(Error.FacesNotFound)
    """
    if descriptorVersion is None:
        descriptorVersion = self.defaultDescriptorVersion
    selectFields = [models.Descriptor.descriptor, models.Face.face_id]
    if receiveExternalId:
        selectFields.append(models.Face.external_id)
    # 1k batching splitting (DB_LIST_LIMIT ids per query, see LUNA-3734 above)
    facesIdsList = list(facesIds)
    faceIdsBatches = [facesIdsList[idx : idx + DB_LIST_LIMIT] for idx in range(0, len(facesIdsList), DB_LIST_LIMIT)]
    queries = [
        select(selectFields).where(
            and_(
                models.Descriptor.face_id.in_(faceIds_),
                models.Descriptor.descriptor_version == descriptorVersion,
                # if need filtration by account
                models.Face.face_id == models.Descriptor.face_id,
                models.Face.account_id @ accountId,
            )
        )
        for faceIds_ in faceIdsBatches
    ]

    # inner `def` instead of a lambda assignment (PEP 8 / E731); behavior unchanged
    def getNotExtractedQuery(existFaceIds) -> Query:
        """Query for requested faces (minus already-found ones) lacking a descriptor of the requested version."""
        return Query([models.Face.face_id]).filter(
            and_(
                models.Face.face_id.in_(list(set(facesIds).difference(set(existFaceIds)))),
                models.Face.account_id @ accountId,
                ~exists(
                    select([models.Face.face_id]).where(
                        and_(
                            models.Face.face_id == models.Descriptor.face_id,
                            models.Descriptor.descriptor_version == descriptorVersion,
                        )
                    )
                ),
            )
        )

    async with self.adaptor.connection(logger) as connection:
        unitedResults = []
        for query in queries:
            unitedResults.extend(await connection.fetchall(query))
        # 1k batching join: transpose row tuples into per-column lists
        if receiveExternalId:
            descriptors, replyFaceIds, externalIds = tuple(zip(*unitedResults)) or ([], [], [])
            externalIds = list(map(lambda x: x or "", externalIds))
        else:
            descriptors, replyFaceIds = tuple(zip(*unitedResults)) or ([], [])
            externalIds = None
        notFoundUuids = []
        if len(replyFaceIds) != len(facesIds):
            # some requested faces have no descriptor row: split them into
            # "exists but not extracted" vs "does not exist at all"
            notExtractedFaceIds = [
                faceId[0] for faceId in (await connection.fetchall(getNotExtractedQuery(replyFaceIds).statement))
            ]
            if checkObjectExistence:
                firstNotFoundFace = await self.getNonexistentFaceId(facesIds, accountId, connection)
                if firstNotFoundFace:
                    raise VLException(Error.FacesNotFound.format(firstNotFoundFace), 400, isCriticalError=False)
            else:
                notFoundUuids = list(facesIds.difference(notExtractedFaceIds).difference(replyFaceIds))
        else:
            notExtractedFaceIds = []
        result = dict(
            descriptors=list(map(bytes, descriptors)),
            uuids=replyFaceIds,
            descriptorVersion=descriptorVersion,
            notFoundUuids=notFoundUuids,
            notExtractedUuids=notExtractedFaceIds,
            externalIds=externalIds,
        )
        return result
[docs]
@exceptionWrap
async def getListFacesDescriptorsBatch(
    self,
    listId: str,
    linkKeyGte: int,
    limit: int,
    descriptorVersion: Optional[int] = None,
    parity: Optional[int] = None,
    receiveExternalId: Optional[bool] = None,
) -> dict[str, Union[list[bytes], int, list[int], list[str]]]:
    """
    Get descriptors batch.

    Args:
        listId: list id the descriptors were attached to
        linkKeyGte: the lower including boundary
        limit: descriptors count to return
        descriptorVersion: descriptor version
        parity: 0 for odd or 1 for even link keys to search for
        receiveExternalId: receive or not external ids
    Returns:
        Dict:
            descriptors: list[bytes]
            descriptorVersion: int
            uuids: list[str]
            linkKeys: list[int]
            externalIds: list[str]
    """
    if descriptorVersion is None:
        descriptorVersion = self.defaultDescriptorVersion
    selectFields = [models.Descriptor.descriptor, models.ListFace.link_key, models.Descriptor.face_id]
    if receiveExternalId:
        selectFields.append(models.Face.external_id)
    # `True` placeholders keep the filter list shape stable when an option is off
    query = Query(selectFields).filter(
        models.Descriptor.face_id == models.Face.face_id if receiveExternalId else True,
        models.Descriptor.face_id == models.ListFace.face_id,
        models.Descriptor.descriptor_version == descriptorVersion,
        models.ListFace.list_id == listId,
        models.ListFace.link_key >= linkKeyGte,
        mod(models.ListFace.link_key, 2) == parity if parity is not None else True,
    )
    if parity is not None:
        query = query.order_by(
            models.ListFace.list_id, mod(models.ListFace.link_key, 2), models.ListFace.link_key.asc()
        )
    else:
        query = query.order_by(models.ListFace.link_key.asc())
    query = query.limit(limit)
    async with self.adaptor.connection(logger) as connection:
        # the two consecutive oracle checks were merged; the needless `global`
        # declaration was removed (the names are only read, never assigned here)
        if self.dbConfig.type == "oracle":
            if not (indexFaceName is indexDescriptorName is None):
                query = query.with_hint(
                    models.ListFace,
                    f"INDEX(LIST_FACE LINK_KEY_FUNC_INDEX) INDEX(FACE {indexFaceName}) "
                    f"INDEX(DESCRIPTOR {indexDescriptorName})",
                )
            else:
                logger.warning("Database indexes may not using")
            queryCursor = await connection.fetchall(getCompiledQuery(query, self.dbConfig.type))
        else:
            # for postgres compiling wrong request in mod(models.ListFace.link_key, 2) == parity
            queryCursor = await connection.fetchall(query.statement)
        if receiveExternalId:
            descriptors, linkKeys, faceIds, externalIds = tuple(zip(*queryCursor)) or ([], [], [], [])
            externalIds = list(map(lambda x: x or "", externalIds))
        else:
            descriptors, linkKeys, faceIds = tuple(zip(*queryCursor)) or ([], [], [])
            externalIds = None
        result = dict(
            descriptors=descriptors,
            descriptorVersion=descriptorVersion,
            linkKeys=linkKeys,
            uuids=faceIds,
            externalIds=externalIds,
        )
        return result
def _prepareMissingDescriptorsFilters(
    self,
    missingVersion: int,
    accountId: Optional[str] = None,
    faceIds: Optional[list[str]] = None,
    faceIdGte: Optional[str] = None,
    faceIdLt: Optional[str] = None,
) -> BooleanClauseList:
    """
    Prepare filters for missing descriptors query.
    Args:
        missingVersion: missing descriptor version
        accountId: account id of the attributes
        faceIds: list of face ids
        faceIdGte: lower face id including boundary
        faceIdLt: upper face id excluding boundary
    Returns:
        prepared sa.and_() with filters
    """
    # note: return annotation fixed from `-> and_` (a function, not a type)
    nestedDescriptor = aliased(models.Descriptor)  # table from the nested query need to be aliased
    return and_(
        # whether we need to reextract
        models.Attribute.face_id == models.Descriptor.face_id,
        models.Descriptor.descriptor_version == self.defaultDescriptorVersion,
        # filters (`True` stands in for "no filter" when an argument is omitted)
        models.Attribute.face_id == models.Face.face_id if accountId is not None else True,
        models.Face.account_id @ accountId,
        models.Attribute.face_id >= faceIdGte if faceIdGte is not None else True,
        models.Attribute.face_id < faceIdLt if faceIdLt is not None else True,
        models.Attribute.face_id.in_(faceIds) if faceIds is not None else True,
        # whether descriptor was not reextracted
        ~exists(
            select([nestedDescriptor.descriptor_version]).where(
                and_(
                    nestedDescriptor.face_id == models.Attribute.face_id,
                    nestedDescriptor.descriptor_version == missingVersion,
                    nestedDescriptor.descriptor_generation == models.Attribute.descriptor_samples_generation,
                )
            )
        ),
    )
[docs]
@exceptionWrap
async def getMissingDescriptors(
    self,
    missingVersion: int,
    accountId: Optional[str] = None,
    faceIds: Optional[list[str]] = None,
    faceIdGte: Optional[str] = None,
    faceIdLt: Optional[str] = None,
    limit: int = 1000,
) -> list[dict[str, Union[str, list[str]]]]:
    """
    Get missing descriptors of 'missingVersion' version:
        get attributes not having descriptor
        get samples
    Args:
        missingVersion: missing descriptor version
        accountId: account id of the attributes
        faceIds: list of attribute ids
        faceIdGte: lower face id including boundary
        faceIdLt: upper face id excluding boundary
        limit: maximum attributes amount to return
    Returns:
        data array in the format:
            {"face_id": "<face_id>", "generation": <generation>, "samples": ["<sample_id>"]}
    """
    filters = self._prepareMissingDescriptorsFilters(missingVersion, accountId, faceIds, faceIdGte, faceIdLt)
    facesSt = (
        select([models.Attribute.face_id]).where(filters).order_by(models.Attribute.face_id.asc()).limit(limit)
    )
    async with self.adaptor.connection(logger) as connection:
        faceRows = await connection.fetchall(facesSt)
        # {face_id: {face_id: id, samples: []}}
        replyMap = {faceId: {"face_id": faceId, "samples": []} for (faceId,) in faceRows}
        samplesSt = select([models.Sample.face_id, models.Sample.sample_id]).where(
            and_(models.Sample.face_id.in_(replyMap.keys()), models.Sample.type == SampleType.face_descriptor.value)
        )
        for sampleFaceId, sampleId in await connection.fetchall(samplesSt):
            replyMap[sampleFaceId]["samples"].append(sampleId)
        return list(replyMap.values())
[docs]
@exceptionWrap
async def getMissingDescriptorsCount(
    self,
    missingVersion: int,
    accountId: Optional[str] = None,
    faceIds: Optional[list[str]] = None,
    faceIdGte: Optional[str] = None,
    faceIdLt: Optional[str] = None,
) -> int:
    """
    Get count of missing descriptors of 'missingVersion' version.
    Args:
        missingVersion: missing descriptor version
        accountId: account id of the attributes
        faceIds: list of face ids
        faceIdGte: lower attribute id including boundary
        faceIdLt: upper attribute id excluding boundary
    Returns:
        missing descriptors count
    """
    filters = self._prepareMissingDescriptorsFilters(missingVersion, accountId, faceIds, faceIdGte, faceIdLt)
    countSt = select([func.count(models.Attribute.face_id)]).where(filters)
    async with self.adaptor.connection(logger) as connection:
        return await connection.scalar(countSt)
[docs]
@exceptionWrap
async def getDescriptorsCount(self, accountId: Optional[str] = None) -> list[dict[str, int]]:
    """
    Get face descriptors count with filters, grouped by descriptor version.
    Args:
        accountId: account id
    Returns:
        list of dict. Keys of each dict are "descriptor_version" and "descriptor_count"
    """
    accountFilters = and_(models.Face.account_id @ accountId, models.Face.face_id == models.Descriptor.face_id)
    countSt = (
        select([models.Descriptor.descriptor_version, func.count(models.Descriptor.face_id)])
        .where(accountFilters)
        .group_by(models.Descriptor.descriptor_version)
    )
    async with self.adaptor.connection(logger) as connection:
        versionRows = await connection.fetchall(countSt)
    return [{"descriptor_version": version, "descriptor_count": count} for version, count in versionRows]
def _prepareMissingBasicAttrsFilters(
    self,
    accountId: Optional[str] = None,
    faceIds: Optional[list[str]] = None,
    faceIdGte: Optional[str] = None,
    faceIdLt: Optional[str] = None,
) -> BooleanClauseList:
    """
    Prepare filters for missing basic attributes query.
    Args:
        accountId: account id of the attributes
        faceIds: list of face ids
        faceIdGte: lower face id including boundary
        faceIdLt: upper face id excluding boundary
    Returns:
        prepared sa.and_() with filters
    """
    # note: return annotation fixed from `-> and_` (a function, not a type)
    return and_(
        # whether we need to reextract: a NULL ethnicity marks attributes with no
        # basic attributes extracted yet
        models.Attribute.ethnicity.is_(None),
        # filters (`True` stands in for "no filter" when an argument is omitted)
        models.Attribute.face_id == models.Face.face_id if accountId is not None else True,
        models.Face.account_id @ accountId,
        models.Attribute.face_id >= faceIdGte if faceIdGte is not None else True,
        models.Attribute.face_id < faceIdLt if faceIdLt is not None else True,
        models.Attribute.face_id.in_(faceIds) if faceIds is not None else True,
    )
[docs]
@exceptionWrap
async def getMissingBasicAttrs(
    self,
    accountId: Optional[str] = None,
    faceIds: Optional[list[str]] = None,
    faceIdGte: Optional[str] = None,
    faceIdLt: Optional[str] = None,
    limit: int = 1000,
) -> list[dict[str, Union[str, list[str]]]]:
    """
    Get missing basic attributes:
        get attributes without basic attributes
        get samples
    Args:
        accountId: account id of the attributes
        faceIds: list of attribute ids
        faceIdGte: lower face id including boundary
        faceIdLt: upper face id excluding boundary
        limit: maximum attributes amount to return
    Returns:
        data array in the format:
            {"face_id": "<face_id>", "samples": ["<sample_id>"]}
    """
    async with self.adaptor.connection(logger) as connection:
        selectSt = (
            select([models.Attribute.face_id])
            .where(self._prepareMissingBasicAttrsFilters(accountId, faceIds, faceIdGte, faceIdLt))
            .order_by(models.Attribute.face_id.asc())
            .limit(limit)
        )
        facesRes = await connection.fetchall(selectSt)
        # {face_id: {face_id: id, samples:[]}} — tuple unpacking made consistent
        # with the sibling getMissingDescriptors implementation
        result = {faceId: {"face_id": faceId, "samples": []} for (faceId,) in facesRes}
        # NOTE(review): samples are filtered by SampleType.face_descriptor, same as in
        # getMissingDescriptors — presumably basic attributes are re-extracted from
        # descriptor samples; confirm this is not a copy-paste of the wrong sample type
        samplesSt = select([models.Sample.face_id, models.Sample.sample_id]).where(
            and_(models.Sample.face_id.in_(result.keys()), models.Sample.type == SampleType.face_descriptor.value)
        )
        samplesRes = await connection.fetchall(samplesSt)
        for faceId, sampleId in samplesRes:
            result[faceId]["samples"].append(sampleId)
        return list(result.values())
[docs]
@exceptionWrap
async def getMissingBasicAttrsCount(
    self,
    accountId: Optional[str] = None,
    faceIds: Optional[list[str]] = None,
    faceIdGte: Optional[str] = None,
    faceIdLt: Optional[str] = None,
) -> int:
    """
    Get count of missing basic attributes.
    Args:
        accountId: account id of the attributes
        faceIds: list of face ids
        faceIdGte: lower attribute id including boundary
        faceIdLt: upper attribute id excluding boundary
    Returns:
        missing basic attributes count
    """
    filters = self._prepareMissingBasicAttrsFilters(accountId, faceIds, faceIdGte, faceIdLt)
    countSt = select([func.count(models.Attribute.face_id)]).where(filters)
    async with self.adaptor.connection(logger) as connection:
        return await connection.scalar(countSt)
[docs]
@exceptionWrap
async def getBasicAttrsCount(self, accountId: Optional[str] = None) -> int:
    """
    Get count of basic attributes.
    Args:
        accountId: account id of the attributes
    Returns:
        basic attributes count
    """
    # an attribute counts as "basic" when its ethnicity column is filled in
    filters = and_(
        models.Attribute.ethnicity.isnot(None),
        models.Face.account_id @ accountId,
        models.Attribute.face_id == models.Face.face_id if accountId is not None else True,
    )
    countSt = select([func.count(models.Attribute.face_id)]).where(filters)
    async with self.adaptor.connection(logger) as connection:
        return await connection.scalar(countSt)
@staticmethod
def _prepareDescriptorsFilters(
    descriptorVersion: int, faceIdGte: Optional[str] = None, faceIdLt: Optional[str] = None
) -> BooleanClauseList:
    """
    Prepare filters for descriptors query.
    Args:
        descriptorVersion: descriptor version
        faceIdGte: lower face id including boundary
        faceIdLt: upper face id excluding boundary
    Returns:
        prepared sa.and_() with filters
    """
    versionFilter = models.Descriptor.descriptor_version == descriptorVersion
    # `True` acts as "no filter" when a boundary is omitted
    lowerBound = models.Descriptor.face_id >= faceIdGte if faceIdGte is not None else True
    upperBound = models.Descriptor.face_id < faceIdLt if faceIdLt is not None else True
    return and_(versionFilter, lowerBound, upperBound)
[docs]
@exceptionWrap
async def deleteDescriptors(
    self, descriptorVersion: int, faceIdGte: Optional[str] = None, faceIdLt: Optional[str] = None, limit: int = 1000
) -> list[dict[str, Union[str, int, list[str]]]]:
    """
    Delete descriptors by version:
        delete descriptors of preassigned version
    Args:
        descriptorVersion: descriptor version
        faceIdGte: lower face id including boundary
        faceIdLt: upper face id excluding boundary
        limit: maximum attributes amount to delete
    Returns:
        list of face ids id for deleted descriptors of preassigned version
    """
    facesSt = (
        select([models.Descriptor.face_id])
        .where(self._prepareDescriptorsFilters(descriptorVersion, faceIdGte, faceIdLt))
        .order_by(models.Descriptor.face_id.asc())
        .limit(limit)
    )
    async with self.adaptor.connection(logger) as connection:
        # pick the affected faces first (bounded by `limit`), then delete their
        # descriptors of exactly the requested version
        affectedFaces = [row["face_id"] for row in await connection.fetchall(facesSt)]
        removeSt = delete(models.Descriptor).where(
            and_(
                models.Descriptor.face_id.in_(affectedFaces),
                models.Descriptor.descriptor_version == descriptorVersion,
            )
        )
        await connection.execute(removeSt)
    return affectedFaces
[docs]
async def blockFaces(
    self, connection, filters: list[ColumnClause], returnBlockedFaces: bool = False
) -> tuple[int, Union[tuple[str, ...], None]]:
    """
    Block some faces rows in the db, mechanics "select for update".

    Args:
        connection: open db connection
        filters: sqlalchemy filters selecting the faces to lock
        returnBlockedFaces: whether to also return the locked face ids
    Returns:
        number of blocked faces and blocked faces or number of blocked faces and None, depends on the argument
    """
    lockSt = select([models.Face.face_id]).where(and_(*filters)).with_for_update()
    if returnBlockedFaces or self.dbConfig.type == "oracle":
        # fetch the locked rows explicitly (also required on oracle)
        lockedRows = await connection.fetchall(lockSt)
        lockedIds = tuple(row[0] for row in lockedRows)
        lockedCount = len(lockedIds)
    else:
        # a COUNT over the locking subquery is enough when ids are not needed
        lockedIds = None
        countSt = select([func.count()], from_obj=aliased(lockSt))
        lockedCount = await connection.scalar(countSt)
    logger.debug(f"lock {lockedCount} faces")
    return (lockedCount, lockedIds) if returnBlockedFaces else (lockedCount, None)
[docs]
async def updateListLastUpdateTime(self, listId: str):
    """
    Update last update time of lists: stamp the list with the current DB timestamp.
    Args:
        listId: list id
    """
    touchSt = (
        update(models.List).where(models.List.list_id == listId).values(last_update_time=self.currentDBTimestamp)
    )
    async with self.adaptor.connection(logger) as connection:
        await connection.execute(touchSt)
async def _moveFacesLinksToLog(
    self, faceIds: Union[list[str], tuple[str, ...]], connection: AbstractDBConnection
) -> list[str]:
    """
    Move face to list links to unlink attributes log
    Args:
        faceIds: face ids
        connection: open connection
    Returns:
        linked to faces list ids
    """
    # copy every (list_id, face_id, link_key) link of the given faces into the log table
    query = select([models.ListFace.list_id, models.ListFace.face_id, models.ListFace.link_key]).where(
        models.ListFace.face_id.in_(faceIds)
    )
    insertSt = insert(models.UnlinkAttributesLog).from_select(
        [
            models.UnlinkAttributesLog.list_id,
            models.UnlinkAttributesLog.face_id,
            models.UnlinkAttributesLog.link_key,
        ],
        query,
    )
    if self.dbConfig.type == "oracle":
        # oracle path avoids INSERT ... RETURNING — presumably unsupported for
        # insert-from-select there (TODO confirm); re-read the list ids instead
        listCount = await connection.execute(insertSt)
        if listCount:
            query = select([models.ListFace.list_id]).where(models.ListFace.face_id.in_(faceIds))
            _linkedLists = await connection.fetchall(query)
        else:
            _linkedLists = []
    else:
        # postgres: RETURNING gives the affected list ids in a single round trip
        st = insertSt.returning(models.UnlinkAttributesLog.list_id)
        _linkedLists = await connection.fetchall(st)
    linkedLists = [listId[0] for listId in _linkedLists]
    return linkedLists
[docs]
async def getListDeletions(
    self,
    deletionTimeLt: Optional[datetime] = None,
    deletionTimeGte: Optional[datetime] = None,
    page: Optional[int] = 1,
    pageSize: Optional[int] = 100,
):
    """
    Get lists deletion logs
    Args:
        deletionTimeLt: upper bound of list deletion time
        deletionTimeGte: lower bound of list deletion time
        page: page
        pageSize: page size
    Warnings:
        trigger `trg_lists_deletion_log` inserts a data for table `ListsDeletionLog`
    Returns:
        list of deletions in the reverse order of deletion of lists
    """
    # BUGFIX: the `.offset(...).limit(...)` pair was duplicated in the chain;
    # redundant since repeated calls just replace the previous value
    query = (
        select(
            [
                models.ListsDeletionLog.list_id,
                models.ListsDeletionLog.account_id,
                models.ListsDeletionLog.create_time,
                models.ListsDeletionLog.deletion_id,
                models.ListsDeletionLog.deletion_time,
            ]
        )
        .where(
            and_(
                models.ListsDeletionLog.deletion_time >= deletionTimeGte if deletionTimeGte else True,
                models.ListsDeletionLog.deletion_time < deletionTimeLt if deletionTimeLt else True,
            )
        )
        .order_by(models.ListsDeletionLog.deletion_time.desc())
        .offset((page - 1) * pageSize)
        .limit(pageSize)
    )
    async with self.adaptor.connection(logger) as connection:
        records = await connection.fetchall(query)
    listsDeletions = []
    for row in records:
        removedList = dict(row)
        # render timestamps in the configured storage timezone
        removedList["create_time"] = convertTimeToString(removedList["create_time"], self.storageTime == "UTC")
        removedList["deletion_time"] = convertTimeToString(
            removedList["deletion_time"], self.storageTime == "UTC"
        )
        listsDeletions.append(removedList)
    return listsDeletions
[docs]
async def cleanListsDeletionLog(self, deletionTimeLt: datetime) -> int:
    """
    Clear lists deletion log: drop entries older than the given time.
    Args:
        deletionTimeLt: upper bound of list deletion time
    Returns:
        count of removed rows
    """
    purgeSt = delete(models.ListsDeletionLog).where(models.ListsDeletionLog.deletion_time < deletionTimeLt)
    async with self.adaptor.connection(logger) as connection:
        removedCount = await connection.execute(purgeSt)
    return removedCount
[docs]
def getRuntimeChecks(self, _) -> list[tuple[str, Awaitable]]:
    """
    Checks for healthcheck.
    Args:
        _: unused argument — presumably required by a common runtime-check interface; confirm
    Returns:
        list with a single ("faces_db", <awaitable sql check>) pair
    """
    return [("faces_db", checkSql(self.adaptor))]
[docs]
async def probe(self) -> bool:
    """
    Ensure provided config is valid. Create new connection. Can be used without initialization.

    Returns:
        True if the faces DB (and, when enabled, the materialized-views DB) is reachable,
        False otherwise
    """
    checkState = await checkConnectionToDB(dbSetting=self.dbSettings, postfix="faces", asyncCheck=True)
    if not checkState:
        return False
    if self.useMaterialViews:
        # the two original branches differed only in the declarative base whose
        # subclass tables are checked — pick the base once, then do one check
        modelsBase = FacesBase if self.databaseNumber > 0 else BaseMatViewsParity
        modelNames = [table.__tablename__ for table in modelsBase.__subclasses__()]
        checkState = await checkConnectionToDB(
            dbSetting=self.dbSettings,
            modelNames=modelNames,
            postfix="luna-faces-materialized_views",
            asyncCheck=True,
        )
    return checkState