Tasks lambda development
========================

This section describes the development of *tasks* lambdas. More information about lambda types and the differences between *tasks* lambdas and other types is available in the `lambda types description <./lambda_types.html#tasks>`_.

Tasks lambda requirements
-------------------------

A tasks lambda has several requirements in addition to the `basic requirements <./lambda_requirements.html#lunaservicesrequirements>`_:

- `Luna Faces` available by credentials from `Luna-Configurator`
- `Luna Events` available by credentials from `Luna-Configurator`; it can be disabled using the *ADDITIONAL_SERVICES_USAGE* setting, in which case the *lambda* must be able to work without `Luna-Events`
- `Luna Python Matcher` available by credentials from `Luna-Configurator`
- `Luna Image Store` available by credentials from `Luna-Configurator`; it can be disabled using the *ADDITIONAL_SERVICES_USAGE* setting, in which case the *lambda* must be able to work without `Luna-Image-Store`
- `Luna Remote SDK` available by credentials from `Luna-Configurator`
- `Luna Handlers` available by credentials from `Luna-Configurator`; it can be disabled using the *ADDITIONAL_SERVICES_USAGE* setting, in which case the *lambda* must be able to work without `Luna-Handlers`
- `Luna Tasks` for interaction with the *lambda task* (task execution, task viewing and so on)

The usage and purpose of these services are described `here <#tasks-lambda-examples>`_.
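
The requirement that a lambda keep working when an optional service is disabled can be illustrated with a small, self-contained sketch. It assumes the *ADDITIONAL_SERVICES_USAGE* value is available as a plain dict that maps service names to booleans; the key names other than *luna_tasks*, the `isServiceEnabled`/`saveEvent` helpers and the `storage` callable are hypothetical and are not part of the lambda tools API.

.. code-block:: python

   from typing import Callable

   # hypothetical ADDITIONAL_SERVICES_USAGE value; key names (except luna_tasks) are assumptions
   additionalServicesUsage = {
       "luna_events": False,
       "luna_image_store": True,
       "luna_handlers": True,
       "luna_tasks": True,
   }


   def isServiceEnabled(usage: dict[str, bool], serviceName: str) -> bool:
       """Treat a missing key as a disabled service (assumption)."""
       return usage.get(serviceName, False)


   def saveEvent(usage: dict[str, bool], event: dict, storage: Callable[[dict], None]) -> bool:
       """Save `event` only if Luna Events is enabled; return whether it was saved.

       `storage` is a hypothetical stand-in for a real luna services client call.
       """
       if not isServiceEnabled(usage, "luna_events"):
           # degraded mode: skip the optional service instead of failing the whole lambda
           return False
       storage(event)
       return True

With the settings above, `saveEvent(additionalServicesUsage, {"id": 1}, saved.append)` returns `False` and leaves `saved` untouched, because *luna_events* is disabled; the lambda continues without it.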

Tasks lambda configuration
--------------------------

The tasks lambda requires several settings from *luna-configurator*, which can be divided into the following groups:

- *LUNA_LAMBDA_UNIT_LOGGER* - lambda logger settings
- luna-services addresses and timeouts settings (for example, *LUNA_FACES_ADDRESS* and *LUNA_FACES_TIMEOUTS* will be used by the lambda to make requests to the *luna-faces* service)
- *ADDITIONAL_SERVICES_USAGE* - determines which *luna-services* can be used by the lambda (the lambda will not check the connection to disabled services and will raise an error if the user tries to make a request to such a service). The *luna_tasks* key of *ADDITIONAL_SERVICES_USAGE* must be enabled to use the *Luna-Tasks* service and, consequently, the *lambda task*

Tasks lambda request usage
--------------------------

A tasks lambda can only process service requests (such as configs (/config) and docs (/docs/spec)).

Task processing pipeline
------------------------

Task processing consists of the following stages:

- preparation (if one of the steps fails, the task will not be created)

  - the user sends a request to *task router*
  - *task router* validates the task content from the user request and sends it to *task worker*
  - *task worker* separates the task content into one or more parts depending on internal logic and sends them to *task router* (for a description of the content separation process and recommendations, see `task content separation <./tasks.html#task-content-separation-process>`_)
  - the user receives the *task id* in the reply from *task router*
  - *task router* starts task processing

- task processing (if one of the steps fails, the specific subtask will be marked as failed)

  - *task router* initializes processing of all subtasks
  - *task worker* processes each part of the task content as a separate subtask and saves the result to *Luna-Image-Store* (if *Luna-Image-Store* is enabled); for a description of subtask processing, see `subtask processing <./tasks.html#subtask-processing>`_
  - when the last subtask is done, *task router* starts task results collection

- task results collection

  - *task worker* collects the results of all subtasks, merges them and saves the task result
  - *task router* marks the task as done and provides the user with access to the task result

Task content separation process
-------------------------------

A task is a long-running process that can take minutes or even hours to complete. To make task progress easier to track and to allow parallel execution, it is recommended to divide each task into subtasks. The user can make a `get task request `_ to see how many parts of the task are finished and how many parts the task contains (the `count_task_parts_done` and `count_task_parts_all` fields of the reply, respectively).

If more than one *task worker* is available, each subtask can be processed by a different worker, which leads to faster task completion. (At the moment, the *lambda task* does not provide such an option, but it may be implemented in the future.)

It is strongly discouraged to use paginated requests to load or match a large number of persons/events: such queries become slower as the page number grows, and pagination-based sorting is not stable (the same objects may be returned on different pages). See the examples below for how to correctly extract a large amount of data. Filtering by ids (`face_id__gte`/`face_id__lte`/etc.) is preferable, because this kind of sorting is stable and the received objects are sorted by their ids.

Task content can be separated in several ways. Here are some examples of possible strategies:

- separate data by list elements

  The task content includes a list of N (3 in this example) list ids:

  .. code-block:: python
     :caption: task content from user request

     {"content": {"lambda_id": , "lists": ["list_id_1", "list_id_2", "list_id_3"]}}

  The lambda task splits the list into its elements:

  .. code-block:: python
     :caption: lambda task content separation

     class LambdaTask(BaseLambdaTask):
         ...

         async def splitTasksContent(self, content: dict) -> list[dict]:
             """Split task content to subtask contents"""
             # one subtask per list id; each subtask keeps the lambda id
             return [
                 {"lambda_id": content["content"]["lambda_id"], "list_id": listId}
                 for listId in content["content"]["lists"]
             ]

  Each subtask processes its own list (see the docstring for examples of subtask contents):

  .. code-block:: python
     :caption: subtasks contents received by `executeSubtask` function

     class LambdaTask(BaseLambdaTask):
         async def processList(self, listId: str) -> dict:
             ...

         async def executeSubtask(self, subtaskContent: dict) -> dict | list:
             """
             Execute current subtask processing

             Content of `subtaskContent` for subtask #1: {'list_id': 'list_id_1', 'lambda_id': }
             Content of `subtaskContent` for subtask #2: {'list_id': 'list_id_2', 'lambda_id': }
             Content of `subtaskContent` for subtask #3: {'list_id': 'list_id_3', 'lambda_id': }
             """
             return await self.processList(subtaskContent["list_id"])

- separate data by range

  The task content includes a range of elements. For example, the task must process all faces with ids from "00000000-0000-4000-8000-000000000000" to "90000000-0000-4000-8000-000000000000". Other filters, such as `create_time__gte`, can be included in the content as well.

  .. code-block:: python
     :caption: task content from user request

     {"content": {"lambda_id": ,
                  "face_id__gte": "00000000-0000-4000-8000-000000000000",
                  "face_id__lt": "90000000-0000-4000-8000-000000000000",
                  "create_time__gte": "2020-10-10T10:10:10.123456+03:00"}}

  The lambda task splits the specified range into several subranges:

  .. code-block:: python
     :caption: lambda task content separation

     from uuid import UUID


     def splitUUIDs(count: int, gteId: str | None = None, ltId: str | None = None) -> list[dict]:
         """Split the [gteId, ltId) range of UUIDs into `count` contiguous subranges"""
         if count <= 1:
             if gteId is None:
                 gteId = "00000000-0000-0000-0000-000000000000"
             return [{"face_id__lt": ltId, "face_id__gte": gteId}]

         maxUUIDIntLength = 1 << 128
         if ltId is None and gteId is None:
             rangeLength = maxUUIDIntLength
             startValue = 0
         elif ltId is not None and gteId is not None:
             rangeLength = UUID(ltId).int - UUID(gteId).int
             startValue = UUID(gteId).int
         elif ltId is not None:
             rangeLength = UUID(ltId).int
             startValue = 0
         else:  # gteId is not None
             rangeLength = maxUUIDIntLength - UUID(gteId).int
             startValue = UUID(gteId).int

         stepValue = rangeLength // count
         pages = [{"face_id__lt": ltId, "face_id__gte": gteId}]
         for pageIndex in range(count - 1):
             # close the previous subrange and open the next one at the same boundary
             currentUuid = str(UUID(int=startValue + stepValue * (pageIndex + 1)))
             pages[-1]["face_id__lt"] = currentUuid
             pages.append({"face_id__gte": currentUuid, "face_id__lt": ltId})
         return pages


     class LambdaTask(BaseLambdaTask):
         ...

         async def splitTasksContent(self, content: dict) -> list[dict]:
             """Split task content to subtask contents"""
             taskContent = content["content"]
             separated = splitUUIDs(
                 count=5, gteId=taskContent.get("face_id__gte"), ltId=taskContent.get("face_id__lt")
             )
             return [
                 {
                     "lambda_id": taskContent["lambda_id"],
                     "face_id__gte": row["face_id__gte"],
                     "face_id__lt": row["face_id__lt"],
                     "create_time__gte": taskContent.get("create_time__gte"),
                 }
                 for row in separated
             ]

  Each subtask processes its own range of face ids (in this example, the range is separated into 5 parts):

  .. code-block:: python
     :caption: subtasks contents received by `executeSubtask` function

     class LambdaTask(BaseLambdaTask):
         async def executeSubtask(self, subtaskContent: dict) -> dict | list:
             """
             Execute current subtask processing

             Content of `subtaskContent` for subtask #1: {'face_id__gte': 00000000-0000-4000-8000-000000000000, 'face_id__lt': 1ccccccc-cccd-0ccd-4ccc-cccccccccccc, "create_time__gte": "2020-10-10T10:10:10.123456+03:00", 'lambda_id': }
             Content of `subtaskContent` for subtask #2: {'face_id__gte': 1ccccccc-cccd-0ccd-4ccc-cccccccccccc, 'face_id__lt': 39999999-9999-d99a-1999-999999999998, "create_time__gte": "2020-10-10T10:10:10.123456+03:00", 'lambda_id': }
             Content of `subtaskContent` for subtask #3: {'face_id__gte': 39999999-9999-d99a-1999-999999999998, 'face_id__lt': 56666666-6666-a666-e666-666666666664, "create_time__gte": "2020-10-10T10:10:10.123456+03:00", 'lambda_id': }
             Content of `subtaskContent` for subtask #4: {'face_id__gte': 56666666-6666-a666-e666-666666666664, 'face_id__lt': 73333333-3333-7333-b333-333333333330, "create_time__gte": "2020-10-10T10:10:10.123456+03:00", 'lambda_id': }
             Content of `subtaskContent` for subtask #5: {'face_id__gte': 73333333-3333-7333-b333-333333333330, 'face_id__lt': 90000000-0000-4000-8000-000000000000, "create_time__gte": "2020-10-10T10:10:10.123456+03:00", 'lambda_id': }
             """
             # await the request first, then read the reply body
             reply = await self.clients.faces.getFaces(
                 faceIdGte=subtaskContent["face_id__gte"],
                 faceIdLt=subtaskContent["face_id__lt"],
                 createTimeGte=subtaskContent["create_time__gte"],
             )
             faces = reply.json["faces"]
             ...

If the task content cannot be separated into several parts, the task consists of a single subtask:

.. code-block:: python
   :caption: lambda task content separation to one subtask content

   class LambdaTask(BaseLambdaTask):
       ...

       async def splitTasksContent(self, content: dict) -> list[dict]:
           """Split task content to subtask contents"""
           # the whole task content becomes the content of the only subtask
           return [content]

Subtask processing
------------------

Subtask processing is implemented by the `executeSubtask` function, which returns the result of processing the subtask content.
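
To see how `splitTasksContent` and `executeSubtask` fit together, here is a self-contained simulation of the split/execute/collect cycle. `FakeLambdaTask` and `runTask` are hypothetical stand-ins written for illustration only: they do not reproduce `BaseLambdaTask`, the *task router*/*task worker* machinery or the real result merging. Subtasks are run one after another, since the *lambda task* does not currently distribute subtasks between workers.

.. code-block:: python

   import asyncio


   class FakeLambdaTask:
       """Hypothetical stand-in for a lambda task (not BaseLambdaTask)."""

       async def splitTasksContent(self, content: dict) -> list[dict]:
           # one subtask per list id, as in the "separate data by list elements" strategy
           return [
               {"lambda_id": content["content"]["lambda_id"], "list_id": listId}
               for listId in content["content"]["lists"]
           ]

       async def executeSubtask(self, subtaskContent: dict) -> dict:
           if subtaskContent["list_id"] == "broken":
               raise KeyError("faces")  # simulate a failing subtask
           # pretend result of processing one list
           return {"list_id": subtaskContent["list_id"], "processed": True}


   async def runTask(task: FakeLambdaTask, content: dict) -> dict:
       """Hypothetical driver: split, run every subtask, collect results and errors."""
       subtasks = await task.splitTasksContent(content)  # a failure here means no task is created
       results, errors = [], []
       for subtask in subtasks:
           try:
               results.append(await task.executeSubtask(subtask))
           except Exception as exc:  # a failed subtask does not stop the others
               errors.append(repr(exc))
       # the task is "done" even if some subtasks failed; errors are kept next to results
       return {"status": "done", "results": results, "errors": errors}

For example, running `asyncio.run(runTask(FakeLambdaTask(), {"content": {"lambda_id": "l1", "lists": ["a", "broken"]}}))` yields one result for list `a` and one error for the failing subtask, while the status is still `done`.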

All features described in the `general development description <./development.html#luna-lambda-tools>`_ are also available for *lambda tasks*, with one exception: the `luna services clients <./development.html#luna-services-clients>`_ are available as a `BaseLambdaTask` property.

For example, the `standalone lambda` clients are used as follows:

.. literalinclude:: examples/201/luna_services_clients/lambda/lambda_main.py
   :caption: lambda_main.py
   :language: python

In a *lambda task*, the luna services clients are used as follows:

.. code-block:: python

   class LambdaTask(BaseLambdaTask):
       async def executeSubtask(self, subtaskContent: dict) -> dict | list:
           """Execute current subtask processing"""
           # clients are accessed through the `self.clients` property
           addresses = {
               "faces_address": self.clients.faces.getAddress(),
               "events_address": self.clients.events.getAddress(),
           }
           return addresses

The value returned by `executeSubtask` will be saved to *Luna-Image-Store* (if *Luna-Image-Store* is enabled) as the subtask result.

Task error processing
---------------------

Errors can occur both during task preparation and during subtask processing, with different consequences. In general, any error that occurs during task content separation causes task creation to fail, and any error that occurs during subtask processing causes the subtask to fail.

- error processing basics

  There are two kinds of errors: expected and unexpected. Unlike expected errors, unexpected errors produce a traceback in the logs.

  .. code-block:: python
     :caption: unexpected error example

     class LambdaTask(BaseLambdaTask):
         ...

         async def splitTasksContent(self, content: dict) -> list[dict]:
             """Split task content to subtask contents"""
             data = {"abc": 123}
             data2 = data["qwe"]  # raises an unexpected KeyError

  To make an error expected, inherit it from `UserTaskLambdaException`:

  .. code-block:: python
     :caption: expected error example

     from luna_lambda_tools.public.exceptions import UserTaskLambdaException


     class NewException(UserTaskLambdaException):
         errorText = "exception description"


     class LambdaTask(BaseLambdaTask):
         ...

         async def splitTasksContent(self, content: dict) -> list[dict]:
             """Split task content to subtask contents"""
             data = {"abc": 123}
             try:
                 data2 = data["qwe"]
             except KeyError:
                 # re-raise as an expected error
                 raise NewException
             ...

- task separation error example: wrong content

  .. code-block:: python
     :caption: task content from user request

     {"content": {"lambda_id": , "lists": ["list_id_1", "list_id_2", "list_id_3"]}}

  .. code-block:: python
     :caption: lambda task content separation function

     class LambdaTask(BaseLambdaTask):
         ...

         async def splitTasksContent(self, content: dict) -> list[dict]:
             """Split task content to subtask contents"""
             return [content["faces"]]  # there is no "faces" key in the content

  Consequence: the user will receive a reply of the following type:

  .. code-block:: json

     {
         "error_code": 42004,
         "desc": "Lambda exception",
         "detail": "Failed to split task content: : 'faces'",
         "link": "https://docs.visionlabs.ai/info/luna/troubleshooting/errors-description/code-42004"
     }

  A possible way to improve usability is to catch possible errors:

  .. code-block:: python
     :caption: lambda task content separation function

     class UserException(Exception):
         def __str__(self):
             return "exception representation"


     class LambdaTask(BaseLambdaTask):
         ...

         async def splitTasksContent(self, content: dict) -> list[dict]:
             """Split task content to subtask contents"""
             try:
                 return [content["faces"]]
             except KeyError:
                 raise UserException

  Consequence: the user will receive a reply of the following type:

  .. code-block:: json

     {
         "error_code": 42004,
         "desc": "Lambda exception",
         "detail": "Failed to split task content: : 'exception representation'",
         "link": "https://docs.visionlabs.ai/info/luna/troubleshooting/errors-description/code-42004"
     }

- subtask processing error example: unexpected error

  .. code-block:: python
     :caption: subtask content

     {"content": {"lambda_id": , "list_id": "list_id_1"}}

  .. code-block:: python
     :caption: lambda subtask processing function

     class LambdaTask(BaseLambdaTask):
         ...

         async def executeSubtask(self, subtaskContent: dict) -> dict | list:
             """Execute current subtask processing"""
             subtaskContent["faces"]  # raises KeyError: there is no "faces" key
             ...

  Consequence: the subtask will be marked as *failed*, and the following error will be reported in the subtask/task errors and in the task result:

  .. code-block:: json

     {
         "error_id": 3,
         "task_id": 7,
         "subtask_id": 7,
         "error_code": 42004,
         "description": "Lambda exception",
         "detail": ": 'faces'",
         "additional_info": null,
         "error_time": "2023-12-07T17:25:15.588297+03:00",
         "link": "https://docs.visionlabs.ai/info/luna/troubleshooting/errors-description/code-42004"
     }

  A possible way to improve usability is to catch possible errors:

  .. code-block:: python
     :caption: lambda subtask processing function

     class UserException(Exception):
         ...


     class LambdaTask(BaseLambdaTask):
         ...

         async def executeSubtask(self, subtaskContent: dict) -> dict | list:
             """Execute current subtask processing"""
             try:
                 subtaskContent["faces"]
             except KeyError:
                 raise UserException("expected `faces` field in subtask content")
             ...

  Consequence: the subtask will be marked as *failed*, and the following error will be reported in the subtask/task errors and in the task result:

  .. code-block:: json

     {
         "error_id": 3,
         "task_id": 7,
         "subtask_id": 7,
         "error_code": 42004,
         "description": "Lambda exception",
         "detail": ": expected `faces` field in subtask content",
         "additional_info": null,
         "error_time": "2023-12-07T17:25:15.588297+03:00",
         "link": "https://docs.visionlabs.ai/info/luna/troubleshooting/errors-description/code-42004"
     }

- luna3 client exceptions example

  Suppose there is no face with id "00000000-0000-4000-a000-000003271373":

  .. code-block:: python
     :caption: lambda subtask processing function

     class LambdaTask(BaseLambdaTask):
         ...

         async def executeSubtask(self, subtaskContent: dict) -> dict | list:
             """Execute current subtask processing"""
             reply = await self.clients.faces.getFace(faceId="00000000-0000-4000-a000-000003271373")
             ...

  Consequence: the subtask will be marked as *failed*, and the following error will be reported in the subtask/task errors and in the task result:

  .. code-block:: json

     {
         "error_id": 4,
         "task_id": 10,
         "subtask_id": 10,
         "error_code": 22002,
         "description": "Object not found",
         "detail": "Face with id '00000000-0000-4000-a000-000003271373' not found",
         "additional_info": null,
         "error_time": "2023-12-07T17:44:29.590114+03:00",
         "link": "https://docs.visionlabs.ai/info/luna/troubleshooting/errors-description/code-22002"
     }

.. note::

   If one or more subtasks fail, the final task status will still be *done*, but the task result will contain the errors collected from the failed subtasks. If some subtasks fail and others finish successfully, the final task status will also be *done*, and the task result will contain both the errors from the failed subtasks and the results of the successfully finished subtasks.

Tasks lambda examples
---------------------

- unlink faces from lists if the faces are not similar to the specified face:

  - check whether each face is similar to the specified one (with matching threshold > 0.7)
  - save the unlinked face ids as the task result
  - in this case, the task content is separated into as many subtasks as there are received list ids

  .. literalinclude:: examples/201/tasks_lambda_unlink/lambda/lambda_main.py
     :caption: lambda_main.py
     :language: python

  .. literalinclude:: examples/201/tasks_lambda_unlink/content.json
     :caption: task content example
     :language: json

- remove duplicates from lists:

  - check whether each face has similar faces (in this example, a similar face is a face with threshold > 0.9) and remove all such faces, if any
  - save the sample ids of the removed faces as the task result
  - in this case, the task content is separated into as many subtasks as there are received list ids

  .. literalinclude:: examples/201/tasks_lambda_duplicates/lambda/lambda_main.py
     :caption: lambda_main.py
     :language: python

  .. literalinclude:: examples/201/tasks_lambda_duplicates/content.json
     :caption: task content example
     :language: json

- recursively find events similar to the specified photo:

  - get the top N matches with the specified photo (N can be modified using the *limit* parameter)
  - get the top N matches for each received event, M times (M can be modified using the *recursion_level* parameter)
  - in this case, the task content is not separated (only one subtask is created)

  .. literalinclude:: examples/201/tasks_lambda_recursive/lambda/lambda_main.py
     :caption: lambda_main.py
     :language: python

  .. literalinclude:: examples/201/tasks_lambda_recursive/content.json
     :caption: task content example
     :language: json
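
The recursive search in the last example can be read as a breadth-first expansion: the first round matches the photo, and each further round matches every newly found event and keeps its top N candidates. The following self-contained sketch shows that idea only. `findSimilarRecursively` and the `match` callable are hypothetical stand-ins for the matching requests made through the luna services clients, and skipping events that were already found is an assumption, not documented behavior of the example lambda.

.. code-block:: python

   from typing import Callable

   # `match(reference, limit)` is a hypothetical stand-in for a matching request:
   # it returns up to `limit` ids of events similar to `reference`, best first
   MatchFn = Callable[[str, int], list[str]]


   def findSimilarRecursively(match: MatchFn, photoId: str, limit: int, recursionLevel: int) -> list[str]:
       """Collect event ids similar to `photoId`, then expand the search `recursionLevel` more times."""
       found: list[str] = []
       seen: set[str] = set()
       frontier = [photoId]
       # round 0 matches the photo itself, each next round matches the events found in the previous one
       for _ in range(recursionLevel + 1):
           nextFrontier = []
           for reference in frontier:
               for eventId in match(reference, limit):
                   if eventId not in seen:  # assumption: skip events that were already found
                       seen.add(eventId)
                       found.append(eventId)
                       nextFrontier.append(eventId)
           frontier = nextFrontier
       return found

With a toy `match` backed by a dict such as `{"photo": ["e1", "e2"], "e1": ["e2", "e3"], "e2": ["e4"]}`, a call with `limit=2` and `recursionLevel=1` returns `["e1", "e2", "e3", "e4"]`: the first round finds `e1` and `e2`, and the second round adds `e3` and `e4`.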