diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py index ff36991..846fd31 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py @@ -44,3 +44,6 @@ VALID_SOURCE_URI = { SOURCE_SYNCE_LOCK_STATE_EXTENDED, SOURCE_SYNCE_LOCK_STATE } + +WILDCARD_CURRENT_NODE = '.' +WILDCARD_ALL_NODES = '*' diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/nodeinfo_helper.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/nodeinfo_helper.py index 68b2460..ef92fad 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/nodeinfo_helper.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/nodeinfo_helper.py @@ -4,11 +4,16 @@ # SPDX-License-Identifier: Apache-2.0 # -import json +import logging from notificationclientsdk.repository.node_repo import NodeRepo +from notificationclientsdk.common.helpers import constants +from notificationclientsdk.common.helpers import log_helper + +LOG = logging.getLogger(__name__) +log_helper.config_logger(LOG) + class NodeInfoHelper(object): - BROKER_NODE_ALL = '*' residing_node_name = None @staticmethod @@ -22,25 +27,23 @@ class NodeInfoHelper(object): @staticmethod def expand_node_name(node_name_pattern): - if node_name_pattern == '.': + if node_name_pattern == constants.WILDCARD_CURRENT_NODE: return NodeInfoHelper.residing_node_name - elif node_name_pattern == NodeInfoHelper.BROKER_NODE_ALL: - return NodeInfoHelper.BROKER_NODE_ALL else: return node_name_pattern @staticmethod def default_node_name(node_name_pattern): - if node_name_pattern == '.' or node_name_pattern == '*': + if node_name_pattern == constants.WILDCARD_CURRENT_NODE: return NodeInfoHelper.residing_node_name else: return node_name_pattern @staticmethod def match_node_name(node_name_pattern, target_node_name): - if node_name_pattern == '*': + if node_name_pattern == constants.WILDCARD_ALL_NODES: return True - elif node_name_pattern == '.': + elif node_name_pattern == constants.WILDCARD_CURRENT_NODE: return NodeInfoHelper.residing_node_name == target_node_name else: return node_name_pattern == target_node_name @@ -58,14 +61,16 @@ class NodeInfoHelper(object): try: nodeinfo_repo = NodeRepo(autocommit=True) filter = {} - if node_name_pattern == '*': + if node_name_pattern == constants.WILDCARD_ALL_NODES: pass - elif not node_name_pattern or node_name_pattern == '.': - filter = { 'NodeName': NodeInfoHelper.residing_node_name } + elif not node_name_pattern or \ + node_name_pattern == constants.WILDCARD_CURRENT_NODE: + filter = {'NodeName': NodeInfoHelper.residing_node_name} else: - filter = { 'NodeName': node_name_pattern } + filter = {'NodeName': node_name_pattern} - nodeinfos = [x.NodeName for x in nodeinfo_repo.get(Status=1, **filter)] + nodeinfos = [x.NodeName + for x in nodeinfo_repo.get(Status=1, **filter)] except Exception as ex: LOG.warning("Failed to enumerate nodes:{0}".format(str(ex))) diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_connection_manager.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_connection_manager.py index 8b74c5c..747a4ac 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_connection_manager.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_connection_manager.py @@ -8,21 +8,20 @@ import time import oslo_messaging import logging from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo -from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper -from notificationclientsdk.model.dto.broker_state import BrokerState - +from notificationclientsdk.common.helpers import constants +from notificationclientsdk.common.helpers import log_helper from notificationclientsdk.client.locationservice import LocationServiceClient -from notificationclientsdk.client.notificationservice import NotificationServiceClient +from notificationclientsdk.client.notificationservice \ + import NotificationServiceClient LOG = logging.getLogger(__name__) - -from notificationclientsdk.common.helpers import log_helper log_helper.config_logger(LOG) class BrokerConnectionManager: - def __init__(self, broker_location_handler, notification_handler, broker_connection_contexts): + def __init__(self, broker_location_handler, notification_handler, + broker_connection_contexts): ''' broker_watchers: { "": { @@ -37,7 +36,8 @@ class BrokerConnectionManager: self.registration_endpoint = RpcEndpointInfo( self.shared_broker_context['REGISTRATION_TRANSPORT_ENDPOINT']) self.broker_watchers = {} - self.location_watcher = LocationServiceClient(self.registration_endpoint.TransportEndpoint) + self.location_watcher = LocationServiceClient( + self.registration_endpoint.TransportEndpoint) self.__broker_location_handler = broker_location_handler self.__notification_handler = notification_handler @@ -71,42 +71,50 @@ class BrokerConnectionManager: self.location_watcher.add_location_listener( broker_name, location_handler=self.__broker_location_handler) - LOG.debug("Start watching location announcement of notificationservice@{0}" - .format(broker_name)) + LOG.debug("Start watching location announcement of " + "notificationservice@{0}".format(broker_name)) # try to update location by query try: - location_info = self.location_watcher.query_location(broker_name, timeout=5, retry=2) - LOG.debug("Pulled location info@{0}:{1}".format(broker_name, location_info)) + location_info = self.location_watcher.query_location( + broker_name, timeout=5, retry=2) + LOG.debug("Pulled location info@{0}:{1}".format( + broker_name, location_info)) if location_info: podip = location_info.get("PodIP", None) - resourcetypes = location_info.get("ResourceTypes", None) + resourcetypes = location_info.get("ResourceTypes", + None) brokerstate.update_broker_ip(podip) brokerstate.update_resources(resourcetypes) else: return False except Exception as ex: - LOG.warning("Failed to update location of node:{0} due to: {1}".format( - broker_name, str(ex))) + LOG.warning("Failed to update location of node:{0} " + "due to: {1}".format(broker_name, str(ex))) raise ex # 2, create broker connection broker_watcher = self.broker_watchers.get(broker_name, {}) broker_client = broker_watcher.get("broker_client", None) if not broker_client: - LOG.debug("Start watching notifications from notificationservice@{0}".format(broker_name)) - broker_client = self.__create_client(broker_name, brokerstate.BrokerIP) + LOG.debug("Start watching notifications from " + "notificationservice@{0}".format(broker_name)) + broker_client = self.__create_client(broker_name, + brokerstate.BrokerIP) broker_watcher["broker_client"] = broker_client self.broker_watchers[broker_name] = broker_watcher # 3, update watching resources - result = self.__update_watching_resources(broker_watcher, broker_client, brokerstate) + result = self.__update_watching_resources(broker_watcher, + broker_client, + brokerstate) return result except Exception as ex: LOG.warning("failed to start watching:{0},{1}".format( brokerstate, str(ex))) return False - def __stop_watching_broker_resource(self, broker_client, broker_name, resource_type): + def __stop_watching_broker_resource(self, broker_client, broker_name, + resource_type): try: if broker_client.is_listening_on_resource(resource_type): broker_client.remove_resource_status_listener(resource_type) @@ -116,12 +124,14 @@ class BrokerConnectionManager: broker_name, resource_type, str(ex))) return False - def __start_watching_broker_resource(self, broker_client, broker_name, resource_type): + def __start_watching_broker_resource(self, broker_client, broker_name, + resource_type): try: if not broker_client.is_listening_on_resource(resource_type): broker_client.add_resource_status_listener( resource_type, status_handler=self.__notification_handler) - LOG.debug("Start watching {0}@{1}".format(resource_type, broker_name)) + LOG.debug("Start watching {0}@{1}".format(resource_type, + broker_name)) return True except Exception as ex: @@ -135,7 +145,7 @@ class BrokerConnectionManager: if self.location_watcher.is_listening_on_location(broker_name): self.location_watcher.remove_location_listener(broker_name) LOG.debug("Stop watching location announcement for broker@{0}" - .format(broker_name)) + "".format(broker_name)) # 2, remove broker client broker_watcher = self.broker_watchers.get(broker_name, {}) @@ -145,7 +155,8 @@ class BrokerConnectionManager: del broker_client broker_client = None self.broker_watchers.pop(broker_name, None) - LOG.debug("Stop watching notificationservice@{0}".format(broker_name)) + LOG.debug("Stop watching notificationservice@{0}".format( + broker_name)) return True except Exception as ex: @@ -156,7 +167,8 @@ class BrokerConnectionManager: def restart_watching_broker(self, brokerstate): try: broker_name = brokerstate.BrokerName - LOG.debug("Try to restart watching notificationservice@{0}".format(broker_name)) + LOG.debug("Try to restart watching notificationservice@{0}".format( + broker_name)) broker_watcher = self.broker_watchers.get(broker_name, {}) broker_client = broker_watcher.get("broker_client", None) if broker_client: @@ -172,10 +184,13 @@ class BrokerConnectionManager: def update_watching_resources(self, brokerstate): try: - broker_watcher = self.broker_watchers.get(brokerstate.BrokerName, {}) + broker_watcher = self.broker_watchers.get(brokerstate.BrokerName, + {}) broker_client = broker_watcher.get("broker_client", None) if broker_client: - result = self.__update_watching_resources(broker_watcher, broker_client, brokerstate) + result = self.__update_watching_resources(broker_watcher, + broker_client, + brokerstate) return result return False except Exception as ex: @@ -183,26 +198,32 @@ class BrokerConnectionManager: brokerstate, str(ex))) return False - def __update_watching_resources(self, broker_watcher, broker_client, brokerstate): + def __update_watching_resources(self, broker_watcher, broker_client, + brokerstate): try: result = True # 1, filter out those unsubscribed resources - subscribed_resource_list = broker_watcher.get("subscribed_resource_list",[]) + subscribed_resource_list = broker_watcher.get( + "subscribed_resource_list", []) if subscribed_resource_list != brokerstate.ResourceTypesSubscribed: # stop watching those uninterested for resource_type in subscribed_resource_list: - if resource_type not in brokerstate.ResourceTypesSubscribed: + if resource_type not in \ + brokerstate.ResourceTypesSubscribed: result = self.__stop_watching_broker_resource( - broker_client, brokerstate.BrokerName, resource_type) + broker_client, brokerstate.BrokerName, + resource_type) # 2, update the list subscribed_resource_list = brokerstate.ResourceTypesSubscribed - broker_watcher["subscribed_resource_list"] = subscribed_resource_list + broker_watcher["subscribed_resource_list"] = \ + subscribed_resource_list # 3, start watching the subscribed resources for resource_type in subscribed_resource_list: result = self.__start_watching_broker_resource( - broker_client, brokerstate.BrokerName, resource_type) and result + broker_client, brokerstate.BrokerName, resource_type) and \ + result return result except Exception as ex: LOG.warning("failed to update resources:{0},{1}".format( @@ -221,8 +242,9 @@ class BrokerConnectionManager: resource_type) if broker_client else False def __create_client(self, broker_name, broker_pod_ip): - if broker_name == NodeInfoHelper.BROKER_NODE_ALL: - # special case: if monitor all node, then use the same broker as locationservice + if broker_name == constants.WILDCARD_ALL_NODES: + # special case: if monitor all node, then use the same broker as + # locationservice return self.location_watcher broker_host = "[{0}]".format(broker_pod_ip) broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format( @@ -230,31 +252,34 @@ class BrokerConnectionManager: self.shared_broker_context['NOTIFICATION_BROKER_PASS'], broker_host, self.shared_broker_context['NOTIFICATION_BROKER_PORT']) - return NotificationServiceClient(broker_name, broker_transport_endpoint, broker_pod_ip) + return NotificationServiceClient(broker_name, + broker_transport_endpoint, + broker_pod_ip) def __start_watch_all_nodes(self, retry_interval=5): try: - LOG.debug( - "Start watching location announcement of notificationservice@{0}" - .format(NodeInfoHelper.BROKER_NODE_ALL)) + LOG.debug("Start watching location announcement of " + "notificationservice@{0}".format( + constants.WILDCARD_ALL_NODES)) while not self.location_watcher.is_listening_on_location( - NodeInfoHelper.BROKER_NODE_ALL): + constants.WILDCARD_ALL_NODES): # start watching on the location announcement self.location_watcher.add_location_listener( - NodeInfoHelper.BROKER_NODE_ALL, + constants.WILDCARD_ALL_NODES, location_handler=self.__broker_location_handler) if not self.location_watcher.is_listening_on_location( - NodeInfoHelper.BROKER_NODE_ALL): + constants.WILDCARD_ALL_NODES): # retry later and forever LOG.debug( "Retry indefinitely to start listening to {0}..." - .format(NodeInfoHelper.BROKER_NODE_ALL)) + .format(constants.WILDCARD_ALL_NODES)) time.sleep(retry_interval) LOG.debug( "Trigger the location announcement of notificationservice@{0}" - .format(NodeInfoHelper.BROKER_NODE_ALL)) - self.location_watcher.trigger_location_annoucement(timeout=20, retry=10) + .format(constants.WILDCARD_ALL_NODES)) + self.location_watcher.trigger_location_annoucement(timeout=20, + retry=10) except Exception as ex: LOG.warning("exception: {0}".format(str(ex))) pass @@ -265,11 +290,13 @@ class BrokerConnectionManager: def __stop_watch_all_nodes(self): pass - def __syncup_data_by_resourcetype(self, broker_client, broker_name, resource_type): + def __syncup_data_by_resourcetype(self, broker_client, broker_name, + resource_type): # check to sync up resource status on a node - LOG.debug("try to sync up data for {0}@{1}".format(resource_type, broker_name)) + LOG.debug("try to sync up data for {0}@{1}".format( + resource_type, broker_name)) try: - if broker_name == NodeInfoHelper.BROKER_NODE_ALL: + if broker_name == constants.WILDCARD_ALL_NODES: self.location_watcher.trigger_publishing_status( resource_type, timeout=5, retry=10) return True @@ -277,11 +304,13 @@ class BrokerConnectionManager: # 1, query resource status broker_client = self.broker_watchers.get(broker_name, None) if not broker_client: - raise Exception("watcher is not ready for broker: {0}".format(broker_name)) + raise Exception("watcher is not ready for broker: {0}".format( + broker_name)) resource_status = broker_client.query_resource_status( resource_type, timeout=5, retry=10) - # 2, deliver resource by comparing LastDelivery time with EventTimestamp + # 2, deliver resource by comparing LastDelivery time with + # EventTimestamp # 3, update the LastDelivery with EventTimestamp self.__notification_handler.handle(resource_status) except oslo_messaging.exceptions.MessagingTimeout as ex: @@ -302,7 +331,8 @@ class BrokerConnectionManager: try: broker_watcher = self.broker_watchers.get(broker_name, {}) broker_client = broker_watcher.get("broker_client", None) - subscribed_resource_list = broker_watcher.get("subscribed_resource_list",[]) + subscribed_resource_list = broker_watcher.get( + "subscribed_resource_list", []) for resource_type in subscribed_resource_list: if not brokerstate.is_data_syncup(resource_type): continue diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py index 4fd5a14..322787c 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py @@ -129,34 +129,119 @@ class PtpService(object): def add_subscription(self, subscription_dto): resource_address = None if hasattr(subscription_dto, 'ResourceAddress'): + version = 2 _, nodename, _, _, _ = subscription_helper.parse_resource_address( subscription_dto.ResourceAddress) - broker_name = nodename + LOG.debug("nodename in ResourceAddress is '%s', residing is %s" % + (nodename, self.daemon_control.get_residing_nodename())) + resource_address = subscription_dto.ResourceAddress + LOG.debug('Looking for existing subscription for EndpointUri %s ' + 'ResourceAddress %s' % (subscription_dto.EndpointUri, + resource_address)) + entry = self.subscription_repo.get_one( + EndpointUri=subscription_dto.EndpointUri, + ResourceAddress=resource_address) + + if entry is None: + # Did not find matched duplicated, but needs to look for other + # cases... + if nodename != constants.WILDCARD_ALL_NODES: + # There may be a subscription for all nodes already in + # place + resource_address_star = \ + subscription_helper.set_nodename_in_resource_address( + resource_address, constants.WILDCARD_ALL_NODES) + LOG.debug('Additional lookup for existing subscription ' + 'for EndpointUri %s ResourceAddress %s' + % (subscription_dto.EndpointUri, + resource_address_star)) + if self.subscription_repo.get_one( + EndpointUri=subscription_dto.EndpointUri, + ResourceAddress=resource_address_star) is not None: + LOG.debug('Found existing %s entry in subscription ' + 'repo' % constants.WILDCARD_ALL_NODES) + raise client_exception.ServiceError(409) + + if nodename == constants.WILDCARD_CURRENT_NODE: + # There may be a subscription for the residing (current) + # node already in place + resource_address_synonym = \ + subscription_helper.set_nodename_in_resource_address( + resource_address, + self.daemon_control.get_residing_nodename()) + LOG.debug('In addition, looking for existing subscription ' + 'for EndpointUri %s ResourceAddress %s' % ( + subscription_dto.EndpointUri, + resource_address_synonym)) + entry = self.subscription_repo.get_one( + EndpointUri=subscription_dto.EndpointUri, + ResourceAddress=resource_address_synonym) + + if nodename == self.daemon_control.get_residing_nodename(): + # There may be a subscription for '.' (current node) + # already in place + resource_address_synonym = \ + subscription_helper.set_nodename_in_resource_address( + resource_address, constants.WILDCARD_CURRENT_NODE) + LOG.debug('In addition, looking for existing subscription ' + 'for EndpointUri %s ResourceAddress %s' % ( + subscription_dto.EndpointUri, + resource_address_synonym)) + entry = self.subscription_repo.get_one( + EndpointUri=subscription_dto.EndpointUri, + ResourceAddress=resource_address_synonym) + + if entry is not None: + LOG.debug('Found existing v2 entry in subscription repo') + raise client_exception.ServiceError(409) + + if nodename == constants.WILDCARD_ALL_NODES: + broker_names = self.daemon_control.list_of_service_nodenames() + else: + broker_names = [nodename] + elif hasattr(subscription_dto, 'ResourceType'): - broker_name = subscription_dto.ResourceQualifier.NodeName - default_node_name = NodeInfoHelper.default_node_name(broker_name) + version = 1 - broker_pod_ip, supported_resource_types = self.__get_node_info( - default_node_name) + resource_qualifier_dto = \ + subscription_dto.ResourceQualifier.to_dict() + LOG.debug('Looking for existing subscription for EndpointUri %s ' + 'ResourceQualifier %s' % (subscription_dto.EndpointUri, + resource_qualifier_dto)) + entries = self.subscription_repo.get( + EndpointUri=subscription_dto.EndpointUri) + for entry in entries: + resource_qualifier_json = entry.ResourceQualifierJson or '{}' + resource_qualifier_repo = json.loads(resource_qualifier_json) + if resource_qualifier_dto == resource_qualifier_repo: + LOG.debug('Found existing v1 entry in subscription repo') + raise client_exception.ServiceError(409) - if not broker_pod_ip: - LOG.warning("Node {0} is not available yet".format( - default_node_name)) - raise client_exception.NodeNotAvailable(broker_name) + broker_names = [subscription_dto.ResourceQualifier.NodeName] - if ResourceType.TypePTP not in supported_resource_types: - LOG.warning("Resource {0}@{1} is not available yet".format( - ResourceType.TypePTP, default_node_name)) - raise client_exception.ResourceNotAvailable(broker_name, - ResourceType.TypePTP) + nodes = {} # node-ptpstatus pairs + for broker in broker_names: + default_node_name = NodeInfoHelper.default_node_name(broker) + broker_pod_ip, supported_resource_types = self.__get_node_info( + default_node_name) - # get initial resource status - if default_node_name: - ptpstatus = None + if not broker_pod_ip: + LOG.warning("Node {0} is not available yet".format( + default_node_name)) + raise client_exception.NodeNotAvailable(broker) + + if ResourceType.TypePTP not in supported_resource_types: + LOG.warning("Resource {0}@{1} is not available yet".format( + ResourceType.TypePTP, default_node_name)) + raise client_exception.ResourceNotAvailable( + broker, ResourceType.TypePTP) + + # get initial resource status ptpstatus = self._query(default_node_name, broker_pod_ip, resource_address, optional=None) - LOG.info("initial ptpstatus:{0}".format(ptpstatus)) + LOG.info("Initial ptpstatus for {0}:{1}".format(default_node_name, + ptpstatus)) # construct subscription entry if constants.PTP_V1_KEY in ptpstatus: @@ -171,74 +256,38 @@ class PtpService(object): ptpstatus[item]['time']).strftime( '%Y-%m-%dT%H:%M:%S%fZ') - # avoid duplicated subscription - entry = None - if hasattr(subscription_dto, 'ResourceType'): - version = 1 - resource_qualifier_dto = \ - subscription_dto.ResourceQualifier.to_dict() - LOG.debug('Looking for existing subscription for ' - 'EndpointUri %s ResourceQualifier %s' - % (subscription_dto.EndpointUri, - resource_qualifier_dto)) - entries = self.subscription_repo.get( - EndpointUri=subscription_dto.EndpointUri - ) - for e in entries: - resource_qualifier_json = e.ResourceQualifierJson or '{}' - resource_qualifier_repo = json.loads( - resource_qualifier_json) - if resource_qualifier_dto == resource_qualifier_repo: - entry = e - break - else: - version = 2 + nodes[default_node_name] = ptpstatus - # Replace eventual '.' in ResourceAddress by the actual - # nodename - subscription_dto.ResourceAddress = \ - subscription_helper.set_nodename_in_resource_address( - subscription_dto.ResourceAddress, default_node_name) + subscription_orm = SubscriptionOrm(**subscription_dto.to_orm()) + subscription_orm.InitialDeliveryTimestamp = timestamp + entry = self.subscription_repo.add(subscription_orm) - LOG.debug('Looking for existing subscription for ' - 'EndpointUri %s ResourceAddress %s' - % (subscription_dto.EndpointUri, - subscription_dto.ResourceAddress)) - entry = self.subscription_repo.get_one( - EndpointUri=subscription_dto.EndpointUri, - ResourceAddress=subscription_dto.ResourceAddress - ) - if entry: - LOG.debug('Found existing entry in subscription repo') - raise client_exception.ServiceError(409) - - subscription_orm = SubscriptionOrm(**subscription_dto.to_orm()) - subscription_orm.InitialDeliveryTimestamp = timestamp - entry = self.subscription_repo.add(subscription_orm) - - # Delivery the initial notification of ptp status - if version == 1: - subscription_dto2 = SubscriptionInfoV1(entry) - else: - subscription_dto2 = SubscriptionInfoV2(entry) + # Delivery the initial notification of ptp status + if version == 1: + subscription_dto2 = SubscriptionInfoV1(entry) + else: + subscription_dto2 = SubscriptionInfoV2(entry) + for node in nodes.items(): try: - subscription_helper.notify(subscription_dto2, ptpstatus) - LOG.info("initial ptpstatus is delivered successfully") + subscription_helper.notify(subscription_dto2, node[1]) + LOG.info("Initial ptpstatus of {0} is delivered successfully" + "".format(node[0])) except Exception as ex: - LOG.warning("initial ptpstatus is not delivered:{0}".format( - str(ex))) + LOG.warning("Initial ptpstatus of {0} is not delivered:{1}" + "".format(node[0], str(ex))) raise client_exception.InvalidEndpoint( subscription_dto.EndpointUri) - try: - # commit the subscription entry - self.subscription_repo.commit() - self.daemon_control.refresh() - except Exception as ex: - LOG.warning("subscription is not added successfully:" - "{0}".format(str(ex))) - raise ex + try: + # commit the subscription entry + self.subscription_repo.commit() + self.daemon_control.refresh() + except Exception as ex: + LOG.warning("subscription is not added successfully:" + "{0}".format(str(ex))) + raise ex + return subscription_dto2 def remove_subscription(self, subscriptionid): diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py index 8f6153e..0c8be55 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py @@ -34,7 +34,7 @@ class ResourceAddressController(object): _, nodename, resource, optional, self.resource_address = \ subscription_helper.parse_resource_address( self.resource_address) - if nodename == '.': + if nodename == constants.WILDCARD_CURRENT_NODE: nodename = notification_control.get_residing_nodename() LOG.debug('Nodename to query: %s' % nodename) if not notification_control.in_service_nodenames(nodename):