Source code for luna_tasks.db.tasks_db_tools.models.tasks_models

"""
Tasks database models.
"""
from luna_db_tools.functions import currentDBTimestamp
from sqlalchemy import Column, ForeignKey, Integer, Sequence, SmallInteger, String, TIMESTAMP
from sqlalchemy.dialects.oracle import CLOB
from sqlalchemy.dialects.postgresql import TEXT
from sqlalchemy.ext.declarative import declarative_base

from .config import DBConfig

# raw sql function to get utc timestamp for creating/updating table columns
DB_CURRENT_TIMESTAMP = currentDBTimestamp(DBConfig.dbType) if DBConfig.dbType else None

Base = declarative_base()

taskIdSeq = Sequence("task_task_id_seq", metadata=Base.metadata)
subtaskIdSeq = Sequence("subtask_subtask_id_seq", metadata=Base.metadata)
taskErrIdSeq = Sequence("task_error_id_seq", metadata=Base.metadata)
scheduleIdSeq = Sequence("schedule_schedule_id_seq", metadata=Base.metadata)


[docs] class Task(Base): """ Task. """ __tablename__ = "task" __table_args__ = {"comment": "Table of tasks."} task_id = Column(Integer, taskIdSeq, primary_key=True, comment="int: task id") create_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, comment="date and time: task create time", ) end_time = Column(TIMESTAMP, index=True, comment="date and time: task end time") last_update_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, comment="date and time: task last update time", ) task_type = Column( Integer, index=True, nullable=False, comment="int: task type - (0 - unknown type," "1 - linker, 2 - clusterizaiton," "3 - clusterization report, 4 - gc," "5 - additional_extract)", ) task_status = Column( Integer, index=True, nullable=False, comment="int: task status - (0 - pending, 1 - in progress, 2 - cancelled, " "3 - failed, 4 - collect results, 5 - done)", ) result_id = Column( String(36), comment="uuid: id of object which contain task results" ) account_id = Column( String(36), index=True, comment="uuid: id of user account which create task" ) count_task_parts_done = Column( Integer, comment="int: count of current completed task parts" ) count_task_parts_all = Column(Integer, comment="int: count all task parts") content = Column( CLOB if DBConfig.dbType == "oracle" else TEXT, nullable=False, comment="Union[CLOB, TEXT]: task content", ) notification_policy = Column( CLOB if DBConfig.dbType == "oracle" else TEXT, nullable=True, comment="Union[CLOB, TEXT]: notification policy", ) result_storage_policy = Column( CLOB if DBConfig.dbType == "oracle" else TEXT, nullable=True, comment="Union[CLOB, TEXT]: result storage policy", ) description = Column(String(128), comment="str(128): task description") schedule_id = Column(Integer, nullable=True, comment="int: tasks schedule") def __repr__(self): return "<task_task_id %r>" % self.task_id
[docs] class Schedule(Base): """ Schedule. """ __tablename__ = "schedule" __table_args__ = {"comment": "Table of schedules."} schedule_id = Column( Integer, scheduleIdSeq, primary_key=True, comment="int: schedule id" ) create_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, comment="date and time: schedule create time", ) last_update_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, comment="date and time: schedule last update time", ) last_check_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, comment="date and time: schedule last check time", ) status = Column( SmallInteger, nullable=False, comment="int: schedule status (1 - running, 2 - stopped)", ) cron = Column(String(128), nullable=False, comment="str: schedule cron") cron_timezone = Column(String(16), nullable=True, comment="str: cron timezone") account_id = Column( String(36), index=True, comment="uuid: id of user account which create schedule" ) task_type = Column( Integer, index=True, nullable=False, comment="int: task type", ) content = Column( CLOB if DBConfig.dbType == "oracle" else TEXT, nullable=False, comment="Union[CLOB, TEXT]: task content", ) task_notification_policy = Column( CLOB if DBConfig.dbType == "oracle" else TEXT, nullable=True, comment="Union[CLOB, TEXT]: task notification policy", ) result_storage_policy = Column( CLOB if DBConfig.dbType == "oracle" else TEXT, nullable=True, comment="Union[CLOB, TEXT]: result storage policy", ) next_run_at = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=True, comment="date and time: schedule create time", )
[docs] class SubTask(Base): """ SubTask. """ __tablename__ = "subtask" __table_args__ = {"comment": "Table of subtasks."} subtask_id = Column( Integer, subtaskIdSeq, primary_key=True, comment="str: subtask id" ) task_id = Column( Integer, ForeignKey("task.task_id", ondelete="CASCADE"), index=True, unique=False, comment="int: task id", ) result_id = Column( String(36), comment="uuid: id of object which contain task results" ) create_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, comment="date and time: subtask create time", ) end_time = Column(TIMESTAMP, index=True, comment="date and time: subtask end time") subtask_status = Column( Integer, index=True, nullable=False, comment="int: task status - (0 - pending, 1 - in progress, 2 - cancelled, 3 - failed, " "4 - collect results, 5 - done)", ) content = Column( CLOB if DBConfig.dbType == "oracle" else TEXT, nullable=False, comment="Union[CLOB, TEXT]: subtask content", ) def __repr__(self): return "<subtask_subtask_id %r>" % self.subtask_id
[docs] class TaskError(Base): """ Task errors. """ __tablename__ = "task_error" __table_args__ = {"comment": "Table of tasks errors."} error_id = Column( Integer, taskErrIdSeq, primary_key=True, comment="int: task's error id" ) task_id = Column( Integer, ForeignKey("task.task_id", ondelete="CASCADE"), index=True, unique=False, comment="int: task id", ) subtask_id = Column( Integer, ForeignKey("subtask.subtask_id", ondelete="CASCADE"), index=True, unique=False, comment="int: subtask id", ) error_code = Column( Integer, index=True, nullable=False, comment="int: error status code" ) description = Column( String(64), nullable=False, comment="string: error description" ) detail = Column(String(1024), comment="string: error detail") additional_info = Column(String(128), comment="string: additional error info") error_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, comment="date and time: error time", ) def __repr__(self): return "<task_error_id %r>" % self.error_id