import abc
from ..private.request import ClientsKwargsMixin
from ..private.tasks import _BaseLambdaTask
from .clients import TasksClients
from .clients.events import Events
from .clients.faces import Faces
from .clients.handlers import Handlers
from .clients.lis import LIS
from .clients.lpm import LPM
from .clients.lrs import LRS
from .clients.lva import LVA
from .clients.sender import Sender
[docs]
class BaseLambdaTask(_BaseLambdaTask, ClientsKwargsMixin):
"""Lambda task"""
[docs]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@property
def clients(self):
return TasksClients(
faces=Faces(**self._getFacesClientKwargs()),
events=Events(**self._getEventsClientKwargs()),
matcher=LPM(**self._getLPMClientKwargs()),
faceSamplesStore=LIS(**self._getFacesSamplesClientKwargs()),
bodySamplesStore=LIS(**self._getBodySamplesClientKwargs()),
imageOriginStore=LIS(**self._getImageOriginClientKwargs()),
sdk=LRS(**self._getLRSClientKwargs()),
videoAgent=LVA(**self._getLVAClientKwargs()),
sender=Sender(**self._getSenderClientKwargs()),
handlers=Handlers(**self._getHandlersClientKwargs()),
)
@abc.abstractmethod
async def splitTasksContent(self, content: dict) -> list[dict]:
"""Split task content to sub task contents"""
...
@abc.abstractmethod
async def executeSubtask(self, subTaskContent: dict) -> dict | list:
"""Execute current sub task processing"""
...