From b178397adf96243e607e627a52ebfe391f9a63a6 Mon Sep 17 00:00:00 2001
From: Bin Yang
Date: Fri, 16 Apr 2021 16:41:10 +0800
Subject: [PATCH] Fix the notificationclient issue of not responding to pull
 requests

Change the client to use the Pod IP instead of a host alias to talk to
notification services.

Refactor the client according to the Single Responsibility Principle:
1. Extract broker state management logic from the daemon module into
   broker_state_manager
2. Extract broker connection management logic from the daemon module into
   broker_connection_manager
3. Extract notification handling logic from the daemon module into
   notification_handler
4. Move NotificationWorker from the daemon module into the
   notification_worker module
5. Change the broker endpoint from a host alias to an IP, which removes
   the dependency on /etc/hosts
6. Add logic to re-setup the broker connection in case the
   notificationservice pod IP changes

Partial-Bug: 1924198

Signed-off-by: Bin Yang
Change-Id: Ifc9a16912f5ccebe0426a0d4e72c0f13dcbabcd7
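To make the endpoint change concrete, here is a minimal sketch of how the
client endpoint is built from the Pod IP instead of a host alias (the IP,
credentials and client variable below are illustrative, not values from this
patch):

    # before: a host alias that had to be kept in sync in /etc/hosts
    #   "rabbit://admin:secret@notificationservice-controller-0:5672"
    broker_pod_ip = "172.16.192.73"   # hypothetical notificationservice Pod IP
    endpoint = "rabbit://{0}:{1}@[{2}]:{3}".format(
        "admin", "secret", broker_pod_ip, 5672)

    # on a Pod IP change the connection is rebuilt instead of patching /etc/hosts
    if not client.is_broker_ip(broker_pod_ip):
        client.cleanup()   # then re-create the client with the new endpoint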
---
 .../notificationclient-sidecar/config.py      |  11 +-
 .../notificationclientsdk/client/base.py      |  33 +-
 .../client/notificationservice.py             |   6 +-
 .../common/helpers/log_helper.py              |   5 +-
 .../model/dto/broker_state.py                 | 123 ++++
 .../services/broker_connection_manager.py     | 320 +++++++++
 .../services/broker_state_manager.py          | 239 +++++++
 .../notificationclientsdk/services/daemon.py  | 661 +-----------------
 .../services/notification_handler.py          | 128 ++++
 .../services/notification_worker.py           | 300 ++++++++
 .../notificationclientsdk/services/ptp.py     |  42 +-
 11 files changed, 1179 insertions(+), 689 deletions(-)
 create mode 100644 notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/broker_state.py
 create mode 100644 notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_connection_manager.py
 create mode 100644 notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py
 create mode 100644 notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py
 create mode 100644 notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py

diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/config.py b/notificationclient-base/centos/docker/notificationclient-sidecar/config.py
index 8ce13bf..3f1625b 100644
--- a/notificationclient-base/centos/docker/notificationclient-sidecar/config.py
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/config.py
@@ -7,6 +7,9 @@ import os
 SIDECAR_API_PORT = os.environ.get("SIDECAR_API_PORT", "8080")
 SIDECAR_API_HOST = os.environ.get("SIDECAR_API_HOST", "127.0.0.1")
 
+DATASTORE_PATH = os.environ.get("DATASTORE_PATH", "/opt/datastore")
+LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "INFO")
+
 # Server Specific Configurations
 server = {
     'port': SIDECAR_API_PORT,
@@ -29,14 +32,14 @@ app = {
 logging = {
     'root': {'level': 'INFO', 'handlers': ['console']},
     'loggers': {
-        'sidecar': {'level': 'DEBUG', 'handlers': ['console'], 'propagate': False},
-        'pecan': {'level': 'DEBUG', 'handlers': ['console'], 'propagate': False},
+        'sidecar': {'level': LOGGING_LEVEL, 'handlers': ['console'], 'propagate': False},
+        'pecan': {'level': LOGGING_LEVEL, 'handlers': ['console'], 'propagate': False},
         'py.warnings': {'handlers': ['console']},
         '__force_dict__': True
     },
     'handlers': {
         'console': {
-            'level': 'DEBUG',
+            'level': LOGGING_LEVEL,
             'class': 'logging.StreamHandler',
             'formatter': 'color'
         }
@@ -57,7 +60,7 @@ logging = {
 
 # Bindings and options to pass to SQLAlchemy's ``create_engine``
 sqlalchemy = {
-    'url' : 'sqlite:///sidecar.db',
+    'url' : "sqlite:///{0}/sidecar.db".format(DATASTORE_PATH),
     'echo' : False,
     'echo_pool' : False,
     'pool_recycle' : 3600,
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py
index ccbb312..73a16c5 100644
--- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py
@@ -33,13 +33,20 @@ class BrokerClientBase(object):
         self._workerthread = threading.Thread(target=self._refresher, args=())
         self._workerthread.start()
 
-        LOG.debug("Created Broker client:{0}".format(broker_name))
+        LOG.debug("Created Broker client:{0},{1}".format(broker_name, broker_transport_endpoint))
 
     def __del__(self):
+        self.cleanup()
+
+    def cleanup(self):
+        self.clean_listeners()
         self._workerterminated = True
-        self._workerevent.set()
-        self.transport.cleanup()
-        del self.transport
+        if self._workerevent:
+            self._workerevent.set()
+        if self.transport:
+            self.transport.cleanup()
+            del self.transport
+            self.transport = None
         return
 
     def _refresher(self, retry_interval=5):
@@ -95,7 +102,7 @@ class BrokerClientBase(object):
                 continue
         return allset
 
-    def _trigger_refresh_listener(self, context):
+    def _trigger_refresh_listener(self):
         self._workerevent.set()
         # # sleep to re-schedule to run worker thread
         # time.sleep(2)
@@ -117,14 +124,14 @@ class BrokerClientBase(object):
             context['endpoints'] = listener_endpoints
             context['active'] = True
 
-        self._trigger_refresh_listener(context)
+        self._trigger_refresh_listener()
 
     def remove_listener(self, topic, server):
         context = self.listeners.get(topic,{}).get(server, {})
         with self._workerlock:
             if context:
                 context['active'] = False
-        self._trigger_refresh_listener(context)
+        self._trigger_refresh_listener()
 
     def is_listening(self, topic, server):
         context = self.listeners.get(topic,{}).get(server, {})
@@ -137,6 +144,18 @@ class BrokerClientBase(object):
                 return True
         return False
 
+    def __is_connected(self, context):
+        return context.get('rpcserver', None) is not None if context else False
+
+    def clean_listeners(self):
+        for topic, servers in self.listeners.items():
+            for server, context in servers.items():
+                self.remove_listener(topic, server)
+                self._trigger_refresh_listener()
+                LOG.debug("listener {0}@{1} {2} stopped".format(
+                    topic, server,
+                    'is' if self.__is_connected(context) else 'is not yet'))
+
     def call(self, topic, server, api_name, timeout=None, retry=None, **api_kwargs):
         target = oslo_messaging.Target(
             topic=topic, server=server, version=self.broker_endpoint.Version,
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py
index 09f683c..011c816 100644
--- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py
@@ -45,9 +45,10 @@ class NotificationServiceClient(BrokerClientBase):
             return time.time()
 
     '''Init client to notification service'''
-    def __init__(self, target_node_name, notificationservice_transport_endpoint):
+    def __init__(self, target_node_name, notificationservice_transport_endpoint, broker_pod_ip):
         self.Id = id(self)
         self.target_node_name = target_node_name
+        self.broker_pod_ip = broker_pod_ip
         super(NotificationServiceClient, self).__init__(
             '{0}'.format(target_node_name), notificationservice_transport_endpoint)
@@ -89,3 +90,6 @@ class NotificationServiceClient(BrokerClientBase):
             server="{0}-EventListener-{1}".format(resource_type, self.Id)
         return super(NotificationServiceClient, self).is_listening(
             topic, server)
+
+    def is_broker_ip(self, broker_pod_ip):
+        return self.broker_pod_ip == broker_pod_ip
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py
index 349cbb7..f801c02 100644
--- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py
@@ -4,8 +4,11 @@
 # SPDX-License-Identifier: Apache-2.0
 #
 
+import os
 import logging
 
+LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "INFO")
+
 def get_logger(module_name):
     logger = logging.getLogger(module_name)
     return config_logger(logger)
@@ -14,5 +17,5 @@ def config_logger(logger):
     '''
-    configure the logger: uncomment following lines for debugging
+    configure the logger: the level is taken from the LOGGING_LEVEL environment variable
     '''
-    # logger.setLevel(level=logging.DEBUG)
+    logger.setLevel(LOGGING_LEVEL)
     return logger
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/broker_state.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/broker_state.py
new file mode 100644
index 0000000..983edfb
--- /dev/null
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/broker_state.py
@@ -0,0 +1,123 @@
+#coding=utf-8
+#
+# Copyright (c) 2021 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+import json
+
+from wsme import types as wtypes
+from notificationclientsdk.model.dto.resourcetype import EnumResourceType
+
+class BrokerState(wtypes.Base):
+    BrokerName = wtypes.text
+    # Timestamp = float
+    Connected = bool
+    ConnectionStateChanged = bool
+    BrokerIP = wtypes.text
+    BrokerIPChanged = bool
+    ResourceTypes = [EnumResourceType]
+    ResourceTypesChanged = bool
+    ResourceTypesSubscribed = {wtypes.text:int}
+    ResourceTypesSubscribedChanged = bool
+    DataSyncupPendingDetails = [wtypes.text]
+
+    def update_connection_state(self, is_connected):
+        if self.Connected != is_connected:
+            self.Connected = is_connected
+            self.ConnectionStateChanged = True
+        return self.ConnectionStateChanged
+
+    def is_connected(self):
+        return self.Connected
+
+    def is_connection_state_changed(self):
+        return self.ConnectionStateChanged
+
+    def ack_connection_state_changed(self):
+        self.ConnectionStateChanged = False
+
+    def update_broker_ip(self, broker_ip):
+        if self.BrokerIP != broker_ip:
+            self.BrokerIP = broker_ip
+            self.BrokerIPChanged = True
+        return self.BrokerIPChanged
+
+    def is_broker_ip_changed(self):
+        return self.BrokerIPChanged
+
+    def ack_broker_ip_changed(self):
+        self.BrokerIPChanged = False
+
+    def update_resources(self, resources_list):
+        # note: list.sort() sorts in place and returns None; use sorted()
+        sorted_resource_list = sorted(resources_list or [])
+        if self.ResourceTypes != sorted_resource_list:
+            self.ResourceTypes = sorted_resource_list
+            self.ResourceTypesChanged = True
+        return self.ResourceTypesChanged
+
+    def is_resources_changed(self):
+        return self.ResourceTypesChanged
+
+    def ack_resources_changed(self):
+        self.ResourceTypesChanged = False
+
+    def any_resource_subscribed(self):
+        return len(self.ResourceTypesSubscribed) > 0
+
+    def try_subscribe_resource(self, resource_type, indicator):
+        self.ResourceTypesSubscribedChanged = self.ResourceTypesSubscribedChanged or resource_type not in self.ResourceTypesSubscribed
+        self.ResourceTypesSubscribed[resource_type] = indicator
+
+    def try_unsubscribe_resource(self, resource_type):
+        self.ResourceTypesSubscribedChanged = self.ResourceTypesSubscribedChanged or resource_type in self.ResourceTypesSubscribed
+        self.ResourceTypesSubscribed.pop(resource_type, None)
+
+    def is_resource_subscribed_changed(self):
+        return self.ResourceTypesSubscribedChanged
+
+    def ack_resource_subscribed_changed(self):
+        self.ResourceTypesSubscribedChanged = False
+
+    def is_resource_subscribed(self, resource_type):
+        return resource_type in self.ResourceTypesSubscribed
+
+    def any_obsolete_subscription(self, indicator):
+        for s, i in self.ResourceTypesSubscribed.items():
+            if i != indicator:
+                return True
+        return False
+
+    def unsubscribe_resource_obsolete(self, indicator):
+        uninterested = []
+        for s, i in self.ResourceTypesSubscribed.items():
+            if i != indicator:
+                uninterested.append(s)
+        for s in uninterested:
+            self.ResourceTypesSubscribed.pop(s, None)
+        return len(uninterested) > 0
+
+    def signal_data_syncup(self, resource_type=None):
+        if not resource_type:
+            self.DataSyncupPendingDetails = [k for k,v in self.ResourceTypesSubscribed.items()]
+        elif resource_type not in self.ResourceTypesSubscribed:
+            return False
+        elif resource_type not in self.DataSyncupPendingDetails:
+            self.DataSyncupPendingDetails.append(resource_type)
+        return True
+
+    def ack_data_syncup(self, resource_type=None):
+        if not resource_type:
+            self.DataSyncupPendingDetails = []
+        elif resource_type in self.DataSyncupPendingDetails:
+            self.DataSyncupPendingDetails.remove(resource_type)
+
+    def is_data_syncup(self, resource_type=None):
+        if not resource_type:
+            return len(self.DataSyncupPendingDetails or []) > 0
+        else:
+            return resource_type in (self.DataSyncupPendingDetails or [])
\ No newline at end of file
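The BrokerState DTO above follows an update/check/acknowledge ("dirty flag")
pattern: an update marks a change, the worker reacts to it, then acks the
flag. A minimal usage sketch (values are made up):

    state = BrokerState(BrokerName="controller-0",
                        ResourceTypes=[], ResourceTypesSubscribed={})
    state.update_connection_state(True)
    state.update_broker_ip("172.16.192.73")   # hypothetical Pod IP
    if state.is_broker_ip_changed():
        # e.g. rebuild the rabbitmq connection, then acknowledge the change
        state.ack_broker_ip_changed()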
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_connection_manager.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_connection_manager.py
new file mode 100644
index 0000000..a3081e9
--- /dev/null
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_connection_manager.py
@@ -0,0 +1,320 @@
+#
+# Copyright (c) 2021 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+import time
+import oslo_messaging
+import logging
+from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
+from notificationclientsdk.model.dto.subscription import SubscriptionInfo
+from notificationclientsdk.model.dto.resourcetype import ResourceType
+from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
+from notificationclientsdk.model.dto.broker_state import BrokerState
+
+from notificationclientsdk.client.locationservice import LocationServiceClient
+from notificationclientsdk.client.notificationservice import NotificationServiceClient
+
+LOG = logging.getLogger(__name__)
+
+from notificationclientsdk.common.helpers import log_helper
+log_helper.config_logger(LOG)
+
+
+class BrokerConnectionManager:
+
+    def __init__(self, broker_location_handler, notification_handler, broker_connection_contexts):
+        '''
+        broker_watchers: {
+            "<broker name>": {
+                "broker_client": client1,
+                "subscribed_resource_list": ['PTP', ...]
+            },
+            {...}
+            ...
+ } + ''' + self.shared_broker_context = broker_connection_contexts + self.registration_endpoint = RpcEndpointInfo( + self.shared_broker_context['REGISTRATION_TRANSPORT_ENDPOINT']) + self.broker_watchers = {} + self.location_watcher = LocationServiceClient(self.registration_endpoint.TransportEndpoint) + self.__broker_location_handler = broker_location_handler + self.__notification_handler = notification_handler + + def __del__(self): + if self.location_watcher: + self.location_watcher.cleanup() + del self.location_watcher + self.location_watcher = None + + def start(self): + self.__start_watch_all_nodes() + + def stop(self): + self.__stop_watch_all_nodes() + + def __validate(self, brokerstate): + valid = brokerstate.BrokerName or brokerstate.BrokerIP + return valid + + def start_watching_broker(self, brokerstate): + try: + if not self.__validate(brokerstate): + return False + + broker_name = brokerstate.BrokerName + + # must make sure the location is updated/watched: + # 1, check and start location watcher + if not self.location_watcher.is_listening_on_location(broker_name): + # start watching on the location announcement + self.location_watcher.add_location_listener( + broker_name, + location_handler=self.__broker_location_handler) + LOG.debug("Start watching location announcement of notificationservice@{0}" + .format(broker_name)) + # try to update location by query + try: + location_info = self.location_watcher.query_location(broker_name, timeout=5, retry=2) + LOG.debug("Pulled location info@{0}:{1}".format(broker_name, location_info)) + if location_info: + podip = location_info.get("PodIP", None) + resourcetypes = location_info.get("ResourceTypes", None) + brokerstate.update_broker_ip(podip) + brokerstate.update_resources(resourcetypes) + else: + return False + except Exception as ex: + LOG.warning("Failed to update location of node:{0} due to: {1}".format( + broker_name, str(ex))) + raise ex + + # 2, create broker connection + broker_watcher = self.broker_watchers.get(broker_name, {}) + broker_client = broker_watcher.get("broker_client", None) + if not broker_client: + LOG.debug("Start watching notifications from notificationservice@{0}".format(broker_name)) + broker_client = self.__create_client(broker_name, brokerstate.BrokerIP) + broker_watcher["broker_client"] = broker_client + self.broker_watchers[broker_name] = broker_watcher + + # 3, update watching resources + result = self.__update_watching_resources(broker_watcher, broker_client, brokerstate) + return result + except Exception as ex: + LOG.warning("failed to start watching:{0},{1}".format( + brokerstate, str(ex))) + return False + + def __stop_watching_broker_resource(self, broker_client, broker_name, resource_type): + try: + if broker_client.is_listening_on_resource(resource_type): + broker_client.remove_resource_status_listener(resource_type) + return True + except Exception as ex: + LOG.warning("failed to stop watching resource:{0}@{1},{2}".format( + broker_name, resource_type, str(ex))) + return False + + def __start_watching_broker_resource(self, broker_client, broker_name, resource_type): + try: + if not broker_client.is_listening_on_resource(resource_type): + broker_client.add_resource_status_listener( + resource_type, status_handler=self.__notification_handler) + LOG.debug("Start watching {0}@{1}".format(resource_type, broker_name)) + + return True + except Exception as ex: + LOG.warning("failed to start watching resource:{0}@{1},{2}".format( + resource_type, broker_name, str(ex))) + return False + + def 
stop_watching_broker(self, broker_name): + try: + # 1, stop listening to broker's location announcement + if self.location_watcher.is_listening_on_location(broker_name): + self.location_watcher.remove_location_listener(broker_name) + LOG.debug("Stop watching location announcement for broker@{0}" + .format(broker_name)) + + # 2, remove broker client + broker_watcher = self.broker_watchers.get(broker_name, {}) + broker_client = broker_watcher.get("broker_client", None) + if broker_client: + broker_client.cleanup() + del broker_client + broker_client = None + self.broker_watchers.pop(broker_name, None) + LOG.debug("Stop watching notificationservice@{0}".format(broker_name)) + + return True + except Exception as ex: + LOG.warning("failed to start watching:{0},{1}".format( + broker_name, str(ex))) + return False + + def restart_watching_broker(self, brokerstate): + try: + broker_name = brokerstate.BrokerName + LOG.debug("Try to restart watching notificationservice@{0}".format(broker_name)) + broker_watcher = self.broker_watchers.get(broker_name, {}) + broker_client = broker_watcher.get("broker_client", None) + if broker_client: + broker_client.cleanup() + del broker_client + broker_client = None + self.broker_watchers.pop(broker_name, None) + return self.start_watching_broker(brokerstate) + except Exception as ex: + LOG.warning("failed to restart watching:{0},{1}".format( + brokerstate, str(ex))) + return False + + def update_watching_resources(self, brokerstate): + try: + broker_watcher = self.broker_watchers.get(brokerstate.BrokerName, {}) + broker_client = broker_watcher.get("broker_client", None) + if broker_client: + result = self.__update_watching_resources(broker_watcher, broker_client, brokerstate) + return result + return False + except Exception as ex: + LOG.warning("failed to start watching:{0},{1}".format( + brokerstate, str(ex))) + return False + + def __update_watching_resources(self, broker_watcher, broker_client, brokerstate): + try: + result = True + # 1, filter out those unsubscribed resources + subscribed_resource_list = broker_watcher.get("subscribed_resource_list",[]) + if subscribed_resource_list != brokerstate.ResourceTypesSubscribed: + # stop watching those uninterested + for resource_type in subscribed_resource_list: + if resource_type not in brokerstate.ResourceTypesSubscribed: + result = self.__stop_watching_broker_resource( + broker_client, brokerstate.BrokerName, resource_type) + + # 2, update the list + subscribed_resource_list = brokerstate.ResourceTypesSubscribed + broker_watcher["subscribed_resource_list"] = subscribed_resource_list + + # 3, start watching the subscribed resources + for resource_type in subscribed_resource_list: + result = self.__start_watching_broker_resource( + broker_client, brokerstate.BrokerName, resource_type) and result + return result + except Exception as ex: + LOG.warning("failed to update resources:{0},{1}".format( + brokerstate, str(ex))) + return False + + def is_watching_broker(self, broker_name): + broker_watcher = self.broker_watchers.get(broker_name, {}) + broker_client = broker_watcher.get("broker_client", None) + return broker_client is not None + + def is_watching_resource(self, broker_name, resource_type): + broker_watcher = self.broker_watchers.get(broker_name, {}) + broker_client = broker_watcher.get("broker_client", None) + return broker_client.is_listening_on_resource( + resource_type) if broker_client else False + + def __create_client(self, broker_name, broker_pod_ip): + if broker_name == 
NodeInfoHelper.BROKER_NODE_ALL:
+            # special case: when monitoring all nodes, use the same broker as the locationservice
+            return self.location_watcher
+        broker_host = "[{0}]".format(broker_pod_ip)
+        broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format(
+            self.shared_broker_context['NOTIFICATION_BROKER_USER'],
+            self.shared_broker_context['NOTIFICATION_BROKER_PASS'],
+            broker_host,
+            self.shared_broker_context['NOTIFICATION_BROKER_PORT'])
+        return NotificationServiceClient(broker_name, broker_transport_endpoint, broker_pod_ip)
+
+    def __start_watch_all_nodes(self, retry_interval=5):
+        try:
+            LOG.debug(
+                "Start watching location announcement of notificationservice@{0}"
+                .format(NodeInfoHelper.BROKER_NODE_ALL))
+            while not self.location_watcher.is_listening_on_location(
+                NodeInfoHelper.BROKER_NODE_ALL):
+                # start watching on the location announcement
+                self.location_watcher.add_location_listener(
+                    NodeInfoHelper.BROKER_NODE_ALL,
+                    location_handler=self.__broker_location_handler)
+                if not self.location_watcher.is_listening_on_location(
+                    NodeInfoHelper.BROKER_NODE_ALL):
+                    # retry later and forever
+                    LOG.debug(
+                        "Retry indefinitely to start listening to {0}..."
+                        .format(NodeInfoHelper.BROKER_NODE_ALL))
+                    time.sleep(retry_interval)
+
+            LOG.debug(
+                "Trigger the location announcement of notificationservice@{0}"
+                .format(NodeInfoHelper.BROKER_NODE_ALL))
+            self.location_watcher.trigger_location_annoucement(timeout=20, retry=10)
+        except Exception as ex:
+            LOG.warning("exception: {0}".format(str(ex)))
+            pass
+        finally:
+            pass
+        return
+
+    def __stop_watch_all_nodes(self):
+        pass
+
+    def __syncup_data_by_resourcetype(self, broker_client, broker_name, resource_type):
+        # check to sync up resource status on a node
+        LOG.debug("try to sync up data for {0}@{1}".format(resource_type, broker_name))
+        try:
+            if broker_name == NodeInfoHelper.BROKER_NODE_ALL:
+                self.location_watcher.trigger_publishing_status(
+                    resource_type, timeout=5, retry=10)
+                return True
+
+            # 1, query resource status with the client passed in by the caller
+            if not broker_client:
+                raise Exception("watcher is not ready for broker: {0}".format(broker_name))
+            resource_status = broker_client.query_resource_status(
+                resource_type, timeout=5, retry=10)
+
+            # 2, deliver resource by comparing LastDelivery time with EventTimestamp
+            # 3, update the LastDelivery with EventTimestamp
+            self.__notification_handler.handle(resource_status)
+        except oslo_messaging.exceptions.MessagingTimeout as ex:
+            LOG.warning("Fail to sync up data {0}@{1}, due to {2}".format(
+                resource_type, broker_name, str(ex)))
+            return False
+        except Exception as ex:
+            LOG.warning("Fail to sync up data {0}@{1}, due to {2}".format(
+                resource_type, broker_name, str(ex)))
+            raise ex
+        finally:
+            pass
+        return True
+
+    def syncup_broker_data(self, brokerstate):
+        aggregated_result = True
+        broker_name = brokerstate.BrokerName
+        try:
+            broker_watcher = self.broker_watchers.get(broker_name, {})
+            broker_client = broker_watcher.get("broker_client", None)
+            subscribed_resource_list = broker_watcher.get("subscribed_resource_list", [])
+            for resource_type in subscribed_resource_list:
+                if not brokerstate.is_data_syncup(resource_type):
+                    continue
+                result = self.__syncup_data_by_resourcetype(
+                    broker_client, broker_name, resource_type)
+                if result:
+                    brokerstate.ack_data_syncup(resource_type)
+                aggregated_result = aggregated_result and result
+            return aggregated_result
+        except Exception as ex:
+            LOG.warning("failed to sync up data for resources:{0},{1}".format(
+                broker_name, str(ex)))
+            return False
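For orientation, the connection manager above is expected to be wired roughly
as follows; the handler objects and context values are illustrative (in
practice the worker passes its LocationInfoHandler and a NotificationHandler
instance, and the context comes from the daemon):

    context = {
        'REGISTRATION_TRANSPORT_ENDPOINT': 'rabbit://admin:secret@[10.0.0.10]:5672',
        'NOTIFICATION_BROKER_USER': 'admin',
        'NOTIFICATION_BROKER_PASS': 'secret',
        'NOTIFICATION_BROKER_PORT': 5672,
    }
    manager = BrokerConnectionManager(location_handler, notification_handler, context)
    manager.start()   # begins listening for location announcements of all nodes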
resources:{0},{1}".format( + broker_name, str(ex))) + return False diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py new file mode 100644 index 0000000..d858282 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py @@ -0,0 +1,239 @@ +# +# Copyright (c) 2021 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# + +import json +import logging +from notificationclientsdk.model.dto.subscription import SubscriptionInfo +from notificationclientsdk.model.dto.resourcetype import ResourceType +from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper + +from notificationclientsdk.model.dto.broker_state import BrokerState + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +class BrokerStateManager: + ''' + Manager to manage broker states + Note: Now it is not thread safe + ''' + def __init__(self): + self.broker_state_map = {} + self.disabled_brokers = [] + self.subscription_refresh_iteration = 0 + + def count_brokers(self): + return len(self.broker_state_map) + + def add_broker(self, broker_name): + brokerstate = self.broker_state_map.get(broker_name, None) + if not brokerstate: + brokerstate = BrokerState( + BrokerName=broker_name, + ResourceTypes=[], ResourceTypesSubscribed={}) + brokerstate.update_connection_state(False) + self.broker_state_map[broker_name] = brokerstate + return brokerstate + + def disable_broker(self, broker_name): + if not broker_name in self.disabled_brokers: + self.disabled_brokers.append(broker_name) + + def remove_broker(self, broker_name): + self.broker_state_map.pop(broker_name, None) + + def refresh_by_nodeinfos(self, nodeinfos_orm): + broker_state_changed = False + for s in nodeinfos_orm or []: + broker_state_changed = self.refresh_by_nodeinfo(s) or broker_state_changed + return broker_state_changed + + def refresh_by_nodeinfo(self, nodeinfo_orm): + brokerstate = self.broker_state_map.get(nodeinfo_orm.NodeName, None) + if not brokerstate: + return False + if nodeinfo_orm.Status == 1: + brokerstate.update_connection_state(True) + else: + brokerstate.update_connection_state(False) + brokerstate.update_broker_ip(nodeinfo_orm.PodIP) + brokerstate.update_resources(json.loads(nodeinfo_orm.ResourceTypes)) + return brokerstate.is_broker_ip_changed() or brokerstate.is_resources_changed() + + def refresh_by_subscriptions(self, subscriptions_orm): + broker_state_changed = False + # 1, refresh iteration + self.subscription_refresh_iteration = self.subscription_refresh_iteration + 1 + + # 2, refresh resource subscriptions by subscription record + for s in subscriptions_orm or []: + broker_state_changed = self.__refresh_by_subscription(s) or broker_state_changed + + # 3, mark broker state change by checking if any obsolete resources + broker_state_changed = broker_state_changed or self.any_obsolete_broker() + return broker_state_changed + + def any_obsolete_broker(self): + for broker_name, brokerstate in self.broker_state_map.items(): + try: + if brokerstate.any_obsolete_subscription( + self.subscription_refresh_iteration): + return True + except Exception as ex: + LOG.warning( + "failed to check obsoleted resources@{0}:{1}".format(broker_name, str(ex))) + continue + return 
+
+    def __refresh_by_subscription(self, subscription_orm):
+        changed = False
+        broker_name = None
+
+        subscription = SubscriptionInfo(subscription_orm)
+        resource_type = subscription.ResourceType
+
+        LOG.debug("subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status))
+        if subscription_orm.Status != 1:
+            return False
+
+        # assume PTP and not wildcard
+        if resource_type == ResourceType.TypePTP:
+            broker_name = subscription.ResourceQualifier.NodeName
+        else:
+            # ignore the subscription due to unsupported type
+            LOG.debug("Ignore the subscription for: {0}".format(subscription.SubscriptionId))
+            return False
+
+        if not broker_name:
+            # ignore the subscription without a broker name
+            LOG.debug("Ignore the subscription for: {0}".format(subscription.SubscriptionId))
+            return False
+
+        enumerated_broker_names = NodeInfoHelper.enumerate_nodes(broker_name)
+        if not enumerated_broker_names:
+            LOG.debug("Failed to enumerate broker names for {0}".format(broker_name))
+            return False
+
+        for expanded_broker_name in enumerated_broker_names:
+            brokerstate = self.broker_state_map.get(expanded_broker_name, None)
+            if not brokerstate:
+                brokerstate = self.add_broker(expanded_broker_name)
+                changed = True
+
+            changed = changed or not brokerstate.is_resource_subscribed(resource_type)
+            brokerstate.try_subscribe_resource(resource_type, self.subscription_refresh_iteration)
+
+        return changed
+
+    def syncup_broker_watchers(self, broker_connection_manager):
+        '''sync up brokers state to broker connection manager'''
+        aggregated_result = True
+        interested_brokers = []
+        removed_brokers = []
+        # 1, clean all obsolete resource subscriptions
+        # and disable broker in case no active resource subscription
+        for broker_name, brokerstate in self.broker_state_map.items():
+            try:
+                brokerstate.unsubscribe_resource_obsolete(self.subscription_refresh_iteration)
+                if not brokerstate.any_resource_subscribed():
+                    LOG.debug("disable broker@{0} due to no subscription".format(broker_name))
+                    self.disable_broker(broker_name)
+            except Exception as ex:
+                LOG.warning(
+                    "failed to clean obsolete subscribed resources@{0}:{1}".format(
+                        broker_name, str(ex)))
+                continue
+
+        # 2, stop watching all disabled brokers
+        for broker_name in self.disabled_brokers:
+            try:
+                LOG.debug("stop watching due to disabled: {0}".format(broker_name))
+                result = broker_connection_manager.stop_watching_broker(
+                    broker_name)
+                self.remove_broker(broker_name)
+                removed_brokers.append(broker_name)
+                aggregated_result = aggregated_result and result
+            except Exception as ex:
+                LOG.warning(
+                    "failed to clean disabled broker@{0}: {1}".format(
+                        broker_name, str(ex)))
+                aggregated_result = False
+                continue
+        self.disabled_brokers.clear()
+
+        # 3, start/restart watching the remaining brokers
+        for broker_name, brokerstate in self.broker_state_map.items():
+            interested_brokers.append(broker_name)
+            try:
+                result = True
+                is_connected = brokerstate.is_connected()
+                is_watching = broker_connection_manager.is_watching_broker(
+                    broker_name)
+
+                if not is_connected:
+                    if is_watching:
+                        LOG.debug("Stop watching due to disconnected: {0}".format(broker_name))
+                        result = broker_connection_manager.stop_watching_broker(
+                            broker_name)
+                elif is_connected:
+                    # note: start/restart watcher will update resources as well
+                    if not is_watching:
+                        LOG.debug("Start watching due to connected: {0}".format(broker_name))
+                        result = broker_connection_manager.start_watching_broker(
+                            brokerstate)
+                    elif brokerstate.is_broker_ip_changed():
+                        LOG.debug("Restart watching due to IP changed: {0}".format(broker_name))
+                        result = broker_connection_manager.restart_watching_broker(
+                            brokerstate)
+                    elif brokerstate.is_connection_state_changed():
+                        # trigger to sync up notification after (re-)connection
+                        LOG.debug("Trigger to re-sync up data: {0}".format(broker_name))
+                        result = brokerstate.signal_data_syncup()
+                    elif brokerstate.is_resource_subscribed_changed() or brokerstate.is_resources_changed():
+                        LOG.debug("Update watching due to resources changed: {0}".format(broker_name))
+                        result = broker_connection_manager.update_watching_resources(brokerstate)
+
+                # leave the signals as they are, to re-sync up in the next loop in case of failure
+                if result:
+                    # assumption to avoid race condition: same thread to manipulate brokerstate
+                    brokerstate.ack_connection_state_changed()
+                    brokerstate.ack_broker_ip_changed()
+                    brokerstate.ack_resource_subscribed_changed()
+                    brokerstate.ack_resources_changed()
+
+                aggregated_result = aggregated_result and result
+            except Exception as ex:
+                LOG.warning("failed to sync up broker watcher:{0},{1}".format(broker_name, str(ex)))
+                aggregated_result = False
+                continue
+        return aggregated_result, interested_brokers, removed_brokers
+
+    def syncup_broker_data(self, broker_connection_manager):
+        '''sync up to get rid of stale data'''
+        aggregated_result = True
+        synced_brokers = []
+        unsynced_brokers = []
+        for broker_name, brokerstate in self.broker_state_map.items():
+            try:
+                if brokerstate.is_connected() and brokerstate.is_data_syncup():
+                    LOG.debug("Try to sync up broker data:{0}".format(broker_name))
+                    result = broker_connection_manager.syncup_broker_data(
+                        brokerstate)
+                    if result:
+                        # assumption to avoid race condition: same thread to manipulate brokerstate
+                        brokerstate.ack_data_syncup()
+                        synced_brokers.append(broker_name)
+                    else:
+                        unsynced_brokers.append(broker_name)
+                    aggregated_result = aggregated_result and result
+            except Exception as ex:
+                unsynced_brokers.append(broker_name)
+                LOG.warning("failed to sync up broker data:{0}".format(str(ex)))
+                aggregated_result = False
+                continue
+        return aggregated_result, synced_brokers, unsynced_brokers
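The worker (notification_worker.py below) drives the two managers in a
reconcile loop, roughly like this (simplified sketch; the repositories supply
the ORM records):

    state_mgr = BrokerStateManager()
    state_mgr.refresh_by_subscriptions(subscriptions)  # from SubscriptionRepo
    state_mgr.refresh_by_nodeinfos(nodeinfos)          # from NodeRepo
    # start/stop/restart rabbitmq watchers to match the computed state
    ok, interested, removed = state_mgr.syncup_broker_watchers(connection_mgr)
    # then pull any notifications missed while (re-)connecting
    ok, synced, unsynced = state_mgr.syncup_broker_data(connection_mgr)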
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py
index 139a875..629b9aa 100644
--- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py
@@ -4,42 +4,14 @@
 # SPDX-License-Identifier: Apache-2.0
 #
 
-import os
-import json
-import time
-import oslo_messaging
-from oslo_config import cfg
 import logging
 import multiprocessing as mp
-import threading
-import sys
-if sys.version > '3':
-    import queue as Queue
-else:
-    import Queue
-
-from notificationclientsdk.common.helpers import subscription_helper
-from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper
-from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
+from notificationclientsdk.common.helpers import rpc_helper
 
 from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
-from notificationclientsdk.model.dto.subscription import SubscriptionInfo
-from notificationclientsdk.model.dto.resourcetype import ResourceType
-from notificationclientsdk.model.dto.location import LocationInfo
-
-from notificationclientsdk.repository.dbcontext import DbContext
-from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
-
-from notificationclientsdk.model.orm.node import NodeInfo as NodeInfoOrm
-
-from notificationclientsdk.repository.node_repo import NodeRepo
-
 from notificationclientsdk.client.locationservice import LocationServiceClient
-from notificationclientsdk.client.notificationservice import NotificationServiceClient
-from notificationclientsdk.client.notificationservice import NotificationHandlerBase
-
-from notificationclientsdk.client.locationservice import LocationHandlerDefault
+from notificationclientsdk.services.notification_worker import NotificationWorker
 
 LOG = logging.getLogger(__name__)
 
@@ -53,635 +25,6 @@ def ProcessWorkerDefault(event, subscription_event, daemon_context):
     return
 
-class NotificationWorker:
-
-    class NotificationWatcher(NotificationHandlerBase):
-        def __init__(self, notification_watcher):
-            self.notification_watcher = notification_watcher
-            super(NotificationWorker.NotificationWatcher, self).__init__()
-
-        def handle(self, notification_info):
-            LOG.debug("Received notification:{0}".format(notification_info))
-            result = self.notification_watcher.handle_notification_delivery(notification_info)
-            return result
-
-    class NodeInfoWatcher(LocationHandlerDefault):
-        def __init__(self, notification_watcher):
-            self.notification_watcher = notification_watcher
-            super(NotificationWorker.NodeInfoWatcher, self).__init__()
-
-        def handle(self, location_info):
-            LOG.debug("Received location info:{0}".format(location_info))
-            return self.notification_watcher.produce_location_event(location_info)
-
-    def __init__(
-            self, event, subscription_event, daemon_context):
-
-        self.daemon_context = daemon_context
-        self.residing_node_name = daemon_context['THIS_NODE_NAME']
-        NodeInfoHelper.set_residing_node(self.residing_node_name)
-
-        self.sqlalchemy_conf = json.loads(daemon_context['SQLALCHEMY_CONF_JSON'])
-        DbContext.init_dbcontext(self.sqlalchemy_conf)
-        self.event = event
-        self.subscription_event = subscription_event
-
-        self.registration_endpoint = RpcEndpointInfo(daemon_context['REGISTRATION_TRANSPORT_ENDPOINT'])
-        self.locationservice_client = LocationServiceClient(self.registration_endpoint.TransportEndpoint)
-        # dict,key: node name, value , notificationservice client
-        self.notificationservice_clients = {}
-
-        # Watcher callback
-        self.__NotificationWatcher = NotificationWorker.NotificationWatcher(self)
-        self.__NodeInfoWatcher = NotificationWorker.NodeInfoWatcher(self)
-
-        self.__init_node_resources_map()
-        self.__init_node_info_channel()
-        self.__init_location_channel()
-        self.__init_notification_channel()
-        self.__init_node_sync_channel()
-
-    def __init_node_resources_map(self):
-        self.node_resources_map = {}
-        self.node_resources_iteration = 0
-        self.__node_resources_event = mp.Event()
-
-    def __init_node_info_channel(self):
-        self.__node_info_event = mp.Event()
-
-    def __init_location_channel(self):
-        self.location_event = mp.Event()
-        self.location_lock = threading.Lock()
-        # map index by node name
-        # only cache the latest loation info
-        self.location_channel = {}
-        self.location_keys_q = Queue.Queue()
-
-    def __init_notification_channel(self):
-        self.notification_lock = threading.Lock()
-        self.notification_stat = {}
-
-    def __init_node_sync_channel(self):
-        self.__node_sync_event = mp.Event()
-        self.__node_sync_q = Queue.Queue()
-        # initial to be set
-        self.__node_sync_event.set()
-
-    def __del__(self):
-        del 
self.locationservice_client - - def signal_location_event(self): - self.location_event.set() - - def signal_subscription_event(self): - self.subscription_event.set() - - def signal_node_sync_event(self): - self.__node_sync_event.set() - - def signal_nodeinfo_event(self): - self.__node_info_event.set() - - def signal_node_resources_event(self): - self.__node_resources_event.set() - - def signal_events(self): - self.event.set() - - def produce_location_event(self, location_info): - node_name = location_info.get('NodeName', None) - podip = location_info.get("PodIP", None) - if not node_name or not podip: - LOG.warning("Missing PodIP inside location info:{0}".format(location_info)) - return False - - result = True - timestamp = location_info.get('Timestamp', 0) - # acquire lock to sync threads which invoke this method - self.location_lock.acquire() - try: - current = self.location_channel.get(node_name, {}) - if current.get('Timestamp', 0) < timestamp: - if current.get('PodIP', None) != podip: - # update /etc/hosts must happen in threads to avoid blocking by the main thread - NOTIFICATIONSERVICE_HOSTNAME = 'notificationservice-{0}' - hostfile_helper.update_host( - NOTIFICATIONSERVICE_HOSTNAME.format(node_name), podip) - LOG.debug("Updated location with IP:{0}".format(podip)) - - # replace the location_info - self.location_channel[node_name] = location_info - self.location_keys_q.put(node_name) - # notify the consumer to process the update - self.signal_location_event() - self.signal_events() - result = True - except Exception as ex: - LOG.warning("failed to produce location event:{0}".format(str(ex))) - result = False - finally: - # release lock - self.location_lock.release() - - return result - - def consume_location_event(self): - LOG.debug("Start consuming location event") - need_to_sync_node = False - node_changed = False - node_resource_updated = False - nodeinfo_repo = NodeRepo(autocommit=True) - - while not self.location_keys_q.empty(): - node_name = self.location_keys_q.get(False) - location_info = self.location_channel.get(node_name, None) - if not location_info: - LOG.warning("consume location@{0} without location info".format(node_name)) - continue - - LOG.debug("process location event@{0}:{1}".format(node_name, location_info)) - - location_info2 = LocationInfo(**location_info) - - entry = nodeinfo_repo.get_one(NodeName=location_info['NodeName'], Status=1) - if not entry: - entry = NodeInfoOrm(**location_info2.to_orm()) - nodeinfo_repo.add(entry) - node_resource_updated = True - node_changed = True - self.__node_sync_q.put(node_name) - LOG.debug("Add NodeInfo: {0}".format(entry.NodeName)) - elif not entry.Timestamp or entry.Timestamp < location_info['Timestamp']: - # update the entry - if entry.ResourceTypes != location_info2.ResourceTypes: - node_resource_updated = True - nodeinfo_repo.update(entry.NodeName, **location_info2.to_orm()) - self.__node_sync_q.put(node_name) - LOG.debug("Update NodeInfo: {0}".format(entry.NodeName)) - else: - # do nothing - LOG.debug("Ignore the location for: {0}".format(entry.NodeName)) - continue - need_to_sync_node = True - continue - - del nodeinfo_repo - LOG.debug("Finished consuming location event") - if need_to_sync_node or node_resource_updated: - if node_changed: - LOG.debug("signal node changed event") - # node changes triggers rebuild map from subscription - # due to the potential subscriptions to all nodes - self.signal_subscription_event() - if node_resource_updated: - # signal the potential changes on node resources - LOG.debug("signal 
node resources updating event") - self.signal_nodeinfo_event() - if need_to_sync_node: - LOG.debug("signal node syncing event") - self.signal_node_sync_event() - self.signal_events() - pass - - def __get_lastest_delivery_timestamp(self, node_name, subscriptionid): - last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) - last_delivery_time = last_delivery_stat.get('EventTimestamp', None) - return last_delivery_time - - def __update_delivery_timestamp(self, node_name, subscriptionid, this_delivery_time): - if not self.notification_stat.get(node_name, None): - self.notification_stat[node_name] = { - subscriptionid: { - 'EventTimestamp': this_delivery_time - } - } - LOG.debug("delivery time @node: {0},subscription:{1} is added".format( - node_name, subscriptionid)) - elif not self.notification_stat[node_name].get(subscriptionid, None): - self.notification_stat[node_name][subscriptionid] = { - 'EventTimestamp': this_delivery_time - } - LOG.debug("delivery time @node: {0},subscription:{1} is added".format( - node_name, subscriptionid)) - else: - last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) - last_delivery_time = last_delivery_stat.get('EventTimestamp', None) - if (last_delivery_time >= this_delivery_time): - return - last_delivery_stat['EventTimestamp'] = this_delivery_time - LOG.debug("delivery time @node: {0},subscription:{1} is updated".format( - node_name, subscriptionid)) - - def handle_notification_delivery(self, notification_info): - LOG.debug("start notification delivery") - result = True - subscription_repo = None - try: - self.notification_lock.acquire() - subscription_repo = SubscriptionRepo(autocommit=True) - resource_type = notification_info.get('ResourceType', None) - node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None) - if not resource_type: - raise Exception("abnormal notification@{0}".format(node_name)) - - if resource_type == ResourceType.TypePTP: - pass - else: - raise Exception("notification with unsupported resource type:{0}".format(resource_type)) - - this_delivery_time = notification_info['EventTimestamp'] - - entries = subscription_repo.get(ResourceType=resource_type, Status=1) - for entry in entries: - subscriptionid = entry.SubscriptionId - ResourceQualifierJson = entry.ResourceQualifierJson or '{}' - ResourceQualifier = json.loads(ResourceQualifierJson) - # qualify by NodeName - entry_node_name = ResourceQualifier.get('NodeName', None) - node_name_matched = NodeInfoHelper.match_node_name(entry_node_name, node_name) - if not node_name_matched: - continue - - subscription_dto2 = SubscriptionInfo(entry) - try: - last_delivery_time = self.__get_lastest_delivery_timestamp(node_name, subscriptionid) - if last_delivery_time and last_delivery_time >= this_delivery_time: - # skip this entry since already delivered - LOG.debug("Ignore the notification for: {0}".format(entry.SubscriptionId)) - continue - - subscription_helper.notify(subscription_dto2, notification_info) - LOG.debug("notification is delivered successfully to {0}".format( - entry.SubscriptionId)) - - self.__update_delivery_timestamp(node_name, subscriptionid, this_delivery_time) - - except Exception as ex: - LOG.warning("notification is not delivered to {0}:{1}".format( - entry.SubscriptionId, str(ex))) - # proceed to next entry - continue - finally: - pass - except Exception as ex: - LOG.warning("Failed to delivery notification:{0}".format(str(ex))) - result = False - finally: - self.notification_lock.release() - 
if not subscription_repo: - del subscription_repo - - if result: - LOG.debug("Finished notification delivery") - else: - LOG.warning("Failed on notification delivery") - return result - - def process_sync_node_event(self): - LOG.debug("Start processing sync node event") - need_to_sync_node_again = False - - while not self.__node_sync_q.empty(): - broker_node_name = self.__node_sync_q.get(False) - try: - result = self.syncup_node(broker_node_name) - if not result: - need_to_sync_node_again = True - except Exception as ex: - LOG.warning("Failed to syncup node{0}:{1}".format(broker_node_name, str(ex))) - continue - - if need_to_sync_node_again: - # continue try in to next loop - self.signal_node_sync_event() - self.signal_events() - LOG.debug("Finished processing sync node event") - - def run(self): - # start location listener - self.__start_watch_all_nodes() - while True: - self.event.wait() - self.event.clear() - LOG.debug("daemon control event is asserted") - - if self.location_event.is_set(): - self.location_event.clear() - # process location notifications - self.consume_location_event() - - if self.subscription_event.is_set(): - self.subscription_event.clear() - # build node resources map from subscriptions - self.process_subscription_event() - - if self.__node_info_event.is_set(): - self.__node_info_event.clear() - # update node_resources_map from node info - self.__update_map_from_nodeinfos() - - if self.__node_resources_event.is_set(): - self.__node_resources_event.clear() - # update watchers from node_resources_map - self.__refresh_watchers_from_map() - - if self.__node_sync_event.is_set(): - self.__node_sync_event.clear() - # compensate for the possible loss of notification during reconnection - self.process_sync_node_event() - - continue - return - - def syncup_resource(self, broker_node_name, resource_type): - # check to sync up resource status on a node - LOG.debug("sync up resource@{0} :{1}".format(broker_node_name, resource_type)) - try: - if broker_node_name == NodeInfoHelper.BROKER_NODE_ALL: - self.locationservice_client.trigger_publishing_status( - resource_type, timeout=5, retry=10) - return True - - # 1, query resource status - broker_client = self.notificationservice_clients.get(broker_node_name, None) - if not broker_client: - raise Exception("notification service client is not setup for node {0}".format(broker_node_name)) - resource_status = broker_client.query_resource_status( - resource_type, timeout=5, retry=10) - - # 2, deliver resource by comparing LastDelivery time with EventTimestamp - # 3, update the LastDelivery with EventTimestamp - self.__NotificationWatcher.handle(resource_status) - except oslo_messaging.exceptions.MessagingTimeout as ex: - LOG.warning("Fail to syncup resource {0}@{1}, due to {2}".format( - resource_type, broker_node_name, str(ex))) - return False - except Exception as ex: - LOG.warning("Fail to syncup resource {0}@{1}, due to {2}".format( - resource_type, broker_node_name, str(ex))) - raise ex - finally: - pass - return True - - def syncup_node(self, broker_node_name): - all_resource_synced = True - # check to sync up resources status on a node - node_resources = self.node_resources_map.get(broker_node_name, None) - if node_resources: - LOG.debug("sync up resources@{0} :{1}".format(broker_node_name, node_resources)) - for resource_type, iteration in node_resources.items(): - if iteration == self.node_resources_iteration: - result = self.syncup_resource(broker_node_name, resource_type) - if not result: - all_resource_synced = False - return 
all_resource_synced - - def __cleanup_map(self): - for broker_node_name, node_resources in self.node_resources_map.items(): - resourcetypelist = [r for (r, i) in node_resources.items() if i= this_delivery_time: + # skip this entry since already delivered + LOG.debug("Ignore the outdated notification for: {0}".format(entry.SubscriptionId)) + continue + + subscription_helper.notify(subscription_dto2, notification_info) + LOG.debug("notification is delivered successfully to {0}".format( + entry.SubscriptionId)) + + self.update_delivery_timestamp(node_name, subscriptionid, this_delivery_time) + + except Exception as ex: + LOG.warning("notification is not delivered to {0}:{1}".format( + entry.SubscriptionId, str(ex))) + # proceed to next entry + continue + finally: + pass + LOG.debug("Finished notification delivery") + return True + except Exception as ex: + LOG.warning("Failed to delivery notification:{0}".format(str(ex))) + return False + finally: + self.notification_lock.release() + if not subscription_repo: + del subscription_repo + + def __get_latest_delivery_timestamp(self, node_name, subscriptionid): + last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) + last_delivery_time = last_delivery_stat.get('EventTimestamp', None) + return last_delivery_time + + def update_delivery_timestamp(self, node_name, subscriptionid, this_delivery_time): + if not self.notification_stat.get(node_name, None): + self.notification_stat[node_name] = { + subscriptionid: { + 'EventTimestamp': this_delivery_time + } + } + LOG.debug("delivery time @node: {0},subscription:{1} is added".format( + node_name, subscriptionid)) + elif not self.notification_stat[node_name].get(subscriptionid, None): + self.notification_stat[node_name][subscriptionid] = { + 'EventTimestamp': this_delivery_time + } + LOG.debug("delivery time @node: {0},subscription:{1} is added".format( + node_name, subscriptionid)) + else: + last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) + last_delivery_time = last_delivery_stat.get('EventTimestamp', None) + if (last_delivery_time >= this_delivery_time): + return + last_delivery_stat['EventTimestamp'] = this_delivery_time + LOG.debug("delivery time @node: {0},subscription:{1} is updated".format( + node_name, subscriptionid)) diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py new file mode 100644 index 0000000..72d06c8 --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py @@ -0,0 +1,300 @@ +# +# Copyright (c) 2021 Wind River Systems, Inc. 
+# +# SPDX-License-Identifier: Apache-2.0 +# + +import json +import logging + +import multiprocessing as mp +import threading +import sys +if sys.version > '3': + import queue as Queue +else: + import Queue + +from notificationclientsdk.common.helpers import subscription_helper +from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper +from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper + +from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo +from notificationclientsdk.model.dto.subscription import SubscriptionInfo +from notificationclientsdk.model.dto.resourcetype import ResourceType +from notificationclientsdk.model.dto.location import LocationInfo + +from notificationclientsdk.repository.dbcontext import DbContext +from notificationclientsdk.repository.subscription_repo import SubscriptionRepo + +from notificationclientsdk.model.orm.node import NodeInfo as NodeInfoOrm + +from notificationclientsdk.repository.node_repo import NodeRepo + +from notificationclientsdk.client.locationservice import LocationHandlerDefault, LocationHandlerBase + +from notificationclientsdk.services.broker_state_manager import BrokerStateManager +from notificationclientsdk.services.broker_connection_manager import BrokerConnectionManager +from notificationclientsdk.services.notification_handler import NotificationHandler + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper +log_helper.config_logger(LOG) + +class NotificationWorker: + + class LocationInfoHandler(LocationHandlerBase): + '''Glue code to forward location info to daemon method''' + def __init__(self, locationinfo_dispatcher): + self.locationinfo_dispatcher = locationinfo_dispatcher + super(NotificationWorker.LocationInfoHandler, self).__init__() + + def handle(self, location_info): + LOG.debug("Received location info:{0}".format(location_info)) + return self.locationinfo_dispatcher.produce_location_event(location_info) + + def __init__( + self, event, subscription_event, daemon_context): + + self.__alive = True + + self.daemon_context = daemon_context + self.residing_node_name = daemon_context['THIS_NODE_NAME'] + NodeInfoHelper.set_residing_node(self.residing_node_name) + + self.sqlalchemy_conf = json.loads(daemon_context['SQLALCHEMY_CONF_JSON']) + DbContext.init_dbcontext(self.sqlalchemy_conf) + self.event = event + self.subscription_event = subscription_event + + self.__locationinfo_handler = NotificationWorker.LocationInfoHandler(self) + self.__notification_handler = NotificationHandler() + self.broker_connection_manager = BrokerConnectionManager( + self.__locationinfo_handler, + self.__notification_handler, + self.daemon_context) + self.broker_state_manager = BrokerStateManager() + + self.__init_location_channel() + + # event to signal brokers state change + self.__brokers_watcher_event = mp.Event() + self.__brokers_data_syncup_event = mp.Event() + + def __init_location_channel(self): + self.location_event = mp.Event() + self.location_lock = threading.Lock() + # only cache the latest loation info + self.location_channel = {} + self.location_keys_q = Queue.Queue() + + def signal_events(self): + self.event.set() + + def produce_location_event(self, location_info): + node_name = location_info.get('NodeName', None) + podip = location_info.get("PodIP", None) + if not node_name or not podip: + LOG.warning("Missing PodIP inside location info:{0}".format(location_info)) + return False + timestamp = location_info.get('Timestamp', 0) + # mutex 
for threads which produce location events + self.location_lock.acquire() + try: + current = self.location_channel.get(node_name, {}) + if current.get('Timestamp', 0) < timestamp: + # update with the location_info + self.location_channel[node_name] = location_info + self.location_keys_q.put(node_name) + # notify the consumer to process the update + self.location_event.set() + self.signal_events() + return True + except Exception as ex: + LOG.warning("failed to produce location event:{0}".format(str(ex))) + return False + finally: + # release lock + self.location_lock.release() + + def run(self): + self.broker_connection_manager.start() + while self.__alive: + self.event.wait() + self.event.clear() + LOG.debug("daemon control event is asserted") + + if self.location_event.is_set(): + self.location_event.clear() + # update location information + self.consume_location_event() + + if self.subscription_event.is_set(): + self.subscription_event.clear() + # refresh brokers state from subscriptions + self.handle_subscriptions_event() + + if self.__brokers_watcher_event.is_set(): + self.__brokers_watcher_event.clear() + # sync up brokers connection with their state + self.handle_brokers_watcher_event() + + if self.__brokers_data_syncup_event.is_set(): + self.__brokers_data_syncup_event.clear() + # sync up broker's data + self.handle_brokers_data_syncup_event() + + continue + self.broker_connection_manager.stop() + return + + def consume_location_event(self): + nodeinfo_repo = None + try: + LOG.debug("Start to consume location event") + _nodeinfo_added = 0 + _nodeinfo_updated = 0 + nodeinfo_repo = NodeRepo(autocommit=True) + + while not self.location_keys_q.empty(): + node_name = self.location_keys_q.get(False) + location_info = self.location_channel.get(node_name, None) + if not location_info: + LOG.warning("ignore location info@{0} without content".format(node_name)) + continue + + LOG.debug("consume location info @{0}:{1}".format(node_name, location_info)) + is_nodeinfo_added, is_nodeinfo_updated = self.__persist_locationinfo( + location_info, nodeinfo_repo) + _nodeinfo_added = _nodeinfo_added + (1 if is_nodeinfo_added else 0) + _nodeinfo_updated = _nodeinfo_updated + (1 if is_nodeinfo_updated else 0) + continue + + LOG.debug("Finished consuming location event") + if _nodeinfo_added > 0: + LOG.debug("signal event to refresh brokers state from subscription") + # node info changes trigger rebuilding broker states from subscription + # due to some subscriptions might subscribe resources of all nodes + self.subscription_event.set() + if _nodeinfo_added > 0 or _nodeinfo_updated > 0: + LOG.debug("try to refresh brokers state due to changes of node info") + nodeinfos = nodeinfo_repo.get() + broker_state_changed = self.broker_state_manager.refresh_by_nodeinfos(nodeinfos) + if broker_state_changed: + # signal the potential changes on node resources + LOG.debug("signal event to re-sync up brokers state") + self.__brokers_watcher_event.set() + self.signal_events() + except Exception as ex: + LOG.warning("failed to consume location event:{0}".format(str(ex))) + finally: + if nodeinfo_repo: + del nodeinfo_repo + + def handle_subscriptions_event(self): + broker_state_changed = self.__update_broker_with_subscription() + if broker_state_changed: + self.__brokers_watcher_event.set() + self.signal_events() + + def __persist_locationinfo(self, location_info, nodeinfo_repo): + is_nodeinfo_added = False + is_nodeinfo_updated = False + try: + location_info2 = LocationInfo(**location_info) + entry = 
+    def __persist_locationinfo(self, location_info, nodeinfo_repo):
+        is_nodeinfo_added = False
+        is_nodeinfo_updated = False
+        try:
+            location_info2 = LocationInfo(**location_info)
+            entry = nodeinfo_repo.get_one(NodeName=location_info['NodeName'], Status=1)
+            if not entry:
+                entry = NodeInfoOrm(**location_info2.to_orm())
+                nodeinfo_repo.add(entry)
+                is_nodeinfo_added = True
+                LOG.debug("Add NodeInfo: {0}".format(entry.NodeName))
+            elif not entry.Timestamp or entry.Timestamp < location_info['Timestamp']:
+                # location info with a newer timestamp indicates the broker needs re-sync
+                is_nodeinfo_updated = True
+                nodeinfo_repo.update(entry.NodeName, **location_info2.to_orm())
+                LOG.debug("Update NodeInfo: {0}".format(entry.NodeName))
+            else:
+                # do nothing
+                LOG.debug("Ignore the location for broker: {0}".format(entry.NodeName))
+        except Exception as ex:
+            LOG.warning("failed to update broker state with location info:{0}, {1}".format(
+                location_info, str(ex)))
+        return is_nodeinfo_added, is_nodeinfo_updated
+
+    def __update_broker_with_subscription(self):
+        '''update broker state with subscriptions'''
+        broker_state_changed = False
+        subscription_repo = None
+        nodeinfo_repo = None
+
+        try:
+            subscription_repo = SubscriptionRepo(autocommit=True)
+            nodeinfo_repo = NodeRepo(autocommit=True)
+            subs = subscription_repo.get()
+            LOG.debug("found {0} subscriptions".format(subs.count()))
+            broker_state_changed = self.broker_state_manager.refresh_by_subscriptions(subs)
+            if broker_state_changed:
+                nodeinfos = nodeinfo_repo.get()
+                self.broker_state_manager.refresh_by_nodeinfos(nodeinfos)
+
+            for s in subs:
+                subinfo = SubscriptionInfo(s)
+
+                # assume the resource type is PTP, not a wildcard
+                resource_type = s.ResourceType
+                if resource_type == ResourceType.TypePTP:
+                    broker_name = subinfo.ResourceQualifier.NodeName
+                else:
+                    # ignore the subscription due to unsupported resource type
+                    LOG.debug("Ignore the subscription for: {0}".format(subinfo.SubscriptionId))
+                    continue
+
+                if s.Status == 1:
+                    # update the initial delivery timestamp as well
+                    self.__notification_handler.update_delivery_timestamp(
+                        NodeInfoHelper.default_node_name(broker_name),
+                        s.SubscriptionId, s.InitialDeliveryTimestamp)
+
+            # delete all entries with Status == 0
+            subscription_repo.delete(Status=0)
+        finally:
+            del subscription_repo
+            del nodeinfo_repo
+        return broker_state_changed
+
+    def handle_brokers_watcher_event(self):
+        result = False
+        try:
+            LOG.debug("try to sync up watcher for {0} brokers".format(
+                self.broker_state_manager.count_brokers()))
+            result, _, _ = self.broker_state_manager.syncup_broker_watchers(
+                self.broker_connection_manager)
+            self.__brokers_data_syncup_event.set()
+        except Exception as ex:
+            result = False
+            LOG.warning("failed to sync up watcher for brokers: {0}".format(str(ex)))
+        finally:
+            if not result:
+                # retry indefinitely
+                self.__brokers_watcher_event.set()
+                self.signal_events()
+
+    def handle_brokers_data_syncup_event(self):
+        result = False
+        try:
+            LOG.debug("try to sync up data for {0} brokers".format(
+                self.broker_state_manager.count_brokers()))
+            result, _, _ = self.broker_state_manager.syncup_broker_data(
+                self.broker_connection_manager)
+        except Exception as ex:
+            result = False
+            LOG.warning("failed to sync up data for brokers: {0}".format(str(ex)))
+        finally:
+            if not result:
+                self.__brokers_data_syncup_event.set()
+                self.signal_events()
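+
+# Hypothetical wiring sketch (for reference only; the daemon module owns the
+# actual process setup): the worker is driven by two shared events plus the
+# daemon context dict consumed in __init__ above, e.g.
+#
+#   event = mp.Event()
+#   subscription_event = mp.Event()
+#   worker = NotificationWorker(event, subscription_event, daemon_context)
+#   worker.run()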
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py
index 07d4ecf..d0f21bd 100644
--- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py
@@ -35,24 +35,28 @@ class PtpService(object):
 
     def __del__(self):
         del self.subscription_repo
+        self.locationservice_client.cleanup()
+        del self.locationservice_client
         return
 
-    def query(self, broker_node_name):
-        default_node_name = NodeInfoHelper.default_node_name(broker_node_name)
+    def query(self, broker_name):
+        default_node_name = NodeInfoHelper.default_node_name(broker_name)
         nodeinfo_repo = NodeRepo(autocommit=False)
         nodeinfo = nodeinfo_repo.get_one(Status=1, NodeName=default_node_name)
         # check node availability from DB
         if not nodeinfo:
-            raise client_exception.NodeNotAvailable(broker_node_name)
+            raise client_exception.NodeNotAvailable(broker_name)
         else:
+            broker_pod_ip = nodeinfo.PodIP
             supported_resource_types = json.loads(nodeinfo.ResourceTypes or '[]')
             if ResourceType.TypePTP in supported_resource_types:
-                return self._query(default_node_name)
+                return self._query(default_node_name, broker_pod_ip)
             else:
-                raise client_exception.ResourceNotAvailable(broker_node_name, ResourceType.TypePTP)
+                raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP)
 
-    def _query(self, broker_node_name):
-        broker_host = "notificationservice-{0}".format(broker_node_name)
+    def _query(self, broker_name, broker_pod_ip):
+        # wrap the pod IP in brackets so that IPv6 addresses also parse in the URL
+        broker_host = "[{0}]".format(broker_pod_ip)
         broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format(
             self.daemon_control.daemon_context['NOTIFICATION_BROKER_USER'],
             self.daemon_control.daemon_context['NOTIFICATION_BROKER_PASS'],
@@ -61,35 +65,39 @@ class PtpService(object):
         notificationservice_client = None
         try:
             notificationservice_client = NotificationServiceClient(
-                broker_node_name, broker_transport_endpoint)
+                broker_name, broker_transport_endpoint, broker_pod_ip)
             resource_status = notificationservice_client.query_resource_status(
                 ResourceType.TypePTP, timeout=5, retry=10)
             return resource_status
         except oslo_messaging.exceptions.MessagingTimeout as ex:
             LOG.warning("ptp status is not available @node {0} due to {1}".format(
-                broker_node_name, str(ex)))
-            raise client_exception.ResourceNotAvailable(broker_node_name, ResourceType.TypePTP)
+                broker_name, str(ex)))
+            raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP)
         except kombu.exceptions.OperationalError as ex:
-            LOG.warning("Node {0} is unreachable yet".format(broker_node_name))
-            raise client_exception.NodeNotAvailable(broker_node_name)
+            LOG.warning("Node {0} is not reachable yet".format(broker_name))
+            raise client_exception.NodeNotAvailable(broker_name)
         finally:
             if notificationservice_client:
+                notificationservice_client.cleanup()
                 del notificationservice_client
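+    # For illustration (hypothetical values): with broker user 'guest', password
+    # 'guest', pod IP '172.16.192.71' and port 5672, _query() above builds the
+    # transport endpoint rabbit://guest:guest@[172.16.192.71]:5672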
available yet".format(default_node_name)) - raise client_exception.NodeNotAvailable(broker_node_name) + raise client_exception.NodeNotAvailable(broker_name) # get initial resource status if default_node_name: + nodeinfo_repo = NodeRepo(autocommit=False) + nodeinfo = nodeinfo_repo.get_one(Status=1, NodeName=default_node_name) + broker_pod_ip = nodeinfo.PodIP ptpstatus = None - ptpstatus = self._query(default_node_name) + ptpstatus = self._query(default_node_name, broker_pod_ip) LOG.info("initial ptpstatus:{0}".format(ptpstatus)) # construct subscription entry