Workaround retry issue of connection to notificationservice

Monkey patch kombu package to avoid retrying forever
while connecting to rabbitmq broker

Spawn a thread to ensure the connection to notificationservice

Story: 2008529
Task: 42020

Signed-off-by: Bin Yang <bin.yang@windriver.com>
Change-Id: Ic12bd2af29ad71739f157dac66998d7972eb2edc
This commit is contained in:
Bin Yang 2021-03-09 15:24:19 +08:00
parent 1ca32529ad
commit 5eb6e432dd
5 changed files with 127 additions and 41 deletions

View File

@ -7,6 +7,7 @@
import os
import json
import time
import threading
import oslo_messaging
from oslo_config import cfg
from notificationclientsdk.common.helpers import rpc_helper
@ -25,13 +26,42 @@ class BrokerClientBase(object):
self.listeners = {}
self.broker_endpoint = RpcEndpointInfo(broker_transport_endpoint)
self.transport = rpc_helper.get_transport(self.broker_endpoint)
self._workerevent = threading.Event()
self._workerlock = threading.Lock()
self._workerterminated = False
# spawn a thread to retry on setting up listener
self._workerthread = threading.Thread(target=self._refresher, args=())
self._workerthread.start()
LOG.debug("Created Broker client:{0}".format(broker_name))
def __del__(self):
self._workerterminated = True
self._workerevent.set()
self.transport.cleanup()
del self.transport
return
def _refresher(self, retry_interval=5):
while not self._workerterminated:
self._workerevent.wait()
self._workerevent.clear()
allset = False
with self._workerlock:
allset = self._refresh()
if self._workerevent.is_set():
continue
if not allset:
# retry later
time.sleep(retry_interval)
# retry on next loop
self._workerevent.set()
def __is_listening(self, context):
isactive = context and context.get(
'active', False) and context.get('rpcserver', False)
return isactive
def __create_listener(self, context):
target = oslo_messaging.Target(
topic=context['topic'],
@ -42,6 +72,7 @@ class BrokerClientBase(object):
return server
def _refresh(self):
allset = True
for topic, servers in self.listeners.items():
for servername, context in servers.items():
try:
@ -57,13 +88,21 @@ class BrokerClientBase(object):
rpcserver.wait()
context.pop('rpcserver')
LOG.debug("Stopped rpcserver@{0}@{1}".format(context['topic'], context['server']))
except:
LOG.error("Failed to update listener for topic/server:{0}/{1}"
.format(topic, servername))
except Exception as ex:
LOG.error("Failed to update listener for topic/server:{0}/{1}, reason:{2}"
.format(topic, servername, str(ex)))
allset = False
continue
return allset
def _trigger_refresh_listener(self, context):
self._workerevent.set()
# # sleep to re-schedule to run worker thread
# time.sleep(2)
def add_listener(self, topic, server, listener_endpoints=None):
context = self.listeners.get(topic,{}).get(server, {})
with self._workerlock:
if not context:
context = {
'endpoints': listener_endpoints,
@ -78,23 +117,23 @@ class BrokerClientBase(object):
context['endpoints'] = listener_endpoints
context['active'] = True
self._refresh()
self._trigger_refresh_listener(context)
def remove_listener(self, topic, server):
context = self.listeners.get(topic,{}).get(server, {})
with self._workerlock:
if context:
context['active'] = False
self._refresh()
self._trigger_refresh_listener(context)
def is_listening(self, topic, server):
context = self.listeners.get(topic,{}).get(server, {})
return context.get('active', False)
return self.__is_listening(context)
def any_listener(self):
for topic, servers in self.listeners.items():
for servername, context in servers.items():
isactive = context.get('active', False)
if isactive:
if self.__is_listening(context):
return True
return False

View File

@ -0,0 +1,31 @@
#
# Copyright (c) 2021 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import kombu.utils.functional
class OsloMessagingPatcher(object):
retry_over_time_orig = None
@staticmethod
def retry_over_time_patch(
fun, catch, args=None, kwargs=None, errback=None,
max_retries=None, interval_start=2, interval_step=2,
interval_max=30, callback=None, timeout=None):
"""
patch to retry_over_time with default max_retries=5
"""
if not max_retries:
max_retries = 2
return OsloMessagingPatcher.retry_over_time_orig(
fun, catch, args, kwargs, errback,
max_retries, interval_start, interval_step,
interval_max, callback, timeout)
@staticmethod
def patch():
if not OsloMessagingPatcher.retry_over_time_orig:
OsloMessagingPatcher.retry_over_time_orig = kombu.utils.functional.retry_over_time
kombu.utils.functional.retry_over_time = OsloMessagingPatcher.retry_over_time_patch
return

View File

@ -3,3 +3,7 @@
#
# SPDX-License-Identifier: Apache-2.0
#
from notificationclientsdk.common.helpers.patcher import OsloMessagingPatcher
OsloMessagingPatcher.patch()

View File

@ -615,16 +615,21 @@ class NotificationWorker:
self.signal_node_resources_event()
self.signal_events()
def __start_watch_all_nodes(self):
def __start_watch_all_nodes(self, retry_interval=5):
try:
if not self.locationservice_client.is_listening_on_location(
while not self.locationservice_client.is_listening_on_location(
NodeInfoHelper.BROKER_NODE_ALL):
# start watching on the location announcement
self.locationservice_client.add_location_listener(
NodeInfoHelper.BROKER_NODE_ALL,
location_handler=self.__NodeInfoWatcher)
LOG.debug("Start watching location announcement of notificationservice@{0}"
LOG.debug(
"Start watching location announcement of notificationservice@{0}"
.format(NodeInfoHelper.BROKER_NODE_ALL))
if not self.locationservice_client.is_listening_on_location(
NodeInfoHelper.BROKER_NODE_ALL):
# retry later and forever
time.sleep(retry_interval)
self.locationservice_client.trigger_location_annoucement(timeout=20, retry=10)
except Exception as ex:
LOG.debug("exception: {0}".format(str(ex)))

View File

@ -7,6 +7,7 @@
import oslo_messaging
import logging
import json
import kombu
from notificationclientsdk.repository.node_repo import NodeRepo
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
@ -48,7 +49,7 @@ class PtpService(object):
if ResourceType.TypePTP in supported_resource_types:
return self._query(default_node_name)
else:
raise client_exception.ResourceNotAvailable(default_node_name, ResourceType.TypePTP)
raise client_exception.ResourceNotAvailable(broker_node_name, ResourceType.TypePTP)
def _query(self, broker_node_name):
broker_host = "notificationservice-{0}".format(broker_node_name)
@ -57,12 +58,23 @@ class PtpService(object):
self.daemon_control.daemon_context['NOTIFICATION_BROKER_PASS'],
broker_host,
self.daemon_control.daemon_context['NOTIFICATION_BROKER_PORT'])
notificationservice_client = None
try:
notificationservice_client = NotificationServiceClient(
broker_node_name, broker_transport_endpoint)
resource_status = notificationservice_client.query_resource_status(
ResourceType.TypePTP, timeout=5, retry=10)
del notificationservice_client
return resource_status
except oslo_messaging.exceptions.MessagingTimeout as ex:
LOG.warning("ptp status is not available @node {0} due to {1}".format(
broker_node_name, str(ex)))
raise client_exception.ResourceNotAvailable(broker_node_name, ResourceType.TypePTP)
except kombu.exceptions.OperationalError as ex:
LOG.warning("Node {0} is unreachable yet".format(broker_node_name))
raise client_exception.NodeNotAvailable(broker_node_name)
finally:
if notificationservice_client:
del notificationservice_client
def add_subscription(self, subscription_dto):
subscription_orm = SubscriptionOrm(**subscription_dto.to_orm())
@ -77,13 +89,8 @@ class PtpService(object):
# get initial resource status
if default_node_name:
ptpstatus = None
try:
ptpstatus = self._query(default_node_name)
LOG.info("initial ptpstatus:{0}".format(ptpstatus))
except oslo_messaging.exceptions.MessagingTimeout as ex:
LOG.warning("ptp status is not available @node {0} due to {1}".format(
default_node_name, str(ex)))
raise client_exception.ResourceNotAvailable(broker_node_name, subscription_dto.ResourceType)
# construct subscription entry
subscription_orm.InitialDeliveryTimestamp = ptpstatus.get('EventTimestamp', None)