diff --git a/conductor/bin/app.py b/conductor/bin/app.py
index 3c2619b..8c74b5c 100644
--- a/conductor/bin/app.py
+++ b/conductor/bin/app.py
@@ -1,3 +1,37 @@
#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
-from conductor import app
\ No newline at end of file
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+
+
+from conductor import config
+from conductor.openstack.common import log
+from conductor.openstack.common import service
+from conductor.app import ConductorWorkflowService
+
+if __name__ == '__main__':
+ try:
+ config.parse_args()
+ log.setup('conductor')
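+ # ServiceLauncher runs the service in a green-thread group; wait()
+ # blocks until the process receives SIGTERM/SIGINT.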
+ launcher = service.ServiceLauncher()
+ launcher.launch_service(ConductorWorkflowService())
+ launcher.wait()
+ except RuntimeError as e:
+ sys.stderr.write("ERROR: %s\n" % e)
+ sys.exit(1)
diff --git a/conductor/conductor/app.py b/conductor/conductor/app.py
index f4cf9a7..52d8b2a 100644
--- a/conductor/conductor/app.py
+++ b/conductor/conductor/app.py
@@ -1,64 +1,69 @@
import datetime
import glob
import json
-import time
import sys
-import tornado.ioloop
-import rabbitmq
+from conductor.openstack.common import service
from workflow import Workflow
import cloud_formation
import windows_agent
from commands.dispatcher import CommandDispatcher
from config import Config
import reporting
+import rabbitmq
config = Config(sys.argv[1] if len(sys.argv) > 1 else None)
-rmqclient = rabbitmq.RabbitMqClient(
- virtual_host=config.get_setting('rabbitmq', 'vhost', '/'),
- login=config.get_setting('rabbitmq', 'login', 'guest'),
- password=config.get_setting('rabbitmq', 'password', 'guest'),
- host=config.get_setting('rabbitmq', 'host', 'localhost'))
-
-
-def schedule(callback, *args, **kwargs):
- tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 0.1,
- lambda args=args, kwargs=kwargs: callback(*args, **kwargs))
-
-
def task_received(task, message_id):
- print 'Starting at', datetime.datetime.now()
- reporter = reporting.Reporter(rmqclient, message_id, task['id'])
+ with rabbitmq.RmqClient() as rmqclient:
+ print 'Starting at', datetime.datetime.now()
+ reporter = reporting.Reporter(rmqclient, message_id, task['id'])
- command_dispatcher = CommandDispatcher(task['name'], rmqclient)
- workflows = []
- for path in glob.glob("data/workflows/*.xml"):
- print "loading", path
- workflow = Workflow(path, task, command_dispatcher, config, reporter)
- workflows.append(workflow)
+ command_dispatcher = CommandDispatcher(task['name'], rmqclient)
+ workflows = []
+ for path in glob.glob("data/workflows/*.xml"):
+ print "loading", path
+ workflow = Workflow(path, task, command_dispatcher, config, reporter)
+ workflows.append(workflow)
- def loop(callback):
- for workflow in workflows:
- workflow.execute()
- if not command_dispatcher.execute_pending(lambda: schedule(loop, callback)):
- callback()
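+ # Synchronous replacement for the old Tornado callback loop: run the
+ # workflows repeatedly until no commands remain pending.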
+ while True:
+ for workflow in workflows:
+ workflow.execute()
+ if not command_dispatcher.execute_pending():
+ break
- def shutdown():
command_dispatcher.close()
- rmqclient.send('task-results', json.dumps(task), message_id=message_id)
+ result_msg = rabbitmq.Message()
+ result_msg.body = task
+ result_msg.id = message_id
+
+ rmqclient.send(message=result_msg, key='task-results')
print 'Finished at', datetime.datetime.now()
- loop(shutdown)
-def message_received(body, message_id, **kwargs):
- task_received(json.loads(body), message_id)
+class ConductorWorkflowService(service.Service):
+ def __init__(self):
+ super(ConductorWorkflowService, self).__init__()
+ def start(self):
+ super(ConductorWorkflowService, self).start()
+ self.tg.add_thread(self._start_rabbitmq)
-def start():
- rmqclient.subscribe("tasks", message_received)
+ def stop(self):
+ super(ConductorWorkflowService, self).stop()
-rmqclient.start(start)
-tornado.ioloop.IOLoop.instance().start()
+ def _start_rabbitmq(self):
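+ # Reconnect loop: any error tears down the RabbitMQ connection and a
+ # fresh one is opened; each task is handed off to the thread group.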
+ while True:
+ try:
+ with rabbitmq.RmqClient() as rmq:
+ rmq.declare('tasks', 'tasks')
+ rmq.declare('task-results', 'tasks')
+ with rmq.open('tasks') as subscription:
+ while True:
+ msg = subscription.get_message()
+ self.tg.add_thread(
+ task_received, msg.body, msg.id)
+ except Exception as ex:
+ print ex
diff --git a/conductor/conductor/cloud_formation.py b/conductor/conductor/cloud_formation.py
index 3ef3b41..1642714 100644
--- a/conductor/conductor/cloud_formation.py
+++ b/conductor/conductor/cloud_formation.py
@@ -22,9 +22,13 @@ def prepare_user_data(context, template='Default', **kwargs):
with open('data/templates/agent-config/%s.template'
% template) as template_file:
init_script = init_script_file.read()
- template_data = template_file.read().replace(
+ template_data = template_file.read()
+ template_data = template_data.replace(
'%RABBITMQ_HOST%',
config.get_setting('rabbitmq', 'host') or 'localhost')
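+ # Substitute the per-environment results queue name, matching the
+ # '-execution-results-%s' queue declared by WindowsAgentExecutor.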
+ template_data = template_data.replace(
+ '%RESULT_QUEUE%',
+ '-execution-results-%s' % str(context['/dataSource']['name']))
return init_script.replace(
'%WINDOWS_AGENT_CONFIG_BASE64%',
diff --git a/conductor/conductor/commands/cloud_formation.py b/conductor/conductor/commands/cloud_formation.py
index 0d12083..d0225ed 100644
--- a/conductor/conductor/commands/cloud_formation.py
+++ b/conductor/conductor/commands/cloud_formation.py
@@ -1,4 +1,4 @@
-import json
+import anyjson
import os
import uuid
@@ -17,7 +17,7 @@ class HeatExecutor(CommandBase):
template_data = template_file.read()
template_data = conductor.helpers.transform_json(
- json.loads(template_data), mappings)
+ anyjson.loads(template_data), mappings)
self._pending_list.append({
'template': template_data,
@@ -28,7 +28,7 @@ class HeatExecutor(CommandBase):
def has_pending_commands(self):
return len(self._pending_list) > 0
- def execute_pending(self, callback):
+ def execute_pending(self):
if not self.has_pending_commands():
return False
@@ -40,7 +40,7 @@ class HeatExecutor(CommandBase):
arguments = conductor.helpers.merge_dicts(
arguments, t['arguments'], max_levels=1)
- print 'Executing heat template', json.dumps(template), \
+ print 'Executing heat template', anyjson.dumps(template), \
'with arguments', arguments, 'on stack', self._stack
if not os.path.exists("tmp"):
@@ -48,28 +48,23 @@ class HeatExecutor(CommandBase):
file_name = "tmp/" + str(uuid.uuid4())
print "Saving template to", file_name
with open(file_name, "w") as f:
- f.write(json.dumps(template))
+ f.write(anyjson.dumps(template))
arguments_str = ';'.join(['%s=%s' % (key, value)
for (key, value) in arguments.items()])
- call([
- "./heat_run", "stack-create",
- "-f" + file_name,
- "-P" + arguments_str,
- self._stack
- ])
-
-
- callbacks = []
- for t in self._pending_list:
- if t['callback']:
- callbacks.append(t['callback'])
+ # call([
+ # "./heat_run", "stack-create",
+ # "-f" + file_name,
+ # "-P" + arguments_str,
+ # self._stack
+ # ])
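+ # The heat_run invocation above is commented out (dry run); pending
+ # callbacks are still completed below.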
+ pending_list = self._pending_list
self._pending_list = []
- for cb in callbacks:
- cb(True)
+ for item in pending_list:
+ if item['callback']:
+ item['callback'](True)
+
- callback()
return True
diff --git a/conductor/conductor/commands/command.py b/conductor/conductor/commands/command.py
index ca9d144..ad2d469 100644
--- a/conductor/conductor/commands/command.py
+++ b/conductor/conductor/commands/command.py
@@ -2,7 +2,7 @@ class CommandBase(object):
def execute(self, **kwargs):
pass
- def execute_pending(self, callback):
+ def execute_pending(self):
return False
def has_pending_commands(self):
diff --git a/conductor/conductor/commands/dispatcher.py b/conductor/conductor/commands/dispatcher.py
index b815ddb..606266e 100644
--- a/conductor/conductor/commands/dispatcher.py
+++ b/conductor/conductor/commands/dispatcher.py
@@ -4,33 +4,22 @@ import windows_agent
class CommandDispatcher(command.CommandBase):
- def __init__(self, environment_name, rmqclient):
+ def __init__(self, environment_id, rmqclient):
self._command_map = {
- 'cf': cloud_formation.HeatExecutor(environment_name),
+ 'cf': cloud_formation.HeatExecutor(environment_id),
'agent': windows_agent.WindowsAgentExecutor(
- environment_name, rmqclient)
+ environment_id, rmqclient, environment_id)
}
def execute(self, name, **kwargs):
self._command_map[name].execute(**kwargs)
- def execute_pending(self, callback):
- result = 0
- count = [0]
-
- def on_result():
- count[0] -= 1
- if not count[0]:
- callback()
-
+ def execute_pending(self):
+ result = False
for command in self._command_map.values():
- count[0] += 1
- result += 1
- if not command.execute_pending(on_result):
- count[0] -= 1
- result -= 1
+ result |= command.execute_pending()
- return result > 0
+ return result
def has_pending_commands(self):
@@ -40,6 +29,7 @@ class CommandDispatcher(command.CommandBase):
return result
+
def close(self):
for t in self._command_map.values():
t.close()
diff --git a/conductor/conductor/commands/windows_agent.py b/conductor/conductor/commands/windows_agent.py
index c4747b6..978ddd2 100644
--- a/conductor/conductor/commands/windows_agent.py
+++ b/conductor/conductor/commands/windows_agent.py
@@ -1,66 +1,60 @@
import json
import uuid
+from conductor.rabbitmq import Message
import conductor.helpers
from command import CommandBase
class WindowsAgentExecutor(CommandBase):
- def __init__(self, stack, rmqclient):
+ def __init__(self, stack, rmqclient, environment):
self._stack = stack
self._rmqclient = rmqclient
- self._callback = None
self._pending_list = []
- self._current_pending_list = []
- rmqclient.subscribe('-execution-results', self._on_message)
+ self._results_queue = '-execution-results-%s' % str(environment)
+ rmqclient.declare(self._results_queue)
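+ # Results arrive on a per-environment queue; execute_pending() blocks
+ # on it to collect agent replies.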
def execute(self, template, mappings, host, callback):
with open('data/templates/agent/%s.template' %
template) as template_file:
template_data = template_file.read()
- template_data = json.dumps(conductor.helpers.transform_json(
- json.loads(template_data), mappings))
+ template_data = conductor.helpers.transform_json(
+ json.loads(template_data), mappings)
+ msg_id = str(uuid.uuid4()).lower()
+ host = ('%s-%s' % (self._stack, host)).lower().replace(' ', '-')
self._pending_list.append({
- 'id': str(uuid.uuid4()).lower(),
- 'template': template_data,
- 'host': ('%s-%s' % (self._stack, host)).lower().replace(' ', '-'),
+ 'id': msg_id,
'callback': callback
})
- def _on_message(self, body, message_id, **kwargs):
- msg_id = message_id.lower()
- item, index = conductor.helpers.find(lambda t: t['id'] == msg_id,
- self._current_pending_list)
- if item:
- self._current_pending_list.pop(index)
- item['callback'](json.loads(body))
- if self._callback and not self._current_pending_list:
- cb = self._callback
- self._callback = None
- cb()
+ msg = Message()
+ msg.body = template_data
+ msg.id = msg_id
+ self._rmqclient.declare(host)
+ self._rmqclient.send(message=msg, key=host)
+ print 'Sending RMQ message %s to %s' % (
+ template_data, host)
def has_pending_commands(self):
return len(self._pending_list) > 0
- def execute_pending(self, callback):
+ def execute_pending(self):
if not self.has_pending_commands():
return False
- self._current_pending_list = self._pending_list
- self._pending_list = []
+ with self._rmqclient.open(self._results_queue) as subscription:
+ while self.has_pending_commands():
+ msg = subscription.get_message()
+ msg_id = msg.id.lower()
+ item, index = conductor.helpers.find(
+ lambda t: t['id'] == msg_id, self._pending_list)
+ if item:
+ self._pending_list.pop(index)
+ item['callback'](msg.body)
- self._callback = callback
-
- for rec in self._current_pending_list:
- self._rmqclient.send(
- queue=rec['host'], data=rec['template'], message_id=rec['id'])
- print 'Sending RMQ message %s to %s' % (
- rec['template'], rec['host'])
return True
- def close(self):
- self._rmqclient.unsubscribe('-execution-results')
diff --git a/conductor/conductor/config.py b/conductor/conductor/config.py
index 881d4ad..1e42cad 100644
--- a/conductor/conductor/config.py
+++ b/conductor/conductor/config.py
@@ -1,5 +1,193 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Routines for configuring Conductor
+"""
+
+import logging
+import logging.config
+import logging.handlers
+import os
+import sys
+
+from oslo.config import cfg
+from paste import deploy
+
+from conductor.version import version_info as version
+from conductor.openstack.common.gettextutils import _
from ConfigParser import SafeConfigParser
+paste_deploy_opts = [
+ cfg.StrOpt('flavor'),
+ cfg.StrOpt('config_file'),
+]
+
+rabbit_opts = [
+ cfg.StrOpt('host', default='localhost'),
+ cfg.IntOpt('port', default=5672),
+ cfg.StrOpt('login', default='guest'),
+ cfg.StrOpt('password', default='guest'),
+ cfg.StrOpt('virtual_host', default='/'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(paste_deploy_opts, group='paste_deploy')
+CONF.register_opts(rabbit_opts, group='rabbitmq')
+
+
+CONF.import_opt('verbose', 'conductor.openstack.common.log')
+CONF.import_opt('debug', 'conductor.openstack.common.log')
+CONF.import_opt('log_dir', 'conductor.openstack.common.log')
+CONF.import_opt('log_file', 'conductor.openstack.common.log')
+CONF.import_opt('log_config', 'conductor.openstack.common.log')
+CONF.import_opt('log_format', 'conductor.openstack.common.log')
+CONF.import_opt('log_date_format', 'conductor.openstack.common.log')
+CONF.import_opt('use_syslog', 'conductor.openstack.common.log')
+CONF.import_opt('syslog_log_facility', 'conductor.openstack.common.log')
+
+
+def parse_args(args=None, usage=None, default_config_files=None):
+ CONF(args=args,
+ project='conductor',
+ version=version.cached_version_string(),
+ usage=usage,
+ default_config_files=default_config_files)
+
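+# Usage sketch (mirrors conductor/bin/app.py):
+#     config.parse_args()
+#     log.setup('conductor')
+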
+def setup_logging():
+ """
+ Sets up the logging options for a log with supplied name
+ """
+
+ if CONF.log_config:
+ # Use a logging configuration file for all settings...
+ if os.path.exists(CONF.log_config):
+ logging.config.fileConfig(CONF.log_config)
+ return
+ else:
+ raise RuntimeError("Unable to locate specified logging "
+ "config file: %s" % CONF.log_config)
+
+ root_logger = logging.root
+ if CONF.debug:
+ root_logger.setLevel(logging.DEBUG)
+ elif CONF.verbose:
+ root_logger.setLevel(logging.INFO)
+ else:
+ root_logger.setLevel(logging.WARNING)
+
+ formatter = logging.Formatter(CONF.log_format, CONF.log_date_format)
+
+ if CONF.use_syslog:
+ try:
+ facility = getattr(logging.handlers.SysLogHandler,
+ CONF.syslog_log_facility)
+ except AttributeError:
+ raise ValueError(_("Invalid syslog facility"))
+
+ handler = logging.handlers.SysLogHandler(address='/dev/log',
+ facility=facility)
+ elif CONF.log_file:
+ logfile = CONF.log_file
+ if CONF.log_dir:
+ logfile = os.path.join(CONF.log_dir, logfile)
+ handler = logging.handlers.WatchedFileHandler(logfile)
+ else:
+ handler = logging.StreamHandler(sys.stdout)
+
+ handler.setFormatter(formatter)
+ root_logger.addHandler(handler)
+
+
+def _get_deployment_flavor():
+ """
+ Retrieve the paste_deploy.flavor config item, formatted appropriately
+ for appending to the application name.
+ """
+ flavor = CONF.paste_deploy.flavor
+ return '' if not flavor else ('-' + flavor)
+
+
+def _get_paste_config_path():
+ paste_suffix = '-paste.ini'
+ conf_suffix = '.conf'
+ if CONF.config_file:
+ # Assume paste config is in a paste.ini file corresponding
+ # to the last config file
+ path = CONF.config_file[-1].replace(conf_suffix, paste_suffix)
+ else:
+ path = CONF.prog + '-paste.ini'
+ return CONF.find_file(os.path.basename(path))
+
+
+def _get_deployment_config_file():
+ """
+ Retrieve the deployment_config_file config item, formatted as an
+ absolute pathname.
+ """
+ path = CONF.paste_deploy.config_file
+ if not path:
+ path = _get_paste_config_path()
+ if not path:
+ msg = "Unable to locate paste config file for %s." % CONF.prog
+ raise RuntimeError(msg)
+ return os.path.abspath(path)
+
+
+def load_paste_app(app_name=None):
+ """
+ Builds and returns a WSGI app from a paste config file.
+
+ We assume the last config file specified in the supplied ConfigOpts
+ object is the paste config file.
+
+ :param app_name: name of the application to load
+
+ :raises RuntimeError when config file cannot be located or application
+ cannot be loaded from config file
+ """
+ if app_name is None:
+ app_name = CONF.prog
+
+ # append the deployment flavor to the application name,
+ # in order to identify the appropriate paste pipeline
+ app_name += _get_deployment_flavor()
+
+ conf_file = _get_deployment_config_file()
+
+ try:
+ logger = logging.getLogger(__name__)
+ logger.debug(_("Loading %(app_name)s from %(conf_file)s"),
+ {'conf_file': conf_file, 'app_name': app_name})
+
+ app = deploy.loadapp("config:%s" % conf_file, name=app_name)
+
+ # Log the options used when starting if we're in debug mode...
+ if CONF.debug:
+ CONF.log_opt_values(logger, logging.DEBUG)
+
+ return app
+ except (LookupError, ImportError) as e:
+ msg = _("Unable to load %(app_name)s from "
+ "configuration file %(conf_file)s."
+ "\nGot: %(e)r") % locals()
+ logger.error(msg)
+ raise RuntimeError(msg)
+
class Config(object):
CONFIG_PATH = './etc/app.config'
diff --git a/conductor/conductor/openstack/__init__.py b/conductor/conductor/openstack/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/conductor/conductor/openstack/common/__init__.py b/conductor/conductor/openstack/common/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/conductor/conductor/openstack/common/eventlet_backdoor.py b/conductor/conductor/openstack/common/eventlet_backdoor.py
new file mode 100644
index 0000000..7605e26
--- /dev/null
+++ b/conductor/conductor/openstack/common/eventlet_backdoor.py
@@ -0,0 +1,87 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 OpenStack Foundation.
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import gc
+import pprint
+import sys
+import traceback
+
+import eventlet
+import eventlet.backdoor
+import greenlet
+from oslo.config import cfg
+
+eventlet_backdoor_opts = [
+ cfg.IntOpt('backdoor_port',
+ default=None,
+ help='port for eventlet backdoor to listen on')
+]
+
+CONF = cfg.CONF
+CONF.register_opts(eventlet_backdoor_opts)
+
+
+def _dont_use_this():
+ print "Don't use this, just disconnect instead"
+
+
+def _find_objects(t):
+ return filter(lambda o: isinstance(o, t), gc.get_objects())
+
+
+def _print_greenthreads():
+ for i, gt in enumerate(_find_objects(greenlet.greenlet)):
+ print i, gt
+ traceback.print_stack(gt.gr_frame)
+ print
+
+
+def _print_nativethreads():
+ for threadId, stack in sys._current_frames().items():
+ print threadId
+ traceback.print_stack(stack)
+ print
+
+
+def initialize_if_enabled():
+ backdoor_locals = {
+ 'exit': _dont_use_this, # So we don't exit the entire process
+ 'quit': _dont_use_this, # So we don't exit the entire process
+ 'fo': _find_objects,
+ 'pgt': _print_greenthreads,
+ 'pnt': _print_nativethreads,
+ }
+
+ if CONF.backdoor_port is None:
+ return None
+
+ # NOTE(johannes): The standard sys.displayhook will print the value of
+ # the last expression and set it to __builtin__._, which overwrites
+ # the __builtin__._ that gettext sets. Let's switch to using pprint
+ # since it won't interact poorly with gettext, and it's easier to
+ # read the output too.
+ def displayhook(val):
+ if val is not None:
+ pprint.pprint(val)
+ sys.displayhook = displayhook
+
+ sock = eventlet.listen(('localhost', CONF.backdoor_port))
+ port = sock.getsockname()[1]
+ eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
+ locals=backdoor_locals)
+ return port
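+
+
+# Usage sketch: set backdoor_port in the config and connect with
+# `telnet localhost <port>` for a Python shell inside the running service
+# (the listener binds to localhost only).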
diff --git a/conductor/conductor/openstack/common/exception.py b/conductor/conductor/openstack/common/exception.py
new file mode 100644
index 0000000..beabfcd
--- /dev/null
+++ b/conductor/conductor/openstack/common/exception.py
@@ -0,0 +1,142 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Exceptions common to OpenStack projects
+"""
+
+import logging
+
+from conductor.openstack.common.gettextutils import _
+
+_FATAL_EXCEPTION_FORMAT_ERRORS = False
+
+
+class Error(Exception):
+ def __init__(self, message=None):
+ super(Error, self).__init__(message)
+
+
+class ApiError(Error):
+ def __init__(self, message='Unknown', code='Unknown'):
+ self.message = message
+ self.code = code
+ super(ApiError, self).__init__('%s: %s' % (code, message))
+
+
+class NotFound(Error):
+ pass
+
+
+class UnknownScheme(Error):
+
+ msg = "Unknown scheme '%s' found in URI"
+
+ def __init__(self, scheme):
+ msg = self.__class__.msg % scheme
+ super(UnknownScheme, self).__init__(msg)
+
+
+class BadStoreUri(Error):
+
+ msg = "The Store URI %s was malformed. Reason: %s"
+
+ def __init__(self, uri, reason):
+ msg = self.__class__.msg % (uri, reason)
+ super(BadStoreUri, self).__init__(msg)
+
+
+class Duplicate(Error):
+ pass
+
+
+class NotAuthorized(Error):
+ pass
+
+
+class NotEmpty(Error):
+ pass
+
+
+class Invalid(Error):
+ pass
+
+
+class BadInputError(Exception):
+ """Error resulting from a client sending bad input to a server"""
+ pass
+
+
+class MissingArgumentError(Error):
+ pass
+
+
+class DatabaseMigrationError(Error):
+ pass
+
+
+class ClientConnectionError(Exception):
+ """Error resulting from a client connecting to a server"""
+ pass
+
+
+def wrap_exception(f):
+ def _wrap(*args, **kw):
+ try:
+ return f(*args, **kw)
+ except Exception as e:
+ if not isinstance(e, Error):
+ #exc_type, exc_value, exc_traceback = sys.exc_info()
+ logging.exception(_('Uncaught exception'))
+ #logging.error(traceback.extract_stack(exc_traceback))
+ raise Error(str(e))
+ raise
+ _wrap.func_name = f.func_name
+ return _wrap
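+
+
+# Usage sketch:
+#     @wrap_exception
+#     def handler(request):
+#         ...  # non-Error exceptions are logged and re-raised as Error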
+
+
+class OpenstackException(Exception):
+ """
+ Base Exception
+
+ To correctly use this class, inherit from it and define
+ a 'message' property. That message will get printf'd
+ with the keyword arguments provided to the constructor.
+ """
+ message = "An unknown exception occurred"
+
+ def __init__(self, **kwargs):
+ try:
+ self._error_string = self.message % kwargs
+
+ except Exception as e:
+ if _FATAL_EXCEPTION_FORMAT_ERRORS:
+ raise e
+ else:
+ # at least get the core message out if something happened
+ self._error_string = self.message
+
+ def __str__(self):
+ return self._error_string
+
+
+class MalformedRequestBody(OpenstackException):
+ message = "Malformed message body: %(reason)s"
+
+
+class InvalidContentType(OpenstackException):
+ message = "Invalid content type %(content_type)s"
diff --git a/conductor/conductor/openstack/common/gettextutils.py b/conductor/conductor/openstack/common/gettextutils.py
new file mode 100644
index 0000000..000d23b
--- /dev/null
+++ b/conductor/conductor/openstack/common/gettextutils.py
@@ -0,0 +1,33 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+gettext for openstack-common modules.
+
+Usual usage in an openstack.common module:
+
+ from conductor.openstack.common.gettextutils import _
+"""
+
+import gettext
+
+
+t = gettext.translation('openstack-common', 'locale', fallback=True)
+
+
+def _(msg):
+ return t.ugettext(msg)
diff --git a/conductor/conductor/openstack/common/importutils.py b/conductor/conductor/openstack/common/importutils.py
new file mode 100644
index 0000000..6862ca8
--- /dev/null
+++ b/conductor/conductor/openstack/common/importutils.py
@@ -0,0 +1,67 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Import related utilities and helper functions.
+"""
+
+import sys
+import traceback
+
+
+def import_class(import_str):
+ """Returns a class from a string including module and class"""
+ mod_str, _sep, class_str = import_str.rpartition('.')
+ try:
+ __import__(mod_str)
+ return getattr(sys.modules[mod_str], class_str)
+ except (ValueError, AttributeError):
+ raise ImportError('Class %s cannot be found (%s)' %
+ (class_str,
+ traceback.format_exception(*sys.exc_info())))
+
+
+def import_object(import_str, *args, **kwargs):
+ """Import a class and return an instance of it."""
+ return import_class(import_str)(*args, **kwargs)
+
+
+def import_object_ns(name_space, import_str, *args, **kwargs):
+ """
+ Import a class and return an instance of it, first by trying
+ to find the class in a default namespace, then falling back to
+ a full path if not found in the default namespace.
+ """
+ import_value = "%s.%s" % (name_space, import_str)
+ try:
+ return import_class(import_value)(*args, **kwargs)
+ except ImportError:
+ return import_class(import_str)(*args, **kwargs)
+
+
+def import_module(import_str):
+ """Import a module."""
+ __import__(import_str)
+ return sys.modules[import_str]
+
+
+def try_import(import_str, default=None):
+ """Try to import a module and if it fails return default."""
+ try:
+ return import_module(import_str)
+ except ImportError:
+ return default
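+
+
+# Example: service.py uses rpc = try_import('conductor.openstack.common.rpc'),
+# which yields None instead of raising when the optional module is absent.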
diff --git a/conductor/conductor/openstack/common/jsonutils.py b/conductor/conductor/openstack/common/jsonutils.py
new file mode 100644
index 0000000..c281b6a
--- /dev/null
+++ b/conductor/conductor/openstack/common/jsonutils.py
@@ -0,0 +1,141 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+'''
+JSON related utilities.
+
+This module provides a few things:
+
+ 1) A handy function for getting an object down to something that can be
+ JSON serialized. See to_primitive().
+
+ 2) Wrappers around loads() and dumps(). The dumps() wrapper will
+ automatically use to_primitive() for you if needed.
+
+ 3) This sets up anyjson to use the loads() and dumps() wrappers if anyjson
+ is available.
+'''
+
+
+import datetime
+import functools
+import inspect
+import itertools
+import json
+import xmlrpclib
+
+from conductor.openstack.common import timeutils
+
+
+def to_primitive(value, convert_instances=False, convert_datetime=True,
+ level=0, max_depth=3):
+ """Convert a complex object into primitives.
+
+ Handy for JSON serialization. We can optionally handle instances,
+ but since this is a recursive function, we could have cyclical
+ data structures.
+
+ To handle cyclical data structures we could track the actual objects
+ visited in a set, but not all objects are hashable. Instead we just
+ track the depth of the object inspections and don't go too deep.
+
+ Therefore, convert_instances=True is lossy ... be aware.
+
+ """
+ nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod,
+ inspect.isfunction, inspect.isgeneratorfunction,
+ inspect.isgenerator, inspect.istraceback, inspect.isframe,
+ inspect.iscode, inspect.isbuiltin, inspect.isroutine,
+ inspect.isabstract]
+ for test in nasty:
+ if test(value):
+ return unicode(value)
+
+ # value of itertools.count doesn't get caught by inspects
+ # above and results in infinite loop when list(value) is called.
+ if type(value) == itertools.count:
+ return unicode(value)
+
+ # FIXME(vish): Workaround for LP bug 852095. Without this workaround,
+ # tests that raise an exception in a mocked method that
+ # has a @wrap_exception with a notifier will fail. If
+ # we up the dependency to 0.5.4 (when it is released) we
+ # can remove this workaround.
+ if getattr(value, '__module__', None) == 'mox':
+ return 'mock'
+
+ if level > max_depth:
+ return '?'
+
+ # The try block may not be necessary after the class check above,
+ # but just in case ...
+ try:
+ recursive = functools.partial(to_primitive,
+ convert_instances=convert_instances,
+ convert_datetime=convert_datetime,
+ level=level,
+ max_depth=max_depth)
+ # It's not clear why xmlrpclib created their own DateTime type, but
+ # for our purposes, make it a datetime type which is explicitly
+ # handled
+ if isinstance(value, xmlrpclib.DateTime):
+ value = datetime.datetime(*tuple(value.timetuple())[:6])
+
+ if isinstance(value, (list, tuple)):
+ return [recursive(v) for v in value]
+ elif isinstance(value, dict):
+ return dict((k, recursive(v)) for k, v in value.iteritems())
+ elif convert_datetime and isinstance(value, datetime.datetime):
+ return timeutils.strtime(value)
+ elif hasattr(value, 'iteritems'):
+ return recursive(dict(value.iteritems()), level=level + 1)
+ elif hasattr(value, '__iter__'):
+ return recursive(list(value))
+ elif convert_instances and hasattr(value, '__dict__'):
+ # Likely an instance of something. Watch for cycles.
+ # Ignore class member vars.
+ return recursive(value.__dict__, level=level + 1)
+ else:
+ return value
+ except TypeError:
+ # Class objects are tricky since they may define something like
+ # __iter__ defined but it isn't callable as list().
+ return unicode(value)
+
+
+def dumps(value, default=to_primitive, **kwargs):
+ return json.dumps(value, default=default, **kwargs)
+
+
+def loads(s):
+ return json.loads(s)
+
+
+def load(s):
+ return json.load(s)
+
+
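+# Register this module with anyjson so anyjson.dumps()/loads() route through
+# the wrappers above (and therefore through to_primitive()).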
+try:
+ import anyjson
+except ImportError:
+ pass
+else:
+ anyjson._modules.append((__name__, 'dumps', TypeError,
+ 'loads', ValueError, 'load'))
+ anyjson.force_implementation(__name__)
diff --git a/conductor/conductor/openstack/common/local.py b/conductor/conductor/openstack/common/local.py
new file mode 100644
index 0000000..9b1d22a
--- /dev/null
+++ b/conductor/conductor/openstack/common/local.py
@@ -0,0 +1,48 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Greenthread local storage of variables using weak references"""
+
+import weakref
+
+from eventlet import corolocal
+
+
+class WeakLocal(corolocal.local):
+ def __getattribute__(self, attr):
+ rval = corolocal.local.__getattribute__(self, attr)
+ if rval:
+ # NOTE(mikal): this bit is confusing. What is stored is a weak
+ # reference, not the value itself. We therefore need to lookup
+ # the weak reference and return the inner value here.
+ rval = rval()
+ return rval
+
+ def __setattr__(self, attr, value):
+ value = weakref.ref(value)
+ return corolocal.local.__setattr__(self, attr, value)
+
+
+# NOTE(mikal): the name "store" should be deprecated in the future
+store = WeakLocal()
+
+# A "weak" store uses weak references and allows an object to fall out of scope
+# when it falls out of scope in the code that uses the thread local storage. A
+# "strong" store will hold a reference to the object so that it never falls out
+# of scope.
+weak_store = WeakLocal()
+strong_store = corolocal.local
diff --git a/conductor/conductor/openstack/common/log.py b/conductor/conductor/openstack/common/log.py
new file mode 100644
index 0000000..31ed37f
--- /dev/null
+++ b/conductor/conductor/openstack/common/log.py
@@ -0,0 +1,522 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Openstack logging handler.
+
+This module adds to logging functionality by adding the option to specify
+a context object when calling the various log methods. If the context object
+is not specified, default formatting is used. Additionally, an instance uuid
+may be passed as part of the log message, which is intended to make it easier
+for admins to find messages related to a specific instance.
+
+It also allows setting of formatting information through conf.
+
+"""
+
+import cStringIO
+import inspect
+import itertools
+import logging
+import logging.config
+import logging.handlers
+import os
+import stat
+import sys
+import traceback
+
+from oslo.config import cfg
+
+from conductor.openstack.common.gettextutils import _
+from conductor.openstack.common import jsonutils
+from conductor.openstack.common import local
+from conductor.openstack.common import notifier
+
+
+_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
+_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+
+common_cli_opts = [
+ cfg.BoolOpt('debug',
+ short='d',
+ default=False,
+ help='Print debugging output (set logging level to '
+ 'DEBUG instead of default WARNING level).'),
+ cfg.BoolOpt('verbose',
+ short='v',
+ default=False,
+ help='Print more verbose output (set logging level to '
+ 'INFO instead of default WARNING level).'),
+]
+
+logging_cli_opts = [
+ cfg.StrOpt('log-config',
+ metavar='PATH',
+ help='If this option is specified, the logging configuration '
+ 'file specified is used and overrides any other logging '
+ 'options specified. Please see the Python logging module '
+ 'documentation for details on logging configuration '
+ 'files.'),
+ cfg.StrOpt('log-format',
+ default=_DEFAULT_LOG_FORMAT,
+ metavar='FORMAT',
+ help='A logging.Formatter log message format string which may '
+ 'use any of the available logging.LogRecord attributes. '
+ 'Default: %(default)s'),
+ cfg.StrOpt('log-date-format',
+ default=_DEFAULT_LOG_DATE_FORMAT,
+ metavar='DATE_FORMAT',
+ help='Format string for %%(asctime)s in log records. '
+ 'Default: %(default)s'),
+ cfg.StrOpt('log-file',
+ metavar='PATH',
+ deprecated_name='logfile',
+ help='(Optional) Name of log file to output to. '
+ 'If not set, logging will go to stdout.'),
+ cfg.StrOpt('log-dir',
+ deprecated_name='logdir',
+ help='(Optional) The directory to keep log files in '
+ '(will be prepended to --log-file)'),
+ cfg.BoolOpt('use-syslog',
+ default=False,
+ help='Use syslog for logging.'),
+ cfg.StrOpt('syslog-log-facility',
+ default='LOG_USER',
+ help='syslog facility to receive log lines')
+]
+
+generic_log_opts = [
+ cfg.BoolOpt('use_stderr',
+ default=True,
+ help='Log output to standard error'),
+ cfg.StrOpt('logfile_mode',
+ default='0644',
+ help='Default file mode used when creating log files'),
+]
+
+log_opts = [
+ cfg.StrOpt('logging_context_format_string',
+ default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
+ '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
+ '%(message)s',
+ help='format string to use for log messages with context'),
+ cfg.StrOpt('logging_default_format_string',
+ default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
+ '%(name)s [-] %(instance)s%(message)s',
+ help='format string to use for log messages without context'),
+ cfg.StrOpt('logging_debug_format_suffix',
+ default='%(funcName)s %(pathname)s:%(lineno)d',
+ help='data to append to log format when level is DEBUG'),
+ cfg.StrOpt('logging_exception_prefix',
+ default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
+ '%(instance)s',
+ help='prefix each line of exception output with this format'),
+ cfg.ListOpt('default_log_levels',
+ default=[
+ 'amqplib=WARN',
+ 'sqlalchemy=WARN',
+ 'boto=WARN',
+ 'suds=INFO',
+ 'keystone=INFO',
+ 'eventlet.wsgi.server=WARN'
+ ],
+ help='list of logger=LEVEL pairs'),
+ cfg.BoolOpt('publish_errors',
+ default=False,
+ help='publish error events'),
+ cfg.BoolOpt('fatal_deprecations',
+ default=False,
+ help='make deprecations fatal'),
+
+ # NOTE(mikal): there are two options here because sometimes we are handed
+ # a full instance (and could include more information), and other times we
+ # are just handed a UUID for the instance.
+ cfg.StrOpt('instance_format',
+ default='[instance: %(uuid)s] ',
+ help='If an instance is passed with the log message, format '
+ 'it like this'),
+ cfg.StrOpt('instance_uuid_format',
+ default='[instance: %(uuid)s] ',
+ help='If an instance UUID is passed with the log message, '
+ 'format it like this'),
+]
+
+CONF = cfg.CONF
+CONF.register_cli_opts(common_cli_opts)
+CONF.register_cli_opts(logging_cli_opts)
+CONF.register_opts(generic_log_opts)
+CONF.register_opts(log_opts)
+
+# our new audit level
+# NOTE(jkoelker) Since we synthesized an audit level, make the logging
+# module aware of it so it acts like other levels.
+logging.AUDIT = logging.INFO + 1
+logging.addLevelName(logging.AUDIT, 'AUDIT')
+
+
+try:
+ NullHandler = logging.NullHandler
+except AttributeError: # NOTE(jkoelker) NullHandler added in Python 2.7
+ class NullHandler(logging.Handler):
+ def handle(self, record):
+ pass
+
+ def emit(self, record):
+ pass
+
+ def createLock(self):
+ self.lock = None
+
+
+def _dictify_context(context):
+ if context is None:
+ return None
+ if not isinstance(context, dict) and getattr(context, 'to_dict', None):
+ context = context.to_dict()
+ return context
+
+
+def _get_binary_name():
+ return os.path.basename(inspect.stack()[-1][1])
+
+
+def _get_log_file_path(binary=None):
+ logfile = CONF.log_file
+ logdir = CONF.log_dir
+
+ if logfile and not logdir:
+ return logfile
+
+ if logfile and logdir:
+ return os.path.join(logdir, logfile)
+
+ if logdir:
+ binary = binary or _get_binary_name()
+ return '%s.log' % (os.path.join(logdir, binary),)
+
+
+class ContextAdapter(logging.LoggerAdapter):
+ warn = logging.LoggerAdapter.warning
+
+ def __init__(self, logger, project_name, version_string):
+ self.logger = logger
+ self.project = project_name
+ self.version = version_string
+
+ def audit(self, msg, *args, **kwargs):
+ self.log(logging.AUDIT, msg, *args, **kwargs)
+
+ def deprecated(self, msg, *args, **kwargs):
+ stdmsg = _("Deprecated: %s") % msg
+ if CONF.fatal_deprecations:
+ self.critical(stdmsg, *args, **kwargs)
+ raise DeprecatedConfig(msg=stdmsg)
+ else:
+ self.warn(stdmsg, *args, **kwargs)
+
+ def process(self, msg, kwargs):
+ if 'extra' not in kwargs:
+ kwargs['extra'] = {}
+ extra = kwargs['extra']
+
+ context = kwargs.pop('context', None)
+ if not context:
+ context = getattr(local.store, 'context', None)
+ if context:
+ extra.update(_dictify_context(context))
+
+ instance = kwargs.pop('instance', None)
+ instance_extra = ''
+ if instance:
+ instance_extra = CONF.instance_format % instance
+ else:
+ instance_uuid = kwargs.pop('instance_uuid', None)
+ if instance_uuid:
+ instance_extra = (CONF.instance_uuid_format
+ % {'uuid': instance_uuid})
+ extra.update({'instance': instance_extra})
+
+ extra.update({"project": self.project})
+ extra.update({"version": self.version})
+ extra['extra'] = extra.copy()
+ return msg, kwargs
+
+
+class JSONFormatter(logging.Formatter):
+ def __init__(self, fmt=None, datefmt=None):
+ # NOTE(jkoelker) we ignore the fmt argument, but its still there
+ # since logging.config.fileConfig passes it.
+ self.datefmt = datefmt
+
+ def formatException(self, ei, strip_newlines=True):
+ lines = traceback.format_exception(*ei)
+ if strip_newlines:
+ lines = [itertools.ifilter(
+ lambda x: x,
+ line.rstrip().splitlines()) for line in lines]
+ lines = list(itertools.chain(*lines))
+ return lines
+
+ def format(self, record):
+ message = {'message': record.getMessage(),
+ 'asctime': self.formatTime(record, self.datefmt),
+ 'name': record.name,
+ 'msg': record.msg,
+ 'args': record.args,
+ 'levelname': record.levelname,
+ 'levelno': record.levelno,
+ 'pathname': record.pathname,
+ 'filename': record.filename,
+ 'module': record.module,
+ 'lineno': record.lineno,
+ 'funcname': record.funcName,
+ 'created': record.created,
+ 'msecs': record.msecs,
+ 'relative_created': record.relativeCreated,
+ 'thread': record.thread,
+ 'thread_name': record.threadName,
+ 'process_name': record.processName,
+ 'process': record.process,
+ 'traceback': None}
+
+ if hasattr(record, 'extra'):
+ message['extra'] = record.extra
+
+ if record.exc_info:
+ message['traceback'] = self.formatException(record.exc_info)
+
+ return jsonutils.dumps(message)
+
+
+class PublishErrorsHandler(logging.Handler):
+ def emit(self, record):
+ if ('conductor.openstack.common.notifier.log_notifier' in
+ CONF.notification_driver):
+ return
+ notifier.api.notify(None, 'error.publisher',
+ 'error_notification',
+ notifier.api.ERROR,
+ dict(error=record.msg))
+
+
+def _create_logging_excepthook(product_name):
+ def logging_excepthook(type, value, tb):
+ extra = {}
+ if CONF.verbose:
+ extra['exc_info'] = (type, value, tb)
+ getLogger(product_name).critical(str(value), **extra)
+ return logging_excepthook
+
+
+def setup(product_name):
+ """Setup logging."""
+ if CONF.log_config:
+ logging.config.fileConfig(CONF.log_config)
+ else:
+ _setup_logging_from_conf()
+ sys.excepthook = _create_logging_excepthook(product_name)
+
+
+def set_defaults(logging_context_format_string):
+ cfg.set_defaults(log_opts,
+ logging_context_format_string=
+ logging_context_format_string)
+
+
+def _find_facility_from_conf():
+ facility_names = logging.handlers.SysLogHandler.facility_names
+ facility = getattr(logging.handlers.SysLogHandler,
+ CONF.syslog_log_facility,
+ None)
+
+ if facility is None and CONF.syslog_log_facility in facility_names:
+ facility = facility_names.get(CONF.syslog_log_facility)
+
+ if facility is None:
+ valid_facilities = facility_names.keys()
+ consts = ['LOG_AUTH', 'LOG_AUTHPRIV', 'LOG_CRON', 'LOG_DAEMON',
+ 'LOG_FTP', 'LOG_KERN', 'LOG_LPR', 'LOG_MAIL', 'LOG_NEWS',
+ 'LOG_SYSLOG', 'LOG_USER', 'LOG_UUCP',
+ 'LOG_LOCAL0', 'LOG_LOCAL1', 'LOG_LOCAL2', 'LOG_LOCAL3',
+ 'LOG_LOCAL4', 'LOG_LOCAL5', 'LOG_LOCAL6', 'LOG_LOCAL7']
+ valid_facilities.extend(consts)
+ raise TypeError(_('syslog facility must be one of: %s') %
+ ', '.join("'%s'" % fac
+ for fac in valid_facilities))
+
+ return facility
+
+
+def _setup_logging_from_conf():
+ log_root = getLogger(None).logger
+ for handler in log_root.handlers:
+ log_root.removeHandler(handler)
+
+ if CONF.use_syslog:
+ facility = _find_facility_from_conf()
+ syslog = logging.handlers.SysLogHandler(address='/dev/log',
+ facility=facility)
+ log_root.addHandler(syslog)
+
+ logpath = _get_log_file_path()
+ if logpath:
+ filelog = logging.handlers.WatchedFileHandler(logpath)
+ log_root.addHandler(filelog)
+
+ mode = int(CONF.logfile_mode, 8)
+ st = os.stat(logpath)
+ if st.st_mode != (stat.S_IFREG | mode):
+ os.chmod(logpath, mode)
+
+ if CONF.use_stderr:
+ streamlog = ColorHandler()
+ log_root.addHandler(streamlog)
+
+ elif not CONF.log_file:
+ # pass sys.stdout as a positional argument
+ # python2.6 calls the argument strm, in 2.7 it's stream
+ streamlog = logging.StreamHandler(sys.stdout)
+ log_root.addHandler(streamlog)
+
+ if CONF.publish_errors:
+ log_root.addHandler(PublishErrorsHandler(logging.ERROR))
+
+ for handler in log_root.handlers:
+ datefmt = CONF.log_date_format
+ if CONF.log_format:
+ handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
+ datefmt=datefmt))
+ else:
+ handler.setFormatter(LegacyFormatter(datefmt=datefmt))
+
+ if CONF.debug:
+ log_root.setLevel(logging.DEBUG)
+ elif CONF.verbose:
+ log_root.setLevel(logging.INFO)
+ else:
+ log_root.setLevel(logging.WARNING)
+
+ level = logging.NOTSET
+ for pair in CONF.default_log_levels:
+ mod, _sep, level_name = pair.partition('=')
+ level = logging.getLevelName(level_name)
+ logger = logging.getLogger(mod)
+ logger.setLevel(level)
+ for handler in log_root.handlers:
+ logger.addHandler(handler)
+
+_loggers = {}
+
+
+def getLogger(name='unknown', version='unknown'):
+ if name not in _loggers:
+ _loggers[name] = ContextAdapter(logging.getLogger(name),
+ name,
+ version)
+ return _loggers[name]
+
+
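+# Usage sketch:
+#     LOG = getLogger(__name__)
+#     LOG.audit('message')  # logs at the synthesized AUDIT level
+
+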
+class WritableLogger(object):
+ """A thin wrapper that responds to `write` and logs."""
+
+ def __init__(self, logger, level=logging.INFO):
+ self.logger = logger
+ self.level = level
+
+ def write(self, msg):
+ self.logger.log(self.level, msg)
+
+
+class LegacyFormatter(logging.Formatter):
+ """A context.RequestContext aware formatter configured through flags.
+
+ The flags used to set format strings are: logging_context_format_string
+ and logging_default_format_string. You can also specify
+ logging_debug_format_suffix to append extra formatting if the log level is
+ debug.
+
+ For information about what variables are available for the formatter see:
+ http://docs.python.org/library/logging.html#formatter
+
+ """
+
+ def format(self, record):
+ """Uses contextstring if request_id is set, otherwise default."""
+ # NOTE(sdague): default the fancier formatting params
+ # to an empty string so we don't throw an exception if
+ # they get used
+ for key in ('instance', 'color'):
+ if key not in record.__dict__:
+ record.__dict__[key] = ''
+
+ if record.__dict__.get('request_id', None):
+ self._fmt = CONF.logging_context_format_string
+ else:
+ self._fmt = CONF.logging_default_format_string
+
+ if (record.levelno == logging.DEBUG and
+ CONF.logging_debug_format_suffix):
+ self._fmt += " " + CONF.logging_debug_format_suffix
+
+ # Cache this on the record, Logger will respect our formatted copy
+ if record.exc_info:
+ record.exc_text = self.formatException(record.exc_info, record)
+ return logging.Formatter.format(self, record)
+
+ def formatException(self, exc_info, record=None):
+ """Format exception output with CONF.logging_exception_prefix."""
+ if not record:
+ return logging.Formatter.formatException(self, exc_info)
+
+ stringbuffer = cStringIO.StringIO()
+ traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
+ None, stringbuffer)
+ lines = stringbuffer.getvalue().split('\n')
+ stringbuffer.close()
+
+ if CONF.logging_exception_prefix.find('%(asctime)') != -1:
+ record.asctime = self.formatTime(record, self.datefmt)
+
+ formatted_lines = []
+ for line in lines:
+ pl = CONF.logging_exception_prefix % record.__dict__
+ fl = '%s%s' % (pl, line)
+ formatted_lines.append(fl)
+ return '\n'.join(formatted_lines)
+
+
+class ColorHandler(logging.StreamHandler):
+ LEVEL_COLORS = {
+ logging.DEBUG: '\033[00;32m', # GREEN
+ logging.INFO: '\033[00;36m', # CYAN
+ logging.AUDIT: '\033[01;36m', # BOLD CYAN
+ logging.WARN: '\033[01;33m', # BOLD YELLOW
+ logging.ERROR: '\033[01;31m', # BOLD RED
+ logging.CRITICAL: '\033[01;31m', # BOLD RED
+ }
+
+ def format(self, record):
+ record.color = self.LEVEL_COLORS[record.levelno]
+ return logging.StreamHandler.format(self, record)
+
+
+class DeprecatedConfig(Exception):
+ message = _("Fatal call to deprecated config: %(msg)s")
+
+ def __init__(self, msg):
+ super(Exception, self).__init__(self.message % dict(msg=msg))
diff --git a/conductor/conductor/openstack/common/loopingcall.py b/conductor/conductor/openstack/common/loopingcall.py
new file mode 100644
index 0000000..238fab9
--- /dev/null
+++ b/conductor/conductor/openstack/common/loopingcall.py
@@ -0,0 +1,95 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import sys
+
+from eventlet import event
+from eventlet import greenthread
+
+from conductor.openstack.common.gettextutils import _
+from conductor.openstack.common import log as logging
+from conductor.openstack.common import timeutils
+
+LOG = logging.getLogger(__name__)
+
+
+class LoopingCallDone(Exception):
+ """Exception to break out and stop a LoopingCall.
+
+ The poll-function passed to LoopingCall can raise this exception to
+ break out of the loop normally. This is somewhat analogous to
+ StopIteration.
+
+ An optional return-value can be included as the argument to the exception;
+ this return-value will be returned by LoopingCall.wait()
+
+ """
+
+ def __init__(self, retvalue=True):
+ """:param retvalue: Value that LoopingCall.wait() should return."""
+ self.retvalue = retvalue
+
+
+class LoopingCall(object):
+ def __init__(self, f=None, *args, **kw):
+ self.args = args
+ self.kw = kw
+ self.f = f
+ self._running = False
+
+ def start(self, interval, initial_delay=None):
+ self._running = True
+ done = event.Event()
+
+ def _inner():
+ if initial_delay:
+ greenthread.sleep(initial_delay)
+
+ try:
+ while self._running:
+ start = timeutils.utcnow()
+ self.f(*self.args, **self.kw)
+ end = timeutils.utcnow()
+ if not self._running:
+ break
+ delay = interval - timeutils.delta_seconds(start, end)
+ if delay <= 0:
+ LOG.warn(_('task run outlasted interval by %s sec') %
+ -delay)
+ greenthread.sleep(delay if delay > 0 else 0)
+ except LoopingCallDone as e:
+ self.stop()
+ done.send(e.retvalue)
+ except Exception:
+ LOG.exception(_('in looping call'))
+ done.send_exception(*sys.exc_info())
+ return
+ else:
+ done.send(True)
+
+ self.done = done
+
+ greenthread.spawn_n(_inner)
+ return self.done
+
+ def stop(self):
+ self._running = False
+
+ def wait(self):
+ return self.done.wait()
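+
+
+# Usage sketch: call check() every 5 seconds until it raises LoopingCallDone:
+#     timer = LoopingCall(check)
+#     timer.start(interval=5)
+#     result = timer.wait()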
diff --git a/conductor/conductor/openstack/common/service.py b/conductor/conductor/openstack/common/service.py
new file mode 100644
index 0000000..df48769
--- /dev/null
+++ b/conductor/conductor/openstack/common/service.py
@@ -0,0 +1,332 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Generic Node base class for all workers that run on hosts."""
+
+import errno
+import os
+import random
+import signal
+import sys
+import time
+
+import eventlet
+import logging as std_logging
+from oslo.config import cfg
+
+from conductor.openstack.common import eventlet_backdoor
+from conductor.openstack.common.gettextutils import _
+from conductor.openstack.common import importutils
+from conductor.openstack.common import log as logging
+from conductor.openstack.common import threadgroup
+
+
+rpc = importutils.try_import('conductor.openstack.common.rpc')
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class Launcher(object):
+ """Launch one or more services and wait for them to complete."""
+
+ def __init__(self):
+ """Initialize the service launcher.
+
+ :returns: None
+
+ """
+ self._services = threadgroup.ThreadGroup()
+ eventlet_backdoor.initialize_if_enabled()
+
+ @staticmethod
+ def run_service(service):
+ """Start and wait for a service to finish.
+
+ :param service: service to run and wait for.
+ :returns: None
+
+ """
+ service.start()
+ service.wait()
+
+ def launch_service(self, service):
+ """Load and start the given service.
+
+ :param service: The service you would like to start.
+ :returns: None
+
+ """
+ self._services.add_thread(self.run_service, service)
+
+ def stop(self):
+ """Stop all services which are currently running.
+
+ :returns: None
+
+ """
+ self._services.stop()
+
+ def wait(self):
+ """Waits until all services have been stopped, and then returns.
+
+ :returns: None
+
+ """
+ self._services.wait()
+
+
+class SignalExit(SystemExit):
+ def __init__(self, signo, exccode=1):
+ super(SignalExit, self).__init__(exccode)
+ self.signo = signo
+
+
+class ServiceLauncher(Launcher):
+ def _handle_signal(self, signo, frame):
+ # Allow the process to be killed again and die from natural causes
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ raise SignalExit(signo)
+
+ def wait(self):
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ LOG.debug(_('Full set of CONF:'))
+ CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+ status = None
+ try:
+ super(ServiceLauncher, self).wait()
+ except SignalExit as exc:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[exc.signo]
+ LOG.info(_('Caught %s, exiting'), signame)
+ status = exc.code
+ except SystemExit as exc:
+ status = exc.code
+ finally:
+ if rpc:
+ rpc.cleanup()
+ self.stop()
+ return status
+
+
+class ServiceWrapper(object):
+ def __init__(self, service, workers):
+ self.service = service
+ self.workers = workers
+ self.children = set()
+ self.forktimes = []
+
+
+class ProcessLauncher(object):
+ def __init__(self):
+ self.children = {}
+ self.sigcaught = None
+ self.running = True
+ rfd, self.writepipe = os.pipe()
+ self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ def _handle_signal(self, signo, frame):
+ self.sigcaught = signo
+ self.running = False
+
+ # Allow the process to be killed again and die from natural causes
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ def _pipe_watcher(self):
+ # This will block until the write end is closed when the parent
+ # dies unexpectedly
+ self.readpipe.read()
+
+ LOG.info(_('Parent process has died unexpectedly, exiting'))
+
+ sys.exit(1)
+
+ def _child_process(self, service):
+ # Setup child signal handlers differently
+ def _sigterm(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ raise SignalExit(signal.SIGTERM)
+
+ signal.signal(signal.SIGTERM, _sigterm)
+ # Block SIGINT and let the parent send us a SIGTERM
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+ # Reopen the eventlet hub to make sure we don't share an epoll
+ # fd with parent and/or siblings, which would be bad
+ eventlet.hubs.use_hub()
+
+ # Close write to ensure only parent has it open
+ os.close(self.writepipe)
+ # Create greenthread to watch for parent to close pipe
+ eventlet.spawn_n(self._pipe_watcher)
+
+ # Reseed random number generator
+ random.seed()
+
+ launcher = Launcher()
+ launcher.run_service(service)
+
+ def _start_child(self, wrap):
+ if len(wrap.forktimes) > wrap.workers:
+            # Limit ourselves to one fork per second (over a period of
+            # number of workers * 1 second). This allows workers to
+            # start up quickly but ensures we don't keep respawning
+            # children that die almost instantly.
+ if time.time() - wrap.forktimes[0] < wrap.workers:
+ LOG.info(_('Forking too fast, sleeping'))
+ time.sleep(1)
+
+ wrap.forktimes.pop(0)
+
+ wrap.forktimes.append(time.time())
+
+ pid = os.fork()
+ if pid == 0:
+ # NOTE(johannes): All exceptions are caught to ensure this
+ # doesn't fallback into the loop spawning children. It would
+ # be bad for a child to spawn more children.
+ status = 0
+ try:
+ self._child_process(wrap.service)
+ except SignalExit as exc:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[exc.signo]
+ LOG.info(_('Caught %s, exiting'), signame)
+ status = exc.code
+ except SystemExit as exc:
+ status = exc.code
+ except BaseException:
+ LOG.exception(_('Unhandled exception'))
+ status = 2
+ finally:
+ wrap.service.stop()
+
+ os._exit(status)
+
+ LOG.info(_('Started child %d'), pid)
+
+ wrap.children.add(pid)
+ self.children[pid] = wrap
+
+ return pid
+
+ def launch_service(self, service, workers=1):
+ wrap = ServiceWrapper(service, workers)
+
+ LOG.info(_('Starting %d workers'), wrap.workers)
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ def _wait_child(self):
+ try:
+ # Don't block if no child processes have exited
+ pid, status = os.waitpid(0, os.WNOHANG)
+ if not pid:
+ return None
+ except OSError as exc:
+ if exc.errno not in (errno.EINTR, errno.ECHILD):
+ raise
+ return None
+
+ if os.WIFSIGNALED(status):
+ sig = os.WTERMSIG(status)
+ LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
+ dict(pid=pid, sig=sig))
+ else:
+ code = os.WEXITSTATUS(status)
+ LOG.info(_('Child %(pid)s exited with status %(code)d'),
+ dict(pid=pid, code=code))
+
+ if pid not in self.children:
+ LOG.warning(_('pid %d not in child list'), pid)
+ return None
+
+ wrap = self.children.pop(pid)
+ wrap.children.remove(pid)
+ return wrap
+
+ def wait(self):
+ """Loop waiting on children to die and respawning as necessary"""
+
+ LOG.debug(_('Full set of CONF:'))
+ CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+ while self.running:
+ wrap = self._wait_child()
+ if not wrap:
+ # Yield to other threads if no children have exited
+ # Sleep for a short time to avoid excessive CPU usage
+ # (see bug #1095346)
+ eventlet.greenthread.sleep(.01)
+ continue
+
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ if self.sigcaught:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[self.sigcaught]
+ LOG.info(_('Caught %s, stopping children'), signame)
+
+ for pid in self.children:
+ try:
+ os.kill(pid, signal.SIGTERM)
+ except OSError as exc:
+ if exc.errno != errno.ESRCH:
+ raise
+
+ # Wait for children to die
+ if self.children:
+ LOG.info(_('Waiting on %d children to exit'), len(self.children))
+ while self.children:
+ self._wait_child()
+
+
+class Service(object):
+ """Service object for binaries running on hosts."""
+
+ def __init__(self, threads=1000):
+ self.tg = threadgroup.ThreadGroup(threads)
+
+ def start(self):
+ pass
+
+ def stop(self):
+ self.tg.stop()
+
+ def wait(self):
+ self.tg.wait()
+
+
+def launch(service, workers=None):
+ if workers:
+ launcher = ProcessLauncher()
+ launcher.launch_service(service, workers=workers)
+ else:
+ launcher = ServiceLauncher()
+ launcher.launch_service(service)
+ return launcher
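+
+# Usage sketch (WorkerService is a hypothetical subclass, shown only to
+# illustrate the launch()/Launcher contract):
+#
+#     from conductor.openstack.common import service
+#
+#     class WorkerService(service.Service):
+#         def start(self):
+#             super(WorkerService, self).start()
+#             self.tg.add_timer(60, self._report_state)
+#
+#         def _report_state(self):
+#             pass
+#
+#     # workers=None selects ServiceLauncher (in-process); an integer
+#     # selects ProcessLauncher, which forks that many children.
+#     launcher = service.launch(WorkerService(), workers=2)
+#     launcher.wait()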
diff --git a/conductor/conductor/openstack/common/setup.py b/conductor/conductor/openstack/common/setup.py
new file mode 100644
index 0000000..0e9ff88
--- /dev/null
+++ b/conductor/conductor/openstack/common/setup.py
@@ -0,0 +1,359 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation.
+# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Utilities with minimum-depends for use in setup.py
+"""
+
+import email
+import os
+import re
+import subprocess
+import sys
+
+from setuptools.command import sdist
+
+
+def parse_mailmap(mailmap='.mailmap'):
+ mapping = {}
+ if os.path.exists(mailmap):
+ with open(mailmap, 'r') as fp:
+ for l in fp:
+ try:
+ canonical_email, alias = re.match(
+ r'[^#]*?(<.+>).*(<.+>).*', l).groups()
+ except AttributeError:
+ continue
+ mapping[alias] = canonical_email
+ return mapping
+
+
+def _parse_git_mailmap(git_dir, mailmap='.mailmap'):
+ mailmap = os.path.join(os.path.dirname(git_dir), mailmap)
+ return parse_mailmap(mailmap)
+
+
+def canonicalize_emails(changelog, mapping):
+ """Takes in a string and an email alias mapping and replaces all
+ instances of the aliases in the string with their real email.
+ """
+ for alias, email_address in mapping.iteritems():
+ changelog = changelog.replace(alias, email_address)
+ return changelog
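+
+# Example (a hypothetical .mailmap entry and the substitution it drives):
+#
+#     # .mailmap line:
+#     #     Jane Doe <jane@example.com> <jdoe@old.example>
+#     mapping = parse_mailmap('.mailmap')
+#     # mapping == {'<jdoe@old.example>': '<jane@example.com>'}
+#     canonicalize_emails('Jane Doe <jdoe@old.example>', mapping)
+#     # -> 'Jane Doe <jane@example.com>'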
+
+
+# Get requirements from the first file that exists
+def get_reqs_from_files(requirements_files):
+ for requirements_file in requirements_files:
+ if os.path.exists(requirements_file):
+ with open(requirements_file, 'r') as fil:
+ return fil.read().split('\n')
+ return []
+
+
+def parse_requirements(requirements_files=['requirements.txt',
+ 'tools/pip-requires']):
+ requirements = []
+ for line in get_reqs_from_files(requirements_files):
+ # For the requirements list, we need to inject only the portion
+ # after egg= so that distutils knows the package it's looking for
+ # such as:
+ # -e git://github.com/openstack/nova/master#egg=nova
+ if re.match(r'\s*-e\s+', line):
+ requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1',
+ line))
+ # such as:
+ # http://github.com/openstack/nova/zipball/master#egg=nova
+ elif re.match(r'\s*https?:', line):
+ requirements.append(re.sub(r'\s*https?:.*#egg=(.*)$', r'\1',
+ line))
+ # -f lines are for index locations, and don't get used here
+ elif re.match(r'\s*-f\s+', line):
+ pass
+ # argparse is part of the standard library starting with 2.7
+ # adding it to the requirements list screws distro installs
+ elif line == 'argparse' and sys.version_info >= (2, 7):
+ pass
+ else:
+ requirements.append(line)
+
+ return requirements
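+
+# Example (hypothetical requirements lines and how they are rewritten):
+#
+#     # requirements.txt:
+#     #     -e git://github.com/openstack/nova.git#egg=nova
+#     #     http://github.com/openstack/glance/zipball/master#egg=glance
+#     #     eventlet>=0.9.17
+#     parse_requirements(['requirements.txt'])
+#     # -> ['nova', 'glance', 'eventlet>=0.9.17']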
+
+
+def parse_dependency_links(requirements_files=['requirements.txt',
+ 'tools/pip-requires']):
+ dependency_links = []
+ # dependency_links inject alternate locations to find packages listed
+ # in requirements
+ for line in get_reqs_from_files(requirements_files):
+ # skip comments and blank lines
+ if re.match(r'(\s*#)|(\s*$)', line):
+ continue
+ # lines with -e or -f need the whole line, minus the flag
+ if re.match(r'\s*-[ef]\s+', line):
+ dependency_links.append(re.sub(r'\s*-[ef]\s+', '', line))
+ # lines that are only urls can go in unmolested
+ elif re.match(r'\s*https?:', line):
+ dependency_links.append(line)
+ return dependency_links
+
+
+def _run_shell_command(cmd, throw_on_error=False):
+ if os.name == 'nt':
+ output = subprocess.Popen(["cmd.exe", "/C", cmd],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ else:
+ output = subprocess.Popen(["/bin/sh", "-c", cmd],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ out = output.communicate()
+ if output.returncode and throw_on_error:
+        raise Exception("%s returned %d" % (cmd, output.returncode))
+ if len(out) == 0:
+ return None
+ if len(out[0].strip()) == 0:
+ return None
+ return out[0].strip()
+
+
+def _get_git_directory():
+ parent_dir = os.path.dirname(__file__)
+ while True:
+ git_dir = os.path.join(parent_dir, '.git')
+ if os.path.exists(git_dir):
+ return git_dir
+ parent_dir, child = os.path.split(parent_dir)
+        if not child:  # reached the root dir
+ return None
+
+
+def write_git_changelog():
+ """Write a changelog based on the git changelog."""
+ new_changelog = 'ChangeLog'
+ git_dir = _get_git_directory()
+ if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'):
+ if git_dir:
+ git_log_cmd = 'git --git-dir=%s log' % git_dir
+ changelog = _run_shell_command(git_log_cmd)
+ mailmap = _parse_git_mailmap(git_dir)
+ with open(new_changelog, "w") as changelog_file:
+ changelog_file.write(canonicalize_emails(changelog, mailmap))
+ else:
+ open(new_changelog, 'w').close()
+
+
+def generate_authors():
+ """Create AUTHORS file using git commits."""
+ jenkins_email = 'jenkins@review.(openstack|stackforge).org'
+ old_authors = 'AUTHORS.in'
+ new_authors = 'AUTHORS'
+ git_dir = _get_git_directory()
+ if not os.getenv('SKIP_GENERATE_AUTHORS'):
+ if git_dir:
+ # don't include jenkins email address in AUTHORS file
+ git_log_cmd = ("git --git-dir=" + git_dir +
+ " log --format='%aN <%aE>' | sort -u | "
+ "egrep -v '" + jenkins_email + "'")
+ changelog = _run_shell_command(git_log_cmd)
+ mailmap = _parse_git_mailmap(git_dir)
+ with open(new_authors, 'w') as new_authors_fh:
+ new_authors_fh.write(canonicalize_emails(changelog, mailmap))
+ if os.path.exists(old_authors):
+ with open(old_authors, "r") as old_authors_fh:
+ new_authors_fh.write('\n' + old_authors_fh.read())
+ else:
+ open(new_authors, 'w').close()
+
+
+_rst_template = """%(heading)s
+%(underline)s
+
+.. automodule:: %(module)s
+ :members:
+ :undoc-members:
+ :show-inheritance:
+"""
+
+
+def get_cmdclass():
+ """Return dict of commands to run from setup.py."""
+
+ cmdclass = dict()
+
+ def _find_modules(arg, dirname, files):
+ for filename in files:
+ if filename.endswith('.py') and filename != '__init__.py':
+ arg["%s.%s" % (dirname.replace('/', '.'),
+ filename[:-3])] = True
+
+ class LocalSDist(sdist.sdist):
+ """Builds the ChangeLog and Authors files from VC first."""
+
+ def run(self):
+ write_git_changelog()
+ generate_authors()
+ # sdist.sdist is an old style class, can't use super()
+ sdist.sdist.run(self)
+
+ cmdclass['sdist'] = LocalSDist
+
+ # If Sphinx is installed on the box running setup.py,
+ # enable setup.py to build the documentation, otherwise,
+ # just ignore it
+ try:
+ from sphinx.setup_command import BuildDoc
+
+ class LocalBuildDoc(BuildDoc):
+
+ builders = ['html', 'man']
+
+ def generate_autoindex(self):
+ print "**Autodocumenting from %s" % os.path.abspath(os.curdir)
+ modules = {}
+ option_dict = self.distribution.get_option_dict('build_sphinx')
+ source_dir = os.path.join(option_dict['source_dir'][1], 'api')
+ if not os.path.exists(source_dir):
+ os.makedirs(source_dir)
+ for pkg in self.distribution.packages:
+ if '.' not in pkg:
+ os.path.walk(pkg, _find_modules, modules)
+ module_list = modules.keys()
+ module_list.sort()
+ autoindex_filename = os.path.join(source_dir, 'autoindex.rst')
+ with open(autoindex_filename, 'w') as autoindex:
+ autoindex.write(""".. toctree::
+ :maxdepth: 1
+
+""")
+ for module in module_list:
+ output_filename = os.path.join(source_dir,
+ "%s.rst" % module)
+ heading = "The :mod:`%s` Module" % module
+ underline = "=" * len(heading)
+ values = dict(module=module, heading=heading,
+ underline=underline)
+
+ print "Generating %s" % output_filename
+ with open(output_filename, 'w') as output_file:
+ output_file.write(_rst_template % values)
+ autoindex.write(" %s.rst\n" % module)
+
+ def run(self):
+ if not os.getenv('SPHINX_DEBUG'):
+ self.generate_autoindex()
+
+ for builder in self.builders:
+ self.builder = builder
+ self.finalize_options()
+ self.project = self.distribution.get_name()
+ self.version = self.distribution.get_version()
+ self.release = self.distribution.get_version()
+ BuildDoc.run(self)
+
+ class LocalBuildLatex(LocalBuildDoc):
+ builders = ['latex']
+
+ cmdclass['build_sphinx'] = LocalBuildDoc
+ cmdclass['build_sphinx_latex'] = LocalBuildLatex
+ except ImportError:
+ pass
+
+ return cmdclass
+
+
+def _get_revno(git_dir):
+ """Return the number of commits since the most recent tag.
+
+ We use git-describe to find this out, but if there are no
+ tags then we fall back to counting commits since the beginning
+ of time.
+ """
+ describe = _run_shell_command(
+ "git --git-dir=%s describe --always" % git_dir)
+ if "-" in describe:
+ return describe.rsplit("-", 2)[-2]
+
+ # no tags found
+ revlist = _run_shell_command(
+ "git --git-dir=%s rev-list --abbrev-commit HEAD" % git_dir)
+ return len(revlist.splitlines())
+
+
+def _get_version_from_git(pre_version):
+ """Return a version which is equal to the tag that's on the current
+ revision if there is one, or tag plus number of additional revisions
+ if the current revision has no tag."""
+
+ git_dir = _get_git_directory()
+ if git_dir:
+ if pre_version:
+ try:
+ return _run_shell_command(
+ "git --git-dir=" + git_dir + " describe --exact-match",
+ throw_on_error=True).replace('-', '.')
+ except Exception:
+ sha = _run_shell_command(
+ "git --git-dir=" + git_dir + " log -n1 --pretty=format:%h")
+ return "%s.a%s.g%s" % (pre_version, _get_revno(git_dir), sha)
+ else:
+ return _run_shell_command(
+ "git --git-dir=" + git_dir + " describe --always").replace(
+ '-', '.')
+ return None
+
+
+def _get_version_from_pkg_info(package_name):
+ """Get the version from PKG-INFO file if we can."""
+ try:
+ pkg_info_file = open('PKG-INFO', 'r')
+ except (IOError, OSError):
+ return None
+ try:
+ pkg_info = email.message_from_file(pkg_info_file)
+ except email.MessageError:
+ return None
+ # Check to make sure we're in our own dir
+ if pkg_info.get('Name', None) != package_name:
+ return None
+ return pkg_info.get('Version', None)
+
+
+def get_version(package_name, pre_version=None):
+ """Get the version of the project. First, try getting it from PKG-INFO, if
+ it exists. If it does, that means we're in a distribution tarball or that
+ install has happened. Otherwise, if there is no PKG-INFO file, pull the
+ version from git.
+
+ We do not support setup.py version sanity in git archive tarballs, nor do
+ we support packagers directly sucking our git repo into theirs. We expect
+ that a source tarball be made from our git repo - or that if someone wants
+ to make a source tarball from a fork of our repo with additional tags in it
+ that they understand and desire the results of doing that.
+ """
+ version = os.environ.get("OSLO_PACKAGE_VERSION", None)
+ if version:
+ return version
+ version = _get_version_from_pkg_info(package_name)
+ if version:
+ return version
+ version = _get_version_from_git(pre_version)
+ if version:
+ return version
+ raise Exception("Versioning for this project requires either an sdist"
+ " tarball, or access to an upstream git repository.")
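+
+# Usage sketch (how a project's setup.py is expected to consume these
+# helpers; 'conductor' stands in for the real package name):
+#
+#     import setuptools
+#     from conductor.openstack.common import setup as common_setup
+#
+#     setuptools.setup(
+#         name='conductor',
+#         version=common_setup.get_version('conductor'),
+#         cmdclass=common_setup.get_cmdclass(),
+#         install_requires=common_setup.parse_requirements(),
+#         dependency_links=common_setup.parse_dependency_links())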
diff --git a/conductor/conductor/openstack/common/sslutils.py b/conductor/conductor/openstack/common/sslutils.py
new file mode 100644
index 0000000..4168e87
--- /dev/null
+++ b/conductor/conductor/openstack/common/sslutils.py
@@ -0,0 +1,80 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 IBM
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import ssl
+
+from oslo.config import cfg
+
+from conductor.openstack.common.gettextutils import _
+
+
+ssl_opts = [
+ cfg.StrOpt('ca_file',
+ default=None,
+ help="CA certificate file to use to verify "
+ "connecting clients"),
+ cfg.StrOpt('cert_file',
+ default=None,
+ help="Certificate file to use when starting "
+ "the server securely"),
+ cfg.StrOpt('key_file',
+ default=None,
+ help="Private key file to use when starting "
+ "the server securely"),
+]
+
+
+CONF = cfg.CONF
+CONF.register_opts(ssl_opts, "ssl")
+
+
+def is_enabled():
+ cert_file = CONF.ssl.cert_file
+ key_file = CONF.ssl.key_file
+ ca_file = CONF.ssl.ca_file
+ use_ssl = cert_file or key_file
+
+ if cert_file and not os.path.exists(cert_file):
+ raise RuntimeError(_("Unable to find cert_file : %s") % cert_file)
+
+ if ca_file and not os.path.exists(ca_file):
+ raise RuntimeError(_("Unable to find ca_file : %s") % ca_file)
+
+ if key_file and not os.path.exists(key_file):
+ raise RuntimeError(_("Unable to find key_file : %s") % key_file)
+
+ if use_ssl and (not cert_file or not key_file):
+ raise RuntimeError(_("When running server in SSL mode, you must "
+ "specify both a cert_file and key_file "
+ "option value in your configuration file"))
+
+ return use_ssl
+
+
+def wrap(sock):
+ ssl_kwargs = {
+ 'server_side': True,
+ 'certfile': CONF.ssl.cert_file,
+ 'keyfile': CONF.ssl.key_file,
+ 'cert_reqs': ssl.CERT_NONE,
+ }
+
+ if CONF.ssl.ca_file:
+ ssl_kwargs['ca_certs'] = CONF.ssl.ca_file
+ ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
+
+ return ssl.wrap_socket(sock, **ssl_kwargs)
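+
+# Usage sketch (hypothetical config paths and listener; is_enabled() reads
+# the [ssl] options registered above):
+#
+#     # conductor.conf:
+#     #     [ssl]
+#     #     cert_file = /etc/conductor/server.crt
+#     #     key_file = /etc/conductor/server.key
+#     #     ca_file = /etc/conductor/ca.crt   # optional, enforces client certs
+#
+#     import eventlet
+#     sock = eventlet.listen(('0.0.0.0', 8082))
+#     if is_enabled():
+#         sock = wrap(sock)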
diff --git a/conductor/conductor/openstack/common/threadgroup.py b/conductor/conductor/openstack/common/threadgroup.py
new file mode 100644
index 0000000..c8d0d9e
--- /dev/null
+++ b/conductor/conductor/openstack/common/threadgroup.py
@@ -0,0 +1,114 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from eventlet import greenlet
+from eventlet import greenpool
+from eventlet import greenthread
+
+from conductor.openstack.common import log as logging
+from conductor.openstack.common import loopingcall
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _thread_done(gt, *args, **kwargs):
+    """ Callback function to be passed to GreenThread.link() when we spawn().
+    Calls the :class:`ThreadGroup` to notify it that the thread is done.
+
+ """
+ kwargs['group'].thread_done(kwargs['thread'])
+
+
+class Thread(object):
+    """ Wrapper around a greenthread that holds a reference to the
+    :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup`
+    when it has finished so it can be removed from the threads list.
+ """
+ def __init__(self, thread, group):
+ self.thread = thread
+ self.thread.link(_thread_done, group=group, thread=self)
+
+ def stop(self):
+ self.thread.kill()
+
+ def wait(self):
+ return self.thread.wait()
+
+
+class ThreadGroup(object):
+    """ The point of the ThreadGroup class is to:
+
+ * keep track of timers and greenthreads (making it easier to stop them
+ when need be).
+ * provide an easy API to add timers.
+ """
+ def __init__(self, thread_pool_size=10):
+ self.pool = greenpool.GreenPool(thread_pool_size)
+ self.threads = []
+ self.timers = []
+
+ def add_timer(self, interval, callback, initial_delay=None,
+ *args, **kwargs):
+ pulse = loopingcall.LoopingCall(callback, *args, **kwargs)
+ pulse.start(interval=interval,
+ initial_delay=initial_delay)
+ self.timers.append(pulse)
+
+ def add_thread(self, callback, *args, **kwargs):
+ gt = self.pool.spawn(callback, *args, **kwargs)
+ th = Thread(gt, self)
+ self.threads.append(th)
+
+ def thread_done(self, thread):
+ self.threads.remove(thread)
+
+ def stop(self):
+ current = greenthread.getcurrent()
+ for x in self.threads:
+ if x is current:
+ # don't kill the current thread.
+ continue
+ try:
+ x.stop()
+ except Exception as ex:
+ LOG.exception(ex)
+
+ for x in self.timers:
+ try:
+ x.stop()
+ except Exception as ex:
+ LOG.exception(ex)
+ self.timers = []
+
+ def wait(self):
+ for x in self.timers:
+ try:
+ x.wait()
+ except greenlet.GreenletExit:
+ pass
+ except Exception as ex:
+ LOG.exception(ex)
+ current = greenthread.getcurrent()
+ for x in self.threads:
+ if x is current:
+ continue
+ try:
+ x.wait()
+ except greenlet.GreenletExit:
+ pass
+ except Exception as ex:
+ LOG.exception(ex)
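+
+# Usage sketch (process_queue and flush_metrics are hypothetical callables):
+#
+#     tg = ThreadGroup(thread_pool_size=10)
+#     tg.add_thread(process_queue)       # one-shot greenthread
+#     tg.add_timer(30, flush_metrics)    # periodic loopingcall.LoopingCall
+#
+#     tg.stop()    # kill threads (except the current one) and stop timers
+#     tg.wait()    # block until all threads and timers have exited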
diff --git a/conductor/conductor/openstack/common/timeutils.py b/conductor/conductor/openstack/common/timeutils.py
new file mode 100644
index 0000000..35cdf8c
--- /dev/null
+++ b/conductor/conductor/openstack/common/timeutils.py
@@ -0,0 +1,186 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Time related utilities and helper functions.
+"""
+
+import calendar
+import datetime
+
+import iso8601
+
+
+# ISO 8601 extended time format with microseconds
+_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
+_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
+PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
+
+
+def isotime(at=None, subsecond=False):
+ """Stringify time in ISO 8601 format"""
+ if not at:
+ at = utcnow()
+ st = at.strftime(_ISO8601_TIME_FORMAT
+ if not subsecond
+ else _ISO8601_TIME_FORMAT_SUBSECOND)
+ tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
+ st += ('Z' if tz == 'UTC' else tz)
+ return st
+
+
+def parse_isotime(timestr):
+ """Parse time from ISO 8601 format"""
+ try:
+ return iso8601.parse_date(timestr)
+    except (iso8601.ParseError, TypeError) as e:
+        raise ValueError(e.message)
+
+
+def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
+ """Returns formatted utcnow."""
+ if not at:
+ at = utcnow()
+ return at.strftime(fmt)
+
+
+def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
+ """Turn a formatted time back into a datetime."""
+ return datetime.datetime.strptime(timestr, fmt)
+
+
+def normalize_time(timestamp):
+ """Normalize time in arbitrary timezone to UTC naive object"""
+ offset = timestamp.utcoffset()
+ if offset is None:
+ return timestamp
+ return timestamp.replace(tzinfo=None) - offset
+
+
+def is_older_than(before, seconds):
+ """Return True if before is older than seconds."""
+ if isinstance(before, basestring):
+ before = parse_strtime(before).replace(tzinfo=None)
+ return utcnow() - before > datetime.timedelta(seconds=seconds)
+
+
+def is_newer_than(after, seconds):
+ """Return True if after is newer than seconds."""
+ if isinstance(after, basestring):
+ after = parse_strtime(after).replace(tzinfo=None)
+ return after - utcnow() > datetime.timedelta(seconds=seconds)
+
+
+def utcnow_ts():
+ """Timestamp version of our utcnow function."""
+ return calendar.timegm(utcnow().timetuple())
+
+
+def utcnow():
+ """Overridable version of utils.utcnow."""
+ if utcnow.override_time:
+ try:
+ return utcnow.override_time.pop(0)
+ except AttributeError:
+ return utcnow.override_time
+ return datetime.datetime.utcnow()
+
+
+def iso8601_from_timestamp(timestamp):
+    """Returns an ISO 8601 formatted date from a timestamp"""
+ return isotime(datetime.datetime.utcfromtimestamp(timestamp))
+
+
+utcnow.override_time = None
+
+
+def set_time_override(override_time=None):
+    """
+    Override utils.utcnow to return a constant time or a list thereof,
+    one at a time. Defaults to the current time if no override is given.
+    """
+    utcnow.override_time = override_time or datetime.datetime.utcnow()
+
+
+def advance_time_delta(timedelta):
+ """Advance overridden time using a datetime.timedelta."""
+    assert utcnow.override_time is not None
+    try:
+        # datetime objects are immutable, so when override_time is a list
+        # we must rebuild it rather than mutate the items in place.
+        utcnow.override_time = [dt + timedelta
+                                for dt in utcnow.override_time]
+    except TypeError:
+        utcnow.override_time += timedelta
+
+
+def advance_time_seconds(seconds):
+ """Advance overridden time by seconds."""
+ advance_time_delta(datetime.timedelta(0, seconds))
+
+
+def clear_time_override():
+ """Remove the overridden time."""
+ utcnow.override_time = None
+
+
+def marshall_now(now=None):
+ """Make an rpc-safe datetime with microseconds.
+
+ Note: tzinfo is stripped, but not required for relative times."""
+ if not now:
+ now = utcnow()
+ return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
+ minute=now.minute, second=now.second,
+ microsecond=now.microsecond)
+
+
+def unmarshall_time(tyme):
+ """Unmarshall a datetime dict."""
+ return datetime.datetime(day=tyme['day'],
+ month=tyme['month'],
+ year=tyme['year'],
+ hour=tyme['hour'],
+ minute=tyme['minute'],
+ second=tyme['second'],
+ microsecond=tyme['microsecond'])
+
+
+def delta_seconds(before, after):
+ """
+ Compute the difference in seconds between two date, time, or
+ datetime objects (as a float, to microsecond resolution).
+ """
+ delta = after - before
+ try:
+ return delta.total_seconds()
+ except AttributeError:
+ return ((delta.days * 24 * 3600) + delta.seconds +
+ float(delta.microseconds) / (10 ** 6))
+
+
+def is_soon(dt, window):
+ """
+    Determines whether a time will occur within the next window seconds.
+
+    :param dt: the time
+    :param window: minimum seconds to remain to consider the time not soon
+
+ :return: True if expiration is within the given duration
+ """
+ soon = (utcnow() + datetime.timedelta(seconds=window))
+ return normalize_time(dt) <= soon
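+
+# Example (how tests can freeze and advance the clock; dates illustrative):
+#
+#     set_time_override(datetime.datetime(2013, 1, 1))
+#     utcnow()                 # -> datetime.datetime(2013, 1, 1, 0, 0)
+#     advance_time_seconds(90)
+#     utcnow()                 # -> datetime.datetime(2013, 1, 1, 0, 1, 30)
+#     clear_time_override()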
diff --git a/conductor/conductor/openstack/common/uuidutils.py b/conductor/conductor/openstack/common/uuidutils.py
new file mode 100644
index 0000000..fff6309
--- /dev/null
+++ b/conductor/conductor/openstack/common/uuidutils.py
@@ -0,0 +1,39 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Intel Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+UUID related utilities and helper functions.
+"""
+
+import uuid
+
+
+def generate_uuid():
+ return str(uuid.uuid4())
+
+
+def is_uuid_like(val):
+    """Returns True if val is a UUID in canonical form.
+
+ For our purposes, a UUID is a canonical form string:
+ aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa
+
+ """
+ try:
+ return str(uuid.UUID(val)) == val
+ except (TypeError, ValueError, AttributeError):
+ return False
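+
+# Example:
+#
+#     >>> is_uuid_like(generate_uuid())
+#     True
+#     >>> is_uuid_like('not-a-uuid')
+#     False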
diff --git a/conductor/conductor/openstack/common/version.py b/conductor/conductor/openstack/common/version.py
new file mode 100644
index 0000000..0b80a60
--- /dev/null
+++ b/conductor/conductor/openstack/common/version.py
@@ -0,0 +1,94 @@
+
+# Copyright 2012 OpenStack Foundation
+# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Utilities for consuming the version from pkg_resources.
+"""
+
+import pkg_resources
+
+
+class VersionInfo(object):
+
+ def __init__(self, package):
+        """Object that understands versioning for a package.
+
+        :param package: name of the python package, such as glance, or
+                        python-glanceclient
+ """
+ self.package = package
+ self.release = None
+ self.version = None
+ self._cached_version = None
+
+ def __str__(self):
+ """Make the VersionInfo object behave like a string."""
+ return self.version_string()
+
+ def __repr__(self):
+ """Include the name."""
+ return "VersionInfo(%s:%s)" % (self.package, self.version_string())
+
+ def _get_version_from_pkg_resources(self):
+ """Get the version of the package from the pkg_resources record
+ associated with the package."""
+ try:
+ requirement = pkg_resources.Requirement.parse(self.package)
+ provider = pkg_resources.get_provider(requirement)
+ return provider.version
+ except pkg_resources.DistributionNotFound:
+ # The most likely cause for this is running tests in a tree
+ # produced from a tarball where the package itself has not been
+ # installed into anything. Revert to setup-time logic.
+ from conductor.openstack.common import setup
+ return setup.get_version(self.package)
+
+ def release_string(self):
+ """Return the full version of the package including suffixes indicating
+ VCS status.
+ """
+ if self.release is None:
+ self.release = self._get_version_from_pkg_resources()
+
+ return self.release
+
+ def version_string(self):
+ """Return the short version minus any alpha/beta tags."""
+ if self.version is None:
+ parts = []
+ for part in self.release_string().split('.'):
+ if part[0].isdigit():
+ parts.append(part)
+ else:
+ break
+ self.version = ".".join(parts)
+
+ return self.version
+
+ # Compatibility functions
+ canonical_version_string = version_string
+ version_string_with_vcs = release_string
+
+ def cached_version_string(self, prefix=""):
+ """Generate an object which will expand in a string context to
+        the results of version_string(). We do this so that we don't
+        call into pkg_resources every time we start up a program when
+        passing version information into the CONF constructor, but
+        rather only do the calculation when and if a version is requested.
+ """
+ if not self._cached_version:
+ self._cached_version = "%s%s" % (prefix,
+ self.version_string())
+ return self._cached_version
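+
+# Usage sketch ('conductor' stands in for the consuming package; the
+# printed strings are illustrative):
+#
+#     from conductor.openstack.common import version
+#
+#     version_info = version.VersionInfo('conductor')
+#     print version_info.version_string()    # e.g. '2013.1'
+#     print version_info.release_string()    # e.g. '2013.1.a15.g1234abc'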
diff --git a/conductor/conductor/openstack/common/wsgi.py b/conductor/conductor/openstack/common/wsgi.py
new file mode 100644
index 0000000..c367224
--- /dev/null
+++ b/conductor/conductor/openstack/common/wsgi.py
@@ -0,0 +1,797 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Utility methods for working with WSGI servers."""
+
+import eventlet
+eventlet.patcher.monkey_patch(all=False, socket=True)
+
+import datetime
+import errno
+import socket
+import sys
+import time
+
+import eventlet.wsgi
+from oslo.config import cfg
+import routes
+import routes.middleware
+import webob.dec
+import webob.exc
+from xml.dom import minidom
+from xml.parsers import expat
+
+from conductor.openstack.common import exception
+from conductor.openstack.common.gettextutils import _
+from conductor.openstack.common import jsonutils
+from conductor.openstack.common import log as logging
+from conductor.openstack.common import service
+from conductor.openstack.common import sslutils
+from conductor.openstack.common import xmlutils
+
+socket_opts = [
+ cfg.IntOpt('backlog',
+ default=4096,
+ help="Number of backlog requests to configure the socket with"),
+ cfg.IntOpt('tcp_keepidle',
+ default=600,
+ help="Sets the value of TCP_KEEPIDLE in seconds for each "
+ "server socket. Not supported on OS X."),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(socket_opts)
+
+LOG = logging.getLogger(__name__)
+
+
+def run_server(application, port):
+ """Run a WSGI server with the given application."""
+ sock = eventlet.listen(('0.0.0.0', port))
+ eventlet.wsgi.server(sock, application)
+
+
+class Service(service.Service):
+ """
+ Provides a Service API for wsgi servers.
+
+ This gives us the ability to launch wsgi servers with the
+ Launcher classes in service.py.
+ """
+
+ def __init__(self, application, port,
+ host='0.0.0.0', backlog=4096, threads=1000):
+ self.application = application
+ self._port = port
+ self._host = host
+        self._backlog = backlog if backlog else CONF.backlog
+        # _socket is created in start(); the host/port properties below
+        # fall back to the configured values until then.
+        self._socket = None
+        super(Service, self).__init__(threads)
+
+ def _get_socket(self, host, port, backlog):
+ # TODO(dims): eventlet's green dns/socket module does not actually
+ # support IPv6 in getaddrinfo(). We need to get around this in the
+ # future or monitor upstream for a fix
+ info = socket.getaddrinfo(host,
+ port,
+ socket.AF_UNSPEC,
+ socket.SOCK_STREAM)[0]
+ family = info[0]
+ bind_addr = info[-1]
+
+ sock = None
+ retry_until = time.time() + 30
+ while not sock and time.time() < retry_until:
+ try:
+ sock = eventlet.listen(bind_addr,
+ backlog=backlog,
+ family=family)
+ if sslutils.is_enabled():
+ sock = sslutils.wrap(sock)
+
+            except socket.error as err:
+ if err.args[0] != errno.EADDRINUSE:
+ raise
+ eventlet.sleep(0.1)
+ if not sock:
+ raise RuntimeError(_("Could not bind to %(host)s:%(port)s "
+ "after trying for 30 seconds") %
+ {'host': host, 'port': port})
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ # sockets can hang around forever without keepalive
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+
+ # This option isn't available in the OS X version of eventlet
+ if hasattr(socket, 'TCP_KEEPIDLE'):
+ sock.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPIDLE,
+ CONF.tcp_keepidle)
+
+ return sock
+
+ def start(self):
+ """Start serving this service using the provided server instance.
+
+ :returns: None
+
+ """
+ super(Service, self).start()
+ self._socket = self._get_socket(self._host, self._port, self._backlog)
+ self.tg.add_thread(self._run, self.application, self._socket)
+
+ @property
+ def backlog(self):
+ return self._backlog
+
+ @property
+ def host(self):
+ return self._socket.getsockname()[0] if self._socket else self._host
+
+ @property
+ def port(self):
+ return self._socket.getsockname()[1] if self._socket else self._port
+
+ def stop(self):
+ """Stop serving this API.
+
+ :returns: None
+
+ """
+ super(Service, self).stop()
+
+ def _run(self, application, socket):
+ """Start a WSGI server in a new green thread."""
+ logger = logging.getLogger('eventlet.wsgi')
+ eventlet.wsgi.server(socket,
+ application,
+ custom_pool=self.tg.pool,
+ log=logging.WritableLogger(logger))
+
+
+class Middleware(object):
+ """
+ Base WSGI middleware wrapper. These classes require an application to be
+ initialized that will be called next. By default the middleware will
+ simply call its wrapped app, or you can override __call__ to customize its
+ behavior.
+ """
+
+ def __init__(self, application):
+ self.application = application
+
+ def process_request(self, req):
+ """
+ Called on each request.
+
+ If this returns None, the next application down the stack will be
+ executed. If it returns a response then that response will be returned
+ and execution will stop here.
+ """
+ return None
+
+ def process_response(self, response):
+ """Do whatever you'd like to the response."""
+ return response
+
+ @webob.dec.wsgify
+ def __call__(self, req):
+ response = self.process_request(req)
+ if response:
+ return response
+ response = req.get_response(self.application)
+ return self.process_response(response)
+
+
+class Debug(Middleware):
+ """
+ Helper class that can be inserted into any WSGI application chain
+ to get information about the request and response.
+ """
+
+ @webob.dec.wsgify
+ def __call__(self, req):
+ print ("*" * 40) + " REQUEST ENVIRON"
+ for key, value in req.environ.items():
+ print key, "=", value
+ print
+ resp = req.get_response(self.application)
+
+ print ("*" * 40) + " RESPONSE HEADERS"
+ for (key, value) in resp.headers.iteritems():
+ print key, "=", value
+ print
+
+ resp.app_iter = self.print_generator(resp.app_iter)
+
+ return resp
+
+ @staticmethod
+ def print_generator(app_iter):
+ """
+        Iterator that prints the contents of a wrapped string iterator
+ when iterated.
+ """
+ print ("*" * 40) + " BODY"
+ for part in app_iter:
+ sys.stdout.write(part)
+ sys.stdout.flush()
+ yield part
+ print
+
+
+class Router(object):
+
+ """
+ WSGI middleware that maps incoming requests to WSGI apps.
+ """
+
+ def __init__(self, mapper):
+ """
+ Create a router for the given routes.Mapper.
+
+ Each route in `mapper` must specify a 'controller', which is a
+ WSGI app to call. You'll probably want to specify an 'action' as
+ well and have your controller be a wsgi.Controller, who will route
+ the request to the action method.
+
+ Examples:
+ mapper = routes.Mapper()
+ sc = ServerController()
+
+ # Explicit mapping of one route to a controller+action
+ mapper.connect(None, "/svrlist", controller=sc, action="list")
+
+ # Actions are all implicitly defined
+ mapper.resource("server", "servers", controller=sc)
+
+ # Pointing to an arbitrary WSGI app. You can specify the
+ # {path_info:.*} parameter so the target app can be handed just that
+ # section of the URL.
+ mapper.connect(None, "/v1.0/{path_info:.*}", controller=BlogApp())
+ """
+ self.map = mapper
+ self._router = routes.middleware.RoutesMiddleware(self._dispatch,
+ self.map)
+
+ @webob.dec.wsgify
+ def __call__(self, req):
+ """
+ Route the incoming request to a controller based on self.map.
+ If no match, return a 404.
+ """
+ return self._router
+
+ @staticmethod
+ @webob.dec.wsgify
+ def _dispatch(req):
+ """
+ Called by self._router after matching the incoming request to a route
+ and putting the information into req.environ. Either returns 404
+ or the routed WSGI app's response.
+ """
+ match = req.environ['wsgiorg.routing_args'][1]
+ if not match:
+ return webob.exc.HTTPNotFound()
+ app = match['controller']
+ return app
+
+
+class Request(webob.Request):
+    """Add some OpenStack API-specific logic to the base webob.Request."""
+
+ default_request_content_types = ('application/json', 'application/xml')
+ default_accept_types = ('application/json', 'application/xml')
+ default_accept_type = 'application/json'
+
+ def best_match_content_type(self, supported_content_types=None):
+ """Determine the requested response content-type.
+
+ Based on the query extension then the Accept header.
+ Defaults to default_accept_type if we don't find a preference
+
+ """
+ supported_content_types = (supported_content_types or
+ self.default_accept_types)
+
+ parts = self.path.rsplit('.', 1)
+ if len(parts) > 1:
+ ctype = 'application/{0}'.format(parts[1])
+ if ctype in supported_content_types:
+ return ctype
+
+ bm = self.accept.best_match(supported_content_types)
+ return bm or self.default_accept_type
+
+ def get_content_type(self, allowed_content_types=None):
+ """Determine content type of the request body.
+
+ Does not do any body introspection, only checks header
+
+ """
+ if "Content-Type" not in self.headers:
+ return None
+
+ content_type = self.content_type
+ allowed_content_types = (allowed_content_types or
+ self.default_request_content_types)
+
+ if content_type not in allowed_content_types:
+ raise exception.InvalidContentType(content_type=content_type)
+ return content_type
+
+
+class Resource(object):
+ """
+ WSGI app that handles (de)serialization and controller dispatch.
+
+ Reads routing information supplied by RoutesMiddleware and calls
+ the requested action method upon its deserializer, controller,
+ and serializer. Those three objects may implement any of the basic
+ controller action methods (create, update, show, index, delete)
+ along with any that may be specified in the api router. A 'default'
+ method may also be implemented to be used in place of any
+ non-implemented actions. Deserializer methods must accept a request
+ argument and return a dictionary. Controller methods must accept a
+ request argument. Additionally, they must also accept keyword
+ arguments that represent the keys returned by the Deserializer. They
+ may raise a webob.exc exception or return a dict, which will be
+ serialized by requested content type.
+ """
+ def __init__(self, controller, deserializer=None, serializer=None):
+ """
+        :param controller: object that implements methods created by routes lib
+ :param deserializer: object that supports webob request deserialization
+ through controller-like actions
+ :param serializer: object that supports webob response serialization
+ through controller-like actions
+ """
+ self.controller = controller
+ self.serializer = serializer or ResponseSerializer()
+ self.deserializer = deserializer or RequestDeserializer()
+
+ @webob.dec.wsgify(RequestClass=Request)
+ def __call__(self, request):
+ """WSGI method that controls (de)serialization and method dispatch."""
+
+ try:
+ action, action_args, accept = self.deserialize_request(request)
+ except exception.InvalidContentType:
+ msg = _("Unsupported Content-Type")
+ return webob.exc.HTTPUnsupportedMediaType(explanation=msg)
+ except exception.MalformedRequestBody:
+ msg = _("Malformed request body")
+ return webob.exc.HTTPBadRequest(explanation=msg)
+
+ action_result = self.execute_action(action, request, **action_args)
+ try:
+ return self.serialize_response(action, action_result, accept)
+ # return unserializable result (typically a webob exc)
+ except Exception:
+ return action_result
+
+ def deserialize_request(self, request):
+ return self.deserializer.deserialize(request)
+
+ def serialize_response(self, action, action_result, accept):
+ return self.serializer.serialize(action_result, accept, action)
+
+ def execute_action(self, action, request, **action_args):
+ return self.dispatch(self.controller, action, request, **action_args)
+
+ def dispatch(self, obj, action, *args, **kwargs):
+ """Find action-specific method on self and call it."""
+ try:
+ method = getattr(obj, action)
+ except AttributeError:
+ method = getattr(obj, 'default')
+
+ return method(*args, **kwargs)
+
+ def get_action_args(self, request_environment):
+ """Parse dictionary created by routes library."""
+ try:
+ args = request_environment['wsgiorg.routing_args'][1].copy()
+ except Exception:
+ return {}
+
+ try:
+ del args['controller']
+ except KeyError:
+ pass
+
+ try:
+ del args['format']
+ except KeyError:
+ pass
+
+ return args
+
+
+class ActionDispatcher(object):
+ """Maps method name to local methods through action name."""
+
+ def dispatch(self, *args, **kwargs):
+ """Find and call local method."""
+ action = kwargs.pop('action', 'default')
+ action_method = getattr(self, str(action), self.default)
+ return action_method(*args, **kwargs)
+
+ def default(self, data):
+ raise NotImplementedError()
+
+
+class DictSerializer(ActionDispatcher):
+ """Default request body serialization"""
+
+ def serialize(self, data, action='default'):
+ return self.dispatch(data, action=action)
+
+ def default(self, data):
+ return ""
+
+
+class JSONDictSerializer(DictSerializer):
+ """Default JSON request body serialization"""
+
+ def default(self, data):
+ def sanitizer(obj):
+ if isinstance(obj, datetime.datetime):
+ _dtime = obj - datetime.timedelta(microseconds=obj.microsecond)
+ return _dtime.isoformat()
+ return unicode(obj)
+ return jsonutils.dumps(data, default=sanitizer)
+
+
+class XMLDictSerializer(DictSerializer):
+
+ def __init__(self, metadata=None, xmlns=None):
+ """
+ :param metadata: information needed to deserialize xml into
+ a dictionary.
+ :param xmlns: XML namespace to include with serialized xml
+ """
+ super(XMLDictSerializer, self).__init__()
+ self.metadata = metadata or {}
+ self.xmlns = xmlns
+
+ def default(self, data):
+ # We expect data to contain a single key which is the XML root.
+ root_key = data.keys()[0]
+ doc = minidom.Document()
+ node = self._to_xml_node(doc, self.metadata, root_key, data[root_key])
+
+ return self.to_xml_string(node)
+
+ def to_xml_string(self, node, has_atom=False):
+ self._add_xmlns(node, has_atom)
+ return node.toprettyxml(indent=' ', encoding='UTF-8')
+
+    #NOTE (ameade): the has_atom flag should be removed after all of the
+    # xml serializers and view builders have been updated to the current
+    # spec that requires all responses to include xmlns:atom; until then
+    # the flag prevents current tests from breaking
+ def _add_xmlns(self, node, has_atom=False):
+ if self.xmlns is not None:
+ node.setAttribute('xmlns', self.xmlns)
+ if has_atom:
+ node.setAttribute('xmlns:atom', "http://www.w3.org/2005/Atom")
+
+ def _to_xml_node(self, doc, metadata, nodename, data):
+ """Recursive method to convert data members to XML nodes."""
+ result = doc.createElement(nodename)
+
+ # Set the xml namespace if one is specified
+ # TODO(justinsb): We could also use prefixes on the keys
+ xmlns = metadata.get('xmlns', None)
+ if xmlns:
+ result.setAttribute('xmlns', xmlns)
+
+ #TODO(bcwaldon): accomplish this without a type-check
+ if type(data) is list:
+ collections = metadata.get('list_collections', {})
+ if nodename in collections:
+ metadata = collections[nodename]
+ for item in data:
+ node = doc.createElement(metadata['item_name'])
+ node.setAttribute(metadata['item_key'], str(item))
+ result.appendChild(node)
+ return result
+ singular = metadata.get('plurals', {}).get(nodename, None)
+ if singular is None:
+ if nodename.endswith('s'):
+ singular = nodename[:-1]
+ else:
+ singular = 'item'
+ for item in data:
+ node = self._to_xml_node(doc, metadata, singular, item)
+ result.appendChild(node)
+ #TODO(bcwaldon): accomplish this without a type-check
+ elif type(data) is dict:
+ collections = metadata.get('dict_collections', {})
+ if nodename in collections:
+ metadata = collections[nodename]
+ for k, v in data.items():
+ node = doc.createElement(metadata['item_name'])
+ node.setAttribute(metadata['item_key'], str(k))
+ text = doc.createTextNode(str(v))
+ node.appendChild(text)
+ result.appendChild(node)
+ return result
+ attrs = metadata.get('attributes', {}).get(nodename, {})
+ for k, v in data.items():
+ if k in attrs:
+ result.setAttribute(k, str(v))
+ else:
+ node = self._to_xml_node(doc, metadata, k, v)
+ result.appendChild(node)
+ else:
+ # Type is atom
+ node = doc.createTextNode(str(data))
+ result.appendChild(node)
+ return result
+
+ def _create_link_nodes(self, xml_doc, links):
+ link_nodes = []
+ for link in links:
+ link_node = xml_doc.createElement('atom:link')
+ link_node.setAttribute('rel', link['rel'])
+ link_node.setAttribute('href', link['href'])
+ if 'type' in link:
+ link_node.setAttribute('type', link['type'])
+ link_nodes.append(link_node)
+ return link_nodes
+
+
+class ResponseHeadersSerializer(ActionDispatcher):
+ """Default response headers serialization"""
+
+ def serialize(self, response, data, action):
+ self.dispatch(response, data, action=action)
+
+ def default(self, response, data):
+ response.status_int = 200
+
+
+class ResponseSerializer(object):
+ """Encode the necessary pieces into a response object"""
+
+ def __init__(self, body_serializers=None, headers_serializer=None):
+ self.body_serializers = {
+ 'application/xml': XMLDictSerializer(),
+ 'application/json': JSONDictSerializer(),
+ }
+ self.body_serializers.update(body_serializers or {})
+
+ self.headers_serializer = (headers_serializer or
+ ResponseHeadersSerializer())
+
+ def serialize(self, response_data, content_type, action='default'):
+        """Serialize a dict into a string and wrap in a webob.Response object.
+
+ :param response_data: dict produced by the Controller
+ :param content_type: expected mimetype of serialized response body
+
+ """
+ response = webob.Response()
+ self.serialize_headers(response, response_data, action)
+ self.serialize_body(response, response_data, content_type, action)
+ return response
+
+ def serialize_headers(self, response, data, action):
+ self.headers_serializer.serialize(response, data, action)
+
+ def serialize_body(self, response, data, content_type, action):
+ response.headers['Content-Type'] = content_type
+ if data is not None:
+ serializer = self.get_body_serializer(content_type)
+ response.body = serializer.serialize(data, action)
+
+ def get_body_serializer(self, content_type):
+ try:
+ return self.body_serializers[content_type]
+ except (KeyError, TypeError):
+ raise exception.InvalidContentType(content_type=content_type)
+
+
+class RequestHeadersDeserializer(ActionDispatcher):
+ """Default request headers deserializer"""
+
+ def deserialize(self, request, action):
+ return self.dispatch(request, action=action)
+
+ def default(self, request):
+ return {}
+
+
+class RequestDeserializer(object):
+ """Break up a Request object into more useful pieces."""
+
+ def __init__(self, body_deserializers=None, headers_deserializer=None,
+ supported_content_types=None):
+
+ self.supported_content_types = supported_content_types
+
+ self.body_deserializers = {
+ 'application/xml': XMLDeserializer(),
+ 'application/json': JSONDeserializer(),
+ }
+ self.body_deserializers.update(body_deserializers or {})
+
+ self.headers_deserializer = (headers_deserializer or
+ RequestHeadersDeserializer())
+
+ def deserialize(self, request):
+ """Extract necessary pieces of the request.
+
+ :param request: Request object
+ :returns: tuple of (expected controller action name, dictionary of
+ keyword arguments to pass to the controller, the expected
+ content type of the response)
+
+ """
+ action_args = self.get_action_args(request.environ)
+ action = action_args.pop('action', None)
+
+ action_args.update(self.deserialize_headers(request, action))
+ action_args.update(self.deserialize_body(request, action))
+
+ accept = self.get_expected_content_type(request)
+
+ return (action, action_args, accept)
+
+ def deserialize_headers(self, request, action):
+ return self.headers_deserializer.deserialize(request, action)
+
+ def deserialize_body(self, request, action):
+        if len(request.body) == 0:
+ LOG.debug(_("Empty body provided in request"))
+ return {}
+
+ try:
+ content_type = request.get_content_type()
+ except exception.InvalidContentType:
+ LOG.debug(_("Unrecognized Content-Type provided in request"))
+ raise
+
+ if content_type is None:
+ LOG.debug(_("No Content-Type provided in request"))
+ return {}
+
+ try:
+ deserializer = self.get_body_deserializer(content_type)
+ except exception.InvalidContentType:
+ LOG.debug(_("Unable to deserialize body as provided Content-Type"))
+ raise
+
+ return deserializer.deserialize(request.body, action)
+
+ def get_body_deserializer(self, content_type):
+ try:
+ return self.body_deserializers[content_type]
+ except (KeyError, TypeError):
+ raise exception.InvalidContentType(content_type=content_type)
+
+ def get_expected_content_type(self, request):
+ return request.best_match_content_type(self.supported_content_types)
+
+ def get_action_args(self, request_environment):
+ """Parse dictionary created by routes library."""
+ try:
+ args = request_environment['wsgiorg.routing_args'][1].copy()
+ except Exception:
+ return {}
+
+ try:
+ del args['controller']
+ except KeyError:
+ pass
+
+ try:
+ del args['format']
+ except KeyError:
+ pass
+
+ return args
+
+
+class TextDeserializer(ActionDispatcher):
+ """Default request body deserialization"""
+
+ def deserialize(self, datastring, action='default'):
+ return self.dispatch(datastring, action=action)
+
+ def default(self, datastring):
+ return {}
+
+
+class JSONDeserializer(TextDeserializer):
+
+ def _from_json(self, datastring):
+ try:
+ return jsonutils.loads(datastring)
+ except ValueError:
+ msg = _("cannot understand JSON")
+ raise exception.MalformedRequestBody(reason=msg)
+
+ def default(self, datastring):
+ return {'body': self._from_json(datastring)}
+
+
+class XMLDeserializer(TextDeserializer):
+
+ def __init__(self, metadata=None):
+ """
+ :param metadata: information needed to deserialize xml into
+ a dictionary.
+ """
+ super(XMLDeserializer, self).__init__()
+ self.metadata = metadata or {}
+
+ def _from_xml(self, datastring):
+ plurals = set(self.metadata.get('plurals', {}))
+
+ try:
+ node = xmlutils.safe_minidom_parse_string(datastring).childNodes[0]
+ return {node.nodeName: self._from_xml_node(node, plurals)}
+ except expat.ExpatError:
+ msg = _("cannot understand XML")
+ raise exception.MalformedRequestBody(reason=msg)
+
+ def _from_xml_node(self, node, listnames):
+ """Convert a minidom node to a simple Python type.
+
+ :param listnames: list of XML node names whose subnodes should
+ be considered list items.
+
+ """
+
+ if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3:
+ return node.childNodes[0].nodeValue
+ elif node.nodeName in listnames:
+ return [self._from_xml_node(n, listnames) for n in node.childNodes]
+ else:
+ result = dict()
+ for attr in node.attributes.keys():
+ result[attr] = node.attributes[attr].nodeValue
+ for child in node.childNodes:
+ if child.nodeType != node.TEXT_NODE:
+ result[child.nodeName] = self._from_xml_node(child,
+ listnames)
+ return result
+
+ def find_first_child_named(self, parent, name):
+        """Search a node's children for the first child with a given name"""
+ for node in parent.childNodes:
+ if node.nodeName == name:
+ return node
+ return None
+
+ def find_children_named(self, parent, name):
+        """Return all of a node's children that have the given name"""
+ for node in parent.childNodes:
+ if node.nodeName == name:
+ yield node
+
+ def extract_text(self, node):
+ """Get the text field contained by the given node"""
+ if len(node.childNodes) == 1:
+ child = node.childNodes[0]
+ if child.nodeType == child.TEXT_NODE:
+ return child.nodeValue
+ return ""
+
+ def default(self, datastring):
+ return {'body': self._from_xml(datastring)}
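+
+# Usage sketch (TasksController is hypothetical; shows how Router, Resource
+# and Service compose into a minimal JSON/XML API server):
+#
+#     class TasksController(object):
+#         def index(self, request):
+#             return {'tasks': []}
+#
+#     mapper = routes.Mapper()
+#     mapper.resource('task', 'tasks',
+#                     controller=Resource(TasksController()))
+#     server = Service(Router(mapper), port=8082)
+#     server.start()
+#     server.wait()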
diff --git a/conductor/conductor/openstack/common/xmlutils.py b/conductor/conductor/openstack/common/xmlutils.py
new file mode 100644
index 0000000..ae7c077
--- /dev/null
+++ b/conductor/conductor/openstack/common/xmlutils.py
@@ -0,0 +1,74 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 IBM
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from xml.dom import minidom
+from xml.parsers import expat
+from xml import sax
+from xml.sax import expatreader
+
+
+class ProtectedExpatParser(expatreader.ExpatParser):
+ """An expat parser which disables DTD's and entities by default."""
+
+ def __init__(self, forbid_dtd=True, forbid_entities=True,
+ *args, **kwargs):
+ # Python 2.x old style class
+ expatreader.ExpatParser.__init__(self, *args, **kwargs)
+ self.forbid_dtd = forbid_dtd
+ self.forbid_entities = forbid_entities
+
+ def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
+ raise ValueError("Inline DTD forbidden")
+
+ def entity_decl(self, entityName, is_parameter_entity, value, base,
+ systemId, publicId, notationName):
+ raise ValueError(" entity declaration forbidden")
+
+ def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
+ # expat 1.2
+ raise ValueError(" unparsed entity forbidden")
+
+ def external_entity_ref(self, context, base, systemId, publicId):
+ raise ValueError(" external entity forbidden")
+
+ def notation_decl(self, name, base, sysid, pubid):
+ raise ValueError(" notation forbidden")
+
+ def reset(self):
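+        # expatreader recreates its pyexpat parser on every reset(), so the
+        # forbidding handlers must be re-installed each time.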
+ expatreader.ExpatParser.reset(self)
+ if self.forbid_dtd:
+ self._parser.StartDoctypeDeclHandler = self.start_doctype_decl
+ self._parser.EndDoctypeDeclHandler = None
+ if self.forbid_entities:
+ self._parser.EntityDeclHandler = self.entity_decl
+ self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl
+ self._parser.ExternalEntityRefHandler = self.external_entity_ref
+ self._parser.NotationDeclHandler = self.notation_decl
+ try:
+ self._parser.SkippedEntityHandler = None
+ except AttributeError:
+ # some pyexpat versions do not support SkippedEntity
+ pass
+
+
+def safe_minidom_parse_string(xml_string):
+ """Parse an XML string using minidom safely.
+
+ """
+ try:
+ return minidom.parseString(xml_string, parser=ProtectedExpatParser())
+ except sax.SAXParseException:
+ raise expat.ExpatError()
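
A quick, illustrative check of the hardening (ValueError comes from the
forbidding handlers above, ExpatError from the malformed-XML path):

    from xml.parsers import expat
    from conductor.openstack.common import xmlutils

    payload = '<!DOCTYPE x [<!ENTITY e "boom">]><x>&e;</x>'
    try:
        xmlutils.safe_minidom_parse_string(payload)
    except (ValueError, expat.ExpatError):
        pass  # DTD/entity payloads are rejected instead of expanded
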
diff --git a/conductor/conductor/rabbitmq.py b/conductor/conductor/rabbitmq.py
index d7c3351..6a6de5e 100644
--- a/conductor/conductor/rabbitmq.py
+++ b/conductor/conductor/rabbitmq.py
@@ -1,72 +1,125 @@
-import uuid
-import pika
-from pika.adapters import TornadoConnection
-import time
-
-try:
- import tornado.ioloop
-
- IOLoop = tornado.ioloop.IOLoop
-except ImportError:
- IOLoop = None
-
-
-class RabbitMqClient(object):
- def __init__(self, host='localhost', login='guest',
- password='guest', virtual_host='/'):
- credentials = pika.PlainCredentials(login, password)
- self._connection_parameters = pika.ConnectionParameters(
- credentials=credentials, host=host, virtual_host=virtual_host)
- self._subscriptions = {}
-
- def _create_connection(self):
- self.connection = TornadoConnection(
- parameters=self._connection_parameters,
- on_open_callback=self._on_connected)
-
- def _on_connected(self, connection):
- self._channel = connection.channel(self._on_channel_open)
-
- def _on_channel_open(self, channel):
- self._channel = channel
- if self._started_callback:
- self._started_callback()
-
- def _on_queue_declared(self, frame, queue, callback, ctag):
- def invoke_callback(ch, method_frame, header_frame, body):
- callback(body=body,
- message_id=header_frame.message_id or "")
-
- self._channel.basic_consume(invoke_callback, queue=queue,
- no_ack=True, consumer_tag=ctag)
-
- def subscribe(self, queue, callback):
- ctag = str(uuid.uuid4())
- self._subscriptions[queue] = ctag
-
- self._channel.queue_declare(
- queue=queue, durable=True,
- callback=lambda frame, ctag=ctag: self._on_queue_declared(
- frame, queue, callback, ctag))
-
- def unsubscribe(self, queue):
- self._channel.basic_cancel(consumer_tag=self._subscriptions[queue])
- del self._subscriptions[queue]
-
- def start(self, callback=None):
- if IOLoop is None: raise ImportError("Tornado not installed")
- self._started_callback = callback
- ioloop = IOLoop.instance()
- self.timeout_id = ioloop.add_timeout(time.time() + 0.1,
- self._create_connection)
-
- def send(self, queue, data, exchange="", message_id=""):
- properties = pika.BasicProperties(message_id=message_id)
- self._channel.queue_declare(
- queue=queue, durable=True,
- callback=lambda frame: self._channel.basic_publish(
- exchange=exchange, routing_key=queue,
- body=data, properties=properties))
-
-
-
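+# Import puka through eventlet's patcher so its sockets cooperate with the
+# green threads used by the openstack.common service launcher.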
+from eventlet import patcher
+puka = patcher.import_patched('puka')
+import anyjson
+import config
+
+
+class RmqClient(object):
+ def __init__(self):
+ settings = config.CONF.rabbitmq
+ self._client = puka.Client('amqp://{0}:{1}@{2}:{3}/{4}'.format(
+ settings.login,
+ settings.password,
+ settings.host,
+ settings.port,
+ settings.virtual_host
+ ))
+ self._connected = False
+
+ def __enter__(self):
+ self.connect()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+ return False
+
+ def connect(self):
+ if not self._connected:
+ promise = self._client.connect()
+ self._client.wait(promise, timeout=10000)
+ self._connected = True
+
+ def close(self):
+ if self._connected:
+ self._client.close()
+ self._connected = False
+
+ def declare(self, queue, exchange=None):
+ promise = self._client.queue_declare(str(queue), durable=True)
+ self._client.wait(promise)
+
+ if exchange:
+ promise = self._client.exchange_declare(str(exchange), durable=True)
+ self._client.wait(promise)
+ promise = self._client.queue_bind(
+ str(queue), str(exchange), routing_key=str(queue))
+ self._client.wait(promise)
+
+ def send(self, message, key, exchange='', timeout=None):
+ if not self._connected:
+ raise RuntimeError('Not connected to RabbitMQ')
+
+        headers = {'message_id': message.id}
+
+ promise = self._client.basic_publish(
+ exchange=str(exchange),
+ routing_key=str(key),
+ body=anyjson.dumps(message.body),
+ headers=headers)
+ self._client.wait(promise, timeout=timeout)
+
+ def open(self, queue):
+ if not self._connected:
+ raise RuntimeError('Not connected to RabbitMQ')
+
+ return Subscription(self._client, queue)
+
+
+class Subscription(object):
+ def __init__(self, client, queue):
+ self._client = client
+ self._queue = queue
+ self._promise = None
+        self._last_message = None
+
+ def __enter__(self):
+ self._promise = self._client.basic_consume(
+ queue=self._queue,
+ prefetch_count=1)
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._ack_last()
+ promise = self._client.basic_cancel(self._promise)
+ self._client.wait(promise)
+ return False
+
+    def _ack_last(self):
+        if self._last_message:
+            self._client.basic_ack(self._last_message)
+            self._last_message = None
+
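+    # Acknowledgements are deliberately lazy: each get_message() acks the
+    # previous delivery, and __exit__ acks the last one before cancelling.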
+ def get_message(self, timeout=None):
+ if not self._promise:
+ raise RuntimeError(
+ "Subscription object must be used within 'with' block")
+ self._ack_last()
+        self._last_message = self._client.wait(
+            self._promise, timeout=timeout)
+ msg = Message()
+        msg.body = anyjson.loads(self._last_message['body'])
+        msg.id = self._last_message['headers'].get('message_id')
+ return msg
+
+
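+# Simple value object passed between RmqClient and its callers; the id
+# setter normalises None so message_id headers are always strings.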
+class Message(object):
+ def __init__(self):
+ self._body = {}
+ self._id = ''
+
+ @property
+ def body(self):
+ return self._body
+
+ @body.setter
+ def body(self, value):
+ self._body = value
+
+ @property
+ def id(self):
+ return self._id
+
+ @id.setter
+ def id(self, value):
+ self._id = value or ''
+
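
A minimal round-trip sketch against the new client (queue name and settings
come from this patch; the payload and task id are illustrative):

    import rabbitmq

    with rabbitmq.RmqClient() as client:
        client.declare('task-reports')

        msg = rabbitmq.Message()
        msg.id = 'task-42'                  # hypothetical task id
        msg.body = {'text': 'started'}
        client.send(message=msg, key='task-reports')

        with client.open('task-reports') as subscription:
            incoming = subscription.get_message()
            print incoming.id, incoming.body
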
diff --git a/conductor/conductor/reporting.py b/conductor/conductor/reporting.py
index 4dbef12..1f37b8c 100644
--- a/conductor/conductor/reporting.py
+++ b/conductor/conductor/reporting.py
@@ -1,22 +1,29 @@
import xml_code_engine
import json
+import rabbitmq
+
class Reporter(object):
def __init__(self, rmqclient, task_id, environment_id):
self._rmqclient = rmqclient
self._task_id = task_id
self._environment_id = environment_id
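+        # Make sure the queue exists before the first report is published;
+        # RmqClient.declare() is an idempotent durable queue_declare.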
+        rmqclient.declare('task-reports')
+
def _report_func(self, id, entity, text, **kwargs):
- msg = json.dumps({
+ body = {
'id': id,
'entity': entity,
'text': text,
'environment_id': self._environment_id
- })
+ }
+
+ msg = rabbitmq.Message()
+ msg.body = body
+ msg.id = self._task_id
+
self._rmqclient.send(
- queue='task-reports', data=msg, message_id=self._task_id)
+ message=msg,
+ key='task-reports')
def _report_func(context, id, entity, text, **kwargs):
reporter = context['/reporter']
diff --git a/conductor/conductor/version.py b/conductor/conductor/version.py
new file mode 100644
index 0000000..736f240
--- /dev/null
+++ b/conductor/conductor/version.py
@@ -0,0 +1,20 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from conductor.openstack.common import version as common_version
+
+version_info = common_version.VersionInfo('conductor')
diff --git a/conductor/conductor/windows_agent.py b/conductor/conductor/windows_agent.py
index 287abb0..2e7761c 100644
--- a/conductor/conductor/windows_agent.py
+++ b/conductor/conductor/windows_agent.py
@@ -7,7 +7,8 @@ def send_command(engine, context, body, template, host, mappings=None,
command_dispatcher = context['/commandDispatcher']
def callback(result_value):
- print "Received result for %s: %s. Body is %s" % (template, result_value, body)
+ print "Received result for %s: %s. Body is %s" % \
+ (template, result_value, body)
if result is not None:
context[result] = result_value['Result']
@@ -16,10 +17,8 @@ def send_command(engine, context, body, template, host, mappings=None,
engine.evaluate_content(success_handler, context)
command_dispatcher.execute(name='agent',
- template=template,
- mappings=mappings,
- host=host,
- callback=callback)
+ template=template, mappings=mappings,
+ host=host, callback=callback)
xml_code_engine.XmlCodeEngine.register_function(send_command, "send-command")
\ No newline at end of file
diff --git a/conductor/data/templates/agent-config/Default.template b/conductor/data/templates/agent-config/Default.template
index 54d9cb9..179bfb8 100644
--- a/conductor/data/templates/agent-config/Default.template
+++ b/conductor/data/templates/agent-config/Default.template
@@ -23,7 +23,7 @@
-
+
diff --git a/conductor/etc/app.config b/conductor/etc/app.config
deleted file mode 100644
index f69fe45..0000000
--- a/conductor/etc/app.config
+++ /dev/null
@@ -1,5 +0,0 @@
-[rabbitmq]
-host = localhost
-vhost = keero
-login = keero
-password = keero
\ No newline at end of file
diff --git a/conductor/etc/conductor-paste.ini b/conductor/etc/conductor-paste.ini
new file mode 100644
index 0000000..e69de29
diff --git a/conductor/etc/conductor.conf b/conductor/etc/conductor.conf
new file mode 100644
index 0000000..189eeed
--- /dev/null
+++ b/conductor/etc/conductor.conf
@@ -0,0 +1,10 @@
+[DEFAULT]
+log_file = logs/conductor.log
+
+
+[rabbitmq]
+host = localhost
+port = 5672
+virtual_host = keero
+login = keero
+password = keero
\ No newline at end of file
diff --git a/conductor/logs/.gitignore b/conductor/logs/.gitignore
new file mode 100644
index 0000000..44c5ea8
--- /dev/null
+++ b/conductor/logs/.gitignore
@@ -0,0 +1,4 @@
+# Ignore everything in this directory
+*
+# Except this file
+!.gitignore
\ No newline at end of file
diff --git a/conductor/openstack-common.conf b/conductor/openstack-common.conf
new file mode 100644
index 0000000..0437737
--- /dev/null
+++ b/conductor/openstack-common.conf
@@ -0,0 +1,7 @@
+[DEFAULT]
+
+# The list of modules to copy from openstack-common
+modules=setup,wsgi,config,exception,gettextutils,importutils,jsonutils,log,xmlutils,sslutils,service,notifier,local,install_venv_common,version,timeutils,eventlet_backdoor,threadgroup,loopingcall,uuidutils
+
+# The base module to hold the copy of openstack.common
+base=conductor
\ No newline at end of file
diff --git a/conductor/tools/pip-requires b/conductor/tools/pip-requires
index a7bcbfe..816ab98 100644
--- a/conductor/tools/pip-requires
+++ b/conductor/tools/pip-requires
@@ -1,3 +1,9 @@
-pika
-tornado
-jsonpath
\ No newline at end of file
+anyjson
+eventlet>=0.9.12
+jsonpath
+puka
+Paste
+PasteDeploy
+iso8601>=0.1.4
+
+http://tarballs.openstack.org/oslo-config/oslo-config-2013.1b4.tar.gz#egg=oslo-config