diff --git a/ceilometer/agent.py b/ceilometer/agent.py index cc0067a70..dad18f2e7 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -32,6 +32,7 @@ from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import pipeline as publish_pipeline +from ceilometer import utils LOG = log.getLogger(__name__) @@ -53,7 +54,14 @@ class Resources(object): source_discovery = (self.agent_manager.discover(self._discovery, discovery_cache) if self._discovery else []) - return self._resources + source_discovery + static_resources = [] + if self._resources: + static_resources_group = self.agent_manager.construct_group_id( + utils.hash_of_set(self._resources)) + p_coord = self.agent_manager.partition_coordinator + static_resources = p_coord.extract_my_subset( + static_resources_group, self._resources) + return static_resources + source_discovery @staticmethod def key(source, pollster): @@ -145,8 +153,15 @@ class AgentManager(os_service.Service): ) def join_partitioning_groups(self): - groups = set([self._construct_group_id(d.obj.group_id) + groups = set([self.construct_group_id(d.obj.group_id) for d in self.discovery_manager]) + # let each set of statically-defined resources have its own group + static_resource_groups = set([ + self.construct_group_id(utils.hash_of_set(p.resources)) + for p in self.pipeline_manager.pipelines + if p.resources + ]) + groups.update(static_resource_groups) for group in groups: self.partition_coordinator.join_group(group) @@ -168,7 +183,7 @@ class AgentManager(os_service.Service): return polling_tasks - def _construct_group_id(self, discovery_group_id): + def construct_group_id(self, discovery_group_id): return ('%s-%s' % (self.group_prefix, discovery_group_id) if discovery_group_id else None) @@ -217,7 +232,7 @@ class AgentManager(os_service.Service): try: discovered = discoverer.discover(self, param) partitioned = self.partition_coordinator.extract_my_subset( - self._construct_group_id(discoverer.group_id), + self.construct_group_id(discoverer.group_id), discovered) resources.extend(partitioned) if discovery_cache is not None: diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py index 39e8d7543..b658bea52 100644 --- a/ceilometer/tests/agentbase.py +++ b/ceilometer/tests/agentbase.py @@ -38,6 +38,7 @@ from ceilometer.publisher import test as test_publisher from ceilometer import sample from ceilometer.tests import base from ceilometer import transformer +from ceilometer import utils class TestSample(sample.Sample): @@ -297,8 +298,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.mgr.discovery_manager = self.create_discovery_manager() self.mgr.join_partitioning_groups() p_coord = self.mgr.partition_coordinator - expected = [mock.call(self.mgr._construct_group_id(g)) - for g in ['another_group', 'global']] + static_group_ids = [utils.hash_of_set(p['resources']) + for p in self.pipeline_cfg + if p['resources']] + expected = [mock.call(self.mgr.construct_group_id(g)) + for g in ['another_group', 'global'] + static_group_ids] self.assertEqual(len(expected), len(p_coord.join_group.call_args_list)) for c in expected: self.assertIn(c, p_coord.join_group.call_args_list) @@ -686,10 +690,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'testdiscoveryanother', 'testdiscoverynonexistent', 'testdiscoveryexception'] + self.pipeline_cfg[0]['resources'] = [] self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) - expected = [mock.call(self.mgr._construct_group_id(d.obj.group_id), + expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id), d.obj.resources) for d in self.mgr.discovery_manager if hasattr(d.obj, 'resources')] @@ -697,3 +702,41 @@ class BaseAgentManagerTestCase(base.BaseTestCase): len(p_coord.extract_my_subset.call_args_list)) for c in expected: self.assertIn(c, p_coord.extract_my_subset.call_args_list) + + def test_static_resources_partitioning(self): + p_coord = self.mgr.partition_coordinator + self.mgr.default_discovery = [] + static_resources = ['static_1', 'static_2'] + static_resources2 = ['static_3', 'static_4'] + self.pipeline_cfg[0]['resources'] = static_resources + self.pipeline_cfg.append({ + 'name': "test_pipeline2", + 'interval': 60, + 'counters': ['test', 'test2'], + 'resources': static_resources2, + 'transformers': [], + 'publishers': ["test"], + }) + # have one pipeline without static resources defined + self.pipeline_cfg.append({ + 'name': "test_pipeline3", + 'interval': 60, + 'counters': ['test', 'test2'], + 'resources': [], + 'transformers': [], + 'publishers': ["test"], + }) + self.setup_pipeline() + polling_tasks = self.mgr.setup_polling_tasks() + self.mgr.interval_task(polling_tasks.get(60)) + # Only two groups need to be created, one for each pipeline, + # even though counter test is used twice + expected = [mock.call(self.mgr.construct_group_id( + utils.hash_of_set(resources)), + resources) + for resources in [static_resources, + static_resources2]] + self.assertEqual(len(expected), + len(p_coord.extract_my_subset.call_args_list)) + for c in expected: + self.assertIn(c, p_coord.extract_my_subset.call_args_list) diff --git a/ceilometer/tests/test_utils.py b/ceilometer/tests/test_utils.py index f9a648102..bda3de191 100644 --- a/ceilometer/tests/test_utils.py +++ b/ceilometer/tests/test_utils.py @@ -148,6 +148,14 @@ class TestUtils(base.BaseTestCase): ('nested2[1].c', 'B')], sorted(pairs, key=lambda x: x[0])) + def test_hash_of_set(self): + x = ['a', 'b'] + y = ['a', 'b', 'a'] + z = ['a', 'c'] + self.assertEqual(utils.hash_of_set(x), utils.hash_of_set(y)) + self.assertNotEqual(utils.hash_of_set(x), utils.hash_of_set(z)) + self.assertNotEqual(utils.hash_of_set(y), utils.hash_of_set(z)) + def test_hash_ring(self): num_nodes = 10 num_keys = 1000 diff --git a/ceilometer/utils.py b/ceilometer/utils.py index c703521bc..016fb8a32 100644 --- a/ceilometer/utils.py +++ b/ceilometer/utils.py @@ -206,6 +206,10 @@ def uniq(dupes, attrs): return deduped +def hash_of_set(s): + return str(hash(frozenset(s))) + + class HashRing(object): def __init__(self, nodes, replicas=100):