From 5eb6e432dde2993e5e99025ba7f4be8b899cef12 Mon Sep 17 00:00:00 2001 From: Bin Yang Date: Tue, 9 Mar 2021 15:24:19 +0800 Subject: [PATCH] Workaround retry issue of connection to notificationservice Monkey patch kombu package to avoid retrying forever while connecting to rabbitmq broker Spawn a thread to ensure the connection to notificationservice Story: 2008529 Task: 42020 Signed-off-by: Bin Yang Change-Id: Ic12bd2af29ad71739f157dac66998d7972eb2edc --- .../notificationclientsdk/client/base.py | 85 ++++++++++++++----- .../common/helpers/patcher.py | 31 +++++++ .../services/__init__.py | 4 + .../notificationclientsdk/services/daemon.py | 13 ++- .../notificationclientsdk/services/ptp.py | 35 +++++--- 5 files changed, 127 insertions(+), 41 deletions(-) create mode 100644 notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/patcher.py diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py index 83c8973..ccbb312 100644 --- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py @@ -7,6 +7,7 @@ import os import json import time +import threading import oslo_messaging from oslo_config import cfg from notificationclientsdk.common.helpers import rpc_helper @@ -25,13 +26,42 @@ class BrokerClientBase(object): self.listeners = {} self.broker_endpoint = RpcEndpointInfo(broker_transport_endpoint) self.transport = rpc_helper.get_transport(self.broker_endpoint) + self._workerevent = threading.Event() + self._workerlock = threading.Lock() + self._workerterminated = False + # spawn a thread to retry on setting up listener + self._workerthread = threading.Thread(target=self._refresher, args=()) + self._workerthread.start() + LOG.debug("Created Broker client:{0}".format(broker_name)) def __del__(self): + self._workerterminated = True + self._workerevent.set() self.transport.cleanup() del self.transport return + def _refresher(self, retry_interval=5): + while not self._workerterminated: + self._workerevent.wait() + self._workerevent.clear() + allset = False + with self._workerlock: + allset = self._refresh() + if self._workerevent.is_set(): + continue + if not allset: + # retry later + time.sleep(retry_interval) + # retry on next loop + self._workerevent.set() + + def __is_listening(self, context): + isactive = context and context.get( + 'active', False) and context.get('rpcserver', False) + return isactive + def __create_listener(self, context): target = oslo_messaging.Target( topic=context['topic'], @@ -42,6 +72,7 @@ class BrokerClientBase(object): return server def _refresh(self): + allset = True for topic, servers in self.listeners.items(): for servername, context in servers.items(): try: @@ -57,44 +88,52 @@ class BrokerClientBase(object): rpcserver.wait() context.pop('rpcserver') LOG.debug("Stopped rpcserver@{0}@{1}".format(context['topic'], context['server'])) - except: - LOG.error("Failed to update listener for topic/server:{0}/{1}" - .format(topic, servername)) + except Exception as ex: + LOG.error("Failed to update listener for topic/server:{0}/{1}, reason:{2}" + .format(topic, servername, str(ex))) + allset = False continue + return allset + + def _trigger_refresh_listener(self, context): + self._workerevent.set() + # # sleep to 
+        # re-schedule to run worker thread
+        # time.sleep(2)
 
     def add_listener(self, topic, server, listener_endpoints=None):
         context = self.listeners.get(topic,{}).get(server, {})
-        if not context:
-            context = {
-                'endpoints': listener_endpoints,
-                'topic': topic,
-                'server': server,
-                'active': True
-            }
-            if not self.listeners.get(topic, None):
-                self.listeners[topic] = {}
-            self.listeners[topic][server] = context
-        else:
-            context['endpoints'] = listener_endpoints
-            context['active'] = True
+        with self._workerlock:
+            if not context:
+                context = {
+                    'endpoints': listener_endpoints,
+                    'topic': topic,
+                    'server': server,
+                    'active': True
+                }
+                if not self.listeners.get(topic, None):
+                    self.listeners[topic] = {}
+                self.listeners[topic][server] = context
+            else:
+                context['endpoints'] = listener_endpoints
+                context['active'] = True
 
-        self._refresh()
+            self._trigger_refresh_listener(context)
 
     def remove_listener(self, topic, server):
         context = self.listeners.get(topic,{}).get(server, {})
-        if context:
-            context['active'] = False
-            self._refresh()
+        with self._workerlock:
+            if context:
+                context['active'] = False
+                self._trigger_refresh_listener(context)
 
     def is_listening(self, topic, server):
         context = self.listeners.get(topic,{}).get(server, {})
-        return context.get('active', False)
+        return self.__is_listening(context)
 
     def any_listener(self):
         for topic, servers in self.listeners.items():
             for servername, context in servers.items():
-                isactive = context.get('active', False)
-                if isactive:
+                if self.__is_listening(context):
                     return True
         return False
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/patcher.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/patcher.py
new file mode 100644
index 0000000..f1ee9a7
--- /dev/null
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/patcher.py
@@ -0,0 +1,31 @@
+#
+# Copyright (c) 2021 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+import kombu.utils.functional
+
+class OsloMessagingPatcher(object):
+    retry_over_time_orig = None
+    @staticmethod
+    def retry_over_time_patch(
+        fun, catch, args=None, kwargs=None, errback=None,
+        max_retries=None, interval_start=2, interval_step=2,
+        interval_max=30, callback=None, timeout=None):
+        """
+        patch retry_over_time with default max_retries=2
+        """
+        if not max_retries:
+            max_retries = 2
+        return OsloMessagingPatcher.retry_over_time_orig(
+            fun, catch, args, kwargs, errback,
+            max_retries, interval_start, interval_step,
+            interval_max, callback, timeout)
+
+    @staticmethod
+    def patch():
+        if not OsloMessagingPatcher.retry_over_time_orig:
+            OsloMessagingPatcher.retry_over_time_orig = kombu.utils.functional.retry_over_time
+            kombu.utils.functional.retry_over_time = OsloMessagingPatcher.retry_over_time_patch
+        return
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/__init__.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/__init__.py
index 6be15e8..830ccec 100644
--- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/__init__.py
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/__init__.py
@@ -3,3 +3,7 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 #
+
+from notificationclientsdk.common.helpers.patcher import OsloMessagingPatcher
+
+OsloMessagingPatcher.patch()
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py
index 1faf52a..139a875 100644
--- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py
@@ -615,16 +615,21 @@ class NotificationWorker:
         self.signal_node_resources_event()
         self.signal_events()
 
-    def __start_watch_all_nodes(self):
+    def __start_watch_all_nodes(self, retry_interval=5):
         try:
-            if not self.locationservice_client.is_listening_on_location(
+            while not self.locationservice_client.is_listening_on_location(
                 NodeInfoHelper.BROKER_NODE_ALL):
                 # start watching on the location announcement
                 self.locationservice_client.add_location_listener(
                     NodeInfoHelper.BROKER_NODE_ALL,
                     location_handler=self.__NodeInfoWatcher)
-                LOG.debug("Start watching location announcement of notificationservice@{0}"
-                    .format(NodeInfoHelper.BROKER_NODE_ALL))
+                LOG.debug(
+                    "Start watching location announcement of notificationservice@{0}"
+                    .format(NodeInfoHelper.BROKER_NODE_ALL))
+                if not self.locationservice_client.is_listening_on_location(
+                    NodeInfoHelper.BROKER_NODE_ALL):
+                    # retry later and forever
+                    time.sleep(retry_interval)
             self.locationservice_client.trigger_location_annoucement(timeout=20, retry=10)
         except Exception as ex:
             LOG.debug("exception: {0}".format(str(ex)))
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py
index 602a5de..07d4ecf 100644
--- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py
@@ -7,6 +7,7 @@
 import oslo_messaging
 import logging
 import json
+import kombu
 
 from notificationclientsdk.repository.node_repo import NodeRepo
 from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
@@ -48,7 +49,7 @@ class PtpService(object):
         if ResourceType.TypePTP in supported_resource_types:
             return self._query(default_node_name)
         else:
-            raise client_exception.ResourceNotAvailable(default_node_name, ResourceType.TypePTP)
+            raise client_exception.ResourceNotAvailable(broker_node_name, ResourceType.TypePTP)
 
     def _query(self, broker_node_name):
         broker_host = "notificationservice-{0}".format(broker_node_name)
@@ -57,12 +58,23 @@
             self.daemon_control.daemon_context['NOTIFICATION_BROKER_PASS'],
             broker_host,
             self.daemon_control.daemon_context['NOTIFICATION_BROKER_PORT'])
-        notificationservice_client = NotificationServiceClient(
-            broker_node_name, broker_transport_endpoint)
-        resource_status = notificationservice_client.query_resource_status(
-            ResourceType.TypePTP, timeout=5, retry=10)
-        del notificationservice_client
-        return resource_status
+        notificationservice_client = None
+        try:
+            notificationservice_client = NotificationServiceClient(
+                broker_node_name, broker_transport_endpoint)
+            resource_status = notificationservice_client.query_resource_status(
+                ResourceType.TypePTP, timeout=5, retry=10)
+            return resource_status
+        except oslo_messaging.exceptions.MessagingTimeout as ex:
+            LOG.warning("ptp status is not available @node {0} due to {1}".format(
+                broker_node_name, str(ex)))
+            raise client_exception.ResourceNotAvailable(broker_node_name, ResourceType.TypePTP)
+        except kombu.exceptions.OperationalError as ex:
+            LOG.warning("Node {0} is not reachable yet: {1}".format(broker_node_name, str(ex)))
+            raise client_exception.NodeNotAvailable(broker_node_name)
+        finally:
+            if notificationservice_client:
+                del notificationservice_client
 
     def add_subscription(self, subscription_dto):
         subscription_orm = SubscriptionOrm(**subscription_dto.to_orm())
@@ -77,13 +89,8 @@
         # get initial resource status
         if default_node_name:
             ptpstatus = None
-            try:
-                ptpstatus = self._query(default_node_name)
-                LOG.info("initial ptpstatus:{0}".format(ptpstatus))
-            except oslo_messaging.exceptions.MessagingTimeout as ex:
-                LOG.warning("ptp status is not available @node {0} due to {1}".format(
-                    default_node_name, str(ex)))
-                raise client_exception.ResourceNotAvailable(broker_node_name, subscription_dto.ResourceType)
+            ptpstatus = self._query(default_node_name)
+            LOG.info("initial ptpstatus:{0}".format(ptpstatus))
 
             # construct subscription entry
             subscription_orm.InitialDeliveryTimestamp = ptpstatus.get('EventTimestamp', None)
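
Note: the sketch below is illustrative only and is not part of the patch. It re-creates the
patch-and-delegate pattern used by OsloMessagingPatcher against a stand-in retry helper, so
the capped-retry behaviour can be exercised without kombu or a running rabbitmq broker. All
names here (fake_kombu, fake_retry_over_time, retry_over_time_patch, patch) are hypothetical
stand-ins, not the sidecar's real module layout.

    # Illustrative sketch only -- not part of the patch; stand-in names are hypothetical.
    import types

    # stand-in for kombu.utils.functional: retries fun() forever when max_retries is None
    fake_kombu = types.SimpleNamespace()

    def fake_retry_over_time(fun, catch, max_retries=None):
        attempt = 0
        while True:
            try:
                return fun()
            except catch:
                attempt += 1
                if max_retries is not None and attempt >= max_retries:
                    raise

    fake_kombu.retry_over_time = fake_retry_over_time

    _retry_over_time_orig = None

    def retry_over_time_patch(fun, catch, max_retries=None):
        # same idea as OsloMessagingPatcher.retry_over_time_patch:
        # force a finite retry count when the caller did not request one
        if not max_retries:
            max_retries = 2
        return _retry_over_time_orig(fun, catch, max_retries=max_retries)

    def patch():
        # same idea as OsloMessagingPatcher.patch:
        # keep a reference to the original, then swap the module attribute
        global _retry_over_time_orig
        if _retry_over_time_orig is None:
            _retry_over_time_orig = fake_kombu.retry_over_time
            fake_kombu.retry_over_time = retry_over_time_patch

    if __name__ == '__main__':
        patch()
        calls = {'n': 0}

        def always_fail():
            calls['n'] += 1
            raise ConnectionError("broker unreachable")

        try:
            fake_kombu.retry_over_time(always_fail, ConnectionError)
        except ConnectionError:
            # with the patch applied this gives up after 2 attempts instead of looping forever
            print("gave up after {0} attempts".format(calls['n']))

In the sidecar itself the same effect comes from importing notificationclientsdk.services:
its __init__.py calls OsloMessagingPatcher.patch(), so the kombu retry cap is in place before
any oslo.messaging transport is created, and the worker thread added in client/base.py takes
over the job of retrying listener setup in the background.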