Merge "Allow subscription to all nodes with '*'"

This commit is contained in:
Zuul 2022-11-22 21:16:20 +00:00 committed by Gerrit Code Review
commit 305c706a05
5 changed files with 229 additions and 142 deletions

View File

@ -44,3 +44,6 @@ VALID_SOURCE_URI = {
SOURCE_SYNCE_LOCK_STATE_EXTENDED,
SOURCE_SYNCE_LOCK_STATE
}
WILDCARD_CURRENT_NODE = '.'
WILDCARD_ALL_NODES = '*'

View File

@ -4,11 +4,16 @@
# SPDX-License-Identifier: Apache-2.0
#
import json
import logging
from notificationclientsdk.repository.node_repo import NodeRepo
from notificationclientsdk.common.helpers import constants
from notificationclientsdk.common.helpers import log_helper
LOG = logging.getLogger(__name__)
log_helper.config_logger(LOG)
class NodeInfoHelper(object):
BROKER_NODE_ALL = '*'
residing_node_name = None
@staticmethod
@ -22,25 +27,23 @@ class NodeInfoHelper(object):
@staticmethod
def expand_node_name(node_name_pattern):
if node_name_pattern == '.':
if node_name_pattern == constants.WILDCARD_CURRENT_NODE:
return NodeInfoHelper.residing_node_name
elif node_name_pattern == NodeInfoHelper.BROKER_NODE_ALL:
return NodeInfoHelper.BROKER_NODE_ALL
else:
return node_name_pattern
@staticmethod
def default_node_name(node_name_pattern):
if node_name_pattern == '.' or node_name_pattern == '*':
if node_name_pattern == constants.WILDCARD_CURRENT_NODE:
return NodeInfoHelper.residing_node_name
else:
return node_name_pattern
@staticmethod
def match_node_name(node_name_pattern, target_node_name):
if node_name_pattern == '*':
if node_name_pattern == constants.WILDCARD_ALL_NODES:
return True
elif node_name_pattern == '.':
elif node_name_pattern == constants.WILDCARD_CURRENT_NODE:
return NodeInfoHelper.residing_node_name == target_node_name
else:
return node_name_pattern == target_node_name
@ -58,14 +61,16 @@ class NodeInfoHelper(object):
try:
nodeinfo_repo = NodeRepo(autocommit=True)
filter = {}
if node_name_pattern == '*':
if node_name_pattern == constants.WILDCARD_ALL_NODES:
pass
elif not node_name_pattern or node_name_pattern == '.':
filter = { 'NodeName': NodeInfoHelper.residing_node_name }
elif not node_name_pattern or \
node_name_pattern == constants.WILDCARD_CURRENT_NODE:
filter = {'NodeName': NodeInfoHelper.residing_node_name}
else:
filter = { 'NodeName': node_name_pattern }
filter = {'NodeName': node_name_pattern}
nodeinfos = [x.NodeName for x in nodeinfo_repo.get(Status=1, **filter)]
nodeinfos = [x.NodeName
for x in nodeinfo_repo.get(Status=1, **filter)]
except Exception as ex:
LOG.warning("Failed to enumerate nodes:{0}".format(str(ex)))

View File

@ -8,21 +8,20 @@ import time
import oslo_messaging
import logging
from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.model.dto.broker_state import BrokerState
from notificationclientsdk.common.helpers import constants
from notificationclientsdk.common.helpers import log_helper
from notificationclientsdk.client.locationservice import LocationServiceClient
from notificationclientsdk.client.notificationservice import NotificationServiceClient
from notificationclientsdk.client.notificationservice \
import NotificationServiceClient
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
class BrokerConnectionManager:
def __init__(self, broker_location_handler, notification_handler, broker_connection_contexts):
def __init__(self, broker_location_handler, notification_handler,
broker_connection_contexts):
'''
broker_watchers: {
"<broker name1>": {
@ -37,7 +36,8 @@ class BrokerConnectionManager:
self.registration_endpoint = RpcEndpointInfo(
self.shared_broker_context['REGISTRATION_TRANSPORT_ENDPOINT'])
self.broker_watchers = {}
self.location_watcher = LocationServiceClient(self.registration_endpoint.TransportEndpoint)
self.location_watcher = LocationServiceClient(
self.registration_endpoint.TransportEndpoint)
self.__broker_location_handler = broker_location_handler
self.__notification_handler = notification_handler
@ -71,42 +71,50 @@ class BrokerConnectionManager:
self.location_watcher.add_location_listener(
broker_name,
location_handler=self.__broker_location_handler)
LOG.debug("Start watching location announcement of notificationservice@{0}"
.format(broker_name))
LOG.debug("Start watching location announcement of "
"notificationservice@{0}".format(broker_name))
# try to update location by query
try:
location_info = self.location_watcher.query_location(broker_name, timeout=5, retry=2)
LOG.debug("Pulled location info@{0}:{1}".format(broker_name, location_info))
location_info = self.location_watcher.query_location(
broker_name, timeout=5, retry=2)
LOG.debug("Pulled location info@{0}:{1}".format(
broker_name, location_info))
if location_info:
podip = location_info.get("PodIP", None)
resourcetypes = location_info.get("ResourceTypes", None)
resourcetypes = location_info.get("ResourceTypes",
None)
brokerstate.update_broker_ip(podip)
brokerstate.update_resources(resourcetypes)
else:
return False
except Exception as ex:
LOG.warning("Failed to update location of node:{0} due to: {1}".format(
broker_name, str(ex)))
LOG.warning("Failed to update location of node:{0} "
"due to: {1}".format(broker_name, str(ex)))
raise ex
# 2, create broker connection
broker_watcher = self.broker_watchers.get(broker_name, {})
broker_client = broker_watcher.get("broker_client", None)
if not broker_client:
LOG.debug("Start watching notifications from notificationservice@{0}".format(broker_name))
broker_client = self.__create_client(broker_name, brokerstate.BrokerIP)
LOG.debug("Start watching notifications from "
"notificationservice@{0}".format(broker_name))
broker_client = self.__create_client(broker_name,
brokerstate.BrokerIP)
broker_watcher["broker_client"] = broker_client
self.broker_watchers[broker_name] = broker_watcher
# 3, update watching resources
result = self.__update_watching_resources(broker_watcher, broker_client, brokerstate)
result = self.__update_watching_resources(broker_watcher,
broker_client,
brokerstate)
return result
except Exception as ex:
LOG.warning("failed to start watching:{0},{1}".format(
brokerstate, str(ex)))
return False
def __stop_watching_broker_resource(self, broker_client, broker_name, resource_type):
def __stop_watching_broker_resource(self, broker_client, broker_name,
resource_type):
try:
if broker_client.is_listening_on_resource(resource_type):
broker_client.remove_resource_status_listener(resource_type)
@ -116,12 +124,14 @@ class BrokerConnectionManager:
broker_name, resource_type, str(ex)))
return False
def __start_watching_broker_resource(self, broker_client, broker_name, resource_type):
def __start_watching_broker_resource(self, broker_client, broker_name,
resource_type):
try:
if not broker_client.is_listening_on_resource(resource_type):
broker_client.add_resource_status_listener(
resource_type, status_handler=self.__notification_handler)
LOG.debug("Start watching {0}@{1}".format(resource_type, broker_name))
LOG.debug("Start watching {0}@{1}".format(resource_type,
broker_name))
return True
except Exception as ex:
@ -135,7 +145,7 @@ class BrokerConnectionManager:
if self.location_watcher.is_listening_on_location(broker_name):
self.location_watcher.remove_location_listener(broker_name)
LOG.debug("Stop watching location announcement for broker@{0}"
.format(broker_name))
"".format(broker_name))
# 2, remove broker client
broker_watcher = self.broker_watchers.get(broker_name, {})
@ -145,7 +155,8 @@ class BrokerConnectionManager:
del broker_client
broker_client = None
self.broker_watchers.pop(broker_name, None)
LOG.debug("Stop watching notificationservice@{0}".format(broker_name))
LOG.debug("Stop watching notificationservice@{0}".format(
broker_name))
return True
except Exception as ex:
@ -156,7 +167,8 @@ class BrokerConnectionManager:
def restart_watching_broker(self, brokerstate):
try:
broker_name = brokerstate.BrokerName
LOG.debug("Try to restart watching notificationservice@{0}".format(broker_name))
LOG.debug("Try to restart watching notificationservice@{0}".format(
broker_name))
broker_watcher = self.broker_watchers.get(broker_name, {})
broker_client = broker_watcher.get("broker_client", None)
if broker_client:
@ -172,10 +184,13 @@ class BrokerConnectionManager:
def update_watching_resources(self, brokerstate):
try:
broker_watcher = self.broker_watchers.get(brokerstate.BrokerName, {})
broker_watcher = self.broker_watchers.get(brokerstate.BrokerName,
{})
broker_client = broker_watcher.get("broker_client", None)
if broker_client:
result = self.__update_watching_resources(broker_watcher, broker_client, brokerstate)
result = self.__update_watching_resources(broker_watcher,
broker_client,
brokerstate)
return result
return False
except Exception as ex:
@ -183,26 +198,32 @@ class BrokerConnectionManager:
brokerstate, str(ex)))
return False
def __update_watching_resources(self, broker_watcher, broker_client, brokerstate):
def __update_watching_resources(self, broker_watcher, broker_client,
brokerstate):
try:
result = True
# 1, filter out those unsubscribed resources
subscribed_resource_list = broker_watcher.get("subscribed_resource_list",[])
subscribed_resource_list = broker_watcher.get(
"subscribed_resource_list", [])
if subscribed_resource_list != brokerstate.ResourceTypesSubscribed:
# stop watching those uninterested
for resource_type in subscribed_resource_list:
if resource_type not in brokerstate.ResourceTypesSubscribed:
if resource_type not in \
brokerstate.ResourceTypesSubscribed:
result = self.__stop_watching_broker_resource(
broker_client, brokerstate.BrokerName, resource_type)
broker_client, brokerstate.BrokerName,
resource_type)
# 2, update the list
subscribed_resource_list = brokerstate.ResourceTypesSubscribed
broker_watcher["subscribed_resource_list"] = subscribed_resource_list
broker_watcher["subscribed_resource_list"] = \
subscribed_resource_list
# 3, start watching the subscribed resources
for resource_type in subscribed_resource_list:
result = self.__start_watching_broker_resource(
broker_client, brokerstate.BrokerName, resource_type) and result
broker_client, brokerstate.BrokerName, resource_type) and \
result
return result
except Exception as ex:
LOG.warning("failed to update resources:{0},{1}".format(
@ -221,8 +242,9 @@ class BrokerConnectionManager:
resource_type) if broker_client else False
def __create_client(self, broker_name, broker_pod_ip):
if broker_name == NodeInfoHelper.BROKER_NODE_ALL:
# special case: if monitor all node, then use the same broker as locationservice
if broker_name == constants.WILDCARD_ALL_NODES:
# special case: if monitor all node, then use the same broker as
# locationservice
return self.location_watcher
broker_host = "[{0}]".format(broker_pod_ip)
broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format(
@ -230,31 +252,34 @@ class BrokerConnectionManager:
self.shared_broker_context['NOTIFICATION_BROKER_PASS'],
broker_host,
self.shared_broker_context['NOTIFICATION_BROKER_PORT'])
return NotificationServiceClient(broker_name, broker_transport_endpoint, broker_pod_ip)
return NotificationServiceClient(broker_name,
broker_transport_endpoint,
broker_pod_ip)
def __start_watch_all_nodes(self, retry_interval=5):
try:
LOG.debug(
"Start watching location announcement of notificationservice@{0}"
.format(NodeInfoHelper.BROKER_NODE_ALL))
LOG.debug("Start watching location announcement of "
"notificationservice@{0}".format(
constants.WILDCARD_ALL_NODES))
while not self.location_watcher.is_listening_on_location(
NodeInfoHelper.BROKER_NODE_ALL):
constants.WILDCARD_ALL_NODES):
# start watching on the location announcement
self.location_watcher.add_location_listener(
NodeInfoHelper.BROKER_NODE_ALL,
constants.WILDCARD_ALL_NODES,
location_handler=self.__broker_location_handler)
if not self.location_watcher.is_listening_on_location(
NodeInfoHelper.BROKER_NODE_ALL):
constants.WILDCARD_ALL_NODES):
# retry later and forever
LOG.debug(
"Retry indefinitely to start listening to {0}..."
.format(NodeInfoHelper.BROKER_NODE_ALL))
.format(constants.WILDCARD_ALL_NODES))
time.sleep(retry_interval)
LOG.debug(
"Trigger the location announcement of notificationservice@{0}"
.format(NodeInfoHelper.BROKER_NODE_ALL))
self.location_watcher.trigger_location_annoucement(timeout=20, retry=10)
.format(constants.WILDCARD_ALL_NODES))
self.location_watcher.trigger_location_annoucement(timeout=20,
retry=10)
except Exception as ex:
LOG.warning("exception: {0}".format(str(ex)))
pass
@ -265,11 +290,13 @@ class BrokerConnectionManager:
def __stop_watch_all_nodes(self):
pass
def __syncup_data_by_resourcetype(self, broker_client, broker_name, resource_type):
def __syncup_data_by_resourcetype(self, broker_client, broker_name,
resource_type):
# check to sync up resource status on a node
LOG.debug("try to sync up data for {0}@{1}".format(resource_type, broker_name))
LOG.debug("try to sync up data for {0}@{1}".format(
resource_type, broker_name))
try:
if broker_name == NodeInfoHelper.BROKER_NODE_ALL:
if broker_name == constants.WILDCARD_ALL_NODES:
self.location_watcher.trigger_publishing_status(
resource_type, timeout=5, retry=10)
return True
@ -277,11 +304,13 @@ class BrokerConnectionManager:
# 1, query resource status
broker_client = self.broker_watchers.get(broker_name, None)
if not broker_client:
raise Exception("watcher is not ready for broker: {0}".format(broker_name))
raise Exception("watcher is not ready for broker: {0}".format(
broker_name))
resource_status = broker_client.query_resource_status(
resource_type, timeout=5, retry=10)
# 2, deliver resource by comparing LastDelivery time with EventTimestamp
# 2, deliver resource by comparing LastDelivery time with
# EventTimestamp
# 3, update the LastDelivery with EventTimestamp
self.__notification_handler.handle(resource_status)
except oslo_messaging.exceptions.MessagingTimeout as ex:
@ -302,7 +331,8 @@ class BrokerConnectionManager:
try:
broker_watcher = self.broker_watchers.get(broker_name, {})
broker_client = broker_watcher.get("broker_client", None)
subscribed_resource_list = broker_watcher.get("subscribed_resource_list",[])
subscribed_resource_list = broker_watcher.get(
"subscribed_resource_list", [])
for resource_type in subscribed_resource_list:
if not brokerstate.is_data_syncup(resource_type):
continue

View File

@ -129,34 +129,119 @@ class PtpService(object):
def add_subscription(self, subscription_dto):
resource_address = None
if hasattr(subscription_dto, 'ResourceAddress'):
version = 2
_, nodename, _, _, _ = subscription_helper.parse_resource_address(
subscription_dto.ResourceAddress)
broker_name = nodename
LOG.debug("nodename in ResourceAddress is '%s', residing is %s" %
(nodename, self.daemon_control.get_residing_nodename()))
resource_address = subscription_dto.ResourceAddress
LOG.debug('Looking for existing subscription for EndpointUri %s '
'ResourceAddress %s' % (subscription_dto.EndpointUri,
resource_address))
entry = self.subscription_repo.get_one(
EndpointUri=subscription_dto.EndpointUri,
ResourceAddress=resource_address)
if entry is None:
# Did not find matched duplicated, but needs to look for other
# cases...
if nodename != constants.WILDCARD_ALL_NODES:
# There may be a subscription for all nodes already in
# place
resource_address_star = \
subscription_helper.set_nodename_in_resource_address(
resource_address, constants.WILDCARD_ALL_NODES)
LOG.debug('Additional lookup for existing subscription '
'for EndpointUri %s ResourceAddress %s'
% (subscription_dto.EndpointUri,
resource_address_star))
if self.subscription_repo.get_one(
EndpointUri=subscription_dto.EndpointUri,
ResourceAddress=resource_address_star) is not None:
LOG.debug('Found existing %s entry in subscription '
'repo' % constants.WILDCARD_ALL_NODES)
raise client_exception.ServiceError(409)
if nodename == constants.WILDCARD_CURRENT_NODE:
# There may be a subscription for the residing (current)
# node already in place
resource_address_synonym = \
subscription_helper.set_nodename_in_resource_address(
resource_address,
self.daemon_control.get_residing_nodename())
LOG.debug('In addition, looking for existing subscription '
'for EndpointUri %s ResourceAddress %s' % (
subscription_dto.EndpointUri,
resource_address_synonym))
entry = self.subscription_repo.get_one(
EndpointUri=subscription_dto.EndpointUri,
ResourceAddress=resource_address_synonym)
if nodename == self.daemon_control.get_residing_nodename():
# There may be a subscription for '.' (current node)
# already in place
resource_address_synonym = \
subscription_helper.set_nodename_in_resource_address(
resource_address, constants.WILDCARD_CURRENT_NODE)
LOG.debug('In addition, looking for existing subscription '
'for EndpointUri %s ResourceAddress %s' % (
subscription_dto.EndpointUri,
resource_address_synonym))
entry = self.subscription_repo.get_one(
EndpointUri=subscription_dto.EndpointUri,
ResourceAddress=resource_address_synonym)
if entry is not None:
LOG.debug('Found existing v2 entry in subscription repo')
raise client_exception.ServiceError(409)
if nodename == constants.WILDCARD_ALL_NODES:
broker_names = self.daemon_control.list_of_service_nodenames()
else:
broker_names = [nodename]
elif hasattr(subscription_dto, 'ResourceType'):
broker_name = subscription_dto.ResourceQualifier.NodeName
default_node_name = NodeInfoHelper.default_node_name(broker_name)
version = 1
broker_pod_ip, supported_resource_types = self.__get_node_info(
default_node_name)
resource_qualifier_dto = \
subscription_dto.ResourceQualifier.to_dict()
LOG.debug('Looking for existing subscription for EndpointUri %s '
'ResourceQualifier %s' % (subscription_dto.EndpointUri,
resource_qualifier_dto))
entries = self.subscription_repo.get(
EndpointUri=subscription_dto.EndpointUri)
for entry in entries:
resource_qualifier_json = entry.ResourceQualifierJson or '{}'
resource_qualifier_repo = json.loads(resource_qualifier_json)
if resource_qualifier_dto == resource_qualifier_repo:
LOG.debug('Found existing v1 entry in subscription repo')
raise client_exception.ServiceError(409)
if not broker_pod_ip:
LOG.warning("Node {0} is not available yet".format(
default_node_name))
raise client_exception.NodeNotAvailable(broker_name)
broker_names = [subscription_dto.ResourceQualifier.NodeName]
if ResourceType.TypePTP not in supported_resource_types:
LOG.warning("Resource {0}@{1} is not available yet".format(
ResourceType.TypePTP, default_node_name))
raise client_exception.ResourceNotAvailable(broker_name,
ResourceType.TypePTP)
nodes = {} # node-ptpstatus pairs
for broker in broker_names:
default_node_name = NodeInfoHelper.default_node_name(broker)
broker_pod_ip, supported_resource_types = self.__get_node_info(
default_node_name)
# get initial resource status
if default_node_name:
ptpstatus = None
if not broker_pod_ip:
LOG.warning("Node {0} is not available yet".format(
default_node_name))
raise client_exception.NodeNotAvailable(broker)
if ResourceType.TypePTP not in supported_resource_types:
LOG.warning("Resource {0}@{1} is not available yet".format(
ResourceType.TypePTP, default_node_name))
raise client_exception.ResourceNotAvailable(
broker, ResourceType.TypePTP)
# get initial resource status
ptpstatus = self._query(default_node_name, broker_pod_ip,
resource_address, optional=None)
LOG.info("initial ptpstatus:{0}".format(ptpstatus))
LOG.info("Initial ptpstatus for {0}:{1}".format(default_node_name,
ptpstatus))
# construct subscription entry
if constants.PTP_V1_KEY in ptpstatus:
@ -171,74 +256,38 @@ class PtpService(object):
ptpstatus[item]['time']).strftime(
'%Y-%m-%dT%H:%M:%S%fZ')
# avoid duplicated subscription
entry = None
if hasattr(subscription_dto, 'ResourceType'):
version = 1
resource_qualifier_dto = \
subscription_dto.ResourceQualifier.to_dict()
LOG.debug('Looking for existing subscription for '
'EndpointUri %s ResourceQualifier %s'
% (subscription_dto.EndpointUri,
resource_qualifier_dto))
entries = self.subscription_repo.get(
EndpointUri=subscription_dto.EndpointUri
)
for e in entries:
resource_qualifier_json = e.ResourceQualifierJson or '{}'
resource_qualifier_repo = json.loads(
resource_qualifier_json)
if resource_qualifier_dto == resource_qualifier_repo:
entry = e
break
else:
version = 2
nodes[default_node_name] = ptpstatus
# Replace eventual '.' in ResourceAddress by the actual
# nodename
subscription_dto.ResourceAddress = \
subscription_helper.set_nodename_in_resource_address(
subscription_dto.ResourceAddress, default_node_name)
subscription_orm = SubscriptionOrm(**subscription_dto.to_orm())
subscription_orm.InitialDeliveryTimestamp = timestamp
entry = self.subscription_repo.add(subscription_orm)
LOG.debug('Looking for existing subscription for '
'EndpointUri %s ResourceAddress %s'
% (subscription_dto.EndpointUri,
subscription_dto.ResourceAddress))
entry = self.subscription_repo.get_one(
EndpointUri=subscription_dto.EndpointUri,
ResourceAddress=subscription_dto.ResourceAddress
)
if entry:
LOG.debug('Found existing entry in subscription repo')
raise client_exception.ServiceError(409)
subscription_orm = SubscriptionOrm(**subscription_dto.to_orm())
subscription_orm.InitialDeliveryTimestamp = timestamp
entry = self.subscription_repo.add(subscription_orm)
# Delivery the initial notification of ptp status
if version == 1:
subscription_dto2 = SubscriptionInfoV1(entry)
else:
subscription_dto2 = SubscriptionInfoV2(entry)
# Delivery the initial notification of ptp status
if version == 1:
subscription_dto2 = SubscriptionInfoV1(entry)
else:
subscription_dto2 = SubscriptionInfoV2(entry)
for node in nodes.items():
try:
subscription_helper.notify(subscription_dto2, ptpstatus)
LOG.info("initial ptpstatus is delivered successfully")
subscription_helper.notify(subscription_dto2, node[1])
LOG.info("Initial ptpstatus of {0} is delivered successfully"
"".format(node[0]))
except Exception as ex:
LOG.warning("initial ptpstatus is not delivered:{0}".format(
str(ex)))
LOG.warning("Initial ptpstatus of {0} is not delivered:{1}"
"".format(node[0], str(ex)))
raise client_exception.InvalidEndpoint(
subscription_dto.EndpointUri)
try:
# commit the subscription entry
self.subscription_repo.commit()
self.daemon_control.refresh()
except Exception as ex:
LOG.warning("subscription is not added successfully:"
"{0}".format(str(ex)))
raise ex
try:
# commit the subscription entry
self.subscription_repo.commit()
self.daemon_control.refresh()
except Exception as ex:
LOG.warning("subscription is not added successfully:"
"{0}".format(str(ex)))
raise ex
return subscription_dto2
def remove_subscription(self, subscriptionid):

View File

@ -34,7 +34,7 @@ class ResourceAddressController(object):
_, nodename, resource, optional, self.resource_address = \
subscription_helper.parse_resource_address(
self.resource_address)
if nodename == '.':
if nodename == constants.WILDCARD_CURRENT_NODE:
nodename = notification_control.get_residing_nodename()
LOG.debug('Nodename to query: %s' % nodename)
if not notification_control.in_service_nodenames(nodename):