Database context

class luna_streams.db.context.DBContext(logger, dbSettings=None, storageTime=None)[source]

DB context

async autorestartStreams()[source]

Autorestart streams.

Set stream.status from failure to `pending` for all failed streams that allow autorestart, taking the restart delay into account.

Set stream.restart.current_attempt to 0 (meaning autorestart has completed) for all streams whose status differs from failure after delay seconds and whose current_attempt >= 1.

Parameters:

lock – active db connection

Return type:

int

Returns:

amount of the restarted streams
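The two transitions described above can be illustrated with a minimal in-memory sketch. It is not the library's implementation: the real method works on database rows, and the `Restart` and `Stream` classes, the `enabled`, `delay` and `last_attempt_time` fields, and the increment of `current_attempt` on restart are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class Restart:
    """Hypothetical restart settings; only current_attempt is named in the text above."""

    enabled: bool  # assumption: whether autorestart is allowed for the stream
    delay: float  # assumption: restart delay in seconds
    current_attempt: int = 0
    last_attempt_time: float = 0.0  # assumption: time of the last failure or restart


@dataclass
class Stream:
    status: str
    restart: Restart


def autorestart(streams: list[Stream], now: float) -> int:
    """Apply both transitions in memory and return the number of restarted streams."""
    restarted = 0
    for stream in streams:
        restart = stream.restart
        delay_passed = now - restart.last_attempt_time >= restart.delay
        if stream.status == "failure":
            if restart.enabled and delay_passed:
                # failure -> pending: the stream gets another attempt
                stream.status = "pending"
                restart.current_attempt += 1  # assumption: attempts are counted here
                restart.last_attempt_time = now
                restarted += 1
        elif restart.current_attempt >= 1 and delay_passed:
            # the stream did not fail again within the delay: autorestart completed
            restart.current_attempt = 0
    return restarted


streams = [
    Stream("failure", Restart(enabled=True, delay=10)),
    Stream("failure", Restart(enabled=False, delay=10)),
    Stream("in_progress", Restart(enabled=True, delay=10, current_attempt=2)),
]
restarted_count = autorestart(streams, now=30)
```

In this example only the first stream is restarted. The second stays failed because autorestart is disabled, and the third has its attempt counter reset.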

async blockStreams(connection, streamIds, accountId=None)[source]

Block streams for update.

Parameters:
  • connection – current connection

  • streamIds (list[str]) – stream ids

  • accountId (Optional[str]) – account id

Return type:

list[str]

Returns:

ids of streams blocked

async checkStreamAllowedForUpdate(connection, streamId, newStatus=None)[source]

Check whether the stream is allowed to be updated with the new status.

Parameters:
  • connection – connection to database

  • streamId (str) – stream id

  • newStatus (Optional[str]) – new status for stream

Raises:
  • VLException(Error.UnableToStopProcessing.format(streamId), 400, False) if unable to stop stream

  • VLException(Error.UnableToCancelProcessing.format(streamId), 400, False) if unable to cancel stream

async close()[source]

Close context

async countGroups(groupNames=None, groupIds=None, accountId=None)[source]

Get count of groups.

Parameters:
  • groupNames (Optional[list[str]]) – group names

  • groupIds (Optional[list[str]]) – group ids

  • accountId (Optional[str]) – account id

Return type:

int

Returns:

count of groups

async countStreams(filters)[source]

Count streams according to filters.

Parameters:

filters (StreamSearchFilters) – stream search filters

Return type:

int

Returns:

streams count

async createGroup(group)[source]

Create group.

Parameters:

group (NewGroupModel) – group to create

Return type:

str

Returns:

unique group id

async createStream(stream, streamId=None, version=1, dbConnection=None)[source]

Save stream.

Parameters:
  • stream (NewStreamModel) – stream to save

  • streamId (Optional[str]) – optional id to put stream by

  • version (int) – optional stream version

  • dbConnection (Optional[AbstractDBConnection]) – stream database connection, if need to do something within the transaction

Return type:

str

Returns:

stream id in uuid4 format

async createStreamV2(stream, streamId=None, version=1, dbConnection=None)[source]

Save stream.

Parameters:
  • stream (NewStreamModelV2) – stream to save in V2 format

  • streamId (Optional[str]) – optional id to put stream by

  • version (int) – optional stream version

  • dbConnection (Optional[AbstractDBConnection]) – stream database connection, if need to do something within the transaction

Return type:

str

Returns:

stream id in uuid4 format

async deleteStreamsByDeleteFilters(filters, accountId=None)[source]

Delete streams by filters (OR).

Parameters:
  • filters (DeleteStreamsFilters) – streams delete filters

  • accountId (Optional[str]) – account id

async deleteStreamsLogs(logTimeLt=None)[source]

Delete streams logs except the last one.

Parameters:

logTimeLt – exclusive upper bound of stream log creation time

Return type:

int

Returns:

deleted logs count

async static denyStreamAutoRestart(connection, streamId)[source]

Deny stream auto restart.

Parameters:
  • connection (AbstractDBConnection) – connection

  • streamId (str) – stream id

async downgradeStreamStatus(streamInactiveTime)[source]

Downgrade stream status. Set stream.status from in_progress to `pending` for all streams that have not been updated for a long time.

Parameters:

streamInactiveTime – period without feedback

Return type:

int

Returns:

amount of the downgraded streams
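As a rough illustration of the downgrade rule, the sketch below applies it to plain dicts. The `last_update_time` key, the explicit `now` argument, and the use of seconds are assumptions; the real method runs against the database.

```python
def downgrade_status(streams: list[dict], inactive_time: float, now: float) -> int:
    """Move stale in_progress streams back to pending; return how many were changed."""
    downgraded = 0
    for stream in streams:
        # assumption: a stream is stale if its last update is older than inactive_time
        stale = now - stream["last_update_time"] > inactive_time
        if stream["status"] == "in_progress" and stale:
            stream["status"] = "pending"
            downgraded += 1
    return downgraded


sample_streams = [
    {"status": "in_progress", "last_update_time": 0.0},    # stale, gets downgraded
    {"status": "in_progress", "last_update_time": 95.0},   # recently updated
    {"status": "failure", "last_update_time": 0.0},        # not in_progress
]
downgraded_count = downgrade_status(sample_streams, inactive_time=30, now=100.0)
```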

getGeoPosition(geoPosition)[source]

Get geo position prepared for insertion to database.

Parameters:

geoPosition (dict) – raw dict with latitude and longitude

Return type:

Union[Geometry, TypeVar(T_SDO_GEOMETRY)]

Returns:

Geometry object for postgres database or SDO_GEOMETRY object for oracle database

async getGroups(groupNames=None, groupIds=None, accountId=None, page=1, pageSize=None)[source]

Get groups.

Parameters:
  • groupNames (Optional[list[str]]) – group names

  • groupIds (Optional[list[str]]) – group ids

  • accountId (Optional[str]) – account id

  • page (int) – page number

  • pageSize (Optional[int]) – page size

Return type:

list[dict[str, Any]]

Returns:

list of groups
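The `page` and `pageSize` parameters suggest ordinary offset-based pagination. A minimal sketch of that arithmetic follows, assuming pages are numbered from 1 (the default value of `page`) and that a missing page size means no limit. The `page_bounds` helper is hypothetical and not part of DBContext.

```python
from typing import Optional


def page_bounds(page: int = 1, page_size: Optional[int] = None) -> tuple[int, Optional[int]]:
    """Translate a 1-based page number into an (offset, limit) pair."""
    if page < 1:
        raise ValueError("page must be >= 1")
    if page_size is None:
        # assumption: without a page size everything is returned
        return 0, None
    return (page - 1) * page_size, page_size


# 25 fake groups, page size 10: the third page holds the last 5 items
groups = [{"group_id": str(i)} for i in range(25)]
offset, limit = page_bounds(page=3, page_size=10)
third_page = groups[offset : offset + limit]
```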

async getPreview(streamId, previewType, accountId=None)[source]

Get preview url.

Parameters:
  • streamId (str) – stream id

  • previewType (Literal['last_frame', 'live']) – preview type: frame or live

  • accountId (Optional[str]) – account id

Return type:

str

Returns:

preview url

Raises:
  • VLException(Error.PreviewNotFound.format(previewType, streamId), 404, False) if preview not found

  • VLException(Error.StreamNotFound.format(streamId), 404, False) if stream not found

getRuntimeChecks(includeLunaServices=False)[source]

Returns configured system checks, pairs of (name, coroutine).

Parameters:

includeLunaServices (bool) – A bool, whether to return checks for luna services.

Return type:

List[Tuple[str, Awaitable]]
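A list of (name, coroutine) pairs like the one returned here could be consumed as in the sketch below. The two check coroutines and the `run_checks` helper are hypothetical stand-ins, and treating "raised an exception" as a failed check is an assumption about how the checks report problems.

```python
import asyncio
from typing import Awaitable, Dict, List, Tuple


async def check_ok() -> None:
    """Hypothetical check that passes."""


async def check_failing() -> None:
    """Hypothetical check that fails."""
    raise ConnectionError("service is unreachable")


async def run_checks(checks: List[Tuple[str, Awaitable]]) -> Dict[str, bool]:
    """Await all checks concurrently and map each name to True (passed) or False."""
    names = [name for name, _ in checks]
    # return_exceptions=True keeps one failing check from cancelling the others
    results = await asyncio.gather(*(coro for _, coro in checks), return_exceptions=True)
    return {name: not isinstance(result, Exception) for name, result in zip(names, results)}


report = asyncio.run(run_checks([("db", check_ok()), ("luna_service", check_failing())]))
```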

async getStreams(filters, page=None, pageSize=None, targetFormat=1)[source]

Get streams.

Parameters:
  • filters (StreamSearchFilters) – stream search filters

  • page (Optional[int]) – page number

  • pageSize (Optional[int]) – page size

  • targetFormat (int) – stream target format

Return type:

list[dict[str, Any]]

Returns:

list of streams

async getStreamsLogs(page, pageSize, targets=None, accountId=None, streamIds=None, statuses=None, logTimeLt=None, logTimeGte=None)[source]

Get streams logs by filters.

Parameters:
  • page (int) – page

  • pageSize (int) – page size

  • targets (Optional[list[str]]) – targets

  • accountId (Optional[str]) – account id

  • streamIds (Optional[list[int]]) – stream ids

  • statuses (Optional[list[str]]) – stream statuses

  • logTimeLt (Optional[datetime]) – exclusive upper bound of stream log creation time

  • logTimeGte (Optional[datetime]) – inclusive lower bound of stream log creation time

Return type:

list[dict[str, Any]]

async getStreamsQueue(limit, streamIds=None, streamNames=None, groupIds=None, groupNames=None, targetFormat=1)[source]

Get streams from the queue.

Parameters:
  • limit (int) – the maximum amount to return

  • streamIds (Optional[list[str]]) – list of stream ids

  • streamNames (Optional[list[str]]) – list of stream names

  • groupIds (Optional[list[str]]) – list of group ids

  • groupNames (Optional[list[str]]) – list of group names

  • targetFormat (int) – stream target format

Return type:

list[dict[str, Any]]

Returns:

streams as dicts

async classmethod initSDO()[source]

Initialize SDO_GEOMETRY to improve insertion speed.

Required only for oracle database. See cx_oracle samples/insert_geometry.py for details.

Return type:

None

async initialize()[source]

Initialize context

async linkStreamsToGroup(streamIds, groupId, accountId=None)[source]

Attach streamIds to group.

Parameters:
  • streamIds (list[str]) – stream ids

  • groupId (str) – group id

  • accountId (Optional[str]) – account id

Raises:

ContextException(Error.StreamNotFound.format(streamId)) if stream does not exist

static prepareSearchQueryFilters(filters)[source]

Prepare search query filters on “Stream” model.

Parameters:

filters (StreamSearchFilters) – query filters

Return type:

BooleanClauseList

Returns:

filters for select query

async probe()[source]

Ensure provided config is valid. Create new connection. Can be used without initialization.

Supports the mixin.Initializable protocol.

Return type:

bool

async pullStreamsQueue(limit, streamIds=None, streamNames=None, groupIds=None, groupNames=None, targetFormat=1)[source]

Pull streams from the queue.

Parameters:
  • limit (int) – the maximum amount to return

  • streamIds (Optional[list[str]]) – list of stream ids

  • streamNames (Optional[list[str]]) – list of stream names

  • groupNames (Optional[list[str]]) – list of group names

  • groupIds (Optional[list[str]]) – list of group ids

  • targetFormat (int) – stream target format - 1 or 2

Return type:

list[dict[str, Any]]

Returns:

streams as dicts

async putStream(streamId, stream, streamFormat=1)[source]

Put stream. There are 3 cases:
  1. Replacing existing stream

  2. Creating new stream

  3. Recreating new stream

Parameters:
  • streamId (str) – id to put stream by

  • stream (NewStreamModel | NewStreamModelV2) – stream to put

  • streamFormat (int) – stream data format version

Raises:

VLException(Error.StreamNotFound.format(streamId), 404, False) if the stream was deleted

Return type:

int

Returns:

stream current version

async removeGroup(groupId, accountId=None)[source]

Remove group.

Parameters:
  • groupId (str) – group id

  • accountId (Optional[str]) – account id

Return type:

int

Returns:

deleted group count

async removeStreams(streamIds, accountId=None)[source]

Remove streams by streamIds.

Parameters:
  • streamIds (list[str]) – stream ids

  • accountId (Optional[str]) – account id

Return type:

int

Returns:

removed streams count

async saveFeedback(streams)[source]

Save feedback, return stream info.

Parameters:

streams (list[OneStreamFeedback]) – list of stream feedbacks

Returns:

two lists of db stream info objects: {“modified”: [{<stream info>}], “unmodified”: [{<stream info>}]}

splitList(values, divider=1)[source]

Split list into several lists taking into account max length of sql queries.

Parameters:
  • values (list) – list of values

  • divider (int) – additional divider for list split (applicable for many args usage per request)

Return type:

list[list]

Returns:

lists
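The chunking idea can be sketched as follows. The `MAX_QUERY_ARGS` limit is a made-up value (the real limit depends on the database and driver), and reading `divider` as "number of query arguments consumed per value" is an assumption.

```python
MAX_QUERY_ARGS = 1000  # hypothetical limit on bound parameters per query


def split_list(values: list, divider: int = 1, max_args: int = MAX_QUERY_ARGS) -> list[list]:
    """Split values into chunks small enough to fit into one query each."""
    if divider < 1:
        raise ValueError("divider must be >= 1")
    # assumption: each value consumes `divider` arguments, so fewer values fit per chunk
    chunk_size = max(1, max_args // divider)
    return [values[i : i + chunk_size] for i in range(0, len(values), chunk_size)]


# 2500 ids, 2 arguments per id: chunks of 500 ids each
chunks = split_list(list(range(2500)), divider=2)
```

Each chunk can then be passed to a separate query, for example as the argument list of an `IN (...)` clause.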

async unlinkStreamsFromGroup(streamIds, groupId, accountId=None)[source]

Unlink streamIds from group.

Parameters:
  • streamIds (list[str]) – stream ids

  • groupId (str) – group id

  • accountId (Optional[str]) – account id

async updateGroup(groupId, group, accountId=None)[source]

Update group.

Parameters:
  • groupId (str) – group id

  • group (PatchGroupModel) – group to update

  • accountId (Optional[str]) – account id

Return type:

int

Returns:

updated group count

async updateStream(streamId, streamData)[source]

Update stream.

Parameters:
  • streamId (str) – stream id

  • streamData (StreamPatchingDataModel) – stream data to update

Return type:

bool

Returns:

True if stream was patched

class luna_streams.db.context.SingleProcessLock(logger, name)[source]

Single-process lock implementation.

logger

logger

name

lock name (SingleProcessLock.name)

context

database context

currentDBTimestamp

SQL function for calculating the current database timestamp

async heartbeat(name, ttl, sessionId)[source]

Master heartbeat.

Parameters:
  • sessionId (str) – current lock session

  • name (str) – lock name

  • ttl (int) – lock ttl

Return type:

bool

Returns:

True if the lock has the same session id, otherwise False

async lockProcess(name, ttl, sessionId)[source]

Try to get a lock.

The lock can be acquired if:
  1. expiration is null

  2. expiration is in the past

Parameters:
  • name (str) – lock name

  • ttl (int) – lock ttl

  • sessionId (str) – new lock session id

Return type:

bool

Returns:

True if the lock is free and we acquire it (the expiration time is set), otherwise False
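The acquisition rule and the heartbeat check can be modelled with a small in-memory sketch. This is not the SingleProcessLock implementation: the real lock is kept in the database and uses the database clock, whereas the `InMemoryLock` class below is a hypothetical stand-in with an injectable clock. That a successful heartbeat extends the expiration by `ttl` is also an assumption.

```python
import time
from typing import Callable, Optional


class InMemoryLock:
    """Hypothetical stand-in for one lock record: name, session id, expiration."""

    def __init__(self, name: str, clock: Callable[[], float] = time.monotonic):
        self.name = name
        self.session_id: Optional[str] = None
        self.expiration: Optional[float] = None
        self._clock = clock

    def lock_process(self, ttl: int, session_id: str) -> bool:
        """Take the lock if it was never taken or its expiration is in the past."""
        now = self._clock()
        if self.expiration is None or self.expiration < now:
            self.session_id = session_id
            self.expiration = now + ttl
            return True
        return False

    def heartbeat(self, ttl: int, session_id: str) -> bool:
        """Return True only if the lock still belongs to this session."""
        if self.session_id != session_id:
            return False
        self.expiration = self._clock() + ttl  # assumption: heartbeat prolongs the lock
        return True


fake_now = [0.0]  # mutable fake clock, so the example does not need to sleep
lock = InMemoryLock("master", clock=lambda: fake_now[0])
first = lock.lock_process(ttl=10, session_id="a")     # lock is free
second = lock.lock_process(ttl=10, session_id="b")    # lock is held by "a"
alive = lock.heartbeat(ttl=10, session_id="a")        # "a" is still the owner
fake_now[0] = 25.0                                    # "a" stops sending heartbeats
takeover = lock.lock_process(ttl=10, session_id="b")  # lock has expired
stale = lock.heartbeat(ttl=10, session_id="a")        # "a" lost the lock
```

In a real database-backed lock the check and the update would need to happen atomically, for example in a single conditional UPDATE statement. The sketch ignores that concern.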