# -*- encoding: utf-8 -*- # # Copyright © 2013 Intel Corp. # # Author: Lianhao Lu # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """Tests for ceilometer/central/manager.py """ import datetime from stevedore import extension from ceilometer.central import manager from ceilometer import counter from ceilometer import publish from ceilometer.tests import base from ceilometer.openstack.common import cfg from keystoneclient.v2_0 import client as ksclient def test_load_plugins(): mgr = manager.AgentManager() assert list(mgr.ext_manager), 'Failed to load any plugins' return class TestRunTasks(base.TestCase): class Pollster: counters = [] test_data = counter.Counter( name='test', type=counter.TYPE_CUMULATIVE, unit='', volume=1, user_id='test', project_id='test', resource_id='test_run_tasks', timestamp=datetime.datetime.utcnow().isoformat(), resource_metadata={'name': 'Pollster'}, ) def get_counters(self, manager): self.counters.append((manager, self.test_data)) return [self.test_data] def faux_notify(self, context, msg, topic, secret, source): self.notifications.append((msg, topic, secret, source)) def setUp(self): super(TestRunTasks, self).setUp() self.notifications = [] self.stubs.Set(publish, 'publish_counter', self.faux_notify) self.stubs.Set(ksclient, 'Client', lambda *args, **kwargs: None) self.mgr = manager.AgentManager() self.mgr.ext_manager = extension.ExtensionManager('fake', invoke_on_load=False, ) self.mgr.ext_manager.extensions = [extension.Extension('test', None, None, self.Pollster(), ), ] # Invoke the periodic tasks to call the pollsters. self.mgr.periodic_tasks(None) def tearDown(self): self.Pollster.counters = [] super(TestRunTasks, self).tearDown() def test_message(self): self.assertEqual(len(self.Pollster.counters), 1) self.assertTrue(self.Pollster.counters[0][1] is self.Pollster.test_data) def test_notifications(self): actual = self.notifications self.assertEqual(list(actual[0]), [self.Pollster.test_data, cfg.CONF.metering_topic, cfg.CONF.metering_secret, cfg.CONF.counter_source])