diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 21cda4373..692553184 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -28,6 +28,7 @@ from oslo_messaging._drivers import amqp as rpc_amqp from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common from oslo_messaging._i18n import _ +from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LI from oslo_messaging._i18n import _LW @@ -64,7 +65,7 @@ class AMQPIncomingMessage(base.IncomingMessage): unique_id = msg[rpc_amqp.UNIQUE_ID] LOG.debug("sending reply msg_id: %(msg_id)s " - "reply queue: %(reply_q)s" % { + "reply queue: %(reply_q)s", { 'msg_id': self.msg_id, 'unique_id': unique_id, 'reply_q': self.reply_q}) @@ -99,7 +100,7 @@ class AMQPIncomingMessage(base.IncomingMessage): if timer.check_return() > 0: LOG.debug(("The reply %(msg_id)s cannot be sent " "%(reply_q)s reply queue don't exist, " - "retrying...") % { + "retrying..."), { 'msg_id': self.msg_id, 'reply_q': self.reply_q}) time.sleep(0.25) @@ -107,7 +108,7 @@ class AMQPIncomingMessage(base.IncomingMessage): self._obsolete_reply_queues.add(self.reply_q, self.msg_id) LOG.info(_LI("The reply %(msg_id)s cannot be sent " "%(reply_q)s reply queue don't exist after " - "%(duration)s sec abandoning...") % { + "%(duration)s sec abandoning..."), { 'msg_id': self.msg_id, 'reply_q': self.reply_q, 'duration': duration}) @@ -192,7 +193,7 @@ class AMQPListener(base.Listener): unique_id = self.msg_id_cache.check_duplicate_message(message) - LOG.debug("received message msg_id: %(msg_id)s reply to %(queue)s" % { + LOG.debug("received message msg_id: %(msg_id)s reply to %(queue)s", { 'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id}) self.incoming.append(AMQPIncomingMessage(self, @@ -250,10 +251,11 @@ class ReplyWaiters(object): def add(self, msg_id): self._queues[msg_id] = moves.queue.Queue() if len(self._queues) > self._wrn_threshold: - LOG.warn('Number of call queues is greater than warning ' - 'threshold: %d. There could be a leak. Increasing' - ' threshold to: %d', self._wrn_threshold, - self._wrn_threshold * 2) + LOG.warn(_LW('Number of call queues is greater than warning ' + 'threshold: %(old_threshold)s. There could be a ' + 'leak. Increasing threshold to: %(threshold)s'), + {'old_threshold': self._wrn_threshold, + 'threshold': self._wrn_threshold * 2}) self._wrn_threshold *= 2 def remove(self, msg_id): @@ -286,14 +288,14 @@ class ReplyWaiter(object): try: self.conn.consume() except Exception: - LOG.exception("Failed to process incoming message, " - "retrying...") + LOG.exception(_LE("Failed to process incoming message, " + "retrying...")) def __call__(self, message): message.acknowledge() incoming_msg_id = message.pop('_msg_id', None) if message.get('ending'): - LOG.debug("received reply msg_id: %s" % incoming_msg_id) + LOG.debug("received reply msg_id: %s", incoming_msg_id) self.waiters.put(incoming_msg_id, message) def listen(self, msg_id): diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py index 85d814da6..7b446d74d 100644 --- a/oslo_messaging/_drivers/common.py +++ b/oslo_messaging/_drivers/common.py @@ -85,7 +85,8 @@ class RPCException(Exception): except Exception: # kwargs doesn't match a variable in the message # log the issue and the kwargs - LOG.exception(_LE('Exception in string format operation')) + LOG.exception(_LE('Exception in string format operation, ' + 'kwargs are:')) for name, value in six.iteritems(kwargs): LOG.error("%s: %s", name, value) # at least get the core message out if something happened @@ -411,7 +412,7 @@ class ConnectionContext(Connection): try: self.connection.reset() except Exception: - LOG.exception("Fail to reset the connection, drop it") + LOG.exception(_LE("Fail to reset the connection, drop it")) try: self.connection.close() except Exception: diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 1f75b3349..87d81ca4f 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -402,15 +402,15 @@ class Connection(object): self._url = '' if self.fake_rabbit: - LOG.warn("Deprecated: fake_rabbit option is deprecated, set " - "rpc_backend to kombu+memory or use the fake " - "driver instead.") + LOG.warn(_LW("Deprecated: fake_rabbit option is deprecated, set " + "rpc_backend to kombu+memory or use the fake " + "driver instead.")) self._url = 'memory://%s/' % virtual_host elif url.hosts: if url.transport.startswith('kombu+'): LOG.warn(_LW('Selecting the kombu transport through the ' 'transport url (%s) is a experimental feature ' - 'and this is not yet supported.') % url.transport) + 'and this is not yet supported.'), url.transport) if len(url.hosts) > 1: random.shuffle(url.hosts) for host in url.hosts: @@ -600,10 +600,10 @@ class Connection(object): current_pid = os.getpid() if self._initial_pid != current_pid: - LOG.warn("Process forked after connection established! " - "This can result in unpredictable behavior. " - "See: http://docs.openstack.org/developer/" - "oslo_messaging/transport.html") + LOG.warn(_LW("Process forked after connection established! " + "This can result in unpredictable behavior. " + "See: http://docs.openstack.org/developer/" + "oslo_messaging/transport.html")) self._initial_pid = current_pid if retry is None: diff --git a/oslo_messaging/_drivers/protocols/amqp/controller.py b/oslo_messaging/_drivers/protocols/amqp/controller.py index 4b9a7621d..7e7273f4d 100644 --- a/oslo_messaging/_drivers/protocols/amqp/controller.py +++ b/oslo_messaging/_drivers/protocols/amqp/controller.py @@ -36,6 +36,7 @@ from six import moves from oslo_messaging._drivers.protocols.amqp import eventloop from oslo_messaging._drivers.protocols.amqp import opts +from oslo_messaging._i18n import _LE, _LI, _LW from oslo_messaging import exceptions from oslo_messaging import transport @@ -90,8 +91,8 @@ class Replies(pyngus.ReceiverEventHandler): # reply is placed on reply_queue self._correlation[request.id] = reply_queue request.reply_to = self._receiver.source_address - LOG.debug("Reply for msg id=%s expected on link %s", - request.id, request.reply_to) + LOG.debug("Reply for msg id=%(id)s expected on link %(reply_to)s", + {'id': request.id, 'reply_to': request.reply_to}) return request.id def cancel_response(self, msg_id): @@ -121,7 +122,7 @@ class Replies(pyngus.ReceiverEventHandler): # TODO(kgiusti) Unclear if this error will ever occur (as opposed to # the Connection failing instead). Log for now, possibly implement a # recovery strategy if necessary. - LOG.error("Reply subscription closed by peer: %s", + LOG.error(_LE("Reply subscription closed by peer: %s"), (pn_condition or "no error given")) def message_received(self, receiver, message, handle): @@ -141,8 +142,8 @@ class Replies(pyngus.ReceiverEventHandler): del self._correlation[key] receiver.message_accepted(handle) else: - LOG.warn("Can't find receiver for response msg id=%s, dropping!", - key) + LOG.warn(_LW("Can't find receiver for response msg id=%s, " + "dropping!"), key) receiver.message_modified(handle, True, True, None) def _update_credit(self): @@ -194,12 +195,12 @@ class Server(pyngus.ReceiverEventHandler): """This is a Pyngus callback, invoked by Pyngus when the peer of this receiver link has initiated closing the connection. """ - text = "Server subscription %(addr)s closed by peer: %(err_msg)s" vals = { "addr": receiver.source_address or receiver.target_address, "err_msg": pn_condition or "no error given" } - LOG.error(text % vals) + LOG.error(_LE("Server subscription %(addr)s closed " + "by peer: %(err_msg)s"), vals) def message_received(self, receiver, message, handle): """This is a Pyngus callback, invoked by Pyngus when a new message @@ -348,7 +349,8 @@ class Controller(pyngus.ConnectionEventHandler): will include the reply message (if successful). """ address = self._resolve(target) - LOG.debug("Sending request for %s to %s", target, address) + LOG.debug("Sending request for %(target)s to %(address)s", + {'target': target, 'address': address}) if reply_expected: msg_id = self._replies.prepare_for_response(request, result_queue) @@ -399,7 +401,8 @@ class Controller(pyngus.ConnectionEventHandler): self._subscribe(target, addresses, in_queue) def _subscribe(self, target, addresses, in_queue): - LOG.debug("Subscribing to %s (%s)", target, addresses) + LOG.debug("Subscribing to %(target)s (%(addresses)s)", + {'target': target, 'addresses': addresses}) self._servers[target] = Server(addresses, in_queue) self._servers[target].attach(self._socket_connection.connection) @@ -500,7 +503,7 @@ class Controller(pyngus.ConnectionEventHandler): try: self._tasks.get(False).execute(self) except Exception as e: - LOG.exception("Error processing task: %s", e) + LOG.exception(_LE("Error processing task: %s"), e) count += 1 # if we hit _max_task_batch, resume task processing later: @@ -532,7 +535,7 @@ class Controller(pyngus.ConnectionEventHandler): """Called when the driver destroys the controller, this method attempts to cleanly close the AMQP connection to the peer. """ - LOG.info("Shutting down AMQP connection") + LOG.info(_LI("Shutting down AMQP connection")) self._closing = True if self._socket_connection.connection.active: # try a clean shutdown @@ -547,8 +550,9 @@ class Controller(pyngus.ConnectionEventHandler): """Invoked when the Replies reply link has become active. At this point, we are ready to send/receive messages (via Task processing). """ - LOG.info("Messaging is active (%s:%i)", self.hosts.current.hostname, - self.hosts.current.port) + LOG.info(_LI("Messaging is active (%(hostname)s:%(port)s)"), + {'hostname': self.hosts.current.hostname, + 'port': self.hosts.current.port}) self._schedule_task_processing() # callback from eventloop on socket error @@ -576,8 +580,9 @@ class Controller(pyngus.ConnectionEventHandler): the peer is up. At this point, the driver will activate all subscriber links (server) and the reply link. """ - LOG.debug("Connection active (%s:%i), subscribing...", - self.hosts.current.hostname, self.hosts.current.port) + LOG.debug("Connection active (%(hostname)s:%(port)s), subscribing...", + {'hostname': self.hosts.current.hostname, + 'port': self.hosts.current.port}) for s in self._servers.values(): s.attach(self._socket_connection.connection) self._replies = Replies(self._socket_connection.connection, @@ -603,7 +608,7 @@ class Controller(pyngus.ConnectionEventHandler): # connection. Acknowledge the close, and try to reconnect/failover # later once the connection has closed (connection_closed is # called). - LOG.info("Connection closed by peer: %s", + LOG.info(_LI("Connection closed by peer: %s"), reason or "no reason given") self._socket_connection.connection.close() @@ -614,9 +619,11 @@ class Controller(pyngus.ConnectionEventHandler): """ if outcome == proton.SASL.OK: return - LOG.error("AUTHENTICATION FAILURE: Cannot connect to %s:%s as user %s", - self.hosts.current.hostname, self.hosts.current.port, - self.hosts.current.username) + LOG.error(_LE("AUTHENTICATION FAILURE: Cannot connect to " + "%(hostname)s:%(port)s as user %(username)s"), + {'hostname': self.hosts.current.hostname, + 'port': self.hosts.current.port, + 'username': self.hosts.current.username}) # connection failure will be handled later def _complete_shutdown(self): @@ -625,7 +632,7 @@ class Controller(pyngus.ConnectionEventHandler): """ self._socket_connection.close() self.processor.shutdown() - LOG.info("Messaging has shutdown") + LOG.info(_LI("Messaging has shutdown")) def _handle_connection_loss(self): """The connection to the messaging service has been lost. Try to @@ -641,7 +648,7 @@ class Controller(pyngus.ConnectionEventHandler): if not self._reconnecting: self._reconnecting = True self._replies = None - LOG.info("delaying reconnect attempt for %d seconds", + LOG.info(_LI("delaying reconnect attempt for %d seconds"), self._delay) self.processor.schedule(lambda: self._do_reconnect(), self._delay) @@ -660,5 +667,6 @@ class Controller(pyngus.ConnectionEventHandler): self._senders = {} self._socket_connection.reset() host = self.hosts.next() - LOG.info("Reconnecting to: %s:%i", host.hostname, host.port) + LOG.info(_LI("Reconnecting to: %(hostname):%(port)"), + {'hostname': host.hostname, 'port': host.port}) self._socket_connection.connect(host) diff --git a/oslo_messaging/_drivers/protocols/amqp/driver.py b/oslo_messaging/_drivers/protocols/amqp/driver.py index 32abf435f..6663ce7d3 100644 --- a/oslo_messaging/_drivers/protocols/amqp/driver.py +++ b/oslo_messaging/_drivers/protocols/amqp/driver.py @@ -31,6 +31,7 @@ from six import moves from oslo_messaging._drivers import base from oslo_messaging._drivers import common +from oslo_messaging._i18n import _LI, _LW from oslo_messaging import target as messaging_target @@ -137,7 +138,7 @@ class ProtonDriver(base.BaseDriver): def __init__(self, conf, url, default_exchange=None, allowed_remote_exmods=[]): # TODO(kgiusti) Remove once driver fully stabilizes: - LOG.warning("Support for the 'amqp' transport is EXPERIMENTAL.") + LOG.warning(_LW("Support for the 'amqp' transport is EXPERIMENTAL.")) if proton is None or hasattr(controller, "fake_controller"): raise NotImplementedError("Proton AMQP C libraries not installed") @@ -167,7 +168,8 @@ class ProtonDriver(base.BaseDriver): if old_pid != self._pid: if self._ctrl is not None: - LOG.warning("Process forked after connection established!") + LOG.warning(_LW("Process forked after connection " + "established!")) self._ctrl.shutdown(wait=False) # Create a Controller that connects to the messaging service: self._ctrl = controller.Controller(self._hosts, @@ -244,4 +246,4 @@ class ProtonDriver(base.BaseDriver): if self._ctrl: self._ctrl.shutdown() self._ctrl = None - LOG.info("AMQP 1.0 messaging driver shutdown") + LOG.info(_LI("AMQP 1.0 messaging driver shutdown")) diff --git a/oslo_messaging/_drivers/protocols/amqp/drivertasks.py b/oslo_messaging/_drivers/protocols/amqp/drivertasks.py index 385241334..a23ef9f47 100644 --- a/oslo_messaging/_drivers/protocols/amqp/drivertasks.py +++ b/oslo_messaging/_drivers/protocols/amqp/drivertasks.py @@ -17,6 +17,7 @@ import threading import time from oslo_messaging._drivers.protocols.amqp import controller +from oslo_messaging._i18n import _LW from oslo_messaging import exceptions from six import moves @@ -61,7 +62,8 @@ class SendTask(controller.Task): controller.request(self._target, self._request, self._results_queue, self._wait_for_reply) else: - LOG.warn("Send request to %s aborted: TTL expired.", self._target) + LOG.warn(_LW("Send request to %s aborted: TTL expired."), + self._target) class ListenTask(controller.Task): diff --git a/oslo_messaging/_drivers/protocols/amqp/eventloop.py b/oslo_messaging/_drivers/protocols/amqp/eventloop.py index 696e8a522..a9a828da4 100644 --- a/oslo_messaging/_drivers/protocols/amqp/eventloop.py +++ b/oslo_messaging/_drivers/protocols/amqp/eventloop.py @@ -36,6 +36,7 @@ import uuid import pyngus from six import moves +from oslo_messaging._i18n import _LE, _LI, _LW LOG = logging.getLogger(__name__) @@ -100,7 +101,7 @@ class _SocketConnection(object): if not addr: key = "%s:%i" % (host.hostname, host.port) error = "Invalid peer address '%s'" % key - LOG.error(error) + LOG.error(_LE("Invalid peer address '%s'"), key) self._handler.socket_error(error) return my_socket = socket.socket(addr[0][0], addr[0][1], addr[0][2]) @@ -111,7 +112,7 @@ class _SocketConnection(object): except socket.error as e: if e.errno != errno.EINPROGRESS: error = "Socket connect failure '%s'" % str(e) - LOG.error(error) + LOG.error(_LE("Socket connect failure '%s'"), str(e)) self._handler.socket_error(error) return self.socket = my_socket @@ -316,7 +317,7 @@ class Thread(threading.Thread): results = select.select(readfds, writefds, [], timeout) except select.error as serror: if serror[0] == errno.EINTR: - LOG.warning("ignoring interrupt from select(): %s", + LOG.warning(_LW("ignoring interrupt from select(): %s"), str(serror)) continue raise # assuming fatal... @@ -342,6 +343,6 @@ class Thread(threading.Thread): self._schedule.process() # run any deferred requests - LOG.info("eventloop thread exiting, container=%s", + LOG.info(_LI("eventloop thread exiting, container=%s"), self._container.name) self._container.destroy() diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index d39f9927c..1d5729c80 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -58,8 +58,8 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): self._redirect_reply(message) def _redirect_in_request(self, multipart_message): - LOG.debug("-> Redirecting request %s to TCP publisher" - % multipart_message) + LOG.debug("-> Redirecting request %s to TCP publisher", + multipart_message) envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] if self.conf.use_pub_sub and \ envelope[zmq_names.FIELD_MSG_TYPE] \ @@ -69,13 +69,13 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): self.direct_publisher.send_request(multipart_message) def _redirect_reply(self, reply): - LOG.debug("Reply proxy %s" % reply) + LOG.debug("Reply proxy %s", reply) if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE: - LOG.debug("Acknowledge dropped %s" % reply) + LOG.debug("Acknowledge dropped %s", reply) return - LOG.debug("<- Redirecting reply to ROUTER: reply: %s" - % reply[zmq_names.IDX_REPLY_BODY:]) + LOG.debug("<- Redirecting reply to ROUTER: reply: %s", + reply[zmq_names.IDX_REPLY_BODY:]) self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:]) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py index eaba22bdb..db3fc0280 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py @@ -58,7 +58,7 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase): finally: self.reply_waiter.untrack_id(request.message_id) - LOG.debug("Received reply %s" % reply) + LOG.debug("Received reply %s", reply) if reply[zmq_names.FIELD_FAILURE]: raise rpc_common.deserialize_remote_exception( reply[zmq_names.FIELD_FAILURE], @@ -86,9 +86,8 @@ class RequestSender(zmq_publisher_base.PublisherMultisend): socket.send(b'', zmq.SNDMORE) socket.send_pyobj(request) - LOG.debug("Sending message_id %(message)s to a target %(target)s" - % {"message": request.message_id, - "target": request.target}) + LOG.debug("Sending message_id %(message)s to a target %(target)s", + {"message": request.message_id, "target": request.target}) def _check_hosts_connections(self, target, listener_type): if str(target) in self.outbound_sockets: @@ -144,10 +143,10 @@ class RequestSenderLight(RequestSender): def _do_send_request(self, socket, request): LOG.debug("Sending %(type)s message_id %(message)s" - " to a target %(target)s" - % {"type": request.msg_type, - "message": request.message_id, - "target": request.target}) + " to a target %(target)s", + {"type": request.msg_type, + "message": request.message_id, + "target": request.target}) envelope = request.create_envelope() @@ -182,7 +181,7 @@ class ReplyWaiter(object): empty = socket.recv() assert empty == b'', "Empty expected!" reply = socket.recv_pyobj() - LOG.debug("Received reply %s" % reply) + LOG.debug("Received reply %s", reply) return reply self.poller.register(socket, recv_method=_receive_method) @@ -196,4 +195,4 @@ class ReplyWaiter(object): if call_future: call_future.set_result(reply) else: - LOG.warning(_LW("Received timed out reply: %s") % reply_id) + LOG.warning(_LW("Received timed out reply: %s"), reply_id) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py index cf8358eb9..07606a0eb 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py @@ -42,8 +42,8 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): # a queue for keeping messages to send them later # when some listener appears. However such approach # being more reliable will consume additional memory. - LOG.warning(_LW("Request %s was dropped because no connection") - % request.msg_type) + LOG.warning(_LW("Request %s was dropped because no connection"), + request.msg_type) return if request.msg_type in zmq_names.MULTISEND_TYPES: @@ -61,9 +61,8 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): socket.send(b'', zmq.SNDMORE) socket.send_pyobj(request) - LOG.debug("Sending message_id %(message)s to a target %(target)s" - % {"message": request.message_id, - "target": request.target}) + LOG.debug("Sending message_id %(message)s to a target %(target)s", + {"message": request.message_id, "target": request.target}) def cleanup(self): super(DealerPublisher, self).cleanup() @@ -90,10 +89,10 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase): self.socket.send_pyobj(request) LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to " - "a target %(target)s" - % {"message": request.message_id, - "target": request.target, - "addr": self.address}) + "a target %(target)s", + {"message": request.message_id, + "target": request.target, + "addr": self.address}) def cleanup(self): self.socket.setsockopt(zmq.LINGER, 0) @@ -118,7 +117,7 @@ class AcknowledgementReceiver(object): def poll_for_acknowledgements(self): ack_message, socket = self.poller.poll() - LOG.debug("Message %s acknowledged" % ack_message[zmq_names.FIELD_ID]) + LOG.debug("Message %s acknowledged", ack_message[zmq_names.FIELD_ID]) def cleanup(self): self.thread.stop() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index c8ad98345..f233d099b 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -35,7 +35,7 @@ class DealerPublisherProxy(zmq_dealer_publisher.DealerPublisher): envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] - LOG.debug("Envelope: %s" % envelope) + LOG.debug("Envelope: %s", envelope) target = envelope[zmq_names.FIELD_TARGET] dealer_socket = self._check_hosts_connections( @@ -46,8 +46,8 @@ class DealerPublisherProxy(zmq_dealer_publisher.DealerPublisher): # a queue for keeping messages to send them later # when some listener appears. However such approach # being more reliable will consume additional memory. - LOG.warning(_LW("Request %s was dropped because no connection") - % envelope[zmq_names.FIELD_MSG_TYPE]) + LOG.warning(_LW("Request %s was dropped because no connection"), + envelope[zmq_names.FIELD_MSG_TYPE]) return self.reply_receiver.track_socket(dealer_socket.handle) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py index 0a5a58ebe..f228f2592 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py @@ -53,7 +53,7 @@ class PubPublisherProxy(zmq_publisher_base.PublisherBase): self.sync_channel = SyncChannel(conf, matchmaker, self.zmq_context) - LOG.info(_LI("[PUB:%(pub)s, PULL:%(pull)s] Run PUB publisher") % + LOG.info(_LI("[PUB:%(pub)s, PULL:%(pull)s] Run PUB publisher"), {"pub": self.host, "pull": self.sync_channel.sync_host}) @@ -75,10 +75,10 @@ class PubPublisherProxy(zmq_publisher_base.PublisherBase): self.socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY]) LOG.debug("Publishing message [%(topic)s] %(message_id)s to " - "a target %(target)s " - % {"message_id": message_id, - "target": target, - "topic": topic_filter}) + "a target %(target)s ", + {"message_id": message_id, + "target": target, + "topic": topic_filter}) def cleanup(self): self.matchmaker.unregister_publisher( @@ -114,10 +114,10 @@ class SyncChannel(object): self.sync_socket.port) def is_ready(self): - LOG.debug("[%s] Waiting for ready from first subscriber" % + LOG.debug("[%s] Waiting for ready from first subscriber", self.sync_host) if self._ready is None: self._ready = self.poller.poll() - LOG.debug("[%s] Received ready from first subscriber" % + LOG.debug("[%s] Received ready from first subscriber", self.sync_host) return self._ready is not None diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index cc2011e07..bcd3a9fa3 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -91,10 +91,10 @@ class PublisherBase(object): :type request: zmq_request.Request """ LOG.debug("Sending %(type)s message_id %(message)s to a target " - "%(target)s" - % {"type": request.msg_type, - "message": request.message_id, - "target": request.target}) + "%(target)s", + {"type": request.msg_type, + "message": request.message_id, + "target": request.target}) socket.send_pyobj(request) def cleanup(self): @@ -137,10 +137,8 @@ class PublisherMultisend(PublisherBase): def _connect_to_address(self, socket, address, target): stype = zmq_names.socket_type_str(self.socket_type) try: - LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s") - % {"stype": stype, - "address": address, - "target": target}) + LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s"), + {"stype": stype, "address": address, "target": target}) if six.PY3: socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1())) @@ -151,8 +149,8 @@ class PublisherMultisend(PublisherBase): except zmq.ZMQError as e: errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\ % (stype, address, e) - LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s") - % (stype, address, e)) + LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s"), + (stype, address, e)) raise rpc_common.RPCException(errmsg) def _connect_to_host(self, socket, host, target): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py index 3a38cfd43..c7854aeb4 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py @@ -39,8 +39,8 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend): request.target, zmq_names.socket_type_str(zmq.PULL)) if not push_socket.connections: - LOG.warning(_LW("Request %s was dropped because no connection") - % request.msg_type) + LOG.warning(_LW("Request %s was dropped because no connection"), + request.msg_type) return if request.msg_type in zmq_names.MULTISEND_TYPES: @@ -53,6 +53,5 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend): super(PushPublisher, self)._send_request(socket, request) - LOG.debug("Publishing message %(message)s to a target %(target)s" - % {"message": request.message, - "target": request.target}) + LOG.debug("Publishing message %(message)s to a target %(target)s", + {"message": request.message, "target": request.target}) diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py index aa0c73464..c1cc29ab8 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py @@ -55,7 +55,7 @@ class ThreadingPoller(zmq_poller.ZmqPoller): try: sockets = dict(self.poller.poll(timeout=timeout)) except zmq.ZMQError as e: - LOG.debug("Polling terminated with error: %s" % e) + LOG.debug("Polling terminated with error: %s", e) if not sockets: return None, None diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index 2145c96fc..07936d308 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -74,8 +74,8 @@ class SingleSocketConsumer(ConsumerBase): except zmq.ZMQError as e: errmsg = _LE("Failed binding to port %(port)d: %(e)s")\ % (self.port, e) - LOG.error(_LE("Failed binding to port %(port)d: %(e)s") - % (self.port, e)) + LOG.error(_LE("Failed binding to port %(port)d: %(e)s"), + (self.port, e)) raise rpc_common.RPCException(errmsg) @property diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py index 81cf7fde0..4a3efeecd 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py @@ -47,7 +47,7 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer): super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL) def listen(self, target): - LOG.info(_LI("Listen to target %s") % str(target)) + LOG.info(_LI("Listen to target %s"), str(target)) # Do nothing here because we have a single socket def receive_message(self, socket): @@ -56,14 +56,13 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer): assert msg_type is not None, 'Bad format: msg type expected' context = socket.recv_pyobj() message = socket.recv_pyobj() - LOG.debug("Received %(msg_type)s message %(msg)s" - % {"msg_type": msg_type, - "msg": str(message)}) + LOG.debug("Received %(msg_type)s message %(msg)s", + {"msg_type": msg_type, "msg": str(message)}) if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES): return PullIncomingMessage(self.server, context, message) else: - LOG.error(_LE("Unknown message type: %s") % msg_type) + LOG.error(_LE("Unknown message type: %s"), msg_type) except zmq.ZMQError as e: - LOG.error(_LE("Receiving message failed: %s") % str(e)) + LOG.error(_LE("Receiving message failed: %s"), str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index f5885c55a..c284ba48e 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -21,7 +21,7 @@ from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LE +from oslo_messaging._i18n import _LE, _LI LOG = logging.getLogger(__name__) @@ -57,11 +57,12 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): self.targets = [] self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, self.port) - LOG.info("[%s] Run ROUTER consumer" % self.host) + LOG.info(_LI("[%s] Run ROUTER consumer"), self.host) def listen(self, target): - LOG.info("[%s] Listen to target %s" % (self.host, target)) + LOG.info(_LI("[%(host)s] Listen to target %(target)s"), + {'host': self.host, 'target': target}) self.targets.append(target) self.matchmaker.register(target, self.host, @@ -83,11 +84,11 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): def receive_message(self, socket): try: request, reply_id = self._receive_request(socket) - LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s" - % {"host": self.host, - "type": request.msg_type, - "id": request.message_id, - "target": request.target}) + LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s", + {"host": self.host, + "type": request.msg_type, + "id": request.message_id, + "target": request.target}) if request.msg_type == zmq_names.CALL_TYPE: return zmq_incoming_message.ZmqIncomingRequest( @@ -97,10 +98,10 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): self.server, request.context, request.message, socket, reply_id, request.message_id, self.poller) else: - LOG.error(_LE("Unknown message type: %s") % request.msg_type) + LOG.error(_LE("Unknown message type: %s"), request.msg_type) except zmq.ZMQError as e: - LOG.error(_LE("Receiving message failed: %s") % str(e)) + LOG.error(_LE("Receiving message failed: %s"), str(e)) class RouterConsumerBroker(RouterConsumer): diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py index d51032182..4d1e03585 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py @@ -77,9 +77,8 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter) self.subscriptions.add(topic_filter) - LOG.debug("[%(host)s] Subscribing to topic %(filter)s" - % {"host": self.id, - "filter": topic_filter}) + LOG.debug("[%(host)s] Subscribing to topic %(filter)s", + {"host": self.id, "filter": topic_filter}) def on_publishers(self, publishers): with self._socket_lock: @@ -87,17 +86,18 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): self.socket.connect(zmq_address.get_tcp_direct_address(host)) self.poller.register(self.socket, self.receive_message) - LOG.debug("[%s] SUB consumer connected to publishers %s" - % (self.id, publishers)) + LOG.debug("[%s] SUB consumer connected to publishers %s", + (self.id, publishers)) def listen(self, target): - LOG.debug("Listen to target %s" % target) + LOG.debug("Listen to target %s", target) with self._socket_lock: self._subscribe_on_target(target) def _receive_request(self, socket): topic_filter = socket.recv() - LOG.debug("[%s] Received %s topic" % (self.id, topic_filter)) + LOG.debug("[%(id)s] Received %(topict_filter)s topic", + {'id': self.id, 'topic_filter': topic_filter}) assert topic_filter in self.subscriptions request = socket.recv_pyobj() return request @@ -107,18 +107,18 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): request = self._receive_request(socket) if not request: return None - LOG.debug("Received %(type)s, %(id)s, %(target)s" - % {"type": request.msg_type, - "id": request.message_id, - "target": request.target}) + LOG.debug("Received %(type)s, %(id)s, %(target)s", + {"type": request.msg_type, + "id": request.message_id, + "target": request.target}) if request.msg_type not in zmq_names.MULTISEND_TYPES: - LOG.error(_LE("Unknown message type: %s") % request.msg_type) + LOG.error(_LE("Unknown message type: %s"), request.msg_type) else: return SubIncomingMessage(self.server, request, socket, self.poller) except zmq.ZMQError as e: - LOG.error(_LE("Receiving message failed: %s") % str(e)) + LOG.error(_LE("Receiving message failed: %s"), str(e)) class MatchmakerPoller(object): diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index 4d6fa90ff..93c981c7c 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -21,6 +21,7 @@ from oslo_messaging._drivers.zmq_driver.server.consumers\ from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_sub_consumer from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._i18n import _LI LOG = logging.getLogger(__name__) @@ -53,8 +54,9 @@ class ZmqServer(base.Listener): return message def stop(self): - consumer = self.router_consumer - LOG.info("Stop server %s:%d" % (consumer.address, consumer.port)) + consumer = self.rpc_consumer + LOG.info(_LI("Stop server %(address)s:%(port)s"), + {'address': consumer.address, 'port': consumer.port}) def cleanup(self): self.poller.close() diff --git a/oslo_messaging/dispatcher.py b/oslo_messaging/dispatcher.py index 780724416..0d2e44b8c 100644 --- a/oslo_messaging/dispatcher.py +++ b/oslo_messaging/dispatcher.py @@ -16,6 +16,9 @@ import logging import six +from oslo_messaging._i18n import _ + + __all__ = [ "DispatcherBase", "DispatcherExecutorContext" @@ -58,7 +61,7 @@ class DispatcherExecutorContext(object): self._result = self._dispatch(self._incoming, self._executor_callback) except Exception: - msg = 'The dispatcher method must catches all exceptions' + msg = _('The dispatcher method must catches all exceptions') LOG.exception(msg) raise RuntimeError(msg) diff --git a/oslo_messaging/notify/dispatcher.py b/oslo_messaging/notify/dispatcher.py index 72287678d..5677cad67 100644 --- a/oslo_messaging/notify/dispatcher.py +++ b/oslo_messaging/notify/dispatcher.py @@ -19,6 +19,7 @@ import logging import six +from oslo_messaging._i18n import _LE, _LW from oslo_messaging import dispatcher from oslo_messaging import localcontext from oslo_messaging import serializer as msg_serializer @@ -74,7 +75,7 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase): else: m.acknowledge() except Exception: - LOG.error("Fail to ack/requeue message", exc_info=True) + LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) def _dispatch_and_handle_error(self, incoming, executor_callback): """Dispatch a notification message to the appropriate endpoint method. @@ -85,7 +86,7 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase): try: return self._dispatch(incoming, executor_callback) except Exception: - LOG.error('Exception during message handling', exc_info=True) + LOG.error(_LE('Exception during message handling'), exc_info=True) def _dispatch(self, incoming, executor_callback=None): """Dispatch notification messages to the appropriate endpoint method. @@ -101,7 +102,7 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase): raw_messages = list(raw_messages) messages = list(messages) if priority not in PRIORITIES: - LOG.warning('Unknown priority "%s"', priority) + LOG.warning(_LW('Unknown priority "%s"'), priority) continue for screen, callback in self._callbacks_by_priority.get(priority, []): diff --git a/oslo_messaging/notify/messaging.py b/oslo_messaging/notify/messaging.py index 42de46434..e7642ca94 100644 --- a/oslo_messaging/notify/messaging.py +++ b/oslo_messaging/notify/messaging.py @@ -18,6 +18,7 @@ import logging import oslo_messaging +from oslo_messaging._i18n import _LE from oslo_messaging.notify import notifier LOG = logging.getLogger(__name__) @@ -47,8 +48,8 @@ class MessagingDriver(notifier.Driver): version=self.version, retry=retry) except Exception: - LOG.exception("Could not send notification to %(topic)s. " - "Payload=%(message)s", + LOG.exception(_LE("Could not send notification to %(topic)s. " + "Payload=%(message)s"), dict(topic=topic, message=message)) diff --git a/oslo_messaging/notify/notifier.py b/oslo_messaging/notify/notifier.py index cc4f2eb8f..13650c302 100644 --- a/oslo_messaging/notify/notifier.py +++ b/oslo_messaging/notify/notifier.py @@ -24,6 +24,7 @@ from oslo_utils import timeutils import six from stevedore import named +from oslo_messaging._i18n import _LE from oslo_messaging import serializer as msg_serializer from oslo_messaging import transport as msg_transport @@ -225,8 +226,8 @@ class Notifier(object): try: ext.obj.notify(ctxt, msg, priority, retry or self.retry) except Exception as e: - _LOG.exception("Problem '%(e)s' attempting to send to " - "notification system. Payload=%(payload)s", + _LOG.exception(_LE("Problem '%(e)s' attempting to send to " + "notification system. Payload=%(payload)s"), dict(e=e, payload=payload)) if self._driver_mgr.extensions: diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py index 6b4e50a0c..25ed5bff7 100644 --- a/oslo_messaging/server.py +++ b/oslo_messaging/server.py @@ -34,6 +34,7 @@ from oslo_utils import timeutils from stevedore import driver from oslo_messaging._drivers import base as driver_base +from oslo_messaging._i18n import _LW from oslo_messaging import exceptions LOG = logging.getLogger(__name__) @@ -111,7 +112,7 @@ class _OrderedTask(object): while condition(): if log_timer is not None and log_timer.expired(): - LOG.warn('Possible hang: %s' % msg) + LOG.warn(_LW('Possible hang: %s'), msg) LOG.debug(''.join(traceback.format_stack())) # Only log once. After than we wait indefinitely without # logging. @@ -345,11 +346,11 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): """ # Warn that restarting will be deprecated if self._started: - LOG.warn('Restarting a MessageHandlingServer is inherently racy. ' - 'It is deprecated, and will become a noop in a future ' - 'release of oslo.messaging. If you need to restart ' - 'MessageHandlingServer you should instantiate a new ' - 'object.') + LOG.warn(_LW('Restarting a MessageHandlingServer is inherently ' + 'racy. It is deprecated, and will become a noop in ' + 'a future release of oslo.messaging. If you need to ' + 'restart MessageHandlingServer you should ' + 'instantiate a new object.')) self._started = True try: diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 846ea86e2..ad666ae2c 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -744,7 +744,7 @@ class TestServerLocking(test_utils.BaseTestCase): # DEFAULT_LOG_AFTER log_event = threading.Event() - mock_log.warn.side_effect = lambda _: log_event.set() + mock_log.warn.side_effect = lambda _, __: log_event.set() # Call stop without calling start. We should log a wait after 1 second thread = eventlet.spawn(self.server.stop) @@ -760,7 +760,7 @@ class TestServerLocking(test_utils.BaseTestCase): # the number of seconds passed to log_after log_event = threading.Event() - mock_log.warn.side_effect = lambda _: log_event.set() + mock_log.warn.side_effect = lambda _, __: log_event.set() # Call stop without calling start. We should log a wait after 1 second thread = eventlet.spawn(self.server.stop, log_after=1) @@ -776,7 +776,7 @@ class TestServerLocking(test_utils.BaseTestCase): # specified an absolute timeout log_event = threading.Event() - mock_log.warn.side_effect = lambda _: log_event.set() + mock_log.warn.side_effect = lambda _, __: log_event.set() # Call stop without calling start. We should log a wait after 1 second thread = eventlet.spawn(self.server.stop, log_after=1, timeout=2)