Merge "Partition static resources defined in pipeline.yaml"

This commit is contained in:
Jenkins 2014-09-29 10:11:01 +00:00 committed by Gerrit Code Review
commit 1c5ce73023
4 changed files with 77 additions and 7 deletions

View File

@ -32,6 +32,7 @@ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
from ceilometer import pipeline as publish_pipeline
from ceilometer import utils
LOG = log.getLogger(__name__)
@ -53,7 +54,14 @@ class Resources(object):
source_discovery = (self.agent_manager.discover(self._discovery,
discovery_cache)
if self._discovery else [])
return self._resources + source_discovery
static_resources = []
if self._resources:
static_resources_group = self.agent_manager.construct_group_id(
utils.hash_of_set(self._resources))
p_coord = self.agent_manager.partition_coordinator
static_resources = p_coord.extract_my_subset(
static_resources_group, self._resources)
return static_resources + source_discovery
@staticmethod
def key(source, pollster):
@ -145,8 +153,15 @@ class AgentManager(os_service.Service):
)
def join_partitioning_groups(self):
groups = set([self._construct_group_id(d.obj.group_id)
groups = set([self.construct_group_id(d.obj.group_id)
for d in self.discovery_manager])
# let each set of statically-defined resources have its own group
static_resource_groups = set([
self.construct_group_id(utils.hash_of_set(p.resources))
for p in self.pipeline_manager.pipelines
if p.resources
])
groups.update(static_resource_groups)
for group in groups:
self.partition_coordinator.join_group(group)
@ -168,7 +183,7 @@ class AgentManager(os_service.Service):
return polling_tasks
def _construct_group_id(self, discovery_group_id):
def construct_group_id(self, discovery_group_id):
return ('%s-%s' % (self.group_prefix,
discovery_group_id)
if discovery_group_id else None)
@ -217,7 +232,7 @@ class AgentManager(os_service.Service):
try:
discovered = discoverer.discover(self, param)
partitioned = self.partition_coordinator.extract_my_subset(
self._construct_group_id(discoverer.group_id),
self.construct_group_id(discoverer.group_id),
discovered)
resources.extend(partitioned)
if discovery_cache is not None:

View File

@ -38,6 +38,7 @@ from ceilometer.publisher import test as test_publisher
from ceilometer import sample
from ceilometer.tests import base
from ceilometer import transformer
from ceilometer import utils
class TestSample(sample.Sample):
@ -297,8 +298,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.mgr.discovery_manager = self.create_discovery_manager()
self.mgr.join_partitioning_groups()
p_coord = self.mgr.partition_coordinator
expected = [mock.call(self.mgr._construct_group_id(g))
for g in ['another_group', 'global']]
static_group_ids = [utils.hash_of_set(p['resources'])
for p in self.pipeline_cfg
if p['resources']]
expected = [mock.call(self.mgr.construct_group_id(g))
for g in ['another_group', 'global'] + static_group_ids]
self.assertEqual(len(expected), len(p_coord.join_group.call_args_list))
for c in expected:
self.assertIn(c, p_coord.join_group.call_args_list)
@ -686,10 +690,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'testdiscoveryanother',
'testdiscoverynonexistent',
'testdiscoveryexception']
self.pipeline_cfg[0]['resources'] = []
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
expected = [mock.call(self.mgr._construct_group_id(d.obj.group_id),
expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id),
d.obj.resources)
for d in self.mgr.discovery_manager
if hasattr(d.obj, 'resources')]
@ -697,3 +702,41 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
len(p_coord.extract_my_subset.call_args_list))
for c in expected:
self.assertIn(c, p_coord.extract_my_subset.call_args_list)
def test_static_resources_partitioning(self):
p_coord = self.mgr.partition_coordinator
self.mgr.default_discovery = []
static_resources = ['static_1', 'static_2']
static_resources2 = ['static_3', 'static_4']
self.pipeline_cfg[0]['resources'] = static_resources
self.pipeline_cfg.append({
'name': "test_pipeline2",
'interval': 60,
'counters': ['test', 'test2'],
'resources': static_resources2,
'transformers': [],
'publishers': ["test"],
})
# have one pipeline without static resources defined
self.pipeline_cfg.append({
'name': "test_pipeline3",
'interval': 60,
'counters': ['test', 'test2'],
'resources': [],
'transformers': [],
'publishers': ["test"],
})
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
# Only two groups need to be created, one for each pipeline,
# even though counter test is used twice
expected = [mock.call(self.mgr.construct_group_id(
utils.hash_of_set(resources)),
resources)
for resources in [static_resources,
static_resources2]]
self.assertEqual(len(expected),
len(p_coord.extract_my_subset.call_args_list))
for c in expected:
self.assertIn(c, p_coord.extract_my_subset.call_args_list)

View File

@ -148,6 +148,14 @@ class TestUtils(base.BaseTestCase):
('nested2[1].c', 'B')],
sorted(pairs, key=lambda x: x[0]))
def test_hash_of_set(self):
x = ['a', 'b']
y = ['a', 'b', 'a']
z = ['a', 'c']
self.assertEqual(utils.hash_of_set(x), utils.hash_of_set(y))
self.assertNotEqual(utils.hash_of_set(x), utils.hash_of_set(z))
self.assertNotEqual(utils.hash_of_set(y), utils.hash_of_set(z))
def test_hash_ring(self):
num_nodes = 10
num_keys = 1000

View File

@ -206,6 +206,10 @@ def uniq(dupes, attrs):
return deduped
def hash_of_set(s):
return str(hash(frozenset(s)))
class HashRing(object):
def __init__(self, nodes, replicas=100):