diff --git a/monasca_persister/repositories/abstract_repository.py b/monasca_persister/repositories/abstract_repository.py
index 9f909552..6e9301cd 100644
--- a/monasca_persister/repositories/abstract_repository.py
+++ b/monasca_persister/repositories/abstract_repository.py
@@ -27,5 +27,5 @@ class AbstractRepository(object):
         pass
 
     @abc.abstractmethod
-    def write_batch(self, data_points_by_tenant):
+    def write_batch(self, data_points):
         pass
diff --git a/monasca_persister/repositories/cassandra/abstract_repository.py b/monasca_persister/repositories/cassandra/abstract_repository.py
index 188f1e83..4f1bd347 100644
--- a/monasca_persister/repositories/cassandra/abstract_repository.py
+++ b/monasca_persister/repositories/cassandra/abstract_repository.py
@@ -20,9 +20,11 @@ import six
 
 from monasca_persister.repositories import abstract_repository
 from monasca_persister.repositories.cassandra import connection_util
+from monasca_persister.repositories import data_points
 
 conf = cfg.CONF
 
+
 @six.add_metaclass(abc.ABCMeta)
 class AbstractCassandraRepository(abstract_repository.AbstractRepository):
     def __init__(self):
@@ -33,3 +35,4 @@ class AbstractCassandraRepository(abstract_repository.AbstractRepository):
         self._retention = conf.cassandra.retention_policy * 24 * 3600
         self._cache_size = conf.cassandra.max_definition_cache_size
         self._max_batches = conf.cassandra.max_batches
+        self.data_points_class = data_points.DataPointsAsList
diff --git a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
index 064be743..0ee35a17 100644
--- a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
+++ b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
@@ -55,10 +55,7 @@ class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRep
 
         return alarm_state_hist, tenant_id
 
-    def write_batch(self, alarm_state_hists_by_tenant):
-        # TODO(brtknr): At the moment, Cassandra does not have database per
-        # tenant implemented, so use chained list of values.
-        alarm_state_hists = alarm_state_hists_by_tenant.chained()
+    def write_batch(self, alarm_state_hists):
         while alarm_state_hists:
             num_rows = min(len(alarm_state_hists), cfg.CONF.kafka_alarm_history.batch_size)
             batch = alarm_state_hists[:num_rows]
diff --git a/monasca_persister/repositories/cassandra/metrics_repository.py b/monasca_persister/repositories/cassandra/metrics_repository.py
index 5c1b4a60..55d28e05 100644
--- a/monasca_persister/repositories/cassandra/metrics_repository.py
+++ b/monasca_persister/repositories/cassandra/metrics_repository.py
@@ -220,10 +220,7 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
 
         return metric, tenant_id
 
-    def write_batch(self, metrics_by_tenant):
-        # TODO(brtknr): At the moment, Cassandra does not have database per
-        # tenant implemented, so join the list of values.
-        metrics = metrics_by_tenant.chained()
+    def write_batch(self, metrics):
         with self._lock:
             batch_list = self._metric_batch.get_all_batches()
             results = execute_concurrent(self._session, batch_list, raise_on_first_error=True)
diff --git a/monasca_persister/repositories/data_points.py b/monasca_persister/repositories/data_points.py
new file mode 100644
index 00000000..b367c474
--- /dev/null
+++ b/monasca_persister/repositories/data_points.py
@@ -0,0 +1,55 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+class DataPointsAsList(list):
+
+    def append(self, key, value):
+        super(DataPointsAsList, self).append(value)
+
+    def get_count(self):
+        return len(self)
+
+    def clear(self):
+        del self[:]
+
+
+class DataPointsAsDict(dict):
+
+    def __init__(self):
+        self.counter = 0
+
+    def __setitem__(self, key, value):
+        raise NotImplementedError('Use append(key, value) instead.')
+
+    def __delitem__(self, key):
+        raise NotImplementedError('Use clear() instead.')
+
+    def pop(self):
+        raise NotImplementedError('Use clear() instead.')
+
+    def popitem(self):
+        raise NotImplementedError('Use clear() instead.')
+
+    def update(self):
+        raise NotImplementedError('Use clear() instead.')
+
+    def append(self, key, value):
+        super(DataPointsAsDict, self).setdefault(key, []).append(value)
+        self.counter += 1
+
+    def clear(self):
+        super(DataPointsAsDict, self).clear()
+        self.counter = 0
+
+    def get_count(self):
+        return self.counter
diff --git a/monasca_persister/repositories/elasticsearch/events_repository.py b/monasca_persister/repositories/elasticsearch/events_repository.py
index ca0362ee..b4dffdfa 100644
--- a/monasca_persister/repositories/elasticsearch/events_repository.py
+++ b/monasca_persister/repositories/elasticsearch/events_repository.py
@@ -21,6 +21,7 @@ from oslo_config import cfg
 from oslo_log import log
 
 from monasca_persister.repositories import abstract_repository
+from monasca_persister.repositories import data_points
 from monasca_persister.repositories import utils
 
 LOG = log.getLogger(__name__)
@@ -37,6 +38,7 @@ class ElasticSearchEventsRepository(abstract_repository.AbstractRepository):
             sniffer_timeout=self.conf.sniffer_timeout,
             max_retries=self.conf.max_retries
         )
+        self.data_points_class = data_points.DataPointsAsList
 
     def process_message(self, message):
         return utils.parse_events_message(message)
diff --git a/monasca_persister/repositories/influxdb/abstract_repository.py b/monasca_persister/repositories/influxdb/abstract_repository.py
index 5f63a8da..fad54070 100644
--- a/monasca_persister/repositories/influxdb/abstract_repository.py
+++ b/monasca_persister/repositories/influxdb/abstract_repository.py
@@ -18,6 +18,7 @@ from oslo_config import cfg
 import six
 
 from monasca_persister.repositories import abstract_repository
+from monasca_persister.repositories import data_points
 
 DATABASE_NOT_FOUND_MSG = "database not found"
 
@@ -33,16 +34,18 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
             self.conf.influxdb.port,
             self.conf.influxdb.user,
             self.conf.influxdb.password)
-
-    def write_batch(self, data_points_by_tenant):
         if self.conf.influxdb.db_per_tenant:
-            for tenant_id, data_points in data_points_by_tenant.items():
-                database = '%s_%s' % (self.conf.influxdb.database_name, tenant_id)
-                self._write_batch(data_points, database)
+            self.data_points_class = data_points.DataPointsAsDict
+        else:
+            self.data_points_class = data_points.DataPointsAsList
+
+    def write_batch(self, data_points):
+        if self.conf.influxdb.db_per_tenant:
+            for tenant_id, tenant_data_points in data_points.items():
+                database = '%s_%s' % (self.conf.influxdb.database_name,
+                                      tenant_id)
+                self._write_batch(tenant_data_points, database)
         else:
-            # NOTE (brtknr): Chain list of values to avoid multiple calls to
-            # database API endpoint (when db_per_tenant is False).
-            data_points = data_points_by_tenant.chained()
             self._write_batch(data_points, self.conf.influxdb.database_name)
 
     def _write_batch(self, data_points, database):
diff --git a/monasca_persister/repositories/persister.py b/monasca_persister/repositories/persister.py
index 10b9e7b4..9762b695 100644
--- a/monasca_persister/repositories/persister.py
+++ b/monasca_persister/repositories/persister.py
@@ -24,47 +24,16 @@ from monasca_persister.repositories import singleton
 LOG = log.getLogger(__name__)
 
 
-class DataPoints(dict):
-
-    def __init__(self):
-        self.counter = 0
-
-    def __setitem__(self, key, value):
-        raise NotImplementedError('Use append(key, value) instead.')
-
-    def __delitem__(self, key):
-        raise NotImplementedError('Use clear() instead.')
-
-    def pop(self):
-        raise NotImplementedError('Use clear() instead.')
-
-    def popitem(self):
-        raise NotImplementedError('Use clear() instead.')
-
-    def update(self):
-        raise NotImplementedError('Use clear() instead.')
-
-    def chained(self):
-        return [vi for vo in super(DataPoints, self).values() for vi in vo]
-
-    def append(self, key, value):
-        super(DataPoints, self).setdefault(key, []).append(value)
-        self.counter += 1
-
-    def clear(self):
-        super(DataPoints, self).clear()
-        self.counter = 0
-
 @six.add_metaclass(singleton.Singleton)
 class Persister(six.with_metaclass(ABCMeta, object)):
     def __init__(self, kafka_conf, repository):
 
-        self._data_points = DataPoints()
         self._kafka_topic = kafka_conf.topic
 
         self._batch_size = kafka_conf.batch_size
 
         self.repository = repository()
+        self._data_points = self.repository.data_points_class()
 
     def _flush(self):
         if not self._data_points:
@@ -74,7 +43,7 @@ class Persister(six.with_metaclass(ABCMeta, object)):
         self.repository.write_batch(self._data_points)
 
         LOG.info("Processed {} messages from topic '{}'".format(
-            self._data_points.counter, self._kafka_topic))
+            self._data_points.get_count(), self._kafka_topic))
 
         self._data_points.clear()
         self._consumer.commit()
@@ -105,7 +74,7 @@ class Persister(six.with_metaclass(ABCMeta, object)):
                     LOG.exception('Error processing message. Message is '
                                   'being dropped. {}'.format(message))
 
-            if self._data_points.counter >= self._batch_size:
+            if self._data_points.get_count() >= self._batch_size:
                 self._flush()
         except Exception:
             LOG.exception(
diff --git a/monasca_persister/repositories/utils.py b/monasca_persister/repositories/utils.py
index 154ad94f..9ab68e8d 100644
--- a/monasca_persister/repositories/utils.py
+++ b/monasca_persister/repositories/utils.py
@@ -104,4 +104,4 @@ def parse_events_message(message):
     project_id = decoded_message['meta']['project_id']
     dimensions = decoded_message['event']['dimensions']
 
-    return project_id, timestamp, event_type, payload, dimensions
+    return (project_id, timestamp, event_type, payload, dimensions), project_id
diff --git a/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py b/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py
index ddbea998..523bef0b 100644
--- a/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py
+++ b/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py
@@ -21,8 +21,7 @@ from oslo_config import cfg
 
 from monasca_persister.repositories.cassandra import alarm_state_history_repository
 from monasca_persister.repositories.cassandra import connection_util
-
-from monasca_persister.repositories.persister import DataPoints
+from monasca_persister.repositories import data_points
 
 
 class TestAlarmStateHistoryRepo(base.BaseTestCase):
@@ -106,6 +105,6 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
         cfg.CONF = Mock(kafka_alarm_history=Mock(batch_size=1))
         self._session, self._upsert_stmt = Mock(), Mock()
 
-        alarm_state_hists_by_tenant = DataPoints()
+        alarm_state_hists_by_tenant = data_points.DataPointsAsList()
         alarm_state_hists_by_tenant.append('fake_tenant', 'elem')
         self.alarm_state_hist_repo.write_batch(alarm_state_hists_by_tenant)
diff --git a/monasca_persister/tests/test_events.py b/monasca_persister/tests/test_events.py
index fd9f5ee0..4714e6b2 100644
--- a/monasca_persister/tests/test_events.py
+++ b/monasca_persister/tests/test_events.py
@@ -37,7 +37,8 @@ class TestEvents(base.BaseTestCase):
 
     def test_parse_event(self):
         event = self._load_event('event_1')
-        project_id, timestamp, event_type, payload, dimensions = utils.parse_events_message(event)
+        (project_id, timestamp, event_type, payload,
+         dimensions), _ = utils.parse_events_message(event)
         self.assertEqual('de98fbff448f4f278a56e9929db70b03', project_id)
         self.assertEqual('2017-06-01 09:15:11.494606', timestamp)
         self.assertEqual('compute.instance.create.start', event_type)
diff --git a/monasca_persister/tests/test_persister_repo.py b/monasca_persister/tests/test_persister_repo.py
index 7050bb24..1e5f4a2a 100644
--- a/monasca_persister/tests/test_persister_repo.py
+++ b/monasca_persister/tests/test_persister_repo.py
@@ -22,8 +22,8 @@ from oslo_config import cfg
 from monasca_common.kafka import consumer
 
 from monasca_persister.kafka.legacy_kafka_persister import LegacyKafkaPersister
+from monasca_persister.repositories import data_points
 from monasca_persister.repositories.persister import LOG
-from monasca_persister.repositories.persister import DataPoints
 
 
 class FakeException(Exception):
@@ -85,7 +85,7 @@
 
     def test_run_if_consumer_is_faulty(self):
         with patch.object(os, '_exit', return_value=None) as mock_exit:
-            self.persister._data_points = DataPoints()
+            self.persister._data_points = data_points.DataPointsAsDict()
             self.persister._consumer = Mock(side_effect=FakeException)
             self.persister.run()
             mock_exit.assert_called_once_with(1)
@@ -93,7 +93,7 @@
     def test_run_logs_exception_from_consumer(self):
         with patch.object(self.persister.repository, 'process_message',
                           side_effect=FakeException):
-            self.persister._data_points = DataPoints()
+            self.persister._data_points = data_points.DataPointsAsDict()
             self.persister._consumer = ['aa']
             self.persister.run()
             self.mock_log_exception.assert_called()
@@ -102,7 +102,7 @@
         with patch.object(self.persister.repository, 'process_message',
                           return_value=('message', 'tenant_id')):
            with patch.object(self.persister, '_consumer',
                              return_value=Mock()) as mock_consumer:
-                self.persister._data_points = DataPoints()
+                self.persister._data_points = data_points.DataPointsAsDict()
                 self.persister._data_points.append('fake_tenant_id', 'some')
                 self.persister._consumer.__iter__.return_value = ('aa', 'bb')
                 self.persister._batch_size = 1
@@ -117,7 +117,7 @@
                               return_value=True)):
             for elem in exception_msgs:
                 with patch.object(LOG, 'info', side_effect=FakeException(elem)):
-                    self.persister._data_points = DataPoints()
+                    self.persister._data_points = data_points.DataPointsAsDict()
                     self.persister._data_points.append('fake_tenant_id', 'some')
                     self.persister._flush()
                     self.mock_log_warning.assert_called()
@@ -127,7 +127,7 @@
         with(patch.object(cfg.CONF.repositories, 'ignore_parse_point_error',
                           return_value=False)):
             mock_log_info.side_effect.message = 'some msg'
-            self.persister._data_points = DataPoints()
+            self.persister._data_points = data_points.DataPointsAsDict()
             self.persister._data_points.append('fake_tenant_id', 'some')
             self.assertRaises(FakeException, self.persister._flush)
             self.mock_log_exception.assert_called()
diff --git a/monasca_persister/tests/test_utils.py b/monasca_persister/tests/test_utils.py
index f2991ee1..2aefcd57 100644
--- a/monasca_persister/tests/test_utils.py
+++ b/monasca_persister/tests/test_utils.py
@@ -104,7 +104,8 @@ class TestUtils(base.BaseTestCase):
             }
         }"""
 
-        project_id, timestamp, event_type, payload, dimensions = utils.parse_events_message(message)
+        (project_id, timestamp, event_type, payload,
+         dimensions), _ = utils.parse_events_message(message)
         self.assertEqual(project_id, "dummy_project_id")
         self.assertEqual(timestamp, "dummy_timestamp")
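
For reference, a minimal sketch of the contract shared by the two containers this patch introduces. The tenant names and point values are made up for illustration, and running it requires a tree with the patch applied:

```python
from monasca_persister.repositories import data_points

# DataPointsAsDict groups points by tenant; the InfluxDB repository picks it
# when db_per_tenant is True so write_batch() can target one database per tenant.
by_tenant = data_points.DataPointsAsDict()
by_tenant.append('tenant-a', 'point-1')
by_tenant.append('tenant-b', 'point-2')
assert by_tenant.get_count() == 2
assert dict(by_tenant) == {'tenant-a': ['point-1'], 'tenant-b': ['point-2']}

# DataPointsAsList ignores the tenant key and keeps a flat list; the Cassandra,
# Elasticsearch, and single-database InfluxDB repositories use it to write
# everything in one call.
flat = data_points.DataPointsAsList()
flat.append('tenant-a', 'point-1')
flat.append('tenant-b', 'point-2')
assert flat.get_count() == 2
assert list(flat) == ['point-1', 'point-2']

# Persister._flush() only relies on append(), get_count(), and clear(),
# so both flavours are interchangeable behind that interface.
by_tenant.clear()
flat.clear()
assert by_tenant.get_count() == flat.get_count() == 0
```

Either way, `Persister.__init__` no longer hard-codes the container: it instantiates whatever `data_points_class` the repository advertises, which keeps the batching loop in `run()` and `_flush()` storage-agnostic.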