From c2e647876126e7860b2cbdbc423d7583b42b7451 Mon Sep 17 00:00:00 2001 From: Jason Dunsmore Date: Tue, 8 Mar 2016 16:34:19 -0600 Subject: [PATCH] Add ability to cancel Threads and ThreadGroups Change-Id: I4d5eb361fd1820b4fe8501df6fe2a03c49e816b3 Partial-Bug: #1536451 --- oslo_service/tests/test_threadgroup.py | 29 +++++++++++++++++++++++ oslo_service/threadgroup.py | 32 ++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/oslo_service/tests/test_threadgroup.py b/oslo_service/tests/test_threadgroup.py index 83fa517..b0028fb 100644 --- a/oslo_service/tests/test_threadgroup.py +++ b/oslo_service/tests/test_threadgroup.py @@ -93,6 +93,35 @@ class ThreadGroupTestCase(test_base.BaseTestCase): self.assertEqual(0, len(self.tg.threads)) self.assertTrue(end_time - start_time >= 1) + def test_cancel_early(self): + + def foo(*args, **kwargs): + time.sleep(1) + self.tg.add_thread(foo, 'arg', kwarg='kwarg') + self.tg.cancel() + + self.assertEqual(0, len(self.tg.threads)) + + def test_cancel_late(self): + + def foo(*args, **kwargs): + time.sleep(0.3) + self.tg.add_thread(foo, 'arg', kwarg='kwarg') + time.sleep(0) + self.tg.cancel() + + self.assertEqual(1, len(self.tg.threads)) + + def test_cancel_timeout(self): + + def foo(*args, **kwargs): + time.sleep(0.3) + self.tg.add_thread(foo, 'arg', kwarg='kwarg') + time.sleep(0) + self.tg.cancel(timeout=0.2, wait_time=0.1) + + self.assertEqual(0, len(self.tg.threads)) + def test_stop_timers(self): def foo(*args, **kwargs): diff --git a/oslo_service/threadgroup.py b/oslo_service/threadgroup.py index 00bee87..f700207 100644 --- a/oslo_service/threadgroup.py +++ b/oslo_service/threadgroup.py @@ -13,6 +13,7 @@ # under the License. import logging import threading +import time import eventlet from eventlet import greenpool @@ -58,6 +59,9 @@ class Thread(object): def link(self, func, *args, **kwargs): self.thread.link(func, *args, **kwargs) + def cancel(self, *throw_args): + self.thread.cancel(*throw_args) + class ThreadGroup(object): """The point of the ThreadGroup class is to: @@ -154,3 +158,31 @@ class ThreadGroup(object): self._perform_action_on_threads( lambda x: x.wait(), lambda x: LOG.exception(_LE('Error waiting on thread.'))) + + def _any_threads_alive(self): + current = threading.current_thread() + for x in self.threads[:]: + if x.ident == current.ident: + # Don't check current thread. + continue + if not x.thread.dead: + return True + return False + + def cancel(self, *throw_args, **kwargs): + self._perform_action_on_threads( + lambda x: x.cancel(*throw_args), + lambda x: LOG.exception(_LE('Error canceling thread.'))) + + timeout = kwargs.get('timeout', None) + if timeout is None: + return + wait_time = kwargs.get('wait_time', 1) + start = time.time() + while self._any_threads_alive(): + run_time = time.time() - start + if run_time < timeout: + eventlet.sleep(wait_time) + continue + LOG.debug("Cancel timeout reached, stopping threads.") + self.stop()