Fix the notificationclient issue of not responding to pull requests

Change the client to use the pod IP instead of a host alias to talk to notification services
Refactor the client according to the Single Responsibility Principle:
1. Extract broker state management logic from the daemon module into broker_state_manager
2. Extract broker connection management logic from the daemon module into broker_connection_manager
3. Extract notification handling logic from the daemon module into notification_handler
4. Move NotificationWorker from the daemon module into the notification_worker module
5. Change the broker endpoint from a host alias to the pod IP, removing the dependency on /etc/hosts (see the sketch below)
6. Add logic to re-establish the broker connection when the notificationservice pod IP changes
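
For illustration, the endpoint change in item 5 boils down to roughly the following minimal sketch (not taken verbatim from the diff; the credential and port values are placeholders normally supplied by the daemon context):

    def build_broker_endpoint(user, password, port,
                              broker_node_name=None, broker_pod_ip=None):
        """Sketch: build the rabbit transport endpoint for a notificationservice broker."""
        if broker_pod_ip:
            # new behaviour: address the notificationservice pod directly by its IP
            broker_host = "[{0}]".format(broker_pod_ip)
        else:
            # old behaviour: rely on the notificationservice-<node> alias in /etc/hosts
            broker_host = "notificationservice-{0}".format(broker_node_name)
        return "rabbit://{0}:{1}@{2}:{3}".format(user, password, broker_host, port)

    # e.g. build_broker_endpoint("admin", "admin", 5672, broker_pod_ip="172.16.192.33")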

Partial-Bug: 1924198

Signed-off-by: Bin Yang <bin.yang@windriver.com>
Change-Id: Ifc9a16912f5ccebe0426a0d4e72c0f13dcbabcd7
Bin Yang 2021-04-16 16:41:10 +08:00
parent 5e0e557796
commit b178397adf
11 changed files with 1179 additions and 689 deletions

View File

@ -7,6 +7,9 @@
import os
SIDECAR_API_PORT = os.environ.get("SIDECAR_API_PORT", "8080")
SIDECAR_API_HOST = os.environ.get("SIDECAR_API_HOST", "127.0.0.1")
DATASTORE_PATH = os.environ.get("DATASTORE_PATH", "/opt/datastore")
LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "INFO")
# Server Specific Configurations
server = {
'port': SIDECAR_API_PORT,
@ -29,14 +32,14 @@ app = {
logging = {
'root': {'level': 'INFO', 'handlers': ['console']},
'loggers': {
'sidecar': {'level': 'DEBUG', 'handlers': ['console'], 'propagate': False},
'pecan': {'level': 'DEBUG', 'handlers': ['console'], 'propagate': False},
'sidecar': {'level': LOGGING_LEVEL, 'handlers': ['console'], 'propagate': False},
'pecan': {'level': LOGGING_LEVEL, 'handlers': ['console'], 'propagate': False},
'py.warnings': {'handlers': ['console']},
'__force_dict__': True
},
'handlers': {
'console': {
'level': 'DEBUG',
'level': LOGGING_LEVEL,
'class': 'logging.StreamHandler',
'formatter': 'color'
}
@ -57,7 +60,7 @@ logging = {
# Bindings and options to pass to SQLAlchemy's ``create_engine``
sqlalchemy = {
'url' : 'sqlite:///sidecar.db',
'url' : "sqlite:////{0}/sidecar.db".format(DATASTORE_PATH),
'echo' : False,
'echo_pool' : False,
'pool_recycle' : 3600,

View File

@ -33,13 +33,20 @@ class BrokerClientBase(object):
self._workerthread = threading.Thread(target=self._refresher, args=())
self._workerthread.start()
LOG.debug("Created Broker client:{0}".format(broker_name))
LOG.debug("Created Broker client:{0},{1}".format(broker_name, broker_transport_endpoint))
def __del__(self):
self.cleanup()
def cleanup(self):
self.clean_listeners()
self._workerterminated = True
self._workerevent.set()
self.transport.cleanup()
del self.transport
if self._workerevent:
self._workerevent.set()
if self.transport:
self.transport.cleanup()
del self.transport
self.transport = None
return
def _refresher(self, retry_interval=5):
@ -95,7 +102,7 @@ class BrokerClientBase(object):
continue
return allset
def _trigger_refresh_listener(self, context):
def _trigger_refresh_listener(self):
self._workerevent.set()
# # sleep to re-schedule to run worker thread
# time.sleep(2)
@ -117,14 +124,14 @@ class BrokerClientBase(object):
context['endpoints'] = listener_endpoints
context['active'] = True
self._trigger_refresh_listener(context)
self._trigger_refresh_listener()
def remove_listener(self, topic, server):
context = self.listeners.get(topic,{}).get(server, {})
with self._workerlock:
if context:
context['active'] = False
self._trigger_refresh_listener(context)
self._trigger_refresh_listener()
def is_listening(self, topic, server):
context = self.listeners.get(topic,{}).get(server, {})
@ -137,6 +144,18 @@ class BrokerClientBase(object):
return True
return False
def __is_connected(self, context):
return context.get('rpcserver', None) is not None if context else False
def clean_listeners(self):
for topic, servers in self.listeners.items():
for server, context in servers.items():
self.remove_listener(topic, server)
self._trigger_refresh_listener()
LOG.debug("listener {0}@{1} {2} stopped".format(
topic, server,
'is' if self.__is_connected(context) else 'is not yet'))
def call(self, topic, server, api_name, timeout=None, retry=None, **api_kwargs):
target = oslo_messaging.Target(
topic=topic, server=server, version=self.broker_endpoint.Version,

View File

@ -45,9 +45,10 @@ class NotificationServiceClient(BrokerClientBase):
return time.time()
'''Init client to notification service'''
def __init__(self, target_node_name, notificationservice_transport_endpoint):
def __init__(self, target_node_name, notificationservice_transport_endpoint, broker_pod_ip):
self.Id = id(self)
self.target_node_name = target_node_name
self.broker_pod_ip = broker_pod_ip
super(NotificationServiceClient, self).__init__(
'{0}'.format(target_node_name),
notificationservice_transport_endpoint)
@ -89,3 +90,6 @@ class NotificationServiceClient(BrokerClientBase):
server="{0}-EventListener-{1}".format(resource_type, self.Id)
return super(NotificationServiceClient, self).is_listening(
topic, server)
def is_broker_ip(self, broker_pod_ip):
return self.broker_pod_ip == broker_pod_ip
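
A rough usage sketch of the new broker_pod_ip plumbing (the endpoint and IP values below are made-up placeholders; construction mirrors the updated constructor above):

    from notificationclientsdk.client.notificationservice import NotificationServiceClient

    client = NotificationServiceClient(
        "controller-0",
        "rabbit://admin:admin@[172.16.192.33]:5672",
        "172.16.192.33")

    # later, a fresh location announcement reports the current pod IP
    reported_pod_ip = "172.16.192.57"
    if not client.is_broker_ip(reported_pod_ip):
        # the notificationservice pod was rescheduled: drop the stale connection
        # so it can be rebuilt against the new IP (see BrokerConnectionManager below)
        client.cleanup()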

View File

@ -4,8 +4,11 @@
# SPDX-License-Identifier: Apache-2.0
#
import os
import logging
LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "INFO")
def get_logger(module_name):
logger = logging.getLogger(module_name)
return config_logger(logger)
@ -14,5 +17,5 @@ def config_logger(logger):
'''
configure the logger: uncomment following lines for debugging
'''
# logger.setLevel(level=logging.DEBUG)
logger.setLevel(LOGGING_LEVEL)
return logger

View File

@ -0,0 +1,123 @@
#coding=utf-8
#
# Copyright (c) 2021 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import json
from wsme import types as wtypes
from notificationclientsdk.model.dto.resourcetype import EnumResourceType
class BrokerState(wtypes.Base):
BrokerName = wtypes.text
# Timestamp = float
Connected = bool
ConnectionStateChanged = bool
BrokerIP = wtypes.text
BrokerIPChanged = bool
ResourceTypes = [EnumResourceType]
ResourceTypesChanged = bool
ResourceTypesSubscribed = {wtypes.text:int}
ResourceTypesSubscribedChanged = bool
DataSyncupPendingDetails = [wtypes.text]
def update_connection_state(self, is_connected):
if self.Connected != is_connected:
self.Connected = is_connected
self.ConnectionStateChanged = True
return self.ConnectionStateChanged
def is_connected(self):
return self.Connected
def is_connection_state_changed(self):
return self.ConnectionStateChanged
def ack_connection_state_changed(self):
self.ConnectionStateChanged = False
def update_broker_ip(self, broker_ip):
if self.BrokerIP != broker_ip:
self.BrokerIP = broker_ip
self.BrokerIPChanged = True
return self.BrokerIPChanged
def is_broker_ip_changed(self):
return self.BrokerIPChanged
def ack_broker_ip_changed(self):
self.BrokerIPChanged = False
def update_resources(self, resources_list):
sorted_resource_list = sorted(resources_list)
if self.ResourceTypes != sorted_resource_list:
self.ResourceTypes = sorted_resource_list
self.ResourceTypesChanged = True
return self.ResourceTypesChanged
def is_resources_changed(self):
return self.ResourceTypesChanged
def ack_resources_changed(self):
self.ResourceTypesChanged = False
def any_resource_subscribed(self):
return len(self.ResourceTypesSubscribed) > 0
def try_subscribe_resource(self, resource_type, indicator):
self.ResourceTypesSubscribedChanged = self.ResourceTypesSubscribedChanged or not resource_type in self.ResourceTypesSubscribed
self.ResourceTypesSubscribed[resource_type] = indicator
def try_unsubscribe_resource(self, resource_type):
self.ResourceTypesSubscribedChanged = self.ResourceTypesSubscribedChanged or resource_type in self.ResourceTypesSubscribed
self.ResourceTypesSubscribed.pop(resource_type, None)
def is_resource_subscribed_changed(self):
return self.ResourceTypesSubscribedChanged
def ack_resource_subscribed_changed(self):
self.ResourceTypesSubscribedChanged = False
def is_resource_subscribed(self, resource_type):
return resource_type in self.ResourceTypesSubscribed
def any_obsolete_subscription(self, indicator):
for s, i in self.ResourceTypesSubscribed.items():
if i != indicator:
return True
return False
def any_resource_subscribed(self):
return len(self.ResourceTypesSubscribed) > 0
def unsubscribe_resource_obsolete(self, indicator):
uninterested = []
for s, i in self.ResourceTypesSubscribed.items():
if i != indicator:
uninterested.append(s)
for s in uninterested:
self.ResourceTypesSubscribed.pop(s, None)
return len(uninterested) > 0
def signal_data_syncup(self, resource_type=None):
if not resource_type:
self.DataSyncupPendingDetails = [k for k,v in self.ResourceTypesSubscribed.items()]
elif resource_type not in self.ResourceTypesSubscribed:
return False
elif not resource_type in self.DataSyncupPendingDetails:
self.DataSyncupPendingDetails.append(resource_type)
return True
def ack_data_syncup(self, resource_type=None):
if not resource_type:
self.DataSyncupPendingDetails = []
elif resource_type in self.DataSyncupPendingDetails:
self.DataSyncupPendingDetails.remove(resource_type)
def is_data_syncup(self, resource_type=None):
if not resource_type:
return len(self.DataSyncupPendingDetails or []) > 0
else:
return resource_type in (self.DataSyncupPendingDetails or [])
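
The class above follows an update/check/acknowledge pattern; a short sketch of the intended flow (the IP values are placeholders):

    from notificationclientsdk.model.dto.broker_state import BrokerState

    state = BrokerState(BrokerName="controller-0",
                        ResourceTypes=[], ResourceTypesSubscribed={})
    state.update_connection_state(True)

    # initial discovery of the notificationservice pod IP
    state.update_broker_ip("172.16.192.33")
    state.ack_broker_ip_changed()

    # a later location announcement reports a different pod IP
    if state.update_broker_ip("172.16.192.57"):
        # is_broker_ip_changed() stays True until the watcher acknowledges it,
        # which is what triggers the broker connection to be re-created
        state.ack_broker_ip_changed()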

View File

@ -0,0 +1,320 @@
#
# Copyright (c) 2021 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import time
import oslo_messaging
import logging
from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.model.dto.broker_state import BrokerState
from notificationclientsdk.client.locationservice import LocationServiceClient
from notificationclientsdk.client.notificationservice import NotificationServiceClient
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
class BrokerConnectionManager:
def __init__(self, broker_location_handler, notification_handler, broker_connection_contexts):
'''
broker_watchers: {
"<broker name1>": {
"broker_client": client1,
"subscribed_resource_list": ['PTP', ...]
},
{...}
...
}
'''
self.shared_broker_context = broker_connection_contexts
self.registration_endpoint = RpcEndpointInfo(
self.shared_broker_context['REGISTRATION_TRANSPORT_ENDPOINT'])
self.broker_watchers = {}
self.location_watcher = LocationServiceClient(self.registration_endpoint.TransportEndpoint)
self.__broker_location_handler = broker_location_handler
self.__notification_handler = notification_handler
def __del__(self):
if self.location_watcher:
self.location_watcher.cleanup()
del self.location_watcher
self.location_watcher = None
def start(self):
self.__start_watch_all_nodes()
def stop(self):
self.__stop_watch_all_nodes()
def __validate(self, brokerstate):
valid = brokerstate.BrokerName or brokerstate.BrokerIP
return valid
def start_watching_broker(self, brokerstate):
try:
if not self.__validate(brokerstate):
return False
broker_name = brokerstate.BrokerName
# must make sure the location is updated/watched:
# 1, check and start location watcher
if not self.location_watcher.is_listening_on_location(broker_name):
# start watching on the location announcement
self.location_watcher.add_location_listener(
broker_name,
location_handler=self.__broker_location_handler)
LOG.debug("Start watching location announcement of notificationservice@{0}"
.format(broker_name))
# try to update location by query
try:
location_info = self.location_watcher.query_location(broker_name, timeout=5, retry=2)
LOG.debug("Pulled location info@{0}:{1}".format(broker_name, location_info))
if location_info:
podip = location_info.get("PodIP", None)
resourcetypes = location_info.get("ResourceTypes", None)
brokerstate.update_broker_ip(podip)
brokerstate.update_resources(resourcetypes)
else:
return False
except Exception as ex:
LOG.warning("Failed to update location of node:{0} due to: {1}".format(
broker_name, str(ex)))
raise ex
# 2, create broker connection
broker_watcher = self.broker_watchers.get(broker_name, {})
broker_client = broker_watcher.get("broker_client", None)
if not broker_client:
LOG.debug("Start watching notifications from notificationservice@{0}".format(broker_name))
broker_client = self.__create_client(broker_name, brokerstate.BrokerIP)
broker_watcher["broker_client"] = broker_client
self.broker_watchers[broker_name] = broker_watcher
# 3, update watching resources
result = self.__update_watching_resources(broker_watcher, broker_client, brokerstate)
return result
except Exception as ex:
LOG.warning("failed to start watching:{0},{1}".format(
brokerstate, str(ex)))
return False
def __stop_watching_broker_resource(self, broker_client, broker_name, resource_type):
try:
if broker_client.is_listening_on_resource(resource_type):
broker_client.remove_resource_status_listener(resource_type)
return True
except Exception as ex:
LOG.warning("failed to stop watching resource:{0}@{1},{2}".format(
broker_name, resource_type, str(ex)))
return False
def __start_watching_broker_resource(self, broker_client, broker_name, resource_type):
try:
if not broker_client.is_listening_on_resource(resource_type):
broker_client.add_resource_status_listener(
resource_type, status_handler=self.__notification_handler)
LOG.debug("Start watching {0}@{1}".format(resource_type, broker_name))
return True
except Exception as ex:
LOG.warning("failed to start watching resource:{0}@{1},{2}".format(
resource_type, broker_name, str(ex)))
return False
def stop_watching_broker(self, broker_name):
try:
# 1, stop listening to broker's location announcement
if self.location_watcher.is_listening_on_location(broker_name):
self.location_watcher.remove_location_listener(broker_name)
LOG.debug("Stop watching location announcement for broker@{0}"
.format(broker_name))
# 2, remove broker client
broker_watcher = self.broker_watchers.get(broker_name, {})
broker_client = broker_watcher.get("broker_client", None)
if broker_client:
broker_client.cleanup()
del broker_client
broker_client = None
self.broker_watchers.pop(broker_name, None)
LOG.debug("Stop watching notificationservice@{0}".format(broker_name))
return True
except Exception as ex:
LOG.warning("failed to start watching:{0},{1}".format(
broker_name, str(ex)))
return False
def restart_watching_broker(self, brokerstate):
try:
broker_name = brokerstate.BrokerName
LOG.debug("Try to restart watching notificationservice@{0}".format(broker_name))
broker_watcher = self.broker_watchers.get(broker_name, {})
broker_client = broker_watcher.get("broker_client", None)
if broker_client:
broker_client.cleanup()
del broker_client
broker_client = None
self.broker_watchers.pop(broker_name, None)
return self.start_watching_broker(brokerstate)
except Exception as ex:
LOG.warning("failed to restart watching:{0},{1}".format(
brokerstate, str(ex)))
return False
def update_watching_resources(self, brokerstate):
try:
broker_watcher = self.broker_watchers.get(brokerstate.BrokerName, {})
broker_client = broker_watcher.get("broker_client", None)
if broker_client:
result = self.__update_watching_resources(broker_watcher, broker_client, brokerstate)
return result
return False
except Exception as ex:
LOG.warning("failed to start watching:{0},{1}".format(
brokerstate, str(ex)))
return False
def __update_watching_resources(self, broker_watcher, broker_client, brokerstate):
try:
result = True
# 1, filter out those unsubscribed resources
subscribed_resource_list = broker_watcher.get("subscribed_resource_list",[])
if subscribed_resource_list != brokerstate.ResourceTypesSubscribed:
# stop watching those uninterested
for resource_type in subscribed_resource_list:
if resource_type not in brokerstate.ResourceTypesSubscribed:
result = self.__stop_watching_broker_resource(
broker_client, brokerstate.BrokerName, resource_type)
# 2, update the list
subscribed_resource_list = brokerstate.ResourceTypesSubscribed
broker_watcher["subscribed_resource_list"] = subscribed_resource_list
# 3, start watching the subscribed resources
for resource_type in subscribed_resource_list:
result = self.__start_watching_broker_resource(
broker_client, brokerstate.BrokerName, resource_type) and result
return result
except Exception as ex:
LOG.warning("failed to update resources:{0},{1}".format(
brokerstate, str(ex)))
return False
def is_watching_broker(self, broker_name):
broker_watcher = self.broker_watchers.get(broker_name, {})
broker_client = broker_watcher.get("broker_client", None)
return broker_client is not None
def is_watching_resource(self, broker_name, resource_type):
broker_watcher = self.broker_watchers.get(broker_name, {})
broker_client = broker_watcher.get("broker_client", None)
return broker_client.is_listening_on_resource(
resource_type) if broker_client else False
def __create_client(self, broker_name, broker_pod_ip):
if broker_name == NodeInfoHelper.BROKER_NODE_ALL:
# special case: if monitoring all nodes, use the same broker as locationservice
return self.location_watcher
broker_host = "[{0}]".format(broker_pod_ip)
broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format(
self.shared_broker_context['NOTIFICATION_BROKER_USER'],
self.shared_broker_context['NOTIFICATION_BROKER_PASS'],
broker_host,
self.shared_broker_context['NOTIFICATION_BROKER_PORT'])
return NotificationServiceClient(broker_name, broker_transport_endpoint, broker_pod_ip)
def __start_watch_all_nodes(self, retry_interval=5):
try:
LOG.debug(
"Start watching location announcement of notificationservice@{0}"
.format(NodeInfoHelper.BROKER_NODE_ALL))
while not self.location_watcher.is_listening_on_location(
NodeInfoHelper.BROKER_NODE_ALL):
# start watching on the location announcement
self.location_watcher.add_location_listener(
NodeInfoHelper.BROKER_NODE_ALL,
location_handler=self.__broker_location_handler)
if not self.location_watcher.is_listening_on_location(
NodeInfoHelper.BROKER_NODE_ALL):
# retry later and forever
LOG.debug(
"Retry indefinitely to start listening to {0}..."
.format(NodeInfoHelper.BROKER_NODE_ALL))
time.sleep(retry_interval)
LOG.debug(
"Trigger the location announcement of notificationservice@{0}"
.format(NodeInfoHelper.BROKER_NODE_ALL))
self.location_watcher.trigger_location_annoucement(timeout=20, retry=10)
except Exception as ex:
LOG.warning("exception: {0}".format(str(ex)))
pass
finally:
pass
return
def __stop_watch_all_nodes(self):
pass
def __syncup_data_by_resourcetype(self, broker_client, broker_name, resource_type):
# check to sync up resource status on a node
LOG.debug("try to sync up data for {0}@{1}".format(resource_type, broker_name))
try:
if broker_name == NodeInfoHelper.BROKER_NODE_ALL:
self.location_watcher.trigger_publishing_status(
resource_type, timeout=5, retry=10)
return True
# 1, query resource status
broker_client = broker_client or self.broker_watchers.get(broker_name, {}).get("broker_client", None)
if not broker_client:
raise Exception("watcher is not ready for broker: {0}".format(broker_name))
resource_status = broker_client.query_resource_status(
resource_type, timeout=5, retry=10)
# 2, deliver resource by comparing LastDelivery time with EventTimestamp
# 3, update the LastDelivery with EventTimestamp
self.__notification_handler.handle(resource_status)
except oslo_messaging.exceptions.MessagingTimeout as ex:
LOG.warning("Fail to sync up data {0}@{1}, due to {2}".format(
resource_type, broker_name, str(ex)))
return False
except Exception as ex:
LOG.warning("Fail to sync up data {0}@{1}, due to {2}".format(
resource_type, broker_name, str(ex)))
raise ex
finally:
pass
return True
def syncup_broker_data(self, brokerstate):
aggregated_result = True
broker_name = brokerstate.BrokerName
try:
broker_watcher = self.broker_watchers.get(broker_name, {})
broker_client = broker_watcher.get("broker_client", None)
subscribed_resource_list = broker_watcher.get("subscribed_resource_list",[])
for resource_type in subscribed_resource_list:
if not brokerstate.is_data_syncup(resource_type):
continue
result = self.__syncup_data_by_resourcetype(
broker_client, broker_name, resource_type)
if result:
brokerstate.ack_data_syncup(resource_type)
aggregated_result = aggregated_result and result
return aggregated_result
except Exception as ex:
LOG.warning("failed to sync up data for resources:{0},{1}".format(
broker_name, str(ex)))
return False
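
Wiring the manager up and reacting to a pod IP change looks roughly like this (a sketch only: the handler objects, broker state, and context values are placeholders; the real wiring lives in notification_worker below):

    # placeholder daemon context; real values come from the client deployment
    daemon_context = {
        'REGISTRATION_TRANSPORT_ENDPOINT': 'rabbit://admin:admin@registration.notification.svc:5672',
        'NOTIFICATION_BROKER_USER': 'admin',
        'NOTIFICATION_BROKER_PASS': 'admin',
        'NOTIFICATION_BROKER_PORT': 5672,
    }
    # location_handler / notification_handler: the handlers owned by the worker
    manager = BrokerConnectionManager(
        location_handler, notification_handler, daemon_context)
    manager.start()

    # driven by BrokerStateManager (next file): reconnect when the pod IP changed
    if manager.is_watching_broker(brokerstate.BrokerName) and brokerstate.is_broker_ip_changed():
        manager.restart_watching_broker(brokerstate)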

View File

@ -0,0 +1,239 @@
#
# Copyright (c) 2021 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import json
import logging
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.model.dto.broker_state import BrokerState
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
class BrokerStateManager:
'''
Manages broker states
Note: currently not thread safe
'''
def __init__(self):
self.broker_state_map = {}
self.disabled_brokers = []
self.subscription_refresh_iteration = 0
def count_brokers(self):
return len(self.broker_state_map)
def add_broker(self, broker_name):
brokerstate = self.broker_state_map.get(broker_name, None)
if not brokerstate:
brokerstate = BrokerState(
BrokerName=broker_name,
ResourceTypes=[], ResourceTypesSubscribed={})
brokerstate.update_connection_state(False)
self.broker_state_map[broker_name] = brokerstate
return brokerstate
def disable_broker(self, broker_name):
if not broker_name in self.disabled_brokers:
self.disabled_brokers.append(broker_name)
def remove_broker(self, broker_name):
self.broker_state_map.pop(broker_name, None)
def refresh_by_nodeinfos(self, nodeinfos_orm):
broker_state_changed = False
for s in nodeinfos_orm or []:
broker_state_changed = self.refresh_by_nodeinfo(s) or broker_state_changed
return broker_state_changed
def refresh_by_nodeinfo(self, nodeinfo_orm):
brokerstate = self.broker_state_map.get(nodeinfo_orm.NodeName, None)
if not brokerstate:
return False
if nodeinfo_orm.Status == 1:
brokerstate.update_connection_state(True)
else:
brokerstate.update_connection_state(False)
brokerstate.update_broker_ip(nodeinfo_orm.PodIP)
brokerstate.update_resources(json.loads(nodeinfo_orm.ResourceTypes))
return brokerstate.is_broker_ip_changed() or brokerstate.is_resources_changed()
def refresh_by_subscriptions(self, subscriptions_orm):
broker_state_changed = False
# 1, refresh iteration
self.subscription_refresh_iteration = self.subscription_refresh_iteration + 1
# 2, refresh resource subscriptions by subscription record
for s in subscriptions_orm or []:
broker_state_changed = self.__refresh_by_subscription(s) or broker_state_changed
# 3, mark broker state change by checking if any obsolete resources
broker_state_changed = broker_state_changed or self.any_obsolete_broker()
return broker_state_changed
def any_obsolete_broker(self):
for broker_name, brokerstate in self.broker_state_map.items():
try:
if brokerstate.any_obsolete_subscription(
self.subscription_refresh_iteration):
return True
except Exception as ex:
LOG.warning(
"failed to check obsoleted resources@{0}:{1}".format(broker_name, str(ex)))
continue
return False
def __refresh_by_subscription(self, subscription_orm):
changed = False
broker_name = None
subscription = SubscriptionInfo(subscription_orm)
resource_type = subscription.ResourceType
LOG.debug("subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status))
if subscription_orm.Status != 1:
return False
# assume PTP and not wildcard
if resource_type == ResourceType.TypePTP:
broker_name = subscription.ResourceQualifier.NodeName
else:
# ignore the subscription due to unsupported type
LOG.debug("Ignore the subscription for: {0}".format(subinfo.SubscriptionId))
return False
if not broker_name:
# ignore the subscription due to unsupported type
LOG.debug("Ignore the subscription for: {0}".format(subscription.SubscriptionId))
return False
enumerated_broker_names = NodeInfoHelper.enumerate_nodes(broker_name)
if not enumerated_broker_names:
LOG.debug("Failed to enumerate broker names for {0}".format(broker_name))
return False
for expanded_broker_name in enumerated_broker_names:
brokerstate = self.broker_state_map.get(expanded_broker_name, None)
if not brokerstate:
brokerstate = self.add_broker(expanded_broker_name)
changed = True
changed = changed or (brokerstate.is_resource_subscribed(resource_type) == False)
brokerstate.try_subscribe_resource(resource_type, self.subscription_refresh_iteration)
return changed
def syncup_broker_watchers(self, broker_connection_manager):
'''sync up brokers state to broker connection manager'''
aggregated_result = True
interested_brokers = []
removed_brokers = []
# 1, clean all obsolete resource subscriptions
# and disable broker in case no active resource subscription
for broker_name, brokerstate in self.broker_state_map.items():
try:
brokerstate.unsubscribe_resource_obsolete(self.subscription_refresh_iteration)
if not brokerstate.any_resource_subscribed():
LOG.debug("disable broker@{0} due to no subscription".format(broker_name))
self.disable_broker(broker_name)
except Exception as ex:
LOG.warning(
"failed to clean obsolete subscribed resources@{0}:{1}".format(
broker_name, str(ex)))
continue
# 2, stop watching all disabled brokers
for broker_name in self.disabled_brokers:
try:
LOG.debug("stop watching due to disabled: {0}".format(broker_name))
result = broker_connection_manager.stop_watching_broker(
broker_name)
self.remove_broker(broker_name)
removed_brokers.append(broker_name)
aggregated_result = aggregated_result and result
except Exception as ex:
LOG.warning(
"failed to clean disabled broker@{0}: {1}".format(
broker_name, str(ex)))
aggregated_result = False
continue
self.disabled_brokers.clear()
# 3, start/restart watching remains brokers
for broker_name, brokerstate in self.broker_state_map.items():
interested_brokers.append(broker_name)
try:
result = True
is_connected = brokerstate.is_connected()
is_watching = broker_connection_manager.is_watching_broker(
broker_name)
if not is_connected:
if is_watching:
LOG.debug("Stop watching due to disconnected: {0}".format(broker_name))
result = broker_connection_manager.stop_watching_broker(
broker_name)
elif is_connected:
# note: start/restart watcher will update resources as well
if not is_watching:
LOG.debug("Start watching due to connected: {0}".format(broker_name))
result = broker_connection_manager.start_watching_broker(
brokerstate)
elif brokerstate.is_broker_ip_changed():
LOG.debug("Restart watching due to IP changed: {0}".format(broker_name))
result = broker_connection_manager.restart_watching_broker(
brokerstate)
elif brokerstate.is_connection_state_changed():
# trigger to sync up notification after (re-)connection
LOG.debug("Trigger to re-sync up data: {0}".format(broker_name))
result = brokerstate.signal_data_syncup()
elif brokerstate.is_resource_subscribed_changed() or brokerstate.is_resources_changed():
LOG.debug("Update watching due to resources changed: {0}".format(broker_name))
result = broker_connection_manager.update_watching_resources(brokerstate)
# leave the signals as it is to re-sync up in next loop in case of failure
if result:
# assumption to avoid race condition: same thread to manipulate brokerstate
brokerstate.ack_connection_state_changed()
brokerstate.ack_broker_ip_changed()
brokerstate.ack_resource_subscribed_changed()
brokerstate.ack_resources_changed()
aggregated_result = aggregated_result and result
except Exception as ex:
LOG.warning("failed to sync up broker watcher:{0},{1}".format(broker_name, str(ex)))
aggregated_result = False
continue
return aggregated_result, interested_brokers, removed_brokers
def syncup_broker_data(self, broker_connection_manager):
'''sync up to get rid of stale data'''
aggregated_result = True
synced_brokers = []
unsynced_brokers = []
for broker_name, brokerstate in self.broker_state_map.items():
try:
if brokerstate.is_connected() and brokerstate.is_data_syncup():
LOG.debug("Try to sync up broker data:{0}".format(broker_name))
result = broker_connection_manager.syncup_broker_data(
brokerstate)
if result:
# assumption to avoid race condition: same thread to manipulate brokerstate
brokerstate.ack_data_syncup()
synced_brokers.append(broker_name)
else:
unsynced_brokers.append(broker_name)
aggregated_result = aggregated_result and result
except Exception as ex:
unsynced_brokers.append(broker_name)
LOG.warning("failed to sync up broker data:{0}".format(str(ex)))
aggregated_result = False
continue
return aggregated_result, synced_brokers, unsynced_brokers
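
Taken together, the state manager is meant to drive the connection manager in a periodic reconcile pass, roughly as follows (the ORM query results and the connection manager instance are placeholders):

    state_manager = BrokerStateManager()

    # refresh broker state from the latest node info and subscription records
    changed = state_manager.refresh_by_nodeinfos(nodeinfos_orm)
    changed = state_manager.refresh_by_subscriptions(subscriptions_orm) or changed

    if changed:
        # (re)connect or disconnect broker clients to match the desired state
        ok, interested, removed = state_manager.syncup_broker_watchers(connection_manager)
        # then pull current resource status to compensate for missed notifications
        state_manager.syncup_broker_data(connection_manager)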

View File

@ -4,42 +4,14 @@
# SPDX-License-Identifier: Apache-2.0
#
import os
import json
import time
import oslo_messaging
from oslo_config import cfg
import logging
import multiprocessing as mp
import threading
import sys
if sys.version > '3':
import queue as Queue
else:
import Queue
from notificationclientsdk.common.helpers import subscription_helper
from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.common.helpers import rpc_helper
from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.model.dto.location import LocationInfo
from notificationclientsdk.repository.dbcontext import DbContext
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
from notificationclientsdk.model.orm.node import NodeInfo as NodeInfoOrm
from notificationclientsdk.repository.node_repo import NodeRepo
from notificationclientsdk.client.locationservice import LocationServiceClient
from notificationclientsdk.client.notificationservice import NotificationServiceClient
from notificationclientsdk.client.notificationservice import NotificationHandlerBase
from notificationclientsdk.client.locationservice import LocationHandlerDefault
from notificationclientsdk.services.notification_worker import NotificationWorker
LOG = logging.getLogger(__name__)
@ -53,635 +25,6 @@ def ProcessWorkerDefault(event, subscription_event, daemon_context):
return
class NotificationWorker:
class NotificationWatcher(NotificationHandlerBase):
def __init__(self, notification_watcher):
self.notification_watcher = notification_watcher
super(NotificationWorker.NotificationWatcher, self).__init__()
def handle(self, notification_info):
LOG.debug("Received notification:{0}".format(notification_info))
result = self.notification_watcher.handle_notification_delivery(notification_info)
return result
class NodeInfoWatcher(LocationHandlerDefault):
def __init__(self, notification_watcher):
self.notification_watcher = notification_watcher
super(NotificationWorker.NodeInfoWatcher, self).__init__()
def handle(self, location_info):
LOG.debug("Received location info:{0}".format(location_info))
return self.notification_watcher.produce_location_event(location_info)
def __init__(
self, event, subscription_event, daemon_context):
self.daemon_context = daemon_context
self.residing_node_name = daemon_context['THIS_NODE_NAME']
NodeInfoHelper.set_residing_node(self.residing_node_name)
self.sqlalchemy_conf = json.loads(daemon_context['SQLALCHEMY_CONF_JSON'])
DbContext.init_dbcontext(self.sqlalchemy_conf)
self.event = event
self.subscription_event = subscription_event
self.registration_endpoint = RpcEndpointInfo(daemon_context['REGISTRATION_TRANSPORT_ENDPOINT'])
self.locationservice_client = LocationServiceClient(self.registration_endpoint.TransportEndpoint)
# dict,key: node name, value , notificationservice client
self.notificationservice_clients = {}
# Watcher callback
self.__NotificationWatcher = NotificationWorker.NotificationWatcher(self)
self.__NodeInfoWatcher = NotificationWorker.NodeInfoWatcher(self)
self.__init_node_resources_map()
self.__init_node_info_channel()
self.__init_location_channel()
self.__init_notification_channel()
self.__init_node_sync_channel()
def __init_node_resources_map(self):
self.node_resources_map = {}
self.node_resources_iteration = 0
self.__node_resources_event = mp.Event()
def __init_node_info_channel(self):
self.__node_info_event = mp.Event()
def __init_location_channel(self):
self.location_event = mp.Event()
self.location_lock = threading.Lock()
# map index by node name
# only cache the latest loation info
self.location_channel = {}
self.location_keys_q = Queue.Queue()
def __init_notification_channel(self):
self.notification_lock = threading.Lock()
self.notification_stat = {}
def __init_node_sync_channel(self):
self.__node_sync_event = mp.Event()
self.__node_sync_q = Queue.Queue()
# initial to be set
self.__node_sync_event.set()
def __del__(self):
del self.locationservice_client
def signal_location_event(self):
self.location_event.set()
def signal_subscription_event(self):
self.subscription_event.set()
def signal_node_sync_event(self):
self.__node_sync_event.set()
def signal_nodeinfo_event(self):
self.__node_info_event.set()
def signal_node_resources_event(self):
self.__node_resources_event.set()
def signal_events(self):
self.event.set()
def produce_location_event(self, location_info):
node_name = location_info.get('NodeName', None)
podip = location_info.get("PodIP", None)
if not node_name or not podip:
LOG.warning("Missing PodIP inside location info:{0}".format(location_info))
return False
result = True
timestamp = location_info.get('Timestamp', 0)
# acquire lock to sync threads which invoke this method
self.location_lock.acquire()
try:
current = self.location_channel.get(node_name, {})
if current.get('Timestamp', 0) < timestamp:
if current.get('PodIP', None) != podip:
# update /etc/hosts must happen in threads to avoid blocking by the main thread
NOTIFICATIONSERVICE_HOSTNAME = 'notificationservice-{0}'
hostfile_helper.update_host(
NOTIFICATIONSERVICE_HOSTNAME.format(node_name), podip)
LOG.debug("Updated location with IP:{0}".format(podip))
# replace the location_info
self.location_channel[node_name] = location_info
self.location_keys_q.put(node_name)
# notify the consumer to process the update
self.signal_location_event()
self.signal_events()
result = True
except Exception as ex:
LOG.warning("failed to produce location event:{0}".format(str(ex)))
result = False
finally:
# release lock
self.location_lock.release()
return result
def consume_location_event(self):
LOG.debug("Start consuming location event")
need_to_sync_node = False
node_changed = False
node_resource_updated = False
nodeinfo_repo = NodeRepo(autocommit=True)
while not self.location_keys_q.empty():
node_name = self.location_keys_q.get(False)
location_info = self.location_channel.get(node_name, None)
if not location_info:
LOG.warning("consume location@{0} without location info".format(node_name))
continue
LOG.debug("process location event@{0}:{1}".format(node_name, location_info))
location_info2 = LocationInfo(**location_info)
entry = nodeinfo_repo.get_one(NodeName=location_info['NodeName'], Status=1)
if not entry:
entry = NodeInfoOrm(**location_info2.to_orm())
nodeinfo_repo.add(entry)
node_resource_updated = True
node_changed = True
self.__node_sync_q.put(node_name)
LOG.debug("Add NodeInfo: {0}".format(entry.NodeName))
elif not entry.Timestamp or entry.Timestamp < location_info['Timestamp']:
# update the entry
if entry.ResourceTypes != location_info2.ResourceTypes:
node_resource_updated = True
nodeinfo_repo.update(entry.NodeName, **location_info2.to_orm())
self.__node_sync_q.put(node_name)
LOG.debug("Update NodeInfo: {0}".format(entry.NodeName))
else:
# do nothing
LOG.debug("Ignore the location for: {0}".format(entry.NodeName))
continue
need_to_sync_node = True
continue
del nodeinfo_repo
LOG.debug("Finished consuming location event")
if need_to_sync_node or node_resource_updated:
if node_changed:
LOG.debug("signal node changed event")
# node changes triggers rebuild map from subscription
# due to the potential subscriptions to all nodes
self.signal_subscription_event()
if node_resource_updated:
# signal the potential changes on node resources
LOG.debug("signal node resources updating event")
self.signal_nodeinfo_event()
if need_to_sync_node:
LOG.debug("signal node syncing event")
self.signal_node_sync_event()
self.signal_events()
pass
def __get_lastest_delivery_timestamp(self, node_name, subscriptionid):
last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{})
last_delivery_time = last_delivery_stat.get('EventTimestamp', None)
return last_delivery_time
def __update_delivery_timestamp(self, node_name, subscriptionid, this_delivery_time):
if not self.notification_stat.get(node_name, None):
self.notification_stat[node_name] = {
subscriptionid: {
'EventTimestamp': this_delivery_time
}
}
LOG.debug("delivery time @node: {0},subscription:{1} is added".format(
node_name, subscriptionid))
elif not self.notification_stat[node_name].get(subscriptionid, None):
self.notification_stat[node_name][subscriptionid] = {
'EventTimestamp': this_delivery_time
}
LOG.debug("delivery time @node: {0},subscription:{1} is added".format(
node_name, subscriptionid))
else:
last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{})
last_delivery_time = last_delivery_stat.get('EventTimestamp', None)
if (last_delivery_time >= this_delivery_time):
return
last_delivery_stat['EventTimestamp'] = this_delivery_time
LOG.debug("delivery time @node: {0},subscription:{1} is updated".format(
node_name, subscriptionid))
def handle_notification_delivery(self, notification_info):
LOG.debug("start notification delivery")
result = True
subscription_repo = None
try:
self.notification_lock.acquire()
subscription_repo = SubscriptionRepo(autocommit=True)
resource_type = notification_info.get('ResourceType', None)
node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None)
if not resource_type:
raise Exception("abnormal notification@{0}".format(node_name))
if resource_type == ResourceType.TypePTP:
pass
else:
raise Exception("notification with unsupported resource type:{0}".format(resource_type))
this_delivery_time = notification_info['EventTimestamp']
entries = subscription_repo.get(ResourceType=resource_type, Status=1)
for entry in entries:
subscriptionid = entry.SubscriptionId
ResourceQualifierJson = entry.ResourceQualifierJson or '{}'
ResourceQualifier = json.loads(ResourceQualifierJson)
# qualify by NodeName
entry_node_name = ResourceQualifier.get('NodeName', None)
node_name_matched = NodeInfoHelper.match_node_name(entry_node_name, node_name)
if not node_name_matched:
continue
subscription_dto2 = SubscriptionInfo(entry)
try:
last_delivery_time = self.__get_lastest_delivery_timestamp(node_name, subscriptionid)
if last_delivery_time and last_delivery_time >= this_delivery_time:
# skip this entry since already delivered
LOG.debug("Ignore the notification for: {0}".format(entry.SubscriptionId))
continue
subscription_helper.notify(subscription_dto2, notification_info)
LOG.debug("notification is delivered successfully to {0}".format(
entry.SubscriptionId))
self.__update_delivery_timestamp(node_name, subscriptionid, this_delivery_time)
except Exception as ex:
LOG.warning("notification is not delivered to {0}:{1}".format(
entry.SubscriptionId, str(ex)))
# proceed to next entry
continue
finally:
pass
except Exception as ex:
LOG.warning("Failed to delivery notification:{0}".format(str(ex)))
result = False
finally:
self.notification_lock.release()
if not subscription_repo:
del subscription_repo
if result:
LOG.debug("Finished notification delivery")
else:
LOG.warning("Failed on notification delivery")
return result
def process_sync_node_event(self):
LOG.debug("Start processing sync node event")
need_to_sync_node_again = False
while not self.__node_sync_q.empty():
broker_node_name = self.__node_sync_q.get(False)
try:
result = self.syncup_node(broker_node_name)
if not result:
need_to_sync_node_again = True
except Exception as ex:
LOG.warning("Failed to syncup node{0}:{1}".format(broker_node_name, str(ex)))
continue
if need_to_sync_node_again:
# continue try in to next loop
self.signal_node_sync_event()
self.signal_events()
LOG.debug("Finished processing sync node event")
def run(self):
# start location listener
self.__start_watch_all_nodes()
while True:
self.event.wait()
self.event.clear()
LOG.debug("daemon control event is asserted")
if self.location_event.is_set():
self.location_event.clear()
# process location notifications
self.consume_location_event()
if self.subscription_event.is_set():
self.subscription_event.clear()
# build node resources map from subscriptions
self.process_subscription_event()
if self.__node_info_event.is_set():
self.__node_info_event.clear()
# update node_resources_map from node info
self.__update_map_from_nodeinfos()
if self.__node_resources_event.is_set():
self.__node_resources_event.clear()
# update watchers from node_resources_map
self.__refresh_watchers_from_map()
if self.__node_sync_event.is_set():
self.__node_sync_event.clear()
# compensate for the possible loss of notification during reconnection
self.process_sync_node_event()
continue
return
def syncup_resource(self, broker_node_name, resource_type):
# check to sync up resource status on a node
LOG.debug("sync up resource@{0} :{1}".format(broker_node_name, resource_type))
try:
if broker_node_name == NodeInfoHelper.BROKER_NODE_ALL:
self.locationservice_client.trigger_publishing_status(
resource_type, timeout=5, retry=10)
return True
# 1, query resource status
broker_client = self.notificationservice_clients.get(broker_node_name, None)
if not broker_client:
raise Exception("notification service client is not setup for node {0}".format(broker_node_name))
resource_status = broker_client.query_resource_status(
resource_type, timeout=5, retry=10)
# 2, deliver resource by comparing LastDelivery time with EventTimestamp
# 3, update the LastDelivery with EventTimestamp
self.__NotificationWatcher.handle(resource_status)
except oslo_messaging.exceptions.MessagingTimeout as ex:
LOG.warning("Fail to syncup resource {0}@{1}, due to {2}".format(
resource_type, broker_node_name, str(ex)))
return False
except Exception as ex:
LOG.warning("Fail to syncup resource {0}@{1}, due to {2}".format(
resource_type, broker_node_name, str(ex)))
raise ex
finally:
pass
return True
def syncup_node(self, broker_node_name):
all_resource_synced = True
# check to sync up resources status on a node
node_resources = self.node_resources_map.get(broker_node_name, None)
if node_resources:
LOG.debug("sync up resources@{0} :{1}".format(broker_node_name, node_resources))
for resource_type, iteration in node_resources.items():
if iteration == self.node_resources_iteration:
result = self.syncup_resource(broker_node_name, resource_type)
if not result:
all_resource_synced = False
return all_resource_synced
def __cleanup_map(self):
for broker_node_name, node_resources in self.node_resources_map.items():
resourcetypelist = [r for (r, i) in node_resources.items() if i<self.node_resources_iteration]
for r in resourcetypelist:
node_resources.pop(r)
if len(node_resources) == 0:
self.node_resources_map[broker_node_name] = None
nodes = [n for (n, r) in self.node_resources_map.items() if not r]
for n in nodes:
self.node_resources_map.pop(n)
return
'''build map from subscriptions: {node_name:{resource_type:true}'''
def __build_map_from_subscriptions(self):
# increase iteration
self.node_resources_iteration = self.node_resources_iteration+1
subscription_repo = None
try:
subscription_repo = SubscriptionRepo(autocommit=True)
subs = subscription_repo.get()
LOG.debug("found {0} subscriptions".format(subs.count()))
for s in subs:
subinfo = SubscriptionInfo(s)
LOG.debug("subscription:{0}, Status:{1}".format(subinfo.to_dict(), s.Status))
# assume PTP and not wildcast
resource_type = s.ResourceType
if resource_type == ResourceType.TypePTP:
broker_node_name = subinfo.ResourceQualifier.NodeName
else:
# ignore the subscription due to unsupported type
LOG.debug("Ignore the subscription for: {0}".format(subinfo.SubscriptionId))
continue
if s.Status == 1:
current_node_name = NodeInfoHelper.expand_node_name(broker_node_name)
node_map = self.node_resources_map.get(current_node_name, None)
if not node_map:
node_map = {}
self.node_resources_map[current_node_name] = node_map
node_map[resource_type] = self.node_resources_iteration
# update the initial delivery timestamp as well
self.__update_delivery_timestamp(
NodeInfoHelper.default_node_name(broker_node_name),
s.SubscriptionId, s.InitialDeliveryTimestamp)
# delete all entry with Status == 0
subscription_repo.delete(Status=0)
finally:
del subscription_repo
return True
def __update_map_from_nodeinfos(self):
'''Hanlde changes of ResourceTypes'''
node_resources_map_updated = False
result = False
nodeinfo_repo = NodeRepo(autocommit=True)
LOG.debug("Start node updating event")
try:
nodeinfos = nodeinfo_repo.get()
for nodeinfo in nodeinfos:
supported_resource_types = json.loads(nodeinfo.ResourceTypes or '[]')
node_map = self.node_resources_map.get(nodeinfo.NodeName, {})
for t, v in node_map.items():
if v == self.node_resources_iteration and not t in supported_resource_types:
# remove the resource type request by decrease the iteration
node_map[t] = self.node_resources_iteration - 1
node_resources_map_updated = True
LOG.warning("Detected unavailable resource type: {0}@{1}".format(t, nodeinfo.NodeName))
else:
continue
pass
except Exception as ex:
LOG.warning("Failed to update map from nodeinfo:{0}".format(str(ex)))
finally:
del nodeinfo_repo
LOG.debug("Finished node updating event")
if node_resources_map_updated:
self.signal_node_resources_event()
self.signal_events()
result = True
return result
def __start_watch_resource(self, broker_node_name, resource_type):
# 1, check and run notificationservice client
broker_client = self.notificationservice_clients.get(broker_node_name, None)
if not broker_client:
broker_client = self.__create_client(broker_node_name)
self.notificationservice_clients[broker_node_name] = broker_client
# 2, check and enable resource status watcher
if not broker_client.is_listening_on_resource(resource_type):
# must make sure the location is updated/watched:
# check and start location watcher
if not self.locationservice_client.is_listening_on_location(broker_node_name):
# start watching on the location announcement
self.locationservice_client.add_location_listener(
broker_node_name,
location_handler=self.__NodeInfoWatcher)
LOG.debug("Start watching location announcement of notificationservice@{0}"
.format(broker_node_name))
# try to update location by query
try:
self.locationservice_client.update_location(
broker_node_name, timeout=5, retry=2)
LOG.debug("Updated location of notificationservice@{0}".format(broker_node_name))
except Exception as ex:
LOG.warning("Failed to update location of node:{0} due to: {1}".format(
broker_node_name, str(ex)))
pass
broker_client.add_resource_status_listener(
resource_type, status_handler=self.__NotificationWatcher)
LOG.debug("Start watching {0}@{1}".format(resource_type, broker_node_name))
else:
# check if node_info has been updated, if yes, query the latest resource status
pass
return True
def __stop_watch_resource(self, broker_node_name, resource_type):
broker_client = self.notificationservice_clients.get(broker_node_name, None)
# 1, disable resource status watcher
if broker_client and broker_client.is_listening_on_resource(resource_type):
broker_client.remove_resource_status_listener(resource_type)
LOG.debug("Stop watching {0}@{1}".format(resource_type, broker_node_name))
return True
def __refresh_location_watcher(self):
# update location watchers
for broker_node_name, broker_client in self.notificationservice_clients.items():
if not broker_client:
continue
if broker_client.any_listener():
# check and start location watcher
if not self.locationservice_client.is_listening_on_location(broker_node_name):
# start watching on the location announcement
self.locationservice_client.add_location_listener(
broker_node_name,
location_handler=self.__NodeInfoWatcher)
LOG.debug("Start watching location announcement of notificationservice@{0}"
.format(broker_node_name))
# update location by query
try:
self.locationservice_client.update_location(
broker_node_name, timeout=5, retry=2)
LOG.debug("Updated location of notificationservice@{0}".format(broker_node_name))
except Exception as ex:
LOG.debug("Failed to Updated location of notificationservice@{0}".format(
broker_node_name))
continue
else:
pass
elif self.locationservice_client.is_listening_on_location(broker_node_name):
# 1, stop location listener
self.locationservice_client.remove_location_listener(broker_node_name)
LOG.debug("Stop watching location announcement for node@{0}"
.format(broker_node_name))
# 2, remove broker client
self.notificationservice_clients[broker_node_name] = None
del broker_client
LOG.debug("Stop watching notificationservice@{0}".format(broker_node_name))
else:
pass
return
def process_subscription_event(self):
# get subscriptions from DB
result = self.__build_map_from_subscriptions()
if result:
# need update map with nodeinfo after rebuilding the map
self.signal_nodeinfo_event()
self.signal_node_resources_event()
self.signal_events()
def __start_watch_all_nodes(self, retry_interval=5):
try:
while not self.locationservice_client.is_listening_on_location(
NodeInfoHelper.BROKER_NODE_ALL):
# start watching on the location announcement
self.locationservice_client.add_location_listener(
NodeInfoHelper.BROKER_NODE_ALL,
location_handler=self.__NodeInfoWatcher)
LOG.debug(
"Start watching location announcement of notificationservice@{0}"
.format(NodeInfoHelper.BROKER_NODE_ALL))
if not self.locationservice_client.is_listening_on_location(
NodeInfoHelper.BROKER_NODE_ALL):
# retry later and forever
time.sleep(retry_interval)
self.locationservice_client.trigger_location_annoucement(timeout=20, retry=10)
except Exception as ex:
LOG.debug("exception: {0}".format(str(ex)))
pass
finally:
pass
return
def __refresh_watchers_from_map(self):
try:
LOG.debug("refresh with {0} nodes".format(len(self.node_resources_map)))
node_to_sync = []
for broker_node_name, node_resources in self.node_resources_map.items():
LOG.debug("check to watch resources@{0} :{1}".format(broker_node_name, node_resources))
need_to_sync_node = False
for resource_type, iteration in node_resources.items():
# enable watchers
if iteration == self.node_resources_iteration:
self.__start_watch_resource(broker_node_name, resource_type)
need_to_sync_node = True
else:
self.__stop_watch_resource(broker_node_name, resource_type)
if need_to_sync_node:
node_to_sync.append(broker_node_name)
self.__refresh_location_watcher()
self.__cleanup_map()
if node_to_sync:
# trigger the node sync up event
for node_name in node_to_sync:
self.__node_sync_q.put(node_name)
self.signal_node_sync_event()
self.signal_events()
except Exception as ex:
LOG.debug("exception: {0}".format(str(ex)))
pass
finally:
pass
return
def __create_client(self, broker_node_name):
if broker_node_name == NodeInfoHelper.BROKER_NODE_ALL:
# special case: if monitor all node, then use the same broker as locationservice
return self.locationservice_client
broker_host = "notificationservice-{0}".format(broker_node_name)
broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format(
self.daemon_context['NOTIFICATION_BROKER_USER'],
self.daemon_context['NOTIFICATION_BROKER_PASS'],
broker_host,
self.daemon_context['NOTIFICATION_BROKER_PORT'])
return NotificationServiceClient(broker_node_name, broker_transport_endpoint)
class DaemonControl(object):
def __init__(self, daemon_context, process_worker = None):

View File

@ -0,0 +1,128 @@
#
# Copyright (c) 2021 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import json
import logging
import multiprocessing as mp
import threading
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
from notificationclientsdk.common.helpers import subscription_helper
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.client.notificationservice import NotificationHandlerBase
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
class NotificationHandler(NotificationHandlerBase):
def __init__(self):
self.__supported_resource_types = (ResourceType.TypePTP,)
self.__init_notification_channel()
pass
def __init_notification_channel(self):
self.notification_lock = threading.Lock()
self.notification_stat = {}
# def handle_notification_delivery(self, notification_info):
def handle(self, notification_info):
LOG.debug("start notification delivery")
subscription_repo = None
try:
self.notification_lock.acquire()
subscription_repo = SubscriptionRepo(autocommit=True)
resource_type = notification_info.get('ResourceType', None)
node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None)
if not resource_type:
raise Exception("abnormal notification@{0}".format(node_name))
if not resource_type in self.__supported_resource_types:
raise Exception("notification with unsupported resource type:{0}".format(resource_type))
this_delivery_time = notification_info['EventTimestamp']
entries = subscription_repo.get(ResourceType=resource_type, Status=1)
for entry in entries:
subscriptionid = entry.SubscriptionId
ResourceQualifierJson = entry.ResourceQualifierJson or '{}'
ResourceQualifier = json.loads(ResourceQualifierJson)
# qualify by NodeName
entry_node_name = ResourceQualifier.get('NodeName', None)
node_name_matched = NodeInfoHelper.match_node_name(entry_node_name, node_name)
if not node_name_matched:
continue
subscription_dto2 = SubscriptionInfo(entry)
try:
last_delivery_time = self.__get_latest_delivery_timestamp(node_name, subscriptionid)
if last_delivery_time and last_delivery_time >= this_delivery_time:
# skip this entry since already delivered
LOG.debug("Ignore the outdated notification for: {0}".format(entry.SubscriptionId))
continue
subscription_helper.notify(subscription_dto2, notification_info)
LOG.debug("notification is delivered successfully to {0}".format(
entry.SubscriptionId))
self.update_delivery_timestamp(node_name, subscriptionid, this_delivery_time)
except Exception as ex:
LOG.warning("notification is not delivered to {0}:{1}".format(
entry.SubscriptionId, str(ex)))
# proceed to next entry
continue
finally:
pass
LOG.debug("Finished notification delivery")
return True
except Exception as ex:
LOG.warning("Failed to delivery notification:{0}".format(str(ex)))
return False
finally:
self.notification_lock.release()
if not subscription_repo:
del subscription_repo
def __get_latest_delivery_timestamp(self, node_name, subscriptionid):
last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{})
last_delivery_time = last_delivery_stat.get('EventTimestamp', None)
return last_delivery_time
def update_delivery_timestamp(self, node_name, subscriptionid, this_delivery_time):
if not self.notification_stat.get(node_name, None):
self.notification_stat[node_name] = {
subscriptionid: {
'EventTimestamp': this_delivery_time
}
}
LOG.debug("delivery time @node: {0},subscription:{1} is added".format(
node_name, subscriptionid))
elif not self.notification_stat[node_name].get(subscriptionid, None):
self.notification_stat[node_name][subscriptionid] = {
'EventTimestamp': this_delivery_time
}
LOG.debug("delivery time @node: {0},subscription:{1} is added".format(
node_name, subscriptionid))
else:
last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{})
last_delivery_time = last_delivery_stat.get('EventTimestamp', None)
if (last_delivery_time >= this_delivery_time):
return
last_delivery_stat['EventTimestamp'] = this_delivery_time
LOG.debug("delivery time @node: {0},subscription:{1} is updated".format(
node_name, subscriptionid))
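
The per-subscription timestamp bookkeeping above means a given event is only forwarded once; a minimal sketch (the field values are placeholders):

    from notificationclientsdk.model.dto.resourcetype import ResourceType
    from notificationclientsdk.services.notification_handler import NotificationHandler

    handler = NotificationHandler()
    notification = {
        'ResourceType': ResourceType.TypePTP,
        'ResourceQualifier': {'NodeName': 'controller-0'},
        'EventTimestamp': 1618560000.0,
        # ...remaining PTP status fields elided...
    }
    handler.handle(notification)  # delivered to matching, enabled subscriptions
    handler.handle(notification)  # same EventTimestamp: skipped as already delivered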

View File

@ -0,0 +1,300 @@
#
# Copyright (c) 2021 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import json
import logging
import multiprocessing as mp
import threading
import sys
if sys.version > '3':
import queue as Queue
else:
import Queue
from notificationclientsdk.common.helpers import subscription_helper
from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.model.dto.location import LocationInfo
from notificationclientsdk.repository.dbcontext import DbContext
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
from notificationclientsdk.model.orm.node import NodeInfo as NodeInfoOrm
from notificationclientsdk.repository.node_repo import NodeRepo
from notificationclientsdk.client.locationservice import LocationHandlerDefault, LocationHandlerBase
from notificationclientsdk.services.broker_state_manager import BrokerStateManager
from notificationclientsdk.services.broker_connection_manager import BrokerConnectionManager
from notificationclientsdk.services.notification_handler import NotificationHandler
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
class NotificationWorker:
class LocationInfoHandler(LocationHandlerBase):
'''Glue code to forward location info to daemon method'''
def __init__(self, locationinfo_dispatcher):
self.locationinfo_dispatcher = locationinfo_dispatcher
super(NotificationWorker.LocationInfoHandler, self).__init__()
def handle(self, location_info):
LOG.debug("Received location info:{0}".format(location_info))
return self.locationinfo_dispatcher.produce_location_event(location_info)
def __init__(
self, event, subscription_event, daemon_context):
self.__alive = True
self.daemon_context = daemon_context
self.residing_node_name = daemon_context['THIS_NODE_NAME']
NodeInfoHelper.set_residing_node(self.residing_node_name)
self.sqlalchemy_conf = json.loads(daemon_context['SQLALCHEMY_CONF_JSON'])
DbContext.init_dbcontext(self.sqlalchemy_conf)
self.event = event
self.subscription_event = subscription_event
self.__locationinfo_handler = NotificationWorker.LocationInfoHandler(self)
self.__notification_handler = NotificationHandler()
self.broker_connection_manager = BrokerConnectionManager(
self.__locationinfo_handler,
self.__notification_handler,
self.daemon_context)
self.broker_state_manager = BrokerStateManager()
self.__init_location_channel()
# event to signal brokers state change
self.__brokers_watcher_event = mp.Event()
self.__brokers_data_syncup_event = mp.Event()
def __init_location_channel(self):
self.location_event = mp.Event()
self.location_lock = threading.Lock()
# only cache the latest location info
self.location_channel = {}
self.location_keys_q = Queue.Queue()
def signal_events(self):
self.event.set()
def produce_location_event(self, location_info):
node_name = location_info.get('NodeName', None)
podip = location_info.get("PodIP", None)
if not node_name or not podip:
LOG.warning("Missing PodIP inside location info:{0}".format(location_info))
return False
timestamp = location_info.get('Timestamp', 0)
# mutex for threads which produce location events
self.location_lock.acquire()
try:
current = self.location_channel.get(node_name, {})
if current.get('Timestamp', 0) < timestamp:
# update with the location_info
self.location_channel[node_name] = location_info
self.location_keys_q.put(node_name)
# notify the consumer to process the update
self.location_event.set()
self.signal_events()
return True
except Exception as ex:
LOG.warning("failed to produce location event:{0}".format(str(ex)))
return False
finally:
# release lock
self.location_lock.release()
def run(self):
self.broker_connection_manager.start()
while self.__alive:
self.event.wait()
self.event.clear()
LOG.debug("daemon control event is asserted")
if self.location_event.is_set():
self.location_event.clear()
# update location information
self.consume_location_event()
if self.subscription_event.is_set():
self.subscription_event.clear()
# refresh brokers state from subscriptions
self.handle_subscriptions_event()
if self.__brokers_watcher_event.is_set():
self.__brokers_watcher_event.clear()
# sync up brokers connection with their state
self.handle_brokers_watcher_event()
if self.__brokers_data_syncup_event.is_set():
self.__brokers_data_syncup_event.clear()
# sync up broker's data
self.handle_brokers_data_syncup_event()
continue
self.broker_connection_manager.stop()
return
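The loop above wakes on the shared control event and then services whichever specific events are set, so a producer sets its own event first and then the control event. A hedged sketch of how a caller (for example, the code path that persists a new subscription) is assumed to wake the worker; the function name is illustrative only:

def on_subscription_changed(worker):
    # mark the specific change, then assert the control event so run() unblocks
    worker.subscription_event.set()
    worker.signal_events()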
def consume_location_event(self):
nodeinfo_repo = None
try:
LOG.debug("Start to consume location event")
_nodeinfo_added = 0
_nodeinfo_updated = 0
nodeinfo_repo = NodeRepo(autocommit=True)
while not self.location_keys_q.empty():
node_name = self.location_keys_q.get(False)
location_info = self.location_channel.get(node_name, None)
if not location_info:
LOG.warning("ignore location info@{0} without content".format(node_name))
continue
LOG.debug("consume location info @{0}:{1}".format(node_name, location_info))
is_nodeinfo_added, is_nodeinfo_updated = self.__persist_locationinfo(
location_info, nodeinfo_repo)
_nodeinfo_added = _nodeinfo_added + (1 if is_nodeinfo_added else 0)
_nodeinfo_updated = _nodeinfo_updated + (1 if is_nodeinfo_updated else 0)
continue
LOG.debug("Finished consuming location event")
if _nodeinfo_added > 0:
LOG.debug("signal event to refresh brokers state from subscription")
# node info changes trigger rebuilding broker states from subscriptions
# because some subscriptions may subscribe to resources of all nodes
self.subscription_event.set()
if _nodeinfo_added > 0 or _nodeinfo_updated > 0:
LOG.debug("try to refresh brokers state due to changes of node info")
nodeinfos = nodeinfo_repo.get()
broker_state_changed = self.broker_state_manager.refresh_by_nodeinfos(nodeinfos)
if broker_state_changed:
# signal the potential changes on node resources
LOG.debug("signal event to re-sync up brokers state")
self.__brokers_watcher_event.set()
self.signal_events()
except Exception as ex:
LOG.warning("failed to consume location event:{0}".format(str(ex)))
finally:
if nodeinfo_repo:
del nodeinfo_repo
def handle_subscriptions_event(self):
broker_state_changed = self.__update_broker_with_subscription()
if broker_state_changed:
self.__brokers_watcher_event.set()
self.signal_events()
def __persist_locationinfo(self, location_info, nodeinfo_repo):
is_nodeinfo_added = False
is_nodeinfo_updated = False
try:
location_info2 = LocationInfo(**location_info)
entry = nodeinfo_repo.get_one(NodeName=location_info['NodeName'], Status=1)
if not entry:
entry = NodeInfoOrm(**location_info2.to_orm())
nodeinfo_repo.add(entry)
is_nodeinfo_added = True
LOG.debug("Add NodeInfo: {0}".format(entry.NodeName))
elif not entry.Timestamp or entry.Timestamp < location_info['Timestamp']:
# location info with a newer timestamp indicates the broker needs to be re-synced
is_nodeinfo_updated = True
nodeinfo_repo.update(entry.NodeName, **location_info2.to_orm())
LOG.debug("Update NodeInfo: {0}".format(entry.NodeName))
else:
# do nothing
LOG.debug("Ignore the location for broker: {0}".format(entry.NodeName))
except Exception as ex:
LOG.warning("failed to update broker state with location info:{0}, {1}".format(
location_info, str(ex)))
finally:
return is_nodeinfo_added, is_nodeinfo_updated
def __update_broker_with_subscription(self):
'''update broker state with subscriptions'''
broker_state_changed = False
subscription_repo = None
nodeinfo_repo = None
try:
subscription_repo = SubscriptionRepo(autocommit=True)
nodeinfo_repo = NodeRepo(autocommit=True)
subs = subscription_repo.get()
LOG.debug("found {0} subscriptions".format(subs.count()))
broker_state_changed = self.broker_state_manager.refresh_by_subscriptions(subs)
if broker_state_changed:
nodeinfos = nodeinfo_repo.get()
self.broker_state_manager.refresh_by_nodeinfos(nodeinfos)
for s in subs:
subinfo = SubscriptionInfo(s)
# assume the resource type is PTP and not a wildcard
resource_type = s.ResourceType
if resource_type == ResourceType.TypePTP:
broker_name = subinfo.ResourceQualifier.NodeName
else:
# ignore the subscription due to unsupported type
LOG.debug("Ignore the subscription for: {0}".format(subinfo.SubscriptionId))
continue
if s.Status == 1:
current_node_name = NodeInfoHelper.expand_node_name(broker_name)
# update the initial delivery timestamp as well
self.__notification_handler.update_delivery_timestamp(
NodeInfoHelper.default_node_name(broker_name),
s.SubscriptionId, s.InitialDeliveryTimestamp)
# delete all entries with Status == 0
subscription_repo.delete(Status=0)
finally:
del subscription_repo
del nodeinfo_repo
return broker_state_changed
def handle_brokers_watcher_event(self):
result = False
try:
LOG.debug("try to sync up watcher for {0} brokers".format(
self.broker_state_manager.count_brokers()))
result, _, _ = self.broker_state_manager.syncup_broker_watchers(
self.broker_connection_manager)
self.__brokers_data_syncup_event.set()
except Exception as ex:
result = False
LOG.warning("fail to sync up watcher for brokers: {0}".format(str(ex)))
finally:
if not result:
# retry indefinitely
self.__brokers_watcher_event.set()
self.signal_events()
def handle_brokers_data_syncup_event(self):
result = False
try:
LOG.debug("try to sync up data for {0} brokers".format(
self.broker_state_manager.count_brokers()))
result, _, _ = self.broker_state_manager.syncup_broker_data(
self.broker_connection_manager)
except Exception as ex:
result = False
LOG.warning("fail to sync up data for brokers: {0}".format(str(ex)))
finally:
if not result:
self.__brokers_data_syncup_event.set()
self.signal_events()
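For context, a minimal sketch of how a parent daemon process could construct and run this worker in a separate process; the helper names are illustrative, and only the two daemon_context keys read in __init__ are shown:

import multiprocessing as mp

def _worker_main(event, subscription_event, daemon_context):
    worker = NotificationWorker(event, subscription_event, daemon_context)
    worker.run()

def start_notification_worker(daemon_context):
    # daemon_context is assumed to carry at least:
    #   THIS_NODE_NAME (str) and SQLALCHEMY_CONF_JSON (JSON-encoded dict)
    event = mp.Event()
    subscription_event = mp.Event()
    proc = mp.Process(target=_worker_main,
                      args=(event, subscription_event, daemon_context))
    proc.start()
    return proc, event, subscription_event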

View File

@ -35,24 +35,28 @@ class PtpService(object):
def __del__(self):
del self.subscription_repo
self.locationservice_client.cleanup()
del self.locationservice_client
return
def query(self, broker_node_name):
default_node_name = NodeInfoHelper.default_node_name(broker_node_name)
def query(self, broker_name):
default_node_name = NodeInfoHelper.default_node_name(broker_name)
nodeinfo_repo = NodeRepo(autocommit=False)
nodeinfo = nodeinfo_repo.get_one(Status=1, NodeName=default_node_name)
# check node availability from DB
if not nodeinfo:
raise client_exception.NodeNotAvailable(broker_node_name)
raise client_exception.NodeNotAvailable(broker_name)
else:
broker_pod_ip = nodeinfo.PodIP
supported_resource_types = json.loads(nodeinfo.ResourceTypes or '[]')
if ResourceType.TypePTP in supported_resource_types:
return self._query(default_node_name)
return self._query(default_node_name, broker_pod_ip)
else:
raise client_exception.ResourceNotAvailable(broker_node_name, ResourceType.TypePTP)
raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP)
def _query(self, broker_node_name):
broker_host = "notificationservice-{0}".format(broker_node_name)
def _query(self, broker_name, broker_pod_ip):
# broker_host = "notificationservice-{0}".format(broker_name)
broker_host = "[{0}]".format(broker_pod_ip)
broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format(
self.daemon_control.daemon_context['NOTIFICATION_BROKER_USER'],
self.daemon_control.daemon_context['NOTIFICATION_BROKER_PASS'],
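This is the core of the change: the transport endpoint is now built from the broker pod IP instead of the notificationservice-{node} host alias, which removes the dependency on /etc/hosts; the square brackets keep the URL well-formed when the pod IP is an IPv6 address. Purely illustrative values (user, password, port and address are made up):

# broker_pod_ip = 'dead:beef::2'
# NOTIFICATION_BROKER_USER = 'admin', NOTIFICATION_BROKER_PASS = 'secret', port 5672
# broker_transport_endpoint -> "rabbit://admin:secret@[dead:beef::2]:5672"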
@ -61,35 +65,39 @@ class PtpService(object):
notificationservice_client = None
try:
notificationservice_client = NotificationServiceClient(
broker_node_name, broker_transport_endpoint)
broker_name, broker_transport_endpoint, broker_pod_ip)
resource_status = notificationservice_client.query_resource_status(
ResourceType.TypePTP, timeout=5, retry=10)
return resource_status
except oslo_messaging.exceptions.MessagingTimeout as ex:
LOG.warning("ptp status is not available @node {0} due to {1}".format(
broker_node_name, str(ex)))
raise client_exception.ResourceNotAvailable(broker_node_name, ResourceType.TypePTP)
broker_name, str(ex)))
raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP)
except kombu.exceptions.OperationalError as ex:
LOG.warning("Node {0} is unreachable yet".format(broker_node_name))
raise client_exception.NodeNotAvailable(broker_node_name)
LOG.warning("Node {0} is unreachable yet".format(broker_name))
raise client_exception.NodeNotAvailable(broker_name)
finally:
if notificationservice_client:
notificationservice_client.cleanup()
del notificationservice_client
def add_subscription(self, subscription_dto):
subscription_orm = SubscriptionOrm(**subscription_dto.to_orm())
broker_node_name = subscription_dto.ResourceQualifier.NodeName
default_node_name = NodeInfoHelper.default_node_name(broker_node_name)
nodeinfos = NodeInfoHelper.enumerate_nodes(broker_node_name)
broker_name = subscription_dto.ResourceQualifier.NodeName
default_node_name = NodeInfoHelper.default_node_name(broker_name)
nodeinfos = NodeInfoHelper.enumerate_nodes(broker_name)
# check node availability from DB
if not nodeinfos or not default_node_name in nodeinfos:
LOG.warning("Node {0} is not available yet".format(default_node_name))
raise client_exception.NodeNotAvailable(broker_node_name)
raise client_exception.NodeNotAvailable(broker_name)
# get initial resource status
if default_node_name:
nodeinfo_repo = NodeRepo(autocommit=False)
nodeinfo = nodeinfo_repo.get_one(Status=1, NodeName=default_node_name)
broker_pod_ip = nodeinfo.PodIP
ptpstatus = None
ptpstatus = self._query(default_node_name)
ptpstatus = self._query(default_node_name, broker_pod_ip)
LOG.info("initial ptpstatus:{0}".format(ptpstatus))
# construct subscription entry