from track.containers.ring import RingBuffer
from track.containers.types import float32
from track.utils.stat import StatStream
[docs]class Aggregator:
[docs] def append(self, other):
raise NotImplementedError()
def __iadd__(self, other):
return self.append(other)
@property
def val(self):
""" Return the last observed value """
raise NotImplementedError()
[docs] def to_json(self, short=False):
raise NotImplementedError()
[docs] @staticmethod
def lazy(aggregator_t, **kwargs):
"""Lazily instantiate the underlying aggregator """
return lambda: aggregator_t(**kwargs)
[docs]class RingAggregator(Aggregator):
""" Saves the `n` last elements. Start overriding the elements once `n` elements is reached """
def __init__(self, n, dtype=float32):
self.ring = RingBuffer(n, dtype)
[docs] def append(self, other):
self.ring.append(other)
@property
def val(self):
return self.ring.to_list()
[docs] @staticmethod
def lazy(n, dtype):
return lambda: RingAggregator(n, dtype)
[docs] def to_json(self, short=False):
return self.ring.to_list()
def __repr__(self):
return f'r<{repr(self.ring.to_list())}>'
def __str__(self):
return f'r<{str(self.ring.to_list())}>'
[docs]class StatAggregator(Aggregator):
""" Compute mean, sd, min, max; does not keep the entire history.
This is useful if you are worried about memory usage and the values should not vary much.
i.e keeping the entire history is not useful.
"""
def __init__(self, skip_obs=10):
self.stat = StatStream(drop_first_obs=skip_obs)
[docs] def append(self, other):
self.stat.update(other)
@property
def val(self):
return self.stat.val
[docs] @staticmethod
def lazy(skip):
return lambda: StatAggregator(skip)
[docs] @staticmethod
def from_json(data):
stat = StatStream.from_dict(data)
agg = StatAggregator(0)
agg.stat = stat
return agg
[docs] def to_json(self, short=False):
if short:
return {
'avg': self.avg,
'min': self.min,
'max': self.max,
'sd': self.sd,
'count': self.total
}
return {
'dtype': 'statstream',
'sum': self.stat.sum,
'sum_sqr': self.stat.sum_sqr,
'first_obs': self.stat.first_obs,
'min': self.stat.min,
'max': self.stat.max,
'current_count': self.stat.current_count,
'current_obs': self.stat.current_obs,
'drop_obs': self.stat.drop_obs,
}
@property
def avg(self):
return self.stat.avg
@property
def max(self):
return self.stat.max
@property
def min(self):
return self.stat.min
@property
def sd(self):
return self.stat.sd
@property
def sum(self):
return self.stat.total
@property
def total(self):
return self.stat.total
def __repr__(self):
return f's<{repr(self.stat.to_dict())}>'
def __str__(self):
return f's<{str(self.stat.to_dict())}>'
[docs]class TimeSeriesAggregator(Aggregator):
""" Keeps the entire history of the metric """
def __init__(self):
self.time_series = []
[docs] def append(self, other):
self.time_series.append(other)
@property
def val(self):
return self.time_series
[docs] @staticmethod
def lazy():
return lambda: TimeSeriesAggregator()
[docs] def to_json(self, short=False):
if short:
return self.time_series[-20:]
return self.time_series
def __repr__(self):
return f'ts<{repr(self.time_series)}>'
def __str__(self):
return f'ts<{str(self.time_series)}>'
[docs]class ValueAggregator(Aggregator):
""" Does not Aggregate only keeps the latest value """
def __init__(self, val=None):
self.value = val
[docs] def append(self, other):
self.value = other
@property
def val(self):
return self.value
[docs] @staticmethod
def lazy():
return lambda: ValueAggregator()
[docs] def to_json(self, short=False):
return self.value
def __repr__(self):
return f'v<{repr(self.value)}>'
def __str__(self):
return f'v<{str(self.value)}>'