from luna_db_tools.functions import currentDBTimestamp
from sqlalchemy import (
CLOB,
TIMESTAMP,
BigInteger,
Column,
ForeignKey,
Index,
Integer,
LargeBinary,
Sequence,
SmallInteger,
Float,
String,
table,
text,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import TableClause
from .config import DBConfig
from .models_utils import GEOGRAPHY_COLUMN_TYPE
# Declarative base shared by every model in this module; its metadata also owns the sequences below.
Base = declarative_base()
# Raw sql function to get utc timestamp for creating/updating table columns. USED BY ALEMBIC ONLY.
# Evaluates to None when DBConfig.dbType is not set, so the columns using it get no server default.
DB_CURRENT_TIMESTAMP = currentDBTimestamp(DBConfig.dbType) if DBConfig.dbType else None
# Sequences registered on Base.metadata; the trailing comment names the column each one is attached to.
streamIdSeq = Sequence("stream_id_seq", metadata=Base.metadata)  # Stream.id
logIdSeq = Sequence("log_id_seq", metadata=Base.metadata)  # Log.log_id
groupIdSeq = Sequence("group_id_seq", metadata=Base.metadata)  # Group.id
groupStreamIdSeq = Sequence("group_stream_id_seq", metadata=Base.metadata)  # GroupStream.id
agentAnalyticIdSeq = Sequence("agent_analytic_id_seq", metadata=Base.metadata)  # AgentAnalytic.id
streamAnalyticIdSeq = Sequence("stream_analytic_id_seq", metadata=Base.metadata)  # StreamAnalytic.id
agentStreamIdSeq = Sequence("agent_stream_id_seq", metadata=Base.metadata)  # AgentStream.id
[docs]
class Stream(Base):
    """
    ORM model mapped to the ``stream`` table.

    The primary key is the string column ``stream_id``; ``id`` is an additional indexed
    numeric column fed by the ``stream_id_seq`` sequence. The pair
    (``account_id``, ``stream_id``) is covered by a unique index.
    """

    __tablename__ = "stream"

    # --- identifiers ---
    id = Column(BigInteger, streamIdSeq, index=True)
    stream_id = Column(String(36), primary_key=True)
    account_id = Column(String(36), index=True, nullable=False)

    # --- descriptive fields ---
    name = Column(String(128), index=True)
    description = Column(String(512))
    type = Column(String(36), nullable=False)
    reference = Column(String(512), nullable=False, index=True)

    # --- processing settings and state ---
    rotation = Column(SmallInteger, nullable=False)
    start_time = Column(Float, server_default=text("0.0"), nullable=False)
    status = Column(SmallInteger, nullable=False, index=True)
    version = Column(SmallInteger, nullable=False)
    downloadable = Column(SmallInteger, server_default=text("0"), nullable=False)
    timeout = Column(Integer, nullable=True)
    timestamp_source = Column(SmallInteger, server_default=text("0"), nullable=False)

    # --- timestamps (the first two default to the database current timestamp) ---
    create_time = Column(
        TIMESTAMP, nullable=False, index=True, server_default=DB_CURRENT_TIMESTAMP
    )
    status_last_update_time = Column(
        TIMESTAMP, nullable=False, index=True, server_default=DB_CURRENT_TIMESTAMP
    )
    last_feedback_time = Column(TIMESTAMP, nullable=True, index=True)

    # Stored as CLOB on oracle, as an unbounded String on any other database type.
    notification_policy = Column(
        CLOB if DBConfig.dbType == "oracle" else String, nullable=True
    )

    # Unique composite index over (account_id, stream_id).
    ix_stream_account_id_stream_id = Index(
        "ix_stream_account_id_stream_id",
        account_id,
        stream_id,
        unique=True,
    )
[docs]
class Restart(Base):
    """
    ORM model mapped to the ``restart`` table (stream restart info).

    Each row is keyed by ``stream_id``, which references ``stream.stream_id`` and is
    removed together with the stream (``ON DELETE CASCADE``).
    """

    __tablename__ = "restart"

    # Primary key and, at the same time, a foreign key to the owning stream.
    stream_id = Column(
        String(36), ForeignKey("stream.stream_id", ondelete="CASCADE"), primary_key=True
    )

    # --- restart settings ---
    restart = Column(SmallInteger, index=True, nullable=False)
    attempt_count = Column(SmallInteger, index=True, nullable=False)
    delay = Column(Integer, nullable=False)

    # --- restart progress (counters default to 0 on the database side) ---
    current_attempt = Column(SmallInteger, server_default=text("0"), nullable=False)
    last_attempt_time = Column(TIMESTAMP, nullable=True)
    status = Column(SmallInteger, server_default=text("0"), nullable=False)
[docs]
class Log(Base):
    """
    ORM model mapped to the ``log`` table (stream logging).

    Rows are keyed by ``log_id``, which is fed by the ``log_id_seq`` sequence.
    ``stream_id`` is indexed but carries no foreign key constraint.
    """

    __tablename__ = "log"

    log_id = Column(BigInteger, logIdSeq, primary_key=True)
    stream_id = Column(String(36), index=True)

    # Defaults to the database current timestamp.
    time = Column(
        TIMESTAMP, nullable=False, index=True, server_default=DB_CURRENT_TIMESTAMP
    )

    # Stored as CLOB on oracle, as an unbounded String on any other database type.
    error = Column(CLOB if DBConfig.dbType == "oracle" else String, nullable=True)

    status = Column(SmallInteger, nullable=False, index=True)
    status_last_update_time = Column(
        TIMESTAMP, nullable=False, server_default=DB_CURRENT_TIMESTAMP
    )
    last_processed_frame_time = Column(Float, nullable=True)

    # Same oracle/non-oracle type switch as for `error`.
    meta = Column(CLOB if DBConfig.dbType == "oracle" else String, nullable=True)
[docs]
class Group(Base):
    """
    ORM model mapped to the ``group`` table.

    The primary key is the string column ``group_id``; ``id`` is an additional indexed
    numeric column fed by the ``group_id_seq`` sequence. The pair
    (``group_name``, ``account_id``) is covered by a unique index.
    """

    __tablename__ = "group"

    # --- identifiers ---
    id = Column(BigInteger, groupIdSeq, index=True)
    group_id = Column(String(36), primary_key=True, nullable=False)

    # --- group attributes ---
    group_name = Column(String(128), index=True, nullable=False)
    account_id = Column(String(36), index=True, nullable=False)
    description = Column(String(256))

    # Defaults to the database current timestamp.
    create_time = Column(TIMESTAMP, nullable=False, server_default=DB_CURRENT_TIMESTAMP)

    # Unique composite index over (group_name, account_id).
    ix_group_group_name_account_id = Index(
        "ix_group_group_name_account_id",
        group_name,
        account_id,
        unique=True,
    )
[docs]
class GroupStream(Base):
    """
    ORM model mapped to the ``group_stream`` table (links between stream and group).

    Both ``group_id`` and ``stream_id`` are foreign keys with ``ON DELETE CASCADE``,
    so a link row is removed when either side is deleted.
    """

    __tablename__ = "group_stream"

    # Primary key fed by the `group_stream_id_seq` sequence.
    id = Column(BigInteger, groupStreamIdSeq, primary_key=True)

    # --- group side of the link ---
    group_id = Column(
        String(36), ForeignKey("group.group_id", ondelete="CASCADE"), index=True
    )
    group_name = Column(String(128), index=True)

    # --- stream side of the link ---
    stream_id = Column(
        String(36), ForeignKey("stream.stream_id", ondelete="CASCADE"), index=True
    )
[docs]
class Agent(Base):
    """
    ORM model mapped to the ``agent`` table.

    Rows are keyed by the string column ``agent_id``. All three timestamp columns are
    mandatory and default to the database current timestamp.
    """

    __tablename__ = "agent"

    # --- identity ---
    agent_id = Column(
        String(36),
        primary_key=True,
    )
    agent_name = Column(
        String(128),
        index=True,
        nullable=False,
    )
    description = Column(
        String(512),
    )

    # --- state and capacity ---
    status = Column(
        SmallInteger,
        nullable=False,
        index=True,
    )
    max_stream_count = Column(
        SmallInteger,
        nullable=False,
    )
    active_stream_count = Column(
        SmallInteger,
        nullable=False,
    )
    api_version = Column(
        SmallInteger,
        nullable=False,
    )

    # --- network location ---
    host = Column(
        String(512),
        nullable=False,
    )
    port = Column(
        Integer,
        nullable=False,
    )

    account_id = Column(String(36), index=True, nullable=False)

    # --- timestamps ---
    create_time = Column(
        TIMESTAMP, nullable=False, index=True, server_default=DB_CURRENT_TIMESTAMP
    )
    last_update_time = Column(
        TIMESTAMP, nullable=False, index=True, server_default=DB_CURRENT_TIMESTAMP
    )
    last_feedback_time = Column(
        TIMESTAMP, nullable=False, index=True, server_default=DB_CURRENT_TIMESTAMP
    )
[docs]
class AgentStream(Base):
    """
    ORM model mapped to the ``agent_stream`` table (links between stream and agent).

    ``agent_id`` and ``stream_id`` are plain indexed columns here: no foreign key
    constraints are declared on them.
    """

    __tablename__ = "agent_stream"

    # Auto-incrementing primary key fed by the `agent_stream_id_seq` sequence.
    id = Column(
        BigInteger,
        agentStreamIdSeq,
        primary_key=True,
        autoincrement=True,
    )
    agent_id = Column(
        String(36),
        index=True,
    )
    stream_id = Column(
        String(36),
        index=True,
    )
    status = Column(
        SmallInteger,
        nullable=False,
        index=True,
    )
[docs]
class VideoAnalytic(Base):
    """
    ORM model mapped to the ``video_analytic`` table.

    Rows are keyed by the string column ``analytic_id``. ``analytic_name`` is unique on
    its own, and the pair (``analytic_name``, ``analytic_id``) is additionally covered
    by a unique index.
    """

    __tablename__ = "video_analytic"

    # --- identity ---
    analytic_id = Column(
        String(36),
        primary_key=True,
    )
    analytic_name = Column(
        String(36),
        unique=True,
        index=True,
        nullable=False,
    )

    # --- description and documentation ---
    description = Column(String(512))
    documentation = Column(
        LargeBinary,
        nullable=True,
    )
    version = Column(SmallInteger, nullable=False)

    # Both columns are stored as CLOB on oracle, as an unbounded String otherwise.
    default_parameters = Column(
        CLOB if DBConfig.dbType == "oracle" else String, nullable=True
    )
    validation_schema = Column(
        CLOB if DBConfig.dbType == "oracle" else String, nullable=True
    )

    account_id = Column(String(36), index=True, nullable=False)

    # --- timestamps (default to the database current timestamp) ---
    create_time = Column(
        TIMESTAMP, nullable=False, index=True, server_default=DB_CURRENT_TIMESTAMP
    )
    last_update_time = Column(
        TIMESTAMP, nullable=False, index=True, server_default=DB_CURRENT_TIMESTAMP
    )

    # Unique composite index over (analytic_name, analytic_id).
    ix_analytic_name_id = Index(
        "ix_analytic_name_id",
        analytic_name,
        analytic_id,
        unique=True,
    )
[docs]
class AgentAnalytic(Base):
    """
    ORM model mapped to the ``agent_analytic`` table (links between agent and analytic).

    Both ``agent_id`` and ``analytic_id`` are foreign keys with ``ON DELETE CASCADE``,
    so a link row is removed when either side is deleted.
    """

    __tablename__ = "agent_analytic"

    # Primary key: bound to the sequence and also given a `nextval(...)` server default.
    id = Column(
        BigInteger,
        agentAnalyticIdSeq,
        server_default=text(f"nextval('{agentAnalyticIdSeq.name}')"),
        primary_key=True,
    )

    # --- agent side of the link ---
    agent_id = Column(
        String(36), ForeignKey("agent.agent_id", ondelete="CASCADE"), index=True
    )

    # --- analytic side of the link ---
    analytic_id = Column(
        String(36), ForeignKey("video_analytic.analytic_id", ondelete="CASCADE"), index=True
    )
    analytic_name = Column(
        String(36),
        nullable=False,
    )
[docs]
class StreamAnalytic(Base):
    """
    ORM model mapped to the ``stream_analytic`` table (links between stream and analytic).

    Both ``analytic_id`` and ``stream_id`` are foreign keys with ``ON DELETE CASCADE``,
    so a link row is removed when either side is deleted.
    """

    __tablename__ = "stream_analytic"

    # Primary key: bound to the sequence and also given a `nextval(...)` server default.
    id = Column(
        BigInteger,
        streamAnalyticIdSeq,
        primary_key=True,
        autoincrement=True,
        server_default=text(f"nextval('{streamAnalyticIdSeq.name}')"),
    )

    # --- analytic side of the link ---
    analytic_id = Column(
        String(36), ForeignKey("video_analytic.analytic_id", ondelete="CASCADE"), index=True
    )

    # --- stream side of the link ---
    stream_id = Column(
        String(36), ForeignKey("stream.stream_id", ondelete="CASCADE"), index=True
    )

    analytic_name = Column(
        String(36),
        nullable=False,
    )

    # Stored as CLOB on oracle, as an unbounded String on any other database type.
    analytic_parameters = Column(
        CLOB if DBConfig.dbType == "oracle" else String, nullable=True
    )
[docs]
def view(name: str, selectable) -> TableClause:
    """
    Build a lightweight table-like object that stands for a database view.

    The result is used like a table: `select([SomeView.c.roi])` or
    `SomeView.insert().values(name='foo')`.

    Args:
        name: view name
        selectable: any selectable that has columns
    Returns:
        sa.table object (NOT Model)
    """
    viewTable = table(name)
    # Proxy every column of the source selectable onto the new table clause.
    for sourceColumn in selectable.c:
        sourceColumn._make_proxy(viewTable)
    return viewTable
# Table-like object named "streams_queue" that proxies the columns of the `stream` table.
# NOTE(review): the database view itself is presumably created elsewhere (e.g. in migrations) - confirm.
StreamQueueView = view("streams_queue", Stream.__table__)
[docs]
class SingleProcessLock(Base):
    """
    ORM model mapped to the ``single_process_lock`` table (one-at-a-time process lock).

    A process is locked by selecting its row with `FOR UPDATE`, which guarantees
    single-instance execution.
    """

    __tablename__ = "single_process_lock"

    name = Column(
        String(100),
        primary_key=True,
        comment="str: process name",
    )
    # Mandatory; defaults to the database current timestamp.
    checkpoint = Column(
        TIMESTAMP,
        nullable=False,
        server_default=DB_CURRENT_TIMESTAMP,
        comment="date: last timestamp of a completed process",
    )
    expiration = Column(TIMESTAMP, comment="date: lock expiration time")
    session_id = Column(
        String(36),
        comment="uuid: current lock session id",
    )