From 4178a8697551d3e9eb2db4792e890dccb3f30c7d Mon Sep 17 00:00:00 2001 From: Cole Walker Date: Mon, 29 Aug 2022 16:34:12 -0400 Subject: [PATCH] Integration work for ORAN notification Several improvements and fixes to enable the end-to-end functionality of all of the components in support of the O-RAN Spec Compliant Timing API Notification work. 1. Add time stamps to logging for notificationservice and notificationclient 2. Add support for the "optional" hierarchy in the resource address which allows the client to query the status of a specific ptp instances. ie. get the status of instance ptp1 rather than all ptp instances 3. Add a parent key to the returned notification data so that multiple statuses can be returned to the client with a single notification' 4. Reworked the notificationservice daemonset to start its process directly rather than using an intermediary script. This allows the container logs to show properly via kubectl logs and will also allow the container to crash properly if the program errors out. 5. Reworked the helm values for ptp4l and ts2phc instances to allow users to supply overrides with multiple instances Test plan: PASS: PTP notification v1 compatibility PASS: GET all v2 resources PASS: SUBSCRIBE/LIST/DELETE v2 resources PASS: Build and deploy containers/fluxcd app Story: 2010056 Task: 46226 Change-Id: Id471fdc0815afdcc5639e81c6457616e268e6cd7 Signed-off-by: Cole Walker --- notificationclient-base/centos/Dockerfile | 1 + .../client/notificationservice.py | 13 +- .../common/helpers/constants.py | 14 ++ .../common/helpers/log_helper.py | 15 +- .../common/helpers/subscription_helper.py | 63 ++++- .../services/broker_state_manager.py | 20 +- .../services/notification_handler.py | 35 +-- .../services/notification_worker.py | 14 +- .../notificationclientsdk/services/ptp.py | 41 ++-- .../controllers/v2/resource_address.py | 9 +- .../sidecar/controllers/v2/subscriptions.py | 24 +- notificationservice-base/centos/Dockerfile | 1 + .../client/ptpeventproducer.py | 49 ++-- .../common/helpers/cgu_handler.py | 1 - .../common/helpers/constants.py | 2 + .../common/helpers/gnss_monitor.py | 8 + .../common/helpers/log_helper.py | 4 +- .../common/helpers/ptp_monitor.py | 9 +- .../trackingfunctionsdk/services/daemon.py | 218 ++++++++++++------ ...tracking_start.sh => ptptracking_start.py} | 45 ++-- .../ptp-notification/templates/daemonset.yaml | 6 +- .../helm-charts/ptp-notification/values.yaml | 4 +- 22 files changed, 386 insertions(+), 210 deletions(-) rename stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/{ptptracking_start.sh => ptptracking_start.py} (79%) diff --git a/notificationclient-base/centos/Dockerfile b/notificationclient-base/centos/Dockerfile index 00d36ae..e898387 100644 --- a/notificationclient-base/centos/Dockerfile +++ b/notificationclient-base/centos/Dockerfile @@ -10,6 +10,7 @@ RUN set -ex ;\ $(grep '^name=' ${STX_REPO_FILE} | awk -F '=' '{printf "--enablerepo=" $2 " "}') \ -y \ gcc python3-devel python3-pip \ + && pip3 install --upgrade pip \ && pip3 install --user pecan \ && pip3 install oslo-config \ && pip3 install oslo-messaging \ diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py index f2d205d..dd3245a 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py @@ -58,12 +58,19 @@ class NotificationServiceClient(BrokerClientBase): return def query_resource_status(self, resource_type, - timeout=None, retry=None, resource_qualifier_json=None, resource_address=None): + timeout=None, + retry=None, + resource_qualifier_json=None, + resource_address=None, + optional=None): topic = '{0}-Status'.format(resource_type) server = '{0}-Tracking-{1}'.format(resource_type, self.target_node_name) return self.call( - topic, server, 'QueryStatus', timeout=timeout, retry=retry, - QualifierJson=resource_qualifier_json, ResourceAddress=resource_address) + topic, server, 'QueryStatus', + timeout=timeout, retry=retry, + QualifierJson=resource_qualifier_json, + ResourceAddress=resource_address, + optional=optional) def add_resource_status_listener(self, resource_type, status_handler=None): if not status_handler: diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py index f0d24b5..ff36991 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py @@ -9,6 +9,8 @@ DATA_TYPE_METRIC = "metric" VALUE_TYPE_ENUMERATION = "enumeration" VALUE_TYPE_METRIC = "metric" +PTP_V1_KEY = "ptp_notification_v1" + SOURCE_SYNC_ALL = '/sync' SOURCE_SYNC_GNSS_SYNC_STATUS = '/sync/gnss-status/gnss-sync-status' SOURCE_SYNC_PTP_CLOCK_CLASS = '/sync/ptp-status/clock-class' @@ -19,6 +21,18 @@ SOURCE_SYNCE_CLOCK_QUALITY = '/sync/synce-status/clock-quality' SOURCE_SYNCE_LOCK_STATE_EXTENDED = '/sync/synce-status/lock-state-extended' SOURCE_SYNCE_LOCK_STATE = '/sync/synce-status/lock-state' +RESOURCE_ADDRESS_MAPPINGS = { + SOURCE_SYNC_ALL: 'sync', + SOURCE_SYNC_GNSS_SYNC_STATUS: 'gnss_sync_state', + SOURCE_SYNC_PTP_CLOCK_CLASS: 'ptp_clock_class', + SOURCE_SYNC_PTP_LOCK_STATE: 'ptp_lock_state', + SOURCE_SYNC_OS_CLOCK: 'os_clock_sync_state', + SOURCE_SYNC_SYNC_STATE: 'sync_state', + SOURCE_SYNCE_CLOCK_QUALITY: 'synce_clock_quality', + SOURCE_SYNCE_LOCK_STATE_EXTENDED: 'synce_lock_state_extended', + SOURCE_SYNCE_LOCK_STATE: 'synce_lock_state' +} + VALID_SOURCE_URI = { SOURCE_SYNC_ALL, SOURCE_SYNC_GNSS_SYNC_STATUS, diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py index f801c02..28e2d21 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py @@ -1,21 +1,22 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # -import os import logging +import sys +import os -LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "INFO") def get_logger(module_name): logger = logging.getLogger(module_name) return config_logger(logger) + def config_logger(logger): - ''' - configure the logger: uncomment following lines for debugging - ''' - logger.setLevel(LOGGING_LEVEL) + logging.basicConfig(stream=sys.stdout, + format='%(asctime)s %(levelname)-8s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + logger.setLevel(level=os.environ.get("LOGGING_LEVEL", "INFO")) return logger diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py index 020c0af..25e1621 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py @@ -4,27 +4,27 @@ # SPDX-License-Identifier: Apache-2.0 # -import os import json import re import requests import logging - -from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper +from datetime import datetime +from notificationclientsdk.common.helpers import constants +from notificationclientsdk.common.helpers import log_helper LOG = logging.getLogger(__name__) - -from notificationclientsdk.common.helpers import log_helper log_helper.config_logger(LOG) + def notify(subscriptioninfo, notification, timeout=2, retry=3): result = False while True: retry = retry - 1 try: headers = {'Content-Type': 'application/json'} - data = json.dumps(notification) + data = format_notification_data(subscriptioninfo, notification) + data = json.dumps(data) url = subscriptioninfo.EndpointUri response = requests.post(url, data=data, headers=headers, timeout=timeout) response.raise_for_status() @@ -52,13 +52,58 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3): return result + +def format_notification_data(subscriptioninfo, notification): + if hasattr(subscriptioninfo, 'ResourceType'): + LOG.debug("format_notification_data: Found v1 subscription, no formatting required.") + return notification + elif hasattr(subscriptioninfo, 'ResourceAddress'): + _, _, resource_path, _, _ = parse_resource_address(subscriptioninfo.ResourceAddress) + resource_mapped_value = constants.RESOURCE_ADDRESS_MAPPINGS[resource_path] + formatted_notification = {resource_mapped_value: []} + for instance in notification: + # Add the instance identifier to ResourceAddress for PTP lock-state + # and PTP clockClass + if notification[instance]['source'] in [constants.SOURCE_SYNC_PTP_CLOCK_CLASS, + constants.SOURCE_SYNC_PTP_LOCK_STATE]: + temp_values = notification[instance].get('data', {}).get('values', []) + resource_address = temp_values[0].get('ResourceAddress', None) + if instance not in resource_address: + add_instance_name = resource_address.split('/', 3) + add_instance_name.insert(3, instance) + resource_address = '/'.join(add_instance_name) + notification[instance]['data']['values'][0]['ResourceAddress'] = resource_address + formatted_notification[resource_mapped_value].append(notification[instance]) + for instance in formatted_notification[resource_mapped_value]: + this_delivery_time = instance['time'] + if type(this_delivery_time) != str: + format_time = datetime.fromtimestamp(float(this_delivery_time)).\ + strftime('%Y-%m-%dT%H:%M:%S%fZ') + instance['time'] = format_time + else: + raise Exception("format_notification_data: No valid source address found in notification") + LOG.debug( + "format_notification_data: Added parent key for client consumption: %s" % + formatted_notification) + return formatted_notification + + def parse_resource_address(resource_address): # The format of resource address is: # /{clusterName}/{siteName}(/optional/hierarchy/..)/{nodeName}/{resource} - # Assume no optional hierarchy for now clusterName = resource_address.split('/')[1] nodeName = resource_address.split('/')[2] resource_path = '/' + re.split('[/]', resource_address, 3)[3] + resource_list = re.findall(r'[^/]+', resource_path) + if len(resource_list) == 4: + remove_optional = '/' + resource_list[0] + resource_path = resource_path.replace(remove_optional, '') + resource_address = resource_address.replace(remove_optional, '') + optional = resource_list[0] + LOG.debug("Optional hierarchy found when parsing resource address: %s" % optional) + else: + optional = None - return clusterName, nodeName, resource_path - + # resource_address is the full address without any optional hierarchy + # resource_path is the specific identifier for the resource + return clusterName, nodeName, resource_path, optional, resource_address diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py index 0107ab7..09d3f5f 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py @@ -17,13 +17,16 @@ from notificationclientsdk.model.dto.broker_state import BrokerState LOG = logging.getLogger(__name__) from notificationclientsdk.common.helpers import log_helper + log_helper.config_logger(LOG) + class BrokerStateManager: ''' Manager to manage broker states Note: Now it is not thread safe ''' + def __init__(self): self.broker_state_map = {} self.disabled_brokers = [] @@ -84,7 +87,7 @@ class BrokerStateManager: for broker_name, brokerstate in self.broker_state_map.items(): try: if brokerstate.any_obsolete_subscription( - self.subscription_refresh_iteration): + self.subscription_refresh_iteration): return True except Exception as ex: LOG.warning( @@ -104,14 +107,17 @@ class BrokerStateManager: broker_name = subscription.ResourceQualifier.NodeName else: # ignore the subscription due to unsupported type - LOG.debug("Ignore the subscription for: {0}".format(subscription_orm.SubscriptionId)) + LOG.debug( + "Ignore the subscription for: {0}".format(subscription_orm.SubscriptionId)) return False else: subscription = SubscriptionInfoV2(subscription_orm) - _, nodename, resource = subscription_helper.parse_resource_address(subscription.ResourceAddress) + _, nodename, resource, _, _ = subscription_helper.parse_resource_address( + subscription.ResourceAddress) broker_name = nodename - LOG.debug("subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status)) + LOG.debug( + "subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status)) if subscription_orm.Status != 1: return False @@ -200,8 +206,10 @@ class BrokerStateManager: # trigger to sync up notification after (re-)connection LOG.debug("Trigger to re-sync up data: {0}".format(broker_name)) result = brokerstate.signal_data_syncup() - elif brokerstate.is_resource_subscribed_changed() or brokerstate.is_resources_changed(): - LOG.debug("Update watching due to resources changed: {0}".format(broker_name)) + elif brokerstate.is_resource_subscribed_changed() or \ + brokerstate.is_resources_changed(): + LOG.debug( + "Update watching due to resources changed: {0}".format(broker_name)) result = broker_connection_manager.update_watching_resources(brokerstate) # leave the signals as it is to re-sync up in next loop in case of failure diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py index af8beaf..722b463 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py @@ -26,8 +26,8 @@ from notificationclientsdk.client.notificationservice import NotificationHandler LOG = logging.getLogger(__name__) from notificationclientsdk.common.helpers import log_helper -log_helper.config_logger(LOG) +log_helper.config_logger(LOG) class NotificationHandler(NotificationHandlerBase): @@ -55,26 +55,25 @@ class NotificationHandler(NotificationHandlerBase): if not resource_type: raise Exception("abnormal notification@{0}".format(node_name)) if not resource_type in self.__supported_resource_types: - raise Exception("notification with unsupported resource type:{0}".format(resource_type)) + raise Exception( + "notification with unsupported resource type:{0}".format(resource_type)) this_delivery_time = notification_info['EventTimestamp'] else: - source = notification_info.get('source', None) - values = notification_info.get('data', {}).get('values', []) + parent_key = list(notification_info.keys())[0] + source = notification_info[parent_key].get('source', None) + values = notification_info[parent_key].get('data', {}).get('values', []) resource_address = values[0].get('ResourceAddress', None) + this_delivery_time = notification_info[parent_key].get('time') if not resource_address: raise Exception("No resource address in notification source".format(source)) - _,node_name,_ = subscription_helper.parse_resource_address(resource_address) - this_delivery_time = notification_info['time'] - # Change time from float to ascii format - notification_info['time'] = datetime.fromtimestamp(this_delivery_time).strftime('%Y-%m-%dT%H:%M:%S%fZ') - # notification_info['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', - # time.gmtime(this_delivery_time)) + _, node_name, _, _, _ = subscription_helper.parse_resource_address(resource_address) entries = subscription_repo.get(Status=1) for entry in entries: subscriptionid = entry.SubscriptionId if entry.ResourceAddress: - _,entry_node_name,_ = subscription_helper.parse_resource_address(entry.ResourceAddress) + _, entry_node_name, _, _, _ = subscription_helper.parse_resource_address( + entry.ResourceAddress) subscription_dto2 = SubscriptionInfoV2(entry) else: ResourceQualifierJson = entry.ResourceQualifierJson or '{}' @@ -87,10 +86,12 @@ class NotificationHandler(NotificationHandlerBase): continue try: - last_delivery_time = self.__get_latest_delivery_timestamp(node_name, subscriptionid) + last_delivery_time = self.__get_latest_delivery_timestamp(node_name, + subscriptionid) if last_delivery_time and last_delivery_time >= this_delivery_time: # skip this entry since already delivered - LOG.debug("Ignore the outdated notification for: {0}".format(entry.SubscriptionId)) + LOG.debug("Ignore the outdated notification for: {0}".format( + entry.SubscriptionId)) continue subscription_helper.notify(subscription_dto2, notification_info) @@ -117,7 +118,7 @@ class NotificationHandler(NotificationHandlerBase): del subscription_repo def __get_latest_delivery_timestamp(self, node_name, subscriptionid): - last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) + last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {}) last_delivery_time = last_delivery_stat.get('EventTimestamp', None) return last_delivery_time @@ -126,18 +127,18 @@ class NotificationHandler(NotificationHandlerBase): self.notification_stat[node_name] = { subscriptionid: { 'EventTimestamp': this_delivery_time - } } + } LOG.debug("delivery time @node: {0},subscription:{1} is added".format( node_name, subscriptionid)) elif not self.notification_stat[node_name].get(subscriptionid, None): self.notification_stat[node_name][subscriptionid] = { 'EventTimestamp': this_delivery_time - } + } LOG.debug("delivery time @node: {0},subscription:{1} is added".format( node_name, subscriptionid)) else: - last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) + last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {}) last_delivery_time = last_delivery_stat.get('EventTimestamp', None) if (last_delivery_time and last_delivery_time >= this_delivery_time): return diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py index 4206903..9fca840 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py @@ -10,6 +10,7 @@ import logging import multiprocessing as mp import threading import sys + if sys.version > '3': import queue as Queue else: @@ -18,6 +19,7 @@ else: from notificationclientsdk.common.helpers import subscription_helper from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper +from notificationclientsdk.common.helpers import log_helper from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 @@ -38,14 +40,13 @@ from notificationclientsdk.services.broker_connection_manager import BrokerConne from notificationclientsdk.services.notification_handler import NotificationHandler LOG = logging.getLogger(__name__) - -from notificationclientsdk.common.helpers import log_helper log_helper.config_logger(LOG) -class NotificationWorker: +class NotificationWorker: class LocationInfoHandler(LocationHandlerBase): '''Glue code to forward location info to daemon method''' + def __init__(self, locationinfo_dispatcher): self.locationinfo_dispatcher = locationinfo_dispatcher super(NotificationWorker.LocationInfoHandler, self).__init__() @@ -55,7 +56,7 @@ class NotificationWorker: return self.locationinfo_dispatcher.produce_location_event(location_info) def __init__( - self, event, subscription_event, daemon_context): + self, event, subscription_event, daemon_context): self.__alive = True @@ -254,7 +255,8 @@ class NotificationWorker: elif s.ResourceAddress: # Get nodename from resource address LOG.info("Parse resource address {}".format(s.ResourceAddress)) - _,nodename,_ = subscription_helper.parse_resource_address(s.ResourceAddress) + _, nodename, _, _, _ = subscription_helper.parse_resource_address( + s.ResourceAddress) broker_name = nodename else: LOG.debug("Subscription {} does not have ResourceType or " @@ -298,7 +300,7 @@ class NotificationWorker: try: LOG.debug("try to sync up data for {0} brokers".format( self.broker_state_manager.count_brokers())) - result,_,_ = self.broker_state_manager.syncup_broker_data( + result, _, _ = self.broker_state_manager.syncup_broker_data( self.broker_connection_manager) except Exception as ex: result = False diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py index 0d2fb1d..62a33c4 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py @@ -12,6 +12,8 @@ from datetime import datetime, timezone from notificationclientsdk.client.notificationservice import NotificationServiceClient from notificationclientsdk.common.helpers import subscription_helper +from notificationclientsdk.common.helpers import log_helper +from notificationclientsdk.common.helpers import constants from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper from notificationclientsdk.model.dto.resourcetype import ResourceType from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 @@ -21,14 +23,12 @@ from notificationclientsdk.repository.node_repo import NodeRepo from notificationclientsdk.repository.subscription_repo import SubscriptionRepo from notificationclientsdk.services.daemon import DaemonControl - from notificationclientsdk.exception import client_exception LOG = logging.getLogger(__name__) - -from notificationclientsdk.common.helpers import log_helper log_helper.config_logger(LOG) + class PtpService(object): def __init__(self, daemon_control): @@ -72,7 +72,7 @@ class PtpService(object): finally: del nodeinfo_repo - def query(self, broker_name, resource_address=None): + def query(self, broker_name, resource_address=None, optional=None): default_node_name = NodeInfoHelper.default_node_name(broker_name) broker_pod_ip, supported_resource_types = self.__get_node_info(default_node_name) @@ -85,9 +85,9 @@ class PtpService(object): ResourceType.TypePTP, default_node_name)) raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP) - return self._query(default_node_name, broker_pod_ip, resource_address) + return self._query(default_node_name, broker_pod_ip, resource_address, optional) - def _query(self, broker_name, broker_pod_ip, resource_address=None): + def _query(self, broker_name, broker_pod_ip, resource_address=None, optional=None): broker_host = "[{0}]".format(broker_pod_ip) broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format( self.daemon_control.daemon_context['NOTIFICATION_BROKER_USER'], @@ -99,7 +99,8 @@ class PtpService(object): notificationservice_client = NotificationServiceClient( broker_name, broker_transport_endpoint, broker_pod_ip) resource_status = notificationservice_client.query_resource_status( - ResourceType.TypePTP, timeout=5, retry=10, resource_address=resource_address) + ResourceType.TypePTP, timeout=5, retry=10, resource_address=resource_address, + optional=optional) return resource_status except oslo_messaging.exceptions.MessagingTimeout as ex: LOG.warning("ptp status is not available @node {0} due to {1}".format( @@ -117,7 +118,8 @@ class PtpService(object): subscription_orm = SubscriptionOrm(**subscription_dto.to_orm()) resource_address = None if hasattr(subscription_dto, 'ResourceAddress'): - _,nodename,_ = subscription_helper.parse_resource_address(subscription_dto.ResourceAddress) + _, nodename, _, _, _ = subscription_helper.parse_resource_address(subscription_dto. + ResourceAddress) broker_name = nodename resource_address = subscription_dto.ResourceAddress elif hasattr(subscription_dto, 'ResourceType'): @@ -139,17 +141,22 @@ class PtpService(object): if default_node_name: ptpstatus = None - ptpstatus = self._query(default_node_name, broker_pod_ip, resource_address) + ptpstatus = self._query(default_node_name, + broker_pod_ip, + resource_address, + optional=None) LOG.info("initial ptpstatus:{0}".format(ptpstatus)) # construct subscription entry - timestamp = ptpstatus.get('EventTimestamp', None) - if timestamp is None: - timestamp = ptpstatus.get('time', None) - # Change time from float to ascii format - ptpstatus['time'] = datetime.fromtimestamp(ptpstatus['time']).strftime('%Y-%m-%dT%H:%M:%S%fZ') - # ptpstatus['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', - # time.gmtime(timestamp)) + if constants.PTP_V1_KEY in ptpstatus: + timestamp = ptpstatus[constants.PTP_V1_KEY].get('EventTimestamp', None) + ptpstatus = ptpstatus[constants.PTP_V1_KEY] + else: + for item in ptpstatus: + timestamp = ptpstatus[item].get('time', None) + # Change time from float to ascii format + ptpstatus[item]['time'] = datetime.fromtimestamp(ptpstatus[item]['time']) \ + .strftime('%Y-%m-%dT%H:%M:%S%fZ') subscription_orm.InitialDeliveryTimestamp = timestamp entry = self.subscription_repo.add(subscription_orm) @@ -179,7 +186,7 @@ class PtpService(object): def remove_subscription(self, subscriptionid): try: # 1, delete entry - self.subscription_repo.delete_one(SubscriptionId = subscriptionid) + self.subscription_repo.delete_one(SubscriptionId=subscriptionid) self.subscription_repo.commit() # 2, refresh daemon self.daemon_control.refresh() diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py index 872b3ec..33ff8d5 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py @@ -37,7 +37,8 @@ class ResourceAddressController(object): def CurrentState(self): try: # validate resource address - _, nodename, resource = subscription_helper.parse_resource_address(self.resource_address) + _, nodename, resource, optional, self.resource_address = subscription_helper.\ + parse_resource_address(self.resource_address) if nodename != THIS_NODE_NAME and nodename != '.': LOG.warning("Node {} is not available".format(nodename)) abort(404) @@ -45,11 +46,13 @@ class ResourceAddressController(object): LOG.warning("Resource {} is not valid".format(resource)) abort(404) ptpservice = PtpService(notification_control) - ptpstatus = ptpservice.query(THIS_NODE_NAME, self.resource_address) + ptpstatus = ptpservice.query(THIS_NODE_NAME, self.resource_address, optional) # Change time from float to ascii format # ptpstatus['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', # time.gmtime(ptpstatus['time'])) - ptpstatus['time'] = datetime.fromtimestamp(ptpstatus['time']).strftime('%Y-%m-%dT%H:%M:%S%fZ') + for item in ptpstatus: + ptpstatus[item]['time'] = datetime.fromtimestamp(ptpstatus[item]['time']).\ + strftime('%Y-%m-%dT%H:%M:%S%fZ') return ptpstatus except client_exception.NodeNotAvailable as ex: LOG.warning("Node is not available:{0}".format(str(ex))) diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py index e06a5c1..b44da1f 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py @@ -14,22 +14,21 @@ import logging from wsme import types as wtypes from wsmeext.pecan import wsexpose -from notificationclientsdk.model.dto.resourcetype import ResourceType from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 from notificationclientsdk.repository.subscription_repo import SubscriptionRepo from notificationclientsdk.services.ptp import PtpService from notificationclientsdk.exception import client_exception +from notificationclientsdk.common.helpers import log_helper from sidecar.repository.notification_control import notification_control from sidecar.repository.dbcontext_default import defaults LOG = logging.getLogger(__name__) - -from notificationclientsdk.common.helpers import log_helper log_helper.config_logger(LOG) -THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0') +THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME", 'controller-0') + class SubscriptionsControllerV2(rest.RestController): @@ -47,7 +46,7 @@ class SubscriptionsControllerV2(rest.RestController): abort(400) subscription.UriLocation = "{0}://{1}:{2}/ocloudNotifications/v2/subscriptions".format( - conf.server.get('protocol','http'), + conf.server.get('protocol', 'http'), conf.server.get('host', '127.0.0.1'), conf.server.get('port', '8080') ) @@ -77,13 +76,13 @@ class SubscriptionsControllerV2(rest.RestController): LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) abort(500) except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + LOG.error("Exception:{0}@{1}".format(type(ex), str(ex))) abort(500) @expose('json') def get(self): try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False) entries = repo.get(Status=1) response.status = 200 subs = [] @@ -99,7 +98,7 @@ class SubscriptionsControllerV2(rest.RestController): LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) raise ex except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + LOG.error("Exception:{0}@{1}".format(type(ex), str(ex))) abort(500) @expose() @@ -115,6 +114,7 @@ class SubscriptionsControllerV2(rest.RestController): except: return False + class SubscriptionController(rest.RestController): def __init__(self, subscription_id): self.subscription_id = subscription_id @@ -122,7 +122,7 @@ class SubscriptionController(rest.RestController): @expose('json') def get(self): try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False) entry = repo.get_one(SubscriptionId=self.subscription_id, Status=1) if not entry: @@ -139,13 +139,13 @@ class SubscriptionController(rest.RestController): LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) raise ex except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + LOG.error("Exception:{0}@{1}".format(type(ex), str(ex))) abort(500) @wsexpose(status_code=204) def delete(self): try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False) entry = repo.get_one(SubscriptionId=self.subscription_id) if entry: if entry.SubscriptionId: @@ -164,5 +164,5 @@ class SubscriptionController(rest.RestController): LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) raise ex except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + LOG.error("Exception:{0}@{1}".format(type(ex), str(ex))) abort(500) diff --git a/notificationservice-base/centos/Dockerfile b/notificationservice-base/centos/Dockerfile index cf916a0..a1557e6 100644 --- a/notificationservice-base/centos/Dockerfile +++ b/notificationservice-base/centos/Dockerfile @@ -10,6 +10,7 @@ RUN set -ex ;\ $(grep '^name=' ${STX_REPO_FILE} | awk -F '=' '{printf "--enablerepo=" $2 " "}') \ -y \ gcc python3-devel python3-pip \ + && pip3 install --upgrade pip \ && pip3 install --user pecan \ && pip3 install pygtail \ && pip3 install oslo-config \ diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py index 46fe83f..35e5c7d 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py @@ -17,8 +17,10 @@ import logging LOG = logging.getLogger(__name__) from trackingfunctionsdk.common.helpers import log_helper + log_helper.config_logger(LOG) + class PtpEventProducer(object): class ListenerEndpoint(object): target = oslo_messaging.Target(namespace='notification', version='1.0') @@ -30,28 +32,28 @@ class PtpEventProducer(object): pass def QueryStatus(self, ctx, **rpc_kwargs): - LOG.debug("PtpEventProducer QueryStatus called %s" %rpc_kwargs) + LOG.debug("PtpEventProducer QueryStatus called %s" % rpc_kwargs) if self.handler: return self.handler.query_status(**rpc_kwargs) else: return None def TriggerDelivery(self, ctx, **rpc_kwargs): - LOG.debug("PtpEventProducer TriggerDelivery called %s" %rpc_kwargs) + LOG.debug("PtpEventProducer TriggerDelivery called %s" % rpc_kwargs) if self.handler: return self.handler.trigger_delivery(**rpc_kwargs) else: return None def __init__(self, node_name, local_broker_transport_endpoint, - registration_broker_transport_endpoint=None): + registration_broker_transport_endpoint=None): self.Id = id(self) self.node_name = node_name self.local_broker_client = BrokerClientBase( 'LocalPtpEventProducer', local_broker_transport_endpoint) if registration_broker_transport_endpoint: self.registration_broker_client = BrokerClientBase( - 'AllPtpEventProducer', registration_broker_transport_endpoint) + 'AllPtpEventProducer', registration_broker_transport_endpoint) else: self.registration_broker_client = None return @@ -67,14 +69,16 @@ class PtpEventProducer(object): def publish_status(self, ptpstatus, retry=3): result = False - result1 = self.publish_status_local(ptpstatus, retry) if self.local_broker_client else result - result2 = self.publish_status_all(ptpstatus, retry) if self.registration_broker_client else result + result1 = self.publish_status_local(ptpstatus, + retry) if self.local_broker_client else result + result2 = self.publish_status_all(ptpstatus, + retry) if self.registration_broker_client else result return result1, result2 def publish_status_local(self, ptpstatus, source, retry=3): if not self.local_broker_client: return False - topic='{0}-Event-{1}'.format(source, self.node_name) + topic = '{0}-Event-{1}'.format(source, self.node_name) server = None isretrystopped = False while not isretrystopped: @@ -97,7 +101,7 @@ class PtpEventProducer(object): def publish_status_all(self, ptpstatus, retry=3): if not self.registration_broker_client: return False - topic_all='PTP-Event-*' + topic_all = 'PTP-Event-*' server = None isretrystopped = False while not isretrystopped: @@ -114,13 +118,14 @@ class PtpEventProducer(object): if isretrystopped: LOG.error("Failed to publish ptp status:{0}@Topic:{1}".format( - ptpstatus, topic)) + ptpstatus, topic_all)) return isretrystopped == False def start_status_listener(self, handler=None): result = False result1 = self.start_status_listener_local(handler) if self.local_broker_client else result - result2 = self.start_status_listener_all(handler) if self.registration_broker_client else result + result2 = self.start_status_listener_all( + handler) if self.registration_broker_client else result result = result1 and result2 return result @@ -128,8 +133,8 @@ class PtpEventProducer(object): if not self.local_broker_client: return False - topic='PTP-Status' - server='PTP-Tracking-{0}'.format(self.node_name) + topic = 'PTP-Status' + server = 'PTP-Tracking-{0}'.format(self.node_name) endpoints = [PtpEventProducer.ListenerEndpoint(handler)] self.local_broker_client.add_listener( @@ -140,8 +145,8 @@ class PtpEventProducer(object): if not self.registration_broker_client: return False - topic='PTP-Status' - server='PTP-Tracking-{0}'.format(self.node_name) + topic = 'PTP-Status' + server = 'PTP-Tracking-{0}'.format(self.node_name) endpoints = [PtpEventProducer.ListenerEndpoint(handler)] self.registration_broker_client.add_listener( @@ -159,8 +164,8 @@ class PtpEventProducer(object): if not self.local_broker_client: return False - topic='PTP-Status' - server="PTP-Tracking-{0}".format(self.node_name) + topic = 'PTP-Status' + server = "PTP-Tracking-{0}".format(self.node_name) self.local_broker_client.remove_listener( topic, server) @@ -168,8 +173,8 @@ class PtpEventProducer(object): if not self.registration_broker_client: return False - topic='PTP-Status' - server="PTP-Tracking-{0}".format(self.node_name) + topic = 'PTP-Status' + server = "PTP-Tracking-{0}".format(self.node_name) self.registration_broker_client.remove_listener( topic, server) @@ -184,15 +189,15 @@ class PtpEventProducer(object): if not self.local_broker_client: return False - topic='PTP-Status' - server="PTP-Tracking-{0}".format(self.node_name) + topic = 'PTP-Status' + server = "PTP-Tracking-{0}".format(self.node_name) return self.local_broker_client.is_listening( topic, server) def is_listening_all(self): if not self.registration_broker_client: return False - topic='PTP-Status' - server="PTP-Tracking-{0}".format(self.node_name) + topic = 'PTP-Status' + server = "PTP-Tracking-{0}".format(self.node_name) return self.registration_broker_client.is_listening( topic, server) diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/cgu_handler.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/cgu_handler.py index 2ff9ee3..9104b2f 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/cgu_handler.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/cgu_handler.py @@ -80,7 +80,6 @@ class CguHandler: def cgu_output_to_dict(self): # Take raw cgu output and parse it into a dict cgu_output = self.cgu_output_raw.splitlines() - LOG.debug("CGU output: %s" % cgu_output) cgu_dict = {'input': {}, 'EEC DPLL': { 'Current reference': '', diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py index 68762de..d9c827d 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py @@ -45,6 +45,8 @@ CLOCK_REALTIME = "CLOCK_REALTIME" PHC2SYS_TOLERANCE_LOW = 36999999000 PHC2SYS_TOLERANCE_HIGH = 37000001000 +PTP_V1_KEY = "ptp_notification_v1" + # testing values CGU_PATH_VALID = "/sys/kernel/debug/ice/0000:18:00.0/cgu" diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py index f0c3898..6e8ac2e 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py @@ -5,6 +5,7 @@ # import logging import datetime +import re from abc import ABC, abstractmethod @@ -34,6 +35,13 @@ class GnssMonitor(Observer): def __init__(self, config_file, nmea_serialport=None, pci_addr=None, cgu_path=None): self.config_file = config_file + try: + pattern = '(?<=/ptp/ptpinstance/ts2phc-).*(?=.conf)' + match = re.search(pattern, self.config_file) + self.ts2phc_service_name = match.group() + except AttributeError: + LOG.warning("GnssMonitor: Unable to determine tsphc_service name from %s" + % self.config_file) # Setup GNSS data self.gnss_cgu_handler = CguHandler(config_file, nmea_serialport, pci_addr, cgu_path) diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/log_helper.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/log_helper.py index 9b3d87d..28e2d21 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/log_helper.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/log_helper.py @@ -15,6 +15,8 @@ def get_logger(module_name): def config_logger(logger): - logging.basicConfig(stream=sys.stdout) + logging.basicConfig(stream=sys.stdout, + format='%(asctime)s %(levelname)-8s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') logger.setLevel(level=os.environ.get("LOGGING_LEVEL", "INFO")) return logger diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py index 199be60..c4fddcf 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py @@ -50,14 +50,11 @@ class PtpMonitor: pmc_query_results = {} - def __init__(self, ptp4l_config, holdover_time, freq, init=True): + def __init__(self, ptp4l_instance, holdover_time, freq, init=True): if init: - self.ptp4l_config = ptp4l_config - pattern = '(?<=/ptp/ptpinstance/ptp4l-).*(?=.conf)' - match = re.search(pattern, self.ptp4l_config) - self.ptp4l_service_name = match.group() - LOG.debug(self.ptp4l_service_name) + self.ptp4l_config = "/ptp/ptpinstance/ptp4l-%s.conf" % ptp4l_instance + self.ptp4l_service_name = ptp4l_instance self.phc2sys_service_name = os.environ.get('PHC2SYS_SERVICE_NAME', 'phc2sys') self.holdover_time = int(holdover_time) self.freq = int(freq) diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py index 3183994..62101a2 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py @@ -8,7 +8,6 @@ import json import logging import multiprocessing as mp import os -import sys import threading import time from oslo_utils import uuidutils @@ -43,7 +42,6 @@ source_type = { '/sync/synce-status/clock-quality': 'event.sync.synce-status.synce-clock-quality-change', '/sync/synce-status/lock-state-extended': 'event.sync.synce-status.synce-state-change-extended', '/sync/synce-status/lock-state': 'event.sync.synce-status.synce-state-change', - '/sync/synce-status/lock-state': 'event.sync.synce-status.synce-state-change', } '''Entry point of Default Process Worker''' @@ -77,12 +75,13 @@ class PtpWatcherDefault: } class PtpRequestHandlerDefault(object): - def __init__(self, watcher): + def __init__(self, watcher, daemon_context): self.watcher = watcher self.init_time = time.time() + self.daemon_context = daemon_context def _build_event_response(self, resource_path, last_event_time, resource_address, - sync_state): + sync_state, value_type=constants.VALUE_TYPE_ENUMERATION): if resource_path in [constants.SOURCE_SYNC_PTP_CLOCK_CLASS, constants.SOURCE_SYNCE_CLOCK_QUALITY]: data_type = constants.DATA_TYPE_METRIC @@ -100,7 +99,7 @@ class PtpWatcherDefault: { 'data_type': data_type, 'ResourceAddress': resource_address, - 'value_type': constants.VALUE_TYPE_ENUMERATION, + 'value_type': value_type, 'value': sync_state } ] @@ -111,28 +110,90 @@ class PtpWatcherDefault: def query_status(self, **rpc_kwargs): lastStatus = {} resource_address = rpc_kwargs.get('ResourceAddress', None) + optional = rpc_kwargs.get('optional', None) if resource_address: _, nodename, resource_path = utils.parse_resource_address(resource_address) if resource_path == constants.SOURCE_SYNC_ALL: resource_path = constants.SOURCE_SYNC_SYNC_STATE if resource_path == constants.SOURCE_SYNC_GNSS_SYNC_STATUS: self.watcher.gnsstracker_context_lock.acquire() - sync_state = self.watcher.gnsstracker_context.get('sync_state', - GnssState.Freerun) - last_event_time = self.watcher.gnsstracker_context.get('last_event_time', - time.time()) + if optional: + sync_state = self.watcher.gnsstracker_context[optional]. \ + get('sync_state', GnssState.Freerun) + last_event_time = self.watcher.gnsstracker_context[optional].get( + 'last_event_time', + time.time()) + lastStatus[optional] = self._build_event_response(resource_path, + last_event_time, + resource_address, + sync_state) + else: + for config in self.daemon_context['GNSS_INSTANCES']: + sync_state = self.watcher.gnsstracker_context[config] \ + .get('sync_state', GnssState.Freerun) + last_event_time = self.watcher.gnsstracker_context[config].get( + 'last_event_time', + time.time()) + lastStatus[config] = self._build_event_response(resource_path, + last_event_time, + resource_address, + sync_state) self.watcher.gnsstracker_context_lock.release() - lastStatus = self._build_event_response(resource_path, last_event_time, - resource_address, sync_state) - # elif resource_path == constants.SOURCE_SYNC_PTP_CLOCK_CLASS: + elif resource_path == constants.SOURCE_SYNC_PTP_CLOCK_CLASS: + self.watcher.ptptracker_context_lock.acquire() + if optional: + clock_class = self.watcher.ptptracker_context[optional].get('clock_class', + '248') + last_clock_class_event_time = self.watcher.ptptracker_context[optional].get( + 'last_clock_class_event_time', + time.time()) + lastStatus[optional] = \ + self._build_event_response(resource_path, + last_clock_class_event_time, + resource_address, + clock_class, + constants.VALUE_TYPE_METRIC) + else: + for config in self.daemon_context['PTP4L_INSTANCES']: + clock_class = self.watcher.ptptracker_context[config].get('clock_class', + '248') + last_clock_class_event_time = \ + self.watcher.ptptracker_context[config].get( + 'last_clock_class_event_time', + time.time()) + lastStatus[config] = \ + self._build_event_response(resource_path, + last_clock_class_event_time, + resource_address, + clock_class, + constants.VALUE_TYPE_METRIC) + self.watcher.ptptracker_context_lock.release() elif resource_path == constants.SOURCE_SYNC_PTP_LOCK_STATE: self.watcher.ptptracker_context_lock.acquire() - sync_state = self.watcher.ptptracker_context.get('sync_state', PtpState.Freerun) - last_event_time = self.watcher.ptptracker_context.get('last_event_time', - time.time()) + if optional: + sync_state = self.watcher.ptptracker_context[optional].get('sync_state', + PtpState.Freerun) + last_event_time = self.watcher.ptptracker_context[optional].get( + 'last_event_time', + time.time()) + lastStatus[optional] = self._build_event_response(resource_path, + last_event_time, + resource_address, + sync_state) + else: + for config in self.daemon_context['PTP4L_INSTANCES']: + sync_state = \ + self.watcher.ptptracker_context[config].get('sync_state', + PtpState.Freerun) + last_event_time = self.watcher.ptptracker_context[config].get( + 'last_event_time', + time.time()) + lastStatus[config] = self._build_event_response(resource_path, + last_event_time, + resource_address, + sync_state) self.watcher.ptptracker_context_lock.release() - lastStatus = self._build_event_response(resource_path, last_event_time, - resource_address, sync_state) + elif resource_path == constants.SOURCE_SYNC_OS_CLOCK: self.watcher.osclocktracker_context_lock.acquire() sync_state = self.watcher.osclocktracker_context.get('sync_state', @@ -140,8 +201,10 @@ class PtpWatcherDefault: last_event_time = self.watcher.osclocktracker_context.get('last_event_time', time.time()) self.watcher.osclocktracker_context_lock.release() - lastStatus = self._build_event_response(resource_path, last_event_time, - resource_address, sync_state) + lastStatus['os_clock_status'] = self._build_event_response(resource_path, + last_event_time, + resource_address, + sync_state) elif resource_path == constants.SOURCE_SYNC_SYNC_STATE: self.watcher.overalltracker_context_lock.acquire() sync_state = self.watcher.overalltracker_context.get('sync_state', @@ -149,16 +212,24 @@ class PtpWatcherDefault: last_event_time = self.watcher.overalltracker_context.get('last_event_time', time.time()) self.watcher.overalltracker_context_lock.release() - lastStatus = self._build_event_response(resource_path, last_event_time, - resource_address, sync_state) + lastStatus['overall_sync_status'] = self._build_event_response(resource_path, + last_event_time, + resource_address, + sync_state) LOG.debug("query_status: {}".format(lastStatus)) else: + # Request is for PTP v1 notification + # PTP v1 only supports single instance ptp + instance = self.daemon_context['PTP4L_INSTANCES'][0] + if len(self.daemon_context['PTP4L_INSTANCES']) > 1: + LOG.warning( + "Multiple ptp4l instances configured, retrieving status for %s" % instance) self.watcher.ptptracker_context_lock.acquire() - sync_state = self.watcher.ptptracker_context.get('sync_state', PtpState.Freerun) - last_event_time = self.watcher.ptptracker_context.get('last_event_time', - time.time()) - self.watcher.ptptracker_context_lock.release() - lastStatus = { + sync_state = self.watcher.ptptracker_context[instance].get('sync_state', + PtpState.Freerun) + last_event_time = self.watcher.ptptracker_context[instance].get('last_event_time', + time.time()) + lastStatus[constants.PTP_V1_KEY] = { 'ResourceType': ResourceType.TypePTP, 'EventData': { 'State': sync_state @@ -168,6 +239,9 @@ class PtpWatcherDefault: }, 'EventTimestamp': last_event_time } + self.watcher.ptptracker_context_lock.release() + LOG.warning("query_status PTP v1: {}".format(lastStatus)) + return lastStatus def trigger_delivery(self, **rpc_kwargs): @@ -184,7 +258,7 @@ class PtpWatcherDefault: # PTP Context self.ptptracker_context = {} - for config in self.daemon_context['PTP4L_CONFIGS']: + for config in self.daemon_context['PTP4L_INSTANCES']: self.ptptracker_context[config] = self.daemon_context.get( 'ptptracker_context', PtpWatcherDefault.DEFAULT_PTPTRACKER_CONTEXT) self.ptptracker_context[config]['sync_state'] = PtpState.Freerun @@ -197,7 +271,7 @@ class PtpWatcherDefault: # GNSS Context self.gnsstracker_context = {} - for config in self.daemon_context['GNSS_CONFIGS']: + for config in self.daemon_context['GNSS_INSTANCES']: self.gnsstracker_context[config] = self.daemon_context.get( 'gnsstracker_context', PtpWatcherDefault.DEFAULT_GNSSTRACKER_CONTEXT) self.gnsstracker_context[config]['sync_state'] = GnssState.Freerun @@ -238,7 +312,8 @@ class PtpWatcherDefault: self.broker_endpoint.TransportEndpoint, self.registration_broker_endpoint.TransportEndpoint) - self.__ptprequest_handler = PtpWatcherDefault.PtpRequestHandlerDefault(self) + self.__ptprequest_handler = PtpWatcherDefault.PtpRequestHandlerDefault(self, + self.daemon_context) # Set forced_publishing to True so that initial states are published # Main loop in run() sets it to false after the first iteration @@ -258,7 +333,7 @@ class PtpWatcherDefault: self.ptp_monitor_list = [ PtpMonitor(config, self.ptptracker_context[config]['holdover_seconds'], self.ptptracker_context[config]['poll_freq_seconds']) for config in - self.daemon_context['PTP4L_CONFIGS']] + self.daemon_context['PTP4L_INSTANCES']] def signal_ptp_event(self): if self.event: @@ -340,7 +415,6 @@ class PtpWatcherDefault: gnss_state = GnssState.Locked os_clock_state = self.os_clock_monitor.get_os_clock_state() - ptp_state = self.ptptracker_context.get('sync_state') if gnss_state is GnssState.Freerun or os_clock_state is OsClockState.Freerun or ptp_state \ @@ -393,11 +467,11 @@ class PtpWatcherDefault: freq = float(self.osclocktracker_context['poll_freq_seconds']) sync_state = self.osclocktracker_context.get('sync_state', 'Unknown') last_event_time = self.osclocktracker_context.get('last_event_time', time.time()) + lastStatus = {} new_event, sync_state, new_event_time = self.__get_os_clock_status( holdover_time, freq, sync_state, last_event_time) - LOG.debug("Got os clock status.") - + LOG.info("os_clock_status: state is %s, new_event is %s " % (sync_state, new_event)) if new_event or forced: self.osclocktracker_context_lock.acquire() self.osclocktracker_context['sync_state'] = sync_state @@ -405,20 +479,10 @@ class PtpWatcherDefault: self.osclocktracker_context_lock.release() LOG.debug("Publish OS Clock Status") - lastStatus = { - 'ResourceType': 'OS Clock', - 'EventData': { - 'State': sync_state - }, - 'ResourceQualifier': { - 'NodeName': self.node_name - }, - 'EventTimestamp': new_event_time - } # publish new event in API version v2 format resource_address = utils.format_resource_address( self.node_name, constants.SOURCE_SYNC_OS_CLOCK) - lastStatus = { + lastStatus['os_clock_status'] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, 'source': constants.SOURCE_SYNC_OS_CLOCK, @@ -441,6 +505,7 @@ class PtpWatcherDefault: return def __publish_overall_sync_status(self, forced=False): + lastStatus = {} holdover_time = float(self.overalltracker_context['holdover_seconds']) freq = float(self.overalltracker_context['poll_freq_seconds']) sync_state = self.overalltracker_context.get('sync_state', 'Unknown') @@ -448,6 +513,7 @@ class PtpWatcherDefault: new_event, sync_state, new_event_time = self.__get_overall_sync_state( holdover_time, freq, sync_state, last_event_time) + LOG.info("overall_sync_state: state is %s, new_event is %s " % (sync_state, new_event)) if new_event or forced: # Update context @@ -459,7 +525,7 @@ class PtpWatcherDefault: LOG.debug("Publish overall sync status.") resource_address = utils.format_resource_address( self.node_name, constants.SOURCE_SYNC_SYNC_STATE) - lastStatus = { + lastStatus['overall_sync_status'] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, 'source': constants.SOURCE_SYNC_SYNC_STATE, @@ -477,28 +543,32 @@ class PtpWatcherDefault: ] } } - self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_SYNC_STATE) self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_ALL) def __publish_gnss_status(self, forced=False): - + lastStatus = {} for gnss in self.observer_list: - holdover_time = float(self.gnsstracker_context[gnss.config_file]['holdover_seconds']) - freq = float(self.gnsstracker_context[gnss.config_file]['poll_freq_seconds']) - sync_state = self.gnsstracker_context[gnss.config_file].get('sync_state', 'Unknown') - last_event_time = self.gnsstracker_context[gnss.config_file].get('last_event_time', - time.time()) + holdover_time = float( + self.gnsstracker_context[gnss.ts2phc_service_name]['holdover_seconds']) + freq = float(self.gnsstracker_context[gnss.ts2phc_service_name]['poll_freq_seconds']) + sync_state = self.gnsstracker_context[gnss.ts2phc_service_name].get('sync_state', + 'Unknown') + last_event_time = self.gnsstracker_context[gnss.ts2phc_service_name].get( + 'last_event_time', + time.time()) new_event, sync_state, new_event_time = self.__get_gnss_status( holdover_time, freq, sync_state, last_event_time, gnss) - LOG.debug("GNSS sync_state %s" % sync_state) + LOG.info("%s gnss_status: state is %s, new_event is %s" % ( + gnss.ts2phc_service_name, sync_state, new_event)) if new_event or forced: # update context self.gnsstracker_context_lock.acquire() - self.gnsstracker_context[gnss.config_file]['sync_state'] = sync_state - self.gnsstracker_context[gnss.config_file]['last_event_time'] = new_event_time + self.gnsstracker_context[gnss.ts2phc_service_name]['sync_state'] = sync_state + self.gnsstracker_context[gnss.ts2phc_service_name][ + 'last_event_time'] = new_event_time self.gnsstracker_context_lock.release() LOG.debug("Publish GNSS status.") @@ -506,7 +576,7 @@ class PtpWatcherDefault: # publish new event in API version v2 format resource_address = utils.format_resource_address( self.node_name, constants.SOURCE_SYNC_GNSS_SYNC_STATUS) - lastStatus = { + lastStatus[gnss.ts2phc_service_name] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, 'source': constants.SOURCE_SYNC_GNSS_SYNC_STATUS, @@ -530,29 +600,34 @@ class PtpWatcherDefault: return def __publish_ptpstatus(self, forced=False): - + lastStatus = {} + lastClockClassStatus = {} for ptp_monitor in self.ptp_monitor_list: holdover_time = \ - float(self.ptptracker_context[ptp_monitor.ptp4l_config]['holdover_seconds']) - freq = float(self.ptptracker_context[ptp_monitor.ptp4l_config]['poll_freq_seconds']) - sync_state = self.ptptracker_context[ptp_monitor.ptp4l_config]. \ + float(self.ptptracker_context[ptp_monitor.ptp4l_service_name]['holdover_seconds']) + freq = float( + self.ptptracker_context[ptp_monitor.ptp4l_service_name]['poll_freq_seconds']) + sync_state = self.ptptracker_context[ptp_monitor.ptp4l_service_name]. \ get('sync_state', 'Unknown') - last_event_time = self.ptptracker_context[ptp_monitor.ptp4l_config] \ + last_event_time = self.ptptracker_context[ptp_monitor.ptp4l_service_name] \ .get('last_event_time', time.time()) new_event, sync_state, new_event_time = self.__get_ptp_status( holdover_time, freq, sync_state, last_event_time, ptp_monitor) + LOG.info("%s PTP sync state: state is %s, new_event is %s" % ( + ptp_monitor.ptp4l_service_name, sync_state, new_event)) new_clock_class_event, clock_class, clock_class_event_time = \ ptp_monitor.get_ptp_clock_class() - + LOG.info("%s PTP clock class: clockClass is %s, new_event is %s" % ( + ptp_monitor.ptp4l_service_name, clock_class, new_clock_class_event)) if new_event or forced: # update context self.ptptracker_context_lock.acquire() - self.ptptracker_context[ptp_monitor.ptp4l_config]['sync_state'] = sync_state - self.ptptracker_context[ptp_monitor.ptp4l_config][ + self.ptptracker_context[ptp_monitor.ptp4l_service_name]['sync_state'] = sync_state + self.ptptracker_context[ptp_monitor.ptp4l_service_name][ 'last_event_time'] = new_event_time - self.ptptracker_context_lock.release() + # publish new event LOG.debug("Publish ptp status to clients") @@ -571,7 +646,7 @@ class PtpWatcherDefault: # publish new event in API version v2 format resource_address = utils.format_resource_address( self.node_name, constants.SOURCE_SYNC_PTP_LOCK_STATE) - lastStatus = { + lastStatus[ptp_monitor.ptp4l_service_name] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, 'source': constants.SOURCE_SYNC_PTP_LOCK_STATE, @@ -589,6 +664,7 @@ class PtpWatcherDefault: ] } } + self.ptptracker_context_lock.release() self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_PTP_LOCK_STATE) self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_ALL) @@ -596,15 +672,16 @@ class PtpWatcherDefault: if new_clock_class_event or forced: # update context self.ptptracker_context_lock.acquire() - self.ptptracker_context[ptp_monitor.ptp4l_config]['clock_class'] = clock_class - self.ptptracker_context[ptp_monitor.ptp4l_config]['last_clock_class_event_time'] \ + self.ptptracker_context[ptp_monitor.ptp4l_service_name]['clock_class'] = clock_class + self.ptptracker_context[ptp_monitor.ptp4l_service_name][ + 'last_clock_class_event_time'] \ = clock_class_event_time - self.ptptracker_context_lock.release() + resource_address = utils.format_resource_address( self.node_name, constants.SOURCE_SYNC_PTP_CLOCK_CLASS) - lastClockClassStatus = { + lastClockClassStatus[ptp_monitor.ptp4l_service_name] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, 'source': constants.SOURCE_SYNC_PTP_CLOCK_CLASS, @@ -622,6 +699,7 @@ class PtpWatcherDefault: ] } } + self.ptptracker_context_lock.release() LOG.info("Publishing clockClass for %s: %s" % (ptp_monitor.ptp4l_service_name, clock_class)) self.ptpeventproducer.publish_status(lastClockClassStatus, diff --git a/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/ptptracking_start.sh b/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/ptptracking_start.py similarity index 79% rename from stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/ptptracking_start.sh rename to stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/ptptracking_start.py index 31f957a..5d21185 100644 --- a/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/ptptracking_start.sh +++ b/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/ptptracking_start.py @@ -3,28 +3,15 @@ # # SPDX-License-Identifier: Apache-2.0 # -#!/bin/bash - -# apt-get update -y -# #sleep infinity - -# apt-get install -y gcc -# apt-get install -y python-dev -# apt-get install -y python3-pip - -# export https_proxy=http://128.224.230.5:9090 - -# pip3 install oslo-config -# pip3 install oslo-messaging - -cat </root/ptptracking-daemon.py #!/usr/bin/python3 # -*- coding: UTF-8 -*- import logging +import re import os import json from trackingfunctionsdk.common.helpers import log_helper +from trackingfunctionsdk.common.helpers import constants from trackingfunctionsdk.services.daemon import DaemonControl LOG = logging.getLogger(__name__) @@ -67,10 +54,21 @@ OS_CLOCK_POLL_FREQ_SECONDS = os.environ.get("OS_CLOCK_POLL_FREQ_SECONDS", 2) OVERALL_HOLDOVER_SECONDS = os.environ.get("OVERALL_HOLDOVER_SECONDS", 30) OVERALL_POLL_FREQ_SECONDS = os.environ.get("OVERALL_POLL_FREQ_SECONDS", 2) -GNSS_CONFIGS = json.loads(os.environ.get("TS2PHC_CONFIGS", '["/ptp/ptpinstance/ts2phc-tc1.conf"]')) -PHC2SYS_CONFIG = os.environ.get("PHC2SYS_CONFIG", "/ptp/ptpinstance/phc2sys-phc-inst1.conf") -PTP4L_CONFIGS = json.loads(os.environ.get("PTP4L_CONFIGS", '["/ptp/ptpinstance/ptp4l-ptp-legacy.conf"]')) +PHC2SYS_CONFIG = "/ptp/ptpinstance/phc2sys-%s.conf" % os.environ.get("PHC2SYS_SERVICE_NAME", "phc2sys-legacy") +PTP4L_INSTANCES = os.environ.get("PTP4L_SERVICE_NAME", "ptp4l-legacy") +PTP4L_INSTANCES = str(PTP4L_INSTANCES).replace('[','').replace(']','') +PTP4L_INSTANCES = PTP4L_INSTANCES.split() +PTP4L_CONFIGS = [] +for item in PTP4L_INSTANCES: + PTP4L_CONFIGS.append("/ptp/ptpinstance/ptp4l-%s.conf" % item) + +GNSS_INSTANCES = os.environ.get("TS2PHC_SERVICE_NAME", None) +GNSS_INSTANCES = str(GNSS_INSTANCES).replace('[','').replace(']','') +GNSS_INSTANCES = GNSS_INSTANCES.split() +GNSS_CONFIGS = [] +for item in GNSS_INSTANCES: + GNSS_CONFIGS.append("/ptp/ptpinstance/ts2phc-%s.conf" % item) context = { 'THIS_NAMESPACE': THIS_NAMESPACE, @@ -81,6 +79,8 @@ context = { 'GNSS_CONFIGS': GNSS_CONFIGS, 'PHC2SYS_CONFIG': PHC2SYS_CONFIG, 'PTP4L_CONFIGS' : PTP4L_CONFIGS, + 'GNSS_INSTANCES': GNSS_INSTANCES, + 'PTP4L_INSTANCES': PTP4L_INSTANCES, 'ptptracker_context': { 'device_simulated': PTP_DEVICE_SIMULATED, @@ -108,7 +108,7 @@ sqlalchemy_conf = { 'pool_recycle': 3600, 'encoding': 'utf-8' } - +LOG.info("PTP tracking service startup context %s" % context) sqlalchemy_conf_json = json.dumps(sqlalchemy_conf) default_daemoncontrol = DaemonControl(sqlalchemy_conf_json, json.dumps(context)) @@ -116,12 +116,5 @@ default_daemoncontrol.refresh() while True: pass -EOF - -echo "done" - -PYTHONPATH=/opt/ptptrackingfunction python3 /root/ptptracking-daemon.py & - -sleep infinity diff --git a/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/templates/daemonset.yaml b/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/templates/daemonset.yaml index c8337fe..0a2e185 100644 --- a/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/templates/daemonset.yaml +++ b/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/templates/daemonset.yaml @@ -103,6 +103,8 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP + - name: PYTHONPATH + value: /opt/ptptrackingfunction - name: THIS_NAMESPACE value: {{ .Values.global.namespace }} - name: PTP_DEVICE_SIMULATED @@ -145,11 +147,9 @@ spec: value: "/ptp/ptpinstance/phc2sys-{{.Values.ptptracking.phc2sysServiceName}}.conf" - name: TS2PHC_SERVICE_NAME value: "{{ .Values.ptptracking.ts2phcServiceName }}" - - name: TS2PHC_CONFIGS - value: '["/ptp/ptpinstance/ts2phc-{{.Values.ptptracking.ts2phcServiceName}}.conf"]' - name: LOGGING_LEVEL value: "{{ .Values.ptptracking.log_level }}" - command: ["/bin/bash", "/mnt/ptptracking_start.sh"] + command: ["python3", "/mnt/ptptracking_start.py"] securityContext: privileged: true capabilities: diff --git a/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/values.yaml b/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/values.yaml index 3870edf..60de44c 100644 --- a/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/values.yaml +++ b/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/values.yaml @@ -67,7 +67,9 @@ location: ptptracking: imagePullSecrets: default-registry-key ptp4lSocket: /var/run/ptp4l-ptp4l-legacy - ptp4lServiceName: ptp4l-legacy + ptp4lServiceName: + - ptp1 + - ptp2 phc2sysServiceName: phc2sys-legacy ts2phcServiceName: ts2phc-legacy log_level: INFO