From cdc49d6c757c86176a49b62f46174101c4ea709c Mon Sep 17 00:00:00 2001 From: Eoghan Glynn Date: Tue, 25 Feb 2014 15:54:11 +0000 Subject: [PATCH] Selectable aggregation functions for statistics Addresses: BP selectable-aggregates Previously, the statistics API always computed and returned a standard set of aggregate functions. Now, individual aggregation functions can be selected and even parameterized via syntax of form: /v2/meters/<meter_name>/statistics?aggregate.func=<func>&aggregate.param=<param> For example in order to calculate the average CPU util only: GET /v2/meters/cpu_util/statistics?aggregate.func=avg HTTP/1.0 200 OK [{"aggregate": {"avg": 0.6858829535841072}, "avg": 0.6858829535841072, "duration_start": "2014-01-30T11:13:23", "duration_end": "2014-01-31T16:07:13", "duration": 104030.0, "period": 0, "period_start": "2014-01-30T11:13:23", "period_end": "2014-01-31T16:07:13", "groupby": null, "unit": "%"}] In the current patch, selectable aggregates are provided by the sqlalchemy driver only, with support in the mongodb driver to follow in a subsequent patch. 
Change-Id: I6cc095ba5ae16dea3f6b404e72a070ab9ac49c9a --- ceilometer/api/controllers/v2.py | 32 ++++- ceilometer/storage/base.py | 3 +- ceilometer/storage/impl_db2.py | 15 ++- ceilometer/storage/impl_hbase.py | 7 +- ceilometer/storage/impl_log.py | 3 +- ceilometer/storage/impl_mongodb.py | 7 +- ceilometer/storage/impl_sqlalchemy.py | 106 +++++++++++----- ceilometer/storage/models.py | 19 +-- ..._compute_duration_by_resource_scenarios.py | 2 +- .../tests/api/v2/test_statistics_scenarios.py | 114 ++++++++++++++++++ 10 files changed, 258 insertions(+), 50 deletions(-) diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py index 8b8de6b1a..53a83f668 100644 --- a/ceilometer/api/controllers/v2.py +++ b/ceilometer/api/controllers/v2.py @@ -637,6 +637,9 @@ class Statistics(_Base): count = int "The number of samples seen" + aggregate = {wtypes.text: float} + "The selectable aggregate value(s)" + duration = float "The difference, in seconds, between the oldest and newest timestamp" @@ -707,6 +710,27 @@ class Statistics(_Base): ) +class Aggregate(_Base): + + func = wsme.wsattr(wtypes.text, mandatory=True) + "The aggregation function name" + + param = wsme.wsattr(wtypes.text, default=None) + "The parameter to the aggregation function" + + def __init__(self, **kwargs): + super(Aggregate, self).__init__(**kwargs) + + @staticmethod + def validate(aggregate): + return aggregate + + @classmethod + def sample(cls): + return cls(func='cardinality', + param='resource_id') + + class MeterController(rest.RestController): """Manages operations on a single meter. 
""" @@ -793,14 +817,15 @@ class MeterController(rest.RestController): return samples - @wsme_pecan.wsexpose([Statistics], [Query], [unicode], int) - def statistics(self, q=[], groupby=[], period=None): + @wsme_pecan.wsexpose([Statistics], [Query], [unicode], int, [Aggregate]) + def statistics(self, q=[], groupby=[], period=None, aggregate=[]): """Computes the statistics of the samples in the time range given. :param q: Filter rules for the data to be returned. :param groupby: Fields for group by aggregation :param period: Returned result will be an array of statistics for a period long of that number of seconds. + :param aggregate: The selectable aggregation functions to be applied. """ if period and period < 0: raise ClientSideError(_("Period must be positive.")) @@ -811,7 +836,8 @@ class MeterController(rest.RestController): g = _validate_groupby_fields(groupby) computed = pecan.request.storage_conn.get_meter_statistics(f, period, - g) + g, + aggregate) LOG.debug(_('computed value coming from %r'), pecan.request.storage_conn) # Find the original timestamp in the query to use for clamping diff --git a/ceilometer/storage/base.py b/ceilometer/storage/base.py index b3c7ac633..d497abc8a 100644 --- a/ceilometer/storage/base.py +++ b/ceilometer/storage/base.py @@ -247,7 +247,8 @@ class Connection(object): raise NotImplementedError(_('Samples not implemented')) @staticmethod - def get_meter_statistics(sample_filter, period=None, groupby=None): + def get_meter_statistics(sample_filter, period=None, groupby=None, + aggregate=None): """Return an iterable of model.Statistics instances. The filter must have a meter value set. 
diff --git a/ceilometer/storage/impl_db2.py b/ceilometer/storage/impl_db2.py index 3a1642fb2..abef6f2fe 100644 --- a/ceilometer/storage/impl_db2.py +++ b/ceilometer/storage/impl_db2.py @@ -330,7 +330,8 @@ class Connection(pymongo_base.Connection): user_id=latest_meter['user_id'], metadata=latest_meter['resource_metadata']) - def get_meter_statistics(self, sample_filter, period=None, groupby=None): + def get_meter_statistics(self, sample_filter, period=None, groupby=None, + aggregate=None): """Return an iterable of models.Statistics instance containing meter statistics described by the query parameters. @@ -341,6 +342,10 @@ class Connection(pymongo_base.Connection): 'resource_id', 'source'])): raise NotImplementedError("Unable to group by these fields") + if aggregate: + msg = _('Selectable aggregates not implemented') + raise NotImplementedError(msg) + q = pymongo_base.make_query_from_filter(sample_filter) if period: @@ -376,8 +381,12 @@ class Connection(pymongo_base.Connection): 'seconds': (periods * period) % self.SECONDS_IN_A_DAY} for key, grouped_meters in itertools.groupby(meters, key=_group_key): - stat = models.Statistics(None, sys.maxint, -sys.maxint, 0, 0, 0, - 0, 0, 0, 0, 0, 0, None) + stat = models.Statistics(unit=None, + min=sys.maxint, max=-sys.maxint, + avg=0, sum=0, count=0, + period=0, period_start=0, period_end=0, + duration=0, duration_start=0, + duration_end=0, groupby=None) for meter in grouped_meters: stat.unit = meter.get('counter_unit', '') diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index dec594b17..3005c3db5 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -551,7 +551,8 @@ class Connection(base.Connection): timeutils.delta_seconds(stat.duration_start, stat.duration_end) - def get_meter_statistics(self, sample_filter, period=None, groupby=None): + def get_meter_statistics(self, sample_filter, period=None, groupby=None, + aggregate=None): """Return an iterable of 
models.Statistics instances containing meter statistics described by the query parameters. @@ -567,6 +568,10 @@ class Connection(base.Connection): if groupby: raise NotImplementedError("Group by not implemented.") + if aggregate: + msg = _('Selectable aggregates not implemented') + raise NotImplementedError(msg) + meter_table = self.conn.table(self.METER_TABLE) q, start, stop = make_query_from_filter(sample_filter) diff --git a/ceilometer/storage/impl_log.py b/ceilometer/storage/impl_log.py index 7a36b9165..fe83f0f4c 100644 --- a/ceilometer/storage/impl_log.py +++ b/ceilometer/storage/impl_log.py @@ -137,7 +137,8 @@ class Connection(base.Connection): """ return [] - def get_meter_statistics(self, sample_filter, period=None, groupby=None): + def get_meter_statistics(self, sample_filter, period=None, groupby=None, + aggregate=None): """Return a dictionary containing meter statistics. described by the query parameters. diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index 895a14006..7131b503b 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -591,7 +591,8 @@ class Connection(pymongo_base.Connection): finally: self.db[out].drop() - def get_meter_statistics(self, sample_filter, period=None, groupby=None): + def get_meter_statistics(self, sample_filter, period=None, groupby=None, + aggregate=None): """Return an iterable of models.Statistics instance containing meter statistics described by the query parameters. 
@@ -603,6 +604,10 @@ class Connection(pymongo_base.Connection): 'resource_id', 'source'])): raise NotImplementedError("Unable to group by these fields") + if aggregate: + msg = _('Selectable aggregates not implemented') + raise NotImplementedError(msg) + q = pymongo_base.make_query_from_filter(sample_filter) if period: diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py index e0893da51..6c716f4ff 100644 --- a/ceilometer/storage/impl_sqlalchemy.py +++ b/ceilometer/storage/impl_sqlalchemy.py @@ -108,6 +108,14 @@ META_TYPE_MAP = {bool: models.MetaBool, long: models.MetaBigInt, float: models.MetaFloat} +STANDARD_AGGREGATES = dict( + avg=func.avg(models.Sample.volume).label('avg'), + sum=func.sum(models.Sample.volume).label('sum'), + min=func.min(models.Sample.volume).label('min'), + max=func.max(models.Sample.volume).label('max'), + count=func.count(models.Sample.volume).label('count') +) + def apply_metaquery_filter(session, query, metaquery): """Apply provided metaquery filter to existing query. 
@@ -634,18 +642,32 @@ class Connection(base.Connection): limit, models.MeterSample) - def _make_stats_query(self, sample_filter, groupby): + @staticmethod + def _get_aggregate_functions(aggregate): + if not aggregate: + return [f for f in STANDARD_AGGREGATES.values()] + + functions = [] + + for a in aggregate: + if a.func in STANDARD_AGGREGATES: + functions.append(STANDARD_AGGREGATES[a.func]) + else: + raise NotImplementedError(_('Selectable aggregate function %s' + ' is not supported') % a.func) + + return functions + + def _make_stats_query(self, sample_filter, groupby, aggregate): + select = [ models.Meter.unit, func.min(models.Sample.timestamp).label('tsmin'), func.max(models.Sample.timestamp).label('tsmax'), - func.avg(models.Sample.volume).label('avg'), - func.sum(models.Sample.volume).label('sum'), - func.min(models.Sample.volume).label('min'), - func.max(models.Sample.volume).label('max'), - func.count(models.Sample.volume).label('count') ] + select.extend(self._get_aggregate_functions(aggregate)) + session = self._get_db_session() if groupby: @@ -659,30 +681,41 @@ class Connection(base.Connection): return make_query_from_filter(session, query, sample_filter) + @staticmethod + def _stats_result_aggregates(result, aggregate): + stats_args = {} + if isinstance(result.count, (int, long)): + stats_args['count'] = result.count + for attr in ['min', 'max', 'sum', 'avg']: + if hasattr(result, attr): + stats_args[attr] = getattr(result, attr) + if aggregate: + stats_args['aggregate'] = dict( + ('%s%s' % (a.func, '/%s' % a.param if a.param else ''), + getattr(result, a.func)) for a in aggregate + ) + return stats_args + @staticmethod def _stats_result_to_model(result, period, period_start, - period_end, groupby): + period_end, groupby, aggregate): + stats_args = Connection._stats_result_aggregates(result, aggregate) + stats_args['unit'] = result.unit duration = (timeutils.delta_seconds(result.tsmin, result.tsmax) if result.tsmin is not None and result.tsmax is not 
None else None) - return api_models.Statistics( - unit=result.unit, - count=int(result.count), - min=result.min, - max=result.max, - avg=result.avg, - sum=result.sum, - duration_start=result.tsmin, - duration_end=result.tsmax, - duration=duration, - period=period, - period_start=period_start, - period_end=period_end, - groupby=(dict((g, getattr(result, g)) for g in groupby) - if groupby else None) - ) + stats_args['duration'] = duration + stats_args['duration_start'] = result.tsmin + stats_args['duration_end'] = result.tsmax + stats_args['period'] = period + stats_args['period_start'] = period_start + stats_args['period_end'] = period_end + stats_args['groupby'] = (dict( + (g, getattr(result, g)) for g in groupby) if groupby else None) + return api_models.Statistics(**stats_args) - def get_meter_statistics(self, sample_filter, period=None, groupby=None): + def get_meter_statistics(self, sample_filter, period=None, groupby=None, + aggregate=None): """Return an iterable of api_models.Statistics instances containing meter statistics described by the query parameters. @@ -696,17 +729,22 @@ class Connection(base.Connection): _("Unable to group by these fields")) if not period: - for res in self._make_stats_query(sample_filter, groupby): + for res in self._make_stats_query(sample_filter, + groupby, + aggregate): if res.count: yield self._stats_result_to_model(res, 0, res.tsmin, res.tsmax, - groupby) + groupby, + aggregate) return if not sample_filter.start or not sample_filter.end: - res = self._make_stats_query(sample_filter, None).first() + res = self._make_stats_query(sample_filter, + None, + aggregate).first() - query = self._make_stats_query(sample_filter, groupby) + query = self._make_stats_query(sample_filter, groupby, aggregate) # HACK(jd) This is an awful method to compute stats by period, but # since we're trying to be SQL agnostic we have to write portable # code, so here it is, admire! 
We're going to do one request to get @@ -726,7 +764,8 @@ class Connection(base.Connection): period_end)), period_start=period_start, period_end=period_end, - groupby=groupby + groupby=groupby, + aggregate=aggregate ) @staticmethod @@ -1299,7 +1338,14 @@ class QueryTransformer(object): 'statistics': {'groupby': True, 'query': {'simple': True, 'metadata': True}, - 'aggregation': {'standard': True}}, + 'aggregation': {'standard': True, + 'selectable': { + 'max': True, + 'min': True, + 'sum': True, + 'avg': True, + 'count': True}} + }, 'alarms': {'query': {'simple': True, 'complex': True}, 'history': {'query': {'simple': True, diff --git a/ceilometer/storage/models.py b/ceilometer/storage/models.py index dbcee3367..f00fa1067 100644 --- a/ceilometer/storage/models.py +++ b/ceilometer/storage/models.py @@ -244,18 +244,12 @@ class Statistics(Model): """Computed statistics based on a set of sample data. """ def __init__(self, unit, - min, max, avg, sum, count, period, period_start, period_end, duration, duration_start, duration_end, - groupby): + groupby, **data): """Create a new statistics object. :param unit: The unit type of the data set - :param min: The smallest volume found - :param max: The largest volume found - :param avg: The average of all volumes found - :param sum: The total of all volumes found - :param count: The number of samples found :param period: The length of the time range covered by these stats :param period_start: The timestamp for the start of the period :param period_end: The timestamp for the end of the period @@ -263,14 +257,21 @@ class Statistics(Model): :param duration_start: The earliest time for the matching samples :param duration_end: The latest time for the matching samples :param groupby: The fields used to group the samples. 
+ :param data: some or all of the following aggregates + min: The smallest volume found + max: The largest volume found + avg: The average of all volumes found + sum: The total of all volumes found + count: The number of samples found + aggregate: name-value pairs for selectable aggregates """ Model.__init__(self, unit=unit, - min=min, max=max, avg=avg, sum=sum, count=count, period=period, period_start=period_start, period_end=period_end, duration=duration, duration_start=duration_start, duration_end=duration_end, - groupby=groupby) + groupby=groupby, + **data) class Alarm(Model): diff --git a/ceilometer/tests/api/v2/test_compute_duration_by_resource_scenarios.py b/ceilometer/tests/api/v2/test_compute_duration_by_resource_scenarios.py index d4b67f610..d4ecca845 100644 --- a/ceilometer/tests/api/v2/test_compute_duration_by_resource_scenarios.py +++ b/ceilometer/tests/api/v2/test_compute_duration_by_resource_scenarios.py @@ -57,7 +57,7 @@ class TestComputeDurationByResource(FunctionalTest, self.late2 = datetime.datetime(2012, 8, 29, 19, 0) def _patch_get_interval(self, start, end): - def get_interval(event_filter, period, groupby): + def get_interval(event_filter, period, groupby, aggregate): assert event_filter.start assert event_filter.end if (event_filter.start > end or event_filter.end < start): diff --git a/ceilometer/tests/api/v2/test_statistics_scenarios.py b/ceilometer/tests/api/v2/test_statistics_scenarios.py index 817e50ea0..e7e756599 100644 --- a/ceilometer/tests/api/v2/test_statistics_scenarios.py +++ b/ceilometer/tests/api/v2/test_statistics_scenarios.py @@ -1329,3 +1329,117 @@ class TestGroupBySource(FunctionalTest, self.assertEqual(r['max'], 4) self.assertEqual(r['sum'], 4) self.assertEqual(r['avg'], 4) + + +class TestSelectableAggregates(FunctionalTest, + tests_db.MixinTestsWithBackendScenarios): + + PATH = '/meters/instance/statistics' + + def setUp(self): + super(TestSelectableAggregates, self).setUp() + + test_sample_data = ( + {'volume': 2, 
'user': 'user-1', 'project': 'project-1', + 'resource': 'resource-1', 'timestamp': (2013, 8, 1, 16, 10), + 'metadata_flavor': 'm1.tiny', 'metadata_event': 'event-1', + 'source': 'source'}, + {'volume': 2, 'user': 'user-2', 'project': 'project-2', + 'resource': 'resource-3', 'timestamp': (2013, 8, 1, 15, 37), + 'metadata_flavor': 'm1.large', 'metadata_event': 'event-1', + 'source': 'source'}, + {'volume': 1, 'user': 'user-2', 'project': 'project-2', + 'resource': 'resource-5', 'timestamp': (2013, 8, 1, 10, 11), + 'metadata_flavor': 'm1.medium', 'metadata_event': 'event-2', + 'source': 'source'}, + {'volume': 2, 'user': 'user-1', 'project': 'project-1', + 'resource': 'resource-2', 'timestamp': (2013, 8, 1, 10, 40), + 'metadata_flavor': 'm1.large', 'metadata_event': 'event-2', + 'source': 'source'}, + {'volume': 2, 'user': 'user-2', 'project': 'project-2', + 'resource': 'resource-4', 'timestamp': (2013, 8, 1, 14, 59), + 'metadata_flavor': 'm1.large', 'metadata_event': 'event-2', + 'source': 'source'}, + {'volume': 5, 'user': 'user-1', 'project': 'project-1', + 'resource': 'resource-2', 'timestamp': (2013, 8, 1, 17, 28), + 'metadata_flavor': 'm1.large', 'metadata_event': 'event-2', + 'source': 'source'}, + {'volume': 4, 'user': 'user-2', 'project': 'project-2', + 'resource': 'resource-3', 'timestamp': (2013, 8, 1, 11, 22), + 'metadata_flavor': 'm1.large', 'metadata_event': 'event-2', + 'source': 'source'}, + ) + + for test_sample in test_sample_data: + c = sample.Sample( + 'instance', + sample.TYPE_GAUGE, + unit='instance', + volume=test_sample['volume'], + user_id=test_sample['user'], + project_id=test_sample['project'], + resource_id=test_sample['resource'], + timestamp=datetime.datetime(*test_sample['timestamp']), + resource_metadata={'flavor': test_sample['metadata_flavor'], + 'event': test_sample['metadata_event'], }, + source=test_sample['source'], + ) + msg = utils.meter_message_from_counter( + c, + self.CONF.publisher.metering_secret, + ) + 
self.conn.record_metering_data(msg) + + def _do_test_per_tenant_selectable_standard_aggregate(self, + aggregate, + expected_values): + agg_args = {'aggregate.func': aggregate} + data = self.get_json(self.PATH, groupby=['project_id'], **agg_args) + groupby_keys_set = set(x for sub_dict in data + for x in sub_dict['groupby'].keys()) + groupby_vals_set = set(x for sub_dict in data + for x in sub_dict['groupby'].values()) + self.assertEqual(groupby_keys_set, set(['project_id'])) + self.assertEqual(groupby_vals_set, set(['project-1', 'project-2'])) + + standard_aggregates = set(['count', 'min', 'max', 'sum', 'avg']) + for r in data: + grp = r['groupby'] + if grp == {'project_id': 'project-1'}: + expected = expected_values[0] + self.assertEqual(r['unit'], 'instance') + self.assertAlmostEqual(r[aggregate], expected) + self.assertIn('aggregate', r) + self.assertIn(aggregate, r['aggregate']) + self.assertAlmostEqual(r['aggregate'][aggregate], expected) + for a in standard_aggregates - set([aggregate]): + self.assertNotIn(a, r) + elif grp == {'project_id': 'project-2'}: + expected = expected_values[1] + self.assertEqual(r['unit'], 'instance') + self.assertAlmostEqual(r[aggregate], expected) + self.assertIn('aggregate', r) + self.assertIn(aggregate, r['aggregate']) + self.assertAlmostEqual(r['aggregate'][aggregate], expected) + for a in standard_aggregates - set([aggregate]): + self.assertNotIn(a, r) + + def test_per_tenant_selectable_max(self): + self._do_test_per_tenant_selectable_standard_aggregate('max', + [5, 4]) + + def test_per_tenant_selectable_min(self): + self._do_test_per_tenant_selectable_standard_aggregate('min', + [2, 1]) + + def test_per_tenant_selectable_sum(self): + self._do_test_per_tenant_selectable_standard_aggregate('sum', + [9, 9]) + + def test_per_tenant_selectable_avg(self): + self._do_test_per_tenant_selectable_standard_aggregate('avg', + [3, 2.25]) + + def test_per_tenant_selectable_count(self): + 
self._do_test_per_tenant_selectable_standard_aggregate('count', + [3, 4])