Source code for luna_streams.db.streams_db_tools.models.streams_db_models

from luna_db_tools.functions import currentDBTimestamp
from sqlalchemy import (
    CLOB,
    TIMESTAMP,
    BigInteger,
    Boolean,
    Column,
    ForeignKey,
    Index,
    Integer,
    Sequence,
    SmallInteger,
    String,
    table,
    text,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import TableClause

from .config import DBConfig
from .models_utils import GEOGRAPHY_COLUMN_TYPE

# Declarative base shared by the models of this module; the sequences below are
# registered on its metadata.
Base = declarative_base()

# Raw sql function to get utc timestamp for creating/updating table columns. USED BY ALEMBIC ONLY.
# Evaluates to None when DBConfig.dbType is not set (falsy).
DB_CURRENT_TIMESTAMP = currentDBTimestamp(DBConfig.dbType) if DBConfig.dbType else None

# Named sequences bound to Base.metadata; each one is passed to the `id` column
# of the corresponding model.
streamIdSeq = Sequence("stream_id_seq", metadata=Base.metadata)
restartIdSeq = Sequence("restart_id_seq", metadata=Base.metadata)
handlerIdSeq = Sequence("handler_id_seq", metadata=Base.metadata)
locationIdSeq = Sequence("location_id_seq", metadata=Base.metadata)
logIdSeq = Sequence("log_id_seq", metadata=Base.metadata)
# The only sequence declared with an explicit start value.
groupIdSeq = Sequence("group_id_seq", metadata=Base.metadata, start=1)


class Stream(Base):
    """
    ORM model of the ``stream`` table.

    Rows are keyed by the ``stream_id`` uuid; ``id`` is an additional unique,
    sequence-backed integer. Besides the per-column indexes, two composite
    indexes are declared: a unique one on ``(account_id, stream_id)`` and a
    plain one on ``(type, status)``.
    """

    __tablename__ = "stream"

    # --- identity -----------------------------------------------------------
    id = Column(Integer, streamIdSeq, unique=True, index=True, comment="int: id")
    stream_id = Column(String(36), comment="uuid: stream id", primary_key=True)
    account_id = Column(String(36), comment="uuid: stream account id", nullable=False)

    # --- description --------------------------------------------------------
    name = Column(String(128), comment="str: stream name", index=True)
    description = Column(String(512), comment="str: stream description")

    # --- source settings ----------------------------------------------------
    type = Column(
        String(36),
        comment="str: stream type (0 - udp, 1 - tcp, 2 - videofile, 3 - images)",
        nullable=False,
    )
    mask = Column(String(128), comment="str: stream file mask")
    endless = Column(Boolean(), comment="boolean: stream endless")
    reference = Column(
        String(512),
        comment="str: stream reference",
        nullable=False,
        index=True,
    )
    roi = Column(String(128), comment="str: stream rect of interest", nullable=False)
    droi = Column(
        String(128),
        comment="str: stream rect of interest for detection",
        nullable=False,
    )
    rotation = Column(
        SmallInteger,
        comment="smallint: stream rotation (0, 90, 180, 270)",
        nullable=False,
    )
    preferred_program_stream_frame_width = Column(
        SmallInteger,
        comment="smallint: frame width of the preferred program stream",
        nullable=False,
    )

    # --- state --------------------------------------------------------------
    status = Column(
        SmallInteger,
        comment=(
            "smallint: stream status ("
            "0 - pending, 1 - in progress, 2 - done, "
            "3 - pause, 4 - restart, 5 - cancel, "
            "6 - failure, 7 - handler lost, 8 - not found"
            ")"
        ),
        nullable=False,
        index=True,
    )
    version = Column(SmallInteger, comment="smallint: stream version", nullable=False)

    # --- timestamps ---------------------------------------------------------
    create_time = Column(
        TIMESTAMP,
        comment="date and time: stream create time",
        nullable=False,
        index=True,
        server_default=DB_CURRENT_TIMESTAMP,
    )
    status_last_update_time = Column(
        TIMESTAMP,
        comment="date and time: stream status last update time",
        nullable=False,
        index=True,
        server_default=DB_CURRENT_TIMESTAMP,
    )
    last_feedback_time = Column(
        TIMESTAMP,
        comment="date and time: last feedback time",
        nullable=True,
        index=True,
    )

    # --- composite indexes --------------------------------------------------
    ix_stream_account_id_stream_id = Index(
        "ix_stream_account_id_stream_id",
        account_id,
        stream_id,
        unique=True,
    )
    ix_stream_licensing = Index("ix_stream_licensing", type, status)

    # Declared after the indexes, so it is the last column of the table.
    api_version = Column(
        SmallInteger,
        comment="smallint: stream api version",
        nullable=False,
    )


class Restart(Base):
    """
    ORM model of the ``restart`` table (stream restart info).

    ``stream_id`` is unique and references ``stream.stream_id`` through a
    foreign key declared with ``ON DELETE CASCADE``.
    """

    __tablename__ = "restart"

    id = Column(Integer, restartIdSeq, comment="int: id", primary_key=True)
    stream_id = Column(
        String(36),
        ForeignKey("stream.stream_id", ondelete="CASCADE"),
        comment="uuid: stream id",
        unique=True,
        index=True,
    )

    # restart settings
    restart = Column(
        SmallInteger,
        comment="smallint: stream restart status (0, 1)",
        index=True,
        nullable=False,
    )
    attempt_count = Column(
        SmallInteger,
        comment="smallint: stream restart attempt count",
        index=True,
        nullable=False,
    )
    delay = Column(
        Integer,
        comment="int: stream restart delay, in seconds",
        nullable=False,
    )

    # restart progress; both counters default to 0 on the server side
    current_attempt = Column(
        SmallInteger,
        comment="smallint: stream restart attempt number",
        server_default=text("0"),
        nullable=False,
    )
    last_attempt_time = Column(
        TIMESTAMP,
        comment="date and time: last attempt time",
        nullable=True,
    )
    status = Column(
        SmallInteger,
        comment="autorestart status",
        server_default=text("0"),
        nullable=False,
    )


class Handler(Base):
    """
    ORM model of the ``handler`` table (stream handler info).

    ``stream_id`` is unique and references ``stream.stream_id`` through a
    foreign key declared with ``ON DELETE CASCADE``. Both dump columns use
    ``CLOB`` when the configured database type is ``"oracle"`` and an
    unbounded ``String`` otherwise.
    """

    __tablename__ = "handler"

    id = Column(Integer, handlerIdSeq, comment="int: id", primary_key=True)
    stream_id = Column(
        String(36),
        ForeignKey("stream.stream_id", ondelete="CASCADE"),
        comment="uuid: stream id",
        unique=True,
        index=True,
    )
    event_handler = Column(
        String if DBConfig.dbType != "oracle" else CLOB,
        comment="string: stream event handler dump",
        nullable=False,
    )
    policies = Column(
        String if DBConfig.dbType != "oracle" else CLOB,
        comment="string: stream processing policies dump",
        nullable=False,
    )


class Location(Base):
    """
    ORM model of the ``location`` table (stream location info).

    ``stream_id`` is unique and references ``stream.stream_id`` through a
    foreign key declared with ``ON DELETE CASCADE``.
    """

    __tablename__ = "location"

    id = Column(Integer, locationIdSeq, comment="int: id", primary_key=True)
    stream_id = Column(
        String(36),
        ForeignKey("stream.stream_id", ondelete="CASCADE"),
        comment="uuid: stream id",
        unique=True,
        index=True,
    )

    # textual address parts, none of them unique
    city = Column(String(36), comment="str: city", unique=False)
    area = Column(String(36), comment="str: area", unique=False)
    district = Column(String(36), comment="str: district", unique=False)
    street = Column(String(36), comment="str: street", unique=False)
    house_number = Column(String(36), comment="str: house number", unique=False)

    # event gps coordinates
    # warning: spatial index will be explicitly created after the column is created
    geo_position = Column(GEOGRAPHY_COLUMN_TYPE, nullable=True)


class Log(Base):
    """
    ORM model of the ``log`` table (stream logging).

    ``stream_id`` is a plain column here, without a foreign key to the stream
    table. The text columns use ``CLOB`` when the configured database type is
    ``"oracle"`` and an unbounded ``String`` otherwise. A composite index on
    ``(stream_id, id)`` is declared alongside the columns.
    """

    __tablename__ = "log"

    id = Column(BigInteger, logIdSeq, comment="int: id", primary_key=True)
    stream_id = Column(String(36), comment="uuid: stream id")
    time = Column(
        TIMESTAMP,
        comment="date and time: log record time",
        nullable=False,
        index=True,
        server_default=DB_CURRENT_TIMESTAMP,
    )
    error = Column(
        String if DBConfig.dbType != "oracle" else CLOB,
        comment="string: stream error description",
        nullable=True,
    )
    # NOTE(review): the comment text has no comma between "8 - not found" and
    # "9 - deleted"; it is kept verbatim because it is stored as the column comment.
    status = Column(
        SmallInteger,
        comment=(
            "smallint: stream status ("
            "0 - pending, 1 - in progress, 2 - done, "
            "3 - pause, 4 - restart, 5 - cancel, "
            "6 - failure, 7 - handler lost, 8 - not found 9 - deleted"
            ")"
        ),
        nullable=False,
        index=True,
    )
    video_info = Column(
        String if DBConfig.dbType != "oracle" else CLOB,
        comment="string: stream source video info",
        nullable=True,
    )
    stream_version = Column(
        SmallInteger,
        comment="smallint: stream version",
        nullable=False,
    )
    preview = Column(
        String if DBConfig.dbType != "oracle" else CLOB,
        comment="string: stream preview data",
        nullable=True,
    )

    # composite index over (stream_id, id)
    stream_log_idx = Index("ix_stream_id_id", stream_id, id)


class Group(Base):
    """
    ORM model of the ``group`` table.

    ``id`` is the sequence-backed primary key and ``group_id`` a unique uuid.
    The pair ``(group_name, account_id)`` is covered by a unique composite index.
    """

    __tablename__ = "group"

    id = Column(Integer, groupIdSeq, comment="int: id", primary_key=True)
    group_id = Column(
        String(36),
        comment="uuid: group id",
        unique=True,
        index=True,
        nullable=False,
    )
    group_name = Column(String(128), comment="str: group name", nullable=False)
    account_id = Column(String(36), comment="uuid: group account id", nullable=False)
    description = Column(String(256), comment="str: group description")
    create_time = Column(
        TIMESTAMP,
        comment="date and time: group create time",
        nullable=False,
        server_default=DB_CURRENT_TIMESTAMP,
    )

    # unique composite index over (group_name, account_id)
    ix_group_group_name_account_id = Index(
        "ix_group_group_name_account_id",
        group_name,
        account_id,
        unique=True,
    )


class GroupStream(Base):
    """
    ORM model of the ``group_stream`` table linking streams to groups.

    The primary key is the pair ``(group_name, stream_id)``. ``id`` references
    ``group.id`` and ``stream_id`` references ``stream.stream_id``; both foreign
    keys are declared with ``ON DELETE CASCADE``.
    """

    __tablename__ = "group_stream"

    id = Column(
        Integer,
        ForeignKey("group.id", ondelete="CASCADE"),
        comment="int: group id",
    )
    group_name = Column(String(128), comment="str: group name", primary_key=True)
    stream_id = Column(
        String(36),
        ForeignKey("stream.stream_id", ondelete="CASCADE"),
        comment="uuid: stream id",
        index=True,
        primary_key=True,
    )


def view(name: str, selectable) -> TableClause:
    """
    Build a simple table-like object representing a view.

    The result can be used like a table: `select([SomeView.c.roi])` or
    `SomeView.insert().values(name='foo')`.

    Args:
        name: view name
        selectable: any selectable that has columns (exposes ``.c``)
    Returns:
        sa.table object (NOT Model)
    """
    view_table = table(name)
    for source_column in selectable.c:
        # proxy every column of the selectable onto the new table clause
        source_column._make_proxy(view_table)
    return view_table


# Table-like handle for the "streams_queue" view, built from the columns of the stream table.
# real signature in base_scripts/alembic/versions/f8cdd0805c10_stream_queue_view.py
StreamQueueView = view(name="streams_queue", selectable=Stream.__table__)


class SingleProcessLock(Base):
    """
    ORM model of the ``single_process_lock`` table holding one-at-a-time process locks.

    To guarantee single-instance execution of a process, lock it by selecting
    its row with `FOR UPDATE`.
    """

    __tablename__ = "single_process_lock"

    name = Column(String(100), comment="str: process name", primary_key=True)
    checkpoint = Column(
        TIMESTAMP,
        comment="date: last timestamp of a completed process",
        nullable=False,
        server_default=DB_CURRENT_TIMESTAMP,
    )
    expiration = Column(TIMESTAMP, comment="date: lock expiration time")
    session_id = Column(String(36), comment="uuid: current lock session id")