General information¶
Warning
Matching plugins apply only to the Luna-Python-Matcher-Proxy service.
The service supports a system of matching plugins.
By default, Luna-Python-Matcher-Proxy processes all matching requests by redirecting them to Luna-Python-Matcher. Matching requests may be processed more slowly than necessary for several reasons, including:
- a large amount of data combined with the inability to speed up the request through database configuration changes, e.g. by creating an index that speeds up the request
- the way data is stored: the descriptor and the entity ID (face_id/event_id) are kept in different database tables (due to Luna Platform restrictions), and the filters specified in the matching request may refer to yet another database table, which slows down request processing
- internal database-specific restrictions
Some groups of requests can be separated out and processed faster by matching plugins, for instance by transferring the data to another storage whose layout allows faster matching than the default one (see plugin data source). For example:
- matching requests where all candidates are faces linked to one list and no other filters are specified in the request.
  In this case, those candidates can be duplicated to a data storage other than the default one, and a matching plugin can be created that matches the specified references only with these candidates and not with any other entities.
  The matching request is processed faster than in the default way, because the plugin does not spend time separating the faces linked to the list from all faces stored in the database.
- matching requests where all candidates are events, the only filter specified is event_ids, and matching is performed only by bodies, not by faces.
  In this case, all event_id values and their body descriptors can be duplicated to a data storage other than the default one, and a matching plugin can be created that matches the specified reference only with these candidates and not with any other entities.
  The matching request is processed faster than in the default way, because the plugin does not spend time separating events with bodies from all events or processing other filters.
You can use built-in matching plugins or create your own.
Each matching request is split into all possible combinations of candidates and references. Each combination of a reference and candidates (hereafter a sub-request) is processed separately as follows:
- Get the sub-request matching cost (see matching cost for a description).
- Choose how to process the sub-request, with a matching plugin or with Luna-Python-Matcher, based on the lowest estimated matching cost.
- If Luna-Python-Matcher was selected in the previous step, it processes the sub-request and returns the response to Luna-Python-Matcher-Proxy.
- If a matching plugin was selected in the previous step, it processes the sub-request. If the sub-request is processed successfully, the response is returned to Luna-Python-Matcher-Proxy. Otherwise, the sub-request is passed to Luna-Python-Matcher for processing.
- If the sub-request was successfully processed by a matching plugin that does not have access to all matching targets specified in the sub-request, Luna-Python-Matcher-Proxy enriches the data before the next step (see matching targets for details).
- Luna-Python-Matcher-Proxy collects the results of all sub-requests, sorts them in the right order, and replies to the user.
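The split into sub-requests described above can be sketched as follows. This is a minimal illustration, not the service's implementation: `SubRequest`, `split_into_sub_requests`, and the dictionary shapes are hypothetical names introduced only for this example, whereas the real proxy works with `MatchRequest` objects.

```python
from dataclasses import dataclass
from itertools import product


@dataclass(frozen=True)
class SubRequest:
    """Hypothetical stand-in for one combination of a reference and candidates."""

    reference_id: str
    candidate_filters: tuple  # hashable, sorted form of the candidate filters


def split_into_sub_requests(references: list[dict], candidates: list[dict]) -> list[SubRequest]:
    """Build one sub-request for every (reference, candidates) combination."""
    return [
        SubRequest(reference["id"], tuple(sorted(candidate["filters"].items())))
        for reference, candidate in product(references, candidates)
    ]


references = [{"id": "ref-1"}, {"id": "ref-2"}]
candidates = [
    {"filters": {"origin": "faces", "list_id": "list-a"}},
    {"filters": {"origin": "events"}},
]
# two references x two candidate groups give four independent sub-requests
sub_requests = split_into_sub_requests(references, candidates)
```

Each sub-request can then be costed and routed independently, which is why one request may be served partly by a plugin and partly by Luna-Python-Matcher.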
Matching cost¶
Matching cost is a floating-point estimate of how complex it is to process a matching request using a plugin. It is used to choose the best way to process a matching request: the Luna-Python-Matcher service or a plugin.
The matching cost of the Luna-Python-Matcher service is 100. If there are several plugins, the matching cost is calculated for each of them. The plugin with the lowest matching cost is used if its matching cost is lower than the Luna-Python-Matcher matching cost. All requests with matching costs greater than 100 are processed by the Luna-Python-Matcher service. If there are no plugins, Luna-Python-Matcher processes the request.
Each plugin should implement a getMatchingCost method that takes a match request as its argument and returns a float if the plugin can process the request, or None if it cannot.
Example:
from typing import Optional

from app_proxy.matcher.base_struct import IMatcher, MatchRequest

LIST_ID_1 = "..."
LIST_ID_2 = "..."


class BestMatcher(IMatcher):
    ...

    def isGoodFilters(self, filters: dict) -> bool:
        """
        Example of a function that checks whether the request filters are suitable for matching with this plugin.
        """
        return filters.get("list_id") in (LIST_ID_1, LIST_ID_2)

    def getMatchingCost(self, matchRequest: MatchRequest) -> Optional[float]:
        """
        Example of calculating the matching cost:
            if the request does not match the filters, None is returned
            if the request passes the filters, the matching cost is <10 * number of targets in the request>
        """
        if not self.isGoodFilters(matchRequest.candidate.filters):
            return None
        return 10.0 * len(matchRequest.candidate.targets)
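The selection rule from this section can be sketched as a small function. This is an assumption-laden illustration rather than the proxy's actual code: `choose_processor`, the plain-dictionary requests, and both example cost functions are hypothetical; only the default cost of 100 and the "None means the plugin cannot process the request" convention come from the text above.

```python
from typing import Callable, Optional

# Matching cost of the Luna-Python-Matcher service, as stated in this section.
DEFAULT_MATCHER_COST = 100.0
DEFAULT_MATCHER_NAME = "luna-python-matcher"  # label used only in this sketch

CostFunction = Callable[[dict], Optional[float]]


def choose_processor(request: dict, plugin_costs: dict[str, CostFunction]) -> str:
    """Return the plugin with the lowest cost below the default, else the default matcher."""
    best_name, best_cost = DEFAULT_MATCHER_NAME, DEFAULT_MATCHER_COST
    for name, get_cost in plugin_costs.items():
        cost = get_cost(request)
        # None means the plugin cannot process this request at all.
        if cost is not None and cost < best_cost:
            best_name, best_cost = name, cost
    return best_name


def list_plugin_cost(request: dict) -> Optional[float]:
    """Hypothetical plugin that only handles requests filtered by one list."""
    if request["filters"].get("list_id") != "list-a":
        return None
    return 10.0 * len(request["targets"])


def expensive_plugin_cost(request: dict) -> Optional[float]:
    """Hypothetical plugin that always costs more than the default matcher."""
    return 250.0


plugins = {"list_plugin": list_plugin_cost, "expensive_plugin": expensive_plugin_cost}
chosen_for_list = choose_processor(
    {"filters": {"list_id": "list-a"}, "targets": ["face_id", "similarity"]}, plugins
)
chosen_otherwise = choose_processor({"filters": {}, "targets": ["face_id"]}, plugins)
```

In the first call the list plugin reports a cost of 20 and is selected; in the second it returns None and the remaining plugin is too expensive, so the default matcher is used.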
Matching targets¶
Luna-Python-Matcher has access to all data of the matched entities, so it can process matching requests with any targets. Matching plugins may not have access to some of the data specified in the request targets. In this case, Luna-Python-Matcher-Proxy enriches the plugin response with the missing target data, e.g.:
- the matching response must contain the targets face_id, user_data, and similarity, and the chosen matching plugin does not have access to the user_data field:
  - the matching plugin matches the reference with the specified face_ids and returns to Luna-Python-Matcher-Proxy a matching response that contains only pairs of face_id and similarity
  - for every match candidate in the result, Luna-Python-Matcher-Proxy gets user_data from the main database by face_id and merges it with face_id and similarity
  - the enriched response with the specified targets is returned to the user
- the matching response must contain the targets age and gender (all candidates are events' faces), and the chosen matching plugin has access only to the event_id, descriptor, and age fields:
  - the matching plugin matches the reference and returns to Luna-Python-Matcher-Proxy a matching response that contains only event_id, age, and similarity for each candidate
  - for every match candidate in the result, Luna-Python-Matcher-Proxy gets gender from the main database by event_id and merges it with the plugin result, then drops the non-required event_id and similarity from the response
  - the prepared response with the specified targets is returned to the user
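The two enrichment scenarios above can be sketched with plain dictionaries. This is a simplified illustration under stated assumptions: `enrich_rows` and the in-memory `main_db` lookups are hypothetical and stand in for the proxy's access to the main database.

```python
def enrich_rows(rows: list[dict], targets: set[str], main_db: dict[str, dict], key: str) -> list[dict]:
    """Add missing targets from the main database and drop fields that were not requested."""
    result = []
    for row in rows:
        merged = dict(row)
        missing = targets - set(row)
        if missing:
            record = main_db[row[key]]  # lookup by face_id or event_id
            merged.update({name: record[name] for name in missing})
        # keep only the targets the user asked for
        result.append({name: merged[name] for name in sorted(targets)})
    return result


# Scenario 1: the plugin returns face_id and similarity, user_data is missing.
faces_db = {"face-1": {"user_data": "Alice"}}
face_rows = enrich_rows(
    [{"face_id": "face-1", "similarity": 0.97}],
    targets={"face_id", "user_data", "similarity"},
    main_db=faces_db,
    key="face_id",
)

# Scenario 2: the plugin returns event_id, age, and similarity; only age and gender are requested.
events_db = {"event-1": {"gender": 1}}
event_rows = enrich_rows(
    [{"event_id": "event-1", "age": 30, "similarity": 0.9}],
    targets={"age", "gender"},
    main_db=events_db,
    key="event_id",
)
```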
Built-in matching plugins¶
Luna-Python-Matcher-Proxy provides several built-in matching plugins; see their descriptions in the corresponding chapters.
Note
Built-in matching plugins can be used as examples when writing new user matching plugins.
User matching plugins¶
Matching plugins should be written in Python.
To create a user matching plugin, you need to:
- select the data source and, if needed, synchronize data from Luna Platform (see plugin data source for details)
- write the code (see plugin code for details)
A matching plugin may redirect the matching request to a remote service, but there are only a few ways to get a correct result when matching two descriptors:
- using the match function from the vlutils package, as shown in the example (for additional information about the function requirements, see the DB matching section of the Luna-Faces and Luna-Events documentation)
- using FSDK matching (the FSDK documentation is provided separately)
Plugin data source¶
To speed up request processing, each matching plugin may use a separate data source instead of the default one (the luna-events, faces, or attributes database; see the Database chapter of the Luna-Faces/Luna-Events services documentation for more information), such as a separate database, a new table in the existing database, an in-memory cache, etc.
If the plugin uses an external database, it needs access to the source databases in order to fill it. To get access to the source events, faces, or attributes databases, see the Luna Platform settings.
There are several ways to synchronize data in a custom data source with the default database, among them:
- streaming replication (for more information see postgres streaming replication)
- materialized views (for more information see postgres materialized views). Example:
  CREATE MATERIALIZED VIEW BEST_MATERIALIZE_VIEW AS SELECT event_id, user_data, age, gender, create_time FROM event;
  Note
  The materialized view must also be refreshed to keep the data up to date; see the documentation for more information.
- triggers (for more information see postgres triggers)
Plugin code¶
Requirements for matching plugins:
- the plugin should be a class inherited from BaseMatcherPlugin that implements all of its abstract methods
- availableReferenceTypes must list the reference types that the matcher can process
- availableCandidateTypes must list the candidate types that the matcher can process
- availableDescriptorTypes must list the descriptor types that the matcher can process
- availableSortOrder must list the sort orders that the matcher can process
- the getAvailableTargets function must return the targets that the matcher can process and return in reply to a user request for each match result
from vlutils.descriptors.data import DescriptorType

from app_proxy.matcher.plugin import BaseMatcherPlugin
from app_proxy.matcher.base_struct import IMatcher, MatchUnitType


class BestMatcher(IMatcher):
    availableReferenceTypes = frozenset([MatchUnitType.face, MatchUnitType.descriptor])
    availableCandidateTypes = frozenset([MatchUnitType.face])
    availableDescriptorTypes = frozenset([DescriptorType.face])
    availableSortOrder = frozenset(["similarity"])

    def getAvailableTargets(self, origin: MatchUnitType) -> frozenset[str]:
        return frozenset(["face_id", "user_data"])

    ...


class BestPlugin(BaseMatcherPlugin):
    def getMatcher(self) -> IMatcher:
        return BestMatcher()

    async def initialize(self) -> None:
        await super().initialize()
        print('plugin initialization has been completed')

    async def close(self) -> None:
        print('plugin has been successfully stopped')
- the plugin should implement a match method that takes a MatchRequest as input and returns a MatchResult as the response
from app_proxy.matcher.base_struct import IMatcher, MatchRequest, MatchResult


class BestMatcher(IMatcher):
    ...

    async def match(self, matchRequest: MatchRequest) -> MatchResult:
        """must return match result"""
- if any error occurs during matching, the plugin should raise a Rematch exception
from app_proxy.matcher.base_struct import IMatcher, MatchRequest, MatchResult
from app_proxy.matcher.exceptions import Rematch


class BestMatcher(IMatcher):
    ...

    async def _match(self, matchRequest: MatchRequest) -> MatchResult:
        """must return match result"""

    async def match(self, matchRequest: MatchRequest) -> MatchResult:
        try:
            result: MatchResult = await self._match(matchRequest)
        except Exception as exc:
            raise Rematch from exc
        return result
Note
The Rematch exception generalizes errors that occur during matching; when it is raised, the matching request is processed by Luna-Python-Matcher instead.
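The fallback behaviour described in the note can be sketched in a self-contained way. Everything here is a stand-in written for illustration: the local `Rematch` class only mimics `app_proxy.matcher.exceptions.Rematch`, and `failing_plugin`, `default_matcher`, and `process` are hypothetical coroutines, not part of the service.

```python
import asyncio


class Rematch(Exception):
    """Local stand-in for app_proxy.matcher.exceptions.Rematch."""


async def failing_plugin(request: dict) -> dict:
    """Hypothetical plugin whose storage is unavailable."""
    try:
        raise ConnectionError("plugin storage is unavailable")
    except Exception as exc:
        # wrap any internal error, as the requirement above prescribes
        raise Rematch from exc


async def default_matcher(request: dict) -> dict:
    """Hypothetical stand-in for processing by Luna-Python-Matcher."""
    return {"processed_by": "luna-python-matcher", "request": request}


async def process(request: dict, plugin, fallback) -> dict:
    """Try the plugin first and fall back to the default matcher on Rematch."""
    try:
        return await plugin(request)
    except Rematch:
        return await fallback(request)


result = asyncio.run(process({"reference": "ref-1"}, failing_plugin, default_matcher))
```

Wrapping every internal error in a single exception type keeps the caller's fallback logic independent of how a particular plugin stores or retrieves its data.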
The application is passed to the plugin during initialization so that the plugin can get the settings used by the service (taking config-reload into account), as well as some other features (for example, a logger or an adapter for connecting to the database). See application for more information.
An example of getting faces for the plugin using the adapter to the luna-faces database is given below.
from app_proxy.matcher.base_struct import IMatcher, MatchRequest, MatchResult


class BestMatcher(IMatcher):
    def __init__(self, app: "LunaApplication") -> None:
        super().__init__(app)
        self.dbContext = app.ctx.facesDBContext

    async def match(self, matchRequest: MatchRequest) -> MatchResult:
        ...
        facesOfInterest: list[dict[str, str]] = await self.dbContext.getFaces(faceIds=matchRequest.candidate.filters["face_ids"])
        ...
Matching plugin implementation example:
from vlutils.descriptors.match import match

from app_proxy.matcher.base_struct import IMatcher


class BestMatcher(IMatcher):
    async def _match(self, descriptor_1, descriptor_2, descriptor_version: int = 59) -> float:
        """Match two descriptors and return similarity as result"""
        # pass the requested descriptor version instead of a hard-coded value
        similarity = match(descriptor_1, descriptor_2, descriptor_version)
        return similarity
Matching plugins can store their configuration in different ways, for example in a configuration file or in the Luna-Configurator service. An example of creating and retrieving a setting using the luna3 library is given below. For additional information, see the luna-configurator documentation.
from luna3.configurator.configurator import ConfiguratorApi

configuratorApi = ConfiguratorApi(origin="http://configurator_host:configurator_port", api=1)
configuratorApi.putLimitation(
    "LUNA_CACHED_LIST_PLUGIN",
    defaultValue={"cost": 10, "shards": ["http://127.0.0.1:5301"]},
    services=["new_plugin"],
    validationSchema={
        "type": "object",
        "properties": {
            "cost": {"type": "number", "minimum": 0},
            "shards": {
                "type": "array",
                "items": {
                    "type": "string",
                    "format": "uri",
                },
                "minItems": 1,
            },
        },
        "required": ["cost", "shards"],
    },
    description="description",
    raiseError=True,
)
setting = configuratorApi.pullConfig(serviceName="new_plugin")
print(setting)
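For the configuration-file option, a plugin could load the same setting from a JSON file. The sketch below uses only the standard library and is an assumption about how such a loader might look: `load_plugin_config` is a hypothetical helper, the file name is illustrative, and the checks only approximate the validation schema above (the `uri` format is not verified).

```python
import json
import os
import tempfile


def load_plugin_config(path: str) -> dict:
    """Load a plugin setting from a JSON file and check its basic shape."""
    with open(path, encoding="utf-8") as config_file:
        config = json.load(config_file)
    cost = config.get("cost")
    # bool is a subclass of int, so it is excluded explicitly
    if isinstance(cost, bool) or not isinstance(cost, (int, float)) or cost < 0:
        raise ValueError("`cost` must be a non-negative number")
    shards = config.get("shards")
    if not isinstance(shards, list) or not shards or not all(isinstance(s, str) for s in shards):
        raise ValueError("`shards` must be a non-empty list of strings")
    return config


with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = os.path.join(tmp_dir, "best_plugin_config.json")
    # write a sample setting with the same fields as the configurator example
    with open(config_path, "w", encoding="utf-8") as config_file:
        json.dump({"cost": 10, "shards": ["http://127.0.0.1:5301"]}, config_file)
    loaded = load_plugin_config(config_path)
```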
The classes that user plugins should inherit from and the data structures that their methods should accept are listed below. You can find the source files of the given examples in the “luna_python_matcher” directory.
"""
Module realizes base matcher plugin and interface classes
"""
import json
import os
import re
from abc import abstractmethod
from typing import Optional, Any

from configs_core.functions import parseConfiguratorAddress
from luna3.configurator.configurator import ConfiguratorApi
from stringcase import snakecase

from crutches_on_wheels.plugins.plugins_meta.base_plugins import BasePlugin
from crutches_on_wheels.plugins.plugins_meta.plugin_meta import PluginMeta
from .base_struct import IMatcher
from .exceptions import LunaPluginException

# allowed plugin meta options
ALLOWED_PLUGIN_OPTIONS = ("configurator", "file", None)
# regexp for 'tag' option
TAG_REGEXP = re.compile(r"^[a-zA-Z0-9_\-]+$", re.I)


class MetaOptions(object):
    """
    Meta options for matcher plugins.
    """

    def __init__(self, meta: "Meta"):
        """
        Init class options.

        Args:
            meta: stored meta options.
        """
        self.configSource = getattr(meta, "configSource", None)
        self.configFile = getattr(meta, "configFile", None)
        self.tag = getattr(meta, "tag", None)

        if self.configSource not in ALLOWED_PLUGIN_OPTIONS:
            raise ValueError(f"The `configSource` option must be one of: {ALLOWED_PLUGIN_OPTIONS}")
        if self.tag is not None:
            if not isinstance(self.tag, str):
                raise ValueError("The `tag` option must be string")
            if not TAG_REGEXP.match(self.tag):
                raise ValueError(f"The `tag` option does not match pattern: {TAG_REGEXP.pattern}")
        if self.configSource == "file":
            if not self.configFile:
                raise ValueError("The `configFile` option is required for the 'file' source")
            if not isinstance(self.configFile, str):
                raise ValueError("The `configFile` option must be string")


class MatcherPluginMeta(PluginMeta):
    """
    Metaclass for matcher plugins.
    """

    def __new__(mcs, name, bases, attrs):
        """
        Metaclass new method

        Args:
            name: class name
            bases: bases classes
            attrs: class attributes
        Returns:
            new class
        """
        meta = attrs.pop("Meta", None)
        obj = super().__new__(mcs, name, bases, attrs)
        if meta:
            obj.options = MetaOptions(meta=meta)
        return obj


class BaseMatcherPlugin(BasePlugin, metaclass=MatcherPluginMeta):
    """Base matcher plugin."""

    # config for matcher
    matcherConfig: Optional[dict[str, Any]] = None

    class Meta:
        """Options object for a Schema.

        Example usage: ::

            class Meta:
                configSource = "configurator"

        or

            class Meta:
                configSource = "file"
                configFile = "../source/configs/best_plugin_config.json"

        Available options:

        - ``configSource``: Source type for pulling matcher settings, must be one of ["configurator", "file", None]
        - ``configFile``: Path to custom json file with matcher configuration, if source type is file.
        - ``tag``: Tag for pulling a tagged setting from the configurator, if `configSource` is "configurator"
        """

        pass

    async def initialize(self) -> None:
        """Initialize plugin and load config for matcher."""
        await self.loadMatcherConfig()

    @abstractmethod
    def getMatcher(self) -> IMatcher:
        """
        Get matcher class.

        Returns:
            class `~IMatcher`
        """

    async def loadMatcherConfig(self) -> None:
        """
        Load matcher config.
        """
        if self.options.configSource is None:
            # skip loading matcher config
            return
        elif self.options.configSource == "configurator":
            if not self.app.ctx.serviceConfig._configuratorAddress:
                raise LunaPluginException("Configurator address is not defined in the service")
            configurator = ConfiguratorApi(*parseConfiguratorAddress(self.app.ctx.serviceConfig._configuratorAddress))
            settingName = f"LUNA_{snakecase(self.pluginName).upper()}"
            response = await configurator.getSettings(settingName=settingName, tags=self.options.tag, asyncRequest=True)
            if not response.success:
                raise LunaPluginException(f"Error while pulling settings from luna-configurator: {response.json}")
            elif not (settings := response.json["settings"]):
                raise LunaPluginException(f"Setting '{settingName}' for plugin {self.pluginName} not found")
            self.matcherConfig = settings[0]["value"]
        elif self.options.configSource == "file":
            if not os.path.isfile(self.options.configFile):
                raise LunaPluginException(f"Configuration file '{self.options.configFile}' not found")
            with open(self.options.configFile) as _conf:
                self.matcherConfig = json.loads(_conf.read())
        else:
            raise RuntimeError(f"Unsupported config source: {self.options.configSource}")


"""The module implements the service structure."""
from abc import abstractmethod, ABC
from enum import Enum
from typing import Optional, Union, ClassVar, Literal

from attr import dataclass, fields
from stringcase import snakecase
from vlutils.descriptors.data import DescriptorType

from classes.candidate_batch import ORDER


@dataclass(slots=True)
class _BaseResult:
    """Base for matching results"""

    _aliases: ClassVar[dict[str, str]]

    def __init_subclass__(cls, **kwargs):
        """Extend subclass with default attribute."""
        super().__init_subclass__(**kwargs)
        cls._aliases = {snakecase(clsField.name): clsField.name for clsField in fields(cls)}

    def asDict(self, targets: set[str]) -> dict:
        """
        Recursively cast class to dict.

        Args:
            targets: list of required fields to return
        Returns:
            self as dict
        """
        res = {}
        for field in targets:
            res[field] = getattr(self, self._aliases[field])
        return res


@dataclass(slots=True)
class Face(_BaseResult):
    """Simple face"""

    #: face id
    faceId: Optional[str] = None
    #: account id
    accountId: Optional[str] = None
    #: avatar
    avatar: Optional[str] = None
    #: linked list ids
    lists: Optional[list[str]] = None
    #: event id
    eventId: Optional[str] = None
    #: user data
    userData: Optional[str] = None
    #: create time
    createTime: Optional[str] = None
    #: external id
    externalId: Optional[str] = None


@dataclass(slots=True)
class FaceMatchResult(_BaseResult):
    """Simple face match result"""

    #: face
    face: Face
    #: similarity
    similarity: Optional[float] = None

    def asDict(self, targets: set[str]) -> dict:
        """Represent face match result as dict"""
        if "similarity" in targets:
            return dict(face=self.face.asDict(targets - {"similarity"}), similarity=self.similarity)
        return dict(face=self.face.asDict(targets))


@dataclass(slots=True)
class Event(_BaseResult):
    """Simple event"""

    #: create time
    createTime: Optional[str] = None
    #: end time
    endTime: Optional[str] = None
    #: event id
    eventId: Optional[str] = None
    #: handler id
    handlerId: Optional[str] = None
    #: account id
    accountId: Optional[str] = None
    #: external id
    externalId: Optional[str] = None
    #: source
    source: Optional[str] = None
    #: top match face/event
    topMatch: Optional[dict] = None
    #: match results
    matchResult: Optional[list[dict]] = None
    #: face detections
    faceDetections: Optional[list] = None
    #: body detections
    bodyDetections: Optional[list] = None
    #: face id
    faceId: Optional[str] = None
    #: attach results
    attachResult: list[str] = None
    #: gender
    gender: Optional[int] = None
    #: age
    age: Optional[int] = None
    #: predominant emotion
    emotion: Optional[int] = None
    #: predominant mask
    mask: Optional[int] = None
    #: reference ethnicity
    ethnicGroup: Optional[int] = None
    #: liveness
    liveness: Optional[int] = None
    #: user data
    userData: Optional[str] = None
    #: location object
    location: Optional[dict] = None
    #: track id
    trackId: Optional[str] = None
    #: tags
    tags: Optional[list[str]] = None


@dataclass(slots=True)
class EventMatchResult(_BaseResult):
    """Simple event match result"""

    #: event
    event: Event
    #: similarity
    similarity: Optional[float] = None

    def asDict(self, targets: set[str]) -> dict:
        """Represent event match result as dict"""
        if "similarity" in targets:
            return dict(event=self.event.asDict(targets - {"similarity"}), similarity=self.similarity)
        return dict(event=self.event.asDict(targets))


@dataclass(slots=True)
class Attribute(_BaseResult):
    """Simple attribute"""

    #: create time
    createTime: Optional[str] = None
    #: attribute id
    attributeId: Optional[str] = None
    #: account id
    accountId: Optional[str] = None
    #: basic attributes
    basicAttributes: Optional[dict] = None
    #: basic attributes samples
    basicAttributesSamples: Optional[list[str]] = None
    #: face descriptor samples
    faceDescriptorSamples: Optional[list[str]] = None


@dataclass(slots=True)
class AttributeMatchResult(_BaseResult):
    """Simple attribute match result"""

    #: attribute
    attribute: Attribute
    #: similarity
    similarity: Optional[float] = None

    def asDict(self, targets: set[str]) -> dict:
        """Represent attribute match result as dict"""
        if "similarity" in targets:
            return dict(attribute=self.attribute.asDict(targets - {"similarity"}), similarity=self.similarity)
        return dict(attribute=self.attribute.asDict(targets))


@dataclass(slots=True)
class ErrorMatchResult:
    """Error match result"""

    #: error code
    errorCode: int
    #: error description
    desc: str
    #: error detail
    detail: str

    def asDict(self) -> dict:
        """Represent error match result as dict"""
        return {"error_code": self.errorCode, "desc": self.desc, "detail": self.detail}


class MatchUnitType(Enum):
    """Candidate and Reference match type enum"""

    #: face
    face = "face"
    #: attribute
    attribute = "attribute"
    #: event
    event = "event"
    #: descriptor
    descriptor = "descriptor"


@dataclass(slots=True)
class Candidate:
    """Candidate class"""

    #: candidate batch filters
    filters: dict
    #: matching targets
    targets: set[str]
    #: matching sort order
    order: ORDER
    #: matching limit
    limit: int
    #: matching threshold
    threshold: float

    @property
    def type(self) -> MatchUnitType:
        """Candidate type property"""
        origin = self.filters["origin"]
        if origin == "faces":
            return MatchUnitType.face
        elif origin == "events":
            return MatchUnitType.event
        elif origin == "attributes":
            return MatchUnitType.attribute

    def asRequestDict(self) -> dict:
        """Represent candidate as dict for request."""
        return dict(
            filters=self.filters,
            targets=list(self.targets),
            order=self.order,
            limit=self.limit,
            threshold=self.threshold,
        )

    def asResponseDict(self) -> dict:
        """Represent candidate as dict for response."""
        return self.filters


@dataclass(slots=True)
class Descriptor:
    """Descriptor class"""

    #: raw descriptor bytes
    descriptor: bytes
    #: descriptor version
    version: int

    def asDict(self) -> dict:
        """Represent descriptor as dict"""
        return dict(descriptor=self.descriptor, version=self.version)


@dataclass(slots=True)
class Reference:
    """Reference class."""

    #: reference id
    id: str
    #: reference type
    type: MatchUnitType
    #: reference descriptor | only if there is raw descriptor specified in request
    descriptor: Optional[Descriptor] = None

    def asRequestDict(self) -> dict:
        """Represent reference as dict for request"""
        if self.descriptor is not None:
            return dict(id=self.id, type="raw_descriptor", data=self.descriptor.asDict())
        return dict(id=self.id, type=self.type.value)

    def asResponseDict(self) -> dict:
        """Represent reference as dict for json response"""
        return dict(id=self.id, type=self.type.value)


@dataclass(slots=True)
class _ObjectIdReference(Reference):
    """
    Reference with object id.

    Serialization: associates an external type and its ID with the reference object
    """

    # external reference type
    _externalType: ClassVar[Literal["face_external_id", "event_external_id", "event_track_id"]]
    # primary key for found reference
    _primaryKey: ClassVar[Literal["face_id", "event_id"]]
    # external object id specified in request
    externalObjectId: Optional[str] = None

    def asResponseDict(self) -> dict:
        """Represent reference as dict for json response"""
        if not self.externalObjectId:
            # no reference with given externalObjectId was found, return id as is
            return dict(type=self._externalType, id=self.id)
        # we've successfully found appropriate reference by given object ID
        return {"type": self._externalType, "id": self.externalObjectId, self._primaryKey: self.id}


@dataclass(slots=True)
class FaceExternalIdReference(_ObjectIdReference):
    """Reference with face external id."""

    _externalType = "face_external_id"
    _primaryKey = "face_id"


@dataclass(slots=True)
class EventExternalIdReference(_ObjectIdReference):
    """Reference with event external id."""

    _externalType = "event_external_id"
    _primaryKey = "event_id"


@dataclass(slots=True)
class EventTrackIdReference(_ObjectIdReference):
    """Reference with event track id."""

    _externalType = "event_track_id"
    _primaryKey = "event_id"


@dataclass(slots=True)
class MatchRequest:
    """Matching request container class"""

    #: candidate batch
    candidate: Candidate
    #: reference
    reference: Reference
    #: match descriptor type - face or body
    matchDescriptorType: DescriptorType = DescriptorType.face
    #: account id
    accountId: Optional[str] = None

    def asRequestDict(self) -> dict:
        """Represent match request as request to another http-service"""
        return dict(candidates=[self.candidate.asRequestDict()], references=[self.reference.asRequestDict()])


@dataclass(slots=True)
class MatchResult:
    """Matching result container class."""

    #: match request
    matchRequest: MatchRequest
    #: success match results or error
    results: Union[list[FaceMatchResult], list[EventMatchResult], list[AttributeMatchResult], ErrorMatchResult]

    def asMatches(self) -> dict:
        """Represent matches results as dict"""
        filters = self.matchRequest.candidate.filters
        targets = self.matchRequest.candidate.targets
        if isinstance(self.results, ErrorMatchResult):
            return dict(filters=filters, error=self.results.asDict())
        return dict(filters=filters, result=[row.asDict(targets=targets) for row in self.results])

    @property
    def successful(self) -> bool:
        """Whether match result is successful"""
        return not isinstance(self.results, ErrorMatchResult)


class IMatcher(ABC):
    """Matcher interface."""

    EMPTY_TARGETS = frozenset([])

    @property
    def availableReferenceTypes(self) -> frozenset[MatchUnitType]:
        """Matcher should implement this."""
        raise NotImplementedError

    @property
    def availableCandidateTypes(self) -> frozenset[MatchUnitType]:
        """Matcher should implement this."""
        raise NotImplementedError

    @property
    def availableDescriptorTypes(self) -> frozenset[DescriptorType]:
        """Matcher should implement this."""
        raise NotImplementedError

    @property
    def availableSortOrder(self) -> frozenset[ORDER]:
        """Matcher should implement this."""
        raise NotImplementedError

    @abstractmethod
    def getAvailableTargets(self, origin: MatchUnitType) -> frozenset[str]:
        """
        Return targets for given origin. Helps to distinguish targets like "face_id" - (Event.face_id and Face.face_id).

        Args:
            origin: candidate match type
        """

    @abstractmethod
    def getMatchingCost(self, matchRequest: MatchRequest) -> float:
        """
        Get matching cost.

        Args:
            matchRequest: matching request class
        Returns:
            matching cost
        """

    @abstractmethod
    async def match(self, matchRequest: MatchRequest) -> MatchResult:
        """
        Match method.

        Args:
            matchRequest: matching request class
        Returns:
            matching result
        """