Cole Walker 4178a86975 Integration work for ORAN notification
Several improvements and fixes to enable the end-to-end functionality of
all of the components in support of the O-RAN Spec Compliant Timing API
Notification work.

1. Add time stamps to logging for notificationservice and
notificationclient
2. Add support for the "optional" hierarchy in the resource address
which allows the client to query the status of a specific ptp instance,
i.e., get the status of instance ptp1 rather than all ptp instances
3. Add a parent key to the returned notification data so that multiple
statuses can be returned to the client with a single notification
4. Reworked the notificationservice daemonset to start its process
directly rather than using an intermediary script. This allows the
container logs to show properly via kubectl logs and will also allow the
container to crash properly if the program errors out.
5. Reworked the helm values for ptp4l and ts2phc instances to allow
users to supply overrides with multiple instances

Test plan:
PASS: PTP notification v1 compatibility
PASS: GET all v2 resources
PASS: SUBSCRIBE/LIST/DELETE v2 resources
PASS: Build and deploy containers/fluxcd app

Story: 2010056
Task: 46226

Change-Id: Id471fdc0815afdcc5639e81c6457616e268e6cd7
Signed-off-by: Cole Walker <cole.walker@windriver.com>
2022-09-14 11:02:43 -04:00

312 lines
13 KiB
Python

#
# Copyright (c) 2021-2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import json
import logging
import multiprocessing as mp
import threading
import sys

# Bind the standard "queue" module to the legacy name "Queue" so the rest
# of this module works unchanged on both Python 2 and Python 3.
# NOTE: the previous check compared version *strings* (sys.version > '3'),
# which relies on fragile lexicographic ordering of the full version text;
# sys.version_info compares numerically and is the recommended form.
if sys.version_info[0] >= 3:
    import queue as Queue
else:
    import Queue
from notificationclientsdk.common.helpers import subscription_helper
from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.common.helpers import log_helper
from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.model.dto.location import LocationInfo
from notificationclientsdk.repository.dbcontext import DbContext
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
from notificationclientsdk.model.orm.node import NodeInfo as NodeInfoOrm
from notificationclientsdk.repository.node_repo import NodeRepo
from notificationclientsdk.client.locationservice import LocationHandlerDefault, LocationHandlerBase
from notificationclientsdk.services.broker_state_manager import BrokerStateManager
from notificationclientsdk.services.broker_connection_manager import BrokerConnectionManager
from notificationclientsdk.services.notification_handler import NotificationHandler
# Module-level logger; log_helper applies the project-wide formatting
# (including timestamps) to this logger's handlers.
LOG = logging.getLogger(__name__)
log_helper.config_logger(LOG)
class NotificationWorker:
    """Daemon worker driving the notification client.

    The worker is event driven: producers set events (location updates,
    subscription changes, broker state changes) and run() consumes them.
    """

    class LocationInfoHandler(LocationHandlerBase):
        '''Glue code to forward location info to daemon method'''

        def __init__(self, locationinfo_dispatcher):
            # The dispatcher is the owning NotificationWorker instance.
            self.locationinfo_dispatcher = locationinfo_dispatcher
            super(NotificationWorker.LocationInfoHandler, self).__init__()

        def handle(self, location_info):
            # Hand the payload to the worker, which caches it and signals
            # the main loop to consume it.
            LOG.debug("Received location info:{0}".format(location_info))
            return self.locationinfo_dispatcher.produce_location_event(location_info)
    def __init__(
            self, event, subscription_event, daemon_context):
        """Wire up worker state, the database context and the managers.

        :param event: control event used to wake the run() loop
        :param subscription_event: event set when subscriptions change
        :param daemon_context: dict providing at least THIS_NODE_NAME and
                               SQLALCHEMY_CONF_JSON entries
        """
        self.__alive = True
        self.daemon_context = daemon_context
        self.residing_node_name = daemon_context['THIS_NODE_NAME']
        NodeInfoHelper.set_residing_node(self.residing_node_name)
        # DB context must be initialized before any repo is created.
        self.sqlalchemy_conf = json.loads(daemon_context['SQLALCHEMY_CONF_JSON'])
        DbContext.init_dbcontext(self.sqlalchemy_conf)
        self.event = event
        self.subscription_event = subscription_event
        # Handlers bridging location updates and notifications back into
        # this worker instance.
        self.__locationinfo_handler = NotificationWorker.LocationInfoHandler(self)
        self.__notification_handler = NotificationHandler()
        self.broker_connection_manager = BrokerConnectionManager(
            self.__locationinfo_handler,
            self.__notification_handler,
            self.daemon_context)
        self.broker_state_manager = BrokerStateManager()
        self.__init_location_channel()
        # event to signal brokers state change
        self.__brokers_watcher_event = mp.Event()
        self.__brokers_data_syncup_event = mp.Event()
def __init_location_channel(self):
self.location_event = mp.Event()
self.location_lock = threading.Lock()
# only cache the latest loation info
self.location_channel = {}
self.location_keys_q = Queue.Queue()
def signal_events(self):
self.event.set()
def produce_location_event(self, location_info):
node_name = location_info.get('NodeName', None)
podip = location_info.get("PodIP", None)
if not node_name or not podip:
LOG.warning("Missing PodIP inside location info:{0}".format(location_info))
return False
timestamp = location_info.get('Timestamp', 0)
# mutex for threads which produce location events
self.location_lock.acquire()
try:
current = self.location_channel.get(node_name, {})
if current.get('Timestamp', 0) < timestamp:
# update with the location_info
self.location_channel[node_name] = location_info
self.location_keys_q.put(node_name)
# notify the consumer to process the update
self.location_event.set()
self.signal_events()
return True
except Exception as ex:
LOG.warning("failed to produce location event:{0}".format(str(ex)))
return False
finally:
# release lock
self.location_lock.release()
    def run(self):
        """Main loop: block on the control event, then service whichever
        sub-events are set (location, subscription, watcher, data sync).
        """
        self.broker_connection_manager.start()
        while self.__alive:
            self.event.wait()
            self.event.clear()
            LOG.debug("daemon control event is asserted")
            if self.location_event.is_set():
                self.location_event.clear()
                # update location information
                self.consume_location_event()
            if self.subscription_event.is_set():
                self.subscription_event.clear()
                # refresh brokers state from subscriptions
                self.handle_subscriptions_event()
            if self.__brokers_watcher_event.is_set():
                self.__brokers_watcher_event.clear()
                # sync up brokers connection with their state
                self.handle_brokers_watcher_event()
            if self.__brokers_data_syncup_event.is_set():
                self.__brokers_data_syncup_event.clear()
                # sync up broker's data
                self.handle_brokers_data_syncup_event()
            continue
        self.broker_connection_manager.stop()
        return
    def consume_location_event(self):
        """Drain queued location updates, persist them, and refresh
        broker state when node info was added or changed."""
        nodeinfo_repo = None
        try:
            LOG.debug("Start to consume location event")
            _nodeinfo_added = 0
            _nodeinfo_updated = 0
            nodeinfo_repo = NodeRepo(autocommit=True)
            # Drain every pending key; the channel holds only the latest
            # location info per node.
            while not self.location_keys_q.empty():
                node_name = self.location_keys_q.get(False)
                location_info = self.location_channel.get(node_name, None)
                if not location_info:
                    LOG.warning("ignore location info@{0} without content".format(node_name))
                    continue
                LOG.debug("consume location info @{0}:{1}".format(node_name, location_info))
                is_nodeinfo_added, is_nodeinfo_updated = self.__persist_locationinfo(
                    location_info, nodeinfo_repo)
                _nodeinfo_added = _nodeinfo_added + (1 if is_nodeinfo_added else 0)
                _nodeinfo_updated = _nodeinfo_updated + (1 if is_nodeinfo_updated else 0)
                continue
            LOG.debug("Finished consuming location event")
            if _nodeinfo_added > 0:
                LOG.debug("signal event to refresh brokers state from subscription")
                # node info changes trigger rebuilding broker states from subscription
                # due to some subscriptions might subscribe resources of all nodes
                self.subscription_event.set()
            if _nodeinfo_added > 0 or _nodeinfo_updated > 0:
                LOG.debug("try to refresh brokers state due to changes of node info")
                nodeinfos = nodeinfo_repo.get()
                broker_state_changed = self.broker_state_manager.refresh_by_nodeinfos(nodeinfos)
                if broker_state_changed:
                    # signal the potential changes on node resources
                    LOG.debug("signal event to re-sync up brokers state")
                    self.__brokers_watcher_event.set()
                    self.signal_events()
        except Exception as ex:
            LOG.warning("failed to consume location event:{0}".format(str(ex)))
        finally:
            if nodeinfo_repo:
                # drop the repo reference so its DB session is released
                del nodeinfo_repo
def handle_subscriptions_event(self):
broker_state_changed = self.__update_broker_with_subscription()
if broker_state_changed:
self.__brokers_watcher_event.set()
self.signal_events()
def __persist_locationinfo(self, location_info, nodeinfo_repo):
is_nodeinfo_added = False
is_nodeinfo_updated = False
try:
location_info2 = LocationInfo(**location_info)
entry = nodeinfo_repo.get_one(NodeName=location_info['NodeName'], Status=1)
if not entry:
entry = NodeInfoOrm(**location_info2.to_orm())
nodeinfo_repo.add(entry)
is_nodeinfo_added = True
LOG.debug("Add NodeInfo: {0}".format(entry.NodeName))
elif not entry.Timestamp or entry.Timestamp < location_info['Timestamp']:
# location info with newer timestamp indicate broker need to be re-sync up
is_nodeinfo_updated = True
nodeinfo_repo.update(entry.NodeName, **location_info2.to_orm())
LOG.debug("Update NodeInfo: {0}".format(entry.NodeName))
else:
# do nothing
LOG.debug("Ignore the location for broker: {0}".format(entry.NodeName))
except Exception as ex:
LOG.warning("failed to update broker state with location info:{0}, {1}".format(
location_info, str(ex)))
finally:
return is_nodeinfo_added, is_nodeinfo_updated
def __update_broker_with_subscription(self):
'''update broker state with subscriptions'''
broker_state_changed = False
subscription_repo = None
nodeinfo_repo = None
try:
subscription_repo = SubscriptionRepo(autocommit=True)
nodeinfo_repo = NodeRepo(autocommit=True)
subs = subscription_repo.get()
LOG.debug("found {0} subscriptions".format(subs.count()))
broker_state_changed = self.broker_state_manager.refresh_by_subscriptions(subs)
if broker_state_changed:
nodeinfo_repo = NodeRepo(autocommit=True)
nodeinfos = nodeinfo_repo.get()
self.broker_state_manager.refresh_by_nodeinfos(nodeinfos)
for s in subs:
if s.ResourceType:
subinfo = SubscriptionInfoV1(s)
# assume resource type being PTP and not wildcard
resource_type = s.ResourceType
if resource_type == ResourceType.TypePTP:
broker_name = subinfo.ResourceQualifier.NodeName
else:
# ignore the subscription due to unsupported type
LOG.debug("Ignore the subscription for: {0}".format(subinfo.SubscriptionId))
continue
elif s.ResourceAddress:
# Get nodename from resource address
LOG.info("Parse resource address {}".format(s.ResourceAddress))
_, nodename, _, _, _ = subscription_helper.parse_resource_address(
s.ResourceAddress)
broker_name = nodename
else:
LOG.debug("Subscription {} does not have ResourceType or "
"ResourceAddress".format(s.SubscriptionId))
continue
if s.Status == 1:
current_node_name = NodeInfoHelper.expand_node_name(broker_name)
# update the initial delivery timestamp as well
self.__notification_handler.update_delivery_timestamp(
NodeInfoHelper.default_node_name(broker_name),
s.SubscriptionId, s.InitialDeliveryTimestamp)
# delete all entry with Status == 0
subscription_repo.delete(Status=0)
finally:
del subscription_repo
del nodeinfo_repo
return broker_state_changed
def handle_brokers_watcher_event(self):
result = False
try:
LOG.debug("try to sync up watcher for {0} brokers".format(
self.broker_state_manager.count_brokers()))
result, _, _ = self.broker_state_manager.syncup_broker_watchers(
self.broker_connection_manager)
self.__brokers_data_syncup_event.set()
except Exception as ex:
result = False
LOG.warning("fail to sync up watcher for brokers: {0}".format(str(ex)))
finally:
if not result:
# retry indefinitely
self.__brokers_watcher_event.set()
self.signal_events()
def handle_brokers_data_syncup_event(self):
result = False
try:
LOG.debug("try to sync up data for {0} brokers".format(
self.broker_state_manager.count_brokers()))
result, _, _ = self.broker_state_manager.syncup_broker_data(
self.broker_connection_manager)
except Exception as ex:
result = False
LOG.warning("fail to sync up data for brokers: {0}".format(str(ex)))
finally:
if not result:
self.__brokers_data_syncup_event.set()
self.signal_events()