Source code for luna_video_manager.db.video_manager_db_tools.models.video_manager_models

from luna_db_tools.functions import currentDBTimestamp
from sqlalchemy import (
    CLOB,
    TIMESTAMP,
    BigInteger,
    Column,
    ForeignKey,
    Index,
    Integer,
    LargeBinary,
    Sequence,
    SmallInteger,
    Float,
    String,
    table,
    text,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import TableClause

from .config import DBConfig
from .models_utils import GEOGRAPHY_COLUMN_TYPE

Base = declarative_base()

# Raw sql function to get utc timestamp for creating/updating table columns. USED BY ALEMBIC ONLY.
DB_CURRENT_TIMESTAMP = currentDBTimestamp(DBConfig.dbType) if DBConfig.dbType else None

streamIdSeq = Sequence("stream_id_seq", metadata=Base.metadata)
logIdSeq = Sequence("log_id_seq", metadata=Base.metadata)
groupIdSeq = Sequence("group_id_seq", metadata=Base.metadata)
groupStreamIdSeq = Sequence("group_stream_id_seq", metadata=Base.metadata)
agentAnalyticIdSeq = Sequence("agent_analytic_id_seq", metadata=Base.metadata)
streamAnalyticIdSeq = Sequence("stream_analytic_id_seq", metadata=Base.metadata)
agentStreamIdSeq = Sequence("agent_stream_id_seq", metadata=Base.metadata)


[docs] class Stream(Base): """ Database table model for streams. """ __tablename__ = "stream" id = Column( BigInteger, streamIdSeq, index=True, ) stream_id = Column( String(36), primary_key=True, ) account_id = Column( String(36), nullable=False, index=True, ) name = Column( String(128), index=True, ) description = Column( String(512), ) type = Column( String(36), nullable=False, ) reference = Column( String(512), index=True, nullable=False, ) rotation = Column( SmallInteger, nullable=False, ) start_time = Column( Float, nullable=False, server_default=text("0.0"), ) status = Column( SmallInteger, index=True, nullable=False, ) version = Column( SmallInteger, nullable=False, ) downloadable = Column( SmallInteger, nullable=False, server_default=text("0"), ) timeout = Column( Integer, nullable=True, ) timestamp_source = Column( SmallInteger, nullable=False, server_default=text("0"), ) create_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, ) status_last_update_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, ) last_feedback_time = Column( TIMESTAMP, index=True, nullable=True, ) notification_policy = Column( CLOB if DBConfig.dbType == "oracle" else String, nullable=True, ) ix_stream_account_id_stream_id = Index( "ix_stream_account_id_stream_id", account_id, stream_id, unique=True )
[docs] class StreamMeta(Base): """ Database table model for stream meta info. """ __tablename__ = "stream_meta" stream_id = Column( String(36), ForeignKey("stream.stream_id", ondelete="CASCADE"), primary_key=True, ) account_id = Column( String(36), nullable=False, index=True, ) city = Column( String(36), unique=False, ) area = Column( String(36), unique=False, ) district = Column( String(36), unique=False, ) street = Column( String(36), unique=False, ) house_number = Column( String(36), unique=False, ) geo_position = Column(GEOGRAPHY_COLUMN_TYPE, nullable=True) source = Column(String(128), unique=False, nullable=True)
[docs] class Restart(Base): """ Database table model for stream restart info. """ __tablename__ = "restart" stream_id = Column( String(36), ForeignKey("stream.stream_id", ondelete="CASCADE"), primary_key=True, ) restart = Column( SmallInteger, nullable=False, index=True, ) attempt_count = Column( SmallInteger, nullable=False, index=True, ) delay = Column( Integer, nullable=False, ) current_attempt = Column( SmallInteger, nullable=False, server_default=text("0"), ) last_attempt_time = Column( TIMESTAMP, nullable=True, ) status = Column( SmallInteger, nullable=False, server_default=text("0"), )
[docs] class Log(Base): """ Database table model for stream logging. """ __tablename__ = "log" log_id = Column( BigInteger, logIdSeq, primary_key=True, ) stream_id = Column( String(36), index=True, ) time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, ) error = Column( CLOB if DBConfig.dbType == "oracle" else String, nullable=True, ) status = Column( SmallInteger, index=True, nullable=False, ) status_last_update_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, nullable=False, ) last_processed_frame_time = Column( Float, nullable=True, ) meta = Column( CLOB if DBConfig.dbType == "oracle" else String, nullable=True, )
[docs] class Group(Base): """ Database table model for group. """ __tablename__ = "group" id = Column( BigInteger, groupIdSeq, index=True, ) group_id = Column( String(36), nullable=False, primary_key=True, ) group_name = Column( String(128), nullable=False, index=True, ) account_id = Column( String(36), nullable=False, index=True, ) description = Column( String(256), ) create_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, nullable=False, ) ix_group_group_name_account_id = Index( "ix_group_group_name_account_id", group_name, account_id, unique=True )
[docs] class GroupStream(Base): """ Database table model for links between stream and group. """ __tablename__ = "group_stream" id = Column( BigInteger, groupStreamIdSeq, primary_key=True, ) group_id = Column( String(36), ForeignKey("group.group_id", ondelete="CASCADE"), index=True, ) group_name = Column(String(128), index=True) stream_id = Column( String(36), ForeignKey("stream.stream_id", ondelete="CASCADE"), index=True, )
[docs] class Agent(Base): """ Database table model for agent. """ __tablename__ = "agent" agent_id = Column(String(36), primary_key=True) agent_name = Column(String(128), nullable=False, index=True) description = Column(String(512)) status = Column(SmallInteger, index=True, nullable=False) max_stream_count = Column(SmallInteger, nullable=False) active_stream_count = Column(SmallInteger, nullable=False) api_version = Column(SmallInteger, nullable=False) host = Column(String(512), nullable=False) port = Column(Integer, nullable=False) account_id = Column( String(36), nullable=False, index=True, ) create_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, ) last_update_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, ) last_feedback_time = Column( TIMESTAMP, index=True, nullable=False, server_default=DB_CURRENT_TIMESTAMP, )
[docs] class AgentStream(Base): """ Database table model for links between stream and agent. """ __tablename__ = "agent_stream" id = Column(BigInteger, agentStreamIdSeq, autoincrement=True, primary_key=True) agent_id = Column(String(36), index=True) stream_id = Column(String(36), index=True) status = Column(SmallInteger, index=True, nullable=False)
[docs] class VideoAnalytic(Base): """ Database table model for analytic. """ __tablename__ = "video_analytic" analytic_id = Column(String(36), primary_key=True) analytic_name = Column(String(36), nullable=False, index=True, unique=True) description = Column( String(512), ) documentation = Column(LargeBinary, nullable=True) version = Column( SmallInteger, nullable=False, ) default_parameters = Column( CLOB if DBConfig.dbType == "oracle" else String, nullable=True, ) validation_schema = Column( CLOB if DBConfig.dbType == "oracle" else String, nullable=True, ) account_id = Column( String(36), nullable=False, index=True, ) create_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, ) last_update_time = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, index=True, nullable=False, ) ix_analytic_name_id = Index( "ix_analytic_name_id", analytic_name, analytic_id, unique=True )
[docs] class AgentAnalytic(Base): """ Database table model for links between agent and analytic. """ __tablename__ = "agent_analytic" id = Column( BigInteger, agentAnalyticIdSeq, primary_key=True, server_default=text(f"nextval('{agentAnalyticIdSeq.name}')"), ) agent_id = Column( String(36), ForeignKey("agent.agent_id", ondelete="CASCADE"), index=True, ) analytic_id = Column( String(36), ForeignKey("video_analytic.analytic_id", ondelete="CASCADE"), index=True, ) analytic_name = Column(String(36), nullable=False)
[docs] class StreamAnalytic(Base): """ Database table model for links between stream and analytic. """ __tablename__ = "stream_analytic" id = Column( BigInteger, streamAnalyticIdSeq, server_default=text(f"nextval('{streamAnalyticIdSeq.name}')"), autoincrement=True, primary_key=True, ) analytic_id = Column( String(36), ForeignKey("video_analytic.analytic_id", ondelete="CASCADE"), index=True, ) stream_id = Column( String(36), ForeignKey("stream.stream_id", ondelete="CASCADE"), index=True, ) analytic_name = Column(String(36), nullable=False) analytic_parameters = Column( CLOB if DBConfig.dbType == "oracle" else String, nullable=True, )
[docs] def view(name: str, selectable) -> TableClause: """ Create a simple view object. Use it like a table: `select([SomeView.c.roi])` or `SomeView.insert().values(name='foo')`. Args: name: view name selectable: any selectable that has columns Returns: sa.table object (NOT Model) """ t = table(name) for c in selectable.c: c._make_proxy(t) return t
StreamQueueView = view("streams_queue", Stream.__table__)
[docs] class SingleProcessLock(Base): """ Table for storing one-at-time process lock. Lock a needed process by selecting row with `FOR UPDATE` for a guaranteed single-instance execution. """ __tablename__ = "single_process_lock" name = Column(String(100), primary_key=True, comment="str: process name") checkpoint = Column( TIMESTAMP, server_default=DB_CURRENT_TIMESTAMP, nullable=False, comment="date: last timestamp of a completed process", ) expiration = Column( TIMESTAMP, comment="date: lock expiration time", ) session_id = Column(String(36), comment="uuid: current lock session id")