Fix subscription limit per queue
Query for all subscriptions on a given queue by taking into account the returned marker, if any. Change-Id: I6b03dd93cabc76d1c91041a896fbb30ac08032f1 Closes-Bug: #1590173
This commit is contained in:
parent
5d1964d96b
commit
61100fec33
@ -40,15 +40,22 @@ class NotifierDriver(object):
|
||||
if self.subscription_controller:
|
||||
if not isinstance(self.subscription_controller,
|
||||
pooling.SubscriptionController):
|
||||
subscribers = self.subscription_controller.list(queue_name,
|
||||
project)
|
||||
for sub in next(subscribers):
|
||||
s_type = urllib_parse.urlparse(sub['subscriber']).scheme
|
||||
data_driver = self.subscription_controller.driver
|
||||
mgr = driver.DriverManager('zaqar.notification.tasks',
|
||||
s_type,
|
||||
invoke_on_load=True)
|
||||
self.executor.submit(mgr.driver.execute, sub, messages,
|
||||
conf=data_driver.conf)
|
||||
marker = None
|
||||
while True:
|
||||
subscribers = self.subscription_controller.list(
|
||||
queue_name, project, marker=marker)
|
||||
for sub in next(subscribers):
|
||||
LOG.debug("Notifying subscriber %r" % (sub,))
|
||||
s_type = urllib_parse.urlparse(
|
||||
sub['subscriber']).scheme
|
||||
data_driver = self.subscription_controller.driver
|
||||
mgr = driver.DriverManager('zaqar.notification.tasks',
|
||||
s_type,
|
||||
invoke_on_load=True)
|
||||
self.executor.submit(mgr.driver.execute, sub, messages,
|
||||
conf=data_driver.conf)
|
||||
marker = next(subscribers)
|
||||
if not marker:
|
||||
break
|
||||
else:
|
||||
LOG.error(_LE('Failed to get subscription controller.'))
|
||||
|
@ -60,7 +60,7 @@ class NotifierTest(testing.TestBase):
|
||||
{'subscriber': 'http://ping_me',
|
||||
'source': 'fake_queue'}]
|
||||
ctlr = mock.MagicMock()
|
||||
ctlr.list = mock.Mock(return_value=iter([subscription]))
|
||||
ctlr.list = mock.Mock(return_value=iter([subscription, {}]))
|
||||
driver = notifier.NotifierDriver(subscription_controller=ctlr)
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
with mock.patch('requests.post') as mock_post:
|
||||
@ -98,6 +98,45 @@ class NotifierTest(testing.TestBase):
|
||||
], any_order=True)
|
||||
self.assertEqual(6, len(mock_post.mock_calls))
|
||||
|
||||
def test_marker(self):
|
||||
subscription1 = [{'subscriber': 'http://trigger_me1',
|
||||
'source': 'fake_queue'}]
|
||||
subscription2 = [{'subscriber': 'http://trigger_me2',
|
||||
'source': 'fake_queue'}]
|
||||
ctlr = mock.MagicMock()
|
||||
|
||||
def mock_list(queue, project, marker):
|
||||
if not marker:
|
||||
return iter([subscription1, 'marker_id'])
|
||||
else:
|
||||
return iter([subscription2, {}])
|
||||
|
||||
ctlr.list = mock_list
|
||||
driver = notifier.NotifierDriver(subscription_controller=ctlr)
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
with mock.patch('requests.post') as mock_post:
|
||||
driver.post('fake_queue', self.messages, self.client_id,
|
||||
self.project)
|
||||
driver.executor.shutdown()
|
||||
# Let's deserialize "data" from JSON string to dict in each mock
|
||||
# call, so we can do dict comparisons. JSON string comparisons
|
||||
# often fail, because dict keys can be serialized in different
|
||||
# order inside the string.
|
||||
for call in mock_post.call_args_list:
|
||||
call[1]['data'] = json.loads(call[1]['data'])
|
||||
# These are not real calls. In real calls each "data" argument is
|
||||
# serialized by json.dumps. But we made a substitution before,
|
||||
# so it will work.
|
||||
mock_post.assert_has_calls([
|
||||
mock.call(subscription1[0]['subscriber'],
|
||||
data=self.notifications[0],
|
||||
headers=headers),
|
||||
mock.call(subscription2[0]['subscriber'],
|
||||
data=self.notifications[0],
|
||||
headers=headers),
|
||||
], any_order=True)
|
||||
self.assertEqual(4, len(mock_post.mock_calls))
|
||||
|
||||
@mock.patch('subprocess.Popen')
|
||||
def test_mailto(self, mock_popen):
|
||||
subscription = [{'subscriber': 'mailto:aaa@example.com',
|
||||
@ -109,7 +148,7 @@ class NotifierTest(testing.TestBase):
|
||||
'options': {'subject': 'Hello',
|
||||
'from': 'zaqar@example.com'}}]
|
||||
ctlr = mock.MagicMock()
|
||||
ctlr.list = mock.Mock(return_value=iter([subscription]))
|
||||
ctlr.list = mock.Mock(return_value=iter([subscription, {}]))
|
||||
driver = notifier.NotifierDriver(subscription_controller=ctlr)
|
||||
called = set()
|
||||
msg = ('Content-Type: text/plain; charset="us-ascii"\n'
|
||||
@ -159,7 +198,7 @@ class NotifierTest(testing.TestBase):
|
||||
|
||||
def test_post_no_subscriber(self):
|
||||
ctlr = mock.MagicMock()
|
||||
ctlr.list = mock.Mock(return_value=iter([[]]))
|
||||
ctlr.list = mock.Mock(return_value=iter([[], {}]))
|
||||
driver = notifier.NotifierDriver(subscription_controller=ctlr)
|
||||
with mock.patch('requests.post') as mock_post:
|
||||
driver.post('fake_queue', self.messages, self.client_id,
|
||||
@ -171,7 +210,7 @@ class NotifierTest(testing.TestBase):
|
||||
subscription = [{'subscriber': 'http://trigger_me',
|
||||
'source': 'fake_queue'}]
|
||||
ctlr = mock.MagicMock()
|
||||
ctlr.list = mock.Mock(return_value=iter([subscription]))
|
||||
ctlr.list = mock.Mock(return_value=iter([subscription, {}]))
|
||||
driver = notifier.NotifierDriver(subscription_controller=ctlr)
|
||||
with mock.patch('requests.post') as mock_post:
|
||||
driver.post('fake_queue', self.messages, self.client_id,
|
||||
|
Loading…
x
Reference in New Issue
Block a user