Selectable aggregation functions for statistics

Addresses: BP selectable-aggregates

Previously, the statistics API always computed and returned a
standard set of aggregate functions.

Now, individual aggregation functions can be selected and even
parameterized via syntax of form:

   /v2/meters/<meter_name>/statistics?aggregate.func=<name>&aggregate.param=<value>

For example, in order to calculate only the average CPU utilisation:

  GET /v2/meters/cpu_util/statistics?aggregate.func=avg

  HTTP/1.0 200 OK
  [{"aggregate": {"avg": 0.6858829535841072},
    "avg": 0.6858829535841072,
    "duration_start": "2014-01-30T11:13:23",
    "duration_end": "2014-01-31T16:07:13",
    "duration": 104030.0,
    "period": 0,
    "period_start": "2014-01-30T11:13:23",
    "period_end": "2014-01-31T16:07:13",
    "groupby": null,
    "unit": "%"}]

In the current patch, selectable aggregates are provided by the
sqlalchemy driver only, with support in the mongodb driver to
follow in a subsequent patch.

Change-Id: I6cc095ba5ae16dea3f6b404e72a070ab9ac49c9a
This commit is contained in:
Eoghan Glynn 2014-02-25 15:54:11 +00:00
parent 4ca9550cfb
commit cdc49d6c75
10 changed files with 258 additions and 50 deletions

View File

@ -637,6 +637,9 @@ class Statistics(_Base):
count = int
"The number of samples seen"
aggregate = {wtypes.text: float}
"The selectable aggregate value(s)"
duration = float
"The difference, in seconds, between the oldest and newest timestamp"
@ -707,6 +710,27 @@ class Statistics(_Base):
)
class Aggregate(_Base):
    """A selectable aggregation function, optionally parameterized.

    Bound from query arguments of the form
    ``aggregate.func=<name>&aggregate.param=<value>``.
    """

    func = wsme.wsattr(wtypes.text, mandatory=True)
    "The aggregation function name"

    param = wsme.wsattr(wtypes.text, default=None)
    "The parameter to the aggregation function"

    @staticmethod
    def validate(aggregate):
        # No aggregate-specific validation yet: unsupported functions are
        # rejected by the storage driver instead.
        return aggregate

    @classmethod
    def sample(cls):
        return cls(func='cardinality',
                   param='resource_id')
class MeterController(rest.RestController):
"""Manages operations on a single meter.
"""
@ -793,14 +817,15 @@ class MeterController(rest.RestController):
return samples
@wsme_pecan.wsexpose([Statistics], [Query], [unicode], int)
def statistics(self, q=[], groupby=[], period=None):
@wsme_pecan.wsexpose([Statistics], [Query], [unicode], int, [Aggregate])
def statistics(self, q=[], groupby=[], period=None, aggregate=[]):
"""Computes the statistics of the samples in the time range given.
:param q: Filter rules for the data to be returned.
:param groupby: Fields for group by aggregation
:param period: Returned result will be an array of statistics for a
period long of that number of seconds.
:param aggregate: The selectable aggregation functions to be applied.
"""
if period and period < 0:
raise ClientSideError(_("Period must be positive."))
@ -811,7 +836,8 @@ class MeterController(rest.RestController):
g = _validate_groupby_fields(groupby)
computed = pecan.request.storage_conn.get_meter_statistics(f,
period,
g)
g,
aggregate)
LOG.debug(_('computed value coming from %r'),
pecan.request.storage_conn)
# Find the original timestamp in the query to use for clamping

View File

@ -247,7 +247,8 @@ class Connection(object):
raise NotImplementedError(_('Samples not implemented'))
@staticmethod
def get_meter_statistics(sample_filter, period=None, groupby=None):
def get_meter_statistics(sample_filter, period=None, groupby=None,
aggregate=None):
"""Return an iterable of model.Statistics instances.
The filter must have a meter value set.

View File

@ -330,7 +330,8 @@ class Connection(pymongo_base.Connection):
user_id=latest_meter['user_id'],
metadata=latest_meter['resource_metadata'])
def get_meter_statistics(self, sample_filter, period=None, groupby=None):
def get_meter_statistics(self, sample_filter, period=None, groupby=None,
aggregate=None):
"""Return an iterable of models.Statistics instance containing meter
statistics described by the query parameters.
@ -341,6 +342,10 @@ class Connection(pymongo_base.Connection):
'resource_id', 'source'])):
raise NotImplementedError("Unable to group by these fields")
if aggregate:
msg = _('Selectable aggregates not implemented')
raise NotImplementedError(msg)
q = pymongo_base.make_query_from_filter(sample_filter)
if period:
@ -376,8 +381,12 @@ class Connection(pymongo_base.Connection):
'seconds': (periods * period) % self.SECONDS_IN_A_DAY}
for key, grouped_meters in itertools.groupby(meters, key=_group_key):
stat = models.Statistics(None, sys.maxint, -sys.maxint, 0, 0, 0,
0, 0, 0, 0, 0, 0, None)
stat = models.Statistics(unit=None,
min=sys.maxint, max=-sys.maxint,
avg=0, sum=0, count=0,
period=0, period_start=0, period_end=0,
duration=0, duration_start=0,
duration_end=0, groupby=None)
for meter in grouped_meters:
stat.unit = meter.get('counter_unit', '')

View File

@ -551,7 +551,8 @@ class Connection(base.Connection):
timeutils.delta_seconds(stat.duration_start,
stat.duration_end)
def get_meter_statistics(self, sample_filter, period=None, groupby=None):
def get_meter_statistics(self, sample_filter, period=None, groupby=None,
aggregate=None):
"""Return an iterable of models.Statistics instances containing meter
statistics described by the query parameters.
@ -567,6 +568,10 @@ class Connection(base.Connection):
if groupby:
raise NotImplementedError("Group by not implemented.")
if aggregate:
msg = _('Selectable aggregates not implemented')
raise NotImplementedError(msg)
meter_table = self.conn.table(self.METER_TABLE)
q, start, stop = make_query_from_filter(sample_filter)

View File

@ -137,7 +137,8 @@ class Connection(base.Connection):
"""
return []
def get_meter_statistics(self, sample_filter, period=None, groupby=None):
def get_meter_statistics(self, sample_filter, period=None, groupby=None,
aggregate=None):
"""Return a dictionary containing meter statistics.
described by the query parameters.

View File

@ -591,7 +591,8 @@ class Connection(pymongo_base.Connection):
finally:
self.db[out].drop()
def get_meter_statistics(self, sample_filter, period=None, groupby=None):
def get_meter_statistics(self, sample_filter, period=None, groupby=None,
aggregate=None):
"""Return an iterable of models.Statistics instance containing meter
statistics described by the query parameters.
@ -603,6 +604,10 @@ class Connection(pymongo_base.Connection):
'resource_id', 'source'])):
raise NotImplementedError("Unable to group by these fields")
if aggregate:
msg = _('Selectable aggregates not implemented')
raise NotImplementedError(msg)
q = pymongo_base.make_query_from_filter(sample_filter)
if period:

View File

@ -108,6 +108,14 @@ META_TYPE_MAP = {bool: models.MetaBool,
long: models.MetaBigInt,
float: models.MetaFloat}
# The full set of aggregate SQL functions computed by default; individual
# entries are selected by name via the aggregate.func API parameter.
STANDARD_AGGREGATES = dict(
    avg=func.avg(models.Sample.volume).label('avg'),
    sum=func.sum(models.Sample.volume).label('sum'),
    min=func.min(models.Sample.volume).label('min'),
    max=func.max(models.Sample.volume).label('max'),
    count=func.count(models.Sample.volume).label('count')
)
def apply_metaquery_filter(session, query, metaquery):
"""Apply provided metaquery filter to existing query.
@ -634,18 +642,32 @@ class Connection(base.Connection):
limit,
models.MeterSample)
def _make_stats_query(self, sample_filter, groupby):
@staticmethod
def _get_aggregate_functions(aggregate):
if not aggregate:
return [f for f in STANDARD_AGGREGATES.values()]
functions = []
for a in aggregate:
if a.func in STANDARD_AGGREGATES:
functions.append(STANDARD_AGGREGATES[a.func])
else:
raise NotImplementedError(_('Selectable aggregate function %s'
' is not supported') % a.func)
return functions
def _make_stats_query(self, sample_filter, groupby, aggregate):
select = [
models.Meter.unit,
func.min(models.Sample.timestamp).label('tsmin'),
func.max(models.Sample.timestamp).label('tsmax'),
func.avg(models.Sample.volume).label('avg'),
func.sum(models.Sample.volume).label('sum'),
func.min(models.Sample.volume).label('min'),
func.max(models.Sample.volume).label('max'),
func.count(models.Sample.volume).label('count')
]
select.extend(self._get_aggregate_functions(aggregate))
session = self._get_db_session()
if groupby:
@ -659,30 +681,41 @@ class Connection(base.Connection):
return make_query_from_filter(session, query, sample_filter)
@staticmethod
def _stats_result_aggregates(result, aggregate):
stats_args = {}
if isinstance(result.count, (int, long)):
stats_args['count'] = result.count
for attr in ['min', 'max', 'sum', 'avg']:
if hasattr(result, attr):
stats_args[attr] = getattr(result, attr)
if aggregate:
stats_args['aggregate'] = dict(
('%s%s' % (a.func, '/%s' % a.param if a.param else ''),
getattr(result, a.func)) for a in aggregate
)
return stats_args
@staticmethod
def _stats_result_to_model(result, period, period_start,
period_end, groupby):
period_end, groupby, aggregate):
stats_args = Connection._stats_result_aggregates(result, aggregate)
stats_args['unit'] = result.unit
duration = (timeutils.delta_seconds(result.tsmin, result.tsmax)
if result.tsmin is not None and result.tsmax is not None
else None)
return api_models.Statistics(
unit=result.unit,
count=int(result.count),
min=result.min,
max=result.max,
avg=result.avg,
sum=result.sum,
duration_start=result.tsmin,
duration_end=result.tsmax,
duration=duration,
period=period,
period_start=period_start,
period_end=period_end,
groupby=(dict((g, getattr(result, g)) for g in groupby)
if groupby else None)
)
stats_args['duration'] = duration
stats_args['duration_start'] = result.tsmin
stats_args['duration_end'] = result.tsmax
stats_args['period'] = period
stats_args['period_start'] = period_start
stats_args['period_end'] = period_end
stats_args['groupby'] = (dict(
(g, getattr(result, g)) for g in groupby) if groupby else None)
return api_models.Statistics(**stats_args)
def get_meter_statistics(self, sample_filter, period=None, groupby=None):
def get_meter_statistics(self, sample_filter, period=None, groupby=None,
aggregate=None):
"""Return an iterable of api_models.Statistics instances containing
meter statistics described by the query parameters.
@ -696,17 +729,22 @@ class Connection(base.Connection):
_("Unable to group by these fields"))
if not period:
for res in self._make_stats_query(sample_filter, groupby):
for res in self._make_stats_query(sample_filter,
groupby,
aggregate):
if res.count:
yield self._stats_result_to_model(res, 0,
res.tsmin, res.tsmax,
groupby)
groupby,
aggregate)
return
if not sample_filter.start or not sample_filter.end:
res = self._make_stats_query(sample_filter, None).first()
res = self._make_stats_query(sample_filter,
None,
aggregate).first()
query = self._make_stats_query(sample_filter, groupby)
query = self._make_stats_query(sample_filter, groupby, aggregate)
# HACK(jd) This is an awful method to compute stats by period, but
# since we're trying to be SQL agnostic we have to write portable
# code, so here it is, admire! We're going to do one request to get
@ -726,7 +764,8 @@ class Connection(base.Connection):
period_end)),
period_start=period_start,
period_end=period_end,
groupby=groupby
groupby=groupby,
aggregate=aggregate
)
@staticmethod
@ -1299,7 +1338,14 @@ class QueryTransformer(object):
'statistics': {'groupby': True,
'query': {'simple': True,
'metadata': True},
'aggregation': {'standard': True}},
'aggregation': {'standard': True,
'selectable': {
'max': True,
'min': True,
'sum': True,
'avg': True,
'count': True}}
},
'alarms': {'query': {'simple': True,
'complex': True},
'history': {'query': {'simple': True,

View File

@ -244,18 +244,12 @@ class Statistics(Model):
"""Computed statistics based on a set of sample data.
"""
def __init__(self, unit,
min, max, avg, sum, count,
period, period_start, period_end,
duration, duration_start, duration_end,
groupby):
groupby, **data):
"""Create a new statistics object.
:param unit: The unit type of the data set
:param min: The smallest volume found
:param max: The largest volume found
:param avg: The average of all volumes found
:param sum: The total of all volumes found
:param count: The number of samples found
:param period: The length of the time range covered by these stats
:param period_start: The timestamp for the start of the period
:param period_end: The timestamp for the end of the period
@ -263,14 +257,21 @@ class Statistics(Model):
:param duration_start: The earliest time for the matching samples
:param duration_end: The latest time for the matching samples
:param groupby: The fields used to group the samples.
:param data: some or all of the following aggregates
min: The smallest volume found
max: The largest volume found
avg: The average of all volumes found
sum: The total of all volumes found
count: The number of samples found
aggregate: name-value pairs for selectable aggregates
"""
Model.__init__(self, unit=unit,
min=min, max=max, avg=avg, sum=sum, count=count,
period=period, period_start=period_start,
period_end=period_end, duration=duration,
duration_start=duration_start,
duration_end=duration_end,
groupby=groupby)
groupby=groupby,
**data)
class Alarm(Model):

View File

@ -57,7 +57,7 @@ class TestComputeDurationByResource(FunctionalTest,
self.late2 = datetime.datetime(2012, 8, 29, 19, 0)
def _patch_get_interval(self, start, end):
def get_interval(event_filter, period, groupby):
def get_interval(event_filter, period, groupby, aggregate):
assert event_filter.start
assert event_filter.end
if (event_filter.start > end or event_filter.end < start):

View File

@ -1329,3 +1329,117 @@ class TestGroupBySource(FunctionalTest,
self.assertEqual(r['max'], 4)
self.assertEqual(r['sum'], 4)
self.assertEqual(r['avg'], 4)
class TestSelectableAggregates(FunctionalTest,
                               tests_db.MixinTestsWithBackendScenarios):
    """Exercise selection of individual standard aggregates via the
    aggregate.func parameter of the statistics API.
    """

    PATH = '/meters/instance/statistics'

    def setUp(self):
        """Record a fixed set of instance samples across two projects."""
        super(TestSelectableAggregates, self).setUp()

        test_sample_data = (
            {'volume': 2, 'user': 'user-1', 'project': 'project-1',
             'resource': 'resource-1', 'timestamp': (2013, 8, 1, 16, 10),
             'metadata_flavor': 'm1.tiny', 'metadata_event': 'event-1',
             'source': 'source'},
            {'volume': 2, 'user': 'user-2', 'project': 'project-2',
             'resource': 'resource-3', 'timestamp': (2013, 8, 1, 15, 37),
             'metadata_flavor': 'm1.large', 'metadata_event': 'event-1',
             'source': 'source'},
            {'volume': 1, 'user': 'user-2', 'project': 'project-2',
             'resource': 'resource-5', 'timestamp': (2013, 8, 1, 10, 11),
             'metadata_flavor': 'm1.medium', 'metadata_event': 'event-2',
             'source': 'source'},
            {'volume': 2, 'user': 'user-1', 'project': 'project-1',
             'resource': 'resource-2', 'timestamp': (2013, 8, 1, 10, 40),
             'metadata_flavor': 'm1.large', 'metadata_event': 'event-2',
             'source': 'source'},
            {'volume': 2, 'user': 'user-2', 'project': 'project-2',
             'resource': 'resource-4', 'timestamp': (2013, 8, 1, 14, 59),
             'metadata_flavor': 'm1.large', 'metadata_event': 'event-2',
             'source': 'source'},
            {'volume': 5, 'user': 'user-1', 'project': 'project-1',
             'resource': 'resource-2', 'timestamp': (2013, 8, 1, 17, 28),
             'metadata_flavor': 'm1.large', 'metadata_event': 'event-2',
             'source': 'source'},
            {'volume': 4, 'user': 'user-2', 'project': 'project-2',
             'resource': 'resource-3', 'timestamp': (2013, 8, 1, 11, 22),
             'metadata_flavor': 'm1.large', 'metadata_event': 'event-2',
             'source': 'source'},
        )

        for test_sample in test_sample_data:
            c = sample.Sample(
                'instance',
                sample.TYPE_GAUGE,
                unit='instance',
                volume=test_sample['volume'],
                user_id=test_sample['user'],
                project_id=test_sample['project'],
                resource_id=test_sample['resource'],
                timestamp=datetime.datetime(*test_sample['timestamp']),
                resource_metadata={'flavor': test_sample['metadata_flavor'],
                                   'event': test_sample['metadata_event'], },
                source=test_sample['source'],
            )
            msg = utils.meter_message_from_counter(
                c,
                self.CONF.publisher.metering_secret,
            )
            self.conn.record_metering_data(msg)

    def _do_test_per_tenant_selectable_standard_aggregate(self,
                                                          aggregate,
                                                          expected_values):
        """Select a single standard aggregate grouped by project and check
        that only that aggregate is reported, with the expected per-project
        values (expected_values[0] for project-1, [1] for project-2).
        """
        agg_args = {'aggregate.func': aggregate}
        data = self.get_json(self.PATH, groupby=['project_id'], **agg_args)
        groupby_keys_set = set(x for sub_dict in data
                               for x in sub_dict['groupby'].keys())
        groupby_vals_set = set(x for sub_dict in data
                               for x in sub_dict['groupby'].values())
        self.assertEqual(groupby_keys_set, set(['project_id']))
        self.assertEqual(groupby_vals_set, set(['project-1', 'project-2']))

        standard_aggregates = set(['count', 'min', 'max', 'sum', 'avg'])
        # The groupby_vals_set assertion above guarantees exactly these two
        # projects appear, so a single assertion block driven by this map
        # covers every returned group without duplication.
        expected_by_project = {'project-1': expected_values[0],
                               'project-2': expected_values[1]}
        for r in data:
            expected = expected_by_project[r['groupby']['project_id']]
            self.assertEqual(r['unit'], 'instance')
            self.assertAlmostEqual(r[aggregate], expected)
            self.assertIn('aggregate', r)
            self.assertIn(aggregate, r['aggregate'])
            self.assertAlmostEqual(r['aggregate'][aggregate], expected)
            # Unselected standard aggregates must be absent entirely.
            for a in standard_aggregates - set([aggregate]):
                self.assertNotIn(a, r)

    def test_per_tenant_selectable_max(self):
        self._do_test_per_tenant_selectable_standard_aggregate('max',
                                                               [5, 4])

    def test_per_tenant_selectable_min(self):
        self._do_test_per_tenant_selectable_standard_aggregate('min',
                                                               [2, 1])

    def test_per_tenant_selectable_sum(self):
        self._do_test_per_tenant_selectable_standard_aggregate('sum',
                                                               [9, 9])

    def test_per_tenant_selectable_avg(self):
        self._do_test_per_tenant_selectable_standard_aggregate('avg',
                                                               [3, 2.25])

    def test_per_tenant_selectable_count(self):
        self._do_test_per_tenant_selectable_standard_aggregate('count',
                                                               [3, 4])