diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index eb6d11ab6..352fb534e 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -335,12 +335,15 @@ class Consumer(object):
             # bugs.launchpad.net/oslo.messaging/+bug/1609766
             # bugs.launchpad.net/neutron/+bug/1318721
 
+            # 406 error code relates to messages that are doubled ack'd
+
             # At any channel error, the RabbitMQ closes
             # the channel, but the amqp-lib quietly re-open
             # it. So, we must reset all tags and declare
             # all consumers again.
             conn._new_tags = set(conn._consumers.values())
-            if exc.code == 404:
+            if exc.code == 404 or (exc.code == 406 and
+                                   exc.method_name == 'Basic.ack'):
                 self.declare(conn)
                 self.queue.consume(callback=self._callback,
                                    consumer_tag=six.text_type(tag),
@@ -593,6 +596,24 @@ class Connection(object):
                   ' %(hostname)s:%(port)s',
                   self._get_connection_info())
 
+        # FIXME(gordc): wrapper to catch both kombu v3 and v4 errors
+        # remove this and only catch OperationalError when >4.0.0
+        if hasattr(kombu.exceptions, 'OperationalError'):
+            self.recoverable_errors = kombu.exceptions.OperationalError
+        else:
+            # NOTE(sileht): Some dummy driver like the in-memory one doesn't
+            # have notion of recoverable connection, so we must raise the
+            # original exception like kombu does in this case.
+            has_modern_errors = hasattr(
+                self.connection.transport, 'recoverable_connection_errors',
+            )
+            if has_modern_errors:
+                self.recoverable_errors = (
+                    self.connection.recoverable_channel_errors +
+                    self.connection.recoverable_connection_errors)
+            else:
+                self.recoverable_errors = ()
+
         # NOTE(sileht): kombu recommend to run heartbeat_check every
         # seconds, but we use a lock around the kombu connection
         # so, to not lock to much this lock to most of the time do nothing
@@ -707,7 +728,7 @@ class Connection(object):
         # NOTE(sileht): we reset the channel and ensure
         # the kombu underlying connection works
         self._set_current_channel(None)
-        self.ensure(method=lambda: self.connection.connection)
+        self.ensure(method=self.connection.connect)
         self.set_transport_socket_timeout()
 
     def ensure(self, method, retry=None,
@@ -792,19 +813,6 @@ class Connection(object):
             self._set_current_channel(channel)
             method()
 
-        # NOTE(sileht): Some dummy driver like the in-memory one doesn't
-        # have notion of recoverable connection, so we must raise the original
-        # exception like kombu does in this case.
-        has_modern_errors = hasattr(
-            self.connection.transport, 'recoverable_connection_errors',
-        )
-        if has_modern_errors:
-            recoverable_errors = (
-                self.connection.recoverable_channel_errors +
-                self.connection.recoverable_connection_errors)
-        else:
-            recoverable_errors = ()
-
         try:
             autoretry_method = self.connection.autoretry(
                 execute_method, channel=self.channel,
@@ -817,7 +825,7 @@ class Connection(object):
             ret, channel = autoretry_method()
             self._set_current_channel(channel)
             return ret
-        except recoverable_errors as exc:
+        except self.recoverable_errors as exc:
             LOG.debug("Received recoverable error from kombu:",
                       exc_info=True)
             error_callback and error_callback(exc)
@@ -883,13 +891,11 @@ class Connection(object):
 
     def reset(self):
         """Reset a connection so it can be used again."""
-        recoverable_errors = (self.connection.recoverable_channel_errors +
-                              self.connection.recoverable_connection_errors)
         with self._connection_lock:
             try:
                 for consumer, tag in self._consumers.items():
                     consumer.cancel(tag=tag)
-            except recoverable_errors:
+            except self.recoverable_errors:
                 self.ensure_connection()
             self._consumers.clear()
             self._active_tags.clear()
@@ -987,10 +993,6 @@ class Connection(object):
         while not self._heartbeat_exit_event.is_set():
             with self._connection_lock.for_heartbeat():
 
-                recoverable_errors = (
-                    self.connection.recoverable_channel_errors +
-                    self.connection.recoverable_connection_errors)
-
                 try:
                     try:
                         self._heartbeat_check()
@@ -1004,7 +1006,7 @@ class Connection(object):
                             self.connection.drain_events(timeout=0.001)
                         except socket.timeout:
                             pass
-                    except recoverable_errors as exc:
+                    except self.recoverable_errors as exc:
                         LOG.info(_LI("A recoverable connection/channel error "
                                      "occurred, trying to reconnect: %s"), exc)
                         self.ensure_connection()
@@ -1091,6 +1093,12 @@ class Connection(object):
                 except socket.timeout as exc:
                     poll_timeout = timer.check_return(
                         _raise_timeout, exc, maximum=self._poll_timeout)
+                except self.connection.channel_errors as exc:
+                    if exc.code == 406 and exc.method_name == 'Basic.ack':
+                        # NOTE(gordc): occasionally multiple workers will grab
+                        # same message and acknowledge it. if it happens, meh.
+                        raise self.connection.recoverable_channel_errors[0]
+                    raise
 
         with self._connection_lock:
             self.ensure(_consume,
@@ -1172,7 +1180,8 @@ class Connection(object):
     def _get_connection_info(self):
         info = self.connection.info()
         client_port = None
-        if self.channel and hasattr(self.channel.connection, 'sock'):
+        if (self.channel and hasattr(self.channel.connection, 'sock')
+                and self.channel.connection.sock):
             client_port = self.channel.connection.sock.getsockname()[1]
         info.update({'client_port': client_port,
                      'connection_id': self.connection_id})
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index f3ddef60a..6ab452e36 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -24,9 +24,7 @@ import kombu
 import kombu.transport.memory
 from oslo_config import cfg
 from oslo_serialization import jsonutils
-from oslo_utils import versionutils
 from oslotest import mockpatch
-import pkg_resources
 import testscenarios
 
 import oslo_messaging
@@ -106,7 +104,7 @@ class TestHeartbeat(test_utils.BaseTestCase):
 
     def test_test_heartbeat_sent_connection_fail(self):
         self._do_test_heartbeat_sent(
-            heartbeat_side_effect=kombu.exceptions.ConnectionError,
+            heartbeat_side_effect=kombu.exceptions.OperationalError,
             info='A recoverable connection/channel error occurred, '
             'trying to reconnect: %s')
 
@@ -219,23 +217,11 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
             conn._publish(exchange_mock, 'msg', routing_key='routing_key',
                           timeout=1)
 
-        # NOTE(gcb) kombu accept TTL as seconds instead of millisecond since
-        # version 3.0.25, so do conversion according to kombu version.
-        # TODO(gcb) remove this workaround when all supported branches
-        # with requirement kombu >=3.0.25
-        kombu_version = pkg_resources.get_distribution('kombu').version
-        if versionutils.is_compatible('3.0.25', kombu_version):
-            fake_publish.assert_called_with(
-                'msg', expiration=1,
-                exchange=exchange_mock,
-                compression=self.conf.oslo_messaging_rabbit.kombu_compression,
-                routing_key='routing_key')
-        else:
-            fake_publish.assert_called_with(
-                'msg', expiration=1000,
-                exchange=exchange_mock,
-                compression=self.conf.oslo_messaging_rabbit.kombu_compression,
-                routing_key='routing_key')
+        fake_publish.assert_called_with(
+            'msg', expiration=1,
+            exchange=exchange_mock,
+            compression=self.conf.oslo_messaging_rabbit.kombu_compression,
+            routing_key='routing_key')
 
     @mock.patch('kombu.messaging.Producer.publish')
     def test_send_no_timeout(self, fake_publish):
@@ -279,7 +265,8 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
 
             with mock.patch('kombu.transport.virtual.Channel.close'):
                 # Ensure the exchange does not exists
-                self.assertRaises(exc, try_send, e_passive)
+                self.assertRaises(oslo_messaging.MessageDeliveryFailure,
+                                  try_send, e_passive)
                 # Create it
                 try_send(e_active)
                 # Ensure it creates it
@@ -287,12 +274,14 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
 
             with mock.patch('kombu.messaging.Producer.publish',
                             side_effect=exc):
-                # Ensure the exchange is already in cache
-                self.assertIn('foobar', conn._declared_exchanges)
-                # Reset connection
-                self.assertRaises(exc, try_send, e_passive)
-                # Ensure the cache is empty
-                self.assertEqual(0, len(conn._declared_exchanges))
+                with mock.patch('kombu.transport.virtual.Channel.close'):
+                    # Ensure the exchange is already in cache
+                    self.assertIn('foobar', conn._declared_exchanges)
+                    # Reset connection
+                    self.assertRaises(oslo_messaging.MessageDeliveryFailure,
+                                      try_send, e_passive)
+                    # Ensure the cache is empty
+                    self.assertEqual(0, len(conn._declared_exchanges))
 
             try_send(e_active)
             self.assertIn('foobar', conn._declared_exchanges)
@@ -336,7 +325,7 @@ class TestRabbitConsume(test_utils.BaseTestCase):
                 conn.connection.connection.recoverable_connection_errors = ()
                 conn.connection.connection.recoverable_channel_errors = ()
                 self.assertEqual(1, declare.call_count)
-                conn.connection.connection.transport.drain_events = mock.Mock()
+                conn.connection.connection.drain_events = mock.Mock()
                 # Ensure that a queue will be re-declared if the consume method
                 # of kombu.Queue raise amqp.NotFound
                 conn.consume()
@@ -360,7 +349,7 @@ class TestRabbitConsume(test_utils.BaseTestCase):
                     IOError,)
                 conn.connection.connection.recoverable_channel_errors = ()
                 self.assertEqual(1, declare.call_count)
-                conn.connection.connection.transport.drain_events = mock.Mock()
+                conn.connection.connection.drain_events = mock.Mock()
                 # Ensure that a queue will be re-declared after
                 # 'queue not found' exception despite on connection error.
                 conn.consume()
@@ -963,10 +952,6 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
                     heartbeat_timeout_threshold=0,
                     group="oslo_messaging_rabbit")
 
-        self.kombu_connect = mock.Mock()
-        self.useFixture(mockpatch.Patch(
-            'kombu.connection.Connection.connect',
-            side_effect=self.kombu_connect))
         self.useFixture(mockpatch.Patch(
             'kombu.connection.Connection.connection'))
         self.useFixture(mockpatch.Patch(
@@ -976,6 +961,10 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
         url = oslo_messaging.TransportURL.parse(self.conf, None)
         self.connection = rabbit_driver.Connection(self.conf, url,
                                                    driver_common.PURPOSE_SEND)
+        self.kombu_connect = mock.Mock()
+        self.useFixture(mockpatch.Patch(
+            'kombu.connection.Connection.connect',
+            side_effect=self.kombu_connect))
         self.addCleanup(self.connection.close)
 
     def test_ensure_four_retry(self):
diff --git a/requirements.txt b/requirements.txt
index 7ef1b538c..b1a992e1f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -27,8 +27,9 @@ PyYAML>=3.10.0 # MIT
 
 # rabbit driver is the default
 # we set the amqp version to ensure heartbeat works
-amqp<2.0,>=1.4.0 # LGPL
-kombu<4.0.0,>=3.0.25 # BSD
+# FIXME(gordc): bump to amqp2 and kombu4 once requirements updated
+amqp>=1.4.0  # LGPL
+kombu>=3.0.25 # BSD
 pika>=0.10.0 # BSD
 pika-pool>=0.1.3 # BSD
 
diff --git a/tools/tox_install.sh b/tools/tox_install.sh
index 97a198da4..48ccf9688 100755
--- a/tools/tox_install.sh
+++ b/tools/tox_install.sh
@@ -27,5 +27,8 @@ pip install -c$localfile openstack-requirements
 edit-constraints $localfile -- $CLIENT_NAME
 
 pip install -c$localfile -U $*
+# NOTE(gordc): temporary override since kombu capped at <4.0.0
+pip install -U 'amqp>=2.0.0'
+pip install -U 'kombu>=4.0.0'
 
 exit $?
diff --git a/tox.ini b/tox.ini
index beab5fa6a..5b0365dcb 100644
--- a/tox.ini
+++ b/tox.ini
@@ -32,12 +32,16 @@ commands = python setup.py build_sphinx
 setenv =
     {[testenv]setenv}
     TRANSPORT_DRIVER=rabbit
+    amqp>=2.0.0
+    kombu>=4.0.0
 commands = pifpaf run rabbitmq --  python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
 
 [testenv:py35-func-rabbit]
 setenv =
     {[testenv]setenv}
     TRANSPORT_DRIVER=rabbit
+    amqp>=2.0.0
+    kombu>=4.0.0
 basepython = python3.5
 commands = pifpaf run rabbitmq --  python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'