turbo-hipster/tests/test_worker_manager.py
Michael Still e7399f6b2e gear tests are unreliable.
Disable the gear tests until they can be made reliable. These fail
consistently on my laptop, as well as intermittently in the gate.
Note that I can't use self.skip() here because the failure occurs
in setUp().

Change-Id: I7edb0f53228e9a0d24c8bd47626319d896d40cb2
2014-01-15 11:27:42 +11:00

69 lines
2.5 KiB
Python

#!/usr/bin/python2
#
# Copyright 2013 Rackspace Australia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import testtools
import time
from fakes import FakeZuulManager, FakeGearmanServer,\
FakeRealDbUpgradeRunner
CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'etc')
with open(os.path.join(CONFIG_DIR, 'config.json'), 'r') as config_stream:
CONFIG = json.load(config_stream)
class TestGearmanManager(testtools.TestCase):
def setUp(self):
super(TestGearmanManager, self).setUp()
self.config = CONFIG
self.gearman_server = FakeGearmanServer(
self.config['zuul_server']['gearman_port'])
self.config['zuul_server']['gearman_port'] = self.gearman_server.port
self.task = FakeRealDbUpgradeRunner(self.config,
self.config['plugins'][0],
'test-worker-1', self)
self.tasks = dict(FakeRealDbUpgradeRunner_worker=self.task)
self.gearman_manager = FakeZuulManager(self.config, self.tasks, self)
def xtest_manager_function_registered(self):
""" Check the manager is set up correctly and registered with the
gearman server with an appropriate function """
# Give the gearman server up to 5 seconds to register the function
for x in range(500):
time.sleep(0.01)
if len(self.gearman_server.functions) > 0:
break
hostname = os.uname()[1]
function_name = 'stop:turbo-hipster-manager-%s' % hostname
self.assertIn(function_name, self.gearman_server.functions)
def xtest_task_registered_with_manager(self):
""" Check the FakeRealDbUpgradeRunner_worker task is registered """
self.assertIn('FakeRealDbUpgradeRunner_worker',
self.gearman_manager.tasks.keys())
def xtest_stop_task(self):
""" Check that the manager successfully stops a task when requested
"""
pass