diff --git a/nova/api/ec2/__init__.py b/nova/api/ec2/__init__.py index 2ae685c..7863db2 100644 --- a/nova/api/ec2/__init__.py +++ b/nova/api/ec2/__init__.py @@ -42,6 +42,7 @@ from nova.openstack.common import timeutils from nova import utils from nova import wsgi +import tomograph LOG = logging.getLogger(__name__) @@ -95,6 +96,7 @@ class FaultWrapper(wsgi.Middleware): """Calls the middleware stack, captures any exceptions into faults.""" @webob.dec.wsgify(RequestClass=wsgi.Request) + @tomograph.traced('FaultWrapper', 'middleware') def __call__(self, req): try: return req.get_response(self.application) @@ -107,6 +109,7 @@ class RequestLogging(wsgi.Middleware): """Access-Log akin logging for all EC2 API requests.""" @webob.dec.wsgify(RequestClass=wsgi.Request) + @tomograph.traced('RequestLogging', 'middleware') def __call__(self, req): start = timeutils.utcnow() rv = req.get_response(self.application) @@ -169,6 +172,7 @@ class Lockout(wsgi.Middleware): super(Lockout, self).__init__(application) @webob.dec.wsgify(RequestClass=wsgi.Request) + @tomograph.traced('Lockout', 'middleware') def __call__(self, req): access_key = str(req.params['AWSAccessKeyId']) failures_key = "authfailures-%s" % access_key @@ -197,6 +201,7 @@ class EC2KeystoneAuth(wsgi.Middleware): """Authenticate an EC2 request with keystone and convert to context.""" @webob.dec.wsgify(RequestClass=wsgi.Request) + @tomograph.traced('EC2KeystoneAuth', 'middleware') def __call__(self, req): request_id = context.generate_request_id() signature = req.params.get('Signature') @@ -225,8 +230,11 @@ class EC2KeystoneAuth(wsgi.Middleware): creds = {'ec2Credentials': cred_dict} else: creds = {'auth': {'OS-KSEC2:ec2Credentials': cred_dict}} + creds_json = jsonutils.dumps(creds) + headers = {'Content-Type': 'application/json'} + tomograph.add_trace_info_header(headers) o = urlparse.urlparse(FLAGS.keystone_ec2_url) if o.scheme == "http": @@ -282,6 +290,7 @@ class NoAuth(wsgi.Middleware): """Add user:project as 'nova.context' to WSGI environ.""" @webob.dec.wsgify(RequestClass=wsgi.Request) + @tomograph.traced('NoAuth', 'middleware') def __call__(self, req): if 'AWSAccessKeyId' not in req.params: raise webob.exc.HTTPBadRequest() @@ -306,6 +315,7 @@ class Requestify(wsgi.Middleware): self.controller = importutils.import_object(controller) @webob.dec.wsgify(RequestClass=wsgi.Request) + @tomograph.traced('Requestify', 'middleware') def __call__(self, req): non_args = ['Action', 'Signature', 'AWSAccessKeyId', 'SignatureMethod', 'SignatureVersion', 'Version', 'Timestamp'] @@ -394,6 +404,7 @@ class Authorizer(wsgi.Middleware): } @webob.dec.wsgify(RequestClass=wsgi.Request) + @tomograph.traced('Authorizer', 'middleware') def __call__(self, req): context = req.environ['nova.context'] controller = req.environ['ec2.request'].controller.__class__.__name__ @@ -448,6 +459,7 @@ class Validator(wsgi.Middleware): super(Validator, self).__init__(application) @webob.dec.wsgify(RequestClass=wsgi.Request) + @tomograph.traced('Validator', 'middleware') def __call__(self, req): if validator.validate(req.environ['ec2.request'].args, validator.DEFAULT_VALIDATOR): @@ -466,6 +478,7 @@ class Executor(wsgi.Application): """ @webob.dec.wsgify(RequestClass=wsgi.Request) + @tomograph.traced('Executor', 'application') def __call__(self, req): context = req.environ['nova.context'] request_id = context.request_id diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py index 6afb05a..e2fc7f8 100644 --- a/nova/api/ec2/cloud.py +++ b/nova/api/ec2/cloud.py @@ -43,6 +43,7 @@ from nova import quota from nova import utils from nova import volume +import tomograph FLAGS = flags.FLAGS diff --git a/nova/api/openstack/__init__.py b/nova/api/openstack/__init__.py index ac7021f..f391eb8 100644 --- a/nova/api/openstack/__init__.py +++ b/nova/api/openstack/__init__.py @@ -23,6 +23,7 @@ WSGI middleware for OpenStack API controllers. import routes import webob.dec import webob.exc +import tomograph from nova.api.openstack import wsgi from nova.openstack.common import log as logging diff --git a/nova/api/openstack/wsgi.py b/nova/api/openstack/wsgi.py index e440889..f4554fa 100644 --- a/nova/api/openstack/wsgi.py +++ b/nova/api/openstack/wsgi.py @@ -30,6 +30,8 @@ from nova.openstack.common import log as logging from nova import utils from nova import wsgi +import tomograph + XMLNS_V10 = 'http://docs.rackspacecloud.com/servers/api/v1.0' XMLNS_V11 = 'http://docs.openstack.org/compute/api/v1.1' diff --git a/nova/compute/instance_types.py b/nova/compute/instance_types.py index 6869672..e1052be 100644 --- a/nova/compute/instance_types.py +++ b/nova/compute/instance_types.py @@ -102,7 +102,8 @@ def destroy(name): LOG.exception(_('Instance type %s not found for deletion') % name) raise exception.InstanceTypeNotFoundByName(instance_type_name=name) - +import tomograph +@tomograph.traced('get_all_types', 'foo') def get_all_types(ctxt=None, inactive=False, filters=None): """Get all non-deleted instance_types. diff --git a/nova/compute/manager.py b/nova/compute/manager.py index bbb71dd..9c92cce 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -2650,6 +2650,13 @@ class ComputeManager(manager.SchedulerDependentManager): usage['bw_in'], usage['bw_out'], last_refreshed=refreshed) + @manager.periodic_task(fast_task=True) + def _driver_metrics(self, context): + capabilities = self.driver.get_host_stats(refresh=True) + capabilities['host_ip'] = FLAGS.my_ip + capabilities['num_instances'] = self.driver.get_num_instances() + LOG.audit("driver_metrics", extra=capabilities) + @manager.periodic_task def _report_driver_status(self, context): curr_time = time.time() diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py index afec290..744a9c1 100644 --- a/nova/compute/rpcapi.py +++ b/nova/compute/rpcapi.py @@ -24,7 +24,7 @@ from nova.openstack.common import jsonutils from nova.openstack.common import rpc import nova.openstack.common.rpc.proxy - +import tomograph FLAGS = flags.FLAGS @@ -501,8 +501,9 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy): def terminate_instance(self, ctxt, instance): instance_p = jsonutils.to_primitive(instance) - self.cast(ctxt, self.make_msg('terminate_instance', - instance=instance_p), + msg = self.make_msg('terminate_instance', instance=instance_p) + msg['trace_info'] = tomograph.get_trace_info() + self.cast(ctxt, msg, topic=_compute_topic(self.topic, ctxt, None, instance)) def unpause_instance(self, ctxt, instance): diff --git a/nova/db/sqlalchemy/session.py b/nova/db/sqlalchemy/session.py index 6e754be..92ea58b 100644 --- a/nova/db/sqlalchemy/session.py +++ b/nova/db/sqlalchemy/session.py @@ -25,6 +25,7 @@ from sqlalchemy.exc import DisconnectionError, OperationalError import sqlalchemy.interfaces import sqlalchemy.orm from sqlalchemy.pool import NullPool, StaticPool +import tomograph import nova.exception import nova.flags as flags @@ -122,6 +123,8 @@ def get_engine(): _ENGINE = sqlalchemy.create_engine(FLAGS.sql_connection, **engine_args) + sqlalchemy.event.listen(_ENGINE, 'before_execute', tomograph.before_execute('nova')) + sqlalchemy.event.listen(_ENGINE, 'after_execute', tomograph.after_execute('nova')) if 'mysql' in connection_dict.drivername: sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener) elif "sqlite" in connection_dict.drivername: @@ -158,6 +161,7 @@ def get_engine(): if (remaining != 'infinite' and remaining == 0) or \ not is_db_connection_error(e.args[0]): raise + return _ENGINE @@ -207,3 +211,5 @@ def debug_mysql_do_query(): # return the new _do_query method return _do_query + + diff --git a/nova/image/s3.py b/nova/image/s3.py index 80f9448..1597864 100644 --- a/nova/image/s3.py +++ b/nova/image/s3.py @@ -38,6 +38,7 @@ from nova.openstack.common import cfg from nova.openstack.common import log as logging from nova import utils +#import tomograph LOG = logging.getLogger(__name__) @@ -72,6 +73,7 @@ class S3ImageService(object): self.service = service or glance.get_default_image_service() self.service.__init__(*args, **kwargs) + #@tomograph.traced('uuidxlate1', 'foo') def _translate_uuids_to_ids(self, context, images): return [self._translate_uuid_to_id(context, img) for img in images] @@ -136,6 +138,7 @@ class S3ImageService(object): image = self.service.update(context, image_uuid, metadata, data) return self._translate_uuid_to_id(context, image) + #@tomograph.traced('s3details', 'foo') def detail(self, context, **kwargs): #NOTE(bcwaldon): sort asc to make sure we assign lower ids # to older images diff --git a/nova/manager.py b/nova/manager.py index 275d98b..9a5efb1 100644 --- a/nova/manager.py +++ b/nova/manager.py @@ -84,6 +84,7 @@ def periodic_task(*args, **kwargs): def decorator(f): f._periodic_task = True f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0) + f._fast_task = kwargs.pop('fast_task', False) return f # NOTE(sirp): The `if` is necessary to allow the decorator to be used with @@ -115,8 +116,10 @@ class ManagerMeta(type): # parent's toes. try: cls._periodic_tasks = cls._periodic_tasks[:] + cls._fast_tasks = cls._fast_tasks[:] except AttributeError: cls._periodic_tasks = [] + cls._fast_tasks = [] try: cls._ticks_to_skip = cls._ticks_to_skip.copy() @@ -127,8 +130,11 @@ class ManagerMeta(type): if getattr(value, '_periodic_task', False): task = value name = task.__name__ - cls._periodic_tasks.append((name, task)) - cls._ticks_to_skip[name] = task._ticks_between_runs + if getattr(value, '_fast_task', False): + cls._fast_tasks.append((name, task)) + else: + cls._periodic_tasks.append((name, task)) + cls._ticks_to_skip[name] = task._ticks_between_runs class Manager(base.Base): @@ -156,6 +162,16 @@ class Manager(base.Base): ''' return rpc_dispatcher.RpcDispatcher([self]) + def fast_tasks(self, context): + for task_name, task in self._fast_tasks: + full_task_name = '.'.join([self.__class__.__name__, task_name]) + #LOG.debug(_("Running fast task %(full_task_name)s"), locals()) + try: + task(self, context) + except Exception as e: + LOG.exception(_("Error during %(full_task_name)s: %(e)s"), + locals()) + def periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" for task_name, task in self._periodic_tasks: diff --git a/nova/network/manager.py b/nova/network/manager.py index 6a51f05..d59a883 100644 --- a/nova/network/manager.py +++ b/nova/network/manager.py @@ -1,5 +1,5 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 - +# $Id$ # Copyright (c) 2011 X.commerce, a business unit of eBay Inc. # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. diff --git a/nova/openstack/common/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py index 0f82c47..d08c154 100644 --- a/nova/openstack/common/rpc/__init__.py +++ b/nova/openstack/common/rpc/__init__.py @@ -27,7 +27,7 @@ For some wrappers that add message versioning to rpc, see: from nova.openstack.common import cfg from nova.openstack.common import importutils - +import tomograph rpc_opts = [ cfg.StrOpt('rpc_backend', @@ -105,6 +105,7 @@ def call(context, topic, msg, timeout=None): :raises: openstack.common.rpc.common.Timeout if a complete response is not received before the timeout is reached. """ + msg['trace_info'] = tomograph.get_trace_info() return _get_impl().call(cfg.CONF, context, topic, msg, timeout) @@ -123,6 +124,10 @@ def cast(context, topic, msg): :returns: None """ + try: + msg['trace_info'] = tomograph.get_trace_info() + except: + pass return _get_impl().cast(cfg.CONF, context, topic, msg) @@ -144,6 +149,10 @@ def fanout_cast(context, topic, msg): :returns: None """ + try: + msg['trace_info'] = tomograph.get_trace_info() + except: + pass return _get_impl().fanout_cast(cfg.CONF, context, topic, msg) @@ -174,6 +183,7 @@ def multicall(context, topic, msg, timeout=None): :raises: openstack.common.rpc.common.Timeout if a complete response is not received before the timeout is reached. """ + msg['trace_info'] = tomograph.get_trace_info() return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) @@ -215,6 +225,10 @@ def cast_to_server(context, server_params, topic, msg): :returns: None """ + try: + msg['trace_info'] = tomograph.get_trace_info() + except: + pass return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic, msg) @@ -231,6 +245,10 @@ def fanout_cast_to_server(context, server_params, topic, msg): :returns: None """ + try: + msg['trace_info'] = tomograph.get_trace_info() + except: + pass return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params, topic, msg) diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py index a884084..9fcff67 100644 --- a/nova/openstack/common/rpc/amqp.py +++ b/nova/openstack/common/rpc/amqp.py @@ -40,6 +40,8 @@ from nova.openstack.common.gettextutils import _ from nova.openstack.common import local from nova.openstack.common.rpc import common as rpc_common +import socket +import tomograph LOG = logging.getLogger(__name__) @@ -255,14 +257,15 @@ class ProxyCallback(object): method = message_data.get('method') args = message_data.get('args', {}) version = message_data.get('version', None) + trace_info = message_data.get('trace_info') if not method: LOG.warn(_('no method for message: %s') % message_data) ctxt.reply(_('No method for message: %s') % message_data, connection_pool=self.connection_pool) return - self.pool.spawn_n(self._process_data, ctxt, version, method, args) + self.pool.spawn_n(self._process_data, ctxt, version, trace_info, method, args) - def _process_data(self, ctxt, version, method, args): + def _process_data(self, ctxt, version, trace_info, method, args): """Process a message in a new thread. If the proxy object we have has a dispatch method @@ -271,6 +274,10 @@ class ProxyCallback(object): the old behavior of magically calling the specified method on the proxy we have here. """ + moo = method + if isinstance(moo, unicode): + moo = moo.encode('ascii', 'ignore') + tomograph.start("rpc" + moo, 'proxy', socket.gethostname(), 1000, trace_info) ctxt.update_store() try: rval = self.proxy.dispatch(ctxt, version, method, **args) @@ -286,6 +293,7 @@ class ProxyCallback(object): LOG.exception('Exception during message handling') ctxt.reply(None, sys.exc_info(), connection_pool=self.connection_pool) + tomograph.stop('proxy') class MulticallWaiter(object): @@ -301,6 +309,7 @@ class MulticallWaiter(object): def done(self): if self._done: return + #tomograph.stop('rpc') self._done = True self._iterator.close() self._iterator = None diff --git a/nova/openstack/common/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py index a077552..b0f48e1 100644 --- a/nova/openstack/common/rpc/proxy.py +++ b/nova/openstack/common/rpc/proxy.py @@ -21,6 +21,8 @@ For more information about rpc API version numbers, see: rpc/dispatcher.py """ +import socket +import tomograph from nova.openstack.common import rpc @@ -77,6 +79,7 @@ class RpcProxy(object): :returns: The return value from the remote method. """ self._set_version(msg, version) + msg['trace_info'] = tomograph.get_trace_info() return rpc.call(context, self._get_topic(topic), msg, timeout) def multicall(self, context, msg, topic=None, version=None, timeout=None): @@ -95,6 +98,7 @@ class RpcProxy(object): from the remote method as they arrive. """ self._set_version(msg, version) + msg['trace_info'] = tomograph.get_trace_info() return rpc.multicall(context, self._get_topic(topic), msg, timeout) def cast(self, context, msg, topic=None, version=None): @@ -110,6 +114,10 @@ class RpcProxy(object): remote method. """ self._set_version(msg, version) + try: + msg['trace_info'] = tomograph.get_trace_info() + except: + pass rpc.cast(context, self._get_topic(topic), msg) def fanout_cast(self, context, msg, topic=None, version=None): @@ -125,6 +133,10 @@ class RpcProxy(object): from the remote method. """ self._set_version(msg, version) + try: + msg['trace_info'] = tomograph.get_trace_info() + except: + pass rpc.fanout_cast(context, self._get_topic(topic), msg) def cast_to_server(self, context, server_params, msg, topic=None, @@ -143,6 +155,10 @@ class RpcProxy(object): return values. """ self._set_version(msg, version) + try: + msg['trace_info'] = tomograph.get_trace_info() + except: + pass rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) def fanout_cast_to_server(self, context, server_params, msg, topic=None, @@ -161,5 +177,9 @@ class RpcProxy(object): return values. """ self._set_version(msg, version) + try: + msg['trace_info'] = tomograph.get_trace_info() + except: + pass rpc.fanout_cast_to_server(context, server_params, self._get_topic(topic), msg) diff --git a/nova/service.py b/nova/service.py index 6f350c6..9ff1815 100644 --- a/nova/service.py +++ b/nova/service.py @@ -24,11 +24,13 @@ import inspect import os import random import signal +import socket import sys import time import eventlet import greenlet +import tomograph from nova.common import eventlet_backdoor from nova import context @@ -443,6 +445,10 @@ class Service(object): initial_delay=initial_delay) self.timers.append(periodic) + fast_periodic = utils.LoopingCall(self.fast_tasks) + fast_periodic.start(interval=5) + self.timers.append(fast_periodic) + def _create_service_ref(self, context): zone = FLAGS.node_availability_zone service_ref = db.service_create(context, @@ -527,6 +533,10 @@ class Service(object): ctxt = context.get_admin_context() self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error) + def fast_tasks(self): + ctxt = context.get_admin_context() + self.manager.fast_tasks(ctxt) + def report_state(self): """Update the state of this service in the datastore.""" ctxt = context.get_admin_context() @@ -572,11 +582,12 @@ class WSGIService(object): """ self.name = name + LOG.error('wsgi server named {0} coming online...'.format(name)) self.manager = self._get_manager() self.loader = loader or wsgi.Loader() - self.app = self.loader.load_app(name) self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0") self.port = getattr(FLAGS, '%s_listen_port' % name, 0) + self.app = tomograph.tracewrap(self.loader.load_app(name), self.name, "WSGI", self.host, self.port) self.workers = getattr(FLAGS, '%s_workers' % name, None) self.server = wsgi.Server(name, self.app, diff --git a/nova/wsgi.py b/nova/wsgi.py index afb5303..07a3b37 100644 --- a/nova/wsgi.py +++ b/nova/wsgi.py @@ -34,6 +34,7 @@ from nova import exception from nova import flags from nova.openstack.common import log as logging +import tomograph FLAGS = flags.FLAGS LOG = logging.getLogger(__name__) @@ -238,6 +239,7 @@ class Middleware(Application): """Do whatever you'd like to the response.""" return response + @tomograph.traced(None, 'middleware') @webob.dec.wsgify(RequestClass=Request) def __call__(self, req): response = self.process_request(req) @@ -373,3 +375,4 @@ class Loader(object): except LookupError as err: LOG.error(err) raise exception.PasteAppNotFound(name=name, path=self.config_path) +