Realign with upstream tarball
This commit is contained in:
parent
c82b7af89a
commit
6d42b67cef
@ -2,4 +2,3 @@
|
|||||||
host=review.openstack.org
|
host=review.openstack.org
|
||||||
port=29418
|
port=29418
|
||||||
project=openstack/oslo.messaging.git
|
project=openstack/oslo.messaging.git
|
||||||
defaultbranch=stable/kilo
|
|
||||||
|
@ -360,113 +360,6 @@ class ConnectionLock(DummyConnectionLock):
|
|||||||
return lambda: threading.current_thread()
|
return lambda: threading.current_thread()
|
||||||
|
|
||||||
|
|
||||||
class DummyConnectionLock(object):
|
|
||||||
def acquire(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def release(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def heartbeat_acquire(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
self.acquire()
|
|
||||||
|
|
||||||
def __exit__(self, type, value, traceback):
|
|
||||||
self.release()
|
|
||||||
|
|
||||||
|
|
||||||
class ConnectionLock(DummyConnectionLock):
|
|
||||||
"""Lock object to protect access the the kombu connection
|
|
||||||
|
|
||||||
This is a lock object to protect access the the kombu connection
|
|
||||||
object between the heartbeat thread and the driver thread.
|
|
||||||
|
|
||||||
They are two way to acquire this lock:
|
|
||||||
* lock.acquire()
|
|
||||||
* lock.heartbeat_acquire()
|
|
||||||
|
|
||||||
In both case lock.release(), release the lock.
|
|
||||||
|
|
||||||
The goal is that the heartbeat thread always have the priority
|
|
||||||
for acquiring the lock. This ensures we have no heartbeat
|
|
||||||
starvation when the driver sends a lot of messages.
|
|
||||||
|
|
||||||
So when lock.heartbeat_acquire() is called next time the lock
|
|
||||||
is released(), the caller unconditionnaly acquires
|
|
||||||
the lock, even someone else have asked for the lock before it.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self._workers_waiting = 0
|
|
||||||
self._heartbeat_waiting = False
|
|
||||||
self._lock_acquired = None
|
|
||||||
self._monitor = threading.Lock()
|
|
||||||
self._workers_locks = threading.Condition(self._monitor)
|
|
||||||
self._heartbeat_lock = threading.Condition(self._monitor)
|
|
||||||
self._get_thread_id = self._fetch_current_thread_functor()
|
|
||||||
|
|
||||||
def acquire(self):
|
|
||||||
with self._monitor:
|
|
||||||
while self._lock_acquired:
|
|
||||||
self._workers_waiting += 1
|
|
||||||
self._workers_locks.wait()
|
|
||||||
self._workers_waiting -= 1
|
|
||||||
self._lock_acquired = self._get_thread_id()
|
|
||||||
|
|
||||||
def heartbeat_acquire(self):
|
|
||||||
# NOTE(sileht): must be called only one time
|
|
||||||
with self._monitor:
|
|
||||||
while self._lock_acquired is not None:
|
|
||||||
self._heartbeat_waiting = True
|
|
||||||
self._heartbeat_lock.wait()
|
|
||||||
self._heartbeat_waiting = False
|
|
||||||
self._lock_acquired = self._get_thread_id()
|
|
||||||
|
|
||||||
def release(self):
|
|
||||||
with self._monitor:
|
|
||||||
if self._lock_acquired is None:
|
|
||||||
raise RuntimeError("We can't release a not acquired lock")
|
|
||||||
thread_id = self._get_thread_id()
|
|
||||||
if self._lock_acquired != thread_id:
|
|
||||||
raise RuntimeError("We can't release lock acquired by another "
|
|
||||||
"thread/greenthread; %s vs %s" %
|
|
||||||
(self._lock_acquired, thread_id))
|
|
||||||
self._lock_acquired = None
|
|
||||||
if self._heartbeat_waiting:
|
|
||||||
self._heartbeat_lock.notify()
|
|
||||||
elif self._workers_waiting > 0:
|
|
||||||
self._workers_locks.notify()
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
|
||||||
def for_heartbeat(self):
|
|
||||||
self.heartbeat_acquire()
|
|
||||||
try:
|
|
||||||
yield
|
|
||||||
finally:
|
|
||||||
self.release()
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _fetch_current_thread_functor():
|
|
||||||
# Until https://github.com/eventlet/eventlet/issues/172 is resolved
|
|
||||||
# or addressed we have to use complicated workaround to get a object
|
|
||||||
# that will not be recycled; the usage of threading.current_thread()
|
|
||||||
# doesn't appear to currently be monkey patched and therefore isn't
|
|
||||||
# reliable to use (and breaks badly when used as all threads share
|
|
||||||
# the same current_thread() object)...
|
|
||||||
try:
|
|
||||||
import eventlet
|
|
||||||
from eventlet import patcher
|
|
||||||
green_threaded = patcher.is_monkey_patched('thread')
|
|
||||||
except ImportError:
|
|
||||||
green_threaded = False
|
|
||||||
if green_threaded:
|
|
||||||
return lambda: eventlet.getcurrent()
|
|
||||||
else:
|
|
||||||
return lambda: threading.current_thread()
|
|
||||||
|
|
||||||
|
|
||||||
class Connection(object):
|
class Connection(object):
|
||||||
"""Connection object."""
|
"""Connection object."""
|
||||||
|
|
||||||
@ -955,9 +848,6 @@ class Connection(object):
|
|||||||
if self._heartbeat_supported_and_enabled():
|
if self._heartbeat_supported_and_enabled():
|
||||||
self._heartbeat_check()
|
self._heartbeat_check()
|
||||||
|
|
||||||
if self._heartbeat_supported_and_enabled():
|
|
||||||
self.connection.heartbeat_check(
|
|
||||||
rate=self.driver_conf.heartbeat_rate)
|
|
||||||
try:
|
try:
|
||||||
self.connection.drain_events(timeout=poll_timeout)
|
self.connection.drain_events(timeout=poll_timeout)
|
||||||
return
|
return
|
||||||
@ -1150,10 +1040,6 @@ class Connection(object):
|
|||||||
'routing_key': routing_key})
|
'routing_key': routing_key})
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
continue
|
continue
|
||||||
self._log_publisher_send_error(msg_id, exc)
|
|
||||||
raise
|
|
||||||
except Exception as exc:
|
|
||||||
self._log_publisher_send_error(msg_id, exc)
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def direct_send(self, msg_id, msg):
|
def direct_send(self, msg_id, msg):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user