diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index a8b9fae15..faa3e3bce 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -34,7 +34,7 @@ pika_opts = [ help='Maximum number of channels to allow'), cfg.IntOpt('frame_max', default=None, help='The maximum byte size for an AMQP frame'), - cfg.IntOpt('heartbeat_interval', default=1, + cfg.IntOpt('heartbeat_interval', default=3, help="How often to send heartbeats for consumer's connections"), cfg.BoolOpt('ssl', default=None, help='Enable SSL'), @@ -51,7 +51,7 @@ pika_opts = [ ] pika_pool_opts = [ - cfg.IntOpt('pool_max_size', default=10, + cfg.IntOpt('pool_max_size', default=30, help="Maximum number of connections to keep queued."), cfg.IntOpt('pool_max_overflow', default=0, help="Maximum number of connections to create above " @@ -158,6 +158,23 @@ class PikaDriver(base.BaseDriver): def require_features(self, requeue=False): pass + def _declare_rpc_exchange(self, exchange, timeout): + with (self._pika_engine.connection_without_confirmation_pool + .acquire(timeout=timeout)) as conn: + try: + self._pika_engine.declare_exchange_by_channel( + conn.channel, + self._pika_engine.get_rpc_exchange_name( + exchange + ), "direct", False + ) + except pika_pool.Timeout as e: + raise exceptions.MessagingTimeout( + "Timeout for current operation was expired. {}.".format( + str(e) + ) + ) + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, retry=None): expiration_time = None if timeout is None else time.time() + timeout @@ -165,9 +182,23 @@ class PikaDriver(base.BaseDriver): if retry is None: retry = self._pika_engine.default_rpc_retry_attempts + exchange = self._pika_engine.get_rpc_exchange_name( + target.exchange + ) + def on_exception(ex): - if isinstance(ex, (pika_drv_exc.ConnectionException, - exceptions.MessageDeliveryFailure)): + if isinstance(ex, pika_drv_exc.ExchangeNotFoundException): + # it is desired to create exchange because if we sent to + # exchange which is not exists, we get ChannelClosed exception + # and need to reconnect + try: + self._declare_rpc_exchange(exchange, + expiration_time - time.time()) + except pika_drv_exc.ConnectionException as e: + LOG.warn("Problem during declaring exchange. %", e) + return True + elif isinstance(ex, (pika_drv_exc.ConnectionException, + exceptions.MessageDeliveryFailure)): LOG.warn("Problem during message sending. %s", ex) return True else: @@ -182,14 +213,35 @@ class PikaDriver(base.BaseDriver): ) ) + if target.fanout: + return self.cast_all_servers( + exchange, target.topic, ctxt, message, expiration_time, + retrier + ) + + routing_key = self._pika_engine.get_rpc_queue_name( + target.topic, target.server, retrier is None + ) + msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine, message, ctxt) - reply = msg.send( - target, - reply_listener=self._reply_listener if wait_for_reply else None, - expiration_time=expiration_time, - retrier=retrier - ) + try: + reply = msg.send( + exchange=exchange, + routing_key=routing_key, + reply_listener=( + self._reply_listener if wait_for_reply else None + ), + expiration_time=expiration_time, + retrier=retrier + ) + except pika_drv_exc.ExchangeNotFoundException as ex: + try: + self._declare_rpc_exchange(exchange, + expiration_time - time.time()) + except pika_drv_exc.ConnectionException as e: + LOG.warn("Problem during declaring exchange. %", e) + raise ex if reply is not None: if reply.failure is not None: @@ -197,6 +249,28 @@ class PikaDriver(base.BaseDriver): return reply.result + def cast_all_servers(self, exchange, topic, ctxt, message, expiration_time, + retrier=None): + msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message, + ctxt) + try: + msg.send( + exchange=exchange, + routing_key=self._pika_engine.get_rpc_queue_name( + topic, "all_servers", retrier is None + ), + mandatory=False, + expiration_time=expiration_time, + retrier=retrier + ) + except pika_drv_exc.ExchangeNotFoundException: + try: + self._declare_rpc_exchange( + exchange, expiration_time - time.time() + ) + except pika_drv_exc.ConnectionException as e: + LOG.warn("Problem during declaring exchange. %", e) + def _declare_notification_queue_binding(self, target, timeout=None): if timeout is not None and timeout < 0: raise exceptions.MessagingTimeout( diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py index 7041771b3..a8c9a712f 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -272,7 +272,7 @@ class PikaEngine(object): except pika_pool.Connection.connectivity_errors as e: LOG.warn("Can't establish connection to host. %s", e) except pika_drv_exc.HostConnectionNotAllowedException as e: - LOG.warn("Connection to host is not Allowed. %s", e) + LOG.warn("Connection to host is not allowed. %s", e) connection_attempts -= 1 pika_next_connection_num += 1 @@ -357,6 +357,28 @@ class PikaEngine(object): self.HOST_CONNECTION_LAST_TRY_TIME ] = cur_time + @staticmethod + def declare_exchange_by_channel(channel, exchange, exchange_type, durable): + """Declare exchange using already created channel, if they don't exist + + :param channel: Channel for communication with RabbitMQ + :param exchange: String, RabbitMQ exchange name + :param exchange_type: String ('direct', 'topic' or 'fanout') + exchange type for exchange to be declared + :param durable: Boolean, creates durable exchange if true + """ + try: + channel.exchange_declare( + exchange, exchange_type, auto_delete=True, durable=durable + ) + except pika_pool.Connection.connectivity_errors as e: + raise pika_drv_exc.ConnectionException( + "Connectivity problem detected during declaring exchange: " + "exchange:{}, exchange_type: {}, durable: {}. {}".format( + exchange, exchange_type, durable, str(e) + ) + ) + @staticmethod def declare_queue_binding_by_channel(channel, exchange, queue, routing_key, exchange_type, queue_expiration, @@ -397,23 +419,14 @@ class PikaEngine(object): ) ) - def get_rpc_exchange_name(self, exchange, topic, fanout, no_ack): + def get_rpc_exchange_name(self, exchange): """Returns RabbitMQ exchange name for given rpc request :param exchange: String, oslo.messaging target's exchange - :param topic: String, oslo.messaging target's topic - :param fanout: Boolean, oslo.messaging target's fanout mode - :param no_ack: Boolean, use message delivery with acknowledges or not :return: String, RabbitMQ exchange name """ - exchange = (exchange or self.default_rpc_exchange) - - if fanout: - exchange = '{}_fanout_{}_{}'.format( - exchange, "no_ack" if no_ack else "with_ack", topic - ) - return exchange + return exchange or self.default_rpc_exchange @staticmethod def get_rpc_queue_name(topic, server, no_ack): diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index dab422e20..4726c489c 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -495,11 +495,12 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): self.msg_id = None self.reply_q = None - def send(self, target, reply_listener=None, expiration_time=None, - retrier=None): + def send(self, exchange, routing_key, reply_listener=None, + expiration_time=None, retrier=None): """Send RPC message with configured retrying - :param target: Target, oslo.messaging target which defines RPC service + :param exchange: String, RabbitMQ exchange name for message sending + :param routing_key: String, RabbitMQ routing key for message routing :param reply_listener: RpcReplyPikaListener, listener for waiting reply. If None - return immediately without reply waiting :param expiration_time: Float, expiration time in seconds @@ -507,15 +508,6 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): :param retrier: retrying.Retrier, configured retrier object for sending message, if None no retrying is performed """ - - exchange = self._pika_engine.get_rpc_exchange_name( - target.exchange, target.topic, target.fanout, retrier is None - ) - - queue = "" if target.fanout else self._pika_engine.get_rpc_queue_name( - target.topic, target.server, retrier is None - ) - msg_dict, msg_props = self._prepare_message_to_send() if reply_listener: @@ -531,7 +523,7 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): future = reply_listener.register_reply_waiter(msg_id=self.msg_id) self._do_send( - exchange=exchange, routing_key=queue, msg_dict=msg_dict, + exchange=exchange, routing_key=routing_key, msg_dict=msg_dict, msg_props=msg_props, confirm=True, mandatory=True, persistent=False, expiration_time=expiration_time, retrier=retrier @@ -546,8 +538,8 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): raise e else: self._do_send( - exchange=exchange, routing_key=queue, msg_dict=msg_dict, - msg_props=msg_props, confirm=True, mandatory=not target.fanout, + exchange=exchange, routing_key=routing_key, msg_dict=msg_dict, + msg_props=msg_props, confirm=True, mandatory=True, persistent=False, expiration_time=expiration_time, retrier=retrier ) diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index ce2b1c3df..cc2d6e983 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -295,13 +295,13 @@ class RpcServicePikaPoller(PikaPoller): """ queue_expiration = self._pika_engine.rpc_queue_expiration + exchange = self._pika_engine.get_rpc_exchange_name( + self._target.exchange + ) + queues_to_consume = [] for no_ack in [True, False]: - exchange = self._pika_engine.get_rpc_exchange_name( - self._target.exchange, self._target.topic, False, no_ack - ) - queue = self._pika_engine.get_rpc_queue_name( self._target.topic, None, no_ack ) @@ -323,21 +323,19 @@ class RpcServicePikaPoller(PikaPoller): queue=server_queue, routing_key=server_queue, exchange_type='direct', queue_expiration=queue_expiration ) + all_servers_routing_key = self._pika_engine.get_rpc_queue_name( + self._target.topic, "all_servers", no_ack + ) + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, exchange=exchange, durable=False, + queue=server_queue, routing_key=all_servers_routing_key, + exchange_type='direct', queue_expiration=queue_expiration + ) queues_to_consume.append( {"queue_name": server_queue, "no_ack": no_ack, "consumer_tag": None} ) - fanout_exchange = self._pika_engine.get_rpc_exchange_name( - self._target.exchange, self._target.topic, True, no_ack - ) - - self._pika_engine.declare_queue_binding_by_channel( - channel=self._channel, exchange=fanout_exchange, - queue=server_queue, routing_key="", exchange_type='fanout', - queue_expiration=queue_expiration, durable=False - ) - return queues_to_consume diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py index cf1b1d853..0cc1b869d 100644 --- a/oslo_messaging/tests/drivers/pika/test_message.py +++ b/oslo_messaging/tests/drivers/pika/test_message.py @@ -434,8 +434,8 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase): expiration_time = time.time() + expiration message.send( - target=oslo_messaging.Target(exchange=self._exchange, - topic=self._routing_key), + exchange=self._exchange, + routing_key=self._routing_key, reply_listener=None, expiration_time=expiration_time, retrier=None @@ -490,8 +490,8 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase): reply_listener.get_reply_qname.return_value = reply_queue_name res = message.send( - target=oslo_messaging.Target(exchange=self._exchange, - topic=self._routing_key), + exchange=self._exchange, + routing_key=self._routing_key, reply_listener=reply_listener, expiration_time=expiration_time, retrier=None diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py index abb08044a..17ba3b708 100644 --- a/oslo_messaging/tests/drivers/pika/test_poller.py +++ b/oslo_messaging/tests/drivers/pika/test_poller.py @@ -234,9 +234,7 @@ class RpcServicePikaPollerTestCase(unittest.TestCase): ) self._pika_engine.get_rpc_exchange_name.side_effect = ( - lambda exchange, topic, fanout, no_ack: "_".join( - [exchange, topic, str(fanout), str(no_ack)] - ) + lambda exchange: exchange ) self._prefetch_count = 123 @@ -277,7 +275,7 @@ class RpcServicePikaPollerTestCase(unittest.TestCase): declare_queue_binding_by_channel_mock.assert_has_calls(( mock.call( channel=self._poller_channel_mock, durable=False, - exchange="exchange_topic_False_True", + exchange="exchange", exchange_type='direct', queue="topic_None_True", queue_expiration=12345, @@ -285,7 +283,7 @@ class RpcServicePikaPollerTestCase(unittest.TestCase): ), mock.call( channel=self._poller_channel_mock, durable=False, - exchange="exchange_topic_False_True", + exchange="exchange", exchange_type='direct', queue="topic_server_True", queue_expiration=12345, @@ -293,15 +291,15 @@ class RpcServicePikaPollerTestCase(unittest.TestCase): ), mock.call( channel=self._poller_channel_mock, durable=False, - exchange="exchange_topic_True_True", - exchange_type='fanout', + exchange="exchange", + exchange_type='direct', queue="topic_server_True", queue_expiration=12345, - routing_key='' + routing_key="topic_all_servers_True" ), mock.call( channel=self._poller_channel_mock, durable=False, - exchange="exchange_topic_False_False", + exchange="exchange", exchange_type='direct', queue="topic_None_False", queue_expiration=12345, @@ -309,20 +307,20 @@ class RpcServicePikaPollerTestCase(unittest.TestCase): ), mock.call( channel=self._poller_channel_mock, durable=False, - exchange="exchange_topic_False_False", + exchange="exchange", exchange_type='direct', queue="topic_server_False", queue_expiration=12345, - routing_key="topic_server_False" + routing_key='topic_server_False' ), mock.call( channel=self._poller_channel_mock, durable=False, - exchange="exchange_topic_True_False", - exchange_type='fanout', + exchange="exchange", + exchange_type='direct', queue="topic_server_False", queue_expiration=12345, - routing_key='' - ), + routing_key='topic_all_servers_False' + ) ))