diff --git a/.zuul.yaml b/.zuul.yaml index dea13d1..3245fee 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -9,7 +9,7 @@ check: jobs: - openstack-tox-linters - - ptp-notification-tox-py36 + - ptp-notification-tox-py39 - k8sapp-ptp-notification-tox-py39 - k8sapp-ptp-notification-tox-pylint - k8sapp-ptp-notification-tox-flake8 @@ -17,7 +17,7 @@ gate: jobs: - openstack-tox-linters - - ptp-notification-tox-py36 + - ptp-notification-tox-py39 - k8sapp-ptp-notification-tox-py39 - k8sapp-ptp-notification-tox-pylint - k8sapp-ptp-notification-tox-flake8 @@ -27,13 +27,13 @@ - stx-ptp-notification-armada-app-upload-git-mirror - job: - name: ptp-notification-tox-py36 - parent: tox-py36 + name: ptp-notification-tox-py39 + parent: tox-py39 description: | - Run py36 test for ptp-notification - nodeset: ubuntu-bionic + Run py39 test for ptp-notification + nodeset: debian-bullseye vars: - tox_envlist: py36 + tox_envlist: py39 - job: name: k8sapp-ptp-notification-tox-py39 diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py index fac8711..3bc32cb 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/constants.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021-2024 Wind River Systems, Inc. +# Copyright (c) 2021-2025 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -81,3 +81,8 @@ SOURCE_SYNC_SYNC_STATE = '/sync/sync-status/sync-state' SOURCE_SYNCE_CLOCK_QUALITY = '/sync/synce-status/clock-quality' SOURCE_SYNCE_LOCK_STATE_EXTENDED = '/sync/synce-status/lock-state-extended' SOURCE_SYNCE_LOCK_STATE = '/sync/synce-status/lock-state' + +class ClockSourceType(object): + TypePTP = "PTP" + TypeGNSS = "GNSS" + TypeNA = "NA" \ No newline at end of file diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py index 172cb9d..a78b395 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2022-2023 Wind River Systems, Inc. +# Copyright (c) 2022-2023,2025 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -12,6 +12,7 @@ from abc import ABC, abstractmethod from trackingfunctionsdk.common.helpers import constants from trackingfunctionsdk.common.helpers import log_helper +from trackingfunctionsdk.common.helpers import ptpsync as utils from trackingfunctionsdk.common.helpers.cgu_handler import CguHandler from trackingfunctionsdk.model.dto.gnssstate import GnssState @@ -48,6 +49,7 @@ class GnssMonitor(Observer): "GnssMonitor: Unable to determine tsphc_service name from %s" % self.config_file) + self.set_ptp_devices() # Setup GNSS data self.gnss_cgu_handler = CguHandler(config_file, nmea_serialport, pci_addr, cgu_path) @@ -78,6 +80,34 @@ class GnssMonitor(Observer): self.gnss_pps_state = \ self.gnss_cgu_handler.cgu_output_parsed['PPS DPLL']['Status'] + def set_ptp_devices(self): + ptp_devices = set() + phc_interfaces = self._check_config_file_interfaces() + for phc_interface in phc_interfaces: + ptp_device = utils.get_interface_phc_device(phc_interface) + if ptp_device is not None: + ptp_devices.add(ptp_device) + self.ptp_devices = list(ptp_devices) + LOG.debug("TS2PHC PTP devices are %s" % self.ptp_devices) + + def get_ptp_devices(self): + return self.ptp_devices + + def _check_config_file_interfaces(self): + phc_interfaces = [] + with open(self.config_file, 'r') as f: + config_lines = f.readlines() + config_lines = [line.rstrip() for line in config_lines] + + for line in config_lines: + # Find the interface value inside the square brackets + if re.match(r"^\[.*\]$", line) and line != "[global]": + phc_interface = line.strip("[]") + LOG.debug("TS2PHC interface is %s" % phc_interface) + phc_interfaces.append(phc_interface) + + return phc_interfaces + def update(self, subject, matched_line) -> None: LOG.info("Kernel event detected. %s" % matched_line) self.set_gnss_status() diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/os_clock_monitor.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/os_clock_monitor.py index bd64948..84251d1 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/os_clock_monitor.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/os_clock_monitor.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2022-2023 Wind River Systems, Inc. +# Copyright (c) 2022-2025 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -249,26 +249,7 @@ class OsClockMonitor: def _get_interface_phc_device(self): """Determine the phc device for the interface""" - pattern = "/hostsys/class/net/" + self.phc_interface + "/device/ptp/*" - ptp_device = glob(pattern) - if len(ptp_device) == 0: - # Try the 0th interface instead, required for some NIC types - phc_interface_base = self.phc_interface[:-1] + "0" - LOG.info("No ptp device found at %s trying %s instead" - % (pattern, phc_interface_base)) - pattern = "/hostsys/class/net/" + phc_interface_base + \ - "/device/ptp/*" - ptp_device = glob(pattern) - if len(ptp_device) == 0: - LOG.warning("No ptp device found for base interface at %s" - % pattern) - return None - if len(ptp_device) > 1: - LOG.error("More than one ptp device found at %s" % pattern) - return None - ptp_device = os.path.basename(ptp_device[0]) - LOG.debug("Found ptp device %s at %s" % (ptp_device, pattern)) - return ptp_device + return utils.get_interface_phc_device(self.phc_interface) def get_os_clock_offset(self): """Get the os CLOCK_REALTIME offset""" @@ -327,6 +308,12 @@ class OsClockMonitor: def get_os_clock_state(self): return self._state + def get_source_ptp_device(self): + # PTP device that is disciplining the OS clock + # This is also valid in case of HA source devices as + # __publish_os_clock_status updates ptp_device. + return self.ptp_device + def os_clock_status(self, holdover_time, freq, sync_state, event_time): current_time = datetime.datetime.utcnow().timestamp() time_in_holdover = None diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py index a6e3147..dad6092 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptp_monitor.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021-2024 Wind River Systems, Inc. +# Copyright (c) 2021-2025 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -13,6 +13,7 @@ # import datetime import logging +import re import sys from trackingfunctionsdk.model.dto.ptpstate import PtpState @@ -62,12 +63,48 @@ class PtpMonitor: self.phc2sys_service_name = phc2sys_service_name self.holdover_time = int(holdover_time) self.freq = int(freq) + self.set_ptp_devices() + self.sync_source = constants.ClockSourceType.TypeNA self._ptp_event_time = datetime.datetime.utcnow().timestamp() self._clock_class_event_time = \ datetime.datetime.utcnow().timestamp() self.set_ptp_sync_state() self.set_ptp_clock_class() + def set_ptp_devices(self): + ptp_devices = set() + phc_interfaces = self._check_config_file_interfaces() + for phc_interface in phc_interfaces: + ptp_device = utils.get_interface_phc_device(phc_interface) + if ptp_device is not None: + ptp_devices.add(ptp_device) + self.ptp_devices = list(ptp_devices) + LOG.debug("PTP4l PTP devices are %s" % self.ptp_devices) + + def get_ptp_devices(self): + return self.ptp_devices + + def get_ptp_sync_source(self): + return self.sync_source + + def _check_config_file_interfaces(self): + phc_interfaces = [] + with open(self.ptp4l_config, 'r') as f: + config_lines = f.readlines() + config_lines = [line.rstrip() for line in config_lines] + + for line in config_lines: + # Find the interface value inside the square brackets + if re.match(r"^\[.*\]$", line) and line not in [ + "[global]", + "[unicast_master_table]", + ]: + phc_interface = line.strip("[]") + LOG.debug("PTP4l interface is %s" % phc_interface) + phc_interfaces.append(phc_interface) + + return phc_interfaces + def set_ptp_sync_state(self): new_ptp_sync_event, ptp_sync_state, ptp_event_time = self.ptp_status() if ptp_sync_state != self._ptp_sync_state: @@ -132,6 +169,9 @@ class PtpMonitor: # max holdover time is calculated to be in a 'safety' zone max_holdover_time = (self.holdover_time - self.freq * 2) + previous_sync_source = self.sync_source + sync_source = constants.ClockSourceType.TypeNA + pmc, ptp4l, _, ptp4lconf = \ utils.check_critical_resources(self.ptp4l_service_name, self.phc2sys_service_name) @@ -141,11 +181,13 @@ class PtpMonitor: self.pmc_query_results, total_ptp_keywords, port_count = \ self.ptpsync() try: - sync_state = utils.check_results(self.pmc_query_results, - total_ptp_keywords, port_count) + sync_state, sync_source = utils.check_results( + self.pmc_query_results,total_ptp_keywords, port_count + ) except RuntimeError as err: LOG.warning(err) sync_state = previous_sync_state + sync_source = previous_sync_source else: LOG.warning("Missing critical resource: " "PMC %s PTP4L %s PTP4LCONF %s" @@ -169,7 +211,16 @@ class PtpMonitor: # determine if ptp sync state has changed since the last one LOG.debug("ptp_monitor: sync_state %s, " - "previous_sync_state %s" % (sync_state, previous_sync_state)) + "previous_sync_state %s, " + "sync_source %s, " + "previous sync_source %s" % ( + sync_state, previous_sync_state, sync_source, previous_sync_source + ) + ) + + # record sync_source of this poll. + self.sync_source = sync_source + if sync_state != previous_sync_state: new_event = True self._ptp_event_time = datetime.datetime.utcnow().timestamp() diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py index 646376f..98b17ae 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/ptpsync.py @@ -1,6 +1,6 @@ #! /usr/bin/python3 # -# Copyright (c) 2021-2024 Wind River Systems, Inc. +# Copyright (c) 2021-2025 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -16,6 +16,7 @@ import os import re import subprocess import logging +from glob import glob from trackingfunctionsdk.common.helpers import constants from trackingfunctionsdk.common.helpers import log_helper @@ -69,6 +70,9 @@ def check_results(result, total_ptp_keywords, port_count): # sync state is in 'Locked' state and will be overwritten if # it is not the case sync_state = constants.LOCKED_PHC_STATE + # sync source is in 'NA' and will be overwritten when source + # found to be GNSS or PTP. + sync_source = constants.ClockSourceType.TypeNA local_gm = False @@ -77,16 +81,18 @@ def check_results(result, total_ptp_keywords, port_count): LOG.info("Results %s" % result) LOG.info("Results len %s, total_ptp_keywords %s" % (len(result), total_ptp_keywords)) raise RuntimeError("PMC results are incomplete, retrying") - # determine the current sync state + # determine the current sync state and sync source if (result[constants.GM_PRESENT].lower() != constants.GM_IS_PRESENT and result[constants.GRANDMASTER_IDENTITY] != result[constants.CLOCK_IDENTITY]): sync_state = constants.FREERUN_PHC_STATE elif result[constants.GRANDMASTER_IDENTITY] == result[constants.CLOCK_IDENTITY]: local_gm = True + sync_source = constants.ClockSourceType.TypeGNSS LOG.debug("Local node is a GM") if not local_gm: for port in range(1, port_count + 1): if result[constants.PORT.format(port)].lower() == constants.SLAVE_MODE: + sync_source = constants.ClockSourceType.TypePTP break else: sync_state = constants.FREERUN_PHC_STATE @@ -101,7 +107,7 @@ def check_results(result, total_ptp_keywords, port_count): sync_state = constants.FREERUN_PHC_STATE if (result[constants.GM_CLOCK_CLASS] not in ptp4l_clock_class_locked): sync_state = constants.FREERUN_PHC_STATE - return sync_state + return sync_state, sync_source def parse_resource_address(resource_address): @@ -123,3 +129,27 @@ def format_resource_address(node_name, resource, instance=None): resource_address = resource_address + resource LOG.debug("format_resource_address %s" % resource_address) return resource_address + + +def get_interface_phc_device(phc_interface): + """Determine the phc device for the interface""" + pattern = "/hostsys/class/net/" + phc_interface + "/device/ptp/*" + ptp_device = glob(pattern) + if len(ptp_device) == 0: + # Try the 0th interface instead, required for some NIC types + phc_interface_base = phc_interface[:-1] + "0" + LOG.info("No ptp device found at %s trying %s instead" + % (pattern, phc_interface_base)) + pattern = "/hostsys/class/net/" + phc_interface_base + \ + "/device/ptp/*" + ptp_device = glob(pattern) + if len(ptp_device) == 0: + LOG.warning("No ptp device found for base interface at %s" + % pattern) + return None + if len(ptp_device) > 1: + LOG.error("More than one ptp device found at %s" % pattern) + return None + ptp_device = os.path.basename(ptp_device[0]) + LOG.debug("Found ptp device %s at %s" % (ptp_device, pattern)) + return ptp_device diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py index 860fc4b..04d9a28 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021-2024 Wind River Systems, Inc. +# Copyright (c) 2021-2025 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -466,8 +466,51 @@ class PtpWatcherDefault: LOG.debug("Getting os clock status.") return new_event, sync_state, new_event_time - def __get_overall_sync_state(self, holdover_time, freq, sync_state, - last_event_time): + def __get_primary_ptp_state(self, ptp_device): + # The PTP device itself is being disciplined or not ? + # Check which ptp4l instance disciplining this PTP device + # disciplining source could be either GNSS or PTP + primary_ptp4l = None + ptp_state = PtpState.Freerun + for ptp4l in self.ptp_monitor_list: + # runtime loading of ptp4l config + ptp4l.set_ptp_devices() + if ( + ptp_device in ptp4l.get_ptp_devices() + and ptp4l.get_ptp_sync_source() != constants.ClockSourceType.TypeNA + ): + primary_ptp4l = ptp4l + break + + if primary_ptp4l is not None: + _, read_state, _ = primary_ptp4l.get_ptp_sync_state() + if read_state == PtpState.Locked: + ptp_state = PtpState.Locked + + return primary_ptp4l, ptp_state + + def __get_primary_gnss_state(self, ptp_device): + # The PTP device itself is being disciplined or not ? + # Check which ts2phc instance disciplining this PTP device + primary_gnss = None + gnss_state = GnssState.Failure_Nofix + for gnss in self.observer_list: + # runtime loading of ts2phc config + gnss.set_ptp_devices() + if ptp_device in gnss.get_ptp_devices(): + primary_gnss = gnss + break + + if primary_gnss is not None: + read_state = primary_gnss._state + if read_state == GnssState.Synchronized: + gnss_state = GnssState.Synchronized + + return primary_gnss, gnss_state + + def __get_overall_sync_state( + self, holdover_time, freq, sync_state, last_event_time + ): new_event = False new_event_time = last_event_time previous_sync_state = sync_state @@ -481,45 +524,91 @@ class PtpWatcherDefault: ptp_state = None LOG.debug("Getting overall sync state.") - for gnss in self.observer_list: - if gnss._state == constants.UNKNOWN_PHC_STATE or \ - gnss._state == GnssState.Failure_Nofix: - gnss_state = GnssState.Failure_Nofix - elif gnss._state == GnssState.Synchronized and \ - gnss_state != GnssState.Failure_Nofix: - gnss_state = GnssState.Synchronized - - for ptp4l in self.ptp_monitor_list: - _, read_state, _ = ptp4l.get_ptp_sync_state() - if read_state == PtpState.Holdover or \ - read_state == PtpState.Freerun or \ - read_state == constants.UNKNOWN_PHC_STATE: - ptp_state = PtpState.Freerun - elif read_state == PtpState.Locked and \ - ptp_state != PtpState.Freerun: - ptp_state = PtpState.Locked + # overall state depends on os_clock_state and single chained gnss/ptp state + # Need to figure out which gnss/ptp is disciplining the PHC that syncs os_clock os_clock_state = self.os_clock_monitor.get_os_clock_state() + sync_state = OverallClockState.Freerun + if os_clock_state is not OsClockState.Freerun: + # PTP device that is disciplining the OS clock, + # valid even for HA source devices + ptp_device = self.os_clock_monitor.get_source_ptp_device() + if ptp_device is None: + # This may happen in virtualized environments + LOG.warning("No PTP device. Defaulting overall state Freerun") + else: + # What source (gnss or ptp) disciplining the PTP device at the + # moment (A PTP device could have both TS2PHC/gnss source and + # PTP4l/slave) + sync_source = constants.ClockSourceType.TypeNA + # any ts2phc instance disciplining the ptp device (source GNSS) + primary_gnss, gnss_state = self.__get_primary_gnss_state(ptp_device) + if primary_gnss is not None: + sync_source = constants.ClockSourceType.TypeGNSS - if gnss_state is GnssState.Failure_Nofix or \ - os_clock_state is OsClockState.Freerun or \ - ptp_state is PtpState.Freerun: - sync_state = OverallClockState.Freerun - else: - sync_state = OverallClockState.Locked + # any ptp4l instance disciplining the ptp device (source PTP or GNSS) + primary_ptp4l, ptp_state = self.__get_primary_ptp_state(ptp_device) + + # which source: PTP or GNSS + # In presence of ptp4l instance disciplining the ptp device, it truly + # dictates what source it is using. + if primary_ptp4l is not None: + sync_source = primary_ptp4l.get_ptp_sync_source() + + ptp4l_instance_and_state = ( + "NA" + if primary_ptp4l is None + else (primary_ptp4l.ptp4l_service_name, ptp_state) + ) + ts2phc_instance_and_state = ( + "NA" + if primary_gnss is None + else (primary_gnss.ts2phc_service_name, gnss_state) + ) + LOG.debug( + f"Overall sync state chaining info:\n" + f"os-clock's source ptp-device = {ptp_device}\n" + f"ptp-device's sync-source = {sync_source}\n" + f"ptp4l-instance-and-state = {ptp4l_instance_and_state}\n" + f"ts2phc-instance-and-state = {ts2phc_instance_and_state}" + ) + + # Based on sync_source that is used to discipline the ptp device, + # dependent ts2phc or ptp4l instance's state is chosen. + if sync_source == constants.ClockSourceType.TypeNA: + # The PTP device is not being disciplined by any PTP4l/TS2PHC instances + LOG.warning( + "PTP device used by PHC2SYS is not synced/configured on any PTP4l/TS2PHC instances." + ) + + elif ( + sync_source == constants.ClockSourceType.TypeGNSS + and gnss_state is GnssState.Synchronized + ): + sync_state = OverallClockState.Locked + + elif ( + sync_source == constants.ClockSourceType.TypePTP + and ptp_state is PtpState.Locked + ): + sync_state = OverallClockState.Locked if sync_state == OverallClockState.Freerun: if previous_sync_state in [ - constants.UNKNOWN_PHC_STATE, - constants.FREERUN_PHC_STATE]: + constants.UNKNOWN_PHC_STATE, + constants.FREERUN_PHC_STATE, + ]: sync_state = OverallClockState.Freerun elif previous_sync_state == constants.LOCKED_PHC_STATE: sync_state = OverallClockState.Holdover - elif previous_sync_state == constants.HOLDOVER_PHC_STATE and \ - time_in_holdover < max_holdover_time: - LOG.debug("Overall sync: Time in holdover is %s " - "Max time in holdover is %s" - % (time_in_holdover, max_holdover_time)) + elif ( + previous_sync_state == constants.HOLDOVER_PHC_STATE + and time_in_holdover < max_holdover_time + ): + LOG.debug( + "Overall sync: Time in holdover is %s " + "Max time in holdover is %s" % (time_in_holdover, max_holdover_time) + ) sync_state = OverallClockState.Holdover else: sync_state = OverallClockState.Freerun diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_daemon.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_daemon.py new file mode 100644 index 0000000..09bcc8d --- /dev/null +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_daemon.py @@ -0,0 +1,517 @@ +# +# Copyright (c) 2025 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# +import mock +import unittest +import json +import time + +from dataclasses import dataclass +from trackingfunctionsdk.common.helpers import constants +from trackingfunctionsdk.model.dto.osclockstate import OsClockState +from trackingfunctionsdk.model.dto.overallclockstate import OverallClockState +from trackingfunctionsdk.model.dto.ptpstate import PtpState +from trackingfunctionsdk.model.dto.gnssstate import GnssState +from trackingfunctionsdk.services.daemon import PtpWatcherDefault + +context = { + "THIS_NAMESPACE": "notification", + "THIS_NODE_NAME": "controller-0", + "THIS_POD_IP": "172.16.192.71", + "REGISTRATION_TRANSPORT_ENDPOINT": "rabbit://admin:admin@registration.notification.svc.cluster.local:5672", + "NOTIFICATION_TRANSPORT_ENDPOINT": "rabbit://admin:admin@172.16.192.71:5672", + "GNSS_CONFIGS": [ + "/ptp/linuxptp/ptpinstance/ts2phc-ts1.conf", + "/ptp/linuxptp/ptpinstance/ts2phc-ts2.conf", + ], + "PHC2SYS_CONFIG": "/ptp/linuxptp/ptpinstance/phc2sys-phc-inst1.conf", + "PHC2SYS_SERVICE_NAME": "phc-inst1", + "PTP4L_CONFIGS": [ + "/ptp/linuxptp/ptpinstance/ptp4l-ptp-inst2.conf", + "/ptp/linuxptp/ptpinstance/ptp4l-ptp-inst1.conf", + ], + "GNSS_INSTANCES": ["ts1", "ts2"], + "PTP4L_INSTANCES": ["ptp-inst2", "ptp-inst1"], + "ptptracker_context": {"device_simulated": "false", "holdover_seconds": "15"}, + "gnsstracker_context": {"holdover_seconds": 30}, + "osclocktracker_context": {"holdover_seconds": "15"}, + "overalltracker_context": {"holdover_seconds": "15"}, +} + + +@dataclass +class OsClockData: + sync_state: str = OsClockState.Freerun + sync_source: str = "ptp0" + + +@dataclass +class PTP4lData: + ptp_devices: list[str] + sync_state: str = PtpState.Freerun + sync_source: str = constants.ClockSourceType.TypePTP + + +@dataclass +class ts2phcData: + ptp_devices: list[str] + sync_state: str = GnssState.Failure_Nofix + + +@dataclass +class TestData: + osclock: OsClockData + ptp4l: list[PTP4lData] + ts2phc: list[ts2phcData] + + +class DaemonTests(unittest.TestCase): + + @mock.patch("trackingfunctionsdk.services.daemon.PtpMonitor") + @mock.patch("trackingfunctionsdk.services.daemon.OsClockMonitor") + @mock.patch("trackingfunctionsdk.services.daemon.GnssMonitor") + def _setup(self, gnssmonitor_mock, osclockmonitor_mock, ptpmonitor_mock): + event = None + + sqlalchemy_conf = { + "url": "sqlite:///apiserver.db", + "echo": False, + "echo_pool": False, + "pool_recycle": 3600, + "encoding": "utf-8", + } + sqlalchemy_conf_json = json.dumps(sqlalchemy_conf) + daemon_context_json = json.dumps(context) + + # distint mock class instances, to have distinct mock method on instance basis + gnssmonitor_mock.side_effect = [ + mock.Mock(name=item) for item in context["GNSS_CONFIGS"] + ] + ptpmonitor_mock.side_effect = [ + mock.Mock(name=item) for item in context["PTP4L_CONFIGS"] + ] + + self.worker = PtpWatcherDefault( + event, sqlalchemy_conf_json, daemon_context_json + ) + + self.osclockmonitor_mock_instance = self.worker.os_clock_monitor + self.ptpmonitor_mock_instances = self.worker.ptp_monitor_list + self.gnssmonitor_mock_instances = self.worker.observer_list + + self.assertEqual( + len(self.gnssmonitor_mock_instances), len(context["GNSS_CONFIGS"]) + ) + self.assertEqual( + len(self.ptpmonitor_mock_instances), len(context["PTP4L_CONFIGS"]) + ) + + def _test__get_overall_sync_state(self, testdata, expected): + holdover_time = float(context["overalltracker_context"]["holdover_seconds"]) + freq = 2 + sync_state = OverallClockState.Freerun + last_event_time = time.time() + + self.osclockmonitor_mock_instance.get_source_ptp_device.return_value = ( + testdata.osclock.sync_source + ) + self.osclockmonitor_mock_instance.get_os_clock_state.return_value = ( + testdata.osclock.sync_state + ) + # test mocking as expected or not. + self.assertEqual( + self.worker.os_clock_monitor.get_source_ptp_device(), + testdata.osclock.sync_source, + ) + self.assertEqual( + self.worker.os_clock_monitor.get_os_clock_state(), + testdata.osclock.sync_state, + ) + + for i, gnssmonitor_mock_instance in enumerate(self.gnssmonitor_mock_instances): + gnssmonitor_mock_instance.get_ptp_devices.return_value = testdata.ts2phc[ + i + ].ptp_devices + gnssmonitor_mock_instance._state = testdata.ts2phc[i].sync_state + # test mocking as expected or not. + self.assertEqual( + self.gnssmonitor_mock_instances[0].get_ptp_devices(), + testdata.ts2phc[0].ptp_devices, + ) + self.assertEqual( + self.gnssmonitor_mock_instances[0]._state, testdata.ts2phc[0].sync_state + ) + self.assertEqual( + self.gnssmonitor_mock_instances[1].get_ptp_devices(), + testdata.ts2phc[1].ptp_devices, + ) + self.assertEqual( + self.gnssmonitor_mock_instances[1]._state, testdata.ts2phc[1].sync_state + ) + + for i, ptpmonitor_mock_instance in enumerate(self.ptpmonitor_mock_instances): + ptpmonitor_mock_instance.get_ptp_devices.return_value = testdata.ptp4l[ + i + ].ptp_devices + ptpmonitor_mock_instance.get_ptp_sync_state.return_value = ( + None, + testdata.ptp4l[i].sync_state, + None, + ) + ptpmonitor_mock_instance.get_ptp_sync_source.return_value = testdata.ptp4l[ + i + ].sync_source + # test mocking as expected or not. + self.assertEqual( + self.ptpmonitor_mock_instances[0].get_ptp_devices(), + testdata.ptp4l[0].ptp_devices, + ) + self.assertEqual( + self.ptpmonitor_mock_instances[0].get_ptp_sync_state(), + (None, testdata.ptp4l[0].sync_state, None), + ) + self.assertEqual( + self.ptpmonitor_mock_instances[0].get_ptp_sync_source(), + testdata.ptp4l[0].sync_source, + ) + self.assertEqual( + self.ptpmonitor_mock_instances[1].get_ptp_devices(), + testdata.ptp4l[1].ptp_devices, + ) + self.assertEqual( + self.ptpmonitor_mock_instances[1].get_ptp_sync_state(), + (None, testdata.ptp4l[1].sync_state, None), + ) + self.assertEqual( + self.ptpmonitor_mock_instances[1].get_ptp_sync_source(), + testdata.ptp4l[1].sync_source, + ) + + new_event, sync_state, new_event_time = ( + self.worker._PtpWatcherDefault__get_overall_sync_state( + holdover_time, freq, sync_state, last_event_time + ) + ) + # overall sync state assertion + self.assertEqual(sync_state, expected) + + def test__get_overall_sync_state__all_are_locked__overall_locked(self): + # when all are locked state -- overall state would be locked + self._setup() + osclockdata = OsClockData(sync_state=OsClockState.Locked, sync_source="ptp0") + + ptp4ldata0 = PTP4lData( + ptp_devices=["ptp0"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypePTP, + ) + ptp4ldata1 = PTP4lData( + ptp_devices=["ptp1"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypePTP, + ) + + ts2phcdata0 = ts2phcData( + ptp_devices=["ptp0"], sync_state=GnssState.Synchronized + ) + ts2phcdata1 = ts2phcData( + ptp_devices=["ptp1"], sync_state=GnssState.Synchronized + ) + + testdata = TestData( + osclock=osclockdata, + ptp4l=[ptp4ldata0, ptp4ldata1], + ts2phc=[ts2phcdata0, ts2phcdata1], + ) + expected = OverallClockState.Locked + self._test__get_overall_sync_state(testdata, expected) + + def test__get_overall_sync_state__osclock_freerun__overall_freerun(self): + # when osclock is on freerun, and others are on locked state -- overall + # state would be freerun + self._setup() + osclockdata = OsClockData(sync_state=OsClockState.Freerun, sync_source="ptp0") + + ptp4ldata0 = PTP4lData( + ptp_devices=["ptp0"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypePTP, + ) + ptp4ldata1 = PTP4lData( + ptp_devices=["ptp1"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypePTP, + ) + + ts2phcdata0 = ts2phcData( + ptp_devices=["ptp0"], sync_state=GnssState.Synchronized + ) + ts2phcdata1 = ts2phcData( + ptp_devices=["ptp1"], sync_state=GnssState.Synchronized + ) + + testdata = TestData( + osclock=osclockdata, + ptp4l=[ptp4ldata0, ptp4ldata1], + ts2phc=[ts2phcdata0, ts2phcdata1], + ) + expected = OverallClockState.Freerun + self._test__get_overall_sync_state(testdata, expected) + + def test__get_overall_sync_state__ptp4l_ptp0_freerun__overall_freerun(self): + # when chained ptp4l ptp0 sync_state is freerun -- overall state would be freerun + self._setup() + osclockdata = OsClockData(sync_state=OsClockState.Locked, sync_source="ptp0") + + ptp4ldata0 = PTP4lData( + ptp_devices=["ptp0"], + sync_state=PtpState.Freerun, + sync_source=constants.ClockSourceType.TypePTP, + ) + ptp4ldata1 = PTP4lData( + ptp_devices=["ptp1"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypePTP, + ) + + ts2phcdata0 = ts2phcData( + ptp_devices=["ptp0"], sync_state=GnssState.Synchronized + ) + ts2phcdata1 = ts2phcData( + ptp_devices=["ptp1"], sync_state=GnssState.Synchronized + ) + + testdata = TestData( + osclock=osclockdata, + ptp4l=[ptp4ldata0, ptp4ldata1], + ts2phc=[ts2phcdata0, ts2phcdata1], + ) + expected = OverallClockState.Freerun + self._test__get_overall_sync_state(testdata, expected) + + def test__get_overall_sync_state__ptp4l_ptp0_locked__overall_locked(self): + # when chained ptp4l ptp0 sync_state is locked -- overall state would be locked + self._setup() + osclockdata = OsClockData(sync_state=OsClockState.Locked, sync_source="ptp0") + + ptp4ldata0 = PTP4lData( + ptp_devices=["ptp0"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypePTP, + ) + ptp4ldata1 = PTP4lData( + ptp_devices=["ptp1"], + sync_state=PtpState.Freerun, + sync_source=constants.ClockSourceType.TypePTP, + ) + + ts2phcdata0 = ts2phcData( + ptp_devices=["ptp0"], sync_state=GnssState.Failure_Nofix + ) + ts2phcdata1 = ts2phcData( + ptp_devices=["ptp1"], sync_state=GnssState.Failure_Nofix + ) + + testdata = TestData( + osclock=osclockdata, + ptp4l=[ptp4ldata0, ptp4ldata1], + ts2phc=[ts2phcdata0, ts2phcdata1], + ) + expected = OverallClockState.Locked + self._test__get_overall_sync_state(testdata, expected) + + def test__get_overall_sync_state__ts2phc_ptp0_freerun__overall_freerun(self): + # when chained ts2phc ptp0 sync_state is freerun -- overall state would be freerun + self._setup() + osclockdata = OsClockData(sync_state=OsClockState.Locked, sync_source="ptp0") + + ptp4ldata0 = PTP4lData( + ptp_devices=["ptp0"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypeGNSS, + ) + ptp4ldata1 = PTP4lData( + ptp_devices=["ptp1"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypePTP, + ) + + ts2phcdata0 = ts2phcData( + ptp_devices=["ptp0"], sync_state=GnssState.Failure_Nofix + ) + ts2phcdata1 = ts2phcData( + ptp_devices=["ptp1"], sync_state=GnssState.Synchronized + ) + + testdata = TestData( + osclock=osclockdata, + ptp4l=[ptp4ldata0, ptp4ldata1], + ts2phc=[ts2phcdata0, ts2phcdata1], + ) + expected = OverallClockState.Freerun + self._test__get_overall_sync_state(testdata, expected) + + def test__get_overall_sync_state__ts2phc_ptp0_locked__overall_locked(self): + # when chained ts2phc ptp0 sync_state is locked -- overall state would be locked + self._setup() + osclockdata = OsClockData(sync_state=OsClockState.Locked, sync_source="ptp0") + + ptp4ldata0 = PTP4lData( + ptp_devices=["ptp0"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypeGNSS, + ) + ptp4ldata1 = PTP4lData( + ptp_devices=["ptp1"], + sync_state=PtpState.Freerun, + sync_source=constants.ClockSourceType.TypePTP, + ) + + ts2phcdata0 = ts2phcData( + ptp_devices=["ptp0"], sync_state=GnssState.Synchronized + ) + ts2phcdata1 = ts2phcData( + ptp_devices=["ptp1"], sync_state=GnssState.Failure_Nofix + ) + + testdata = TestData( + osclock=osclockdata, + ptp4l=[ptp4ldata0, ptp4ldata1], + ts2phc=[ts2phcdata0, ts2phcdata1], + ) + expected = OverallClockState.Locked + self._test__get_overall_sync_state(testdata, expected) + + def test__get_overall_sync_state__ts2phc_ptp0_locked_no_ptp4l_for_ptp0__overall_locked( + self, + ): + # when chained ts2phc ptp0 sync_state is locked -- overall state would be locked + # In this case there are no ptp4l instances with ptp0 + self._setup() + osclockdata = OsClockData(sync_state=OsClockState.Locked, sync_source="ptp0") + + ptp4ldata0 = PTP4lData( + ptp_devices=["ptpx"], + sync_state=PtpState.Freerun, + sync_source=constants.ClockSourceType.TypePTP, + ) + ptp4ldata1 = PTP4lData( + ptp_devices=["ptp1"], + sync_state=PtpState.Freerun, + sync_source=constants.ClockSourceType.TypePTP, + ) + + ts2phcdata0 = ts2phcData( + ptp_devices=["ptp0"], sync_state=GnssState.Synchronized + ) + ts2phcdata1 = ts2phcData( + ptp_devices=["ptp1"], sync_state=GnssState.Failure_Nofix + ) + + testdata = TestData( + osclock=osclockdata, + ptp4l=[ptp4ldata0, ptp4ldata1], + ts2phc=[ts2phcdata0, ts2phcdata1], + ) + expected = OverallClockState.Locked + self._test__get_overall_sync_state(testdata, expected) + + def test__get_overall_sync_state__no_source_for_ptp0__overall_freerun(self): + # when chained ptp4l ptp0 sync_source NA (neither gnss nor ptp) -- overall + # state would be freerun + # In this case there are no ts2phc instances with ptp0 + self._setup() + osclockdata = OsClockData(sync_state=OsClockState.Locked, sync_source="ptp0") + + # In this case, practically ptp0's ptp4l instance sync_state would be + # PtpState.Freerun, as there is no sync source. But still using locked state. + ptp4ldata0 = PTP4lData( + ptp_devices=["ptp0"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypeNA, + ) + ptp4ldata1 = PTP4lData( + ptp_devices=["ptp1"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypePTP, + ) + + ts2phcdata0 = ts2phcData( + ptp_devices=["ptpx"], sync_state=GnssState.Synchronized + ) + ts2phcdata1 = ts2phcData( + ptp_devices=["ptp1"], sync_state=GnssState.Synchronized + ) + + testdata = TestData( + osclock=osclockdata, + ptp4l=[ptp4ldata0, ptp4ldata1], + ts2phc=[ts2phcdata0, ts2phcdata1], + ) + expected = OverallClockState.Freerun + self._test__get_overall_sync_state(testdata, expected) + + def test__get_overall_sync_state__no_backtrack_for_ptp0__overall_freerun(self): + # when chained ptp0 is not included neither on ptp4l nor ts2phc -- overall + # state would be freerun + self._setup() + osclockdata = OsClockData(sync_state=OsClockState.Locked, sync_source="ptp0") + + ptp4ldata0 = PTP4lData( + ptp_devices=["ptpx"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypePTP, + ) + ptp4ldata1 = PTP4lData( + ptp_devices=["ptp1"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypePTP, + ) + + ts2phcdata0 = ts2phcData( + ptp_devices=["ptpx"], sync_state=GnssState.Synchronized + ) + ts2phcdata1 = ts2phcData( + ptp_devices=["ptp1"], sync_state=GnssState.Synchronized + ) + + testdata = TestData( + osclock=osclockdata, + ptp4l=[ptp4ldata0, ptp4ldata1], + ts2phc=[ts2phcdata0, ts2phcdata1], + ) + expected = OverallClockState.Freerun + self._test__get_overall_sync_state(testdata, expected) + + def test__get_overall_sync_state__os_clock_no_ptp_device__overall_freerun(self): + # when there is no sync_source e.g. ptp0 for os_clock -- overall state would be freerun + self._setup() + osclockdata = OsClockData(sync_state=OsClockState.Locked, sync_source=None) + + ptp4ldata0 = PTP4lData( + ptp_devices=["ptp0"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypePTP, + ) + ptp4ldata1 = PTP4lData( + ptp_devices=["ptp1"], + sync_state=PtpState.Locked, + sync_source=constants.ClockSourceType.TypePTP, + ) + + ts2phcdata0 = ts2phcData( + ptp_devices=["ptp0"], sync_state=GnssState.Synchronized + ) + ts2phcdata1 = ts2phcData( + ptp_devices=["ptp1"], sync_state=GnssState.Synchronized + ) + + testdata = TestData( + osclock=osclockdata, + ptp4l=[ptp4ldata0, ptp4ldata1], + ts2phc=[ts2phcdata0, ts2phcdata1], + ) + expected = OverallClockState.Freerun + self._test__get_overall_sync_state(testdata, expected) diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_gnss_monitor.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_gnss_monitor.py new file mode 100644 index 0000000..e7106dd --- /dev/null +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_gnss_monitor.py @@ -0,0 +1,45 @@ +# +# Copyright (c) 2025 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# +import mock +import os +import unittest + +from trackingfunctionsdk.common.helpers.gnss_monitor import GnssMonitor + +testpath = os.environ.get("TESTPATH", "") + + +class GnssMonitorTests(unittest.TestCase): + + def test_check_config_file_interfaces(self): + cgu_path = testpath + "test_input_files/mock_cgu_output_logan_beach" + gnss_config = testpath + "test_input_files/ts2phc_valid.conf" + self.gnssmon = GnssMonitor(gnss_config, cgu_path = cgu_path) + self.assertEqual(self.gnssmon._check_config_file_interfaces(), ['ens1f0', 'ens2f0']) + + def test_set_ptp_devices(self): + cgu_path = testpath + "test_input_files/mock_cgu_output_logan_beach" + gnss_config = testpath + "test_input_files/ts2phc_valid.conf" + with mock.patch('trackingfunctionsdk.common.helpers.ptpsync.glob', + return_value=[]): + self.gnssmon = GnssMonitor(gnss_config, cgu_path = cgu_path) + self.assertEqual(self.gnssmon.get_ptp_devices(),[]) + + with mock.patch('trackingfunctionsdk.common.helpers.ptpsync.glob', + side_effect=[['/hostsys/class/net/ens1f0/device/ptp/ptp0'], + ['/hostsys/class/net/ens2f0/device/ptp/ptp1'] + ]): + self.gnssmon.set_ptp_devices() + + self.assertEqual(set(self.gnssmon.get_ptp_devices()),set(['ptp0','ptp1'])) + + with mock.patch('trackingfunctionsdk.common.helpers.ptpsync.glob', + side_effect=[['/hostsys/class/net/ens1f0/device/ptp/ptp0'], + ['/hostsys/class/net/ens2f0/device/ptp/ptp0'] + ]): + self.gnssmon.set_ptp_devices() + + self.assertEqual(self.gnssmon.get_ptp_devices(),['ptp0']) diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_input_files/ptp4l-ptp-inst1.conf b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_input_files/ptp4l-ptp-inst1.conf new file mode 100644 index 0000000..858c11f --- /dev/null +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_input_files/ptp4l-ptp-inst1.conf @@ -0,0 +1,35 @@ +[global] +## +## Default Data Set +## +boundary_clock_jbod 1 +clock_servo linreg +delay_mechanism E2E +domainNumber 24 +logAnnounceInterval -3 +logMinDelayReqInterval -4 +logSyncInterval -4 +message_tag ptp-inst1 +network_transport L2 +summary_interval 6 +time_stamping hardware +tx_timestamp_timeout 700 +uds_address /var/run/ptp4l-ptp-inst1 + + + +[enp81s0f3] +## +## Associated interface: oam1 +## + + +[enp81s0f4] +## + +[unicast_master_table] +table_id 1 +logQueryInterval 2 +UDPv4 192.168.1.11 +UDPv4 192.168.2.22 +UDPv4 192.168.3.33 \ No newline at end of file diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_os_clock_monitor.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_os_clock_monitor.py index 719912b..644c46d 100644 --- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_os_clock_monitor.py +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_os_clock_monitor.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2022-2023 Wind River Systems, Inc. +# Copyright (c) 2022-2025 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -58,7 +58,7 @@ class OsClockMonitorTests(unittest.TestCase): mo.side_effect = handlers self.assertEqual(self.clockmon._get_phc2sys_command_line_option("/var/run/", "-s"), None) - @mock.patch('trackingfunctionsdk.common.helpers.os_clock_monitor.glob', + @mock.patch('trackingfunctionsdk.common.helpers.ptpsync.glob', side_effect=[['/hostsys/class/net/ens1f0/device/ptp/ptp0'], ['/hostsys/class/net/ens1f0/device/ptp/ptp0', '/hostsys/class/net/ens1f0/device/ptp/ptp1'], diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_ptp_monitor.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_ptp_monitor.py new file mode 100644 index 0000000..9f7adc8 --- /dev/null +++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/test_ptp_monitor.py @@ -0,0 +1,70 @@ +# +# Copyright (c) 2025 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# +import mock +import os +import unittest + +from trackingfunctionsdk.common.helpers.ptp_monitor import PtpMonitor + +testpath = os.environ.get("TESTPATH", "") + +holdover_seconds = 15 +poll_freq_seconds = 15 +phc2sys_service_name = "phc-inst1" +ptp4l_instance = "ptp-inst1" + + +class PtpMonitorTests(unittest.TestCase): + + def test_check_config_file_interfaces(self): + self.ptpmon = PtpMonitor( + ptp4l_instance, + holdover_seconds, + poll_freq_seconds, + phc2sys_service_name, + init=False, + ) + self.ptpmon.ptp4l_config = testpath + "test_input_files/ptp4l-ptp-inst1.conf" + self.assertEqual( + self.ptpmon._check_config_file_interfaces(), ["enp81s0f3", "enp81s0f4"] + ) + + def test_set_ptp_devices(self): + self.ptpmon = PtpMonitor( + ptp4l_instance, + holdover_seconds, + poll_freq_seconds, + phc2sys_service_name, + init=False, + ) + self.ptpmon.ptp4l_config = testpath + "test_input_files/ptp4l-ptp-inst1.conf" + + with mock.patch( + "trackingfunctionsdk.common.helpers.ptpsync.glob", return_value=[] + ): + self.ptpmon.set_ptp_devices() + self.assertEqual(self.ptpmon.get_ptp_devices(), []) + + with mock.patch( + "trackingfunctionsdk.common.helpers.ptpsync.glob", + side_effect=[ + ["/hostsys/class/net/enp81s0f3/device/ptp/ptp0"], + ["/hostsys/class/net/enp81s0f4/device/ptp/ptp1"], + ], + ): + self.ptpmon.set_ptp_devices() + self.assertEqual(set(self.ptpmon.get_ptp_devices()), set(["ptp0", "ptp1"])) + + with mock.patch( + "trackingfunctionsdk.common.helpers.ptpsync.glob", + side_effect=[ + ["/hostsys/class/net/enp81s0f3/device/ptp/ptp0"], + ["/hostsys/class/net/enp81s0f4/device/ptp/ptp0"], + ], + ): + self.ptpmon.set_ptp_devices() + + self.assertEqual(self.ptpmon.get_ptp_devices(), ["ptp0"]) diff --git a/tox.ini b/tox.ini index 6bac540..dfcb015 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = linters,py36 +envlist = linters,py39 minversion = 2.3 skipsdist = True sitepackages=False @@ -18,8 +18,8 @@ deps = allowlist_externals = bash -[testenv:py36] -basepython = python3.6 +[testenv:py39] +basepython = python3.9 setenv = TESTPATH=./notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/tests/ commands =