Cole Walker 939b72d8b1 [PTP] Add clockClass tracking and multi ptp instance support
Adds notification functionality for PTP clockClass data and re-works the
overall PTP sync status to allow for multiple ptp4l instances to be
tracked. Each ptp4l instance reports its own sync status and clockClass.

This change moves several static functions out of ptpsync.py and into
the PtpMonitor class. The remaining static functions have been moved
into utils.py

This change retains backwards compatibility allowing for v1 PTP
notifications and subscriptions to continue working while also providing
the v2 functionality required by the ORAN Notification standard.

This change also provides an override value for the logging level
exposed via a helm chart override.

Testing:
PASS: GET and Subscribe functions for each notification type
PASS: Multiple PTP instance support

Story: 2010056
Task: 46038
Task: 46039

Signed-off-by: Cole Walker <cole.walker@windriver.com>
Change-Id: Ic5456bc5a36f95186cdc6fe01ef1068b7124a5fc
2022-08-22 20:33:21 +00:00

652 lines
30 KiB
Python

#
# Copyright (c) 2021-2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import datetime
import json
import logging
import multiprocessing as mp
import os
import sys
import threading
import time
from oslo_utils import uuidutils
from trackingfunctionsdk.client.ptpeventproducer import PtpEventProducer
from trackingfunctionsdk.common.helpers import constants
from trackingfunctionsdk.common.helpers import ptpsync as utils
from trackingfunctionsdk.common.helpers import log_helper
from trackingfunctionsdk.common.helpers.dmesg_watcher import DmesgWatcher
from trackingfunctionsdk.common.helpers.gnss_monitor import GnssMonitor
from trackingfunctionsdk.common.helpers.os_clock_monitor import OsClockMonitor
from trackingfunctionsdk.common.helpers.ptp_monitor import PtpMonitor
from trackingfunctionsdk.model.dto.ptpstate import PtpState
from trackingfunctionsdk.model.dto.gnssstate import GnssState
from trackingfunctionsdk.model.dto.osclockstate import OsClockState
from trackingfunctionsdk.model.dto.overallclockstate import OverallClockState
from trackingfunctionsdk.model.dto.resourcetype import ResourceType
from trackingfunctionsdk.model.dto.rpc_endpoint import RpcEndpointInfo
LOG = logging.getLogger(__name__)
log_helper.config_logger(LOG)
THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME", 'controller-0')
# Event source to event type mapping
source_type = {
'/sync/gnss-status/gnss-sync-status': 'event.sync.gnss-status.gnss-state-change',
'/sync/ptp-status/clock-class': 'event.sync.ptp-status.ptp-clock-class-change',
'/sync/ptp-status/lock-state': 'event.sync.ptp-status.ptp-state-change',
'/sync/sync-status/os-clock-sync-state': 'event.sync.sync-status.os-clock-sync-state-change',
'/sync/sync-status/sync-state': 'event.sync.sync-status.synchronization-state-change',
'/sync/synce-status/clock-quality': 'event.sync.synce-status.synce-clock-quality-change',
'/sync/synce-status/lock-state-extended': 'event.sync.synce-status.synce-state-change-extended',
'/sync/synce-status/lock-state': 'event.sync.synce-status.synce-state-change',
'/sync/synce-status/lock-state': 'event.sync.synce-status.synce-state-change',
}
'''Entry point of Default Process Worker'''
def ProcessWorkerDefault(event, sqlalchemy_conf_json, broker_transport_endpoint):
worker = PtpWatcherDefault(event, sqlalchemy_conf_json, broker_transport_endpoint)
worker.run()
return
class PtpWatcherDefault:
DEFAULT_PTPTRACKER_CONTEXT = {
'holdover_seconds': 30,
'poll_freq_seconds': 2
}
DEFAULT_GNSSTRACKER_CONTEXT = {
'holdover_seconds': 30,
'poll_freq_seconds': 2
}
DEFAULT_OS_CLOCK_TRACKER_CONTEXT = {
'holdover_seconds': 30,
'poll_freq_seconds': 2
}
DEFAULT_OVERALL_SYNC_TRACKER_CONTEXT = {
'holdover_seconds': 30,
'poll_freq_seconds': 2
}
class PtpRequestHandlerDefault(object):
def __init__(self, watcher):
self.watcher = watcher
self.init_time = time.time()
def _build_event_response(self, resource_path, last_event_time, resource_address,
sync_state):
if resource_path in [constants.SOURCE_SYNC_PTP_CLOCK_CLASS,
constants.SOURCE_SYNCE_CLOCK_QUALITY]:
data_type = constants.DATA_TYPE_METRIC
else:
data_type = constants.DATA_TYPE_NOTIFICATION
lastStatus = {
'id': uuidutils.generate_uuid(),
'specversion': constants.SPEC_VERSION,
'source': resource_path,
'type': source_type[resource_path],
'time': last_event_time,
'data': {
'version': constants.DATA_VERSION,
'values': [
{
'data_type': data_type,
'ResourceAddress': resource_address,
'value_type': constants.VALUE_TYPE_ENUMERATION,
'value': sync_state
}
]
}
}
return lastStatus
def query_status(self, **rpc_kwargs):
lastStatus = {}
resource_address = rpc_kwargs.get('ResourceAddress', None)
if resource_address:
_, nodename, resource_path = utils.parse_resource_address(resource_address)
if resource_path == constants.SOURCE_SYNC_ALL:
resource_path = constants.SOURCE_SYNC_SYNC_STATE
if resource_path == constants.SOURCE_SYNC_GNSS_SYNC_STATUS:
self.watcher.gnsstracker_context_lock.acquire()
sync_state = self.watcher.gnsstracker_context.get('sync_state',
GnssState.Freerun)
last_event_time = self.watcher.gnsstracker_context.get('last_event_time',
time.time())
self.watcher.gnsstracker_context_lock.release()
lastStatus = self._build_event_response(resource_path, last_event_time,
resource_address, sync_state)
# elif resource_path == constants.SOURCE_SYNC_PTP_CLOCK_CLASS:
elif resource_path == constants.SOURCE_SYNC_PTP_LOCK_STATE:
self.watcher.ptptracker_context_lock.acquire()
sync_state = self.watcher.ptptracker_context.get('sync_state', PtpState.Freerun)
last_event_time = self.watcher.ptptracker_context.get('last_event_time',
time.time())
self.watcher.ptptracker_context_lock.release()
lastStatus = self._build_event_response(resource_path, last_event_time,
resource_address, sync_state)
elif resource_path == constants.SOURCE_SYNC_OS_CLOCK:
self.watcher.osclocktracker_context_lock.acquire()
sync_state = self.watcher.osclocktracker_context.get('sync_state',
OsClockState.Freerun)
last_event_time = self.watcher.osclocktracker_context.get('last_event_time',
time.time())
self.watcher.osclocktracker_context_lock.release()
lastStatus = self._build_event_response(resource_path, last_event_time,
resource_address, sync_state)
elif resource_path == constants.SOURCE_SYNC_SYNC_STATE:
self.watcher.overalltracker_context_lock.acquire()
sync_state = self.watcher.overalltracker_context.get('sync_state',
OverallClockState.Freerun)
last_event_time = self.watcher.overalltracker_context.get('last_event_time',
time.time())
self.watcher.overalltracker_context_lock.release()
lastStatus = self._build_event_response(resource_path, last_event_time,
resource_address, sync_state)
LOG.debug("query_status: {}".format(lastStatus))
else:
self.watcher.ptptracker_context_lock.acquire()
sync_state = self.watcher.ptptracker_context.get('sync_state', PtpState.Freerun)
last_event_time = self.watcher.ptptracker_context.get('last_event_time',
time.time())
self.watcher.ptptracker_context_lock.release()
lastStatus = {
'ResourceType': ResourceType.TypePTP,
'EventData': {
'State': sync_state
},
'ResourceQualifier': {
'NodeName': self.watcher.node_name
},
'EventTimestamp': last_event_time
}
return lastStatus
def trigger_delivery(self, **rpc_kwargs):
self.watcher.forced_publishing = True
self.watcher.signal_ptp_event()
pass
def __init__(self, event, sqlalchemy_conf_json, daemon_context_json):
self.sqlalchemy_conf = json.loads(sqlalchemy_conf_json)
self.event = event
self.init_time = time.time()
self.daemon_context = json.loads(daemon_context_json)
# PTP Context
self.ptptracker_context = {}
for config in self.daemon_context['PTP4L_CONFIGS']:
self.ptptracker_context[config] = self.daemon_context.get(
'ptptracker_context', PtpWatcherDefault.DEFAULT_PTPTRACKER_CONTEXT)
self.ptptracker_context[config]['sync_state'] = PtpState.Freerun
self.ptptracker_context[config]['last_event_time'] = self.init_time
self.ptp_device_simulated = "true" == self.ptptracker_context[config].get(
'device_simulated',
"False")
self.ptptracker_context_lock = threading.Lock()
LOG.debug("ptptracker_context: %s" % self.ptptracker_context)
# GNSS Context
self.gnsstracker_context = {}
for config in self.daemon_context['GNSS_CONFIGS']:
self.gnsstracker_context[config] = self.daemon_context.get(
'gnsstracker_context', PtpWatcherDefault.DEFAULT_GNSSTRACKER_CONTEXT)
self.gnsstracker_context[config]['sync_state'] = GnssState.Freerun
self.gnsstracker_context[config]['last_event_time'] = self.init_time
self.gnsstracker_context_lock = threading.Lock()
LOG.debug("gnsstracker_context: %s" % self.gnsstracker_context)
# OS Clock Context
self.osclocktracker_context = {}
self.osclocktracker_context = self.daemon_context.get(
'os_clock_tracker_context', PtpWatcherDefault.DEFAULT_OS_CLOCK_TRACKER_CONTEXT)
self.osclocktracker_context['sync_state'] = OsClockState.Freerun
self.osclocktracker_context['last_event_time'] = self.init_time
self.osclocktracker_context_lock = threading.Lock()
# Overall Sync Context
self.overalltracker_context = {}
self.overalltracker_context = self.daemon_context.get(
'overall_sync_tracker_context', PtpWatcherDefault.DEFAULT_OVERALL_SYNC_TRACKER_CONTEXT)
self.overalltracker_context['sync_state'] = OverallClockState.Freerun
self.overalltracker_context['last_event_time'] = self.init_time
self.overalltracker_context_lock = threading.Lock()
self.event_timeout = float(os.environ.get('CONTROL_TIMEOUT', 2))
self.node_name = self.daemon_context['THIS_NODE_NAME']
self.namespace = self.daemon_context.get('THIS_NAMESPACE', 'notification')
broker_transport_endpoint = self.daemon_context['NOTIFICATION_TRANSPORT_ENDPOINT']
registration_transport_endpoint = self.daemon_context['REGISTRATION_TRANSPORT_ENDPOINT']
self.broker_endpoint = RpcEndpointInfo(broker_transport_endpoint)
self.registration_broker_endpoint = RpcEndpointInfo(registration_transport_endpoint)
self.ptpeventproducer = PtpEventProducer(
self.node_name,
self.broker_endpoint.TransportEndpoint,
self.registration_broker_endpoint.TransportEndpoint)
self.__ptprequest_handler = PtpWatcherDefault.PtpRequestHandlerDefault(self)
# Set forced_publishing to True so that initial states are published
# Main loop in run() sets it to false after the first iteration
self.forced_publishing = True
self.watcher = DmesgWatcher()
self.observer_list = [GnssMonitor(i) for i in self.daemon_context['GNSS_CONFIGS']]
for observer in self.observer_list:
self.watcher.attach(observer)
self.watcher_thread = threading.Thread(target=self.watcher.run_watcher)
# Setup OS Clock monitor
self.os_clock_monitor = OsClockMonitor(phc2sys_config=self.daemon_context['PHC2SYS_CONFIG'])
# Setup PTP Monitor(s)
self.ptp_monitor_list = [
PtpMonitor(config, self.ptptracker_context[config]['holdover_seconds'],
self.ptptracker_context[config]['poll_freq_seconds']) for config in
self.daemon_context['PTP4L_CONFIGS']]
def signal_ptp_event(self):
if self.event:
self.event.set()
else:
LOG.warning("Unable to assert ptp event")
pass
def run(self):
# start location listener
self.__start_listener()
# start GNSS monitoring
self.watcher_thread.start()
while True:
# announce the location
forced = self.forced_publishing
self.forced_publishing = False
if self.ptptracker_context:
self.__publish_ptpstatus(forced)
if self.gnsstracker_context:
self.__publish_gnss_status(forced)
self.__publish_os_clock_status(forced)
self.__publish_overall_sync_status(forced)
if self.event.wait(self.event_timeout):
LOG.debug("daemon control event is asserted")
self.event.clear()
else:
LOG.debug("daemon control event is timeout")
pass
continue
self.__stop_listener()
'''Start listener to answer querying from clients'''
def __start_listener(self):
LOG.debug("start listener to answer location querying")
self.ptpeventproducer.start_status_listener(
self.__ptprequest_handler
)
return
def __stop_listener(self):
LOG.debug("stop listener to answer location querying")
self.ptpeventproducer.stop_status_listener(self.location_info)
return
def __get_gnss_status(self, holdover_time, freq, sync_state, last_event_time, gnss_monitor):
new_event, sync_state, new_event_time = gnss_monitor.get_gnss_status(
holdover_time, freq, sync_state, last_event_time)
LOG.debug("Getting GNSS status.")
return new_event, sync_state, new_event_time
def __get_os_clock_status(self, holdover_time, freq, sync_state, last_event_time):
new_event, sync_state, new_event_time = self.os_clock_monitor.os_clock_status(
holdover_time, freq, sync_state, last_event_time)
LOG.debug("Getting os clock status.")
return new_event, sync_state, new_event_time
def __get_overall_sync_state(self, holdover_time, freq, sync_state, last_event_time):
new_event = False
new_event_time = last_event_time
previous_sync_state = sync_state
current_time = datetime.datetime.utcnow().timestamp()
time_in_holdover = round(current_time - last_event_time)
max_holdover_time = (holdover_time - freq * 2)
gnss_state = None
os_clock_state = None
ptp_state = None
LOG.debug("Getting overall sync state.")
for gnss in self.observer_list:
if gnss._state == GnssState.Holdover or gnss._state == GnssState.Freerun:
gnss_state = GnssState.Freerun
elif gnss._state == GnssState.Locked and gnss_state != GnssState.Freerun:
gnss_state = GnssState.Locked
os_clock_state = self.os_clock_monitor.get_os_clock_state()
ptp_state = self.ptptracker_context.get('sync_state')
if gnss_state is GnssState.Freerun or os_clock_state is OsClockState.Freerun or ptp_state \
is PtpState.Freerun:
sync_state = OverallClockState.Freerun
else:
sync_state = OverallClockState.Locked
if sync_state == OverallClockState.Freerun:
if previous_sync_state in [constants.UNKNOWN_PHC_STATE, constants.FREERUN_PHC_STATE]:
sync_state = OverallClockState.Freerun
elif previous_sync_state == constants.LOCKED_PHC_STATE:
sync_state = OverallClockState.Holdover
elif previous_sync_state == constants.HOLDOVER_PHC_STATE and \
time_in_holdover < max_holdover_time:
sync_state = OverallClockState.Holdover
else:
sync_state = OverallClockState.Freerun
if sync_state != previous_sync_state:
new_event = True
new_event_time = datetime.datetime.utcnow().timestamp()
return new_event, sync_state, new_event_time
def __get_ptp_status(self, holdover_time, freq, sync_state, last_event_time, ptp_monitor):
new_event = False
new_event_time = last_event_time
if self.ptp_device_simulated:
now = time.time()
timediff = now - last_event_time
if timediff > holdover_time:
new_event = True
new_event_time = now
if sync_state == PtpState.Freerun:
sync_state = PtpState.Locked
elif sync_state == PtpState.Locked:
sync_state = PtpState.Holdover
elif sync_state == PtpState.Holdover:
sync_state = PtpState.Freerun
else:
sync_state = PtpState.Freerun
else:
new_event, sync_state, new_event_time = ptp_monitor.ptp_status()
return new_event, sync_state, new_event_time
'''announce location'''
def __publish_os_clock_status(self, forced=False):
holdover_time = float(self.osclocktracker_context['holdover_seconds'])
freq = float(self.osclocktracker_context['poll_freq_seconds'])
sync_state = self.osclocktracker_context.get('sync_state', 'Unknown')
last_event_time = self.osclocktracker_context.get('last_event_time', time.time())
new_event, sync_state, new_event_time = self.__get_os_clock_status(
holdover_time, freq, sync_state, last_event_time)
LOG.debug("Got os clock status.")
if new_event or forced:
self.osclocktracker_context_lock.acquire()
self.osclocktracker_context['sync_state'] = sync_state
self.osclocktracker_context['last_event_time'] = new_event_time
self.osclocktracker_context_lock.release()
LOG.debug("Publish OS Clock Status")
lastStatus = {
'ResourceType': 'OS Clock',
'EventData': {
'State': sync_state
},
'ResourceQualifier': {
'NodeName': self.node_name
},
'EventTimestamp': new_event_time
}
# publish new event in API version v2 format
resource_address = utils.format_resource_address(
self.node_name, constants.SOURCE_SYNC_OS_CLOCK)
lastStatus = {
'id': uuidutils.generate_uuid(),
'specversion': constants.SPEC_VERSION,
'source': constants.SOURCE_SYNC_OS_CLOCK,
'type': source_type[constants.SOURCE_SYNC_OS_CLOCK],
'time': new_event_time,
'data': {
'version': constants.DATA_VERSION,
'values': [
{
'data_type': constants.DATA_TYPE_NOTIFICATION,
'ResourceAddress': resource_address,
'value_type': constants.VALUE_TYPE_ENUMERATION,
'value': sync_state
}
]
}
}
self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_OS_CLOCK)
self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_ALL)
return
def __publish_overall_sync_status(self, forced=False):
holdover_time = float(self.overalltracker_context['holdover_seconds'])
freq = float(self.overalltracker_context['poll_freq_seconds'])
sync_state = self.overalltracker_context.get('sync_state', 'Unknown')
last_event_time = self.overalltracker_context.get('last_event_time', time.time())
new_event, sync_state, new_event_time = self.__get_overall_sync_state(
holdover_time, freq, sync_state, last_event_time)
if new_event or forced:
# Update context
self.overalltracker_context_lock.acquire()
self.overalltracker_context['sync_state'] = sync_state
self.overalltracker_context['last_event_time'] = new_event_time
self.overalltracker_context_lock.release()
LOG.debug("Publish overall sync status.")
resource_address = utils.format_resource_address(
self.node_name, constants.SOURCE_SYNC_SYNC_STATE)
lastStatus = {
'id': uuidutils.generate_uuid(),
'specversion': constants.SPEC_VERSION,
'source': constants.SOURCE_SYNC_SYNC_STATE,
'type': source_type[constants.SOURCE_SYNC_SYNC_STATE],
'time': new_event_time,
'data': {
'version': constants.DATA_VERSION,
'values': [
{
'data_type': constants.DATA_TYPE_NOTIFICATION,
'ResourceAddress': resource_address,
'value_type': constants.VALUE_TYPE_ENUMERATION,
'value': sync_state
}
]
}
}
self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_SYNC_STATE)
self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_ALL)
def __publish_gnss_status(self, forced=False):
for gnss in self.observer_list:
holdover_time = float(self.gnsstracker_context[gnss.config_file]['holdover_seconds'])
freq = float(self.gnsstracker_context[gnss.config_file]['poll_freq_seconds'])
sync_state = self.gnsstracker_context[gnss.config_file].get('sync_state', 'Unknown')
last_event_time = self.gnsstracker_context[gnss.config_file].get('last_event_time',
time.time())
new_event, sync_state, new_event_time = self.__get_gnss_status(
holdover_time, freq, sync_state, last_event_time, gnss)
LOG.debug("GNSS sync_state %s" % sync_state)
if new_event or forced:
# update context
self.gnsstracker_context_lock.acquire()
self.gnsstracker_context[gnss.config_file]['sync_state'] = sync_state
self.gnsstracker_context[gnss.config_file]['last_event_time'] = new_event_time
self.gnsstracker_context_lock.release()
LOG.debug("Publish GNSS status.")
# publish new event in API version v2 format
resource_address = utils.format_resource_address(
self.node_name, constants.SOURCE_SYNC_GNSS_SYNC_STATUS)
lastStatus = {
'id': uuidutils.generate_uuid(),
'specversion': constants.SPEC_VERSION,
'source': constants.SOURCE_SYNC_GNSS_SYNC_STATUS,
'type': source_type[constants.SOURCE_SYNC_GNSS_SYNC_STATUS],
'time': new_event_time,
'data': {
'version': constants.DATA_VERSION,
'values': [
{
'data_type': constants.DATA_TYPE_NOTIFICATION,
'ResourceAddress': resource_address,
'value_type': constants.VALUE_TYPE_ENUMERATION,
'value': sync_state
}
]
}
}
self.ptpeventproducer.publish_status(lastStatus,
constants.SOURCE_SYNC_GNSS_SYNC_STATUS)
self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_ALL)
return
def __publish_ptpstatus(self, forced=False):
for ptp_monitor in self.ptp_monitor_list:
holdover_time = \
float(self.ptptracker_context[ptp_monitor.ptp4l_config]['holdover_seconds'])
freq = float(self.ptptracker_context[ptp_monitor.ptp4l_config]['poll_freq_seconds'])
sync_state = self.ptptracker_context[ptp_monitor.ptp4l_config]. \
get('sync_state', 'Unknown')
last_event_time = self.ptptracker_context[ptp_monitor.ptp4l_config] \
.get('last_event_time', time.time())
new_event, sync_state, new_event_time = self.__get_ptp_status(
holdover_time, freq, sync_state, last_event_time, ptp_monitor)
new_clock_class_event, clock_class, clock_class_event_time = \
ptp_monitor.get_ptp_clock_class()
if new_event or forced:
# update context
self.ptptracker_context_lock.acquire()
self.ptptracker_context[ptp_monitor.ptp4l_config]['sync_state'] = sync_state
self.ptptracker_context[ptp_monitor.ptp4l_config][
'last_event_time'] = new_event_time
self.ptptracker_context_lock.release()
# publish new event
LOG.debug("Publish ptp status to clients")
lastStatus = {
'ResourceType': 'PTP',
'EventData': {
'State': sync_state
},
'ResourceQualifier': {
'NodeName': self.node_name
},
'EventTimestamp': new_event_time
}
self.ptpeventproducer.publish_status(lastStatus, 'PTP')
# publish new event in API version v2 format
resource_address = utils.format_resource_address(
self.node_name, constants.SOURCE_SYNC_PTP_LOCK_STATE)
lastStatus = {
'id': uuidutils.generate_uuid(),
'specversion': constants.SPEC_VERSION,
'source': constants.SOURCE_SYNC_PTP_LOCK_STATE,
'type': source_type[constants.SOURCE_SYNC_PTP_LOCK_STATE],
'time': new_event_time,
'data': {
'version': constants.DATA_VERSION,
'values': [
{
'data_type': constants.DATA_TYPE_NOTIFICATION,
'ResourceAddress': resource_address,
'value_type': constants.VALUE_TYPE_ENUMERATION,
'value': sync_state
}
]
}
}
self.ptpeventproducer.publish_status(lastStatus,
constants.SOURCE_SYNC_PTP_LOCK_STATE)
self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_ALL)
if new_clock_class_event or forced:
# update context
self.ptptracker_context_lock.acquire()
self.ptptracker_context[ptp_monitor.ptp4l_config]['clock_class'] = clock_class
self.ptptracker_context[ptp_monitor.ptp4l_config]['last_clock_class_event_time'] \
= clock_class_event_time
self.ptptracker_context_lock.release()
resource_address = utils.format_resource_address(
self.node_name, constants.SOURCE_SYNC_PTP_CLOCK_CLASS)
lastClockClassStatus = {
'id': uuidutils.generate_uuid(),
'specversion': constants.SPEC_VERSION,
'source': constants.SOURCE_SYNC_PTP_CLOCK_CLASS,
'type': source_type[constants.SOURCE_SYNC_PTP_CLOCK_CLASS],
'time': clock_class_event_time,
'data': {
'version': constants.DATA_VERSION,
'values': [
{
'data_type': constants.DATA_TYPE_NOTIFICATION,
'ResourceAddress': resource_address,
'value_type': constants.VALUE_TYPE_METRIC,
'value': clock_class
}
]
}
}
LOG.info("Publishing clockClass for %s: %s" % (ptp_monitor.ptp4l_service_name,
clock_class))
self.ptpeventproducer.publish_status(lastClockClassStatus,
constants.SOURCE_SYNC_PTP_CLOCK_CLASS)
self.ptpeventproducer.publish_status(lastClockClassStatus,
constants.SOURCE_SYNC_ALL)
return
class DaemonControl(object):
def __init__(self, sqlalchemy_conf_json, daemon_context_json, process_worker=None):
self.event = mp.Event()
self.daemon_context = json.loads(daemon_context_json)
self.node_name = self.daemon_context['THIS_NODE_NAME']
if not process_worker:
process_worker = ProcessWorkerDefault
self.sqlalchemy_conf_json = sqlalchemy_conf_json
self.daemon_context_json = daemon_context_json
self.process_worker = process_worker
return
def refresh(self):
self.process_worker(self.event, self.sqlalchemy_conf_json, self.daemon_context_json)
self.event.set()