import inspect
from typing import Optional

from luna_lambda_tools.private.metrics.base_metrics import (
    UserCounter as _UserCounter,
    UserGauge as _UserGauge,
    UserHistogram as _UserHistogram,
    UserSummary as _UserSummary,
)
from luna_lambda_tools.private.metrics.manager import ActivityMonitor
[docs]
def monitorLastActivity(func):
    """Wrap a user function with the last-activity update.

    The wrapper is chosen by the kind of ``func``: coroutine functions (as
    reported by ``inspect.iscoroutinefunction``) are handed to
    ``ActivityMonitor.wrapAsyncLastActivity``, everything else to
    ``ActivityMonitor.wrapSyncLastActivity``.

    Args:
        func: the user function to wrap.

    Returns:
        Whatever the selected ``ActivityMonitor`` wrapper returns for ``func``.
    """
    wrapper = (
        ActivityMonitor.wrapAsyncLastActivity
        if inspect.iscoroutinefunction(func)
        else ActivityMonitor.wrapSyncLastActivity
    )
    return wrapper(func)
[docs]
class Counter(_UserCounter):
    """Counter metrics. Used for monotonically increasing values"""

    def inc(self, amount: float = 1, labels: Optional[dict[str, str]] = None) -> None:
        """Increase metric by some amount.

        Args:
            amount: value to add to the metric; defaults to 1.
            labels: optional label mapping, forwarded unchanged to the base
                class ``_inc``. ``None`` (the default) is passed through as-is.
        """
        # Keyword arguments are kept so the call does not depend on the
        # positional order of the base-class helper.
        self._inc(amount=amount, labels=labels)
[docs]
class Gauge(_UserGauge):
    """Gauge metrics. Used for values that go up and down"""

    def set(self, value: float, labels: Optional[dict[str, str]] = None) -> None:
        """Set metric to some value.

        Args:
            value: new value for the metric.
            labels: optional label mapping, forwarded unchanged to the base
                class ``_set``. ``None`` (the default) is passed through as-is.
        """
        self._set(value=value, labels=labels)

    def inc(self, amount: float = 1, labels: Optional[dict[str, str]] = None) -> None:
        """Increase metric by some amount.

        Args:
            amount: value to add to the metric; defaults to 1.
            labels: optional label mapping, forwarded unchanged to the base
                class ``_inc``. ``None`` (the default) is passed through as-is.
        """
        self._inc(amount=amount, labels=labels)

    def dec(self, amount: float = 1, labels: Optional[dict[str, str]] = None) -> None:
        """Decrease metric by some amount.

        Args:
            amount: value to subtract from the metric; defaults to 1.
            labels: optional label mapping, forwarded unchanged to the base
                class ``_dec``. ``None`` (the default) is passed through as-is.
        """
        self._dec(amount=amount, labels=labels)
[docs]
class Histogram(_UserHistogram):
    """Histogram metric. Used to measure distributions"""

    def observe(self, amount: float, labels: Optional[dict[str, str]] = None) -> None:
        """Observe new metric value.

        Args:
            amount: the observed value to record.
            labels: optional label mapping, forwarded unchanged to the base
                class ``_observe``. ``None`` (the default) is passed through
                as-is.
        """
        self._observe(amount=amount, labels=labels)
[docs]
class Summary(_UserSummary):
    """Summary metric. Used to track event durations or sizes"""

    def observe(self, amount: float, labels: Optional[dict[str, str]] = None) -> None:
        """Observe new metric value.

        Args:
            amount: the observed value to record.
            labels: optional label mapping, forwarded unchanged to the base
                class ``_observe``. ``None`` (the default) is passed through
                as-is.
        """
        self._observe(amount=amount, labels=labels)