From 89cc47ec4abebcc8bcb877c2486f803acc41008f Mon Sep 17 00:00:00 2001 From: dukhlov Date: Wed, 24 Feb 2016 17:16:12 -0500 Subject: [PATCH] Adds exhange declaration on sender's side When you send message to exchange which is not exist current channel is closed and you need to reconnect. This is undesired. Also this patch separate sending fanout message and call/cast messages and don't raise exception if exchange doesn't exist for fanout messages. Change-Id: Ia556d0c1b219387892007925bb437664aaaccb69 --- oslo_messaging/_drivers/impl_pika.py | 94 +++++++++++++++++-- .../_drivers/pika_driver/pika_engine.py | 37 +++++--- .../_drivers/pika_driver/pika_message.py | 22 ++--- .../_drivers/pika_driver/pika_poller.py | 26 +++-- .../tests/drivers/pika/test_message.py | 8 +- .../tests/drivers/pika/test_poller.py | 28 +++--- 6 files changed, 145 insertions(+), 70 deletions(-) diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index a8b9fae15..faa3e3bce 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -34,7 +34,7 @@ pika_opts = [ help='Maximum number of channels to allow'), cfg.IntOpt('frame_max', default=None, help='The maximum byte size for an AMQP frame'), - cfg.IntOpt('heartbeat_interval', default=1, + cfg.IntOpt('heartbeat_interval', default=3, help="How often to send heartbeats for consumer's connections"), cfg.BoolOpt('ssl', default=None, help='Enable SSL'), @@ -51,7 +51,7 @@ pika_opts = [ ] pika_pool_opts = [ - cfg.IntOpt('pool_max_size', default=10, + cfg.IntOpt('pool_max_size', default=30, help="Maximum number of connections to keep queued."), cfg.IntOpt('pool_max_overflow', default=0, help="Maximum number of connections to create above " @@ -158,6 +158,23 @@ class PikaDriver(base.BaseDriver): def require_features(self, requeue=False): pass + def _declare_rpc_exchange(self, exchange, timeout): + with (self._pika_engine.connection_without_confirmation_pool + .acquire(timeout=timeout)) as conn: + try: + self._pika_engine.declare_exchange_by_channel( + conn.channel, + self._pika_engine.get_rpc_exchange_name( + exchange + ), "direct", False + ) + except pika_pool.Timeout as e: + raise exceptions.MessagingTimeout( + "Timeout for current operation was expired. {}.".format( + str(e) + ) + ) + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, retry=None): expiration_time = None if timeout is None else time.time() + timeout @@ -165,9 +182,23 @@ class PikaDriver(base.BaseDriver): if retry is None: retry = self._pika_engine.default_rpc_retry_attempts + exchange = self._pika_engine.get_rpc_exchange_name( + target.exchange + ) + def on_exception(ex): - if isinstance(ex, (pika_drv_exc.ConnectionException, - exceptions.MessageDeliveryFailure)): + if isinstance(ex, pika_drv_exc.ExchangeNotFoundException): + # it is desired to create exchange because if we sent to + # exchange which is not exists, we get ChannelClosed exception + # and need to reconnect + try: + self._declare_rpc_exchange(exchange, + expiration_time - time.time()) + except pika_drv_exc.ConnectionException as e: + LOG.warn("Problem during declaring exchange. %", e) + return True + elif isinstance(ex, (pika_drv_exc.ConnectionException, + exceptions.MessageDeliveryFailure)): LOG.warn("Problem during message sending. %s", ex) return True else: @@ -182,14 +213,35 @@ class PikaDriver(base.BaseDriver): ) ) + if target.fanout: + return self.cast_all_servers( + exchange, target.topic, ctxt, message, expiration_time, + retrier + ) + + routing_key = self._pika_engine.get_rpc_queue_name( + target.topic, target.server, retrier is None + ) + msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine, message, ctxt) - reply = msg.send( - target, - reply_listener=self._reply_listener if wait_for_reply else None, - expiration_time=expiration_time, - retrier=retrier - ) + try: + reply = msg.send( + exchange=exchange, + routing_key=routing_key, + reply_listener=( + self._reply_listener if wait_for_reply else None + ), + expiration_time=expiration_time, + retrier=retrier + ) + except pika_drv_exc.ExchangeNotFoundException as ex: + try: + self._declare_rpc_exchange(exchange, + expiration_time - time.time()) + except pika_drv_exc.ConnectionException as e: + LOG.warn("Problem during declaring exchange. %", e) + raise ex if reply is not None: if reply.failure is not None: @@ -197,6 +249,28 @@ class PikaDriver(base.BaseDriver): return reply.result + def cast_all_servers(self, exchange, topic, ctxt, message, expiration_time, + retrier=None): + msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message, + ctxt) + try: + msg.send( + exchange=exchange, + routing_key=self._pika_engine.get_rpc_queue_name( + topic, "all_servers", retrier is None + ), + mandatory=False, + expiration_time=expiration_time, + retrier=retrier + ) + except pika_drv_exc.ExchangeNotFoundException: + try: + self._declare_rpc_exchange( + exchange, expiration_time - time.time() + ) + except pika_drv_exc.ConnectionException as e: + LOG.warn("Problem during declaring exchange. %", e) + def _declare_notification_queue_binding(self, target, timeout=None): if timeout is not None and timeout < 0: raise exceptions.MessagingTimeout( diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py index 7041771b3..a8c9a712f 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -272,7 +272,7 @@ class PikaEngine(object): except pika_pool.Connection.connectivity_errors as e: LOG.warn("Can't establish connection to host. %s", e) except pika_drv_exc.HostConnectionNotAllowedException as e: - LOG.warn("Connection to host is not Allowed. %s", e) + LOG.warn("Connection to host is not allowed. %s", e) connection_attempts -= 1 pika_next_connection_num += 1 @@ -357,6 +357,28 @@ class PikaEngine(object): self.HOST_CONNECTION_LAST_TRY_TIME ] = cur_time + @staticmethod + def declare_exchange_by_channel(channel, exchange, exchange_type, durable): + """Declare exchange using already created channel, if they don't exist + + :param channel: Channel for communication with RabbitMQ + :param exchange: String, RabbitMQ exchange name + :param exchange_type: String ('direct', 'topic' or 'fanout') + exchange type for exchange to be declared + :param durable: Boolean, creates durable exchange if true + """ + try: + channel.exchange_declare( + exchange, exchange_type, auto_delete=True, durable=durable + ) + except pika_pool.Connection.connectivity_errors as e: + raise pika_drv_exc.ConnectionException( + "Connectivity problem detected during declaring exchange: " + "exchange:{}, exchange_type: {}, durable: {}. {}".format( + exchange, exchange_type, durable, str(e) + ) + ) + @staticmethod def declare_queue_binding_by_channel(channel, exchange, queue, routing_key, exchange_type, queue_expiration, @@ -397,23 +419,14 @@ class PikaEngine(object): ) ) - def get_rpc_exchange_name(self, exchange, topic, fanout, no_ack): + def get_rpc_exchange_name(self, exchange): """Returns RabbitMQ exchange name for given rpc request :param exchange: String, oslo.messaging target's exchange - :param topic: String, oslo.messaging target's topic - :param fanout: Boolean, oslo.messaging target's fanout mode - :param no_ack: Boolean, use message delivery with acknowledges or not :return: String, RabbitMQ exchange name """ - exchange = (exchange or self.default_rpc_exchange) - - if fanout: - exchange = '{}_fanout_{}_{}'.format( - exchange, "no_ack" if no_ack else "with_ack", topic - ) - return exchange + return exchange or self.default_rpc_exchange @staticmethod def get_rpc_queue_name(topic, server, no_ack): diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index dab422e20..4726c489c 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -495,11 +495,12 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): self.msg_id = None self.reply_q = None - def send(self, target, reply_listener=None, expiration_time=None, - retrier=None): + def send(self, exchange, routing_key, reply_listener=None, + expiration_time=None, retrier=None): """Send RPC message with configured retrying - :param target: Target, oslo.messaging target which defines RPC service + :param exchange: String, RabbitMQ exchange name for message sending + :param routing_key: String, RabbitMQ routing key for message routing :param reply_listener: RpcReplyPikaListener, listener for waiting reply. If None - return immediately without reply waiting :param expiration_time: Float, expiration time in seconds @@ -507,15 +508,6 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): :param retrier: retrying.Retrier, configured retrier object for sending message, if None no retrying is performed """ - - exchange = self._pika_engine.get_rpc_exchange_name( - target.exchange, target.topic, target.fanout, retrier is None - ) - - queue = "" if target.fanout else self._pika_engine.get_rpc_queue_name( - target.topic, target.server, retrier is None - ) - msg_dict, msg_props = self._prepare_message_to_send() if reply_listener: @@ -531,7 +523,7 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): future = reply_listener.register_reply_waiter(msg_id=self.msg_id) self._do_send( - exchange=exchange, routing_key=queue, msg_dict=msg_dict, + exchange=exchange, routing_key=routing_key, msg_dict=msg_dict, msg_props=msg_props, confirm=True, mandatory=True, persistent=False, expiration_time=expiration_time, retrier=retrier @@ -546,8 +538,8 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): raise e else: self._do_send( - exchange=exchange, routing_key=queue, msg_dict=msg_dict, - msg_props=msg_props, confirm=True, mandatory=not target.fanout, + exchange=exchange, routing_key=routing_key, msg_dict=msg_dict, + msg_props=msg_props, confirm=True, mandatory=True, persistent=False, expiration_time=expiration_time, retrier=retrier ) diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index ce2b1c3df..cc2d6e983 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -295,13 +295,13 @@ class RpcServicePikaPoller(PikaPoller): """ queue_expiration = self._pika_engine.rpc_queue_expiration + exchange = self._pika_engine.get_rpc_exchange_name( + self._target.exchange + ) + queues_to_consume = [] for no_ack in [True, False]: - exchange = self._pika_engine.get_rpc_exchange_name( - self._target.exchange, self._target.topic, False, no_ack - ) - queue = self._pika_engine.get_rpc_queue_name( self._target.topic, None, no_ack ) @@ -323,21 +323,19 @@ class RpcServicePikaPoller(PikaPoller): queue=server_queue, routing_key=server_queue, exchange_type='direct', queue_expiration=queue_expiration ) + all_servers_routing_key = self._pika_engine.get_rpc_queue_name( + self._target.topic, "all_servers", no_ack + ) + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, exchange=exchange, durable=False, + queue=server_queue, routing_key=all_servers_routing_key, + exchange_type='direct', queue_expiration=queue_expiration + ) queues_to_consume.append( {"queue_name": server_queue, "no_ack": no_ack, "consumer_tag": None} ) - fanout_exchange = self._pika_engine.get_rpc_exchange_name( - self._target.exchange, self._target.topic, True, no_ack - ) - - self._pika_engine.declare_queue_binding_by_channel( - channel=self._channel, exchange=fanout_exchange, - queue=server_queue, routing_key="", exchange_type='fanout', - queue_expiration=queue_expiration, durable=False - ) - return queues_to_consume diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py index cf1b1d853..0cc1b869d 100644 --- a/oslo_messaging/tests/drivers/pika/test_message.py +++ b/oslo_messaging/tests/drivers/pika/test_message.py @@ -434,8 +434,8 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase): expiration_time = time.time() + expiration message.send( - target=oslo_messaging.Target(exchange=self._exchange, - topic=self._routing_key), + exchange=self._exchange, + routing_key=self._routing_key, reply_listener=None, expiration_time=expiration_time, retrier=None @@ -490,8 +490,8 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase): reply_listener.get_reply_qname.return_value = reply_queue_name res = message.send( - target=oslo_messaging.Target(exchange=self._exchange, - topic=self._routing_key), + exchange=self._exchange, + routing_key=self._routing_key, reply_listener=reply_listener, expiration_time=expiration_time, retrier=None diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py index abb08044a..17ba3b708 100644 --- a/oslo_messaging/tests/drivers/pika/test_poller.py +++ b/oslo_messaging/tests/drivers/pika/test_poller.py @@ -234,9 +234,7 @@ class RpcServicePikaPollerTestCase(unittest.TestCase): ) self._pika_engine.get_rpc_exchange_name.side_effect = ( - lambda exchange, topic, fanout, no_ack: "_".join( - [exchange, topic, str(fanout), str(no_ack)] - ) + lambda exchange: exchange ) self._prefetch_count = 123 @@ -277,7 +275,7 @@ class RpcServicePikaPollerTestCase(unittest.TestCase): declare_queue_binding_by_channel_mock.assert_has_calls(( mock.call( channel=self._poller_channel_mock, durable=False, - exchange="exchange_topic_False_True", + exchange="exchange", exchange_type='direct', queue="topic_None_True", queue_expiration=12345, @@ -285,7 +283,7 @@ class RpcServicePikaPollerTestCase(unittest.TestCase): ), mock.call( channel=self._poller_channel_mock, durable=False, - exchange="exchange_topic_False_True", + exchange="exchange", exchange_type='direct', queue="topic_server_True", queue_expiration=12345, @@ -293,15 +291,15 @@ class RpcServicePikaPollerTestCase(unittest.TestCase): ), mock.call( channel=self._poller_channel_mock, durable=False, - exchange="exchange_topic_True_True", - exchange_type='fanout', + exchange="exchange", + exchange_type='direct', queue="topic_server_True", queue_expiration=12345, - routing_key='' + routing_key="topic_all_servers_True" ), mock.call( channel=self._poller_channel_mock, durable=False, - exchange="exchange_topic_False_False", + exchange="exchange", exchange_type='direct', queue="topic_None_False", queue_expiration=12345, @@ -309,20 +307,20 @@ class RpcServicePikaPollerTestCase(unittest.TestCase): ), mock.call( channel=self._poller_channel_mock, durable=False, - exchange="exchange_topic_False_False", + exchange="exchange", exchange_type='direct', queue="topic_server_False", queue_expiration=12345, - routing_key="topic_server_False" + routing_key='topic_server_False' ), mock.call( channel=self._poller_channel_mock, durable=False, - exchange="exchange_topic_True_False", - exchange_type='fanout', + exchange="exchange", + exchange_type='direct', queue="topic_server_False", queue_expiration=12345, - routing_key='' - ), + routing_key='topic_all_servers_False' + ) ))