Source code for luna_lambda_tools.public.tasks

import abc

from ..private.request import ClientsKwargsMixin
from ..private.tasks import _BaseLambdaTask
from .clients import TasksClients
from .clients.events import Events
from .clients.faces import Faces
from .clients.handlers import Handlers
from .clients.lis import LIS
from .clients.lpm import LPM
from .clients.lrs import LRS
from .clients.sender import Sender


[docs] class BaseLambdaTask(_BaseLambdaTask, ClientsKwargsMixin): """Lambda task"""
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
@property def clients(self): return TasksClients( faces=Faces(**self._getFacesClientKwargs()), events=Events(**self._getEventsClientKwargs()), matcher=LPM(**self._getLPMClientKwargs()), faceSamplesStore=LIS(**self._getFacesSamplesClientKwargs()), bodySamplesStore=LIS(**self._getBodySamplesClientKwargs()), imageOriginStore=LIS(**self._getImageOriginClientKwargs()), sdk=LRS(**self._getLRSClientKwargs()), sender=Sender(**self._getSenderClientKwargs()), handlers=Handlers(**self._getHandlersClientKwargs()), ) @abc.abstractmethod async def splitTasksContent(self, content: dict) -> list[dict]: """Split task content to sub task contents""" ... @abc.abstractmethod async def executeSubtask(self, subTaskContent: dict) -> dict | list: """Execute current sub task processing""" ...