diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/config.py b/notificationclient-base/centos/docker/notificationclient-sidecar/config.py index 8ce13bf..3f1625b 100644 --- a/notificationclient-base/centos/docker/notificationclient-sidecar/config.py +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/config.py @@ -7,6 +7,9 @@ import os SIDECAR_API_PORT = os.environ.get("SIDECAR_API_PORT", "8080") SIDECAR_API_HOST = os.environ.get("SIDECAR_API_HOST", "127.0.0.1") +DATASTORE_PATH = os.environ.get("DATASTORE_PATH", "/opt/datastore") +LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "INFO") + # Server Specific Configurations server = { 'port': SIDECAR_API_PORT, @@ -29,14 +32,14 @@ app = { logging = { 'root': {'level': 'INFO', 'handlers': ['console']}, 'loggers': { - 'sidecar': {'level': 'DEBUG', 'handlers': ['console'], 'propagate': False}, - 'pecan': {'level': 'DEBUG', 'handlers': ['console'], 'propagate': False}, + 'sidecar': {'level': LOGGING_LEVEL, 'handlers': ['console'], 'propagate': False}, + 'pecan': {'level': LOGGING_LEVEL, 'handlers': ['console'], 'propagate': False}, 'py.warnings': {'handlers': ['console']}, '__force_dict__': True }, 'handlers': { 'console': { - 'level': 'DEBUG', + 'level': LOGGING_LEVEL, 'class': 'logging.StreamHandler', 'formatter': 'color' } @@ -57,7 +60,7 @@ logging = { # Bindings and options to pass to SQLAlchemy's ``create_engine`` sqlalchemy = { - 'url' : 'sqlite:///sidecar.db', + 'url' : "sqlite:////{0}/sidecar.db".format(DATASTORE_PATH), 'echo' : False, 'echo_pool' : False, 'pool_recycle' : 3600, diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py index ccbb312..73a16c5 100644 --- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/base.py @@ -33,13 +33,20 @@ class BrokerClientBase(object): self._workerthread = threading.Thread(target=self._refresher, args=()) self._workerthread.start() - LOG.debug("Created Broker client:{0}".format(broker_name)) + LOG.debug("Created Broker client:{0},{1}".format(broker_name, broker_transport_endpoint)) def __del__(self): + self.cleanup() + + def cleanup(self): + self.clean_listeners() self._workerterminated = True - self._workerevent.set() - self.transport.cleanup() - del self.transport + if self._workerevent: + self._workerevent.set() + if self.transport: + self.transport.cleanup() + del self.transport + self.transport = None return def _refresher(self, retry_interval=5): @@ -95,7 +102,7 @@ class BrokerClientBase(object): continue return allset - def _trigger_refresh_listener(self, context): + def _trigger_refresh_listener(self): self._workerevent.set() # # sleep to re-schedule to run worker thread # time.sleep(2) @@ -117,14 +124,14 @@ class BrokerClientBase(object): context['endpoints'] = listener_endpoints context['active'] = True - self._trigger_refresh_listener(context) + self._trigger_refresh_listener() def remove_listener(self, topic, server): context = self.listeners.get(topic,{}).get(server, {}) with self._workerlock: if context: context['active'] = False - self._trigger_refresh_listener(context) + self._trigger_refresh_listener() def is_listening(self, topic, server): context = self.listeners.get(topic,{}).get(server, {}) @@ -137,6 +144,18 @@ class 
BrokerClientBase(object): return True return False + def __is_connected(self, context): + return context.get('rpcserver', None) is not None if context else False + + def clean_listeners(self): + for topic, servers in self.listeners.items(): + for server, context in servers.items(): + self.remove_listener(topic, server) + self._trigger_refresh_listener() + LOG.debug("listener {0}@{1} {2} stopped".format( + topic, server, + 'is' if self.__is_connected(context) else 'is not yet')) + def call(self, topic, server, api_name, timeout=None, retry=None, **api_kwargs): target = oslo_messaging.Target( topic=topic, server=server, version=self.broker_endpoint.Version, diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py index 09f683c..011c816 100644 --- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py @@ -45,9 +45,10 @@ class NotificationServiceClient(BrokerClientBase): return time.time() '''Init client to notification service''' - def __init__(self, target_node_name, notificationservice_transport_endpoint): + def __init__(self, target_node_name, notificationservice_transport_endpoint, broker_pod_ip): self.Id = id(self) self.target_node_name = target_node_name + self.broker_pod_ip = broker_pod_ip super(NotificationServiceClient, self).__init__( '{0}'.format(target_node_name), notificationservice_transport_endpoint) @@ -89,3 +90,6 @@ class NotificationServiceClient(BrokerClientBase): server="{0}-EventListener-{1}".format(resource_type, self.Id) return super(NotificationServiceClient, self).is_listening( topic, server) + + def is_broker_ip(self, broker_pod_ip): + return self.broker_pod_ip == broker_pod_ip diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py index 349cbb7..f801c02 100644 --- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py @@ -4,8 +4,11 @@ # SPDX-License-Identifier: Apache-2.0 # +import os import logging +LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "INFO") + def get_logger(module_name): logger = logging.getLogger(module_name) return config_logger(logger) @@ -14,5 +17,5 @@ def config_logger(logger): ''' configure the logger: uncomment following lines for debugging ''' - # logger.setLevel(level=logging.DEBUG) + logger.setLevel(LOGGING_LEVEL) return logger diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/broker_state.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/broker_state.py new file mode 100644 index 0000000..983edfb --- /dev/null +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/model/dto/broker_state.py @@ -0,0 +1,123 @@ +#coding=utf-8 +# +# Copyright (c) 2021 Wind River Systems, Inc. 
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+import json
+
+from wsme import types as wtypes
+from notificationclientsdk.model.dto.resourcetype import EnumResourceType
+
+class BrokerState(wtypes.Base):
+    BrokerName = wtypes.text
+    # Timestamp = float
+    Connected = bool
+    ConnectionStateChanged = bool
+    BrokerIP = wtypes.text
+    BrokerIPChanged = bool
+    ResourceTypes = [EnumResourceType]
+    ResourceTypesChanged = bool
+    ResourceTypesSubscribed = {wtypes.text:int}
+    ResourceTypesSubscribedChanged = bool
+    DataSyncupPendingDetails = [wtypes.text]
+
+    def update_connection_state(self, is_connected):
+        if self.Connected != is_connected:
+            self.Connected = is_connected
+            self.ConnectionStateChanged = True
+        return self.ConnectionStateChanged
+
+    def is_connected(self):
+        return self.Connected
+
+    def is_connection_state_changed(self):
+        return self.ConnectionStateChanged
+
+    def ack_connection_state_changed(self):
+        self.ConnectionStateChanged = False
+
+    def update_broker_ip(self, broker_ip):
+        if self.BrokerIP != broker_ip:
+            self.BrokerIP = broker_ip
+            self.BrokerIPChanged = True
+        return self.BrokerIPChanged
+
+    def is_broker_ip_changed(self):
+        return self.BrokerIPChanged
+
+    def ack_broker_ip_changed(self):
+        self.BrokerIPChanged = False
+
+    def update_resources(self, resources_list):
+        # note: list.sort() sorts in place and returns None; use sorted() instead
+        sorted_resource_list = sorted(resources_list)
+        if self.ResourceTypes != sorted_resource_list:
+            self.ResourceTypes = sorted_resource_list
+            self.ResourceTypesChanged = True
+        return self.ResourceTypesChanged
+
+    def is_resources_changed(self):
+        return self.ResourceTypesChanged
+
+    def ack_resources_changed(self):
+        self.ResourceTypesChanged = False
+
+    def any_resource_subscribed(self):
+        return len(self.ResourceTypesSubscribed) > 0
+
+    def try_subscribe_resource(self, resource_type, indicator):
+        self.ResourceTypesSubscribedChanged = self.ResourceTypesSubscribedChanged or not resource_type in self.ResourceTypesSubscribed
+        self.ResourceTypesSubscribed[resource_type] = indicator
+
+    def try_unsubscribe_resource(self, resource_type):
+        self.ResourceTypesSubscribedChanged = self.ResourceTypesSubscribedChanged or resource_type in self.ResourceTypesSubscribed
+        self.ResourceTypesSubscribed.pop(resource_type, None)
+
+    def is_resource_subscribed_changed(self):
+        return self.ResourceTypesSubscribedChanged
+
+    def ack_resource_subscribed_changed(self):
+        self.ResourceTypesSubscribedChanged = False
+
+    def is_resource_subscribed(self, resource_type):
+        return resource_type in self.ResourceTypesSubscribed
+
+    def any_obsolete_subscription(self, indicator):
+        for s, i in self.ResourceTypesSubscribed.items():
+            if i != indicator:
+                return True
+        return False
+
+    def unsubscribe_resource_obsolete(self, indicator):
+        uninterested = []
+        for s, i in self.ResourceTypesSubscribed.items():
+            if i != indicator:
+                uninterested.append(s)
+        for s in uninterested:
+            self.ResourceTypesSubscribed.pop(s, None)
+        return len(uninterested) > 0
+
+    def signal_data_syncup(self, resource_type=None):
+        if not resource_type:
+            self.DataSyncupPendingDetails = [k for k,v in self.ResourceTypesSubscribed.items()]
+        elif resource_type not in self.ResourceTypesSubscribed:
+            return False
+        elif not resource_type in self.DataSyncupPendingDetails:
+            self.DataSyncupPendingDetails.append(resource_type)
+        return True
+
+    def ack_data_syncup(self, resource_type=None):
+        if not resource_type:
+            self.DataSyncupPendingDetails = []
+        elif resource_type in self.DataSyncupPendingDetails:
+            self.DataSyncupPendingDetails.remove(resource_type)
+
+    def is_data_syncup(self, resource_type=None):
+        if not resource_type:
+            return len(self.DataSyncupPendingDetails or []) > 0
+        else:
+            return resource_type in (self.DataSyncupPendingDetails or [])
\ No newline at end of file
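The BrokerState DTO pairs each mutable field with a *Changed flag plus an ack_* method, so the single worker thread can detect a delta, react, and only then clear the flag. A minimal sketch of that update/check/ack cycle (broker name and IP are illustrative; assumes the sidecar package and wsme are importable):

    from notificationclientsdk.model.dto.broker_state import BrokerState

    state = BrokerState(BrokerName='controller-0',
                        ResourceTypes=[], ResourceTypesSubscribed={})
    state.update_connection_state(True)      # raises ConnectionStateChanged
    state.update_broker_ip('fd00::10')       # raises BrokerIPChanged
    if state.is_connection_state_changed():
        # ... react, e.g. (re)start the watcher for this broker ...
        state.ack_connection_state_changed()  # clear the flag only after handling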
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_connection_manager.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_connection_manager.py
new file mode 100644
index 0000000..a3081e9
--- /dev/null
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_connection_manager.py
@@ -0,0 +1,320 @@
+#
+# Copyright (c) 2021 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+import time
+import oslo_messaging
+import logging
+from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
+from notificationclientsdk.model.dto.subscription import SubscriptionInfo
+from notificationclientsdk.model.dto.resourcetype import ResourceType
+from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
+from notificationclientsdk.model.dto.broker_state import BrokerState
+
+from notificationclientsdk.client.locationservice import LocationServiceClient
+from notificationclientsdk.client.notificationservice import NotificationServiceClient
+
+LOG = logging.getLogger(__name__)
+
+from notificationclientsdk.common.helpers import log_helper
+log_helper.config_logger(LOG)
+
+
+class BrokerConnectionManager:
+
+    def __init__(self, broker_location_handler, notification_handler, broker_connection_contexts):
+        '''
+        broker_watchers: {
+            "<broker name>": {
+                "broker_client": client1,
+                "subscribed_resource_list": ['PTP', ...]
+            },
+            ...
+        }
+        '''
+        self.shared_broker_context = broker_connection_contexts
+        self.registration_endpoint = RpcEndpointInfo(
+            self.shared_broker_context['REGISTRATION_TRANSPORT_ENDPOINT'])
+        self.broker_watchers = {}
+        self.location_watcher = LocationServiceClient(self.registration_endpoint.TransportEndpoint)
+        self.__broker_location_handler = broker_location_handler
+        self.__notification_handler = notification_handler
+
+    def __del__(self):
+        if self.location_watcher:
+            self.location_watcher.cleanup()
+            del self.location_watcher
+            self.location_watcher = None
+
+    def start(self):
+        self.__start_watch_all_nodes()
+
+    def stop(self):
+        self.__stop_watch_all_nodes()
+
+    def __validate(self, brokerstate):
+        valid = brokerstate.BrokerName or brokerstate.BrokerIP
+        return valid
+
+    def start_watching_broker(self, brokerstate):
+        try:
+            if not self.__validate(brokerstate):
+                return False
+
+            broker_name = brokerstate.BrokerName
+
+            # must make sure the location is updated/watched:
+            # 1, check and start location watcher
+            if not self.location_watcher.is_listening_on_location(broker_name):
+                # start watching on the location announcement
+                self.location_watcher.add_location_listener(
+                    broker_name,
+                    location_handler=self.__broker_location_handler)
+                LOG.debug("Start watching location announcement of notificationservice@{0}"
+                    .format(broker_name))
+                # try to update location by query
+                try:
+                    location_info = self.location_watcher.query_location(broker_name, timeout=5, retry=2)
+                    LOG.debug("Pulled location info@{0}:{1}".format(broker_name, location_info))
+                    if location_info:
+                        podip = location_info.get("PodIP", None)
+                        resourcetypes = location_info.get("ResourceTypes", None)
+                        brokerstate.update_broker_ip(podip)
+                        brokerstate.update_resources(resourcetypes)
+                    else:
+                        return False
+                except Exception as ex:
+                    LOG.warning("Failed to update location of node:{0} due to: {1}".format(
+                        broker_name, str(ex)))
+                    raise ex
+
+            # 2, create broker connection
+            broker_watcher = self.broker_watchers.get(broker_name, {})
+            broker_client = broker_watcher.get("broker_client", None)
+            if not broker_client:
+                LOG.debug("Start watching notifications from notificationservice@{0}".format(broker_name))
+                broker_client = self.__create_client(broker_name, brokerstate.BrokerIP)
+                broker_watcher["broker_client"] = broker_client
+                self.broker_watchers[broker_name] = broker_watcher
+
+            # 3, update watching resources
+            result = self.__update_watching_resources(broker_watcher, broker_client, brokerstate)
+            return result
+        except Exception as ex:
+            LOG.warning("failed to start watching:{0},{1}".format(
+                brokerstate, str(ex)))
+            return False
+
+    def __stop_watching_broker_resource(self, broker_client, broker_name, resource_type):
+        try:
+            if broker_client.is_listening_on_resource(resource_type):
+                broker_client.remove_resource_status_listener(resource_type)
+            return True
+        except Exception as ex:
+            LOG.warning("failed to stop watching resource:{0}@{1},{2}".format(
+                broker_name, resource_type, str(ex)))
+            return False
+
+    def __start_watching_broker_resource(self, broker_client, broker_name, resource_type):
+        try:
+            if not broker_client.is_listening_on_resource(resource_type):
+                broker_client.add_resource_status_listener(
+                    resource_type, status_handler=self.__notification_handler)
+                LOG.debug("Start watching {0}@{1}".format(resource_type, broker_name))
+
+            return True
+        except Exception as ex:
+            LOG.warning("failed to start watching resource:{0}@{1},{2}".format(
+                resource_type, broker_name, str(ex)))
+            return False
+
+    def stop_watching_broker(self, broker_name):
+        try:
+            # 1, stop listening to broker's location announcement
+            if self.location_watcher.is_listening_on_location(broker_name):
+                self.location_watcher.remove_location_listener(broker_name)
+                LOG.debug("Stop watching location announcement for broker@{0}"
+                    .format(broker_name))
+
+            # 2, remove broker client
+            broker_watcher = self.broker_watchers.get(broker_name, {})
+            broker_client = broker_watcher.get("broker_client", None)
+            if broker_client:
+                broker_client.cleanup()
+                del broker_client
+                broker_client = None
+                self.broker_watchers.pop(broker_name, None)
+                LOG.debug("Stop watching notificationservice@{0}".format(broker_name))
+
+            return True
+        except Exception as ex:
+            LOG.warning("failed to stop watching:{0},{1}".format(
+                broker_name, str(ex)))
+            return False
+
+    def restart_watching_broker(self, brokerstate):
+        try:
+            broker_name = brokerstate.BrokerName
+            LOG.debug("Try to restart watching notificationservice@{0}".format(broker_name))
+            broker_watcher = self.broker_watchers.get(broker_name, {})
+            broker_client = broker_watcher.get("broker_client", None)
+            if broker_client:
+                broker_client.cleanup()
+                del broker_client
+                broker_client = None
+                self.broker_watchers.pop(broker_name, None)
+            return self.start_watching_broker(brokerstate)
+        except Exception as ex:
+            LOG.warning("failed to restart watching:{0},{1}".format(
+                brokerstate, str(ex)))
+            return False
+
+    def update_watching_resources(self, brokerstate):
+        try:
+            broker_watcher = self.broker_watchers.get(brokerstate.BrokerName, {})
+            broker_client = broker_watcher.get("broker_client", None)
+            if broker_client:
+                result = self.__update_watching_resources(broker_watcher, broker_client, brokerstate)
+                return result
+            return False
+        except Exception as ex:
+            LOG.warning("failed to update watching resources:{0},{1}".format(
+                brokerstate, str(ex)))
+            return False
+
+    def __update_watching_resources(self, broker_watcher, broker_client, brokerstate):
+        try:
+            result = True
+            # 1, filter out those unsubscribed resources
+            subscribed_resource_list = broker_watcher.get("subscribed_resource_list",[])
+            if subscribed_resource_list != brokerstate.ResourceTypesSubscribed:
+                # stop watching those uninterested
+                for resource_type in subscribed_resource_list:
+                    if resource_type not in brokerstate.ResourceTypesSubscribed:
+                        result = self.__stop_watching_broker_resource(
+                            broker_client, brokerstate.BrokerName, resource_type)
+
+                # 2, update the list
+                subscribed_resource_list = brokerstate.ResourceTypesSubscribed
+                broker_watcher["subscribed_resource_list"] = subscribed_resource_list
+
+            # 3, start watching the subscribed resources
+            for resource_type in subscribed_resource_list:
+                result = self.__start_watching_broker_resource(
+                    broker_client, brokerstate.BrokerName, resource_type) and result
+            return result
+        except Exception as ex:
+            LOG.warning("failed to update resources:{0},{1}".format(
+                brokerstate, str(ex)))
+            return False
+
+    def is_watching_broker(self, broker_name):
+        broker_watcher = self.broker_watchers.get(broker_name, {})
+        broker_client = broker_watcher.get("broker_client", None)
+        return broker_client is not None
+
+    def is_watching_resource(self, broker_name, resource_type):
+        broker_watcher = self.broker_watchers.get(broker_name, {})
+        broker_client = broker_watcher.get("broker_client", None)
+        return broker_client.is_listening_on_resource(
+            resource_type) if broker_client else False
+
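__create_client below assembles a per-broker rabbit:// endpoint from the shared daemon context; the pod IP is wrapped in brackets so an IPv6 address survives the host:port split. With illustrative (hypothetical) credentials the result looks like this:

    context = {'NOTIFICATION_BROKER_USER': 'admin',   # made-up values, not from the source
               'NOTIFICATION_BROKER_PASS': 'secret',
               'NOTIFICATION_BROKER_PORT': 5672}
    broker_host = "[{0}]".format('fd00::10')          # bracket the pod IP (IPv6-safe)
    endpoint = "rabbit://{0}:{1}@{2}:{3}".format(
        context['NOTIFICATION_BROKER_USER'],
        context['NOTIFICATION_BROKER_PASS'],
        broker_host,
        context['NOTIFICATION_BROKER_PORT'])
    assert endpoint == "rabbit://admin:secret@[fd00::10]:5672"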
+    def __create_client(self, broker_name, broker_pod_ip):
+        if broker_name == NodeInfoHelper.BROKER_NODE_ALL:
+            # special case: when monitoring all nodes, use the same broker as the location service
+            return self.location_watcher
+        broker_host = "[{0}]".format(broker_pod_ip)
+        broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format(
+            self.shared_broker_context['NOTIFICATION_BROKER_USER'],
+            self.shared_broker_context['NOTIFICATION_BROKER_PASS'],
+            broker_host,
+            self.shared_broker_context['NOTIFICATION_BROKER_PORT'])
+        return NotificationServiceClient(broker_name, broker_transport_endpoint, broker_pod_ip)
+
+    def __start_watch_all_nodes(self, retry_interval=5):
+        try:
+            LOG.debug(
+                "Start watching location announcement of notificationservice@{0}"
+                .format(NodeInfoHelper.BROKER_NODE_ALL))
+            while not self.location_watcher.is_listening_on_location(
+                NodeInfoHelper.BROKER_NODE_ALL):
+                # start watching on the location announcement
+                self.location_watcher.add_location_listener(
+                    NodeInfoHelper.BROKER_NODE_ALL,
+                    location_handler=self.__broker_location_handler)
+                if not self.location_watcher.is_listening_on_location(
+                    NodeInfoHelper.BROKER_NODE_ALL):
+                    # retry later and forever
+                    LOG.debug(
+                        "Retry indefinitely to start listening to {0}..."
+                        .format(NodeInfoHelper.BROKER_NODE_ALL))
+                    time.sleep(retry_interval)
+
+            LOG.debug(
+                "Trigger the location announcement of notificationservice@{0}"
+                .format(NodeInfoHelper.BROKER_NODE_ALL))
+            self.location_watcher.trigger_location_annoucement(timeout=20, retry=10)
+        except Exception as ex:
+            LOG.warning("exception: {0}".format(str(ex)))
+            pass
+        finally:
+            pass
+        return
+
+    def __stop_watch_all_nodes(self):
+        pass
+
+    def __syncup_data_by_resourcetype(self, broker_client, broker_name, resource_type):
+        # check to sync up resource status on a node
+        LOG.debug("try to sync up data for {0}@{1}".format(resource_type, broker_name))
+        try:
+            if broker_name == NodeInfoHelper.BROKER_NODE_ALL:
+                self.location_watcher.trigger_publishing_status(
+                    resource_type, timeout=5, retry=10)
+                return True
+
+            # 1, query resource status (broker_client is passed in by the caller)
+            if not broker_client:
+                raise Exception("watcher is not ready for broker: {0}".format(broker_name))
+            resource_status = broker_client.query_resource_status(
+                resource_type, timeout=5, retry=10)
+
+            # 2, deliver resource by comparing LastDelivery time with EventTimestamp
+            # 3, update the LastDelivery with EventTimestamp
+            self.__notification_handler.handle(resource_status)
+        except oslo_messaging.exceptions.MessagingTimeout as ex:
+            LOG.warning("Failed to sync up data {0}@{1}, due to {2}".format(
                resource_type, broker_name, str(ex)))
+            return False
+        except Exception as ex:
+            LOG.warning("Failed to sync up data {0}@{1}, due to {2}".format(
+                resource_type, broker_name, str(ex)))
+            raise ex
+        finally:
+            pass
+        return True
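syncup_broker_data below walks the subscribed resource list and only queries resources whose syncup flag is pending, acking each one on success; a resource that fails stays pending and is retried on the next loop. A standalone mirror of that bookkeeping (plain containers, not the SDK classes; 'FPGA' is a made-up second type):

    pending = {'PTP'}                # DataSyncupPendingDetails analogue
    subscribed = ['PTP', 'FPGA']     # subscribed_resource_list analogue
    for resource_type in subscribed:
        if resource_type not in pending:
            continue                 # nothing pending for this resource
        synced = True                # stands in for query_resource_status + delivery
        if synced:
            pending.discard(resource_type)   # ack_data_syncup analogue
    assert not pending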
+    def syncup_broker_data(self, brokerstate):
+        aggregated_result = True
+        broker_name = brokerstate.BrokerName
+        try:
+            broker_watcher = self.broker_watchers.get(broker_name, {})
+            broker_client = broker_watcher.get("broker_client", None)
+            subscribed_resource_list = broker_watcher.get("subscribed_resource_list",[])
+            for resource_type in subscribed_resource_list:
+                if not brokerstate.is_data_syncup(resource_type):
+                    continue
+                result = self.__syncup_data_by_resourcetype(
+                    broker_client, broker_name, resource_type)
+                if result:
+                    brokerstate.ack_data_syncup(resource_type)
+                aggregated_result = aggregated_result and result
+            return aggregated_result
+        except Exception as ex:
+            LOG.warning("failed to sync up data for resources:{0},{1}".format(
+                broker_name, str(ex)))
+            return False
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py
new file mode 100644
index 0000000..d858282
--- /dev/null
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py
@@ -0,0 +1,239 @@
+#
+# Copyright (c) 2021 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+import json
+import logging
+from notificationclientsdk.model.dto.subscription import SubscriptionInfo
+from notificationclientsdk.model.dto.resourcetype import ResourceType
+from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
+
+from notificationclientsdk.model.dto.broker_state import BrokerState
+
+LOG = logging.getLogger(__name__)
+
+from notificationclientsdk.common.helpers import log_helper
+log_helper.config_logger(LOG)
+
+class BrokerStateManager:
+    '''
+    Manager to manage broker states
+    Note: Not thread safe for now
+    '''
+    def __init__(self):
+        self.broker_state_map = {}
+        self.disabled_brokers = []
+        self.subscription_refresh_iteration = 0
+
+    def count_brokers(self):
+        return len(self.broker_state_map)
+
+    def add_broker(self, broker_name):
+        brokerstate = self.broker_state_map.get(broker_name, None)
+        if not brokerstate:
+            brokerstate = BrokerState(
+                BrokerName=broker_name,
+                ResourceTypes=[], ResourceTypesSubscribed={})
+            brokerstate.update_connection_state(False)
+            self.broker_state_map[broker_name] = brokerstate
+        return brokerstate
+
+    def disable_broker(self, broker_name):
+        if not broker_name in self.disabled_brokers:
+            self.disabled_brokers.append(broker_name)
+
+    def remove_broker(self, broker_name):
+        self.broker_state_map.pop(broker_name, None)
+
+    def refresh_by_nodeinfos(self, nodeinfos_orm):
+        broker_state_changed = False
+        for s in nodeinfos_orm or []:
+            broker_state_changed = self.refresh_by_nodeinfo(s) or broker_state_changed
+        return broker_state_changed
+
+    def refresh_by_nodeinfo(self, nodeinfo_orm):
+        brokerstate = self.broker_state_map.get(nodeinfo_orm.NodeName, None)
+        if not brokerstate:
+            return False
+        if nodeinfo_orm.Status == 1:
+            brokerstate.update_connection_state(True)
+        else:
+            brokerstate.update_connection_state(False)
+        brokerstate.update_broker_ip(nodeinfo_orm.PodIP)
+        brokerstate.update_resources(json.loads(nodeinfo_orm.ResourceTypes))
+        return brokerstate.is_broker_ip_changed() or brokerstate.is_resources_changed()
+
+    def refresh_by_subscriptions(self, subscriptions_orm):
+        broker_state_changed = False
+        # 1, refresh iteration
+        self.subscription_refresh_iteration = self.subscription_refresh_iteration + 1
+
+        # 2, refresh resource subscriptions by subscription record
+        for s in subscriptions_orm or []:
+            broker_state_changed = self.__refresh_by_subscription(s) or broker_state_changed
+
+        # 3, mark broker state change by checking if any obsolete resources
+        broker_state_changed = broker_state_changed or self.any_obsolete_broker()
+        return broker_state_changed
+
+    def any_obsolete_broker(self):
+        for broker_name, brokerstate in self.broker_state_map.items():
+            try:
+                if brokerstate.any_obsolete_subscription(
+                    self.subscription_refresh_iteration):
+                    return True
+            except Exception as ex:
+                LOG.warning(
+                    "failed to check obsoleted resources@{0}:{1}".format(broker_name, str(ex)))
+                continue
+        return False
+
+    def __refresh_by_subscription(self, subscription_orm):
+        changed = False
+        broker_name = None
+
+        subscription = SubscriptionInfo(subscription_orm)
+        resource_type = subscription.ResourceType
+
+        LOG.debug("subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status))
+        if subscription_orm.Status != 1:
+            return False
+
+        # assume PTP and not wildcard
+        if resource_type == ResourceType.TypePTP:
+            broker_name = subscription.ResourceQualifier.NodeName
+        else:
+            # ignore the subscription due to unsupported type
+            LOG.debug("Ignore the subscription for: {0}".format(subscription.SubscriptionId))
+            return False
+
+        if not broker_name:
+            # ignore the subscription without a broker name
+            LOG.debug("Ignore the subscription for: {0}".format(subscription.SubscriptionId))
+            return False
+
+        enumerated_broker_names = NodeInfoHelper.enumerate_nodes(broker_name)
+        if not enumerated_broker_names:
+            LOG.debug("Failed to enumerate broker names for {0}".format(broker_name))
+            return False
+
+        for expanded_broker_name in enumerated_broker_names:
+            brokerstate = self.broker_state_map.get(expanded_broker_name, None)
+            if not brokerstate:
+                brokerstate = self.add_broker(expanded_broker_name)
+                changed = True
+
+            changed = changed or not brokerstate.is_resource_subscribed(resource_type)
+            brokerstate.try_subscribe_resource(resource_type, self.subscription_refresh_iteration)
+
+        return changed
+
+    def syncup_broker_watchers(self, broker_connection_manager):
+        '''sync up brokers state to broker connection manager'''
+        aggregated_result = True
+        interested_brokers = []
+        removed_brokers = []
+        # 1, clean all obsolete resource subscriptions
+        # and disable broker in case no active resource subscription
+        for broker_name, brokerstate in self.broker_state_map.items():
+            try:
+                brokerstate.unsubscribe_resource_obsolete(self.subscription_refresh_iteration)
+                if not brokerstate.any_resource_subscribed():
+                    LOG.debug("disable broker@{0} due to no subscription".format(broker_name))
+                    self.disable_broker(broker_name)
+            except Exception as ex:
+                LOG.warning(
+                    "failed to clean obsolete subscribed resources@{0}:{1}".format(
+                        broker_name, str(ex)))
+                continue
+
+        # 2, stop watching all disabled brokers
+        for broker_name in self.disabled_brokers:
+            try:
+                LOG.debug("stop watching due to disabled: {0}".format(broker_name))
+                result = broker_connection_manager.stop_watching_broker(
+                    broker_name)
+                self.remove_broker(broker_name)
+                removed_brokers.append(broker_name)
+                aggregated_result = aggregated_result and result
+            except Exception as ex:
+                LOG.warning(
+                    "failed to clean disabled broker@{0}: {1}".format(
+                        broker_name, str(ex)))
+                aggregated_result = False
+                continue
+        self.disabled_brokers.clear()
+
+        # 3, start/restart watching remaining brokers
+        for broker_name, brokerstate in self.broker_state_map.items():
+            interested_brokers.append(broker_name)
+            try:
+                result = True
+                is_connected = brokerstate.is_connected()
+                is_watching = broker_connection_manager.is_watching_broker(
+                    broker_name)
+
+                if not is_connected:
+                    if is_watching:
+                        LOG.debug("Stop watching due to disconnected: {0}".format(broker_name))
+                        result = broker_connection_manager.stop_watching_broker(
+                            broker_name)
+                elif is_connected:
+                    # note: start/restart watcher will update resources as well
+                    if not is_watching:
+                        LOG.debug("Start watching due to connected: {0}".format(broker_name))
+                        result = broker_connection_manager.start_watching_broker(
+                            brokerstate)
+                    elif brokerstate.is_broker_ip_changed():
+                        LOG.debug("Restart watching due to IP changed: {0}".format(broker_name))
+                        result = broker_connection_manager.restart_watching_broker(
+                            brokerstate)
+                    elif brokerstate.is_connection_state_changed():
+                        # trigger to sync up notification after (re-)connection
+                        LOG.debug("Trigger to re-sync up data: {0}".format(broker_name))
+                        result = brokerstate.signal_data_syncup()
+                    elif brokerstate.is_resource_subscribed_changed() or brokerstate.is_resources_changed():
+                        LOG.debug("Update watching due to resources changed: {0}".format(broker_name))
+                        result = broker_connection_manager.update_watching_resources(brokerstate)
+
+                # leave the signals as they are to re-sync up in the next loop in case of failure
+                if result:
+                    # assumption to avoid race condition: same thread to manipulate brokerstate
+                    brokerstate.ack_connection_state_changed()
+                    brokerstate.ack_broker_ip_changed()
+                    brokerstate.ack_resource_subscribed_changed()
+                    brokerstate.ack_resources_changed()
+
+                aggregated_result = aggregated_result and result
+            except Exception as ex:
+                LOG.warning("failed to sync up broker watcher:{0},{1}".format(broker_name, str(ex)))
+                aggregated_result = False
+                continue
+        return aggregated_result, interested_brokers, removed_brokers
+
+    def syncup_broker_data(self, broker_connection_manager):
+        '''sync up to get rid of stale data'''
+        aggregated_result = True
+        synced_brokers = []
+        unsynced_brokers = []
+        for broker_name, brokerstate in self.broker_state_map.items():
+            try:
+                if brokerstate.is_connected() and brokerstate.is_data_syncup():
+                    LOG.debug("Try to sync up broker data:{0}".format(broker_name))
+                    result = broker_connection_manager.syncup_broker_data(
+                        brokerstate)
+                    if result:
+                        # assumption to avoid race condition: same thread to manipulate brokerstate
+                        brokerstate.ack_data_syncup()
+                        synced_brokers.append(broker_name)
+                    else:
+                        unsynced_brokers.append(broker_name)
+                    aggregated_result = aggregated_result and result
+            except Exception as ex:
+                unsynced_brokers.append(broker_name)
+                LOG.warning("failed to sync up broker data:{0}".format(str(ex)))
+                aggregated_result = False
+                continue
+        return aggregated_result, synced_brokers, unsynced_brokers
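BrokerStateManager tracks subscription liveness with a mark-and-sweep counter: refresh_by_subscriptions stamps every active subscription with the new subscription_refresh_iteration, and unsubscribe_resource_obsolete later sweeps entries still carrying an older stamp. The pattern in isolation (plain dict, not the SDK classes; 'FPGA' is a made-up type):

    subscribed = {'PTP': 1, 'FPGA': 1}   # resource -> iteration stamp
    iteration = 2                        # a new refresh pass begins
    subscribed['PTP'] = iteration        # only still-active entries are re-stamped
    obsolete = [r for r, i in subscribed.items() if i != iteration]
    for r in obsolete:
        subscribed.pop(r, None)          # 'FPGA' is swept as obsolete
    assert subscribed == {'PTP': 2}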
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py
index 139a875..629b9aa 100644
--- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py
@@ -4,42 +4,14 @@
 # SPDX-License-Identifier: Apache-2.0
 #
 
-import os
-import json
-import time
-import oslo_messaging
-from oslo_config import cfg
 import logging
 import multiprocessing as mp
-import threading
-import sys
-if sys.version > '3':
-    import queue as Queue
-else:
-    import Queue
-
-from notificationclientsdk.common.helpers import subscription_helper
-from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper
-from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
+from notificationclientsdk.common.helpers import rpc_helper
 from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
-from notificationclientsdk.model.dto.subscription import SubscriptionInfo
-from notificationclientsdk.model.dto.resourcetype import ResourceType
-from notificationclientsdk.model.dto.location import LocationInfo
-
-from
notificationclientsdk.repository.dbcontext import DbContext -from notificationclientsdk.repository.subscription_repo import SubscriptionRepo - -from notificationclientsdk.model.orm.node import NodeInfo as NodeInfoOrm - -from notificationclientsdk.repository.node_repo import NodeRepo - from notificationclientsdk.client.locationservice import LocationServiceClient -from notificationclientsdk.client.notificationservice import NotificationServiceClient -from notificationclientsdk.client.notificationservice import NotificationHandlerBase - -from notificationclientsdk.client.locationservice import LocationHandlerDefault +from notificationclientsdk.services.notification_worker import NotificationWorker LOG = logging.getLogger(__name__) @@ -53,635 +25,6 @@ def ProcessWorkerDefault(event, subscription_event, daemon_context): return -class NotificationWorker: - - class NotificationWatcher(NotificationHandlerBase): - def __init__(self, notification_watcher): - self.notification_watcher = notification_watcher - super(NotificationWorker.NotificationWatcher, self).__init__() - - def handle(self, notification_info): - LOG.debug("Received notification:{0}".format(notification_info)) - result = self.notification_watcher.handle_notification_delivery(notification_info) - return result - - class NodeInfoWatcher(LocationHandlerDefault): - def __init__(self, notification_watcher): - self.notification_watcher = notification_watcher - super(NotificationWorker.NodeInfoWatcher, self).__init__() - - def handle(self, location_info): - LOG.debug("Received location info:{0}".format(location_info)) - return self.notification_watcher.produce_location_event(location_info) - - def __init__( - self, event, subscription_event, daemon_context): - - self.daemon_context = daemon_context - self.residing_node_name = daemon_context['THIS_NODE_NAME'] - NodeInfoHelper.set_residing_node(self.residing_node_name) - - self.sqlalchemy_conf = json.loads(daemon_context['SQLALCHEMY_CONF_JSON']) - DbContext.init_dbcontext(self.sqlalchemy_conf) - self.event = event - self.subscription_event = subscription_event - - self.registration_endpoint = RpcEndpointInfo(daemon_context['REGISTRATION_TRANSPORT_ENDPOINT']) - self.locationservice_client = LocationServiceClient(self.registration_endpoint.TransportEndpoint) - # dict,key: node name, value , notificationservice client - self.notificationservice_clients = {} - - # Watcher callback - self.__NotificationWatcher = NotificationWorker.NotificationWatcher(self) - self.__NodeInfoWatcher = NotificationWorker.NodeInfoWatcher(self) - - self.__init_node_resources_map() - self.__init_node_info_channel() - self.__init_location_channel() - self.__init_notification_channel() - self.__init_node_sync_channel() - - def __init_node_resources_map(self): - self.node_resources_map = {} - self.node_resources_iteration = 0 - self.__node_resources_event = mp.Event() - - def __init_node_info_channel(self): - self.__node_info_event = mp.Event() - - def __init_location_channel(self): - self.location_event = mp.Event() - self.location_lock = threading.Lock() - # map index by node name - # only cache the latest loation info - self.location_channel = {} - self.location_keys_q = Queue.Queue() - - def __init_notification_channel(self): - self.notification_lock = threading.Lock() - self.notification_stat = {} - - def __init_node_sync_channel(self): - self.__node_sync_event = mp.Event() - self.__node_sync_q = Queue.Queue() - # initial to be set - self.__node_sync_event.set() - - def __del__(self): - del 
self.locationservice_client - - def signal_location_event(self): - self.location_event.set() - - def signal_subscription_event(self): - self.subscription_event.set() - - def signal_node_sync_event(self): - self.__node_sync_event.set() - - def signal_nodeinfo_event(self): - self.__node_info_event.set() - - def signal_node_resources_event(self): - self.__node_resources_event.set() - - def signal_events(self): - self.event.set() - - def produce_location_event(self, location_info): - node_name = location_info.get('NodeName', None) - podip = location_info.get("PodIP", None) - if not node_name or not podip: - LOG.warning("Missing PodIP inside location info:{0}".format(location_info)) - return False - - result = True - timestamp = location_info.get('Timestamp', 0) - # acquire lock to sync threads which invoke this method - self.location_lock.acquire() - try: - current = self.location_channel.get(node_name, {}) - if current.get('Timestamp', 0) < timestamp: - if current.get('PodIP', None) != podip: - # update /etc/hosts must happen in threads to avoid blocking by the main thread - NOTIFICATIONSERVICE_HOSTNAME = 'notificationservice-{0}' - hostfile_helper.update_host( - NOTIFICATIONSERVICE_HOSTNAME.format(node_name), podip) - LOG.debug("Updated location with IP:{0}".format(podip)) - - # replace the location_info - self.location_channel[node_name] = location_info - self.location_keys_q.put(node_name) - # notify the consumer to process the update - self.signal_location_event() - self.signal_events() - result = True - except Exception as ex: - LOG.warning("failed to produce location event:{0}".format(str(ex))) - result = False - finally: - # release lock - self.location_lock.release() - - return result - - def consume_location_event(self): - LOG.debug("Start consuming location event") - need_to_sync_node = False - node_changed = False - node_resource_updated = False - nodeinfo_repo = NodeRepo(autocommit=True) - - while not self.location_keys_q.empty(): - node_name = self.location_keys_q.get(False) - location_info = self.location_channel.get(node_name, None) - if not location_info: - LOG.warning("consume location@{0} without location info".format(node_name)) - continue - - LOG.debug("process location event@{0}:{1}".format(node_name, location_info)) - - location_info2 = LocationInfo(**location_info) - - entry = nodeinfo_repo.get_one(NodeName=location_info['NodeName'], Status=1) - if not entry: - entry = NodeInfoOrm(**location_info2.to_orm()) - nodeinfo_repo.add(entry) - node_resource_updated = True - node_changed = True - self.__node_sync_q.put(node_name) - LOG.debug("Add NodeInfo: {0}".format(entry.NodeName)) - elif not entry.Timestamp or entry.Timestamp < location_info['Timestamp']: - # update the entry - if entry.ResourceTypes != location_info2.ResourceTypes: - node_resource_updated = True - nodeinfo_repo.update(entry.NodeName, **location_info2.to_orm()) - self.__node_sync_q.put(node_name) - LOG.debug("Update NodeInfo: {0}".format(entry.NodeName)) - else: - # do nothing - LOG.debug("Ignore the location for: {0}".format(entry.NodeName)) - continue - need_to_sync_node = True - continue - - del nodeinfo_repo - LOG.debug("Finished consuming location event") - if need_to_sync_node or node_resource_updated: - if node_changed: - LOG.debug("signal node changed event") - # node changes triggers rebuild map from subscription - # due to the potential subscriptions to all nodes - self.signal_subscription_event() - if node_resource_updated: - # signal the potential changes on node resources - LOG.debug("signal 
node resources updating event") - self.signal_nodeinfo_event() - if need_to_sync_node: - LOG.debug("signal node syncing event") - self.signal_node_sync_event() - self.signal_events() - pass - - def __get_lastest_delivery_timestamp(self, node_name, subscriptionid): - last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) - last_delivery_time = last_delivery_stat.get('EventTimestamp', None) - return last_delivery_time - - def __update_delivery_timestamp(self, node_name, subscriptionid, this_delivery_time): - if not self.notification_stat.get(node_name, None): - self.notification_stat[node_name] = { - subscriptionid: { - 'EventTimestamp': this_delivery_time - } - } - LOG.debug("delivery time @node: {0},subscription:{1} is added".format( - node_name, subscriptionid)) - elif not self.notification_stat[node_name].get(subscriptionid, None): - self.notification_stat[node_name][subscriptionid] = { - 'EventTimestamp': this_delivery_time - } - LOG.debug("delivery time @node: {0},subscription:{1} is added".format( - node_name, subscriptionid)) - else: - last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) - last_delivery_time = last_delivery_stat.get('EventTimestamp', None) - if (last_delivery_time >= this_delivery_time): - return - last_delivery_stat['EventTimestamp'] = this_delivery_time - LOG.debug("delivery time @node: {0},subscription:{1} is updated".format( - node_name, subscriptionid)) - - def handle_notification_delivery(self, notification_info): - LOG.debug("start notification delivery") - result = True - subscription_repo = None - try: - self.notification_lock.acquire() - subscription_repo = SubscriptionRepo(autocommit=True) - resource_type = notification_info.get('ResourceType', None) - node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None) - if not resource_type: - raise Exception("abnormal notification@{0}".format(node_name)) - - if resource_type == ResourceType.TypePTP: - pass - else: - raise Exception("notification with unsupported resource type:{0}".format(resource_type)) - - this_delivery_time = notification_info['EventTimestamp'] - - entries = subscription_repo.get(ResourceType=resource_type, Status=1) - for entry in entries: - subscriptionid = entry.SubscriptionId - ResourceQualifierJson = entry.ResourceQualifierJson or '{}' - ResourceQualifier = json.loads(ResourceQualifierJson) - # qualify by NodeName - entry_node_name = ResourceQualifier.get('NodeName', None) - node_name_matched = NodeInfoHelper.match_node_name(entry_node_name, node_name) - if not node_name_matched: - continue - - subscription_dto2 = SubscriptionInfo(entry) - try: - last_delivery_time = self.__get_lastest_delivery_timestamp(node_name, subscriptionid) - if last_delivery_time and last_delivery_time >= this_delivery_time: - # skip this entry since already delivered - LOG.debug("Ignore the notification for: {0}".format(entry.SubscriptionId)) - continue - - subscription_helper.notify(subscription_dto2, notification_info) - LOG.debug("notification is delivered successfully to {0}".format( - entry.SubscriptionId)) - - self.__update_delivery_timestamp(node_name, subscriptionid, this_delivery_time) - - except Exception as ex: - LOG.warning("notification is not delivered to {0}:{1}".format( - entry.SubscriptionId, str(ex))) - # proceed to next entry - continue - finally: - pass - except Exception as ex: - LOG.warning("Failed to delivery notification:{0}".format(str(ex))) - result = False - finally: - self.notification_lock.release() - 
if not subscription_repo: - del subscription_repo - - if result: - LOG.debug("Finished notification delivery") - else: - LOG.warning("Failed on notification delivery") - return result - - def process_sync_node_event(self): - LOG.debug("Start processing sync node event") - need_to_sync_node_again = False - - while not self.__node_sync_q.empty(): - broker_node_name = self.__node_sync_q.get(False) - try: - result = self.syncup_node(broker_node_name) - if not result: - need_to_sync_node_again = True - except Exception as ex: - LOG.warning("Failed to syncup node{0}:{1}".format(broker_node_name, str(ex))) - continue - - if need_to_sync_node_again: - # continue try in to next loop - self.signal_node_sync_event() - self.signal_events() - LOG.debug("Finished processing sync node event") - - def run(self): - # start location listener - self.__start_watch_all_nodes() - while True: - self.event.wait() - self.event.clear() - LOG.debug("daemon control event is asserted") - - if self.location_event.is_set(): - self.location_event.clear() - # process location notifications - self.consume_location_event() - - if self.subscription_event.is_set(): - self.subscription_event.clear() - # build node resources map from subscriptions - self.process_subscription_event() - - if self.__node_info_event.is_set(): - self.__node_info_event.clear() - # update node_resources_map from node info - self.__update_map_from_nodeinfos() - - if self.__node_resources_event.is_set(): - self.__node_resources_event.clear() - # update watchers from node_resources_map - self.__refresh_watchers_from_map() - - if self.__node_sync_event.is_set(): - self.__node_sync_event.clear() - # compensate for the possible loss of notification during reconnection - self.process_sync_node_event() - - continue - return - - def syncup_resource(self, broker_node_name, resource_type): - # check to sync up resource status on a node - LOG.debug("sync up resource@{0} :{1}".format(broker_node_name, resource_type)) - try: - if broker_node_name == NodeInfoHelper.BROKER_NODE_ALL: - self.locationservice_client.trigger_publishing_status( - resource_type, timeout=5, retry=10) - return True - - # 1, query resource status - broker_client = self.notificationservice_clients.get(broker_node_name, None) - if not broker_client: - raise Exception("notification service client is not setup for node {0}".format(broker_node_name)) - resource_status = broker_client.query_resource_status( - resource_type, timeout=5, retry=10) - - # 2, deliver resource by comparing LastDelivery time with EventTimestamp - # 3, update the LastDelivery with EventTimestamp - self.__NotificationWatcher.handle(resource_status) - except oslo_messaging.exceptions.MessagingTimeout as ex: - LOG.warning("Fail to syncup resource {0}@{1}, due to {2}".format( - resource_type, broker_node_name, str(ex))) - return False - except Exception as ex: - LOG.warning("Fail to syncup resource {0}@{1}, due to {2}".format( - resource_type, broker_node_name, str(ex))) - raise ex - finally: - pass - return True - - def syncup_node(self, broker_node_name): - all_resource_synced = True - # check to sync up resources status on a node - node_resources = self.node_resources_map.get(broker_node_name, None) - if node_resources: - LOG.debug("sync up resources@{0} :{1}".format(broker_node_name, node_resources)) - for resource_type, iteration in node_resources.items(): - if iteration == self.node_resources_iteration: - result = self.syncup_resource(broker_node_name, resource_type) - if not result: - all_resource_synced = False - return 
all_resource_synced
-    def __cleanup_map(self):
-        for broker_node_name, node_resources in self.node_resources_map.items():
-            resourcetypelist = [r for (r, i) in node_resources.items() if i= this_delivery_time:
+                        # skip this entry since already delivered
+                        LOG.debug("Ignore the outdated notification for: {0}".format(entry.SubscriptionId))
+                        continue
+
+                    subscription_helper.notify(subscription_dto2, notification_info)
+                    LOG.debug("notification is delivered successfully to {0}".format(
+                        entry.SubscriptionId))
+
+                    self.update_delivery_timestamp(node_name, subscriptionid, this_delivery_time)
+
+                except Exception as ex:
+                    LOG.warning("notification is not delivered to {0}:{1}".format(
+                        entry.SubscriptionId, str(ex)))
+                    # proceed to next entry
+                    continue
+                finally:
+                    pass
+            LOG.debug("Finished notification delivery")
+            return True
+        except Exception as ex:
+            LOG.warning("Failed to deliver notification:{0}".format(str(ex)))
+            return False
+        finally:
+            self.notification_lock.release()
+            if subscription_repo:
+                del subscription_repo
+
+    def __get_latest_delivery_timestamp(self, node_name, subscriptionid):
+        last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{})
+        last_delivery_time = last_delivery_stat.get('EventTimestamp', None)
+        return last_delivery_time
+
+    def update_delivery_timestamp(self, node_name, subscriptionid, this_delivery_time):
+        if not self.notification_stat.get(node_name, None):
+            self.notification_stat[node_name] = {
+                subscriptionid: {
+                    'EventTimestamp': this_delivery_time
+                }
+            }
+            LOG.debug("delivery time @node: {0},subscription:{1} is added".format(
+                node_name, subscriptionid))
+        elif not self.notification_stat[node_name].get(subscriptionid, None):
+            self.notification_stat[node_name][subscriptionid] = {
+                'EventTimestamp': this_delivery_time
+            }
+            LOG.debug("delivery time @node: {0},subscription:{1} is added".format(
+                node_name, subscriptionid))
+        else:
+            last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{})
+            last_delivery_time = last_delivery_stat.get('EventTimestamp', None)
+            if (last_delivery_time >= this_delivery_time):
+                return
+            last_delivery_stat['EventTimestamp'] = this_delivery_time
+            LOG.debug("delivery time @node: {0},subscription:{1} is updated".format(
+                node_name, subscriptionid))
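update_delivery_timestamp above keeps a two-level map (node -> subscription -> last EventTimestamp), so replayed or out-of-order events are dropped rather than re-delivered. The dedup rule reduced to a standalone function (names and values are illustrative):

    notification_stat = {}

    def should_deliver(node, sub_id, event_ts):
        last = notification_stat.get(node, {}).get(sub_id, {}).get('EventTimestamp')
        if last is not None and last >= event_ts:
            return False                 # same or older event: skip delivery
        notification_stat.setdefault(node, {})[sub_id] = {'EventTimestamp': event_ts}
        return True

    assert should_deliver('controller-0', 'sub-1', 100.0)
    assert not should_deliver('controller-0', 'sub-1', 99.0)   # stale replay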
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+import json
+import logging
+
+import multiprocessing as mp
+import threading
+import sys
+if sys.version_info >= (3, 0):
+    import queue as Queue
+else:
+    import Queue
+
+from notificationclientsdk.common.helpers import subscription_helper
+from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper
+from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
+
+from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
+from notificationclientsdk.model.dto.subscription import SubscriptionInfo
+from notificationclientsdk.model.dto.resourcetype import ResourceType
+from notificationclientsdk.model.dto.location import LocationInfo
+
+from notificationclientsdk.repository.dbcontext import DbContext
+from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
+
+from notificationclientsdk.model.orm.node import NodeInfo as NodeInfoOrm
+
+from notificationclientsdk.repository.node_repo import NodeRepo
+
+from notificationclientsdk.client.locationservice import LocationHandlerDefault, LocationHandlerBase
+
+from notificationclientsdk.services.broker_state_manager import BrokerStateManager
+from notificationclientsdk.services.broker_connection_manager import BrokerConnectionManager
+from notificationclientsdk.services.notification_handler import NotificationHandler
+
+LOG = logging.getLogger(__name__)
+
+from notificationclientsdk.common.helpers import log_helper
+log_helper.config_logger(LOG)
+
+class NotificationWorker(object):
+
+    class LocationInfoHandler(LocationHandlerBase):
+        '''Glue code to forward location info to the daemon method'''
+        def __init__(self, locationinfo_dispatcher):
+            self.locationinfo_dispatcher = locationinfo_dispatcher
+            super(NotificationWorker.LocationInfoHandler, self).__init__()
+
+        def handle(self, location_info):
+            LOG.debug("Received location info:{0}".format(location_info))
+            return self.locationinfo_dispatcher.produce_location_event(location_info)
+
+    def __init__(
+            self, event, subscription_event, daemon_context):
+
+        self.__alive = True
+
+        self.daemon_context = daemon_context
+        self.residing_node_name = daemon_context['THIS_NODE_NAME']
+        NodeInfoHelper.set_residing_node(self.residing_node_name)
+
+        self.sqlalchemy_conf = json.loads(daemon_context['SQLALCHEMY_CONF_JSON'])
+        DbContext.init_dbcontext(self.sqlalchemy_conf)
+        self.event = event
+        self.subscription_event = subscription_event
+
+        self.__locationinfo_handler = NotificationWorker.LocationInfoHandler(self)
+        self.__notification_handler = NotificationHandler()
+        self.broker_connection_manager = BrokerConnectionManager(
+            self.__locationinfo_handler,
+            self.__notification_handler,
+            self.daemon_context)
+        self.broker_state_manager = BrokerStateManager()
+
+        self.__init_location_channel()
+
+        # events to signal broker state changes
+        self.__brokers_watcher_event = mp.Event()
+        self.__brokers_data_syncup_event = mp.Event()
+
+    def __init_location_channel(self):
+        self.location_event = mp.Event()
+        self.location_lock = threading.Lock()
+        # only cache the latest location info per node
+        self.location_channel = {}
+        self.location_keys_q = Queue.Queue()
+
+    def signal_events(self):
+        self.event.set()
+
+    def produce_location_event(self, location_info):
+        node_name = location_info.get('NodeName', None)
+        podip = location_info.get("PodIP", None)
+        if not node_name or not podip:
+            LOG.warning("Missing NodeName or PodIP inside location info:{0}".format(location_info))
+            return False
+        timestamp = location_info.get('Timestamp', 0)
+        # mutex for threads which produce location events
+        self.location_lock.acquire()
+        try:
+            current = self.location_channel.get(node_name, {})
+            if current.get('Timestamp', 0) < timestamp:
+                # update with the location_info
+                self.location_channel[node_name] = location_info
+                self.location_keys_q.put(node_name)
+                # notify the consumer to process the update
+                self.location_event.set()
+                self.signal_events()
+            return True
+        except Exception as ex:
+            LOG.warning("failed to produce location event:{0}".format(str(ex)))
+            return False
+        finally:
+            # release lock
+            self.location_lock.release()
+
+    def run(self):
+        self.broker_connection_manager.start()
+        while self.__alive:
+            self.event.wait()
+            self.event.clear()
+            LOG.debug("daemon control event is asserted")
+
+            if self.location_event.is_set():
+                self.location_event.clear()
+                # update location information
+                self.consume_location_event()
+
+            if self.subscription_event.is_set():
+                self.subscription_event.clear()
+                # refresh broker states from subscriptions
+                self.handle_subscriptions_event()
+
+            if self.__brokers_watcher_event.is_set():
+                self.__brokers_watcher_event.clear()
+                # sync up broker connections with their states
+                self.handle_brokers_watcher_event()
+
+            if self.__brokers_data_syncup_event.is_set():
+                self.__brokers_data_syncup_event.clear()
+                # sync up the brokers' data
+                self.handle_brokers_data_syncup_event()
+
+            continue
+        self.broker_connection_manager.stop()
+        return
+
+    def consume_location_event(self):
+        nodeinfo_repo = None
+        try:
+            LOG.debug("Start to consume location event")
+            _nodeinfo_added = 0
+            _nodeinfo_updated = 0
+            nodeinfo_repo = NodeRepo(autocommit=True)
+
+            while not self.location_keys_q.empty():
+                node_name = self.location_keys_q.get(False)
+                location_info = self.location_channel.get(node_name, None)
+                if not location_info:
+                    LOG.warning("ignore location info@{0} without content".format(node_name))
+                    continue
+
+                LOG.debug("consume location info @{0}:{1}".format(node_name, location_info))
+                is_nodeinfo_added, is_nodeinfo_updated = self.__persist_locationinfo(
+                    location_info, nodeinfo_repo)
+                _nodeinfo_added = _nodeinfo_added + (1 if is_nodeinfo_added else 0)
+                _nodeinfo_updated = _nodeinfo_updated + (1 if is_nodeinfo_updated else 0)
+                continue
+
+            LOG.debug("Finished consuming location event")
+            if _nodeinfo_added > 0:
+                LOG.debug("signal event to refresh broker states from subscriptions")
+                # node info changes trigger rebuilding broker states from subscriptions,
+                # because some subscriptions may subscribe to resources of all nodes
+                self.subscription_event.set()
+            if _nodeinfo_added > 0 or _nodeinfo_updated > 0:
+                LOG.debug("try to refresh broker states due to changes of node info")
+                nodeinfos = nodeinfo_repo.get()
+                broker_state_changed = self.broker_state_manager.refresh_by_nodeinfos(nodeinfos)
+                if broker_state_changed:
+                    # signal the potential changes on node resources
+                    LOG.debug("signal event to re-sync up broker states")
+                    self.__brokers_watcher_event.set()
+                    self.signal_events()
+        except Exception as ex:
+            LOG.warning("failed to consume location event:{0}".format(str(ex)))
+        finally:
+            if nodeinfo_repo:
+                del nodeinfo_repo
+
+    def handle_subscriptions_event(self):
+        broker_state_changed = self.__update_broker_with_subscription()
+        if broker_state_changed:
+            self.__brokers_watcher_event.set()
+            self.signal_events()
+
+    def __persist_locationinfo(self, location_info, nodeinfo_repo):
+        is_nodeinfo_added = False
+        is_nodeinfo_updated = False
+        try:
+            location_info2 = LocationInfo(**location_info)
+            entry = nodeinfo_repo.get_one(NodeName=location_info['NodeName'], Status=1)
+            if not entry:
+                entry = NodeInfoOrm(**location_info2.to_orm())
+                nodeinfo_repo.add(entry)
+                is_nodeinfo_added = True
+                LOG.debug("Add NodeInfo: {0}".format(entry.NodeName))
+            elif not entry.Timestamp or entry.Timestamp < location_info['Timestamp']:
+                # location info with a newer timestamp indicates the broker needs to be re-synced
+                is_nodeinfo_updated = True
+                nodeinfo_repo.update(entry.NodeName, **location_info2.to_orm())
+                LOG.debug("Update NodeInfo: {0}".format(entry.NodeName))
+            else:
+                # do nothing
+                LOG.debug("Ignore the location for broker: {0}".format(entry.NodeName))
+        except Exception as ex:
+            LOG.warning("failed to update broker state with location info:{0}, {1}".format(
+                location_info, str(ex)))
+        finally:
+            return is_nodeinfo_added, is_nodeinfo_updated
+
+    def __update_broker_with_subscription(self):
+        '''update broker states with subscriptions'''
+        broker_state_changed = False
+        subscription_repo = None
+        nodeinfo_repo = None
+
+        try:
+            subscription_repo = SubscriptionRepo(autocommit=True)
+            nodeinfo_repo = NodeRepo(autocommit=True)
+            subs = subscription_repo.get()
+            LOG.debug("found {0} subscriptions".format(subs.count()))
+            broker_state_changed = self.broker_state_manager.refresh_by_subscriptions(subs)
+            if broker_state_changed:
+                # re-evaluate broker states against the latest node info
+                nodeinfos = nodeinfo_repo.get()
+                self.broker_state_manager.refresh_by_nodeinfos(nodeinfos)
+
+            for s in subs:
+                subinfo = SubscriptionInfo(s)
+
+                # assume the resource type is PTP, not a wildcard
+                resource_type = s.ResourceType
+                if resource_type == ResourceType.TypePTP:
+                    broker_name = subinfo.ResourceQualifier.NodeName
+                else:
+                    # ignore the subscription due to unsupported type
+                    LOG.debug("Ignore the subscription for: {0}".format(subinfo.SubscriptionId))
+                    continue
+
+                if s.Status == 1:
+                    current_node_name = NodeInfoHelper.expand_node_name(broker_name)
+
+                    # update the initial delivery timestamp as well
+                    self.__notification_handler.update_delivery_timestamp(
+                        NodeInfoHelper.default_node_name(broker_name),
+                        s.SubscriptionId, s.InitialDeliveryTimestamp)
+
+            # delete all entries with Status == 0
+            subscription_repo.delete(Status=0)
+        finally:
+            del subscription_repo
+            del nodeinfo_repo
+        return broker_state_changed
+
+    def handle_brokers_watcher_event(self):
+        result = False
+        try:
+            LOG.debug("try to sync up watchers for {0} brokers".format(
+                self.broker_state_manager.count_brokers()))
+            result, _, _ = self.broker_state_manager.syncup_broker_watchers(
+                self.broker_connection_manager)
+            self.__brokers_data_syncup_event.set()
+        except Exception as ex:
+            result = False
+            LOG.warning("failed to sync up watchers for brokers: {0}".format(str(ex)))
+        finally:
+            if not result:
+                # retry indefinitely
+                self.__brokers_watcher_event.set()
+                self.signal_events()
+
+    def handle_brokers_data_syncup_event(self):
+        result = False
+        try:
+            LOG.debug("try to sync up data for {0} brokers".format(
+                self.broker_state_manager.count_brokers()))
+            result, _, _ = self.broker_state_manager.syncup_broker_data(
+                self.broker_connection_manager)
+        except Exception as ex:
+            result = False
+            LOG.warning("failed to sync up data for brokers: {0}".format(str(ex)))
+        finally:
+            if not result:
+                self.__brokers_data_syncup_event.set()
+                self.signal_events()
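Note on the ptp.py changes that follow: queries now build a short-lived client against the broker pod IP and must always release it. Below is a minimal lifecycle sketch, assuming hypothetical credentials, node name and pod IP (in the patch these values come from daemon_context and the NodeInfo record):

    from notificationclientsdk.client.notificationservice import NotificationServiceClient
    from notificationclientsdk.model.dto.resourcetype import ResourceType

    client = None
    try:
        # third argument is the broker pod IP introduced by this patch
        client = NotificationServiceClient(
            "controller-0",                            # target node name (hypothetical)
            "rabbit://user:pass@[192.0.2.10]:5672",    # transport endpoint (hypothetical)
            "192.0.2.10")                              # broker pod IP (hypothetical)
        status = client.query_resource_status(
            ResourceType.TypePTP, timeout=5, retry=10)
    finally:
        if client:
            client.cleanup()  # stop listeners and dispose of the transport
            del client

Pairing the constructor with cleanup() in a finally block mirrors how _query() below guards against leaked transports.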
diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py
index 07d4ecf..d0f21bd 100644
--- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py
+++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py
@@ -35,24 +35,28 @@ class PtpService(object):
 
     def __del__(self):
         del self.subscription_repo
+        self.locationservice_client.cleanup()
+        del self.locationservice_client
         return
 
-    def query(self, broker_node_name):
-        default_node_name = NodeInfoHelper.default_node_name(broker_node_name)
+    def query(self, broker_name):
+        default_node_name = NodeInfoHelper.default_node_name(broker_name)
         nodeinfo_repo = NodeRepo(autocommit=False)
         nodeinfo = nodeinfo_repo.get_one(Status=1, NodeName=default_node_name)
         # check node availability from DB
         if not nodeinfo:
-            raise client_exception.NodeNotAvailable(broker_node_name)
+            raise client_exception.NodeNotAvailable(broker_name)
         else:
+            broker_pod_ip = nodeinfo.PodIP
             supported_resource_types = json.loads(nodeinfo.ResourceTypes or '[]')
             if ResourceType.TypePTP in supported_resource_types:
-                return self._query(default_node_name)
+                return self._query(default_node_name, broker_pod_ip)
             else:
-                raise client_exception.ResourceNotAvailable(broker_node_name, ResourceType.TypePTP)
+                raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP)
 
-    def _query(self, broker_node_name):
-        broker_host = "notificationservice-{0}".format(broker_node_name)
+    def _query(self, broker_name, broker_pod_ip):
+        # broker_host = "notificationservice-{0}".format(broker_name)
+        broker_host = "[{0}]".format(broker_pod_ip)
         broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format(
             self.daemon_control.daemon_context['NOTIFICATION_BROKER_USER'],
             self.daemon_control.daemon_context['NOTIFICATION_BROKER_PASS'],
@@ -61,35 +65,39 @@ class PtpService(object):
         notificationservice_client = None
         try:
             notificationservice_client = NotificationServiceClient(
-                broker_node_name, broker_transport_endpoint)
+                broker_name, broker_transport_endpoint, broker_pod_ip)
             resource_status = notificationservice_client.query_resource_status(
                 ResourceType.TypePTP, timeout=5, retry=10)
             return resource_status
         except oslo_messaging.exceptions.MessagingTimeout as ex:
             LOG.warning("ptp status is not available @node {0} due to {1}".format(
-                broker_node_name, str(ex)))
-            raise client_exception.ResourceNotAvailable(broker_node_name, ResourceType.TypePTP)
+                broker_name, str(ex)))
+            raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP)
         except kombu.exceptions.OperationalError as ex:
-            LOG.warning("Node {0} is unreachable yet".format(broker_node_name))
-            raise client_exception.NodeNotAvailable(broker_node_name)
+            LOG.warning("Node {0} is not reachable yet".format(broker_name))
+            raise client_exception.NodeNotAvailable(broker_name)
         finally:
             if notificationservice_client:
+                notificationservice_client.cleanup()
                 del notificationservice_client
 
     def add_subscription(self, subscription_dto):
         subscription_orm = SubscriptionOrm(**subscription_dto.to_orm())
-        broker_node_name = subscription_dto.ResourceQualifier.NodeName
-        default_node_name = NodeInfoHelper.default_node_name(broker_node_name)
-        nodeinfos = NodeInfoHelper.enumerate_nodes(broker_node_name)
+        broker_name = subscription_dto.ResourceQualifier.NodeName
+        default_node_name = NodeInfoHelper.default_node_name(broker_name)
+        nodeinfos = NodeInfoHelper.enumerate_nodes(broker_name)
         # check node availability from DB
         if not nodeinfos or not default_node_name in nodeinfos:
            LOG.warning("Node {0} is not
available yet".format(default_node_name)) - raise client_exception.NodeNotAvailable(broker_node_name) + raise client_exception.NodeNotAvailable(broker_name) # get initial resource status if default_node_name: + nodeinfo_repo = NodeRepo(autocommit=False) + nodeinfo = nodeinfo_repo.get_one(Status=1, NodeName=default_node_name) + broker_pod_ip = nodeinfo.PodIP ptpstatus = None - ptpstatus = self._query(default_node_name) + ptpstatus = self._query(default_node_name, broker_pod_ip) LOG.info("initial ptpstatus:{0}".format(ptpstatus)) # construct subscription entry