diff --git a/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/templates/daemonset.yaml b/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/templates/daemonset.yaml index e2b1773..9e1bdf1 100644 --- a/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/templates/daemonset.yaml +++ b/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/templates/daemonset.yaml @@ -176,6 +176,8 @@ spec: value: "{{ .Values.ptptrackingv2.log_level }}" - name: CONTROL_TIMEOUT value: "{{ .Values.ptptrackingv2.control_timeout }}" + - name: NOTIFICATION_FORMAT + value: "{{ .Values.ptptrackingv2.notification_format }}" command: ["python3", "/mnt/ptptracking_start_v2.py"] {{- if .Values.ptptrackingv2.endpoint.liveness }} livenessProbe: diff --git a/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/values.yaml b/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/values.yaml index 41eb36a..1b841df 100644 --- a/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/values.yaml +++ b/helm-charts/custom/ptp-notification-helm/ptp-notification-helm/ptp-notification/values.yaml @@ -103,6 +103,7 @@ ptptracking: ptptrackingv2: enabled: True imagePullSecrets: default-registry-key + notification_format: "standard" ptp4lSocket: /var/run/ptp4l-ptp4l-legacy ptp4lServiceName: True ptp4lClockClassLockedList: "6,7,135" diff --git a/notificationclient-base/debian/Dockerfile b/notificationclient-base/debian/Dockerfile index d5cf9b5..4917057 100644 --- a/notificationclient-base/debian/Dockerfile +++ b/notificationclient-base/debian/Dockerfile @@ -9,6 +9,7 @@ RUN apt-get -y update \ gcc \ python3-dev \ python3 \ + curl \ && apt-get -y clean \ && rm -rf /var/lib/apt/lists/* RUN pip3 install --user pecan \ diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py index 133d27d..205d5aa 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py @@ -1,17 +1,16 @@ # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # import json +import logging import re +from datetime import datetime import requests -import logging -from datetime import datetime -from notificationclientsdk.common.helpers import constants -from notificationclientsdk.common.helpers import log_helper +from notificationclientsdk.common.helpers import constants, log_helper from notificationclientsdk.exception import client_exception LOG = logging.getLogger(__name__) @@ -33,13 +32,36 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3): timeout=timeout) response.raise_for_status() else: - # version 2 - for item in notification: - data = format_notification_data(subscriptioninfo, {item: notification[item]}) - data = json.dumps(data) - response = requests.post(url, data=data, headers=headers, - timeout=timeout) - response.raise_for_status() + if isinstance(notification, list): + # List-type notification response format + LOG.debug("Formatting subscription response: list") + # Post notification for each list item + for item in notification: + data = json.dumps(item) + LOG.info("Notification to post %s", (data)) + response = requests.post(url, data=data, headers=headers, + timeout=timeout) + response.raise_for_status() + else: + # Dict type notification response format + LOG.debug("Formatting subscription response: dict") + if notification.get('id', None): + # Not a nested dict, post the data + data = json.dumps(notification) + LOG.info("Notification to post %s", (data)) + response = requests.post(url, data=data, headers=headers, + timeout=timeout) + response.raise_for_status() + else: + for item in notification: + # Nested dict with instance tags, post each item + data = format_notification_data(subscriptioninfo, {item: notification[item]}) + data = json.dumps(data) + LOG.info("Notification to post %s", (data)) + response = requests.post(url, data=data, headers=headers, + timeout=timeout) + response.raise_for_status() + if notification == {}: if hasattr(subscriptioninfo, 'ResourceType'): resource = "{'ResourceType':'" + \ @@ -64,6 +86,7 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3): raise errt except requests.exceptions.RequestException as ex: LOG.warning("Failed to notify due to: {0}".format(str(ex))) + LOG.warning(" %s", (notification)) raise ex except requests.exceptions.HTTPError as ex: LOG.warning("Failed to notify due to: {0}".format(str(ex))) @@ -74,8 +97,11 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3): return result - def format_notification_data(subscriptioninfo, notification): + if isinstance(notification, list): + return notification + + # Formatting for legacy notification if hasattr(subscriptioninfo, 'ResourceType'): LOG.debug("format_notification_data: Found v1 subscription, " "no formatting required.") @@ -112,7 +138,7 @@ def format_notification_data(subscriptioninfo, notification): float(this_delivery_time)).strftime('%Y-%m-%dT%H:%M:%S%fZ') instance['time'] = format_time else: - raise Exception("format_notification_data: No valid source " + LOG.warning("format_notification_data: No valid source " "address found in notification") LOG.debug("format_notification_data: Added parent key for client " "consumption: %s" % formatted_notification) diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py index 412ff50..f6b7e22 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -48,34 +48,47 @@ class NotificationHandler(NotificationHandlerBase): resource_address = None try: self.notification_lock.acquire() + LOG.info("Notification handler notification_info %s", notification_info) subscription_repo = SubscriptionRepo(autocommit=True) - resource_type = notification_info.get('ResourceType', None) - # Get nodename from resource address - if resource_type: - node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None) - if not resource_type: - raise Exception("abnormal notification@{0}".format(node_name)) - if not resource_type in self.__supported_resource_types: - raise Exception( - "notification with unsupported resource type:{0}".format(resource_type)) - this_delivery_time = notification_info['EventTimestamp'] - # Get subscriptions from DB to deliver notification to - entries = subscription_repo.get(Status=1, ResourceType=resource_type) - else: - parent_key = list(notification_info.keys())[0] - source = notification_info[parent_key].get('source', None) - values = notification_info[parent_key].get('data', {}).get('values', []) - resource_address = values[0].get('ResourceAddress', None) - this_delivery_time = notification_info[parent_key].get('time') - if not resource_address: - raise Exception("No resource address in notification source".format(source)) - _, node_name, _, _, _ = subscription_helper.parse_resource_address(resource_address) - # Get subscriptions from DB to deliver notification to. - # Unable to filter on resource_address here because resource_address may contain - # either an unknown node name (ie. controller-0) or a '/./' resulting in entries - # being missed. Instead, filter these v2 subscriptions in the for loop below once - # the resource path has been obtained. - entries = subscription_repo.get(Status=1) + if isinstance(notification_info, dict): + resource_type = notification_info.get('ResourceType', None) + # Get nodename from resource address + if resource_type: + node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None) + if not resource_type: + raise Exception("abnormal notification@{0}".format(node_name)) + if not resource_type in self.__supported_resource_types: + raise Exception( + "notification with unsupported resource type:{0}".format(resource_type)) + this_delivery_time = notification_info['EventTimestamp'] + # Get subscriptions from DB to deliver notification to + entries = subscription_repo.get(Status=1, ResourceType=resource_type) + else: + parent_key = list(notification_info.keys())[0] + source = notification_info[parent_key].get('source', None) + values = notification_info[parent_key].get('data', {}).get('values', []) + resource_address = values[0].get('ResourceAddress', None) + this_delivery_time = notification_info[parent_key].get('time') + if not resource_address: + raise Exception("No resource address in notification source".format(source)) + _, node_name, _, _, _ = subscription_helper.parse_resource_address(resource_address) + # Get subscriptions from DB to deliver notification to. + # Unable to filter on resource_address here because resource_address may contain + # either an unknown node name (ie. controller-0) or a '/./' resulting in entries + # being missed. Instead, filter these v2 subscriptions in the for loop below once + # the resource path has been obtained. + entries = subscription_repo.get(Status=1) + elif isinstance(notification_info, list): + LOG.debug("Handle list") + for item in notification_info: + source = item.get('source', None) + values = item.get('data', {}).get('values', []) + resource_address = values[0].get('ResourceAddress', None) + this_delivery_time = item.get('time') + if not resource_address: + raise Exception("No resource address in notification source".format(source)) + _, node_name, _, _, _ = subscription_helper.parse_resource_address(resource_address) + entries = subscription_repo.get(Status=1) for entry in entries: subscriptionid = entry.SubscriptionId @@ -106,10 +119,11 @@ class NotificationHandler(NotificationHandlerBase): entry.SubscriptionId)) continue - subscription_helper.notify(subscription_dto2, notification_info) - LOG.debug("notification is delivered successfully to {0}".format( + notification_to_send = self.__format_timestamps(notification_info) + LOG.info("Sending notification to subscribers: %s", notification_to_send) + subscription_helper.notify(subscription_dto2, notification_to_send) + LOG.info("notification is delivered successfully to {0}".format( entry.SubscriptionId)) - self.update_delivery_timestamp(node_name, subscriptionid, this_delivery_time) except Exception as ex: @@ -129,6 +143,28 @@ class NotificationHandler(NotificationHandlerBase): if not subscription_repo: del subscription_repo + def __format_timestamps(self, ptpstatus): + if isinstance(ptpstatus, list): + LOG.debug("Format timestamps for standard subscription response") + for item in ptpstatus: + item['time'] = datetime.fromtimestamp( + item['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + elif isinstance(ptpstatus, dict): + LOG.debug("Format timestamps for response with instance tags") + try: + for item in ptpstatus: + # Change time from float to ascii format + ptpstatus[item]['time'] = datetime.fromtimestamp( + ptpstatus[item]['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + except (TypeError, AttributeError): + LOG.debug("Format timestamp for single notification") + ptpstatus['time'] = datetime.fromtimestamp( + ptpstatus['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + return ptpstatus + def __get_latest_delivery_timestamp(self, node_name, subscriptionid): last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {}) last_delivery_time = last_delivery_stat.get('EventTimestamp', None) @@ -152,6 +188,7 @@ class NotificationHandler(NotificationHandlerBase): else: last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {}) last_delivery_time = last_delivery_stat.get('EventTimestamp', None) + LOG.debug("last_delivery_time %s this_delivery_time %s" % (last_delivery_time, this_delivery_time)) if (last_delivery_time and last_delivery_time >= this_delivery_time): return last_delivery_stat['EventTimestamp'] = this_delivery_time diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py index b5297e5..edf289b 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py @@ -1,30 +1,29 @@ # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # -import oslo_messaging -import logging import json -import kombu -import requests +import logging from datetime import datetime -from notificationclientsdk.client.notificationservice \ - import NotificationServiceClient -from notificationclientsdk.common.helpers import subscription_helper -from notificationclientsdk.common.helpers import log_helper -from notificationclientsdk.common.helpers import constants +import kombu +import oslo_messaging +import requests +from notificationclientsdk.client.notificationservice import \ + NotificationServiceClient +from notificationclientsdk.common.helpers import (constants, log_helper, + subscription_helper) from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper +from notificationclientsdk.exception import client_exception from notificationclientsdk.model.dto.resourcetype import ResourceType -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 -from notificationclientsdk.model.orm.subscription \ - import Subscription as SubscriptionOrm +from notificationclientsdk.model.dto.subscription import (SubscriptionInfoV1, + SubscriptionInfoV2) +from notificationclientsdk.model.orm.subscription import \ + Subscription as SubscriptionOrm from notificationclientsdk.repository.node_repo import NodeRepo from notificationclientsdk.repository.subscription_repo import SubscriptionRepo -from notificationclientsdk.exception import client_exception LOG = logging.getLogger(__name__) log_helper.config_logger(LOG) @@ -272,18 +271,33 @@ class PtpService(object): timestamp = ptpstatus[constants.PTP_V1_KEY].get( 'EventTimestamp', None) ptpstatus = ptpstatus[constants.PTP_V1_KEY] - else: + elif isinstance(ptpstatus, list): + LOG.debug("Format timestamps for standard subscription response") for item in ptpstatus: - timestamp = ptpstatus[item].get('time', None) - # Change time from float to ascii format - ptpstatus[item]['time'] = datetime.fromtimestamp( - ptpstatus[item]['time']).strftime( + timestamp = item.get('time', None) + item['time'] = datetime.fromtimestamp( + item['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + elif isinstance(ptpstatus, dict): + LOG.debug("Format timestamps for response with instance tags") + try: + for item in ptpstatus: + timestamp = ptpstatus[item].get('time', None) + # Change time from float to ascii format + ptpstatus[item]['time'] = datetime.fromtimestamp( + ptpstatus[item]['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + except (TypeError, AttributeError): + LOG.debug("Format timestamp for single notification") + timestamp = ptpstatus.get('time', None) + ptpstatus['time'] = datetime.fromtimestamp( + ptpstatus['time']).strftime( '%Y-%m-%dT%H:%M:%S%fZ') - nodes[default_node_name] = ptpstatus subscription_orm = SubscriptionOrm(**subscription_dto.to_orm()) subscription_orm.InitialDeliveryTimestamp = timestamp + LOG.debug("Setting initial delivery timestamp %s", timestamp) entry = self.subscription_repo.add(subscription_orm) # Delivery the initial notification of ptp status diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py index 1e26426..a74a1e2 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py +++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/resource_address.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2022 Wind River Systems, Inc. +# Copyright (c) 2022-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -36,7 +36,7 @@ class ResourceAddressController(object): self.resource_address) if nodename == constants.WILDCARD_CURRENT_NODE: nodename = notification_control.get_residing_nodename() - LOG.debug('Nodename to query: %s' % nodename) + LOG.info('Nodename to query: %s' % nodename) if not notification_control.in_service_nodenames(nodename): LOG.warning("Node {} is not available".format(nodename)) raise client_exception.NodeNotAvailable(nodename) @@ -46,11 +46,23 @@ class ResourceAddressController(object): ptpservice = PtpService(notification_control) ptpstatus = ptpservice.query(nodename, self.resource_address, optional) - LOG.debug('Got ptpstatus: %s' % ptpstatus) - for item in ptpstatus: - ptpstatus[item]['time'] = datetime.fromtimestamp( - ptpstatus[item]['time']).strftime( - '%Y-%m-%dT%H:%M:%S%fZ') + LOG.info('Received ptpstatus: %s', ptpstatus) + if isinstance(ptpstatus, dict): + try: + for item in ptpstatus: + ptpstatus[item]['time'] = datetime.fromtimestamp( + ptpstatus[item]['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + except TypeError: + # ptpstatus does not have instance tags + ptpstatus['time'] = datetime.fromtimestamp( + ptpstatus['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') + elif isinstance(ptpstatus, list): + for item in ptpstatus: + item['time'] = datetime.fromtimestamp( + item['time']).strftime( + '%Y-%m-%dT%H:%M:%S%fZ') return ptpstatus except client_exception.NodeNotAvailable as ex: LOG.warning("{0}".format(str(ex))) @@ -66,7 +78,8 @@ class ResourceAddressController(object): # raise ex abort(400) except TypeError as ex: - LOG.error("Resource {0} not found on {1}".format(self.resource_address, nodename)) + LOG.error("Resource {0} not found on {1}, error: {2}".format( + self.resource_address, nodename, ex)) abort(404) except HTTPServerError as ex: LOG.error("Server side error:{0},{1}".format(type(ex), str(ex))) diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py index d06d396..d0a21d5 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/client/ptpeventproducer.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -16,7 +16,7 @@ import logging LOG = logging.getLogger(__name__) -from trackingfunctionsdk.common.helpers import log_helper +from trackingfunctionsdk.common.helpers import log_helper, constants log_helper.config_logger(LOG) @@ -32,7 +32,8 @@ class PtpEventProducer(object): pass def QueryStatus(self, ctx, **rpc_kwargs): - LOG.debug("PtpEventProducer QueryStatus called %s" % rpc_kwargs) + # This is where the "GET" commands run through + LOG.info("PtpEventProducer QueryStatus called %s" % rpc_kwargs) if self.handler: return self.handler.query_status(**rpc_kwargs) else: @@ -85,16 +86,16 @@ class PtpEventProducer(object): try: self.local_broker_client.cast( topic, 'NotifyStatus', notification=ptpstatus) - LOG.debug("Published ptp status:{0}@Topic:{1}".format(ptpstatus, topic)) + LOG.debug("Published ptp status local:{0}@Topic:{1}".format(ptpstatus, topic)) break except Exception as ex: - LOG.warning("Failed to publish ptp status:{0}@Topic:{1} due to: {2}".format( + LOG.warning("Failed to publish ptp status local:{0}@Topic:{1} due to: {2}".format( ptpstatus, topic, str(ex))) retry = retry - 1 isretrystopped = False if retry > 0 else True if isretrystopped: - LOG.error("Failed to publish ptp status:{0}@Topic:{1}".format( + LOG.error("Failed to publish ptp status local:{0}@Topic:{1}".format( ptpstatus, topic)) return isretrystopped == False @@ -108,16 +109,16 @@ class PtpEventProducer(object): try: self.registration_broker_client.cast( topic_all, 'NotifyStatus', notification=ptpstatus) - LOG.debug("Published ptp status:{0}@Topic:{1}".format(ptpstatus, topic_all)) + LOG.debug("Published ptp status all:{0}@Topic:{1}".format(ptpstatus, topic_all)) break except Exception as ex: - LOG.warning("Failed to publish ptp status:{0}@Topic:{1} due to: {2}".format( + LOG.warning("Failed to publish ptp status all:{0}@Topic:{1} due to: {2}".format( ptpstatus, topic_all, str(ex))) retry = retry - 1 isretrystopped = False if retry > 0 else True if isretrystopped: - LOG.error("Failed to publish ptp status:{0}@Topic:{1}".format( + LOG.error("Failed to publish ptp status all:{0}@Topic:{1}".format( ptpstatus, topic_all)) return isretrystopped == False diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py index 2434616..fac8711 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py @@ -1,10 +1,11 @@ # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # from os import path +import os # phc states constants FREERUN_PHC_STATE = "Freerun" @@ -41,6 +42,9 @@ GNSS_DPLL_1 = "DPLL1" UTC_OFFSET = "37" +# Notification formatting +NOTIFICATION_FORMAT = os.environ.get("NOTIFICATION_FORMAT", 'standard') + if path.exists('/ptp/linuxptp/ptpinstance'): LINUXPTP_CONFIG_PATH = '/ptp/linuxptp/ptpinstance/' elif path.exists('/ptp/ptpinstance'): diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/os_clock_monitor.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/os_clock_monitor.py index 632b14a..bd64948 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/os_clock_monitor.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/os_clock_monitor.py @@ -15,6 +15,7 @@ from glob import glob from trackingfunctionsdk.common.helpers import log_helper from trackingfunctionsdk.common.helpers import constants from trackingfunctionsdk.model.dto.osclockstate import OsClockState +from trackingfunctionsdk.common.helpers import ptpsync as utils LOG = logging.getLogger(__name__) log_helper.config_logger(LOG) @@ -304,12 +305,17 @@ class OsClockMonitor: def set_os_clock_state(self): offset_int = int(self.offset) + _, _, phc2sys, _ = \ + utils.check_critical_resources('', self.phc2sys_instance) if offset_int > self.phc2sys_tolerance_high or \ offset_int < self.phc2sys_tolerance_low: LOG.warning("PHC2SYS offset is outside of tolerance") self._state = OsClockState.Freerun + elif not phc2sys: + LOG.warning("Phc2sys instance %s is not running", self.phc2sys_instance) + self._state = OsClockState.Freerun else: - LOG.info("PHC2SYS offset is within tolerance") + LOG.info("PHC2SYS offset is within tolerance: %s", offset_int) self._state = OsClockState.Locked # Perform an extra check for HA Phc2sys to ensure we have a source interface @@ -341,7 +347,7 @@ class OsClockMonitor: self._state = constants.HOLDOVER_PHC_STATE elif previous_sync_state == constants.HOLDOVER_PHC_STATE and \ time_in_holdover < max_holdover_time: - LOG.debug("OS Clock: Time in holdover is %s " + LOG.info("OS Clock: Time in holdover is %s " "Max time in holdover is %s" % (time_in_holdover, max_holdover_time)) self._state = constants.HOLDOVER_PHC_STATE diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py index 652c2ad..a6e3147 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -83,7 +83,7 @@ class PtpMonitor: def set_ptp_clock_class(self): try: - clock_class = self.pmc_query_results['clockClass'] + clock_class = self.pmc_query_results['gm.ClockClass'] # Reset retry counter upon getting clock class self._clock_class_retry = 3 except KeyError: @@ -132,11 +132,12 @@ class PtpMonitor: # max holdover time is calculated to be in a 'safety' zone max_holdover_time = (self.holdover_time - self.freq * 2) - pmc, ptp4l, phc2sys, ptp4lconf = \ + pmc, ptp4l, _, ptp4lconf = \ utils.check_critical_resources(self.ptp4l_service_name, self.phc2sys_service_name) # run pmc command if preconditions met - if pmc and ptp4l and phc2sys and ptp4lconf: + # Removed check for phc2sys, ptp4l status should not depend on it + if pmc and ptp4l and ptp4lconf: self.pmc_query_results, total_ptp_keywords, port_count = \ self.ptpsync() try: @@ -147,12 +148,11 @@ class PtpMonitor: sync_state = previous_sync_state else: LOG.warning("Missing critical resource: " - "PMC %s PTP4L %s PHC2SYS %s PTP4LCONF %s" - % (pmc, ptp4l, phc2sys, ptp4lconf)) + "PMC %s PTP4L %s PTP4LCONF %s" + % (pmc, ptp4l, ptp4lconf)) sync_state = PtpState.Freerun - # determine if transition into holdover mode (cannot be in holdover if - # system clock is not in sync) - if sync_state == PtpState.Freerun and phc2sys: + # determine if transition into holdover mode + if sync_state == PtpState.Freerun: if previous_sync_state in [constants.UNKNOWN_PHC_STATE, PtpState.Freerun]: sync_state = PtpState.Freerun diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py index ae040a6..646376f 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py @@ -1,6 +1,6 @@ #! /usr/bin/python3 # -# Copyright (c) 2021-2023 Wind River Systems, Inc. +# Copyright (c) 2021-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -114,7 +114,12 @@ def parse_resource_address(resource_address): return clusterName, nodeName, resource_path -def format_resource_address(node_name, resource): +def format_resource_address(node_name, resource, instance=None): # Return a resource_address - resource_address = '/./' + node_name + resource + resource_address = '/./' + node_name + if instance: + resource_address = resource_address + '/' + instance + resource + else: + resource_address = resource_address + resource + LOG.debug("format_resource_address %s" % resource_address) return resource_address diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py index d96f7f1..860fc4b 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py @@ -117,7 +117,12 @@ class PtpWatcherDefault: return lastStatus def query_status(self, **rpc_kwargs): + # Client PULL status requests come through here + # Dict is used for legacy notification format lastStatus = {} + # List is used for standard notification format + newStatus = [] + resource_address = rpc_kwargs.get('ResourceAddress', None) optional = rpc_kwargs.get('optional', None) if resource_address: @@ -137,8 +142,9 @@ class PtpWatcherDefault: constants.SOURCE_SYNC_GNSS_SYNC_STATUS, last_event_time, utils.format_resource_address(nodename, - constants.SOURCE_SYNC_GNSS_SYNC_STATUS), + constants.SOURCE_SYNC_GNSS_SYNC_STATUS, optional), sync_state) + newStatus.append(lastStatus[optional]) elif not optional: for config in self.daemon_context['GNSS_INSTANCES']: sync_state = \ @@ -151,8 +157,9 @@ class PtpWatcherDefault: constants.SOURCE_SYNC_GNSS_SYNC_STATUS, last_event_time, utils.format_resource_address(nodename, - constants.SOURCE_SYNC_GNSS_SYNC_STATUS), + constants.SOURCE_SYNC_GNSS_SYNC_STATUS, config), sync_state) + newStatus.append(lastStatus[config]) else: lastStatus = None self.watcher.gnsstracker_context_lock.release() @@ -170,8 +177,9 @@ class PtpWatcherDefault: constants.SOURCE_SYNC_PTP_CLOCK_CLASS, last_clock_class_event_time, utils.format_resource_address(nodename, - constants.SOURCE_SYNC_PTP_CLOCK_CLASS), + constants.SOURCE_SYNC_PTP_CLOCK_CLASS, optional), clock_class, constants.VALUE_TYPE_METRIC) + newStatus.append(lastStatus[optional]) elif not optional: for config in self.daemon_context['PTP4L_INSTANCES']: clock_class = \ @@ -185,8 +193,9 @@ class PtpWatcherDefault: constants.SOURCE_SYNC_PTP_CLOCK_CLASS, last_clock_class_event_time, utils.format_resource_address(nodename, - constants.SOURCE_SYNC_PTP_CLOCK_CLASS), + constants.SOURCE_SYNC_PTP_CLOCK_CLASS, config), clock_class, constants.VALUE_TYPE_METRIC) + newStatus.append(lastStatus[config]) else: lastStatus = None self.watcher.ptptracker_context_lock.release() @@ -204,8 +213,9 @@ class PtpWatcherDefault: constants.SOURCE_SYNC_PTP_LOCK_STATE, last_event_time, utils.format_resource_address(nodename, - constants.SOURCE_SYNC_PTP_LOCK_STATE), + constants.SOURCE_SYNC_PTP_LOCK_STATE, optional), sync_state) + newStatus.append(lastStatus[optional]) elif not optional: for config in self.daemon_context['PTP4L_INSTANCES']: sync_state = \ @@ -218,8 +228,9 @@ class PtpWatcherDefault: constants.SOURCE_SYNC_PTP_LOCK_STATE, last_event_time, utils.format_resource_address(nodename, - constants.SOURCE_SYNC_PTP_LOCK_STATE), + constants.SOURCE_SYNC_PTP_LOCK_STATE, config), sync_state) + newStatus.append(lastStatus[config]) else: lastStatus = None self.watcher.ptptracker_context_lock.release() @@ -239,6 +250,7 @@ class PtpWatcherDefault: utils.format_resource_address(nodename, constants.SOURCE_SYNC_OS_CLOCK), sync_state) + newStatus.append(lastStatus['os_clock_status']) if resource_path == constants.SOURCE_SYNC_SYNC_STATE or \ resource_path == constants.SOURCE_SYNC_ALL: self.watcher.overalltracker_context_lock.acquire() @@ -253,9 +265,23 @@ class PtpWatcherDefault: utils.format_resource_address(nodename, constants.SOURCE_SYNC_SYNC_STATE), sync_state) - LOG.debug("query_status: {}".format(lastStatus)) + if resource_path == constants.SOURCE_SYNC_ALL: + newStatus.append(lastStatus['overall_sync_status']) + else: + # Special handling for overall_sync_status + # There will only ever be a single response from + # SOURCE_SYNC_SYNC_STATE. + # Return a dict rather than a list + newStatus = lastStatus['overall_sync_status'] + + + if constants.NOTIFICATION_FORMAT == 'standard': + LOG.info("PULL status returning: %s", newStatus) + return newStatus + else: + LOG.info("PULL status returning: {}".format(lastStatus)) + return lastStatus - return lastStatus def trigger_delivery(self, **rpc_kwargs): self.watcher.forced_publishing = True @@ -383,6 +409,16 @@ class PtpWatcherDefault: notificationservice_health = HealthServer() notificationservice_health.run() + # Need to give the notificationclient sidecar pods + # a few seconds to re-connect to the newly started + # RabbitMQ. If we don't wait here, the initial + # status delivieries can be sent before the clients + # are connected and they will never receive the + # notification + # This approach can probably be improved by + # checking the RabbitMQ endpoint + time.sleep(10) + while True: # announce the location forced = self.forced_publishing @@ -526,6 +562,7 @@ class PtpWatcherDefault: last_event_time = self.osclocktracker_context.get('last_event_time', time.time()) lastStatus = {} + newStatus = [] new_event, sync_state, new_event_time = self.__get_os_clock_status( holdover_time, freq, sync_state, last_event_time) @@ -559,13 +596,23 @@ class PtpWatcherDefault: ] } } - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_OS_CLOCK) - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_ALL) + + newStatus.append(lastStatus['os_clock_status']) + + if constants.NOTIFICATION_FORMAT == 'standard': + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_OS_CLOCK) + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_ALL) + else: + self.ptpeventproducer.publish_status( + lastStatus, constants.SOURCE_SYNC_OS_CLOCK) + self.ptpeventproducer.publish_status( + lastStatus, constants.SOURCE_SYNC_ALL) def __publish_overall_sync_status(self, forced=False): lastStatus = {} + newStatus = [] holdover_time = float(self.overalltracker_context['holdover_seconds']) freq = float(self.overalltracker_context['poll_freq_seconds']) sync_state = self.overalltracker_context.get('sync_state', 'Unknown') @@ -605,14 +652,24 @@ class PtpWatcherDefault: ] } } - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_SYNC_STATE) - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_ALL) + newStatus.append(lastStatus['overall_sync_status']) + if constants.NOTIFICATION_FORMAT == 'standard': + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_SYNC_STATE) + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_ALL) + else: + self.ptpeventproducer.publish_status( + lastStatus, constants.SOURCE_SYNC_SYNC_STATE) + self.ptpeventproducer.publish_status( + lastStatus, constants.SOURCE_SYNC_ALL) def __publish_gnss_status(self, forced=False): - lastStatus = {} + for gnss in self.observer_list: + # Ensure that status structs are cleared between each iteration + lastStatus = {} + newStatus = [] holdover_time = float( self.gnsstracker_context[ gnss.ts2phc_service_name]['holdover_seconds']) @@ -643,7 +700,9 @@ class PtpWatcherDefault: # publish new event in API version v2 format resource_address = utils.format_resource_address( - self.node_name, constants.SOURCE_SYNC_GNSS_SYNC_STATUS) + self.node_name, + constants.SOURCE_SYNC_GNSS_SYNC_STATUS, + gnss.ts2phc_service_name) lastStatus[gnss.ts2phc_service_name] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, @@ -663,15 +722,27 @@ class PtpWatcherDefault: ] } } - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_GNSS_SYNC_STATUS) - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_ALL) + newStatus.append(lastStatus[gnss.ts2phc_service_name]) + if constants.NOTIFICATION_FORMAT == 'standard': + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_GNSS_SYNC_STATUS) + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_ALL) + else: + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_GNSS_SYNC_STATUS) + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_ALL) def __publish_ptpstatus(self, forced=False): - lastStatus = {} - lastClockClassStatus = {} + for ptp_monitor in self.ptp_monitor_list: + # Ensure that status structs are cleared between each iteration + newStatus = [] + newClockClassStatus = [] + lastStatus = {} + lastClockClassStatus = {} + holdover_time = float(self.ptptracker_context[ ptp_monitor.ptp4l_service_name]['holdover_seconds']) freq = float(self.ptptracker_context[ @@ -717,7 +788,9 @@ class PtpWatcherDefault: lastStatus = {} # publish new event in API version v2 format resource_address = utils.format_resource_address( - self.node_name, constants.SOURCE_SYNC_PTP_LOCK_STATE) + self.node_name, + constants.SOURCE_SYNC_PTP_LOCK_STATE, + ptp_monitor.ptp4l_service_name) lastStatus[ptp_monitor.ptp4l_service_name] = { 'id': uuidutils.generate_uuid(), 'specversion': constants.SPEC_VERSION, @@ -737,10 +810,18 @@ class PtpWatcherDefault: } } self.ptptracker_context_lock.release() - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_PTP_LOCK_STATE) - self.ptpeventproducer.publish_status( - lastStatus, constants.SOURCE_SYNC_ALL) + newStatus.append(lastStatus[ptp_monitor.ptp4l_service_name]) + + if constants.NOTIFICATION_FORMAT == 'standard': + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_PTP_LOCK_STATE) + self.ptpeventproducer.publish_status( + newStatus, constants.SOURCE_SYNC_ALL) + else: + self.ptpeventproducer.publish_status( + lastStatus, constants.SOURCE_SYNC_PTP_LOCK_STATE) + self.ptpeventproducer.publish_status( + lastStatus, constants.SOURCE_SYNC_ALL) if new_clock_class_event or forced: # update context @@ -752,7 +833,9 @@ class PtpWatcherDefault: = clock_class_event_time resource_address = utils.format_resource_address( - self.node_name, constants.SOURCE_SYNC_PTP_CLOCK_CLASS) + self.node_name, + constants.SOURCE_SYNC_PTP_CLOCK_CLASS, + ptp_monitor.ptp4l_service_name) lastClockClassStatus[ptp_monitor.ptp4l_service_name] = { 'id': uuidutils.generate_uuid(), @@ -772,14 +855,23 @@ class PtpWatcherDefault: ] } } + newClockClassStatus.append(lastClockClassStatus[ptp_monitor.ptp4l_service_name]) self.ptptracker_context_lock.release() LOG.info("Publishing clockClass for %s: %s" % (ptp_monitor.ptp4l_service_name, clock_class)) - self.ptpeventproducer.publish_status( - lastClockClassStatus, - constants.SOURCE_SYNC_PTP_CLOCK_CLASS) - self.ptpeventproducer.publish_status(lastClockClassStatus, - constants.SOURCE_SYNC_ALL) + + if constants.NOTIFICATION_FORMAT == 'standard': + self.ptpeventproducer.publish_status( + newClockClassStatus, + constants.SOURCE_SYNC_PTP_CLOCK_CLASS) + self.ptpeventproducer.publish_status(newClockClassStatus, + constants.SOURCE_SYNC_ALL) + else: + self.ptpeventproducer.publish_status( + lastClockClassStatus, + constants.SOURCE_SYNC_PTP_CLOCK_CLASS) + self.ptpeventproducer.publish_status(lastClockClassStatus, + constants.SOURCE_SYNC_ALL) class DaemonControl(object): diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_os_clock_monitor.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_os_clock_monitor.py index 508be75..719912b 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_os_clock_monitor.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_os_clock_monitor.py @@ -85,7 +85,9 @@ class OsClockMonitorTests(unittest.TestCase): self.clockmon.get_os_clock_offset() assert self.clockmon.offset == '37000000015' - def test_set_os_closck_state(self): + @mock.patch('trackingfunctionsdk.common.helpers.ptpsync.check_critical_resources', + side_effect=[b'True']) + def test_set_os_clock_state(self, critical_patched): self.clockmon = OsClockMonitor(phc2sys_config=phc2sys_test_config, init=False) self.clockmon.offset = '37000000015' self.clockmon.set_os_clock_state()