Use mocked object to test log message

Change-Id: I491d49310acdefe83615232e1520df5b93280e3b
This commit is contained in:
LiuSheng 2015-08-02 21:59:07 +08:00
parent c22c89b104
commit 92c4960f06

View File

@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import mock
from oslo_config import fixture as fixture_config
import tooz.coordination
@ -107,32 +105,12 @@ class MockAsyncError(tooz.coordination.CoordAsyncResult):
return True
class MockLoggingHandler(logging.Handler):
"""Mock logging handler to check for expected logs."""
def __init__(self, *args, **kwargs):
self.reset()
logging.Handler.__init__(self, *args, **kwargs)
def emit(self, record):
self.messages[record.levelname.lower()].append(record.getMessage())
def reset(self):
self.messages = {'debug': [],
'info': [],
'warning': [],
'error': [],
'critical': []}
class TestPartitioning(base.BaseTestCase):
def setUp(self):
super(TestPartitioning, self).setUp()
conf = service.prepare_service([])
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
self.str_handler = MockLoggingHandler()
coordination.LOG.logger.addHandler(self.str_handler)
self.shared_storage = {}
def _get_new_started_coordinator(self, shared_storage, agent_id=None,
@ -199,37 +177,37 @@ class TestPartitioning(base.BaseTestCase):
expected_resources=expected_resources[i]))
self._usage_simulation(*agents_kwargs)
def test_coordination_backend_offline(self):
@mock.patch.object(coordination.LOG, 'exception')
def test_coordination_backend_offline(self, mocked_exception):
agents = [dict(agent_id='agent1',
group_id='group',
all_resources=['res1', 'res2'],
expected_resources=[],
coordinator_cls=MockToozCoordExceptionRaiser)]
self._usage_simulation(*agents)
expected_errors = ['Error getting group membership info from '
'coordination backend.',
'Error connecting to coordination backend.']
for e in expected_errors:
self.assertIn(e, self.str_handler.messages['error'])
called = [mock.call(u'Error connecting to coordination backend.'),
mock.call(u'Error getting group membership info from '
u'coordination backend.')]
self.assertEqual(called, mocked_exception.call_args_list)
def test_reconnect(self):
@mock.patch.object(coordination.LOG, 'exception')
@mock.patch.object(coordination.LOG, 'info')
def test_reconnect(self, mock_info, mocked_exception):
coord = self._get_new_started_coordinator({}, 'a',
MockToozCoordExceptionRaiser)
with mock.patch('tooz.coordination.get_coordinator',
return_value=MockToozCoordExceptionRaiser('a', {})):
coord.heartbeat()
expected_errors = ['Error connecting to coordination backend.',
'Error sending a heartbeat to coordination '
'backend.']
for e in expected_errors:
self.assertIn(e, self.str_handler.messages['error'])
self.str_handler.messages['error'] = []
called = [mock.call(u'Error connecting to coordination backend.'),
mock.call(u'Error connecting to coordination backend.'),
mock.call(u'Error sending a heartbeat to coordination '
u'backend.')]
self.assertEqual(called, mocked_exception.call_args_list)
with mock.patch('tooz.coordination.get_coordinator',
return_value=MockToozCoordinator('a', {})):
coord.heartbeat()
for e in expected_errors:
self.assertNotIn(e, self.str_handler.messages['error'])
mock_info.assert_called_with(u'Coordination backend started '
u'successfully.')
def test_group_id_none(self):
coord = self._get_new_started_coordinator({}, 'a')