Update common
This commit is contained in:
parent
958a7c481f
commit
168ecf0337
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -37,9 +37,9 @@ class RequestContext(object):
|
||||
accesses the system, as well as additional request information.
|
||||
"""
|
||||
|
||||
def __init__(self, auth_tok=None, user=None, tenant=None, is_admin=False,
|
||||
def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False,
|
||||
read_only=False, show_deleted=False, request_id=None):
|
||||
self.auth_tok = auth_tok
|
||||
self.auth_token = auth_token
|
||||
self.user = user
|
||||
self.tenant = tenant
|
||||
self.is_admin = is_admin
|
||||
@ -55,7 +55,7 @@ class RequestContext(object):
|
||||
'is_admin': self.is_admin,
|
||||
'read_only': self.read_only,
|
||||
'show_deleted': self.show_deleted,
|
||||
'auth_token': self.auth_tok,
|
||||
'auth_token': self.auth_token,
|
||||
'request_id': self.request_id}
|
||||
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2012 Openstack, LLC.
|
||||
# Copyright (c) 2012 OpenStack Foundation.
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# Copyright 2012, Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -26,7 +26,7 @@ Usual usage in an openstack.common module:
|
||||
import gettext
|
||||
|
||||
|
||||
t = gettext.translation('openstack-common', 'locale', fallback=True)
|
||||
t = gettext.translation('billingstack', 'locale', fallback=True)
|
||||
|
||||
|
||||
def _(msg):
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -54,7 +54,7 @@ class BaseParser(object):
|
||||
|
||||
value = value.strip()
|
||||
if ((value and value[0] == value[-1]) and
|
||||
(value[0] == "\"" or value[0] == "'")):
|
||||
(value[0] == "\"" or value[0] == "'")):
|
||||
value = value[1:-1]
|
||||
return key.strip(), [value]
|
||||
|
||||
|
@ -38,14 +38,10 @@ import functools
|
||||
import inspect
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
import xmlrpclib
|
||||
|
||||
from billingstack.openstack.common.gettextutils import _
|
||||
from billingstack.openstack.common import timeutils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def to_primitive(value, convert_instances=False, convert_datetime=True,
|
||||
level=0, max_depth=3):
|
||||
@ -85,8 +81,6 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
||||
return 'mock'
|
||||
|
||||
if level > max_depth:
|
||||
LOG.error(_('Max serialization depth exceeded on object: %d %s'),
|
||||
level, value)
|
||||
return '?'
|
||||
|
||||
# The try block may not be necessary after the class check above,
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -207,7 +207,6 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
|
||||
local_lock_path = tempfile.mkdtemp()
|
||||
|
||||
if not os.path.exists(local_lock_path):
|
||||
cleanup_dir = True
|
||||
fileutils.ensure_tree(local_lock_path)
|
||||
|
||||
# NOTE(mikal): the lock name cannot contain directory
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
@ -29,6 +29,7 @@ It also allows setting of formatting information through conf.
|
||||
|
||||
"""
|
||||
|
||||
import ConfigParser
|
||||
import cStringIO
|
||||
import inspect
|
||||
import itertools
|
||||
@ -87,11 +88,11 @@ logging_cli_opts = [
|
||||
metavar='PATH',
|
||||
deprecated_name='logfile',
|
||||
help='(Optional) Name of log file to output to. '
|
||||
'If not set, logging will go to stdout.'),
|
||||
'If no default is set, logging will go to stdout.'),
|
||||
cfg.StrOpt('log-dir',
|
||||
deprecated_name='logdir',
|
||||
help='(Optional) The directory to keep log files in '
|
||||
'(will be prepended to --log-file)'),
|
||||
help='(Optional) The base directory used for relative '
|
||||
'--log-file paths'),
|
||||
cfg.BoolOpt('use-syslog',
|
||||
default=False,
|
||||
help='Use syslog for logging.'),
|
||||
@ -111,9 +112,9 @@ generic_log_opts = [
|
||||
|
||||
log_opts = [
|
||||
cfg.StrOpt('logging_context_format_string',
|
||||
default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
|
||||
'[%(request_id)s %(user)s %(tenant)s] %(instance)s'
|
||||
'%(message)s',
|
||||
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
|
||||
'%(name)s [%(request_id)s %(user)s %(tenant)s] '
|
||||
'%(instance)s%(message)s',
|
||||
help='format string to use for log messages with context'),
|
||||
cfg.StrOpt('logging_default_format_string',
|
||||
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
|
||||
@ -323,12 +324,32 @@ def _create_logging_excepthook(product_name):
|
||||
return logging_excepthook
|
||||
|
||||
|
||||
class LogConfigError(Exception):
|
||||
|
||||
message = _('Error loading logging config %(log_config)s: %(err_msg)s')
|
||||
|
||||
def __init__(self, log_config, err_msg):
|
||||
self.log_config = log_config
|
||||
self.err_msg = err_msg
|
||||
|
||||
def __str__(self):
|
||||
return self.message % dict(log_config=self.log_config,
|
||||
err_msg=self.err_msg)
|
||||
|
||||
|
||||
def _load_log_config(log_config):
|
||||
try:
|
||||
logging.config.fileConfig(log_config)
|
||||
except ConfigParser.Error, exc:
|
||||
raise LogConfigError(log_config, str(exc))
|
||||
|
||||
|
||||
def setup(product_name):
|
||||
"""Setup logging."""
|
||||
if CONF.log_config:
|
||||
logging.config.fileConfig(CONF.log_config)
|
||||
_load_log_config(CONF.log_config)
|
||||
else:
|
||||
_setup_logging_from_conf(product_name)
|
||||
_setup_logging_from_conf()
|
||||
sys.excepthook = _create_logging_excepthook(product_name)
|
||||
|
||||
|
||||
@ -362,8 +383,8 @@ def _find_facility_from_conf():
|
||||
return facility
|
||||
|
||||
|
||||
def _setup_logging_from_conf(product_name):
|
||||
log_root = getLogger(product_name).logger
|
||||
def _setup_logging_from_conf():
|
||||
log_root = getLogger(None).logger
|
||||
for handler in log_root.handlers:
|
||||
log_root.removeHandler(handler)
|
||||
|
||||
@ -401,7 +422,8 @@ def _setup_logging_from_conf(product_name):
|
||||
if CONF.log_format:
|
||||
handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
|
||||
datefmt=datefmt))
|
||||
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
|
||||
else:
|
||||
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
|
||||
|
||||
if CONF.debug:
|
||||
log_root.setLevel(logging.DEBUG)
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 OpenStack LLC.
|
||||
# Copyright 2012 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -30,7 +30,6 @@ LOG = logging.getLogger(__name__)
|
||||
notifier_opts = [
|
||||
cfg.MultiStrOpt('notification_driver',
|
||||
default=[],
|
||||
deprecated_name='list_notifier_drivers',
|
||||
help='Driver or drivers to handle sending notifications'),
|
||||
cfg.StrOpt('default_notification_level',
|
||||
default='INFO',
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2012 Red Hat, Inc.
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -14,16 +14,33 @@
|
||||
# under the License.
|
||||
|
||||
|
||||
from billingstack.openstack.common import cfg
|
||||
from billingstack.openstack.common import context as req_context
|
||||
from billingstack.openstack.common.gettextutils import _
|
||||
from billingstack.openstack.common import log as logging
|
||||
from billingstack.openstack.common.notifier import rpc_notifier
|
||||
from billingstack.openstack.common import rpc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
notification_topic_opt = cfg.ListOpt(
|
||||
'notification_topics', default=['notifications', ],
|
||||
help='AMQP topic used for openstack notifications')
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opt(notification_topic_opt)
|
||||
|
||||
|
||||
def notify(context, message):
|
||||
"""Deprecated in Grizzly. Please use rpc_notifier instead."""
|
||||
|
||||
LOG.deprecated(_("The rabbit_notifier is now deprecated."
|
||||
" Please use rpc_notifier instead."))
|
||||
rpc_notifier.notify(context, message)
|
||||
"""Sends a notification to the RabbitMQ"""
|
||||
if not context:
|
||||
context = req_context.get_admin_context()
|
||||
priority = message.get('priority',
|
||||
CONF.default_notification_level)
|
||||
priority = priority.lower()
|
||||
for topic in CONF.notification_topics:
|
||||
topic = '%s.%s' % (topic, priority)
|
||||
try:
|
||||
rpc.notify(context, topic, message)
|
||||
except Exception, e:
|
||||
LOG.exception(_("Could not send notification to %(topic)s. "
|
||||
"Payload=%(message)s"), locals())
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -446,9 +446,11 @@ class ProxyCallback(_ThreadPoolWithWait):
|
||||
connection_pool=self.connection_pool,
|
||||
log_failure=False)
|
||||
except Exception:
|
||||
LOG.exception(_('Exception during message handling'))
|
||||
ctxt.reply(None, sys.exc_info(),
|
||||
connection_pool=self.connection_pool)
|
||||
# sys.exc_info() is deleted by LOG.exception().
|
||||
exc_info = sys.exc_info()
|
||||
LOG.error(_('Exception during message handling'),
|
||||
exc_info=exc_info)
|
||||
ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
|
||||
|
||||
|
||||
class MulticallProxyWaiter(object):
|
||||
@ -496,7 +498,6 @@ class MulticallProxyWaiter(object):
|
||||
data = self._dataqueue.get(timeout=self._timeout)
|
||||
result = self._process_data(data)
|
||||
except queue.Empty:
|
||||
LOG.exception(_('Timed out waiting for RPC response.'))
|
||||
self.done()
|
||||
raise rpc_common.Timeout()
|
||||
except Exception:
|
||||
@ -663,7 +664,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
|
||||
pack_context(msg, context)
|
||||
with ConnectionContext(conf, connection_pool) as conn:
|
||||
if envelope:
|
||||
msg = rpc_common.serialize_msg(msg, force_envelope=True)
|
||||
msg = rpc_common.serialize_msg(msg)
|
||||
conn.notify_send(topic, msg)
|
||||
|
||||
|
||||
|
@ -70,10 +70,6 @@ _VERSION_KEY = 'oslo.version'
|
||||
_MESSAGE_KEY = 'oslo.message'
|
||||
|
||||
|
||||
# TODO(russellb) Turn this on after Grizzly.
|
||||
_SEND_RPC_ENVELOPE = False
|
||||
|
||||
|
||||
class RPCException(Exception):
|
||||
message = _("An unknown RPC related exception occurred.")
|
||||
|
||||
@ -122,7 +118,25 @@ class Timeout(RPCException):
|
||||
This exception is raised if the rpc_response_timeout is reached while
|
||||
waiting for a response from the remote side.
|
||||
"""
|
||||
message = _("Timeout while waiting on RPC response.")
|
||||
message = _('Timeout while waiting on RPC response - '
|
||||
'topic: "%(topic)s", RPC method: "%(method)s" '
|
||||
'info: "%(info)s"')
|
||||
|
||||
def __init__(self, info=None, topic=None, method=None):
|
||||
"""
|
||||
:param info: Extra info to convey to the user
|
||||
:param topic: The topic that the rpc call was sent to
|
||||
:param rpc_method_name: The name of the rpc method being
|
||||
called
|
||||
"""
|
||||
self.info = info
|
||||
self.topic = topic
|
||||
self.method = method
|
||||
super(Timeout, self).__init__(
|
||||
None,
|
||||
info=info or _('<unknown>'),
|
||||
topic=topic or _('<unknown>'),
|
||||
method=method or _('<unknown>'))
|
||||
|
||||
|
||||
class DuplicateMessageError(RPCException):
|
||||
@ -237,9 +251,7 @@ class Connection(object):
|
||||
raise NotImplementedError()
|
||||
|
||||
def consume_in_thread_group(self, thread_group):
|
||||
"""
|
||||
Spawn a thread to handle incoming messages in the supplied
|
||||
ThreadGroup.
|
||||
"""Spawn a thread to handle incoming messages in the supplied ThreadGroup.
|
||||
|
||||
Spawn a thread that will be responsible for handling all incoming
|
||||
messages for consumers that were set up on this connection.
|
||||
@ -456,10 +468,7 @@ def version_is_compatible(imp_version, version):
|
||||
return True
|
||||
|
||||
|
||||
def serialize_msg(raw_msg, force_envelope=False):
|
||||
if not _SEND_RPC_ENVELOPE and not force_envelope:
|
||||
return raw_msg
|
||||
|
||||
def serialize_msg(raw_msg):
|
||||
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
|
||||
# information about this format.
|
||||
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
|
||||
|
@ -126,13 +126,11 @@ class RpcDispatcher(object):
|
||||
rpc_api_version = '1.0'
|
||||
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
|
||||
version)
|
||||
|
||||
had_compatible = had_compatible or is_compatible
|
||||
func = getattr(proxyobj, method)
|
||||
if not func:
|
||||
if not hasattr(proxyobj, method):
|
||||
continue
|
||||
if is_compatible:
|
||||
return func(ctxt, **kwargs)
|
||||
return getattr(proxyobj, method)(ctxt, **kwargs)
|
||||
|
||||
if had_compatible:
|
||||
raise AttributeError("No such RPC function '%s'" % method)
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -624,8 +624,8 @@ class Connection(object):
|
||||
|
||||
def _error_callback(exc):
|
||||
if isinstance(exc, socket.timeout):
|
||||
LOG.exception(_('Timed out waiting for RPC response: %s') %
|
||||
str(exc))
|
||||
LOG.debug(_('Timed out waiting for RPC response: %s') %
|
||||
str(exc))
|
||||
raise rpc_common.Timeout()
|
||||
else:
|
||||
LOG.exception(_('Failed to consume message from queue: %s') %
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
# Copyright 2011 - 2012, Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -40,8 +40,8 @@ qpid_opts = [
|
||||
cfg.StrOpt('qpid_hostname',
|
||||
default='localhost',
|
||||
help='Qpid broker hostname'),
|
||||
cfg.StrOpt('qpid_port',
|
||||
default='5672',
|
||||
cfg.IntOpt('qpid_port',
|
||||
default=5672,
|
||||
help='Qpid broker port'),
|
||||
cfg.ListOpt('qpid_hosts',
|
||||
default=['$qpid_hostname:$qpid_port'],
|
||||
@ -320,7 +320,7 @@ class Connection(object):
|
||||
# Reconnection is done by self.reconnect()
|
||||
self.connection.reconnect = False
|
||||
self.connection.heartbeat = self.conf.qpid_heartbeat
|
||||
self.connection.protocol = self.conf.qpid_protocol
|
||||
self.connection.transport = self.conf.qpid_protocol
|
||||
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
|
||||
|
||||
def _register_consumer(self, consumer):
|
||||
@ -415,8 +415,8 @@ class Connection(object):
|
||||
|
||||
def _error_callback(exc):
|
||||
if isinstance(exc, qpid_exceptions.Empty):
|
||||
LOG.exception(_('Timed out waiting for RPC response: %s') %
|
||||
str(exc))
|
||||
LOG.debug(_('Timed out waiting for RPC response: %s') %
|
||||
str(exc))
|
||||
raise rpc_common.Timeout()
|
||||
else:
|
||||
LOG.exception(_('Failed to consume message from queue: %s') %
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
import os
|
||||
import pprint
|
||||
import re
|
||||
import socket
|
||||
import sys
|
||||
import types
|
||||
@ -25,6 +26,7 @@ import eventlet
|
||||
import greenlet
|
||||
from oslo.config import cfg
|
||||
|
||||
from billingstack.openstack.common import excutils
|
||||
from billingstack.openstack.common.gettextutils import _
|
||||
from billingstack.openstack.common import importutils
|
||||
from billingstack.openstack.common import jsonutils
|
||||
@ -91,8 +93,8 @@ def _serialize(data):
|
||||
try:
|
||||
return jsonutils.dumps(data, ensure_ascii=True)
|
||||
except TypeError:
|
||||
LOG.error(_("JSON serialization failed."))
|
||||
raise
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_("JSON serialization failed."))
|
||||
|
||||
|
||||
def _deserialize(data):
|
||||
@ -219,7 +221,7 @@ class ZmqClient(object):
|
||||
def cast(self, msg_id, topic, data, envelope=False):
|
||||
msg_id = msg_id or 0
|
||||
|
||||
if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
|
||||
if not envelope:
|
||||
self.outq.send(map(bytes,
|
||||
(msg_id, topic, 'cast', _serialize(data))))
|
||||
return
|
||||
@ -293,11 +295,16 @@ class InternalContext(object):
|
||||
def reply(self, ctx, proxy,
|
||||
msg_id=None, context=None, topic=None, msg=None):
|
||||
"""Reply to a casted call."""
|
||||
# Our real method is curried into msg['args']
|
||||
# NOTE(ewindisch): context kwarg exists for Grizzly compat.
|
||||
# this may be able to be removed earlier than
|
||||
# 'I' if ConsumerBase.process were refactored.
|
||||
if type(msg) is list:
|
||||
payload = msg[-1]
|
||||
else:
|
||||
payload = msg
|
||||
|
||||
child_ctx = RpcContext.unmarshal(msg[0])
|
||||
response = ConsumerBase.normalize_reply(
|
||||
self._get_response(child_ctx, proxy, topic, msg[1]),
|
||||
self._get_response(ctx, proxy, topic, payload),
|
||||
ctx.replies)
|
||||
|
||||
LOG.debug(_("Sending reply"))
|
||||
@ -437,6 +444,8 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(ZmqProxy, self).__init__(conf)
|
||||
pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
|
||||
self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
|
||||
|
||||
self.topic_proxy = {}
|
||||
|
||||
@ -462,6 +471,13 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
LOG.info(_("Creating proxy for topic: %s"), topic)
|
||||
|
||||
try:
|
||||
# The topic is received over the network,
|
||||
# don't trust this input.
|
||||
if self.badchars.search(topic) is not None:
|
||||
emsg = _("Topic contained dangerous characters.")
|
||||
LOG.warn(emsg)
|
||||
raise RPCException(emsg)
|
||||
|
||||
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
|
||||
(ipc_dir, topic),
|
||||
sock_type, bind=True)
|
||||
@ -518,9 +534,9 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
ipc_dir, run_as_root=True)
|
||||
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
|
||||
except utils.ProcessExecutionError:
|
||||
LOG.error(_("Could not create IPC directory %s") %
|
||||
(ipc_dir, ))
|
||||
raise
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_("Could not create IPC directory %s") %
|
||||
(ipc_dir, ))
|
||||
|
||||
try:
|
||||
self.register(consumption_proxy,
|
||||
@ -528,9 +544,9 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
zmq.PULL,
|
||||
out_bind=True)
|
||||
except zmq.ZMQError:
|
||||
LOG.error(_("Could not create ZeroMQ receiver daemon. "
|
||||
"Socket may already be in use."))
|
||||
raise
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_("Could not create ZeroMQ receiver daemon. "
|
||||
"Socket may already be in use."))
|
||||
|
||||
super(ZmqProxy, self).consume_in_thread()
|
||||
|
||||
@ -592,9 +608,6 @@ class ZmqReactor(ZmqBaseReactor):
|
||||
|
||||
self.pool.spawn_n(self.process, proxy, ctx, request)
|
||||
|
||||
def consume_in_thread_group(self, thread_group):
|
||||
self.reactor.consume_in_thread_group(thread_group)
|
||||
|
||||
|
||||
class Connection(rpc_common.Connection):
|
||||
"""Manages connections and threads."""
|
||||
@ -647,6 +660,9 @@ class Connection(rpc_common.Connection):
|
||||
_get_matchmaker().start_heartbeat()
|
||||
self.reactor.consume_in_thread()
|
||||
|
||||
def consume_in_thread_group(self, thread_group):
|
||||
self.reactor.consume_in_thread_group(thread_group)
|
||||
|
||||
|
||||
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
|
||||
_msg_id=None):
|
||||
@ -684,8 +700,8 @@ def _call(addr, context, topic, msg, timeout=None,
|
||||
'method': '-reply',
|
||||
'args': {
|
||||
'msg_id': msg_id,
|
||||
'context': mcontext,
|
||||
'topic': reply_topic,
|
||||
# TODO(ewindisch): safe to remove mcontext in I.
|
||||
'msg': [mcontext, msg]
|
||||
}
|
||||
}
|
||||
@ -760,7 +776,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
|
||||
LOG.warn(_("No matchmaker results. Not casting."))
|
||||
# While not strictly a timeout, callers know how to handle
|
||||
# this exception and a timeout isn't too big a lie.
|
||||
raise rpc_common.Timeout, "No match from matchmaker."
|
||||
raise rpc_common.Timeout(_("No match from matchmaker."))
|
||||
|
||||
# This supports brokerless fanout (addresses > 1)
|
||||
for queue in queues:
|
||||
|
@ -35,10 +35,10 @@ matchmaker_opts = [
|
||||
default='/etc/nova/matchmaker_ring.json',
|
||||
help='Matchmaker ring file (JSON)'),
|
||||
cfg.IntOpt('matchmaker_heartbeat_freq',
|
||||
default='300',
|
||||
default=300,
|
||||
help='Heartbeat frequency'),
|
||||
cfg.IntOpt('matchmaker_heartbeat_ttl',
|
||||
default='600',
|
||||
default=600,
|
||||
help='Heartbeat time-to-live.'),
|
||||
]
|
||||
|
||||
|
@ -68,16 +68,21 @@ class RpcProxy(object):
|
||||
:param context: The request context
|
||||
:param msg: The message to send, including the method and args.
|
||||
:param topic: Override the topic for this message.
|
||||
:param version: (Optional) Override the requested API version in this
|
||||
message.
|
||||
:param timeout: (Optional) A timeout to use when waiting for the
|
||||
response. If no timeout is specified, a default timeout will be
|
||||
used that is usually sufficient.
|
||||
:param version: (Optional) Override the requested API version in this
|
||||
message.
|
||||
|
||||
:returns: The return value from the remote method.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
return rpc.call(context, self._get_topic(topic), msg, timeout)
|
||||
real_topic = self._get_topic(topic)
|
||||
try:
|
||||
return rpc.call(context, real_topic, msg, timeout)
|
||||
except rpc.common.Timeout as exc:
|
||||
raise rpc.common.Timeout(
|
||||
exc.info, real_topic, msg.get('method'))
|
||||
|
||||
def multicall(self, context, msg, topic=None, version=None, timeout=None):
|
||||
"""rpc.multicall() a remote method.
|
||||
@ -85,17 +90,22 @@ class RpcProxy(object):
|
||||
:param context: The request context
|
||||
:param msg: The message to send, including the method and args.
|
||||
:param topic: Override the topic for this message.
|
||||
:param version: (Optional) Override the requested API version in this
|
||||
message.
|
||||
:param timeout: (Optional) A timeout to use when waiting for the
|
||||
response. If no timeout is specified, a default timeout will be
|
||||
used that is usually sufficient.
|
||||
:param version: (Optional) Override the requested API version in this
|
||||
message.
|
||||
|
||||
:returns: An iterator that lets you process each of the returned values
|
||||
from the remote method as they arrive.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
return rpc.multicall(context, self._get_topic(topic), msg, timeout)
|
||||
real_topic = self._get_topic(topic)
|
||||
try:
|
||||
return rpc.multicall(context, real_topic, msg, timeout)
|
||||
except rpc.common.Timeout as exc:
|
||||
raise rpc.common.Timeout(
|
||||
exc.info, real_topic, msg.get('method'))
|
||||
|
||||
def cast(self, context, msg, topic=None, version=None):
|
||||
"""rpc.cast() a remote method.
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
@ -149,7 +149,7 @@ def write_git_changelog():
|
||||
git_dir = _get_git_directory()
|
||||
if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'):
|
||||
if git_dir:
|
||||
git_log_cmd = 'git --git-dir=%s log --stat' % git_dir
|
||||
git_log_cmd = 'git --git-dir=%s log' % git_dir
|
||||
changelog = _run_shell_command(git_log_cmd)
|
||||
mailmap = _parse_git_mailmap(git_dir)
|
||||
with open(new_changelog, "w") as changelog_file:
|
||||
@ -171,6 +171,14 @@ def generate_authors():
|
||||
" log --format='%aN <%aE>' | sort -u | "
|
||||
"egrep -v '" + jenkins_email + "'")
|
||||
changelog = _run_shell_command(git_log_cmd)
|
||||
signed_cmd = ("git log --git-dir=" + git_dir +
|
||||
" | grep -i Co-authored-by: | sort -u")
|
||||
signed_entries = _run_shell_command(signed_cmd)
|
||||
if signed_entries:
|
||||
new_entries = "\n".join(
|
||||
[signed.split(":", 1)[1].strip()
|
||||
for signed in signed_entries.split("\n") if signed])
|
||||
changelog = "\n".join((changelog, new_entries))
|
||||
mailmap = _parse_git_mailmap(git_dir)
|
||||
with open(new_authors, 'w') as new_authors_fh:
|
||||
new_authors_fh.write(canonicalize_emails(changelog, mailmap))
|
||||
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -25,18 +25,22 @@ import datetime
|
||||
import iso8601
|
||||
|
||||
|
||||
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
|
||||
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
|
||||
# ISO 8601 extended time format with microseconds
|
||||
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
|
||||
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
|
||||
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
|
||||
|
||||
|
||||
def isotime(at=None):
|
||||
def isotime(at=None, subsecond=False):
|
||||
"""Stringify time in ISO 8601 format"""
|
||||
if not at:
|
||||
at = utcnow()
|
||||
str = at.strftime(TIME_FORMAT)
|
||||
st = at.strftime(_ISO8601_TIME_FORMAT
|
||||
if not subsecond
|
||||
else _ISO8601_TIME_FORMAT_SUBSECOND)
|
||||
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
|
||||
str += ('Z' if tz == 'UTC' else tz)
|
||||
return str
|
||||
st += ('Z' if tz == 'UTC' else tz)
|
||||
return st
|
||||
|
||||
|
||||
def parse_isotime(timestr):
|
||||
|
@ -1,5 +1,5 @@
|
||||
|
||||
# Copyright 2012 OpenStack LLC
|
||||
# Copyright 2012 OpenStack Foundation
|
||||
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -1,7 +1,7 @@
|
||||
#!/usr/bin/env python
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
|
Loading…
x
Reference in New Issue
Block a user