Update config.yaml to define jobs rather than plugins

Pay off some technical debt and make config.yaml define jobs rather than
plugins.

Change-Id: Ib4aa17649c02ab246d31515c8073230e8259cc4e
This commit is contained in:
Joshua Hesketh 2014-12-02 13:22:10 +11:00
parent d5d7a21ed0
commit 62721fbb68
10 changed files with 219 additions and 100 deletions

View File

@ -56,7 +56,16 @@ for your environment::
**pip_download_cache** **pip_download_cache**
Some of turbo-hipsters task plugins download requirements Some of turbo-hipsters task plugins download requirements
for projects. This is the cache directory used by pip. for projects. This is the cache directory used by pip.
**plugins** **jobs**
A list of registered jobs.
**name**
The name of the job to register. This is the function name
for zuul's job. eg build:some_job.
**plugin** (optional)
The plugin to use. Defaults to shell_task.
Any other variables the plugin may require for the job.
**plugins** (depreciated)
This is depreciated in favour of jobs (above).
A list of enabled plugins and their settings in a dictionary. A list of enabled plugins and their settings in a dictionary.
The only required parameters are *name*, which should be the The only required parameters are *name*, which should be the
same as the folder containing the plugin module, and same as the folder containing the plugin module, and

View File

@ -8,10 +8,19 @@ debug_log: /home/josh/var/log/turbo-hipster/debug.log
jobs_working_dir: /home/josh/var/lib/turbo-hipster/jobs jobs_working_dir: /home/josh/var/lib/turbo-hipster/jobs
git_working_dir: /home/josh/var/lib/turbo-hipster/git git_working_dir: /home/josh/var/lib/turbo-hipster/git
pip_download_cache: /home/josh/var/cache/pip pip_download_cache: /home/josh/var/cache/pip
jobs:
- name: build:real-db-upgrade_nova_mysql
datasets_dir: /home/josh/var/lib/turbo-hipster/datasets
plugin: real_db_upgrade
- name: build:some_shell_job
shell_script: /dev/null
# Legacy job definition as plugins
plugins: plugins:
- name: real_db_upgrade - name: real_db_upgrade
datasets_dir": /home/josh/var/lib/turbo-hipster/datasets datasets_dir: /var/lib/turbo-hipster/datasets_user_001
job: real-db-upgrade_nova_mysql function: build:real-db-upgrade_nova_mysql_user_001
publish_logs: publish_logs:
type: local type: local

View File

@ -9,6 +9,11 @@ jobs_working_dir: /var/lib/turbo-hipster/jobs
git_working_dir: /var/lib/turbo-hipster/git git_working_dir: /var/lib/turbo-hipster/git
pip_download_cache: /var/cache/pip pip_download_cache: /var/cache/pip
jobs:
- name: build:do_something_shelly
shell_script: 'ls -lah && echo'
# Legacy job definition as plugins
plugins: plugins:
- name: real_db_upgrade - name: real_db_upgrade
datasets_dir: /var/lib/turbo-hipster/datasets_devstack_131007 datasets_dir: /var/lib/turbo-hipster/datasets_devstack_131007
@ -18,10 +23,6 @@ plugins:
datasets_dir: /var/lib/turbo-hipster/datasets_user_001 datasets_dir: /var/lib/turbo-hipster/datasets_user_001
function: build:real-db-upgrade_nova_mysql_user_001 function: build:real-db-upgrade_nova_mysql_user_001
- name: shell_script
function: build:do_something_shelly
shell_script: 'ls -lah && echo'
publish_logs: publish_logs:
type: local type: local
path: /tmp/turbo-hipster/var/www/results/ path: /tmp/turbo-hipster/var/www/results/

View File

@ -25,9 +25,9 @@ from turbo_hipster.lib import models
class TestTaskStep(testtools.TestCase): class TestTaskStep(testtools.TestCase):
def test_task_step_decorator(self): def test_task_step_decorator(self):
class FakeTask(models.Task): class FakeTask(models.Task):
def __init__(self, global_config, plugin_config, job_name): def __init__(self, global_config, job_name, job_config):
super(FakeTask, self).__init__(global_config, plugin_config, super(FakeTask, self).__init__(global_config, job_name,
job_name) job_config)
# Define the number of steps we will do to determine our # Define the number of steps we will do to determine our
# progress. # progress.
self.total_steps = 2 self.total_steps = 2
@ -43,7 +43,7 @@ class TestTaskStep(testtools.TestCase):
def do_something_more(self): def do_something_more(self):
pass pass
task = FakeTask({}, {}, 'job_name') task = FakeTask({}, 'build:function', {})
task.job = fakes.FakeJob() task.job = fakes.FakeJob()
self.assertEqual(0, task.current_step) self.assertEqual(0, task.current_step)

View File

@ -93,7 +93,7 @@ class TestTaskRunner(base.TestWithGearman):
# Modify the job to fail. The git_path, job_working_dir and unqiue_id # Modify the job to fail. The git_path, job_working_dir and unqiue_id
# are all passed to the shell script. If we 'ls unique_id' it'll fail # are all passed to the shell script. If we 'ls unique_id' it'll fail
# since it doesn't exist. # since it doesn't exist.
self.config['plugins'][2]['shell_script'] = 'ls -lah' self.config['jobs'][0]['shell_script'] = 'ls -lah'
zuul.submit_job('build:do_something_shelly', data_req) zuul.submit_job('build:do_something_shelly', data_req)
zuul.wait_for_completion() zuul.wait_for_completion()

View File

@ -20,50 +20,125 @@ import fakes
class TestWorkerServer(base.TestWithGearman): class TestWorkerServer(base.TestWithGearman):
def test_plugins_load(self): def test_jobs_load_from_legacy_plugins(self):
"Test the configured plugins are loaded" "Test the configured plugins are loaded from legacy config.yaml layout"
self.start_server() self.start_server()
self.assertFalse(self.worker_server.stopped()) self.assertFalse(self.worker_server.stopped())
self.assertEqual(3, len(self.worker_server.plugins)) self.assertEqual(3, len(self.worker_server.jobs))
plugin0_config = { expected_jobs = {
"name": "real_db_upgrade", 'build:real-db-upgrade_nova_mysql_devstack_131007': {
"datasets_dir": "/var/lib/turbo-hipster/datasets_devstack_131007", "name": "build:real-db-upgrade_nova_mysql_devstack_131007",
"function": "build:real-db-upgrade_nova_mysql_devstack_131007" "plugin": "real_db_upgrade",
} "runner_module_name": "turbo_hipster.task_plugins."
plugin1_config = { "real_db_upgrade.task",
"name": "real_db_upgrade", "plugin_config": {
"datasets_dir": "/var/lib/turbo-hipster/datasets_user_001", "name": "real_db_upgrade",
"function": "build:real-db-upgrade_nova_mysql_user_001" "datasets_dir": "/var/lib/turbo-hipster/"
} "datasets_devstack_131007",
plugin2_config = { "function": "build:real-db-upgrade_nova_mysql_devstack_"
"name": "shell_script", "131007"
"function": "build:do_something_shelly", },
"shell_script": "ls -lah && echo", },
'build:real-db-upgrade_nova_mysql_user_001': {
"name": "build:real-db-upgrade_nova_mysql_user_001",
"plugin": "real_db_upgrade",
"runner_module_name": "turbo_hipster.task_plugins."
"real_db_upgrade.task",
"plugin_config": {
"name": "real_db_upgrade",
"datasets_dir": "/var/lib/turbo-hipster/datasets_user_001",
"function": "build:real-db-upgrade_nova_mysql_user_001"
},
},
'build:do_something_shelly': {
"name": "build:do_something_shelly",
"plugin": "shell_script",
"runner_module_name": "turbo_hipster.task_plugins."
"shell_script.task",
"job_config": {
"name": "build:do_something_shelly",
"shell_script": "ls -lah && echo",
},
},
} }
self.assertEqual(plugin0_config, for job_name, job in self.worker_server.jobs.items():
self.worker_server.plugins[0]['plugin_config']) self.assertEqual(expected_jobs[job_name]['name'],
self.assertEqual( job['name'])
'turbo_hipster.task_plugins.real_db_upgrade.task', self.assertEqual(expected_jobs[job_name]['plugin'],
self.worker_server.plugins[0]['module'].__name__ job['plugin'])
) if 'plugin_config' in job:
self.assertEqual(expected_jobs[job_name]['plugin_config'],
job['plugin_config'])
if 'job_config' in job:
self.assertEqual(expected_jobs[job_name]['job_config'],
job['job_config'])
self.assertEqual(
expected_jobs[job_name]['runner_module_name'],
job['runner'].__module__
)
self.assertEqual(plugin1_config, def test_job_configuration(self):
self.worker_server.plugins[1]['plugin_config']) "Test config.yaml job layout"
self.assertEqual( self._load_config_fixture('config.yaml')
'turbo_hipster.task_plugins.real_db_upgrade.task', self.start_server()
self.worker_server.plugins[1]['module'].__name__
)
self.assertEqual(plugin2_config, self.assertFalse(self.worker_server.stopped())
self.worker_server.plugins[2]['plugin_config']) self.assertEqual(3, len(self.worker_server.jobs))
self.assertEqual(
'turbo_hipster.task_plugins.shell_script.task', expected_jobs = {
self.worker_server.plugins[2]['module'].__name__ 'build:real-db-upgrade_nova_mysql': {
) "name": "build:real-db-upgrade_nova_mysql",
"plugin": "real_db_upgrade",
"runner_module_name": "turbo_hipster.task_plugins."
"real_db_upgrade.task",
"job_config": {
"name": "build:real-db-upgrade_nova_mysql",
"plugin": "real_db_upgrade",
"datasets_dir": "/home/josh/var/lib/turbo-hipster/datasets"
},
},
'build:real-db-upgrade_nova_mysql_user_001': {
"name": "build:real-db-upgrade_nova_mysql_user_001",
"plugin": "real_db_upgrade",
"runner_module_name": "turbo_hipster.task_plugins."
"real_db_upgrade.task",
"plugin_config": {
"name": "real_db_upgrade",
"datasets_dir": "/var/lib/turbo-hipster/datasets_user_001",
"function": "build:real-db-upgrade_nova_mysql_user_001",
},
},
'build:some_shell_job': {
"name": "build:some_shell_job",
"plugin": "shell_script",
"runner_module_name": "turbo_hipster.task_plugins."
"shell_script.task",
"job_config": {
"name": "build:some_shell_job",
"shell_script": "/dev/null",
},
},
}
for job_name, job in self.worker_server.jobs.items():
self.assertEqual(expected_jobs[job_name]['name'],
job['name'])
self.assertEqual(expected_jobs[job_name]['plugin'],
job['plugin'])
if 'plugin_config' in job:
self.assertEqual(expected_jobs[job_name]['plugin_config'],
job['plugin_config'])
if 'job_config' in job:
self.assertEqual(expected_jobs[job_name]['job_config'],
job['job_config'])
self.assertEqual(
expected_jobs[job_name]['runner_module_name'],
job['runner'].__module__
)
def test_zuul_client_started(self): def test_zuul_client_started(self):
"Test the zuul client has been started" "Test the zuul client has been started"

View File

@ -19,6 +19,7 @@ import logging
import os import os
import pkg_resources import pkg_resources
import socket import socket
import uuid
from turbo_hipster.lib import common from turbo_hipster.lib import common
from turbo_hipster.lib import utils from turbo_hipster.lib import utils
@ -28,9 +29,13 @@ class Task(object):
""" A base object for running a job (aka Task) """ """ A base object for running a job (aka Task) """
log = logging.getLogger("task") log = logging.getLogger("task")
def __init__(self, worker_server, plugin_config, job_name): def __init__(self, worker_server, job_name, job_config):
# TODO(jhesketh): remove the need for worker_server here
self.worker_server = worker_server self.worker_server = worker_server
self.plugin_config = plugin_config # NOTE(jhesketh): job_config may be in the old format where name
# refers to the plugin and function is the job name. Thus these should
# never be used in a job, instead use the provided job_name.
self.job_config = job_config
self.job_name = job_name self.job_name = job_name
self._reset() self._reset()
@ -52,16 +57,16 @@ class Task(object):
self.messages = [] self.messages = []
self.current_step = 0 self.current_step = 0
self.log_handler = None self.log_handler = None
self.th_uuid = str(uuid.uuid4())[-12:]
def _prep_working_dir(self): def _prep_working_dir(self):
self.job_identifier = utils.determine_job_identifier( # Use the th_uuid so that if the same job is somehow taken twice from
self.job_arguments, # zuul we won't re-use zuul's uuid. This shouldn't happen but if it
self.plugin_config['function'], # does it prevents overwriting previous results
self.job.unique
)
self.job_working_dir = os.path.join( self.job_working_dir = os.path.join(
self.worker_server.config['jobs_working_dir'], self.worker_server.config['jobs_working_dir'],
self.job_identifier self.th_uuid,
self.job_arguments['LOG_PATH']
) )
self.job_results_dir = os.path.join( self.job_results_dir = os.path.join(
self.job_working_dir, self.job_working_dir,
@ -221,7 +226,7 @@ class Task(object):
if 'publish_logs' in self.worker_server.config: if 'publish_logs' in self.worker_server.config:
index_url = utils.push_file( index_url = utils.push_file(
self.job_identifier, self.job_results_dir, self.job_arguments['LOG_PATH'], self.job_results_dir,
self.worker_server.config['publish_logs']) self.worker_server.config['publish_logs'])
self.log.debug("Index URL found at %s" % index_url) self.log.debug("Index URL found at %s" % index_url)
self.work_data['url'] = index_url self.work_data['url'] = index_url
@ -229,14 +234,14 @@ class Task(object):
if 'ZUUL_EXTRA_SWIFT_URL' in self.job_arguments: if 'ZUUL_EXTRA_SWIFT_URL' in self.job_arguments:
# Upload to zuul's url as instructed # Upload to zuul's url as instructed
utils.zuul_swift_upload(self.job_working_dir, self.job_arguments) utils.zuul_swift_upload(self.job_working_dir, self.job_arguments)
self.work_data['url'] = self.job_identifier self.work_data['url'] = self.job_arguments['LOG_PATH']
class ShellTask(Task): class ShellTask(Task):
log = logging.getLogger("task.shell_task") log = logging.getLogger("task.shell_task")
def __init__(self, worker_server, plugin_config, job_name): def __init__(self, worker_server, job_name, job_config):
super(ShellTask, self).__init__(worker_server, plugin_config, job_name) super(ShellTask, self).__init__(worker_server, job_name, job_config)
# Define the number of steps we will do to determine our progress. # Define the number of steps we will do to determine our progress.
self.total_steps = 5 self.total_steps = 5
@ -285,7 +290,7 @@ class ShellTask(Task):
self.log.debug("Grab the patchset we want to test against") self.log.debug("Grab the patchset we want to test against")
local_path = os.path.join(self.worker_server.config['git_working_dir'], local_path = os.path.join(self.worker_server.config['git_working_dir'],
self.job_name, job_args['ZUUL_PROJECT']) self.th_uuid, job_args['ZUUL_PROJECT'])
if not os.path.exists(local_path): if not os.path.exists(local_path):
os.makedirs(local_path) os.makedirs(local_path)
@ -305,7 +310,7 @@ class ShellTask(Task):
@common.task_step @common.task_step
def _execute_script(self): def _execute_script(self):
# Run script # Run script
cmd = self.plugin_config['shell_script'] cmd = self.job_config['shell_script']
cmd += ( cmd += (
(' %(git_path)s %(job_working_dir)s %(unique_id)s') (' %(git_path)s %(job_working_dir)s %(unique_id)s')
% { % {
@ -339,8 +344,8 @@ class ShellTask(Task):
def _handle_cleanup(self): def _handle_cleanup(self):
"""Handle and cleanup functions. Shutdown if requested to so that no """Handle and cleanup functions. Shutdown if requested to so that no
further jobs are ran if the environment is dirty.""" further jobs are ran if the environment is dirty."""
if ('shutdown-th' in self.plugin_config and if ('shutdown-th' in self.job_config and
self.plugin_config['shutdown-th']): self.job_config['shutdown-th']):
self.worker_server.shutdown_gracefully() self.worker_server.shutdown_gracefully()
@common.task_step @common.task_step

View File

@ -263,12 +263,6 @@ def scp_push_file(results_set_name, file_path, local_config):
pass pass
def determine_job_identifier(zuul_arguments, job, unique):
# use new determined path from zuul
path = zuul_arguments['LOG_PATH']
return path
def zuul_swift_upload(file_path, job_arguments): def zuul_swift_upload(file_path, job_arguments):
"""Upload working_dir to swift as per zuul's instructions""" """Upload working_dir to swift as per zuul's instructions"""
# NOTE(jhesketh): Zuul specifies an object prefix in the destination so # NOTE(jhesketh): Zuul specifies an object prefix in the destination so

View File

@ -40,8 +40,8 @@ class Runner(models.ShellTask):
log = logging.getLogger("task.real_db_upgrade") log = logging.getLogger("task.real_db_upgrade")
def __init__(self, worker_server, plugin_config, job_name): def __init__(self, worker_server, job_name, job_config):
super(Runner, self).__init__(worker_server, plugin_config, job_name) super(Runner, self).__init__(worker_server, job_name, job_config)
# Set up the runner worker # Set up the runner worker
self.datasets = [] self.datasets = []
@ -69,10 +69,7 @@ class Runner(models.ShellTask):
if (self.job_arguments['ZUUL_PROJECT'] == if (self.job_arguments['ZUUL_PROJECT'] ==
dataset['config']['project'] and dataset['config']['project'] and
self._get_project_command(dataset['config']['type'])): self._get_project_command(dataset['config']['type'])):
dataset['determined_path'] = utils.determine_job_identifier( dataset['determined_path'] = self.job_arguments['LOG_PATH']
self.job_arguments, self.plugin_config['function'],
self.job.unique
)
dataset['job_log_file_path'] = os.path.join( dataset['job_log_file_path'] = os.path.join(
self.worker_server.config['jobs_working_dir'], self.worker_server.config['jobs_working_dir'],
dataset['determined_path'], dataset['determined_path'],
@ -129,7 +126,7 @@ class Runner(models.ShellTask):
if len(self.datasets) > 0: if len(self.datasets) > 0:
return self.datasets return self.datasets
datasets_path = self.plugin_config['datasets_dir'] datasets_path = self.job_config['datasets_dir']
for ent in os.listdir(datasets_path): for ent in os.listdir(datasets_path):
dataset_dir = os.path.join(datasets_path, ent) dataset_dir = os.path.join(datasets_path, ent)
if (os.path.isdir(dataset_dir) and os.path.isfile( if (os.path.isdir(dataset_dir) and os.path.isfile(

View File

@ -47,15 +47,14 @@ class Server(threading.Thread):
# Config init # Config init
self.zuul_manager = None self.zuul_manager = None
self.zuul_client = None self.zuul_client = None
self.plugins = []
self.services_started = False self.services_started = False
# TODO: Make me unique (random?) and we should be able to run multiple # TODO: Make me unique (random?) and we should be able to run multiple
# instances of turbo-hipster on the one host # instances of turbo-hipster on the one host
self.worker_name = os.uname()[1] self.worker_name = os.uname()[1]
self.tasks = {} self.jobs = {}
self.load_plugins() self.load_jobs()
def load_extra_configuration(self): def load_extra_configuration(self):
if isdir(self.config["conf_d"]): if isdir(self.config["conf_d"]):
@ -84,41 +83,71 @@ class Server(threading.Thread):
filename=log_file, filename=log_file,
level=logging.DEBUG) level=logging.DEBUG)
def load_jobs(self):
# Legacy, load the plugins first
self.load_plugins()
self.log.debug("Loading jobs")
if 'jobs' in self.config:
for job in self.config['jobs']:
try:
plugin = 'shell_script'
if 'plugin' in job:
plugin = job['plugin']
module = __import__('turbo_hipster.task_plugins.' +
plugin + '.task',
fromlist='turbo_hipster.task_plugins' +
plugin)
self.jobs[job['name']] = {
'name': job['name'],
'plugin': plugin,
'job_config': job,
'runner': module.Runner(self, job['name'], job),
}
self.log.debug('Job %s loaded' % job['name'])
except Exception as e:
self.log.exception("Failure loading job")
self.log.exception(e)
def load_plugins(self): def load_plugins(self):
""" Load the available plugins from task_plugins """ """ Load the available plugins from task_plugins """
self.log.debug('Loading plugins') self.log.debug('Loading plugins')
# Load plugins # Load plugins
for plugin in self.config['plugins']: if 'plugins' in self.config:
self.plugins.append({ for plugin in self.config['plugins']:
'module': __import__('turbo_hipster.task_plugins.' + try:
plugin['name'] + '.task', module = __import__('turbo_hipster.task_plugins.' +
fromlist='turbo_hipster.task_plugins' + plugin['name'] + '.task',
plugin['name']), fromlist='turbo_hipster.task_plugins' +
'plugin_config': plugin plugin['name'])
})
self.log.debug('Plugin %s loaded' % plugin['name']) self.jobs[plugin['function']] = {
'name': plugin['function'],
'plugin': plugin['name'],
'plugin_config': plugin,
'runner': module.Runner(
self, plugin['function'], plugin
),
}
self.log.debug('Job %s loaded' % plugin['function'])
except Exception as e:
self.log.exception("Failure loading plugin")
self.log.exception(e)
def start_zuul_client(self): def start_zuul_client(self):
""" Run the tasks """ """ Run the tasks """
self.log.debug('Starting zuul client') self.log.debug('Starting zuul client')
self.zuul_client = worker_manager.ZuulClient(self) self.zuul_client = worker_manager.ZuulClient(self)
for task_number, plugin in enumerate(self.plugins): for job in self.jobs.values():
module = plugin['module'] self.zuul_client.add_function(job['name'], job['runner'])
job_name = '%s-%s-%s' % (plugin['plugin_config']['name'],
self.worker_name, task_number)
self.tasks[job_name] = module.Runner(
self,
plugin['plugin_config'],
job_name
)
self.zuul_client.add_function(plugin['plugin_config']['function'],
self.tasks[job_name])
self.zuul_client.start() self.zuul_client.start()
def start_zuul_manager(self): def start_zuul_manager(self):
self.zuul_manager = worker_manager.ZuulManager(self, self.tasks) self.zuul_manager = worker_manager.ZuulManager(self, self.jobs)
self.zuul_manager.start() self.zuul_manager.start()
def shutdown_gracefully(self): def shutdown_gracefully(self):