From e883b5bcda1a2e783f3789d108ead17a8b4df666 Mon Sep 17 00:00:00 2001
From: Witek Bedyk
Date: Fri, 18 Oct 2019 16:55:40 +0200
Subject: [PATCH] Move data_points to repository class

data_points_class can now be declared as a List or a Dictionary. The
latter is used when writing measurements per tenant.

This refactoring avoids creating the dictionary and then chaining its
values again for the Elasticsearch and Cassandra databases, or when the
db_per_tenant option is disabled.

The change also fixes the events pipeline.

Change-Id: I25ea80eea714acb167e70f188ed229cc90531596
Story: 2006331
Task: 37211
---
 .../repositories/abstract_repository.py       |  2 +-
 .../cassandra/abstract_repository.py          |  3 +
 .../alarm_state_history_repository.py         |  5 +-
 .../cassandra/metrics_repository.py           |  5 +-
 monasca_persister/repositories/data_points.py | 55 +++++++++++++++++++
 .../elasticsearch/events_repository.py        |  2 +
 .../influxdb/abstract_repository.py           | 19 ++++---
 monasca_persister/repositories/persister.py   | 37 +------------
 monasca_persister/repositories/utils.py       |  2 +-
 ...assandra_alarm_state_history_repository.py |  5 +-
 monasca_persister/tests/test_events.py        |  3 +-
 .../tests/test_persister_repo.py              | 12 ++--
 monasca_persister/tests/test_utils.py         |  3 +-
 13 files changed, 90 insertions(+), 63 deletions(-)
 create mode 100644 monasca_persister/repositories/data_points.py

diff --git a/monasca_persister/repositories/abstract_repository.py b/monasca_persister/repositories/abstract_repository.py
index 9f909552..6e9301cd 100644
--- a/monasca_persister/repositories/abstract_repository.py
+++ b/monasca_persister/repositories/abstract_repository.py
@@ -27,5 +27,5 @@ class AbstractRepository(object):
         pass
 
     @abc.abstractmethod
-    def write_batch(self, data_points_by_tenant):
+    def write_batch(self, data_points):
         pass
diff --git a/monasca_persister/repositories/cassandra/abstract_repository.py b/monasca_persister/repositories/cassandra/abstract_repository.py
index 188f1e83..4f1bd347 100644
--- a/monasca_persister/repositories/cassandra/abstract_repository.py
+++ b/monasca_persister/repositories/cassandra/abstract_repository.py
@@ -20,9 +20,11 @@ import six
 
 from monasca_persister.repositories import abstract_repository
 from monasca_persister.repositories.cassandra import connection_util
+from monasca_persister.repositories import data_points
 
 conf = cfg.CONF
 
+
 @six.add_metaclass(abc.ABCMeta)
 class AbstractCassandraRepository(abstract_repository.AbstractRepository):
     def __init__(self):
@@ -33,3 +35,4 @@ class AbstractCassandraRepository(abstract_repository.AbstractRepository):
         self._retention = conf.cassandra.retention_policy * 24 * 3600
         self._cache_size = conf.cassandra.max_definition_cache_size
         self._max_batches = conf.cassandra.max_batches
+        self.data_points_class = data_points.DataPointsAsList
diff --git a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
index 064be743..0ee35a17 100644
--- a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
+++ b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
@@ -55,10 +55,7 @@ class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRep
 
         return alarm_state_hist, tenant_id
 
-    def write_batch(self, alarm_state_hists_by_tenant):
-        # TODO(brtknr): At the moment, Cassandra does not have database per
-        # tenant implemented, so use chained list of values.
-        alarm_state_hists = alarm_state_hists_by_tenant.chained()
+    def write_batch(self, alarm_state_hists):
         while alarm_state_hists:
             num_rows = min(len(alarm_state_hists), cfg.CONF.kafka_alarm_history.batch_size)
             batch = alarm_state_hists[:num_rows]
diff --git a/monasca_persister/repositories/cassandra/metrics_repository.py b/monasca_persister/repositories/cassandra/metrics_repository.py
index 5c1b4a60..55d28e05 100644
--- a/monasca_persister/repositories/cassandra/metrics_repository.py
+++ b/monasca_persister/repositories/cassandra/metrics_repository.py
@@ -220,10 +220,7 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
 
         return metric, tenant_id
 
-    def write_batch(self, metrics_by_tenant):
-        # TODO(brtknr): At the moment, Cassandra does not have database per
-        # tenant implemented, so join the list of values.
-        metrics = metrics_by_tenant.chained()
+    def write_batch(self, metrics):
         with self._lock:
             batch_list = self._metric_batch.get_all_batches()
             results = execute_concurrent(self._session, batch_list, raise_on_first_error=True)
diff --git a/monasca_persister/repositories/data_points.py b/monasca_persister/repositories/data_points.py
new file mode 100644
index 00000000..b367c474
--- /dev/null
+++ b/monasca_persister/repositories/data_points.py
@@ -0,0 +1,55 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+class DataPointsAsList(list):
+
+    def append(self, key, value):
+        super(DataPointsAsList, self).append(value)
+
+    def get_count(self):
+        return len(self)
+
+    def clear(self):
+        del self[:]
+
+
+class DataPointsAsDict(dict):
+
+    def __init__(self):
+        self.counter = 0
+
+    def __setitem__(self, key, value):
+        raise NotImplementedError('Use append(key, value) instead.')
+
+    def __delitem__(self, key):
+        raise NotImplementedError('Use clear() instead.')
+
+    def pop(self):
+        raise NotImplementedError('Use clear() instead.')
+
+    def popitem(self):
+        raise NotImplementedError('Use clear() instead.')
+
+    def update(self):
+        raise NotImplementedError('Use clear() instead.')
+
+    def append(self, key, value):
+        super(DataPointsAsDict, self).setdefault(key, []).append(value)
+        self.counter += 1
+
+    def clear(self):
+        super(DataPointsAsDict, self).clear()
+        self.counter = 0
+
+    def get_count(self):
+        return self.counter
diff --git a/monasca_persister/repositories/elasticsearch/events_repository.py b/monasca_persister/repositories/elasticsearch/events_repository.py
index ca0362ee..b4dffdfa 100644
--- a/monasca_persister/repositories/elasticsearch/events_repository.py
+++ b/monasca_persister/repositories/elasticsearch/events_repository.py
@@ -21,6 +21,7 @@ from oslo_config import cfg
 from oslo_log import log
 
 from monasca_persister.repositories import abstract_repository
+from monasca_persister.repositories import data_points
 from monasca_persister.repositories import utils
 
 LOG = log.getLogger(__name__)
@@ -37,6 +38,7 @@ class ElasticSearchEventsRepository(abstract_repository.AbstractRepository):
             sniffer_timeout=self.conf.sniffer_timeout,
             max_retries=self.conf.max_retries
         )
+        self.data_points_class = data_points.DataPointsAsList
 
     def process_message(self, message):
         return utils.parse_events_message(message)
diff --git a/monasca_persister/repositories/influxdb/abstract_repository.py b/monasca_persister/repositories/influxdb/abstract_repository.py
index 5f63a8da..fad54070 100644
--- a/monasca_persister/repositories/influxdb/abstract_repository.py
+++ b/monasca_persister/repositories/influxdb/abstract_repository.py
@@ -18,6 +18,7 @@ from oslo_config import cfg
 import six
 
 from monasca_persister.repositories import abstract_repository
+from monasca_persister.repositories import data_points
 
 DATABASE_NOT_FOUND_MSG = "database not found"
 
@@ -33,16 +34,18 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
             self.conf.influxdb.port,
             self.conf.influxdb.user,
             self.conf.influxdb.password)
-
-    def write_batch(self, data_points_by_tenant):
         if self.conf.influxdb.db_per_tenant:
-            for tenant_id, data_points in data_points_by_tenant.items():
-                database = '%s_%s' % (self.conf.influxdb.database_name, tenant_id)
-                self._write_batch(data_points, database)
+            self.data_points_class = data_points.DataPointsAsDict
+        else:
+            self.data_points_class = data_points.DataPointsAsList
+
+    def write_batch(self, data_points):
+        if self.conf.influxdb.db_per_tenant:
+            for tenant_id, tenant_data_points in data_points.items():
+                database = '%s_%s' % (self.conf.influxdb.database_name,
+                                      tenant_id)
+                self._write_batch(tenant_data_points, database)
         else:
-            # NOTE (brtknr): Chain list of values to avoid multiple calls to
-            # database API endpoint (when db_per_tenant is False).
-            data_points = data_points_by_tenant.chained()
             self._write_batch(data_points, self.conf.influxdb.database_name)
 
     def _write_batch(self, data_points, database):
diff --git a/monasca_persister/repositories/persister.py b/monasca_persister/repositories/persister.py
index 10b9e7b4..9762b695 100644
--- a/monasca_persister/repositories/persister.py
+++ b/monasca_persister/repositories/persister.py
@@ -24,47 +24,16 @@ from monasca_persister.repositories import singleton
 
 LOG = log.getLogger(__name__)
 
 
-class DataPoints(dict):
-
-    def __init__(self):
-        self.counter = 0
-
-    def __setitem__(self, key, value):
-        raise NotImplementedError('Use append(key, value) instead.')
-
-    def __delitem__(self, key):
-        raise NotImplementedError('Use clear() instead.')
-
-    def pop(self):
-        raise NotImplementedError('Use clear() instead.')
-
-    def popitem(self):
-        raise NotImplementedError('Use clear() instead.')
-
-    def update(self):
-        raise NotImplementedError('Use clear() instead.')
-
-    def chained(self):
-        return [vi for vo in super(DataPoints, self).values() for vi in vo]
-
-    def append(self, key, value):
-        super(DataPoints, self).setdefault(key, []).append(value)
-        self.counter += 1
-
-    def clear(self):
-        super(DataPoints, self).clear()
-        self.counter = 0
-
 @six.add_metaclass(singleton.Singleton)
 class Persister(six.with_metaclass(ABCMeta, object)):
     def __init__(self, kafka_conf, repository):
-        self._data_points = DataPoints()
         self._kafka_topic = kafka_conf.topic
         self._batch_size = kafka_conf.batch_size
         self.repository = repository()
+        self._data_points = self.repository.data_points_class()
 
     def _flush(self):
         if not self._data_points:
@@ -74,7 +43,7 @@ class Persister(six.with_metaclass(ABCMeta, object)):
         self.repository.write_batch(self._data_points)
 
         LOG.info("Processed {} messages from topic '{}'".format(
-            self._data_points.counter, self._kafka_topic))
+            self._data_points.get_count(), self._kafka_topic))
 
         self._data_points.clear()
         self._consumer.commit()
@@ -105,7 +74,7 @@ class Persister(six.with_metaclass(ABCMeta, object)):
                         LOG.exception('Error processing message. Message is '
                                       'being dropped. {}'.format(message))
 
-            if self._data_points.counter >= self._batch_size:
+            if self._data_points.get_count() >= self._batch_size:
                 self._flush()
         except Exception:
             LOG.exception(
diff --git a/monasca_persister/repositories/utils.py b/monasca_persister/repositories/utils.py
index 154ad94f..9ab68e8d 100644
--- a/monasca_persister/repositories/utils.py
+++ b/monasca_persister/repositories/utils.py
@@ -104,4 +104,4 @@ def parse_events_message(message):
     project_id = decoded_message['meta']['project_id']
     dimensions = decoded_message['event']['dimensions']
 
-    return project_id, timestamp, event_type, payload, dimensions
+    return (project_id, timestamp, event_type, payload, dimensions), project_id
diff --git a/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py b/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py
index ddbea998..523bef0b 100644
--- a/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py
+++ b/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py
@@ -21,8 +21,7 @@ from oslo_config import cfg
 
 from monasca_persister.repositories.cassandra import alarm_state_history_repository
 from monasca_persister.repositories.cassandra import connection_util
-
-from monasca_persister.repositories.persister import DataPoints
+from monasca_persister.repositories import data_points
 
 
 class TestAlarmStateHistoryRepo(base.BaseTestCase):
@@ -106,6 +105,6 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
         cfg.CONF = Mock(kafka_alarm_history=Mock(batch_size=1))
         self._session, self._upsert_stmt = Mock(), Mock()
 
-        alarm_state_hists_by_tenant = DataPoints()
+        alarm_state_hists_by_tenant = data_points.DataPointsAsList()
         alarm_state_hists_by_tenant.append('fake_tenant', 'elem')
         self.alarm_state_hist_repo.write_batch(alarm_state_hists_by_tenant)
diff --git a/monasca_persister/tests/test_events.py b/monasca_persister/tests/test_events.py
index fd9f5ee0..4714e6b2 100644
--- a/monasca_persister/tests/test_events.py
+++ b/monasca_persister/tests/test_events.py
@@ -37,7 +37,8 @@ class TestEvents(base.BaseTestCase):
 
     def test_parse_event(self):
         event = self._load_event('event_1')
-        project_id, timestamp, event_type, payload, dimensions = utils.parse_events_message(event)
+        (project_id, timestamp, event_type, payload,
+         dimensions), _ = utils.parse_events_message(event)
         self.assertEqual('de98fbff448f4f278a56e9929db70b03', project_id)
         self.assertEqual('2017-06-01 09:15:11.494606', timestamp)
         self.assertEqual('compute.instance.create.start', event_type)
diff --git a/monasca_persister/tests/test_persister_repo.py b/monasca_persister/tests/test_persister_repo.py
index 7050bb24..1e5f4a2a 100644
--- a/monasca_persister/tests/test_persister_repo.py
+++ b/monasca_persister/tests/test_persister_repo.py
@@ -22,8 +22,8 @@ from oslo_config import cfg
 from monasca_common.kafka import consumer
 
 from monasca_persister.kafka.legacy_kafka_persister import LegacyKafkaPersister
+from monasca_persister.repositories import data_points
 from monasca_persister.repositories.persister import LOG
-from monasca_persister.repositories.persister import DataPoints
 
 
 class FakeException(Exception):
@@ -85,7 +85,7 @@ class TestPersisterRepo(base.BaseTestCase):
 
     def test_run_if_consumer_is_faulty(self):
         with patch.object(os, '_exit', return_value=None) as mock_exit:
-            self.persister._data_points = DataPoints()
+            self.persister._data_points = data_points.DataPointsAsDict()
             self.persister._consumer = Mock(side_effect=FakeException)
             self.persister.run()
             mock_exit.assert_called_once_with(1)
@@ -93,7 +93,7 @@
     def test_run_logs_exception_from_consumer(self):
         with patch.object(self.persister.repository, 'process_message',
                           side_effect=FakeException):
-            self.persister._data_points = DataPoints()
+            self.persister._data_points = data_points.DataPointsAsDict()
             self.persister._consumer = ['aa']
             self.persister.run()
             self.mock_log_exception.assert_called()
@@ -102,7 +102,7 @@
         with patch.object(self.persister.repository, 'process_message',
                           return_value=('message', 'tenant_id')):
             with patch.object(self.persister, '_consumer',
                               return_value=Mock()) as mock_consumer:
-                self.persister._data_points = DataPoints()
+                self.persister._data_points = data_points.DataPointsAsDict()
                 self.persister._data_points.append('fake_tenant_id', 'some')
                 self.persister._consumer.__iter__.return_value = ('aa', 'bb')
                 self.persister._batch_size = 1
@@ -117,7 +117,7 @@ class TestPersisterRepo(base.BaseTestCase):
                                return_value=True)):
             for elem in exception_msgs:
                 with patch.object(LOG, 'info', side_effect=FakeException(elem)):
-                    self.persister._data_points = DataPoints()
+                    self.persister._data_points = data_points.DataPointsAsDict()
                     self.persister._data_points.append('fake_tenant_id', 'some')
                     self.persister._flush()
                     self.mock_log_warning.assert_called()
@@ -127,7 +127,7 @@ class TestPersisterRepo(base.BaseTestCase):
             with(patch.object(cfg.CONF.repositories, 'ignore_parse_point_error',
                               return_value=False)):
                 mock_log_info.side_effect.message = 'some msg'
-                self.persister._data_points = DataPoints()
+                self.persister._data_points = data_points.DataPointsAsDict()
                 self.persister._data_points.append('fake_tenant_id', 'some')
                 self.assertRaises(FakeException, self.persister._flush)
                 self.mock_log_exception.assert_called()
diff --git a/monasca_persister/tests/test_utils.py b/monasca_persister/tests/test_utils.py
index f2991ee1..2aefcd57 100644
--- a/monasca_persister/tests/test_utils.py
+++ b/monasca_persister/tests/test_utils.py
@@ -104,7 +104,8 @@ class TestUtils(base.BaseTestCase):
             }
         }"""
 
-        project_id, timestamp, event_type, payload, dimensions = utils.parse_events_message(message)
+        (project_id, timestamp, event_type, payload,
+         dimensions), _ = utils.parse_events_message(message)
         self.assertEqual(project_id, "dummy_project_id")
         self.assertEqual(timestamp, "dummy_timestamp")
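
Usage note (not part of the patch): the snippet below is a minimal sketch of how
the two new containers from data_points.py behave once this change is applied.
The tenant ids and point values are made-up placeholders for illustration; only
append(), get_count() and clear() come from the module itself.

    from monasca_persister.repositories import data_points

    # Per-tenant grouping, picked by the InfluxDB repository when
    # influxdb.db_per_tenant is enabled: values are bucketed under their key.
    by_tenant = data_points.DataPointsAsDict()
    by_tenant.append('tenant-a', 'point-1')
    by_tenant.append('tenant-a', 'point-2')
    by_tenant.append('tenant-b', 'point-3')
    assert by_tenant.get_count() == 3
    assert by_tenant['tenant-a'] == ['point-1', 'point-2']

    # Flat list, picked by Cassandra, Elasticsearch and InfluxDB without
    # db_per_tenant: the key is ignored and values are kept in arrival order.
    flat = data_points.DataPointsAsList()
    flat.append('tenant-a', 'point-1')
    flat.append('tenant-b', 'point-2')
    assert flat.get_count() == 2
    assert flat == ['point-1', 'point-2']

    # Both containers expose the same reset used by Persister._flush().
    by_tenant.clear()
    flat.clear()
    assert by_tenant.get_count() == 0 and flat.get_count() == 0

Because both classes share the append(key, value), get_count() and clear()
interface, Persister can stay agnostic of the storage layout and simply
instantiate self.repository.data_points_class().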