Cole Walker a482d1ff96 Add v2 id to rabbitmq topics and add v1 container to daemonset
Update the notificationservice-base-v2 container image to use a v2
identifier on rabbitmq topics. This allows v1 and v2 messages to be
handled separately. Update the notificationclient image to use the v2
identifier as well.

The v1 notificationservice-base will continue to use the default
rabbitmq topics with no additional identifier. This is compatible with
the following notificationclient-base image:

starlingx/notificationclient-base:stx.5.0-v1.0.4

This change also updates the daemonset to deploy both v1 and v2
notificationservice-base images and provides a helm overrided to allow
either one to be disabled.

Finally, update the notificationclient-base Dockerfile to pin the
version of sqlalchemy to 1.4.12. This is the same version used for the v1
client, and the latest 2.x.x version of sqlalchemy has changes which
break notificationclient-base.

Test plan:
PASS: Build all container images
PASS: Build and deploy ptp-notification application
Pass: Test ptp-notification Pull, Subscribe, Delete functionality for v1
and v2

Story: 2010538
Task: 47285
Task: 47286

Change-Id: Ib033661f496439f62af785f8f37b1069ccb74ba1
2023-02-08 20:13:09 +00:00

204 lines
7.3 KiB
Python

#
# Copyright (c) 2021-2023 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import os
import json
import time
import oslo_messaging
from oslo_config import cfg
from trackingfunctionsdk.client.base import BrokerClientBase
import logging
LOG = logging.getLogger(__name__)
from trackingfunctionsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
class PtpEventProducer(object):
class ListenerEndpoint(object):
target = oslo_messaging.Target(namespace='notification', version='1.0')
def __init__(self, handler=None):
self.handler = handler
self.init_time = time.time()
pass
def QueryStatus(self, ctx, **rpc_kwargs):
LOG.debug("PtpEventProducer QueryStatus called %s" % rpc_kwargs)
if self.handler:
return self.handler.query_status(**rpc_kwargs)
else:
return None
def TriggerDelivery(self, ctx, **rpc_kwargs):
LOG.debug("PtpEventProducer TriggerDelivery called %s" % rpc_kwargs)
if self.handler:
return self.handler.trigger_delivery(**rpc_kwargs)
else:
return None
def __init__(self, node_name, local_broker_transport_endpoint,
registration_broker_transport_endpoint=None):
self.Id = id(self)
self.node_name = node_name
self.local_broker_client = BrokerClientBase(
'LocalPtpEventProducer', local_broker_transport_endpoint)
if registration_broker_transport_endpoint:
self.registration_broker_client = BrokerClientBase(
'AllPtpEventProducer', registration_broker_transport_endpoint)
else:
self.registration_broker_client = None
return
def __del__(self):
if self.local_broker_client:
del self.local_broker_client
self.local_broker_client = None
if self.registration_broker_client:
del self.registration_broker_client
self.registration_broker_client = None
return
def publish_status(self, ptpstatus, retry=3):
result = False
result1 = self.publish_status_local(ptpstatus,
retry) if self.local_broker_client else result
result2 = self.publish_status_all(ptpstatus,
retry) if self.registration_broker_client else result
return result1, result2
def publish_status_local(self, ptpstatus, source, retry=3):
if not self.local_broker_client:
return False
topic = '{0}-Event-v2-{1}'.format(source, self.node_name)
server = None
isretrystopped = False
while not isretrystopped:
try:
self.local_broker_client.cast(
topic, 'NotifyStatus', notification=ptpstatus)
LOG.debug("Published ptp status:{0}@Topic:{1}".format(ptpstatus, topic))
break
except Exception as ex:
LOG.warning("Failed to publish ptp status:{0}@Topic:{1} due to: {2}".format(
ptpstatus, topic, str(ex)))
retry = retry - 1
isretrystopped = False if retry > 0 else True
if isretrystopped:
LOG.error("Failed to publish ptp status:{0}@Topic:{1}".format(
ptpstatus, topic))
return isretrystopped == False
def publish_status_all(self, ptpstatus, retry=3):
if not self.registration_broker_client:
return False
topic_all = 'PTP-Event-v2-*'
server = None
isretrystopped = False
while not isretrystopped:
try:
self.registration_broker_client.cast(
topic_all, 'NotifyStatus', notification=ptpstatus)
LOG.debug("Published ptp status:{0}@Topic:{1}".format(ptpstatus, topic_all))
break
except Exception as ex:
LOG.warning("Failed to publish ptp status:{0}@Topic:{1} due to: {2}".format(
ptpstatus, topic_all, str(ex)))
retry = retry - 1
isretrystopped = False if retry > 0 else True
if isretrystopped:
LOG.error("Failed to publish ptp status:{0}@Topic:{1}".format(
ptpstatus, topic_all))
return isretrystopped == False
def start_status_listener(self, handler=None):
result = False
result1 = self.start_status_listener_local(handler) if self.local_broker_client else result
result2 = self.start_status_listener_all(
handler) if self.registration_broker_client else result
result = result1 and result2
return result
def start_status_listener_local(self, handler=None):
if not self.local_broker_client:
return False
topic = 'PTP-Status-v2'
server = 'PTP-Tracking-{0}'.format(self.node_name)
endpoints = [PtpEventProducer.ListenerEndpoint(handler)]
self.local_broker_client.add_listener(
topic, server, endpoints)
return True
def start_status_listener_all(self, handler=None):
if not self.registration_broker_client:
return False
topic = 'PTP-Status-v2'
server = 'PTP-Tracking-{0}'.format(self.node_name)
endpoints = [PtpEventProducer.ListenerEndpoint(handler)]
self.registration_broker_client.add_listener(
topic, server, endpoints)
return True
def stop_status_listener(self):
result = False
result1 = self.stop_status_listener_local() if self.local_broker_client else result
result2 = self.stop_status_listener_all() if self.registration_broker_client else result
result = result1 and result2
return result
def stop_status_listener_local(self):
if not self.local_broker_client:
return False
topic = 'PTP-Status-v2'
server = "PTP-Tracking-{0}".format(self.node_name)
self.local_broker_client.remove_listener(
topic, server)
def stop_status_listener_all(self):
if not self.registration_broker_client:
return False
topic = 'PTP-Status-v2'
server = "PTP-Tracking-{0}".format(self.node_name)
self.registration_broker_client.remove_listener(
topic, server)
def is_listening(self):
result = False
result1 = self.is_listening_local() if self.local_broker_client else result
result2 = self.is_listening_all() if self.registration_broker_client else result
result = result1 and result2
return result
def is_listening_local(self):
if not self.local_broker_client:
return False
topic = 'PTP-Status-v2'
server = "PTP-Tracking-{0}".format(self.node_name)
return self.local_broker_client.is_listening(
topic, server)
def is_listening_all(self):
if not self.registration_broker_client:
return False
topic = 'PTP-Status-v2'
server = "PTP-Tracking-{0}".format(self.node_name)
return self.registration_broker_client.is_listening(
topic, server)