from luna_db_tools.functions import currentDBTimestamp
from sqlalchemy import (
CLOB,
TIMESTAMP,
BigInteger,
Boolean,
Column,
ForeignKey,
Index,
Integer,
Sequence,
SmallInteger,
String,
table,
text,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import TableClause
from .config import DBConfig
from .models_utils import GEOGRAPHY_COLUMN_TYPE
Base = declarative_base()
# Raw sql function to get utc timestamp for creating/updating table columns. USED BY ALEMBIC ONLY.
DB_CURRENT_TIMESTAMP = currentDBTimestamp(DBConfig.dbType) if DBConfig.dbType else None
streamIdSeq = Sequence("stream_id_seq", metadata=Base.metadata)
restartIdSeq = Sequence("restart_id_seq", metadata=Base.metadata)
handlerIdSeq = Sequence("handler_id_seq", metadata=Base.metadata)
locationIdSeq = Sequence("location_id_seq", metadata=Base.metadata)
logIdSeq = Sequence("log_id_seq", metadata=Base.metadata)
groupIdSeq = Sequence("group_id_seq", metadata=Base.metadata, start=1)
[docs]
class Stream(Base):
"""
Database table model for streams.
"""
__tablename__ = "stream"
id = Column(Integer, streamIdSeq, index=True, unique=True, comment="int: id")
stream_id = Column(String(36), primary_key=True, comment="uuid: stream id")
account_id = Column(String(36), nullable=False, comment="uuid: stream account id")
name = Column(String(128), index=True, comment="str: stream name")
description = Column(String(512), comment="str: stream description")
type = Column(
String(36),
nullable=False,
comment="str: stream type (0 - udp, 1 - tcp, 2 - videofile, 3 - images)",
)
mask = Column(String(128), comment="str: stream file mask")
endless = Column(Boolean(), comment="boolean: stream endless")
reference = Column(
String(512), index=True, nullable=False, comment="str: stream reference"
)
roi = Column(String(128), nullable=False, comment="str: stream rect of interest")
droi = Column(
String(128), nullable=False, comment="str: stream rect of interest for detection"
)
rotation = Column(
SmallInteger,
nullable=False,
comment="smallint: stream rotation (0, 90, 180, 270)",
)
preferred_program_stream_frame_width = Column(
SmallInteger,
nullable=False,
comment="smallint: frame width of the preferred program stream",
)
status = Column(
SmallInteger,
index=True,
nullable=False,
comment="smallint: stream status ("
"0 - pending, "
"1 - in progress, "
"2 - done, "
"3 - pause, "
"4 - restart, "
"5 - cancel, "
"6 - failure, "
"7 - handler lost, "
"8 - not found"
")",
)
version = Column(SmallInteger, nullable=False, comment="smallint: stream version")
create_time = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
index=True,
nullable=False,
comment="date and time: stream create time",
)
status_last_update_time = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
index=True,
nullable=False,
comment="date and time: stream status last update time",
)
last_feedback_time = Column(
TIMESTAMP,
index=True,
nullable=True,
comment="date and time: last feedback time",
)
ix_stream_account_id_stream_id = Index(
"ix_stream_account_id_stream_id", account_id, stream_id, unique=True
)
ix_stream_licensing = Index("ix_stream_licensing", type, status)
api_version = Column(SmallInteger, nullable=False, comment="smallint: stream api version")
[docs]
class Restart(Base):
"""
Database table model for stream restart info.
"""
__tablename__ = "restart"
id = Column(Integer, restartIdSeq, primary_key=True, comment="int: id")
stream_id = Column(
String(36),
ForeignKey("stream.stream_id", ondelete="CASCADE"),
index=True,
comment="uuid: stream id",
unique=True,
)
restart = Column(
SmallInteger,
nullable=False,
comment="smallint: stream restart status (0, 1)",
index=True,
)
attempt_count = Column(
SmallInteger,
nullable=False,
comment="smallint: stream restart attempt count",
index=True,
)
delay = Column(
Integer, nullable=False, comment="int: stream restart delay, in seconds"
)
current_attempt = Column(
SmallInteger,
nullable=False,
comment="smallint: stream restart attempt number",
server_default=text("0"),
)
last_attempt_time = Column(
TIMESTAMP, nullable=True, comment="date and time: last attempt time"
)
status = Column(
SmallInteger,
nullable=False,
server_default=text("0"),
comment="autorestart status",
)
[docs]
class Handler(Base):
"""
Database table model for stream handler info.
"""
__tablename__ = "handler"
id = Column(Integer, handlerIdSeq, primary_key=True, comment="int: id")
stream_id = Column(
String(36),
ForeignKey("stream.stream_id", ondelete="CASCADE"),
index=True,
comment="uuid: stream id",
unique=True,
)
event_handler = Column(
CLOB if DBConfig.dbType == "oracle" else String,
nullable=False,
comment="string: stream event handler dump",
)
policies = Column(
CLOB if DBConfig.dbType == "oracle" else String,
nullable=False,
comment="string: stream processing policies dump",
)
[docs]
class Location(Base):
"""
Database table model for stream location info.
"""
__tablename__ = "location"
id = Column(Integer, locationIdSeq, primary_key=True, comment="int: id")
stream_id = Column(
String(36),
ForeignKey("stream.stream_id", ondelete="CASCADE"),
index=True,
comment="uuid: stream id",
unique=True,
)
city = Column(String(36), unique=False, comment="str: city")
area = Column(String(36), unique=False, comment="str: area")
district = Column(String(36), unique=False, comment="str: district")
street = Column(String(36), unique=False, comment="str: street")
house_number = Column(String(36), unique=False, comment="str: house number")
# event gps coordinates
# warning: spatial index will be explicitly created after the column is created
geo_position = Column(GEOGRAPHY_COLUMN_TYPE, nullable=True)
[docs]
class Log(Base):
"""
Database table model for stream logging.
"""
__tablename__ = "log"
id = Column(BigInteger, logIdSeq, primary_key=True, comment="int: id")
stream_id = Column(String(36), comment="uuid: stream id")
time = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
index=True,
nullable=False,
comment="date and time: log record time",
)
error = Column(
CLOB if DBConfig.dbType == "oracle" else String,
nullable=True,
comment="string: stream error description",
)
status = Column(
SmallInteger,
index=True,
nullable=False,
comment="smallint: stream status ("
"0 - pending, "
"1 - in progress, "
"2 - done, "
"3 - pause, "
"4 - restart, "
"5 - cancel, "
"6 - failure, "
"7 - handler lost, "
"8 - not found "
"9 - deleted"
")",
)
video_info = Column(
CLOB if DBConfig.dbType == "oracle" else String,
nullable=True,
comment="string: stream source video info",
)
stream_version = Column(
SmallInteger, nullable=False, comment="smallint: stream version"
)
preview = Column(
CLOB if DBConfig.dbType == "oracle" else String,
nullable=True,
comment="string: stream preview data",
)
stream_log_idx = Index("ix_stream_id_id", stream_id, id)
[docs]
class Group(Base):
"""
Database table model for group.
"""
__tablename__ = "group"
id = Column(Integer, groupIdSeq, primary_key=True, comment="int: id")
group_id = Column(
String(36), nullable=False, index=True, unique=True, comment="uuid: group id"
)
group_name = Column(String(128), nullable=False, comment="str: group name")
account_id = Column(String(36), nullable=False, comment="uuid: group account id")
description = Column(String(256), comment="str: group description")
create_time = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
nullable=False,
comment="date and time: group create time",
)
ix_group_group_name_account_id = Index(
"ix_group_group_name_account_id", group_name, account_id, unique=True
)
[docs]
class GroupStream(Base):
"""
Database table model for links between stream and group.
"""
__tablename__ = "group_stream"
id = Column(
Integer, ForeignKey("group.id", ondelete="CASCADE"), comment="int: group id"
)
group_name = Column(String(128), primary_key=True, comment="str: group name")
stream_id = Column(
String(36),
ForeignKey("stream.stream_id", ondelete="CASCADE"),
primary_key=True,
index=True,
comment="uuid: stream id",
)
[docs]
def view(name: str, selectable) -> TableClause:
"""
Create a simple view object.
Use it like a table: `select([SomeView.c.roi])` or `SomeView.insert().values(name='foo')`.
Args:
name: view name
selectable: any selectable that has columns
Returns:
sa.table object (NOT Model)
"""
t = table(name)
for c in selectable.c:
c._make_proxy(t)
return t
# real signature in base_scripts/alembic/versions/f8cdd0805c10_stream_queue_view.py
StreamQueueView = view("streams_queue", Stream.__table__)
[docs]
class SingleProcessLock(Base):
"""
Table for storing one-at-time process lock.
Lock a needed process by selecting row with `FOR UPDATE` for a guaranteed single-instance execution.
"""
__tablename__ = "single_process_lock"
name = Column(String(100), primary_key=True, comment="str: process name")
checkpoint = Column(
TIMESTAMP,
server_default=DB_CURRENT_TIMESTAMP,
nullable=False,
comment="date: last timestamp of a completed process",
)
expiration = Column(
TIMESTAMP,
comment="date: lock expiration time",
)
session_id = Column(String(36), comment="uuid: current lock session id")