Source code for track.aggregators.aggregator

from track.containers.ring import RingBuffer
from track.containers.types import float32
from track.utils.stat import StatStream


[docs]class Aggregator:
[docs] def append(self, other): raise NotImplementedError()
def __iadd__(self, other): return self.append(other) @property def val(self): """ Return the last observed value """ raise NotImplementedError()
[docs] def to_json(self, short=False): raise NotImplementedError()
[docs] @staticmethod def lazy(aggregator_t, **kwargs): """Lazily instantiate the underlying aggregator """ return lambda: aggregator_t(**kwargs)
[docs]class RingAggregator(Aggregator): """ Saves the `n` last elements. Start overriding the elements once `n` elements is reached """ def __init__(self, n, dtype=float32): self.ring = RingBuffer(n, dtype)
[docs] def append(self, other): self.ring.append(other)
@property def val(self): return self.ring.to_list()
[docs] @staticmethod def lazy(n, dtype): return lambda: RingAggregator(n, dtype)
[docs] def to_json(self, short=False): return self.ring.to_list()
def __repr__(self): return f'r<{repr(self.ring.to_list())}>' def __str__(self): return f'r<{str(self.ring.to_list())}>'
[docs]class StatAggregator(Aggregator): """ Compute mean, sd, min, max; does not keep the entire history. This is useful if you are worried about memory usage and the values should not vary much. i.e keeping the entire history is not useful. """ def __init__(self, skip_obs=10): self.stat = StatStream(drop_first_obs=skip_obs)
[docs] def append(self, other): self.stat.update(other)
@property def val(self): return self.stat.val
[docs] @staticmethod def lazy(skip): return lambda: StatAggregator(skip)
[docs] @staticmethod def from_json(data): stat = StatStream.from_dict(data) agg = StatAggregator(0) agg.stat = stat return agg
[docs] def to_json(self, short=False): if short: return { 'avg': self.avg, 'min': self.min, 'max': self.max, 'sd': self.sd, 'count': self.total } return { 'dtype': 'statstream', 'sum': self.stat.sum, 'sum_sqr': self.stat.sum_sqr, 'first_obs': self.stat.first_obs, 'min': self.stat.min, 'max': self.stat.max, 'current_count': self.stat.current_count, 'current_obs': self.stat.current_obs, 'drop_obs': self.stat.drop_obs, }
@property def avg(self): return self.stat.avg @property def max(self): return self.stat.max @property def min(self): return self.stat.min @property def sd(self): return self.stat.sd @property def sum(self): return self.stat.total @property def total(self): return self.stat.total def __repr__(self): return f's<{repr(self.stat.to_dict())}>' def __str__(self): return f's<{str(self.stat.to_dict())}>'
[docs]class TimeSeriesAggregator(Aggregator): """ Keeps the entire history of the metric """ def __init__(self): self.time_series = []
[docs] def append(self, other): self.time_series.append(other)
@property def val(self): return self.time_series
[docs] @staticmethod def lazy(): return lambda: TimeSeriesAggregator()
[docs] def to_json(self, short=False): if short: return self.time_series[-20:] return self.time_series
def __repr__(self): return f'ts<{repr(self.time_series)}>' def __str__(self): return f'ts<{str(self.time_series)}>'
[docs]class ValueAggregator(Aggregator): """ Does not Aggregate only keeps the latest value """ def __init__(self, val=None): self.value = val
[docs] def append(self, other): self.value = other
@property def val(self): return self.value
[docs] @staticmethod def lazy(): return lambda: ValueAggregator()
[docs] def to_json(self, short=False): return self.value
def __repr__(self): return f'v<{repr(self.value)}>' def __str__(self): return f'v<{str(self.value)}>'