Update name of service node to pull PTP status

Neither the v1 nor the v2 pull of the PTP state in the PTP
notification API:
https://docs.starlingx.io/api-ref/ptp-notification-armada-app/api_ptp_notifications_definition_v1.html#pull-status-notifications
https://docs.starlingx.io/api-ref/ptp-notification-armada-app/api_ptp_notifications_definition_v2.html#pull-status-notifications
carries the name or address of the node running the PTP tracking
service. Therefore, when that service (the server) moves to another
node while the sidecar (the client) stays on the node where the
service was running before, further attempts to pull the PTP state
fail with "404 Not Found".

This change stores the node where the service is running in the
daemon context, now shared across processes through
multiprocessing.Manager, and updates it with the latest location
event handled by the notification worker. Instead of using the node
where the sidecar resides ("THIS_NODE_NAME"), the API GET handlers
read the service node name from that context.
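
A minimal sketch of the mechanism (node names are illustrative; the
actual update happens in NotificationWorker.consume_location_event,
shown in the diff below):

    import multiprocessing as mp

    def worker(daemon_context):
        # The notification worker records the node currently running the
        # PTP tracking service whenever a location event is consumed.
        daemon_context['SERVICE_NODE_NAME'] = 'controller-1'

    if __name__ == '__main__':
        manager = mp.Manager()
        # Proxy dict: updates made in the worker process are visible to
        # the parent (sidecar) process as well.
        context = manager.dict({'THIS_NODE_NAME': 'controller-0',
                                'SERVICE_NODE_NAME': 'controller-0'})
        p = mp.Process(target=worker, args=(context,))
        p.start()
        p.join()
        # The API handlers now query this value instead of THIS_NODE_NAME.
        print(context['SERVICE_NODE_NAME'])  # controller-1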

Test Plan:
PASS: Installed the new version of the sidecar image and moved the
      service to another node; pulled the PTP state successfully
      before and after the move.

Closes-Bug: #1991793
Signed-off-by: Douglas Henrique Koerich <douglashenrique.koerich@windriver.com>
Change-Id: Ie3f72c5b84f1d9093d6ee906bbc11f9fd4ceb31b
Douglas Henrique Koerich 2022-10-05 16:06:05 -03:00
parent 619aa8b59f
commit 4c58b9e8b3
5 changed files with 135 additions and 92 deletions


@@ -9,49 +9,57 @@ import logging
import multiprocessing as mp
from notificationclientsdk.common.helpers import rpc_helper
from notificationclientsdk.common.helpers import log_helper
from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
from notificationclientsdk.client.locationservice import LocationServiceClient
from notificationclientsdk.services.notification_worker import NotificationWorker
from notificationclientsdk.services.notification_worker import \
NotificationWorker
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
'''Entry point of Default Process Worker'''
def ProcessWorkerDefault(event, subscription_event, daemon_context):
'''Entry point of Default Process Worker'''
worker = NotificationWorker(event, subscription_event, daemon_context)
worker.run()
return
class DaemonControl(object):
def __init__(self, daemon_context, process_worker = None):
self.daemon_context = daemon_context
self.residing_node_name = daemon_context['THIS_NODE_NAME']
def __init__(self, daemon_context, process_worker=None):
self.event = mp.Event()
self.subscription_event = mp.Event()
self.registration_endpoint = RpcEndpointInfo(daemon_context['REGISTRATION_TRANSPORT_ENDPOINT'])
self.registration_transport = rpc_helper.get_transport(self.registration_endpoint)
self.locationservice_client = LocationServiceClient(self.registration_endpoint.TransportEndpoint)
self.manager = mp.Manager()
self.daemon_context = self.manager.dict(daemon_context)
LOG.debug('Managed (shared) daemon_context id %d contents %s' %
(id(self.daemon_context), daemon_context))
self.registration_endpoint = RpcEndpointInfo(
daemon_context['REGISTRATION_TRANSPORT_ENDPOINT'])
self.registration_transport = rpc_helper.get_transport(
self.registration_endpoint)
self.locationservice_client = LocationServiceClient(
self.registration_endpoint.TransportEndpoint)
if not process_worker:
process_worker = ProcessWorkerDefault
self.mpinstance = mp.Process( target=process_worker, args=(
self.event, self.subscription_event, daemon_context))
self.mpinstance = mp.Process(target=process_worker,
args=(self.event,
self.subscription_event,
self.daemon_context))
self.mpinstance.start()
# initial update
self.refresh()
pass
def __del__(self):
if self.locationservice_client:
self.locationservice_client.cleanup()
self.locationservice_client = None
return
def refresh(self):
self.subscription_event.set()
self.event.set()
def get_service_nodename(self):
return self.daemon_context['SERVICE_NODE_NAME']


@@ -17,11 +17,9 @@ else:
import Queue
from notificationclientsdk.common.helpers import subscription_helper
from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.common.helpers import log_helper
from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.model.dto.location import LocationInfo
@@ -33,11 +31,14 @@ from notificationclientsdk.model.orm.node import NodeInfo as NodeInfoOrm
from notificationclientsdk.repository.node_repo import NodeRepo
from notificationclientsdk.client.locationservice import LocationHandlerDefault, LocationHandlerBase
from notificationclientsdk.client.locationservice import LocationHandlerBase
from notificationclientsdk.services.broker_state_manager import BrokerStateManager
from notificationclientsdk.services.broker_connection_manager import BrokerConnectionManager
from notificationclientsdk.services.notification_handler import NotificationHandler
from notificationclientsdk.services.broker_state_manager import \
BrokerStateManager
from notificationclientsdk.services.broker_connection_manager import \
BrokerConnectionManager
from notificationclientsdk.services.notification_handler import \
NotificationHandler
LOG = logging.getLogger(__name__)
log_helper.config_logger(LOG)
@@ -46,30 +47,29 @@ log_helper.config_logger(LOG)
class NotificationWorker:
class LocationInfoHandler(LocationHandlerBase):
'''Glue code to forward location info to daemon method'''
def __init__(self, locationinfo_dispatcher):
self.locationinfo_dispatcher = locationinfo_dispatcher
super(NotificationWorker.LocationInfoHandler, self).__init__()
def handle(self, location_info):
LOG.debug("Received location info:{0}".format(location_info))
return self.locationinfo_dispatcher.produce_location_event(location_info)
def __init__(
self, event, subscription_event, daemon_context):
return self.locationinfo_dispatcher.produce_location_event(
location_info)
def __init__(self, event, subscription_event, daemon_context):
self.__alive = True
self.daemon_context = daemon_context
self.residing_node_name = daemon_context['THIS_NODE_NAME']
NodeInfoHelper.set_residing_node(self.residing_node_name)
NodeInfoHelper.set_residing_node(daemon_context['THIS_NODE_NAME'])
self.sqlalchemy_conf = json.loads(daemon_context['SQLALCHEMY_CONF_JSON'])
self.sqlalchemy_conf = json.loads(
daemon_context['SQLALCHEMY_CONF_JSON'])
DbContext.init_dbcontext(self.sqlalchemy_conf)
self.event = event
self.subscription_event = subscription_event
self.__locationinfo_handler = NotificationWorker.LocationInfoHandler(self)
self.__locationinfo_handler = \
NotificationWorker.LocationInfoHandler(self)
self.__notification_handler = NotificationHandler()
self.broker_connection_manager = BrokerConnectionManager(
self.__locationinfo_handler,
@@ -97,7 +97,8 @@ class NotificationWorker:
node_name = location_info.get('NodeName', None)
podip = location_info.get("PodIP", None)
if not node_name or not podip:
LOG.warning("Missing PodIP inside location info:{0}".format(location_info))
LOG.warning(
"Missing PodIP inside location info:{0}".format(location_info))
return False
timestamp = location_info.get('Timestamp', 0)
# mutex for threads which produce location events
@@ -147,8 +148,8 @@ class NotificationWorker:
self.handle_brokers_data_syncup_event()
continue
self.broker_connection_manager.stop()
return
def consume_location_event(self):
nodeinfo_repo = None
@@ -162,31 +163,49 @@ class NotificationWorker:
node_name = self.location_keys_q.get(False)
location_info = self.location_channel.get(node_name, None)
if not location_info:
LOG.warning("ignore location info@{0} without content".format(node_name))
LOG.warning(
"ignore location info@{0} without content".format(
node_name))
continue
LOG.debug("consume location info @{0}:{1}".format(node_name, location_info))
is_nodeinfo_added, is_nodeinfo_updated = self.__persist_locationinfo(
location_info, nodeinfo_repo)
_nodeinfo_added = _nodeinfo_added + (1 if is_nodeinfo_added else 0)
_nodeinfo_updated = _nodeinfo_updated + (1 if is_nodeinfo_updated else 0)
LOG.debug("consume location info @{0}:{1}".format(
node_name, location_info))
is_nodeinfo_added, is_nodeinfo_updated = \
self.__persist_locationinfo(location_info, nodeinfo_repo)
_nodeinfo_added = \
_nodeinfo_added + (1 if is_nodeinfo_added else 0)
_nodeinfo_updated = \
_nodeinfo_updated + (1 if is_nodeinfo_updated else 0)
if is_nodeinfo_added or is_nodeinfo_updated:
LOG.debug("Setting daemon's SERVICE_NODE_NAME to %s"
% node_name)
self.daemon_context['SERVICE_NODE_NAME'] = node_name
LOG.debug("Daemon context updated: id %d contents %s"
% (id(self.daemon_context), self.daemon_context))
continue
LOG.debug("Finished consuming location event")
if _nodeinfo_added > 0:
LOG.debug("signal event to refresh brokers state from subscription")
# node info changes trigger rebuilding broker states from subscription
# due to some subscriptions might subscribe resources of all nodes
LOG.debug(
"signal event to refresh brokers state from subscription")
# node info changes trigger rebuilding broker states from
# subscription due to some subscriptions might subscribe
# resources of all nodes
self.subscription_event.set()
if _nodeinfo_added > 0 or _nodeinfo_updated > 0:
LOG.debug("try to refresh brokers state due to changes of node info")
LOG.debug(
"try to refresh brokers state due to changes of node info")
nodeinfos = nodeinfo_repo.get()
broker_state_changed = self.broker_state_manager.refresh_by_nodeinfos(nodeinfos)
broker_state_changed = \
self.broker_state_manager.refresh_by_nodeinfos(nodeinfos)
if broker_state_changed:
# signal the potential changes on node resources
LOG.debug("signal event to re-sync up brokers state")
self.__brokers_watcher_event.set()
self.signal_events()
except Exception as ex:
LOG.warning("failed to consume location event:{0}".format(str(ex)))
finally:
@@ -204,23 +223,28 @@ class NotificationWorker:
is_nodeinfo_updated = False
try:
location_info2 = LocationInfo(**location_info)
entry = nodeinfo_repo.get_one(NodeName=location_info['NodeName'], Status=1)
entry = nodeinfo_repo.get_one(
NodeName=location_info['NodeName'], Status=1)
if not entry:
entry = NodeInfoOrm(**location_info2.to_orm())
nodeinfo_repo.add(entry)
is_nodeinfo_added = True
LOG.debug("Add NodeInfo: {0}".format(entry.NodeName))
elif not entry.Timestamp or entry.Timestamp < location_info['Timestamp']:
# location info with newer timestamp indicate broker need to be re-sync up
elif not entry.Timestamp or (entry.Timestamp <
location_info['Timestamp']):
# location info with newer timestamp indicate broker need to be
# re-sync up
is_nodeinfo_updated = True
nodeinfo_repo.update(entry.NodeName, **location_info2.to_orm())
LOG.debug("Update NodeInfo: {0}".format(entry.NodeName))
else:
# do nothing
LOG.debug("Ignore the location for broker: {0}".format(entry.NodeName))
LOG.debug("Ignore the location for broker: {0}".format(
entry.NodeName))
except Exception as ex:
LOG.warning("failed to update broker state with location info:{0}, {1}".format(
location_info, str(ex)))
LOG.warning("failed to update broker state with "
"location info:{0},{1}".format(location_info,
str(ex)))
finally:
return is_nodeinfo_added, is_nodeinfo_updated
@@ -235,7 +259,8 @@ class NotificationWorker:
nodeinfo_repo = NodeRepo(autocommit=True)
subs = subscription_repo.get()
LOG.debug("found {0} subscriptions".format(subs.count()))
broker_state_changed = self.broker_state_manager.refresh_by_subscriptions(subs)
broker_state_changed = \
self.broker_state_manager.refresh_by_subscriptions(subs)
if broker_state_changed:
nodeinfo_repo = NodeRepo(autocommit=True)
nodeinfos = nodeinfo_repo.get()
@@ -250,13 +275,17 @@ class NotificationWorker:
broker_name = subinfo.ResourceQualifier.NodeName
else:
# ignore the subscription due to unsupported type
LOG.debug("Ignore the subscription for: {0}".format(subinfo.SubscriptionId))
LOG.debug(
"Ignore the subscription for: {0}".format(
subinfo.SubscriptionId))
continue
elif s.ResourceAddress:
# Get nodename from resource address
LOG.info("Parse resource address {}".format(s.ResourceAddress))
_, nodename, _, _, _ = subscription_helper.parse_resource_address(
s.ResourceAddress)
LOG.info(
"Parse resource address {}".format(s.ResourceAddress))
_, nodename, _, _, _ = \
subscription_helper.parse_resource_address(
s.ResourceAddress)
broker_name = nodename
else:
LOG.debug("Subscription {} does not have ResourceType or "
@@ -264,8 +293,6 @@ class NotificationWorker:
continue
if s.Status == 1:
current_node_name = NodeInfoHelper.expand_node_name(broker_name)
# update the initial delivery timestamp as well
self.__notification_handler.update_delivery_timestamp(
NodeInfoHelper.default_node_name(broker_name),
@@ -288,7 +315,8 @@ class NotificationWorker:
self.__brokers_data_syncup_event.set()
except Exception as ex:
result = False
LOG.warning("fail to sync up watcher for brokers: {0}".format(str(ex)))
LOG.warning(
"fail to sync up watcher for brokers: {0}".format(str(ex)))
finally:
if not result:
# retry indefinitely
@@ -304,7 +332,8 @@ class NotificationWorker:
self.broker_connection_manager)
except Exception as ex:
result = False
LOG.warning("fail to sync up data for brokers: {0}".format(str(ex)))
LOG.warning(
"fail to sync up data for brokers: {0}".format(str(ex)))
finally:
if not result:
self.__brokers_data_syncup_event.set()


@@ -1,26 +1,27 @@
#coding=utf-8
#
# Copyright (c) 2020-2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from pecan import expose, redirect, rest, route, response, abort
from webob.exc import HTTPException, HTTPNotFound, HTTPBadRequest, HTTPClientError, HTTPServerError
from pecan import expose, rest, route, abort
from webob.exc import HTTPException, HTTPServerError
from wsme import types as wtypes
from wsmeext.pecan import wsexpose
import os
import logging
import oslo_messaging
from notificationclientsdk.common.helpers import log_helper
from notificationclientsdk.services.ptp import PtpService
from notificationclientsdk.exception import client_exception
from sidecar.repository.notification_control import notification_control
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0')
class CurrentStateController(rest.RestController):
def __init__(self):
@@ -30,7 +31,10 @@ class CurrentStateController(rest.RestController):
def get(self):
try:
ptpservice = PtpService(notification_control)
ptpstatus = ptpservice.query(THIS_NODE_NAME)
service_node_name = notification_control.get_service_nodename()
LOG.debug('service_node_name is %s' % service_node_name)
ptpstatus = ptpservice.query(service_node_name)
LOG.debug('Got ptpstatus: %s' % ptpstatus)
# response.status = 200
return ptpstatus
except client_exception.NodeNotAvailable as ex:
@@ -51,9 +55,10 @@ class CurrentStateController(rest.RestController):
# raise ex
abort(500)
except Exception as ex:
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex)))
LOG.error("Exception:{0}@{1}".format(type(ex), str(ex)))
abort(500)
class PtpController(rest.RestController):
def __init__(self):
pass
@@ -62,5 +67,6 @@ class PtpController(rest.RestController):
def get(self):
return 'ptp'
route(PtpController, 'CurrentState', CurrentStateController())
route(PtpController, 'currentstate', CurrentStateController())


@@ -4,14 +4,10 @@
# SPDX-License-Identifier: Apache-2.0
#
from pecan import expose, redirect, rest, route, response, abort
from webob.exc import HTTPException, HTTPNotFound, HTTPBadRequest, HTTPClientError, HTTPServerError
from pecan import expose, abort
from webob.exc import HTTPException, HTTPServerError
from wsme import types as wtypes
from wsmeext.pecan import wsexpose
from datetime import datetime, timezone
import os
from datetime import datetime
import logging
import oslo_messaging
@@ -19,15 +15,13 @@ from notificationclientsdk.common.helpers import constants
from notificationclientsdk.common.helpers import subscription_helper
from notificationclientsdk.services.ptp import PtpService
from notificationclientsdk.exception import client_exception
from notificationclientsdk.common.helpers import log_helper
from sidecar.repository.notification_control import notification_control
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0')
class ResourceAddressController(object):
def __init__(self, resource_address):
@@ -37,22 +31,28 @@ class ResourceAddressController(object):
def CurrentState(self):
try:
# validate resource address
_, nodename, resource, optional, self.resource_address = subscription_helper.\
parse_resource_address(self.resource_address)
if nodename != THIS_NODE_NAME and nodename != '.':
_, nodename, resource, optional, self.resource_address = \
subscription_helper.parse_resource_address(
self.resource_address)
service_node_name = notification_control.get_service_nodename()
LOG.debug('service_node_name is %s' % service_node_name)
if nodename != service_node_name and nodename != '.':
LOG.warning("Node {} is not available".format(nodename))
abort(404)
if resource not in constants.VALID_SOURCE_URI:
LOG.warning("Resource {} is not valid".format(resource))
abort(404)
ptpservice = PtpService(notification_control)
ptpstatus = ptpservice.query(THIS_NODE_NAME, self.resource_address, optional)
ptpstatus = ptpservice.query(service_node_name,
self.resource_address, optional)
LOG.debug('Got ptpstatus: %s' % ptpstatus)
# Change time from float to ascii format
# ptpstatus['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ',
# time.gmtime(ptpstatus['time']))
for item in ptpstatus:
ptpstatus[item]['time'] = datetime.fromtimestamp(ptpstatus[item]['time']).\
strftime('%Y-%m-%dT%H:%M:%S%fZ')
ptpstatus[item]['time'] = datetime.fromtimestamp(
ptpstatus[item]['time']).strftime(
'%Y-%m-%dT%H:%M:%S%fZ')
return ptpstatus
except client_exception.NodeNotAvailable as ex:
LOG.warning("Node is not available:{0}".format(str(ex)))
@@ -72,6 +72,5 @@ class ResourceAddressController(object):
# raise ex
abort(500)
except Exception as ex:
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex)))
LOG.error("Exception:{0}@{1}".format(type(ex), str(ex)))
abort(500)


@@ -1,5 +1,5 @@
#
# Copyright (c) 2021 Wind River Systems, Inc.
# Copyright (c) 2021-2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@@ -14,9 +14,10 @@ from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
REGISTRATION_USER = os.environ.get("REGISTRATION_USER", "admin")
REGISTRATION_PASS = os.environ.get("REGISTRATION_PASS", "admin")
REGISTRATION_PORT = os.environ.get("REGISTRATION_PORT", "5672")
REGISTRATION_HOST = os.environ.get("REGISTRATION_HOST",'registration.notification.svc.cluster.local')
THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0')
THIS_POD_IP = os.environ.get("THIS_POD_IP",'127.0.0.1')
REGISTRATION_HOST = os.environ.get(
"REGISTRATION_HOST", "registration.notification.svc.cluster.local")
THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME", "controller-0")
THIS_POD_IP = os.environ.get("THIS_POD_IP", "127.0.0.1")
NOTIFICATION_BROKER_USER = os.environ.get("NOTIFICATIONSERVICE_USER", "admin")
NOTIFICATION_BROKER_PASS = os.environ.get("NOTIFICATIONSERVICE_PASS", "admin")
@@ -32,11 +33,11 @@ sqlalchemy_conf_json = json.dumps(sqlalchemy_conf)
daemon_context = {
'SQLALCHEMY_CONF_JSON': sqlalchemy_conf_json,
'THIS_NODE_NAME': THIS_NODE_NAME,
'SERVICE_NODE_NAME': THIS_NODE_NAME,
'REGISTRATION_TRANSPORT_ENDPOINT': REGISTRATION_TRANSPORT_ENDPOINT,
'NOTIFICATION_BROKER_USER': NOTIFICATION_BROKER_USER,
'NOTIFICATION_BROKER_PASS': NOTIFICATION_BROKER_PASS,
'NOTIFICATION_BROKER_PORT': NOTIFICATION_BROKER_PORT
}
notification_control = DaemonControl(daemon_context)
NodeInfoHelper.set_residing_node(THIS_NODE_NAME)
notification_control = DaemonControl(daemon_context)
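
As a quick way to exercise the scenario from the test plan above, a
pull through the sidecar should now succeed both before and after the
service moves. The sketch below assumes the v1 pull endpoint from the
API reference linked in the commit message and a sidecar reachable at
127.0.0.1:8080 (host and port are deployment-specific assumptions):

    import json
    import urllib.request

    # Adjust host/port to where the notification client sidecar listens.
    URL = 'http://127.0.0.1:8080/ocloudNotifications/v1/PTP/CurrentState'

    with urllib.request.urlopen(URL) as resp:
        # Before this change, a pull made after the PTP tracking service
        # moved to another node failed with 404; it now resolves to the
        # node stored in SERVICE_NODE_NAME and returns the PTP status.
        print(resp.status, json.loads(resp.read().decode()))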