Database context¶
- class luna_streams.db.context.DBContext(logger, dbSettings=None, storageTime=None)[source]¶
DB context
- async autorestartStreams()[source]¶
Autorestart streams. Make stream.status failure->`pending` for all failed streams that allow autorestart taking restart delay into account Make stream.restart.current_attempt to 0 (means autorestart has been completed) for all streams that have status different from failure after delay seconds and current_attempt >= 1
- Parameters:
lock – active db connection
- Return type:
int
- Returns:
amount of the restarted streams
- async blockStreams(connection, streamIds, accountId=None)[source]¶
Block streams for update.
- Parameters:
connection – current connection
streamIds (
list
[str
]) – stream idsaccountId (
Optional
[str
]) – account id
- Return type:
list
[str
]- Returns:
ids of streams blocked
- async checkStreamAllowedForUpdate(connection, streamId, newStatus=None)[source]¶
Check stream is allowed for update with new status :type connection: :param connection: connection to database :type streamId:
str
:param streamId: stream id :type newStatus:Optional
[str
] :param newStatus: new status for stream- Raises:
VLException(Error.UnableToStopProcessing.format(streamId), 400, False) if unable to stop stream –
VLException(Error.UnableToCancelProcessing.format(streamId), 400, False) if unable to cancel stream –
- async countGroups(groupNames=None, groupIds=None, accountId=None)[source]¶
Get count of groups.
- Parameters:
groupNames (
Optional
[list
[str
]]) – group namesgroupIds (
Optional
[list
[str
]]) – group idsaccountId (
Optional
[str
]) – account id
- Return type:
int
- Returns:
count of groups
- async countStreams(filters)[source]¶
Count streams according to filters.
- Parameters:
filters (
StreamSearchFilters
) – stream search filters- Return type:
int
- Returns:
streams count
- async createGroup(group)[source]¶
Create group.
- Parameters:
group (
NewGroupModel
) – group to create- Return type:
str
- Returns:
unique group id
- async createStream(stream, streamId=None, version=1, dbConnection=None)[source]¶
Save stream.
- Parameters:
stream (
NewStreamModel
) – stream to savestreamId (
Optional
[str
]) – optional id to put stream byversion (
int
) – optional stream versiondbConnection (
Optional
[AbstractDBConnection
]) – stream database connection, if need to do something within the transaction
- Return type:
str
- Returns:
stream id in uuid4 format
- async createStreamV2(stream, streamId=None, version=1, dbConnection=None)[source]¶
Save stream.
- Parameters:
stream (
NewStreamModelV2
) – stream to save in V2 formatstreamId (
Optional
[str
]) – optional id to put stream byversion (
int
) – optional stream versiondbConnection (
Optional
[AbstractDBConnection
]) – stream database connection, if need to do something within the transaction
- Return type:
str
- Returns:
stream id in uuid4 format
- async deleteStreamsByDeleteFilters(filters, accountId=None)[source]¶
Delete streams by filters (OR).
- Parameters:
filters (
DeleteStreamsFilters
) – streams delete filtersaccountId (
Optional
[str
]) – account id
- async deleteStreamsLogs(logTimeLt=None)[source]¶
Delete streams logs except the last one :param : logTimeLt: upper excluding bound of stream log creation time
- Return type:
int
- Returns:
deleted logs count
- async static denyStreamAutoRestart(connection, streamId)[source]¶
Deny stream auto restart :type connection:
AbstractDBConnection
:param connection: connection :type streamId:str
:param streamId: stream id
- async downgradeStreamStatus(streamInactiveTime)[source]¶
Downgrade stream status. Make stream.status in_progress->`pending` for all long-time-ago updated streams.
- Parameters:
streamInactiveTime – period without feedback
- Return type:
int
- Returns:
amount of the downgraded streams
- getGeoPosition(geoPosition)[source]¶
Get geo position prepared for insertion to database :type geoPosition:
dict
:param geoPosition: raw dict with latitude and longitude- Return type:
Union
[Geometry
,TypeVar
(T_SDO_GEOMETRY
)]- Returns:
Geometry object for postgres database or SDO_GEOMETRY object for oracle database
- async getGroups(groupNames=None, groupIds=None, accountId=None, page=1, pageSize=None)[source]¶
Get groups.
- Parameters:
groupNames (
Optional
[list
[str
]]) – group namesgroupIds (
Optional
[list
[str
]]) – group idsaccountId (
Optional
[str
]) – account idpage (
int
) – page numberpageSize (
Optional
[int
]) – page size
- Return type:
list
[dict
[str
,Any
]]- Returns:
list of groups
- async getPreview(streamId, previewType, accountId=None)[source]¶
Get preview url :type streamId:
str
:param streamId: stream id :type previewType:Literal
['last_frame'
,'live'
] :param previewType: preview type - frame or live :type accountId:Optional
[str
] :param accountId: account id- Return type:
str
- Returns:
preview url
- Raises:
VLException(Error.PreviewNotFound.format(previewType, streamId), 404, False) if preview not found –
VLException(Error.StreamNotFound.format(streamId), 404, False) if stream not found –
- getRuntimeChecks(includeLunaServices=False)[source]¶
Returns configured system checks, pairs of (name, coroutine).
- Parameters:
includeLunaServices (
bool
) – A bool, whether to return checks for luna services.- Return type:
List
[Tuple
[str
,Awaitable
]]
- async getStreams(filters, page=None, pageSize=None, targetFormat=1)[source]¶
Get streams.
- Parameters:
filters (
StreamSearchFilters
) – stream search filterspage (
Optional
[int
]) – page numberpageSize (
Optional
[int
]) – page sizetargetFormat (
int
) – stream target format
- Return type:
list
[dict
[str
,Any
]]- Returns:
list of streams
- async getStreamsLogs(page, pageSize, targets=None, accountId=None, streamIds=None, statuses=None, logTimeLt=None, logTimeGte=None)[source]¶
Get streams logs by filters :type page:
int
:param page: page :type pageSize:int
:param pageSize: page size :type targets:Optional
[list
[str
]] :param targets: targets :type accountId:Optional
[str
] :param accountId: account id :type streamIds:Optional
[list
[int
]] :param streamIds: stream ids :type statuses:Optional
[list
[str
]] :param statuses: stream statuses :type logTimeLt:Optional
[datetime
] :param logTimeLt: upper excluding bound of stream log creation time :type logTimeGte:Optional
[datetime
] :param logTimeGte: lower including bound of stream log creation time- Return type:
list
[dict
[str
,Any
]]
- async getStreamsQueue(limit, streamIds=None, streamNames=None, groupIds=None, groupNames=None, targetFormat=1)[source]¶
Get streams from the queue.
- Parameters:
limit (
int
) – the maximum amount to returnstreamIds (
Optional
[list
[str
]]) – list of stream idsstreamNames (
Optional
[list
[str
]]) – list of stream namesgroupIds (
Optional
[list
[str
]]) – list of group idsgroupNames (
Optional
[list
[str
]]) – list of group namestargetFormat (
int
) – stream target format
- Return type:
list
[dict
[str
,Any
]]- Returns:
streams as dicts
- async classmethod initSDO()[source]¶
Initialize SDO_GEOMETRY to improve insertion speed | Required only for oracle database See cx_oracle samples/insert_geometry.py for details
- Return type:
None
- async linkStreamsToGroup(streamIds, groupId, accountId=None)[source]¶
Attach streamIds to group.
- Parameters:
streamIds (
list
[str
]) – stream idsgroupId (
str
) – group idaccountId (
Optional
[str
]) – account id
- Raises:
ContextException(Error.StreamNotFound.format(streamId)) if stream doesnt exist –
- static prepareSearchQueryFilters(filters)[source]¶
Prepare search query filters on “Stream” model.
- Parameters:
filters (
StreamSearchFilters
) – query filters- Return type:
BooleanClauseList
- Returns:
filters for select query
- async probe()[source]¶
Ensure provided config is valid. Create new connection. Can be used without initialization.
Support mixin.Initializable protocol
- Return type:
bool
- async pullStreamsQueue(limit, streamIds=None, streamNames=None, groupIds=None, groupNames=None, targetFormat=1)[source]¶
Pull streams from the queue.
- Parameters:
limit (
int
) – the maximum amount to returnstreamIds (
Optional
[list
[str
]]) – list of stream idstreamNames (
Optional
[list
[str
]]) – list of stream namesgroupNames (
Optional
[list
[str
]]) – list of group namesgroupIds (
Optional
[list
[str
]]) – list of group idstargetFormat (
int
) – stream target format - 1 or 2
- Return type:
list
[dict
[str
,Any
]]- Returns:
streams as dicts
- async putStream(streamId, stream, streamFormat=1)[source]¶
Put stream. There are 3 cases: 1) Replacing existing stream 2) Creating new stream 3) Recreating new stream
- Parameters:
streamId (
str
) – id to put stream bystream (
NewStreamModel
|NewStreamModelV2
) – stream to putstreamFormat (
int
) – stream data format version
- Raises:
VLException(Error.StreamNotFound.format(streamId), 404, False) if the stream was deleted –
- Return type:
int
- Returns:
stream current version
- async removeGroup(groupId, accountId=None)[source]¶
Remove group.
- Parameters:
groupId (
str
) – group idaccountId (
Optional
[str
]) – account id
- Return type:
int
- Returns:
deleted group count
- async removeStreams(streamIds, accountId=None)[source]¶
Remove streams by streamIds.
- Parameters:
streamIds (
list
[str
]) – stream idsaccountId (
Optional
[str
]) – account id
- Return type:
int
- Returns:
removed streams count
- async saveFeedback(streams)[source]¶
Save feedback, return stream info.
- Parameters:
streams (
list
[OneStreamFeedback
]) – list of stream feedbacks- Returns:
{“modified”: [{<stream info>}], “unmodified”: [{<stream info>}]}
- Return type:
two lists of db stream info objects
- splitList(values, divider=1)[source]¶
Split list into several lists taking into account max length of sql queries :type values:
list
:param values: list of values :type divider:int
:param divider: additional divider for list split (applicable for many args usage per request)- Return type:
list
[list
]- Returns:
lists
- async unlinkStreamsFromGroup(streamIds, groupId, accountId=None)[source]¶
Unlink streamIds from group.
- Parameters:
streamIds (
list
[str
]) – stream idsgroupId (
str
) – group idaccountId (
Optional
[str
]) – account id
- class luna_streams.db.context.SingleProcessLock(logger, name)[source]¶
Single-process lock realisation.
- logger¶
logger
- name¶
lock name (SingleProcessLock.name)
- context¶
database context
- currentDBTimestamp¶
sql function for current database calculating
- async heartbeat(name, ttl, sessionId)[source]¶
Master heartbeat.
- Parameters:
sessionId (
str
) – current lock sessionname (
str
) – lock namettl (
int
) – lock ttl
- Return type:
bool
- Returns:
true if lock has same session id otherwise false
- async lockProcess(name, ttl, sessionId)[source]¶
Try to get a lock.
- Lock can be to get if:
if expiration is null
if expiration in the past
- Parameters:
name (
str
) – lock namettl (
int
) – lock ttlsessionId (
str
) – new lock session id
- Return type:
bool
- Returns:
True if lock is free and we get it (set up expiration time) otherwise False