diff --git a/notificationclient-base/centos/Dockerfile b/notificationclient-base/centos/Dockerfile index 00d36ae..e898387 100644 --- a/notificationclient-base/centos/Dockerfile +++ b/notificationclient-base/centos/Dockerfile @@ -10,6 +10,7 @@ RUN set -ex ;\ $(grep '^name=' ${STX_REPO_FILE} | awk -F '=' '{printf "--enablerepo=" $2 " "}') \ -y \ gcc python3-devel python3-pip \ + && pip3 install --upgrade pip \ && pip3 install --user pecan \ && pip3 install oslo-config \ && pip3 install oslo-messaging \ diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py index f2d205d..dd3245a 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/client/notificationservice.py @@ -58,12 +58,19 @@ class NotificationServiceClient(BrokerClientBase): return def query_resource_status(self, resource_type, - timeout=None, retry=None, resource_qualifier_json=None, resource_address=None): + timeout=None, + retry=None, + resource_qualifier_json=None, + resource_address=None, + optional=None): topic = '{0}-Status'.format(resource_type) server = '{0}-Tracking-{1}'.format(resource_type, self.target_node_name) return self.call( - topic, server, 'QueryStatus', timeout=timeout, retry=retry, - QualifierJson=resource_qualifier_json, ResourceAddress=resource_address) + topic, server, 'QueryStatus', + timeout=timeout, retry=retry, + QualifierJson=resource_qualifier_json, + ResourceAddress=resource_address, + optional=optional) def add_resource_status_listener(self, resource_type, status_handler=None): if not status_handler: diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py index f0d24b5..ff36991 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/constants.py @@ -9,6 +9,8 @@ DATA_TYPE_METRIC = "metric" VALUE_TYPE_ENUMERATION = "enumeration" VALUE_TYPE_METRIC = "metric" +PTP_V1_KEY = "ptp_notification_v1" + SOURCE_SYNC_ALL = '/sync' SOURCE_SYNC_GNSS_SYNC_STATUS = '/sync/gnss-status/gnss-sync-status' SOURCE_SYNC_PTP_CLOCK_CLASS = '/sync/ptp-status/clock-class' @@ -19,6 +21,18 @@ SOURCE_SYNCE_CLOCK_QUALITY = '/sync/synce-status/clock-quality' SOURCE_SYNCE_LOCK_STATE_EXTENDED = '/sync/synce-status/lock-state-extended' SOURCE_SYNCE_LOCK_STATE = '/sync/synce-status/lock-state' +RESOURCE_ADDRESS_MAPPINGS = { + SOURCE_SYNC_ALL: 'sync', + SOURCE_SYNC_GNSS_SYNC_STATUS: 'gnss_sync_state', + SOURCE_SYNC_PTP_CLOCK_CLASS: 'ptp_clock_class', + SOURCE_SYNC_PTP_LOCK_STATE: 'ptp_lock_state', + SOURCE_SYNC_OS_CLOCK: 'os_clock_sync_state', + SOURCE_SYNC_SYNC_STATE: 'sync_state', + SOURCE_SYNCE_CLOCK_QUALITY: 'synce_clock_quality', + SOURCE_SYNCE_LOCK_STATE_EXTENDED: 'synce_lock_state_extended', + SOURCE_SYNCE_LOCK_STATE: 'synce_lock_state' +} + VALID_SOURCE_URI = { SOURCE_SYNC_ALL, SOURCE_SYNC_GNSS_SYNC_STATUS, diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py index f801c02..28e2d21 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/log_helper.py @@ -1,21 +1,22 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # -import os import logging +import sys +import os -LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "INFO") def get_logger(module_name): logger = logging.getLogger(module_name) return config_logger(logger) + def config_logger(logger): - ''' - configure the logger: uncomment following lines for debugging - ''' - logger.setLevel(LOGGING_LEVEL) + logging.basicConfig(stream=sys.stdout, + format='%(asctime)s %(levelname)-8s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + logger.setLevel(level=os.environ.get("LOGGING_LEVEL", "INFO")) return logger diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py index 020c0af..25e1621 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py @@ -4,27 +4,27 @@ # SPDX-License-Identifier: Apache-2.0 # -import os import json import re import requests import logging - -from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper +from datetime import datetime +from notificationclientsdk.common.helpers import constants +from notificationclientsdk.common.helpers import log_helper LOG = logging.getLogger(__name__) - -from notificationclientsdk.common.helpers import log_helper log_helper.config_logger(LOG) + def notify(subscriptioninfo, notification, timeout=2, retry=3): result = False while True: retry = retry - 1 try: headers = {'Content-Type': 'application/json'} - data = json.dumps(notification) + data = format_notification_data(subscriptioninfo, notification) + data = json.dumps(data) url = subscriptioninfo.EndpointUri response = requests.post(url, data=data, headers=headers, timeout=timeout) response.raise_for_status() @@ -52,13 +52,58 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3): return result + +def format_notification_data(subscriptioninfo, notification): + if hasattr(subscriptioninfo, 'ResourceType'): + LOG.debug("format_notification_data: Found v1 subscription, no formatting required.") + return notification + elif hasattr(subscriptioninfo, 'ResourceAddress'): + _, _, resource_path, _, _ = parse_resource_address(subscriptioninfo.ResourceAddress) + resource_mapped_value = constants.RESOURCE_ADDRESS_MAPPINGS[resource_path] + formatted_notification = {resource_mapped_value: []} + for instance in notification: + # Add the instance identifier to ResourceAddress for PTP lock-state + # and PTP clockClass + if notification[instance]['source'] in [constants.SOURCE_SYNC_PTP_CLOCK_CLASS, + constants.SOURCE_SYNC_PTP_LOCK_STATE]: + temp_values = notification[instance].get('data', {}).get('values', []) + resource_address = temp_values[0].get('ResourceAddress', None) + if instance not in resource_address: + add_instance_name = resource_address.split('/', 3) + add_instance_name.insert(3, instance) + resource_address = '/'.join(add_instance_name) + notification[instance]['data']['values'][0]['ResourceAddress'] = resource_address + formatted_notification[resource_mapped_value].append(notification[instance]) + for instance in formatted_notification[resource_mapped_value]: + this_delivery_time = instance['time'] + if type(this_delivery_time) != str: + format_time = datetime.fromtimestamp(float(this_delivery_time)).\ + strftime('%Y-%m-%dT%H:%M:%S%fZ') + instance['time'] = format_time + else: + raise Exception("format_notification_data: No valid source address found in notification") + LOG.debug( + "format_notification_data: Added parent key for client consumption: %s" % + formatted_notification) + return formatted_notification + + def parse_resource_address(resource_address): # The format of resource address is: # /{clusterName}/{siteName}(/optional/hierarchy/..)/{nodeName}/{resource} - # Assume no optional hierarchy for now clusterName = resource_address.split('/')[1] nodeName = resource_address.split('/')[2] resource_path = '/' + re.split('[/]', resource_address, 3)[3] + resource_list = re.findall(r'[^/]+', resource_path) + if len(resource_list) == 4: + remove_optional = '/' + resource_list[0] + resource_path = resource_path.replace(remove_optional, '') + resource_address = resource_address.replace(remove_optional, '') + optional = resource_list[0] + LOG.debug("Optional hierarchy found when parsing resource address: %s" % optional) + else: + optional = None - return clusterName, nodeName, resource_path - + # resource_address is the full address without any optional hierarchy + # resource_path is the specific identifier for the resource + return clusterName, nodeName, resource_path, optional, resource_address diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py index 0107ab7..09d3f5f 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/broker_state_manager.py @@ -17,13 +17,16 @@ from notificationclientsdk.model.dto.broker_state import BrokerState LOG = logging.getLogger(__name__) from notificationclientsdk.common.helpers import log_helper + log_helper.config_logger(LOG) + class BrokerStateManager: ''' Manager to manage broker states Note: Now it is not thread safe ''' + def __init__(self): self.broker_state_map = {} self.disabled_brokers = [] @@ -84,7 +87,7 @@ class BrokerStateManager: for broker_name, brokerstate in self.broker_state_map.items(): try: if brokerstate.any_obsolete_subscription( - self.subscription_refresh_iteration): + self.subscription_refresh_iteration): return True except Exception as ex: LOG.warning( @@ -104,14 +107,17 @@ class BrokerStateManager: broker_name = subscription.ResourceQualifier.NodeName else: # ignore the subscription due to unsupported type - LOG.debug("Ignore the subscription for: {0}".format(subscription_orm.SubscriptionId)) + LOG.debug( + "Ignore the subscription for: {0}".format(subscription_orm.SubscriptionId)) return False else: subscription = SubscriptionInfoV2(subscription_orm) - _, nodename, resource = subscription_helper.parse_resource_address(subscription.ResourceAddress) + _, nodename, resource, _, _ = subscription_helper.parse_resource_address( + subscription.ResourceAddress) broker_name = nodename - LOG.debug("subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status)) + LOG.debug( + "subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status)) if subscription_orm.Status != 1: return False @@ -200,8 +206,10 @@ class BrokerStateManager: # trigger to sync up notification after (re-)connection LOG.debug("Trigger to re-sync up data: {0}".format(broker_name)) result = brokerstate.signal_data_syncup() - elif brokerstate.is_resource_subscribed_changed() or brokerstate.is_resources_changed(): - LOG.debug("Update watching due to resources changed: {0}".format(broker_name)) + elif brokerstate.is_resource_subscribed_changed() or \ + brokerstate.is_resources_changed(): + LOG.debug( + "Update watching due to resources changed: {0}".format(broker_name)) result = broker_connection_manager.update_watching_resources(brokerstate) # leave the signals as it is to re-sync up in next loop in case of failure diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py index af8beaf..722b463 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py @@ -26,8 +26,8 @@ from notificationclientsdk.client.notificationservice import NotificationHandler LOG = logging.getLogger(__name__) from notificationclientsdk.common.helpers import log_helper -log_helper.config_logger(LOG) +log_helper.config_logger(LOG) class NotificationHandler(NotificationHandlerBase): @@ -55,26 +55,25 @@ class NotificationHandler(NotificationHandlerBase): if not resource_type: raise Exception("abnormal notification@{0}".format(node_name)) if not resource_type in self.__supported_resource_types: - raise Exception("notification with unsupported resource type:{0}".format(resource_type)) + raise Exception( + "notification with unsupported resource type:{0}".format(resource_type)) this_delivery_time = notification_info['EventTimestamp'] else: - source = notification_info.get('source', None) - values = notification_info.get('data', {}).get('values', []) + parent_key = list(notification_info.keys())[0] + source = notification_info[parent_key].get('source', None) + values = notification_info[parent_key].get('data', {}).get('values', []) resource_address = values[0].get('ResourceAddress', None) + this_delivery_time = notification_info[parent_key].get('time') if not resource_address: raise Exception("No resource address in notification source".format(source)) - _,node_name,_ = subscription_helper.parse_resource_address(resource_address) - this_delivery_time = notification_info['time'] - # Change time from float to ascii format - notification_info['time'] = datetime.fromtimestamp(this_delivery_time).strftime('%Y-%m-%dT%H:%M:%S%fZ') - # notification_info['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', - # time.gmtime(this_delivery_time)) + _, node_name, _, _, _ = subscription_helper.parse_resource_address(resource_address) entries = subscription_repo.get(Status=1) for entry in entries: subscriptionid = entry.SubscriptionId if entry.ResourceAddress: - _,entry_node_name,_ = subscription_helper.parse_resource_address(entry.ResourceAddress) + _, entry_node_name, _, _, _ = subscription_helper.parse_resource_address( + entry.ResourceAddress) subscription_dto2 = SubscriptionInfoV2(entry) else: ResourceQualifierJson = entry.ResourceQualifierJson or '{}' @@ -87,10 +86,12 @@ class NotificationHandler(NotificationHandlerBase): continue try: - last_delivery_time = self.__get_latest_delivery_timestamp(node_name, subscriptionid) + last_delivery_time = self.__get_latest_delivery_timestamp(node_name, + subscriptionid) if last_delivery_time and last_delivery_time >= this_delivery_time: # skip this entry since already delivered - LOG.debug("Ignore the outdated notification for: {0}".format(entry.SubscriptionId)) + LOG.debug("Ignore the outdated notification for: {0}".format( + entry.SubscriptionId)) continue subscription_helper.notify(subscription_dto2, notification_info) @@ -117,7 +118,7 @@ class NotificationHandler(NotificationHandlerBase): del subscription_repo def __get_latest_delivery_timestamp(self, node_name, subscriptionid): - last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) + last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {}) last_delivery_time = last_delivery_stat.get('EventTimestamp', None) return last_delivery_time @@ -126,18 +127,18 @@ class NotificationHandler(NotificationHandlerBase): self.notification_stat[node_name] = { subscriptionid: { 'EventTimestamp': this_delivery_time - } } + } LOG.debug("delivery time @node: {0},subscription:{1} is added".format( node_name, subscriptionid)) elif not self.notification_stat[node_name].get(subscriptionid, None): self.notification_stat[node_name][subscriptionid] = { 'EventTimestamp': this_delivery_time - } + } LOG.debug("delivery time @node: {0},subscription:{1} is added".format( node_name, subscriptionid)) else: - last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{}) + last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {}) last_delivery_time = last_delivery_stat.get('EventTimestamp', None) if (last_delivery_time and last_delivery_time >= this_delivery_time): return diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py index 4206903..9fca840 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_worker.py @@ -10,6 +10,7 @@ import logging import multiprocessing as mp import threading import sys + if sys.version > '3': import queue as Queue else: @@ -18,6 +19,7 @@ else: from notificationclientsdk.common.helpers import subscription_helper from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper +from notificationclientsdk.common.helpers import log_helper from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 @@ -38,14 +40,13 @@ from notificationclientsdk.services.broker_connection_manager import BrokerConne from notificationclientsdk.services.notification_handler import NotificationHandler LOG = logging.getLogger(__name__) - -from notificationclientsdk.common.helpers import log_helper log_helper.config_logger(LOG) -class NotificationWorker: +class NotificationWorker: class LocationInfoHandler(LocationHandlerBase): '''Glue code to forward location info to daemon method''' + def __init__(self, locationinfo_dispatcher): self.locationinfo_dispatcher = locationinfo_dispatcher super(NotificationWorker.LocationInfoHandler, self).__init__() @@ -55,7 +56,7 @@ class NotificationWorker: return self.locationinfo_dispatcher.produce_location_event(location_info) def __init__( - self, event, subscription_event, daemon_context): + self, event, subscription_event, daemon_context): self.__alive = True @@ -254,7 +255,8 @@ class NotificationWorker: elif s.ResourceAddress: # Get nodename from resource address LOG.info("Parse resource address {}".format(s.ResourceAddress)) - _,nodename,_ = subscription_helper.parse_resource_address(s.ResourceAddress) + _, nodename, _, _, _ = subscription_helper.parse_resource_address( + s.ResourceAddress) broker_name = nodename else: LOG.debug("Subscription {} does not have ResourceType or " @@ -298,7 +300,7 @@ class NotificationWorker: try: LOG.debug("try to sync up data for {0} brokers".format( self.broker_state_manager.count_brokers())) - result,_,_ = self.broker_state_manager.syncup_broker_data( + result, _, _ = self.broker_state_manager.syncup_broker_data( self.broker_connection_manager) except Exception as ex: result = False diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py index 0d2fb1d..62a33c4 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py @@ -12,6 +12,8 @@ from datetime import datetime, timezone from notificationclientsdk.client.notificationservice import NotificationServiceClient from notificationclientsdk.common.helpers import subscription_helper +from notificationclientsdk.common.helpers import log_helper +from notificationclientsdk.common.helpers import constants from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper from notificationclientsdk.model.dto.resourcetype import ResourceType from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 @@ -21,14 +23,12 @@ from notificationclientsdk.repository.node_repo import NodeRepo from notificationclientsdk.repository.subscription_repo import SubscriptionRepo from notificationclientsdk.services.daemon import DaemonControl - from notificationclientsdk.exception import client_exception LOG = logging.getLogger(__name__) - -from notificationclientsdk.common.helpers import log_helper log_helper.config_logger(LOG) + class PtpService(object): def __init__(self, daemon_control): @@ -72,7 +72,7 @@ class PtpService(object): finally: del nodeinfo_repo - def query(self, broker_name, resource_address=None): + def query(self, broker_name, resource_address=None, optional=None): default_node_name = NodeInfoHelper.default_node_name(broker_name) broker_pod_ip, supported_resource_types = self.__get_node_info(default_node_name) @@ -85,9 +85,9 @@ class PtpService(object): ResourceType.TypePTP, default_node_name)) raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP) - return self._query(default_node_name, broker_pod_ip, resource_address) + return self._query(default_node_name, broker_pod_ip, resource_address, optional) - def _query(self, broker_name, broker_pod_ip, resource_address=None): + def _query(self, broker_name, broker_pod_ip, resource_address=None, optional=None): broker_host = "[{0}]".format(broker_pod_ip) broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format( self.daemon_control.daemon_context['NOTIFICATION_BROKER_USER'], @@ -99,7 +99,8 @@ class PtpService(object): notificationservice_client = NotificationServiceClient( broker_name, broker_transport_endpoint, broker_pod_ip) resource_status = notificationservice_client.query_resource_status( - ResourceType.TypePTP, timeout=5, retry=10, resource_address=resource_address) + ResourceType.TypePTP, timeout=5, retry=10, resource_address=resource_address, + optional=optional) return resource_status except oslo_messaging.exceptions.MessagingTimeout as ex: LOG.warning("ptp status is not available @node {0} due to {1}".format( @@ -117,7 +118,8 @@ class PtpService(object): subscription_orm = SubscriptionOrm(**subscription_dto.to_orm()) resource_address = None if hasattr(subscription_dto, 'ResourceAddress'): - _,nodename,_ = subscription_helper.parse_resource_address(subscription_dto.ResourceAddress) + _, nodename, _, _, _ = subscription_helper.parse_resource_address(subscription_dto. + ResourceAddress) broker_name = nodename resource_address = subscription_dto.ResourceAddress elif hasattr(subscription_dto, 'ResourceType'): @@ -139,17 +141,22 @@ class PtpService(object): if default_node_name: ptpstatus = None - ptpstatus = self._query(default_node_name, broker_pod_ip, resource_address) + ptpstatus = self._query(default_node_name, + broker_pod_ip, + resource_address, + optional=None) LOG.info("initial ptpstatus:{0}".format(ptpstatus)) # construct subscription entry - timestamp = ptpstatus.get('EventTimestamp', None) - if timestamp is None: - timestamp = ptpstatus.get('time', None) - # Change time from float to ascii format - ptpstatus['time'] = datetime.fromtimestamp(ptpstatus['time']).strftime('%Y-%m-%dT%H:%M:%S%fZ') - # ptpstatus['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', - # time.gmtime(timestamp)) + if constants.PTP_V1_KEY in ptpstatus: + timestamp = ptpstatus[constants.PTP_V1_KEY].get('EventTimestamp', None) + ptpstatus = ptpstatus[constants.PTP_V1_KEY] + else: + for item in ptpstatus: + timestamp = ptpstatus[item].get('time', None) + # Change time from float to ascii format + ptpstatus[item]['time'] = datetime.fromtimestamp(ptpstatus[item]['time']) \ + .strftime('%Y-%m-%dT%H:%M:%S%fZ') subscription_orm.InitialDeliveryTimestamp = timestamp entry = self.subscription_repo.add(subscription_orm) @@ -179,7 +186,7 @@ class PtpService(object): def remove_subscription(self, subscriptionid): try: # 1, delete entry - self.subscription_repo.delete_one(SubscriptionId = subscriptionid) + self.subscription_repo.delete_one(SubscriptionId=subscriptionid) self.subscription_repo.commit() # 2, refresh daemon self.daemon_control.refresh() diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py index 872b3ec..33ff8d5 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py @@ -37,7 +37,8 @@ class ResourceAddressController(object): def CurrentState(self): try: # validate resource address - _, nodename, resource = subscription_helper.parse_resource_address(self.resource_address) + _, nodename, resource, optional, self.resource_address = subscription_helper.\ + parse_resource_address(self.resource_address) if nodename != THIS_NODE_NAME and nodename != '.': LOG.warning("Node {} is not available".format(nodename)) abort(404) @@ -45,11 +46,13 @@ class ResourceAddressController(object): LOG.warning("Resource {} is not valid".format(resource)) abort(404) ptpservice = PtpService(notification_control) - ptpstatus = ptpservice.query(THIS_NODE_NAME, self.resource_address) + ptpstatus = ptpservice.query(THIS_NODE_NAME, self.resource_address, optional) # Change time from float to ascii format # ptpstatus['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', # time.gmtime(ptpstatus['time'])) - ptpstatus['time'] = datetime.fromtimestamp(ptpstatus['time']).strftime('%Y-%m-%dT%H:%M:%S%fZ') + for item in ptpstatus: + ptpstatus[item]['time'] = datetime.fromtimestamp(ptpstatus[item]['time']).\ + strftime('%Y-%m-%dT%H:%M:%S%fZ') return ptpstatus except client_exception.NodeNotAvailable as ex: LOG.warning("Node is not available:{0}".format(str(ex))) diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py index e06a5c1..b44da1f 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py @@ -14,22 +14,21 @@ import logging from wsme import types as wtypes from wsmeext.pecan import wsexpose -from notificationclientsdk.model.dto.resourcetype import ResourceType from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 from notificationclientsdk.repository.subscription_repo import SubscriptionRepo from notificationclientsdk.services.ptp import PtpService from notificationclientsdk.exception import client_exception +from notificationclientsdk.common.helpers import log_helper from sidecar.repository.notification_control import notification_control from sidecar.repository.dbcontext_default import defaults LOG = logging.getLogger(__name__) - -from notificationclientsdk.common.helpers import log_helper log_helper.config_logger(LOG) -THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0') +THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME", 'controller-0') + class SubscriptionsControllerV2(rest.RestController): @@ -47,7 +46,7 @@ class SubscriptionsControllerV2(rest.RestController): abort(400) subscription.UriLocation = "{0}://{1}:{2}/ocloudNotifications/v2/subscriptions".format( - conf.server.get('protocol','http'), + conf.server.get('protocol', 'http'), conf.server.get('host', '127.0.0.1'), conf.server.get('port', '8080') ) @@ -77,13 +76,13 @@ class SubscriptionsControllerV2(rest.RestController): LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) abort(500) except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + LOG.error("Exception:{0}@{1}".format(type(ex), str(ex))) abort(500) @expose('json') def get(self): try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False) entries = repo.get(Status=1) response.status = 200 subs = [] @@ -99,7 +98,7 @@ class SubscriptionsControllerV2(rest.RestController): LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) raise ex except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + LOG.error("Exception:{0}@{1}".format(type(ex), str(ex))) abort(500) @expose() @@ -115,6 +114,7 @@ class SubscriptionsControllerV2(rest.RestController): except: return False + class SubscriptionController(rest.RestController): def __init__(self, subscription_id): self.subscription_id = subscription_id @@ -122,7 +122,7 @@ class SubscriptionController(rest.RestController): @expose('json') def get(self): try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False) entry = repo.get_one(SubscriptionId=self.subscription_id, Status=1) if not entry: @@ -139,13 +139,13 @@ class SubscriptionController(rest.RestController): LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) raise ex except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + LOG.error("Exception:{0}@{1}".format(type(ex), str(ex))) abort(500) @wsexpose(status_code=204) def delete(self): try: - repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False) + repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False) entry = repo.get_one(SubscriptionId=self.subscription_id) if entry: if entry.SubscriptionId: @@ -164,5 +164,5 @@ class SubscriptionController(rest.RestController): LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) raise ex except Exception as ex: - LOG.error("Exception:{0}@{1}".format(type(ex),str(ex))) + LOG.error("Exception:{0}@{1}".format(type(ex), str(ex))) abort(500) diff --git a/notificationservice-base/centos/Dockerfile b/notificationservice-base/centos/Dockerfile index cf916a0..a1557e6 100644 --- a/notificationservice-base/centos/Dockerfile +++ b/notificationservice-base/centos/Dockerfile @@ -10,6 +10,7 @@ RUN set -ex ;\ $(grep '^name=' ${STX_REPO_FILE} | awk -F '=' '{printf "--enablerepo=" $2 " "}') \ -y \ gcc python3-devel python3-pip \ + && pip3 install --upgrade pip \ && pip3 install --user pecan \ && pip3 install pygtail \ && pip3 install oslo-config \ diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py index 46fe83f..35e5c7d 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py @@ -17,8 +17,10 @@ import logging LOG = logging.getLogger(__name__) from trackingfunctionsdk.common.helpers import log_helper + log_helper.config_logger(LOG) + class PtpEventProducer(object): class ListenerEndpoint(object): target = oslo_messaging.Target(namespace='notification', version='1.0') @@ -30,28 +32,28 @@ class PtpEventProducer(object): pass def QueryStatus(self, ctx, **rpc_kwargs): - LOG.debug("PtpEventProducer QueryStatus called %s" %rpc_kwargs) + LOG.debug("PtpEventProducer QueryStatus called %s" % rpc_kwargs) if self.handler: return self.handler.query_status(**rpc_kwargs) else: return None def TriggerDelivery(self, ctx, **rpc_kwargs): - LOG.debug("PtpEventProducer TriggerDelivery called %s" %rpc_kwargs) + LOG.debug("PtpEventProducer TriggerDelivery called %s" % rpc_kwargs) if self.handler: return self.handler.trigger_delivery(**rpc_kwargs) else: return None def __init__(self, node_name, local_broker_transport_endpoint, - registration_broker_transport_endpoint=None): + registration_broker_transport_endpoint=None): self.Id = id(self) self.node_name = node_name self.local_broker_client = BrokerClientBase( 'LocalPtpEventProducer', local_broker_transport_endpoint) if registration_broker_transport_endpoint: self.registration_broker_client = BrokerClientBase( - 'AllPtpEventProducer', registration_broker_transport_endpoint) + 'AllPtpEventProducer', registration_broker_transport_endpoint) else: self.registration_broker_client = None return @@ -67,14 +69,16 @@ class PtpEventProducer(object): def publish_status(self, ptpstatus, retry=3): result = False - result1 = self.publish_status_local(ptpstatus, retry) if self.local_broker_client else result - result2 = self.publish_status_all(ptpstatus, retry) if self.registration_broker_client else result + result1 = self.publish_status_local(ptpstatus, + retry) if self.local_broker_client else result + result2 = self.publish_status_all(ptpstatus, + retry) if self.registration_broker_client else result return result1, result2 def publish_status_local(self, ptpstatus, source, retry=3): if not self.local_broker_client: return False - topic='{0}-Event-{1}'.format(source, self.node_name) + topic = '{0}-Event-{1}'.format(source, self.node_name) server = None isretrystopped = False while not isretrystopped: @@ -97,7 +101,7 @@ class PtpEventProducer(object): def publish_status_all(self, ptpstatus, retry=3): if not self.registration_broker_client: return False - topic_all='PTP-Event-*' + topic_all = 'PTP-Event-*' server = None isretrystopped = False while not isretrystopped: @@ -114,13 +118,14 @@ class PtpEventProducer(object): if isretrystopped: LOG.error("Failed to publish ptp status:{0}@Topic:{1}".format( - ptpstatus, topic)) + ptpstatus, topic_all)) return isretrystopped == False def start_status_listener(self, handler=None): result = False result1 = self.start_status_listener_local(handler) if self.local_broker_client else result - result2 = self.start_status_listener_all(handler) if self.registration_broker_client else result + result2 = self.start_status_listener_all( + handler) if self.registration_broker_client else result result = result1 and result2 return result @@ -128,8 +133,8 @@ class PtpEventProducer(object): if not self.local_broker_client: return False - topic='PTP-Status' - server='PTP-Tracking-{0}'.format(self.node_name) + topic = 'PTP-Status' + server = 'PTP-Tracking-{0}'.format(self.node_name) endpoints = [PtpEventProducer.ListenerEndpoint(handler)] self.local_broker_client.add_listener( @@ -140,8 +145,8 @@ class PtpEventProducer(object): if not self.registration_broker_client: return False - topic='PTP-Status' - server='PTP-Tracking-{0}'.format(self.node_name) + topic = 'PTP-Status' + server = 'PTP-Tracking-{0}'.format(self.node_name) endpoints = [PtpEventProducer.ListenerEndpoint(handler)] self.registration_broker_client.add_listener( @@ -159,8 +164,8 @@ class PtpEventProducer(object): if not self.local_broker_client: return False - topic='PTP-Status' - server="PTP-Tracking-{0}".format(self.node_name) + topic = 'PTP-Status' + server = "PTP-Tracking-{0}".format(self.node_name) self.local_broker_client.remove_listener( topic, server) @@ -168,8 +173,8 @@ class PtpEventProducer(object): if not self.registration_broker_client: return False - topic='PTP-Status' - server="PTP-Tracking-{0}".format(self.node_name) + topic = 'PTP-Status' + server = "PTP-Tracking-{0}".format(self.node_name) self.registration_broker_client.remove_listener( topic, server) @@ -184,15 +189,15 @@ class PtpEventProducer(object): if not self.local_broker_client: return False - topic='PTP-Status' - server="PTP-Tracking-{0}".format(self.node_name) + topic = 'PTP-Status' + server = "PTP-Tracking-{0}".format(self.node_name) return self.local_broker_client.is_listening( topic, server) def is_listening_all(self): if not self.registration_broker_client: return False - topic='PTP-Status' - server="PTP-Tracking-{0}".format(self.node_name) + topic = 'PTP-Status' + server = "PTP-Tracking-{0}".format(self.node_name) return self.registration_broker_client.is_listening( topic, server) diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/cgu_handler.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/cgu_handler.py index 2ff9ee3..9104b2f 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/cgu_handler.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/cgu_handler.py @@ -80,7 +80,6 @@ class CguHandler: def cgu_output_to_dict(self): # Take raw cgu output and parse it into a dict cgu_output = self.cgu_output_raw.splitlines() - LOG.debug("CGU output: %s" % cgu_output) cgu_dict = {'input': {}, 'EEC DPLL': { 'Current reference': '', diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py index 68762de..d9c827d 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py @@ -45,6 +45,8 @@ CLOCK_REALTIME = "CLOCK_REALTIME" PHC2SYS_TOLERANCE_LOW = 36999999000 PHC2SYS_TOLERANCE_HIGH = 37000001000 +PTP_V1_KEY = "ptp_notification_v1" + # testing values CGU_PATH_VALID = "/sys/kernel/debug/ice/0000:18:00.0/cgu" diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py index f0c3898..6e8ac2e 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py @@ -5,6 +5,7 @@ # import logging import datetime +import re from abc import ABC, abstractmethod @@ -34,6 +35,13 @@ class GnssMonitor(Observer): def __init__(self, config_file, nmea_serialport=None, pci_addr=None, cgu_path=None): self.config_file = config_file + try: + pattern = '(?<=/ptp/ptpinstance/ts2phc-).*(?=.conf)' + match = re.search(pattern, self.config_file) + self.ts2phc_service_name = match.group() + except AttributeError: + LOG.warning("GnssMonitor: Unable to determine tsphc_service name from %s" + % self.config_file) # Setup GNSS data self.gnss_cgu_handler = CguHandler(config_file, nmea_serialport, pci_addr, cgu_path) diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/log_helper.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/log_helper.py index 9b3d87d..28e2d21 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/log_helper.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/log_helper.py @@ -15,6 +15,8 @@ def get_logger(module_name): def config_logger(logger): - logging.basicConfig(stream=sys.stdout) + logging.basicConfig(stream=sys.stdout, + format='%(asctime)s %(levelname)-8s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') logger.setLevel(level=os.environ.get("LOGGING_LEVEL", "INFO")) return logger diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py index 199be60..c4fddcf 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py @@ -50,14 +50,11 @@ class PtpMonitor: pmc_query_results = {} - def __init__(self, ptp4l_config, holdover_time, freq, init=True): + def __init__(self, ptp4l_instance, holdover_time, freq, init=True): if init: - self.ptp4l_config = ptp4l_config - pattern = '(?<=/ptp/ptpinstance/ptp4l-).*(?=.conf)' - match = re.search(pattern, self.ptp4l_config) - self.ptp4l_service_name = match.group() - LOG.debug(self.ptp4l_service_name) + self.ptp4l_config = "/ptp/ptpinstance/ptp4l-%s.conf" % ptp4l_instance + self.ptp4l_service_name = ptp4l_instance self.phc2sys_service_name = os.environ.get('PHC2SYS_SERVICE_NAME', 'phc2sys') self.holdover_time = int(holdover_time) self.freq = int(freq) diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py index 3183994..62101a2 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py @@ -8,7 +8,6 @@ import json import logging import multiprocessing as mp import os -import sys import threading import time from oslo_utils import uuidutils @@ -43,7 +42,6 @@ source_type = { '/sync/synce-status/clock-quality': 'event.sync.synce-status.synce-clock-quality-change', '/sync/synce-status/lock-state-extended': 'event.sync.synce-status.synce-state-change-extended', '/sync/synce-status/lock-state': 'event.sync.synce-status.synce-state-change', - '/sync/synce-status/lock-state': 'event.sync.synce-status.synce-state-change', } '''Entry point of Default Process Worker''' @@ -77,12 +75,13 @@ class PtpWatcherDefault: } class PtpRequestHandlerDefault(object): - def __init__(self, watcher): + def __init__(self, watcher, daemon_context): self.watcher = watcher self.init_time = time.time() + self.daemon_context = daemon_context def _build_event_response(self, resource_path, last_event_time, resource_address, - sync_state): + sync_state, value_type=constants.VALUE_TYPE_ENUMERATION): if resource_path in [constants.SOURCE_SYNC_PTP_CLOCK_CLASS, constants.SOURCE_SYNCE_CLOCK_QUALITY]: data_type = constants.DATA_TYPE_METRIC @@ -100,7 +99,7 @@ class PtpWatcherDefault: { 'data_type': data_type, 'ResourceAddress': resource_address, - 'value_type': constants.VALUE_TYPE_ENUMERATION, + 'value_type': value_type, 'value': sync_state } ] @@ -111,28 +110,90 @@ class PtpWatcherDefault: def query_status(self, **rpc_kwargs): lastStatus = {} resource_address = rpc_kwargs.get('ResourceAddress', None) + optional = rpc_kwargs.get('optional', None) if resource_address: _, nodename, resource_path = utils.parse_resource_address(resource_address) if resource_path == constants.SOURCE_SYNC_ALL: resource_path = constants.SOURCE_SYNC_SYNC_STATE if resource_path == constants.SOURCE_SYNC_GNSS_SYNC_STATUS: self.watcher.gnsstracker_context_lock.acquire() - sync_state = self.watcher.gnsstracker_context.get('sync_state', - GnssState.Freerun) - last_event_time = self.watcher.gnsstracker_context.get('last_event_time', - time.time()) + if optional: + sync_state = self.watcher.gnsstracker_context[optional]. \ + get('sync_state', GnssState.Freerun) + last_event_time = self.watcher.gnsstracker_context[optional].get( + 'last_event_time', + time.time()) + lastStatus[optional] = self._build_event_response(resource_path, + last_event_time, + resource_address, + sync_state) + else: + for config in self.daemon_context['GNSS_INSTANCES']: + sync_state = self.watcher.gnsstracker_context[config] \ + .get('sync_state', GnssState.Freerun) + last_event_time = self.watcher.gnsstracker_context[config].get( + 'last_event_time', + time.time()) + lastStatus[config] = self._build_event_response(resource_path, + last_event_time, + resource_address, + sync_state) self.watcher.gnsstracker_context_lock.release() - lastStatus = self._build_event_response(resource_path, last_event_time, - resource_address, sync_state) - # elif resource_path == constants.SOURCE_SYNC_PTP_CLOCK_CLASS: + elif resource_path == constants.SOURCE_SYNC_PTP_CLOCK_CLASS: + self.watcher.ptptracker_context_lock.acquire() + if optional: + clock_class = self.watcher.ptptracker_context[optional].get('clock_class', + '248') + last_clock_class_event_time = self.watcher.ptptracker_context[optional].get( + 'last_clock_class_event_time', + time.time()) + lastStatus[optional] = \ + self._build_event_response(resource_path, + last_clock_class_event_time, + resource_address, + clock_class, + constants.VALUE_TYPE_METRIC) + else: + for config in self.daemon_context['PTP4L_INSTANCES']: + clock_class = self.watcher.ptptracker_context[config].get('clock_class', + '248') + last_clock_class_event_time = \ + self.watcher.ptptracker_context[config].get( + 'last_clock_class_event_time', + time.time()) + lastStatus[config] = \ + self._build_event_response(resource_path, + last_clock_class_event_time, + resource_address, + clock_class, + constants.VALUE_TYPE_METRIC) + self.watcher.ptptracker_context_lock.release() elif resource_path == constants.SOURCE_SYNC_PTP_LOCK_STATE: self.watcher.ptptracker_context_lock.acquire() - sync_state = self.watcher.ptptracker_context.get('sync_state', PtpState.Freerun) - last_event_time = self.watcher.ptptracker_context.get('last_event_time', - time.time()) + if optional: + sync_state = self.watcher.ptptracker_context[optional].get('sync_state', + PtpState.Freerun) + last_event_time = self.watcher.ptptracker_context[optional].get( + 'last_event_time', + time.time()) + lastStatus[optional] = self._build_event_response(resource_path, + last_event_time, + resource_address, + sync_state) + else: + for config in self.daemon_context['PTP4L_INSTANCES']: + sync_state = \ + self.watcher.ptptracker_context[config].get('sync_state', + PtpState.Freerun) + last_event_time = self.watcher.ptptracker_context[config].get( + 'last_event_time', + time.time()) + lastStatus[config] = self._build_event_response(resource_path, + last_event_time, + resource_address, + sync_state) self.watcher.ptptracker_context_lock.release() - lastStatus = self._build_event_response(resource_path, last_event_time, - resource_address, sync_state) + elif resource_path == constants.SOURCE_SYNC_OS_CLOCK: self.watcher.osclocktracker_context_lock.acquire() sync_state = self.watcher.osclocktracker_context.get('sync_state', @@ -140,8 +201,10 @@ class PtpWatcherDefault: last_event_time = self.watcher.osclocktracker_context.get('last_event_time', time.time()) self.watcher.osclocktracker_context_lock.release() - lastStatus = self._build_event_response(resource_path, last_event_time, - resource_address, sync_state) + lastStatus['os_clock_status'] = self._build_event_response(resource_path, + last_event_time, + resource_address, + sync_state) elif resource_path == constants.SOURCE_SYNC_SYNC_STATE: self.watcher.overalltracker_context_lock.acquire() sync_state = self.watcher.overalltracker_context.get('sync_state', @@ -149,16 +212,24 @@ class PtpWatcherDefault: last_event_time = self.watcher.overalltracker_context.get('last_event_time', time.time()) self.watcher.overalltracker_context_lock.release() - lastStatus = self._build_event_response(resource_path, last_event_time, - resource_address, sync_state) + lastStatus['overall_sync_status'] = self._build_event_response(resource_path, + last_event_time, + resource_address, + sync_state) LOG.debug("query_status: {}".format(lastStatus)) else: + # Request is for PTP v1 notification + # PTP v1 only supports single instance ptp + instance = self.daemon_context['PTP4L_INSTANCES'][0] + if len(self.daemon_context['PTP4L_INSTANCES']) > 1: + LOG.warning( + "Multiple ptp4l instances configured, retrieving status for %s" % instance) self.watcher.ptptracker_context_lock.acquire() - sync_state = self.watcher.ptptracker_context.get('sync_state', PtpState.Freerun) - last_event_time = self.watcher.ptptracker_context.get('last_event_time', - time.time()) - self.watcher.ptptracker_context_lock.release() - lastStatus = { + sync_state = self.watcher.ptptracker_context[instance].get('sync_state', + PtpState.Freerun) + last_event_time = self.watcher.ptptracker_context[instance].get('last_event_time', + time.time()) + lastStatus[constants.PTP_V1_KEY] = { 'ResourceType': ResourceType.TypePTP, 'EventData': { 'State': sync_state @@ -168,6 +239,9 @@ class PtpWatcherDefault: }, 'EventTimestamp': last_event_time } + self.watcher.ptptracker_context_lock.release() + LOG.warning("query_status PTP v1: {}".format(lastStatus)) + return lastStatus def trigger_delivery(self, **rpc_kwargs): @@ -184,7 +258,7 @@ class PtpWatcherDefault: # PTP Context self.ptptracker_context = {} - for config in self.daemon_context['PTP4L_CONFIGS']: + for config in self.daemon_context['PTP4L_INSTANCES']: self.ptptracker_context[config] = self.daemon_context.get( 'ptptracker_context', PtpWatcherDefault.DEFAULT_PTPTRACKER_CONTEXT) self.ptptracker_context[config]['sync_state'] = PtpState.Freerun @@ -197,7 +271,7 @@ class PtpWatcherDefault: # GNSS Context self.gnsstracker_context = {} - for config in self.daemon_context['GNSS_CONFIGS']: + for config in self.daemon_context['GNSS_INSTANCES']: self.gnsstracker_context[config] = self.daemon_context.get( 'gnsstracker_context', PtpWatcherDefault.DEFAULT_GNSSTRACKER_CONTEXT) self.gnsstracker_context[config]['sync_state'] = GnssState.Freerun @@ -238,7 +312,8 @@ class PtpWatcherDefault: self.broker_endpoint.TransportEndpoint, self.registration_broker_endpoint.TransportEndpoint) - self.__ptprequest_handler = PtpWatcherDefault.PtpRequestHandlerDefault(self) + self.__ptprequest_handler = PtpWatcherDefault.PtpRequestHandlerDefault(self, + self.daemon_context) # Set forced_publishing to True so that initial states are published # Main loop in run() sets it to false after the first iteration @@ -258,7 +333,7 @@ class PtpWatcherDefault: self.ptp_monitor_list = [ PtpMonitor(config, self.ptptracker_context[config]['holdover_seconds'], self.ptptracker_context[config]['poll_freq_seconds']) for config in - self.daemon_context['PTP4L_CONFIGS']] + self.daemon_context['PTP4L_INSTANCES']] def signal_ptp_event(self): if self.event: @@ -340,7 +415,6 @@ class PtpWatcherDefault: gnss_state = GnssState.Locked os_clock_state = self.os_clock_monitor.get_os_clock_state() - ptp_state = self.ptptracker_context.get('sync_state') if gnss_state is GnssState.Freerun or os_clock_state is OsClockState.Freerun or ptp_state \ @@ -393,11 +467,11 @@ class PtpWatcherDefault: freq = float(self.osclocktracker_context['poll_freq_seconds']) sync_state = self.osclocktracker_context.get('sync_state', 'Unknown') last_event_time = self.osclocktracker_context.get('last_event_time', time.time()) + lastStatus = {} new_event, sync_state, new_event_time = self.__get_os_clock_status( holdover_time, freq, sync_state, last_event_time) - LOG.debug("Got os clock status.") - + LOG.info("os_clock_status: state is %s, new_event is %s " % (sync_state, new_event)) if new_event or forced: self.osclocktracker_context_lock.acquire() self.osclocktracker_context['sync_state'] = sync_state @@ -405,20 +479,10 @@ class PtpWatcherDefault: self.osclocktracker_context_lock.release() LOG.debug("Publish OS Clock Status") - lastStatus = { - 'ResourceType': 'OS Clock', - 'EventData': { - 'State': sync_state - }, - 'ResourceQualifier': { - 'NodeName': self.node_name - }, - 'EventTimestamp': new_event_time - } # publish new event in API version v2 format resource_address = utils.format_resource_address( self.node_name, constants.SOURCE_SYNC_OS_CLOCK) - lastStatus = { + lastStatus['os_clock_status'] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, 'source': constants.SOURCE_SYNC_OS_CLOCK, @@ -441,6 +505,7 @@ class PtpWatcherDefault: return def __publish_overall_sync_status(self, forced=False): + lastStatus = {} holdover_time = float(self.overalltracker_context['holdover_seconds']) freq = float(self.overalltracker_context['poll_freq_seconds']) sync_state = self.overalltracker_context.get('sync_state', 'Unknown') @@ -448,6 +513,7 @@ class PtpWatcherDefault: new_event, sync_state, new_event_time = self.__get_overall_sync_state( holdover_time, freq, sync_state, last_event_time) + LOG.info("overall_sync_state: state is %s, new_event is %s " % (sync_state, new_event)) if new_event or forced: # Update context @@ -459,7 +525,7 @@ class PtpWatcherDefault: LOG.debug("Publish overall sync status.") resource_address = utils.format_resource_address( self.node_name, constants.SOURCE_SYNC_SYNC_STATE) - lastStatus = { + lastStatus['overall_sync_status'] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, 'source': constants.SOURCE_SYNC_SYNC_STATE, @@ -477,28 +543,32 @@ class PtpWatcherDefault: ] } } - self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_SYNC_STATE) self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_ALL) def __publish_gnss_status(self, forced=False): - + lastStatus = {} for gnss in self.observer_list: - holdover_time = float(self.gnsstracker_context[gnss.config_file]['holdover_seconds']) - freq = float(self.gnsstracker_context[gnss.config_file]['poll_freq_seconds']) - sync_state = self.gnsstracker_context[gnss.config_file].get('sync_state', 'Unknown') - last_event_time = self.gnsstracker_context[gnss.config_file].get('last_event_time', - time.time()) + holdover_time = float( + self.gnsstracker_context[gnss.ts2phc_service_name]['holdover_seconds']) + freq = float(self.gnsstracker_context[gnss.ts2phc_service_name]['poll_freq_seconds']) + sync_state = self.gnsstracker_context[gnss.ts2phc_service_name].get('sync_state', + 'Unknown') + last_event_time = self.gnsstracker_context[gnss.ts2phc_service_name].get( + 'last_event_time', + time.time()) new_event, sync_state, new_event_time = self.__get_gnss_status( holdover_time, freq, sync_state, last_event_time, gnss) - LOG.debug("GNSS sync_state %s" % sync_state) + LOG.info("%s gnss_status: state is %s, new_event is %s" % ( + gnss.ts2phc_service_name, sync_state, new_event)) if new_event or forced: # update context self.gnsstracker_context_lock.acquire() - self.gnsstracker_context[gnss.config_file]['sync_state'] = sync_state - self.gnsstracker_context[gnss.config_file]['last_event_time'] = new_event_time + self.gnsstracker_context[gnss.ts2phc_service_name]['sync_state'] = sync_state + self.gnsstracker_context[gnss.ts2phc_service_name][ + 'last_event_time'] = new_event_time self.gnsstracker_context_lock.release() LOG.debug("Publish GNSS status.") @@ -506,7 +576,7 @@ class PtpWatcherDefault: # publish new event in API version v2 format resource_address = utils.format_resource_address( self.node_name, constants.SOURCE_SYNC_GNSS_SYNC_STATUS) - lastStatus = { + lastStatus[gnss.ts2phc_service_name] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, 'source': constants.SOURCE_SYNC_GNSS_SYNC_STATUS, @@ -530,29 +600,34 @@ class PtpWatcherDefault: return def __publish_ptpstatus(self, forced=False): - + lastStatus = {} + lastClockClassStatus = {} for ptp_monitor in self.ptp_monitor_list: holdover_time = \ - float(self.ptptracker_context[ptp_monitor.ptp4l_config]['holdover_seconds']) - freq = float(self.ptptracker_context[ptp_monitor.ptp4l_config]['poll_freq_seconds']) - sync_state = self.ptptracker_context[ptp_monitor.ptp4l_config]. \ + float(self.ptptracker_context[ptp_monitor.ptp4l_service_name]['holdover_seconds']) + freq = float( + self.ptptracker_context[ptp_monitor.ptp4l_service_name]['poll_freq_seconds']) + sync_state = self.ptptracker_context[ptp_monitor.ptp4l_service_name]. \ get('sync_state', 'Unknown') - last_event_time = self.ptptracker_context[ptp_monitor.ptp4l_config] \ + last_event_time = self.ptptracker_context[ptp_monitor.ptp4l_service_name] \ .get('last_event_time', time.time()) new_event, sync_state, new_event_time = self.__get_ptp_status( holdover_time, freq, sync_state, last_event_time, ptp_monitor) + LOG.info("%s PTP sync state: state is %s, new_event is %s" % ( + ptp_monitor.ptp4l_service_name, sync_state, new_event)) new_clock_class_event, clock_class, clock_class_event_time = \ ptp_monitor.get_ptp_clock_class() - + LOG.info("%s PTP clock class: clockClass is %s, new_event is %s" % ( + ptp_monitor.ptp4l_service_name, clock_class, new_clock_class_event)) if new_event or forced: # update context self.ptptracker_context_lock.acquire() - self.ptptracker_context[ptp_monitor.ptp4l_config]['sync_state'] = sync_state - self.ptptracker_context[ptp_monitor.ptp4l_config][ + self.ptptracker_context[ptp_monitor.ptp4l_service_name]['sync_state'] = sync_state + self.ptptracker_context[ptp_monitor.ptp4l_service_name][ 'last_event_time'] = new_event_time - self.ptptracker_context_lock.release() + # publish new event LOG.debug("Publish ptp status to clients") @@ -571,7 +646,7 @@ class PtpWatcherDefault: # publish new event in API version v2 format resource_address = utils.format_resource_address( self.node_name, constants.SOURCE_SYNC_PTP_LOCK_STATE) - lastStatus = { + lastStatus[ptp_monitor.ptp4l_service_name] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, 'source': constants.SOURCE_SYNC_PTP_LOCK_STATE, @@ -589,6 +664,7 @@ class PtpWatcherDefault: ] } } + self.ptptracker_context_lock.release() self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_PTP_LOCK_STATE) self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_ALL) @@ -596,15 +672,16 @@ class PtpWatcherDefault: if new_clock_class_event or forced: # update context self.ptptracker_context_lock.acquire() - self.ptptracker_context[ptp_monitor.ptp4l_config]['clock_class'] = clock_class - self.ptptracker_context[ptp_monitor.ptp4l_config]['last_clock_class_event_time'] \ + self.ptptracker_context[ptp_monitor.ptp4l_service_name]['clock_class'] = clock_class + self.ptptracker_context[ptp_monitor.ptp4l_service_name][ + 'last_clock_class_event_time'] \ = clock_class_event_time - self.ptptracker_context_lock.release() + resource_address = utils.format_resource_address( self.node_name, constants.SOURCE_SYNC_PTP_CLOCK_CLASS) - lastClockClassStatus = { + lastClockClassStatus[ptp_monitor.ptp4l_service_name] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, 'source': constants.SOURCE_SYNC_PTP_CLOCK_CLASS, @@ -622,6 +699,7 @@ class PtpWatcherDefault: ] } } + self.ptptracker_context_lock.release() LOG.info("Publishing clockClass for %s: %s" % (ptp_monitor.ptp4l_service_name, clock_class)) self.ptpeventproducer.publish_status(lastClockClassStatus, diff --git a/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/ptptracking_start.sh b/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/ptptracking_start.py similarity index 79% rename from stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/ptptracking_start.sh rename to stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/ptptracking_start.py index 31f957a..5d21185 100644 --- a/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/ptptracking_start.sh +++ b/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/resources/scripts/init/ptptracking_start.py @@ -3,28 +3,15 @@ # # SPDX-License-Identifier: Apache-2.0 # -#!/bin/bash - -# apt-get update -y -# #sleep infinity - -# apt-get install -y gcc -# apt-get install -y python-dev -# apt-get install -y python3-pip - -# export https_proxy=http://128.224.230.5:9090 - -# pip3 install oslo-config -# pip3 install oslo-messaging - -cat </root/ptptracking-daemon.py #!/usr/bin/python3 # -*- coding: UTF-8 -*- import logging +import re import os import json from trackingfunctionsdk.common.helpers import log_helper +from trackingfunctionsdk.common.helpers import constants from trackingfunctionsdk.services.daemon import DaemonControl LOG = logging.getLogger(__name__) @@ -67,10 +54,21 @@ OS_CLOCK_POLL_FREQ_SECONDS = os.environ.get("OS_CLOCK_POLL_FREQ_SECONDS", 2) OVERALL_HOLDOVER_SECONDS = os.environ.get("OVERALL_HOLDOVER_SECONDS", 30) OVERALL_POLL_FREQ_SECONDS = os.environ.get("OVERALL_POLL_FREQ_SECONDS", 2) -GNSS_CONFIGS = json.loads(os.environ.get("TS2PHC_CONFIGS", '["/ptp/ptpinstance/ts2phc-tc1.conf"]')) -PHC2SYS_CONFIG = os.environ.get("PHC2SYS_CONFIG", "/ptp/ptpinstance/phc2sys-phc-inst1.conf") -PTP4L_CONFIGS = json.loads(os.environ.get("PTP4L_CONFIGS", '["/ptp/ptpinstance/ptp4l-ptp-legacy.conf"]')) +PHC2SYS_CONFIG = "/ptp/ptpinstance/phc2sys-%s.conf" % os.environ.get("PHC2SYS_SERVICE_NAME", "phc2sys-legacy") +PTP4L_INSTANCES = os.environ.get("PTP4L_SERVICE_NAME", "ptp4l-legacy") +PTP4L_INSTANCES = str(PTP4L_INSTANCES).replace('[','').replace(']','') +PTP4L_INSTANCES = PTP4L_INSTANCES.split() +PTP4L_CONFIGS = [] +for item in PTP4L_INSTANCES: + PTP4L_CONFIGS.append("/ptp/ptpinstance/ptp4l-%s.conf" % item) + +GNSS_INSTANCES = os.environ.get("TS2PHC_SERVICE_NAME", None) +GNSS_INSTANCES = str(GNSS_INSTANCES).replace('[','').replace(']','') +GNSS_INSTANCES = GNSS_INSTANCES.split() +GNSS_CONFIGS = [] +for item in GNSS_INSTANCES: + GNSS_CONFIGS.append("/ptp/ptpinstance/ts2phc-%s.conf" % item) context = { 'THIS_NAMESPACE': THIS_NAMESPACE, @@ -81,6 +79,8 @@ context = { 'GNSS_CONFIGS': GNSS_CONFIGS, 'PHC2SYS_CONFIG': PHC2SYS_CONFIG, 'PTP4L_CONFIGS' : PTP4L_CONFIGS, + 'GNSS_INSTANCES': GNSS_INSTANCES, + 'PTP4L_INSTANCES': PTP4L_INSTANCES, 'ptptracker_context': { 'device_simulated': PTP_DEVICE_SIMULATED, @@ -108,7 +108,7 @@ sqlalchemy_conf = { 'pool_recycle': 3600, 'encoding': 'utf-8' } - +LOG.info("PTP tracking service startup context %s" % context) sqlalchemy_conf_json = json.dumps(sqlalchemy_conf) default_daemoncontrol = DaemonControl(sqlalchemy_conf_json, json.dumps(context)) @@ -116,12 +116,5 @@ default_daemoncontrol.refresh() while True: pass -EOF - -echo "done" - -PYTHONPATH=/opt/ptptrackingfunction python3 /root/ptptracking-daemon.py & - -sleep infinity diff --git a/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/templates/daemonset.yaml b/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/templates/daemonset.yaml index c8337fe..0a2e185 100644 --- a/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/templates/daemonset.yaml +++ b/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/templates/daemonset.yaml @@ -103,6 +103,8 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP + - name: PYTHONPATH + value: /opt/ptptrackingfunction - name: THIS_NAMESPACE value: {{ .Values.global.namespace }} - name: PTP_DEVICE_SIMULATED @@ -145,11 +147,9 @@ spec: value: "/ptp/ptpinstance/phc2sys-{{.Values.ptptracking.phc2sysServiceName}}.conf" - name: TS2PHC_SERVICE_NAME value: "{{ .Values.ptptracking.ts2phcServiceName }}" - - name: TS2PHC_CONFIGS - value: '["/ptp/ptpinstance/ts2phc-{{.Values.ptptracking.ts2phcServiceName}}.conf"]' - name: LOGGING_LEVEL value: "{{ .Values.ptptracking.log_level }}" - command: ["/bin/bash", "/mnt/ptptracking_start.sh"] + command: ["python3", "/mnt/ptptracking_start.py"] securityContext: privileged: true capabilities: diff --git a/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/values.yaml b/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/values.yaml index 3870edf..60de44c 100644 --- a/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/values.yaml +++ b/stx-ptp-notification-helm/stx-ptp-notification-helm/helm-charts/ptp-notification/values.yaml @@ -67,7 +67,9 @@ location: ptptracking: imagePullSecrets: default-registry-key ptp4lSocket: /var/run/ptp4l-ptp4l-legacy - ptp4lServiceName: ptp4l-legacy + ptp4lServiceName: + - ptp1 + - ptp2 phc2sysServiceName: phc2sys-legacy ts2phcServiceName: ts2phc-legacy log_level: INFO