"""
Tasks database models.
"""
from luna_db_tools.functions import currentDBTimestamp
from sqlalchemy import Column, ForeignKey, Integer, Sequence, SmallInteger, String, TIMESTAMP
from sqlalchemy.dialects.oracle import CLOB
from sqlalchemy.dialects.postgresql import TEXT
from sqlalchemy.ext.declarative import declarative_base
from .config import DBConfig
# raw sql function to get utc timestamp for creating/updating table columns
DB_CURRENT_TIMESTAMP = currentDBTimestamp(DBConfig.dbType) if DBConfig.dbType else None
Base = declarative_base()
taskIdSeq = Sequence("task_task_id_seq", metadata=Base.metadata)
subtaskIdSeq = Sequence("subtask_subtask_id_seq", metadata=Base.metadata)
taskErrIdSeq = Sequence("task_error_id_seq", metadata=Base.metadata)
scheduleIdSeq = Sequence("schedule_schedule_id_seq", metadata=Base.metadata)
[docs]
class Task(Base):
"""
Task.
"""
__tablename__ = "task"
__table_args__ = {"comment": "Table of tasks."}
task_id = Column(Integer, taskIdSeq, primary_key=True, comment="int: task id")
create_time = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
index=True,
nullable=False,
comment="date and time: task create time",
)
end_time = Column(TIMESTAMP, index=True, comment="date and time: task end time")
last_update_time = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
index=True,
nullable=False,
comment="date and time: task last update time",
)
task_type = Column(
Integer,
index=True,
nullable=False,
comment="int: task type - (0 - unknown type,"
"1 - linker, 2 - clusterizaiton,"
"3 - clusterization report, 4 - gc,"
"5 - additional_extract)",
)
task_status = Column(
Integer,
index=True,
nullable=False,
comment="int: task status - (0 - pending, 1 - in progress, 2 - cancelled, "
"3 - failed, 4 - collect results, 5 - done)",
)
result_id = Column(
String(36), comment="uuid: id of object which contain task results"
)
account_id = Column(
String(36), index=True, comment="uuid: id of user account which create task"
)
count_task_parts_done = Column(
Integer, comment="int: count of current completed task parts"
)
count_task_parts_all = Column(Integer, comment="int: count all task parts")
content = Column(
CLOB if DBConfig.dbType == "oracle" else TEXT,
nullable=False,
comment="Union[CLOB, TEXT]: task content",
)
notification_policy = Column(
CLOB if DBConfig.dbType == "oracle" else TEXT,
nullable=True,
comment="Union[CLOB, TEXT]: notification policy",
)
result_storage_policy = Column(
CLOB if DBConfig.dbType == "oracle" else TEXT,
nullable=True,
comment="Union[CLOB, TEXT]: result storage policy",
)
description = Column(String(128), comment="str(128): task description")
schedule_id = Column(Integer, nullable=True, comment="int: tasks schedule")
def __repr__(self):
return "<task_task_id %r>" % self.task_id
[docs]
class Schedule(Base):
"""
Schedule.
"""
__tablename__ = "schedule"
__table_args__ = {"comment": "Table of schedules."}
schedule_id = Column(
Integer, scheduleIdSeq, primary_key=True, comment="int: schedule id"
)
create_time = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
index=True,
nullable=False,
comment="date and time: schedule create time",
)
last_update_time = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
index=True,
nullable=False,
comment="date and time: schedule last update time",
)
last_check_time = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
index=True,
nullable=False,
comment="date and time: schedule last check time",
)
status = Column(
SmallInteger,
nullable=False,
comment="int: schedule status (1 - running, 2 - stopped)",
)
cron = Column(String(128), nullable=False, comment="str: schedule cron")
cron_timezone = Column(String(16), nullable=True, comment="str: cron timezone")
account_id = Column(
String(36), index=True, comment="uuid: id of user account which create schedule"
)
task_type = Column(
Integer,
index=True,
nullable=False,
comment="int: task type",
)
content = Column(
CLOB if DBConfig.dbType == "oracle" else TEXT,
nullable=False,
comment="Union[CLOB, TEXT]: task content",
)
task_notification_policy = Column(
CLOB if DBConfig.dbType == "oracle" else TEXT,
nullable=True,
comment="Union[CLOB, TEXT]: task notification policy",
)
result_storage_policy = Column(
CLOB if DBConfig.dbType == "oracle" else TEXT,
nullable=True,
comment="Union[CLOB, TEXT]: result storage policy",
)
next_run_at = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
index=True,
nullable=True,
comment="date and time: schedule create time",
)
[docs]
class SubTask(Base):
"""
SubTask.
"""
__tablename__ = "subtask"
__table_args__ = {"comment": "Table of subtasks."}
subtask_id = Column(
Integer, subtaskIdSeq, primary_key=True, comment="str: subtask id"
)
task_id = Column(
Integer,
ForeignKey("task.task_id", ondelete="CASCADE"),
index=True,
unique=False,
comment="int: task id",
)
result_id = Column(
String(36), comment="uuid: id of object which contain task results"
)
create_time = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
index=True,
nullable=False,
comment="date and time: subtask create time",
)
end_time = Column(TIMESTAMP, index=True, comment="date and time: subtask end time")
subtask_status = Column(
Integer,
index=True,
nullable=False,
comment="int: task status - (0 - pending, 1 - in progress, 2 - cancelled, 3 - failed, "
"4 - collect results, 5 - done)",
)
content = Column(
CLOB if DBConfig.dbType == "oracle" else TEXT,
nullable=False,
comment="Union[CLOB, TEXT]: subtask content",
)
def __repr__(self):
return "<subtask_subtask_id %r>" % self.subtask_id
[docs]
class TaskError(Base):
"""
Task errors.
"""
__tablename__ = "task_error"
__table_args__ = {"comment": "Table of tasks errors."}
error_id = Column(
Integer, taskErrIdSeq, primary_key=True, comment="int: task's error id"
)
task_id = Column(
Integer,
ForeignKey("task.task_id", ondelete="CASCADE"),
index=True,
unique=False,
comment="int: task id",
)
subtask_id = Column(
Integer,
ForeignKey("subtask.subtask_id", ondelete="CASCADE"),
index=True,
unique=False,
comment="int: subtask id",
)
error_code = Column(
Integer, index=True, nullable=False, comment="int: error status code"
)
description = Column(
String(64), nullable=False, comment="string: error description"
)
detail = Column(String(1024), comment="string: error detail")
additional_info = Column(String(128), comment="string: additional error info")
error_time = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
index=True,
nullable=False,
comment="date and time: error time",
)
def __repr__(self):
return "<task_error_id %r>" % self.error_id