# Source code for luna_tasks.db.models

"""Tasks database models."""
from sqlalchemy import Column, String, Integer, ForeignKey, TIMESTAMP, Sequence
from sqlalchemy import MetaData
from sqlalchemy.dialects.oracle import CLOB
from sqlalchemy.dialects.postgresql import TEXT
from sqlalchemy.ext.declarative import declarative_base

from crutches_on_wheels.utils.db_functions import currentDBTimestamp
from db.models_config import DBConfig

# Shared MetaData object; the id sequences below are registered on it.
metadata = MetaData()
# Declarative base class that every model in this module inherits from.
Base = declarative_base()

# Raw SQL function producing the UTC timestamp used as the server-side default
# of the create/update time columns; None when no database type is configured.
if DBConfig.dbType:
    DB_CURRENT_TIMESTAMP = currentDBTimestamp(DBConfig.dbType)
else:
    DB_CURRENT_TIMESTAMP = None

# Sequences backing the integer primary keys of the three tables.
taskIdSeq = Sequence(
    'task_task_id_seq',
    metadata=metadata,
)
subtaskIdSeq = Sequence(
    'subtask_subtask_id_seq',
    metadata=metadata,
)
taskErrIdSeq = Sequence(
    'task_error_id_seq',
    metadata=metadata,
)

class Task(Base):
    """
    Task.

    Declarative model mapped to the ``task`` table. Each row stores the
    task's timestamps, type and status codes, progress counters, the owning
    account, the id of the result object and the task content.
    """
    __tablename__ = 'task'
    __table_args__ = {'comment': 'Table of tasks.'}
    # Point the declarative base at the module-level ``metadata`` object (the
    # one the id sequences are attached to) before the class body finishes.
    Base.metadata = metadata

    # Task id (primary key, bound to the ``taskIdSeq`` sequence)
    task_id = Column(Integer, taskIdSeq, primary_key=True, comment="int: task id")

    # Create time (server-side default: DB_CURRENT_TIMESTAMP)
    create_time = Column(
        TIMESTAMP,
        server_default=DB_CURRENT_TIMESTAMP,
        index=True,
        nullable=False,
        comment="date and time: task create time",
    )

    # End time (nullable)
    end_time = Column(TIMESTAMP, index=True, comment="date and time: task end time")

    # Last update time (server-side default: DB_CURRENT_TIMESTAMP)
    last_update_time = Column(
        TIMESTAMP,
        server_default=DB_CURRENT_TIMESTAMP,
        index=True,
        nullable=False,
        comment="date and time: task last update time",
    )

    # Task type. Adjacent string literals are concatenated verbatim, so each
    # fragment carries its own trailing separator.
    task_type = Column(
        Integer,
        index=True,
        nullable=False,
        comment="int: task type - (0 - unknown type, 1 - linker, 2 - clusterization, "
                "3 - clusterization report, 4 - gc, 5 - additional_extract)",
    )

    # Task status
    task_status = Column(
        Integer,
        index=True,
        nullable=False,
        comment="int: task status - (0 - pending, 1 - in progress, 2 - cancelled, "
                "3 - failed, 4 - collect results, 5 - done)",
    )

    # Result's object id
    result_id = Column(String(36), comment="uuid: id of object which contain task results")

    # Account id
    account_id = Column(String(36), index=True, comment="uuid: id of user account which create task")

    # Overall progress: number of task parts completed so far
    count_task_parts_done = Column(Integer, comment="int: count of current completed task parts")

    # Count of all task parts
    count_task_parts_all = Column(Integer, comment="int: count all task parts")

    # Task content (CLOB when the configured database type is "oracle", TEXT otherwise)
    content = Column(
        CLOB if DBConfig.dbType == "oracle" else TEXT,
        nullable=False,
        comment="Union[CLOB, TEXT]: task content",
    )

    # Description
    description = Column(String(128), comment="str(128): task description")

    def __repr__(self):
        """Return a short debug representation containing the task id."""
        # Wrap the operand in a 1-tuple so ``%`` formats any value as a single argument.
        return '<task_task_id %r>' % (self.task_id,)
class SubTask(Base):
    """
    SubTask.

    Declarative model mapped to the ``subtask`` table. Each row references
    its parent task through ``task_id`` (foreign key to ``task.task_id`` with
    ``ON DELETE CASCADE``) and stores the subtask's timestamps, status code,
    result object id and content.
    """
    __tablename__ = 'subtask'
    __table_args__ = {'comment': 'Table of subtasks.'}
    # Point the declarative base at the module-level ``metadata`` object (the
    # one the id sequences are attached to) before the class body finishes.
    Base.metadata = metadata

    # SubTask id (integer primary key, bound to the ``subtaskIdSeq`` sequence)
    subtask_id = Column(Integer, subtaskIdSeq, primary_key=True, comment="int: subtask id")

    # Task id of the parent task
    task_id = Column(
        Integer,
        ForeignKey("task.task_id", ondelete='CASCADE'),
        index=True,
        unique=False,
        comment="int: task id",
    )

    # Result's object id
    result_id = Column(String(36), comment="uuid: id of object which contain task results")

    # Create time (server-side default: DB_CURRENT_TIMESTAMP)
    create_time = Column(
        TIMESTAMP,
        server_default=DB_CURRENT_TIMESTAMP,
        index=True,
        nullable=False,
        comment="date and time: subtask create time",
    )

    # End time (nullable)
    end_time = Column(TIMESTAMP, index=True, comment="date and time: subtask end time")

    # SubTask status
    subtask_status = Column(
        Integer,
        index=True,
        nullable=False,
        comment="int: subtask status - (0 - pending, 1 - in progress, 2 - cancelled, 3 - failed, "
                "4 - collect results, 5 - done)",
    )

    # SubTask content (CLOB when the configured database type is "oracle", TEXT otherwise)
    content = Column(
        CLOB if DBConfig.dbType == "oracle" else TEXT,
        nullable=False,
        comment="Union[CLOB, TEXT]: subtask content",
    )

    def __repr__(self):
        """Return a short debug representation containing the subtask id."""
        # Wrap the operand in a 1-tuple so ``%`` formats any value as a single argument.
        return '<subtask_subtask_id %r>' % (self.subtask_id,)
class TaskError(Base):
    """
    Task errors.

    Declarative model mapped to the ``task_error`` table. A row records one
    error and points at both the task and the subtask it belongs to; both
    foreign keys are declared with ``ON DELETE CASCADE``.
    """
    __tablename__ = 'task_error'
    __table_args__ = {'comment': 'Table of tasks errors.'}
    # Point the declarative base at the module-level ``metadata`` object (the
    # one the id sequences are attached to) before the class body finishes.
    Base.metadata = metadata

    # Identifier of the error record (primary key, bound to ``taskErrIdSeq``)
    error_id = Column(
        Integer,
        taskErrIdSeq,
        comment="int: task's error id",
        primary_key=True,
    )

    # Task the error belongs to
    task_id = Column(
        Integer,
        ForeignKey("task.task_id", ondelete='CASCADE'),
        comment="int: task id",
        unique=False,
        index=True,
    )

    # Subtask the error belongs to
    subtask_id = Column(
        Integer,
        ForeignKey("subtask.subtask_id", ondelete='CASCADE'),
        comment="int: subtask id",
        unique=False,
        index=True,
    )

    # Numeric error code (required)
    error_code = Column(
        Integer,
        comment="int: error status code",
        nullable=False,
        index=True,
    )

    # Short error description (required, up to 64 characters)
    description = Column(
        String(64),
        comment="string: error description",
        nullable=False,
    )

    # Detailed error text (up to 1024 characters)
    detail = Column(
        String(1024),
        comment="string: error detail",
    )

    # Extra information about the error (up to 128 characters)
    additional_info = Column(
        String(128),
        comment="string: additional error info",
    )

    # Time of the error (server-side default: DB_CURRENT_TIMESTAMP)
    error_time = Column(
        TIMESTAMP,
        comment="date and time: error time",
        nullable=False,
        index=True,
        server_default=DB_CURRENT_TIMESTAMP,
    )

    def __repr__(self):
        """Return a short debug representation containing the error id."""
        template = '<task_error_id %r>'
        return template % self.error_id