Add in start of worker_manager tests and fakes
This commit is contained in:
parent
9b177af473
commit
66f9f60394
18
tests/etc/config.json
Normal file
18
tests/etc/config.json
Normal file
@ -0,0 +1,18 @@
|
||||
{
|
||||
"zuul_server": {
|
||||
"git_url": "/home/josh/var/lib/zuul/git/",
|
||||
"gearman_host": "localhost",
|
||||
"gearman_port": 14730
|
||||
},
|
||||
"debug_log": "/home/josh/var/log/turbo-hipster/debug.log",
|
||||
"jobs_working_dir": "/home/josh/var/lib/turbo-hipster/jobs",
|
||||
"git_working_dir": "/home/josh/var/lib/turbo-hipster/git",
|
||||
"pip_download_cache": "/home/josh/var/cache/pip",
|
||||
"plugins": ["gate_real_db_upgrade"],
|
||||
"publish_logs":
|
||||
{
|
||||
"type": "local",
|
||||
"path": "/home/josh/var/www/results/",
|
||||
"prepend_url": "http://localhost/results/"
|
||||
}
|
||||
}
|
280
tests/fakes.py
Normal file
280
tests/fakes.py
Normal file
@ -0,0 +1,280 @@
|
||||
#!/usr/bin/python2
|
||||
#
|
||||
# Copyright 2013 Rackspace Australia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import gear
|
||||
import threading
|
||||
import os
|
||||
|
||||
from turbo_hipster.worker_manager import GearmanManager
|
||||
|
||||
|
||||
class FakeGearmanManager(GearmanManager):
|
||||
def __init__(self, config, tasks, test):
|
||||
self.test = test
|
||||
super(FakeGearmanManager, self).__init__(config, tasks)
|
||||
|
||||
def setup_gearman(self):
|
||||
hostname = os.uname()[1]
|
||||
self.gearman_worker = FakeWorker('turbo-hipster-manager-%s'
|
||||
% hostname, self.test)
|
||||
self.gearman_worker.addServer(
|
||||
self.config['zuul_server']['gearman_host'],
|
||||
self.config['zuul_server']['gearman_port']
|
||||
)
|
||||
self.gearman_worker.registerFunction(
|
||||
'stop:turbo-hipster-manager-%s' % hostname)
|
||||
|
||||
|
||||
class FakeWorker(gear.Worker):
|
||||
def __init__(self, worker_id, test):
|
||||
super(FakeWorker, self).__init__(worker_id)
|
||||
self.gearman_jobs = {}
|
||||
self.build_history = []
|
||||
self.running_builds = []
|
||||
self.build_counter = 0
|
||||
self.fail_tests = {}
|
||||
self.test = test
|
||||
|
||||
self.hold_jobs_in_build = False
|
||||
self.lock = threading.Lock()
|
||||
self.__work_thread = threading.Thread(target=self.work)
|
||||
self.__work_thread.daemon = True
|
||||
self.__work_thread.start()
|
||||
|
||||
def handleJob(self, job):
|
||||
parts = job.name.split(":")
|
||||
cmd = parts[0]
|
||||
name = parts[1]
|
||||
if len(parts) > 2:
|
||||
node = parts[2]
|
||||
else:
|
||||
node = None
|
||||
if cmd == 'build':
|
||||
self.handleBuild(job, name, node)
|
||||
elif cmd == 'stop':
|
||||
self.handleStop(job, name)
|
||||
elif cmd == 'set_description':
|
||||
self.handleSetDescription(job, name)
|
||||
|
||||
def handleBuild(self, job, name, node):
|
||||
build = FakeBuild(self, job, self.build_counter, node)
|
||||
job.build = build
|
||||
self.gearman_jobs[job.unique] = job
|
||||
self.build_counter += 1
|
||||
|
||||
self.running_builds.append(build)
|
||||
build.start()
|
||||
|
||||
def handleStop(self, job, name):
|
||||
self.log.debug("handle stop")
|
||||
parameters = json.loads(job.arguments)
|
||||
name = parameters['name']
|
||||
number = parameters['number']
|
||||
for build in self.running_builds:
|
||||
if build.name == name and build.number == number:
|
||||
build.aborted = True
|
||||
build.release()
|
||||
job.sendWorkComplete()
|
||||
return
|
||||
job.sendWorkFail()
|
||||
|
||||
def handleSetDescription(self, job, name):
|
||||
self.log.debug("handle set description")
|
||||
parameters = json.loads(job.arguments)
|
||||
name = parameters['name']
|
||||
number = parameters['number']
|
||||
descr = parameters['html_description']
|
||||
for build in self.running_builds:
|
||||
if build.name == name and build.number == number:
|
||||
build.description = descr
|
||||
job.sendWorkComplete()
|
||||
return
|
||||
for build in self.build_history:
|
||||
if build.name == name and build.number == number:
|
||||
build.description = descr
|
||||
job.sendWorkComplete()
|
||||
return
|
||||
job.sendWorkFail()
|
||||
|
||||
def work(self):
|
||||
while self.running:
|
||||
try:
|
||||
job = self.getJob()
|
||||
except gear.InterruptedError:
|
||||
continue
|
||||
try:
|
||||
self.handleJob(job)
|
||||
except:
|
||||
self.log.exception("Worker exception:")
|
||||
|
||||
def addFailTest(self, name, change):
|
||||
l = self.fail_tests.get(name, [])
|
||||
l.append(change)
|
||||
self.fail_tests[name] = l
|
||||
|
||||
def shouldFailTest(self, name, ref):
|
||||
l = self.fail_tests.get(name, [])
|
||||
for change in l:
|
||||
if self.test.ref_has_change(ref, change):
|
||||
return True
|
||||
return False
|
||||
|
||||
def release(self, regex=None):
|
||||
builds = self.running_builds[:]
|
||||
self.log.debug("releasing build %s (%s)" % (regex,
|
||||
len(self.running_builds)))
|
||||
for build in builds:
|
||||
if not regex or re.match(regex, build.name):
|
||||
self.log.debug("releasing build %s" %
|
||||
(build.parameters['ZUUL_UUID']))
|
||||
build.release()
|
||||
else:
|
||||
self.log.debug("not releasing build %s" %
|
||||
(build.parameters['ZUUL_UUID']))
|
||||
self.log.debug("done releasing builds %s (%s)" %
|
||||
(regex, len(self.running_builds)))
|
||||
|
||||
|
||||
class FakeBuild(threading.Thread):
|
||||
def __init__(self, worker, job, number, node):
|
||||
threading.Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.worker = worker
|
||||
self.job = job
|
||||
self.name = job.name.split(':')[1]
|
||||
self.number = number
|
||||
self.node = node
|
||||
self.parameters = json.loads(job.arguments)
|
||||
self.unique = self.parameters['ZUUL_UUID']
|
||||
self.wait_condition = threading.Condition()
|
||||
self.waiting = False
|
||||
self.aborted = False
|
||||
self.created = time.time()
|
||||
self.description = ''
|
||||
|
||||
def release(self):
|
||||
self.wait_condition.acquire()
|
||||
self.wait_condition.notify()
|
||||
self.waiting = False
|
||||
self.log.debug("Build %s released" % self.unique)
|
||||
self.wait_condition.release()
|
||||
|
||||
def isWaiting(self):
|
||||
self.wait_condition.acquire()
|
||||
if self.waiting:
|
||||
ret = True
|
||||
else:
|
||||
ret = False
|
||||
self.wait_condition.release()
|
||||
return ret
|
||||
|
||||
def _wait(self):
|
||||
self.wait_condition.acquire()
|
||||
self.waiting = True
|
||||
self.log.debug("Build %s waiting" % self.unique)
|
||||
self.wait_condition.wait()
|
||||
self.wait_condition.release()
|
||||
|
||||
def run(self):
|
||||
data = {
|
||||
'url': 'https://server/job/%s/%s/' % (self.name, self.number),
|
||||
'name': self.name,
|
||||
'number': self.number,
|
||||
'manager': self.worker.worker_id,
|
||||
}
|
||||
|
||||
self.job.sendWorkData(json.dumps(data))
|
||||
self.job.sendWorkStatus(0, 100)
|
||||
|
||||
if self.worker.hold_jobs_in_build:
|
||||
self._wait()
|
||||
self.log.debug("Build %s continuing" % self.unique)
|
||||
|
||||
self.worker.lock.acquire()
|
||||
|
||||
result = 'SUCCESS'
|
||||
if (('ZUUL_REF' in self.parameters) and
|
||||
self.worker.shouldFailTest(self.name,
|
||||
self.parameters['ZUUL_REF'])):
|
||||
result = 'FAILURE'
|
||||
if self.aborted:
|
||||
result = 'ABORTED'
|
||||
|
||||
data = {'result': result}
|
||||
changes = None
|
||||
if 'ZUUL_CHANGE_IDS' in self.parameters:
|
||||
changes = self.parameters['ZUUL_CHANGE_IDS']
|
||||
|
||||
self.worker.build_history.append(
|
||||
BuildHistory(name=self.name, number=self.number,
|
||||
result=result, changes=changes, node=self.node,
|
||||
uuid=self.unique, description=self.description,
|
||||
pipeline=self.parameters['ZUUL_PIPELINE'])
|
||||
)
|
||||
|
||||
self.job.sendWorkComplete(json.dumps(data))
|
||||
del self.worker.gearman_jobs[self.job.unique]
|
||||
self.worker.running_builds.remove(self)
|
||||
self.worker.lock.release()
|
||||
|
||||
|
||||
class FakeGearmanServer(gear.Server):
|
||||
def __init__(self, port=4730):
|
||||
self.hold_jobs_in_queue = False
|
||||
super(FakeGearmanServer, self).__init__(port)
|
||||
|
||||
def getJobForConnection(self, connection, peek=False):
|
||||
for queue in [self.high_queue, self.normal_queue, self.low_queue]:
|
||||
for job in queue:
|
||||
if not hasattr(job, 'waiting'):
|
||||
if job.name.startswith('build:'):
|
||||
job.waiting = self.hold_jobs_in_queue
|
||||
else:
|
||||
job.waiting = False
|
||||
if job.waiting:
|
||||
continue
|
||||
if job.name in connection.functions:
|
||||
if not peek:
|
||||
queue.remove(job)
|
||||
connection.related_jobs[job.handle] = job
|
||||
job.worker_connection = connection
|
||||
job.running = True
|
||||
return job
|
||||
return None
|
||||
|
||||
def release(self, regex=None):
|
||||
released = False
|
||||
qlen = (len(self.high_queue) + len(self.normal_queue) +
|
||||
len(self.low_queue))
|
||||
self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
|
||||
for job in self.getQueue():
|
||||
cmd, name = job.name.split(':')
|
||||
if cmd != 'build':
|
||||
continue
|
||||
if not regex or re.match(regex, name):
|
||||
self.log.debug("releasing queued job %s" %
|
||||
job.unique)
|
||||
job.waiting = False
|
||||
released = True
|
||||
else:
|
||||
self.log.debug("not releasing queued job %s" %
|
||||
job.unique)
|
||||
if released:
|
||||
self.wakeConnections()
|
||||
qlen = (len(self.high_queue) + len(self.normal_queue) +
|
||||
len(self.low_queue))
|
||||
self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
|
||||
|
@ -1,29 +1,46 @@
|
||||
import unittest
|
||||
#!/usr/bin/python2
|
||||
#
|
||||
# Copyright 2013 Rackspace Australia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
class TestGearmanManager(unittest.TestCase):
|
||||
def test___init__(self):
|
||||
# gearman_manager = GearmanManager(config, tasks)
|
||||
assert False # TODO: implement your test here
|
||||
|
||||
def test_run(self):
|
||||
# gearman_manager = GearmanManager(config, tasks)
|
||||
# self.assertEqual(expected, gearman_manager.run())
|
||||
assert False # TODO: implement your test here
|
||||
import json
|
||||
import os
|
||||
import testtools
|
||||
from fakes import FakeGearmanManager, FakeGearmanServer
|
||||
|
||||
def test_setup_gearman(self):
|
||||
# gearman_manager = GearmanManager(config, tasks)
|
||||
# self.assertEqual(expected, gearman_manager.setup_gearman())
|
||||
assert False # TODO: implement your test here
|
||||
CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'etc')
|
||||
with open(os.path.join(CONFIG_DIR, 'config.json'), 'r') as config_stream:
|
||||
CONFIG = json.load(config_stream)
|
||||
|
||||
def test_stop(self):
|
||||
# gearman_manager = GearmanManager(config, tasks)
|
||||
# self.assertEqual(expected, gearman_manager.stop())
|
||||
assert False # TODO: implement your test here
|
||||
|
||||
def test_stopped(self):
|
||||
# gearman_manager = GearmanManager(config, tasks)
|
||||
# self.assertEqual(expected, gearman_manager.stopped())
|
||||
assert False # TODO: implement your test here
|
||||
class TestGearmanManager(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(TestGearmanManager, self).setUp()
|
||||
self.config = CONFIG
|
||||
self.tasks = []
|
||||
self.gearman_server = FakeGearmanServer(
|
||||
self.config['zuul_server']['gearman_port'])
|
||||
|
||||
self.gearman_manager = FakeGearmanManager(self.config,
|
||||
self.tasks,
|
||||
self)
|
||||
|
||||
def test_manager_function_registered(self):
|
||||
""" Check the manager is set up correctly and registered with the
|
||||
gearman server with an appropriate function """
|
||||
pass
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
x
Reference in New Issue
Block a user