Merge "Fix O-RAN (API v2) pull of current state"
This commit is contained in:
commit
db31ab00ba
@ -19,20 +19,28 @@ LOG = logging.getLogger(__name__)
|
||||
log_helper.config_logger(LOG)
|
||||
|
||||
|
||||
def ProcessWorkerDefault(event, subscription_event, daemon_context):
|
||||
def ProcessWorkerDefault(event,
|
||||
subscription_event,
|
||||
daemon_context,
|
||||
service_nodenames):
|
||||
'''Entry point of Default Process Worker'''
|
||||
worker = NotificationWorker(event, subscription_event, daemon_context)
|
||||
worker = NotificationWorker(event,
|
||||
subscription_event,
|
||||
daemon_context,
|
||||
service_nodenames)
|
||||
worker.run()
|
||||
|
||||
|
||||
class DaemonControl(object):
|
||||
def __init__(self, daemon_context, process_worker=None):
|
||||
self.daemon_context = daemon_context
|
||||
self.residing_node_name = daemon_context['THIS_NODE_NAME']
|
||||
self.event = mp.Event()
|
||||
self.subscription_event = mp.Event()
|
||||
self.manager = mp.Manager()
|
||||
self.daemon_context = self.manager.dict(daemon_context)
|
||||
LOG.debug('Managed (shared) daemon_context id %d contents %s' %
|
||||
(id(self.daemon_context), daemon_context))
|
||||
self.service_nodenames = self.manager.list()
|
||||
LOG.debug('Managed (shared) list of nodes id %d contents %s' %
|
||||
(id(self.service_nodenames), self.service_nodenames))
|
||||
self.registration_endpoint = RpcEndpointInfo(
|
||||
daemon_context['REGISTRATION_TRANSPORT_ENDPOINT'])
|
||||
self.registration_transport = rpc_helper.get_transport(
|
||||
@ -46,7 +54,8 @@ class DaemonControl(object):
|
||||
self.mpinstance = mp.Process(target=process_worker,
|
||||
args=(self.event,
|
||||
self.subscription_event,
|
||||
self.daemon_context))
|
||||
daemon_context,
|
||||
self.service_nodenames))
|
||||
self.mpinstance.start()
|
||||
|
||||
# initial update
|
||||
@ -61,5 +70,11 @@ class DaemonControl(object):
|
||||
self.subscription_event.set()
|
||||
self.event.set()
|
||||
|
||||
def get_service_nodename(self):
|
||||
return self.daemon_context['SERVICE_NODE_NAME']
|
||||
def get_residing_nodename(self):
|
||||
return self.residing_node_name
|
||||
|
||||
def in_service_nodenames(self, nodename):
|
||||
return nodename in self.service_nodenames
|
||||
|
||||
def list_of_service_nodenames(self):
|
||||
return self.service_nodenames[:]
|
||||
|
@ -56,7 +56,8 @@ class NotificationWorker:
|
||||
return self.locationinfo_dispatcher.produce_location_event(
|
||||
location_info)
|
||||
|
||||
def __init__(self, event, subscription_event, daemon_context):
|
||||
def __init__(self, event, subscription_event, daemon_context,
|
||||
service_nodenames):
|
||||
self.__alive = True
|
||||
|
||||
self.daemon_context = daemon_context
|
||||
@ -68,6 +69,8 @@ class NotificationWorker:
|
||||
self.event = event
|
||||
self.subscription_event = subscription_event
|
||||
|
||||
self.service_nodenames = service_nodenames
|
||||
|
||||
self.__locationinfo_handler = \
|
||||
NotificationWorker.LocationInfoHandler(self)
|
||||
self.__notification_handler = NotificationHandler()
|
||||
@ -174,14 +177,14 @@ class NotificationWorker:
|
||||
self.__persist_locationinfo(location_info, nodeinfo_repo)
|
||||
_nodeinfo_added = \
|
||||
_nodeinfo_added + (1 if is_nodeinfo_added else 0)
|
||||
if is_nodeinfo_added and \
|
||||
node_name not in self.service_nodenames:
|
||||
self.service_nodenames.append(node_name)
|
||||
LOG.debug("List of nodes updated: id %d contents %s" %
|
||||
(id(self.service_nodenames),
|
||||
self.service_nodenames))
|
||||
_nodeinfo_updated = \
|
||||
_nodeinfo_updated + (1 if is_nodeinfo_updated else 0)
|
||||
if is_nodeinfo_added or is_nodeinfo_updated:
|
||||
LOG.debug("Setting daemon's SERVICE_NODE_NAME to %s"
|
||||
% node_name)
|
||||
self.daemon_context['SERVICE_NODE_NAME'] = node_name
|
||||
LOG.debug("Daemon context updated: id %d contents %s"
|
||||
% (id(self.daemon_context), self.daemon_context))
|
||||
continue
|
||||
|
||||
LOG.debug("Finished consuming location event")
|
||||
|
@ -30,18 +30,38 @@ class CurrentStateController(rest.RestController):
|
||||
@expose('json')
|
||||
def get(self):
|
||||
try:
|
||||
service_nodenames = \
|
||||
notification_control.list_of_service_nodenames()
|
||||
LOG.debug('List of service nodes: %s' % service_nodenames)
|
||||
if len(service_nodenames) == 0:
|
||||
LOG.warning('No PTP service available yet')
|
||||
abort(404)
|
||||
|
||||
# Starting with residing node, try querying the announced locations
|
||||
# since the notification app may have moved to another node
|
||||
nodename = notification_control.get_residing_nodename()
|
||||
ptpservice = PtpService(notification_control)
|
||||
service_node_name = notification_control.get_service_nodename()
|
||||
LOG.debug('service_node_name is %s' % service_node_name)
|
||||
ptpstatus = ptpservice.query(service_node_name)
|
||||
LOG.debug('Got ptpstatus: %s' % ptpstatus)
|
||||
# response.status = 200
|
||||
return ptpstatus
|
||||
except client_exception.NodeNotAvailable as ex:
|
||||
LOG.warning("Node is not available:{0}".format(str(ex)))
|
||||
|
||||
while len(service_nodenames) > 0:
|
||||
try:
|
||||
LOG.debug('Querying nodename: %s' % nodename)
|
||||
ptpstatus = ptpservice.query(nodename)
|
||||
LOG.debug('Got ptpstatus: %s' % ptpstatus)
|
||||
# response.status = 200
|
||||
return ptpstatus
|
||||
except client_exception.NodeNotAvailable as ex:
|
||||
LOG.warning("{0}".format(str(ex)))
|
||||
service_nodenames.remove(nodename)
|
||||
if len(service_nodenames) > 0:
|
||||
nodename = service_nodenames[0]
|
||||
except Exception:
|
||||
raise # break
|
||||
|
||||
LOG.warning('No PTP service available')
|
||||
abort(404)
|
||||
|
||||
except client_exception.ResourceNotAvailable as ex:
|
||||
LOG.warning("Resource is not available:{0}".format(str(ex)))
|
||||
LOG.warning("{0}".format(str(ex)))
|
||||
abort(404)
|
||||
except oslo_messaging.exceptions.MessagingTimeout as ex:
|
||||
LOG.warning("Resource is not reachable:{0}".format(str(ex)))
|
||||
|
@ -34,16 +34,17 @@ class ResourceAddressController(object):
|
||||
_, nodename, resource, optional, self.resource_address = \
|
||||
subscription_helper.parse_resource_address(
|
||||
self.resource_address)
|
||||
service_node_name = notification_control.get_service_nodename()
|
||||
LOG.debug('service_node_name is %s' % service_node_name)
|
||||
if nodename != service_node_name and nodename != '.':
|
||||
if nodename == '.':
|
||||
nodename = notification_control.get_residing_nodename()
|
||||
LOG.debug('Nodename to query: %s' % nodename)
|
||||
if not notification_control.in_service_nodenames(nodename):
|
||||
LOG.warning("Node {} is not available".format(nodename))
|
||||
abort(404)
|
||||
if resource not in constants.VALID_SOURCE_URI:
|
||||
LOG.warning("Resource {} is not valid".format(resource))
|
||||
abort(404)
|
||||
ptpservice = PtpService(notification_control)
|
||||
ptpstatus = ptpservice.query(service_node_name,
|
||||
ptpstatus = ptpservice.query(nodename,
|
||||
self.resource_address, optional)
|
||||
LOG.debug('Got ptpstatus: %s' % ptpstatus)
|
||||
# Change time from float to ascii format
|
||||
@ -55,10 +56,10 @@ class ResourceAddressController(object):
|
||||
'%Y-%m-%dT%H:%M:%S%fZ')
|
||||
return ptpstatus
|
||||
except client_exception.NodeNotAvailable as ex:
|
||||
LOG.warning("Node is not available:{0}".format(str(ex)))
|
||||
LOG.warning("{0}".format(str(ex)))
|
||||
abort(404)
|
||||
except client_exception.ResourceNotAvailable as ex:
|
||||
LOG.warning("Resource is not available:{0}".format(str(ex)))
|
||||
LOG.warning("{0}".format(str(ex)))
|
||||
abort(404)
|
||||
except oslo_messaging.exceptions.MessagingTimeout as ex:
|
||||
LOG.warning("Resource is not reachable:{0}".format(str(ex)))
|
||||
|
@ -33,7 +33,6 @@ sqlalchemy_conf_json = json.dumps(sqlalchemy_conf)
|
||||
daemon_context = {
|
||||
'SQLALCHEMY_CONF_JSON': sqlalchemy_conf_json,
|
||||
'THIS_NODE_NAME': THIS_NODE_NAME,
|
||||
'SERVICE_NODE_NAME': THIS_NODE_NAME,
|
||||
'REGISTRATION_TRANSPORT_ENDPOINT': REGISTRATION_TRANSPORT_ENDPOINT,
|
||||
'NOTIFICATION_BROKER_USER': NOTIFICATION_BROKER_USER,
|
||||
'NOTIFICATION_BROKER_PASS': NOTIFICATION_BROKER_PASS,
|
||||
|
Loading…
x
Reference in New Issue
Block a user