Merge "Integration work for ORAN notification"

This commit is contained in:
Zuul 2022-09-14 15:32:19 +00:00 committed by Gerrit Code Review
commit 51c321afb7
20 changed files with 384 additions and 210 deletions

View File

@ -58,12 +58,19 @@ class NotificationServiceClient(BrokerClientBase):
return
def query_resource_status(self, resource_type,
timeout=None, retry=None, resource_qualifier_json=None, resource_address=None):
timeout=None,
retry=None,
resource_qualifier_json=None,
resource_address=None,
optional=None):
topic = '{0}-Status'.format(resource_type)
server = '{0}-Tracking-{1}'.format(resource_type, self.target_node_name)
return self.call(
topic, server, 'QueryStatus', timeout=timeout, retry=retry,
QualifierJson=resource_qualifier_json, ResourceAddress=resource_address)
topic, server, 'QueryStatus',
timeout=timeout, retry=retry,
QualifierJson=resource_qualifier_json,
ResourceAddress=resource_address,
optional=optional)
def add_resource_status_listener(self, resource_type, status_handler=None):
if not status_handler:

View File

@ -9,6 +9,8 @@ DATA_TYPE_METRIC = "metric"
VALUE_TYPE_ENUMERATION = "enumeration"
VALUE_TYPE_METRIC = "metric"
PTP_V1_KEY = "ptp_notification_v1"
SOURCE_SYNC_ALL = '/sync'
SOURCE_SYNC_GNSS_SYNC_STATUS = '/sync/gnss-status/gnss-sync-status'
SOURCE_SYNC_PTP_CLOCK_CLASS = '/sync/ptp-status/clock-class'
@ -19,6 +21,18 @@ SOURCE_SYNCE_CLOCK_QUALITY = '/sync/synce-status/clock-quality'
SOURCE_SYNCE_LOCK_STATE_EXTENDED = '/sync/synce-status/lock-state-extended'
SOURCE_SYNCE_LOCK_STATE = '/sync/synce-status/lock-state'
RESOURCE_ADDRESS_MAPPINGS = {
SOURCE_SYNC_ALL: 'sync',
SOURCE_SYNC_GNSS_SYNC_STATUS: 'gnss_sync_state',
SOURCE_SYNC_PTP_CLOCK_CLASS: 'ptp_clock_class',
SOURCE_SYNC_PTP_LOCK_STATE: 'ptp_lock_state',
SOURCE_SYNC_OS_CLOCK: 'os_clock_sync_state',
SOURCE_SYNC_SYNC_STATE: 'sync_state',
SOURCE_SYNCE_CLOCK_QUALITY: 'synce_clock_quality',
SOURCE_SYNCE_LOCK_STATE_EXTENDED: 'synce_lock_state_extended',
SOURCE_SYNCE_LOCK_STATE: 'synce_lock_state'
}
VALID_SOURCE_URI = {
SOURCE_SYNC_ALL,
SOURCE_SYNC_GNSS_SYNC_STATUS,

View File

@ -1,21 +1,22 @@
#
# Copyright (c) 2021 Wind River Systems, Inc.
# Copyright (c) 2021-2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import os
import logging
import sys
import os
LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "INFO")
def get_logger(module_name):
logger = logging.getLogger(module_name)
return config_logger(logger)
def config_logger(logger):
'''
configure the logger: uncomment following lines for debugging
'''
logger.setLevel(LOGGING_LEVEL)
logging.basicConfig(stream=sys.stdout,
format='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
logger.setLevel(level=os.environ.get("LOGGING_LEVEL", "INFO"))
return logger

View File

@ -4,27 +4,27 @@
# SPDX-License-Identifier: Apache-2.0
#
import os
import json
import re
import requests
import logging
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from datetime import datetime
from notificationclientsdk.common.helpers import constants
from notificationclientsdk.common.helpers import log_helper
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
def notify(subscriptioninfo, notification, timeout=2, retry=3):
result = False
while True:
retry = retry - 1
try:
headers = {'Content-Type': 'application/json'}
data = json.dumps(notification)
data = format_notification_data(subscriptioninfo, notification)
data = json.dumps(data)
url = subscriptioninfo.EndpointUri
response = requests.post(url, data=data, headers=headers, timeout=timeout)
response.raise_for_status()
@ -52,13 +52,58 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3):
return result
def format_notification_data(subscriptioninfo, notification):
if hasattr(subscriptioninfo, 'ResourceType'):
LOG.debug("format_notification_data: Found v1 subscription, no formatting required.")
return notification
elif hasattr(subscriptioninfo, 'ResourceAddress'):
_, _, resource_path, _, _ = parse_resource_address(subscriptioninfo.ResourceAddress)
resource_mapped_value = constants.RESOURCE_ADDRESS_MAPPINGS[resource_path]
formatted_notification = {resource_mapped_value: []}
for instance in notification:
# Add the instance identifier to ResourceAddress for PTP lock-state
# and PTP clockClass
if notification[instance]['source'] in [constants.SOURCE_SYNC_PTP_CLOCK_CLASS,
constants.SOURCE_SYNC_PTP_LOCK_STATE]:
temp_values = notification[instance].get('data', {}).get('values', [])
resource_address = temp_values[0].get('ResourceAddress', None)
if instance not in resource_address:
add_instance_name = resource_address.split('/', 3)
add_instance_name.insert(3, instance)
resource_address = '/'.join(add_instance_name)
notification[instance]['data']['values'][0]['ResourceAddress'] = resource_address
formatted_notification[resource_mapped_value].append(notification[instance])
for instance in formatted_notification[resource_mapped_value]:
this_delivery_time = instance['time']
if type(this_delivery_time) != str:
format_time = datetime.fromtimestamp(float(this_delivery_time)).\
strftime('%Y-%m-%dT%H:%M:%S%fZ')
instance['time'] = format_time
else:
raise Exception("format_notification_data: No valid source address found in notification")
LOG.debug(
"format_notification_data: Added parent key for client consumption: %s" %
formatted_notification)
return formatted_notification
def parse_resource_address(resource_address):
# The format of resource address is:
# /{clusterName}/{siteName}(/optional/hierarchy/..)/{nodeName}/{resource}
# Assume no optional hierarchy for now
clusterName = resource_address.split('/')[1]
nodeName = resource_address.split('/')[2]
resource_path = '/' + re.split('[/]', resource_address, 3)[3]
resource_list = re.findall(r'[^/]+', resource_path)
if len(resource_list) == 4:
remove_optional = '/' + resource_list[0]
resource_path = resource_path.replace(remove_optional, '')
resource_address = resource_address.replace(remove_optional, '')
optional = resource_list[0]
LOG.debug("Optional hierarchy found when parsing resource address: %s" % optional)
else:
optional = None
return clusterName, nodeName, resource_path
# resource_address is the full address without any optional hierarchy
# resource_path is the specific identifier for the resource
return clusterName, nodeName, resource_path, optional, resource_address

View File

@ -17,13 +17,16 @@ from notificationclientsdk.model.dto.broker_state import BrokerState
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
class BrokerStateManager:
'''
Manager to manage broker states
Note: Now it is not thread safe
'''
def __init__(self):
self.broker_state_map = {}
self.disabled_brokers = []
@ -84,7 +87,7 @@ class BrokerStateManager:
for broker_name, brokerstate in self.broker_state_map.items():
try:
if brokerstate.any_obsolete_subscription(
self.subscription_refresh_iteration):
self.subscription_refresh_iteration):
return True
except Exception as ex:
LOG.warning(
@ -104,14 +107,17 @@ class BrokerStateManager:
broker_name = subscription.ResourceQualifier.NodeName
else:
# ignore the subscription due to unsupported type
LOG.debug("Ignore the subscription for: {0}".format(subscription_orm.SubscriptionId))
LOG.debug(
"Ignore the subscription for: {0}".format(subscription_orm.SubscriptionId))
return False
else:
subscription = SubscriptionInfoV2(subscription_orm)
_, nodename, resource = subscription_helper.parse_resource_address(subscription.ResourceAddress)
_, nodename, resource, _, _ = subscription_helper.parse_resource_address(
subscription.ResourceAddress)
broker_name = nodename
LOG.debug("subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status))
LOG.debug(
"subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status))
if subscription_orm.Status != 1:
return False
@ -200,8 +206,10 @@ class BrokerStateManager:
# trigger to sync up notification after (re-)connection
LOG.debug("Trigger to re-sync up data: {0}".format(broker_name))
result = brokerstate.signal_data_syncup()
elif brokerstate.is_resource_subscribed_changed() or brokerstate.is_resources_changed():
LOG.debug("Update watching due to resources changed: {0}".format(broker_name))
elif brokerstate.is_resource_subscribed_changed() or \
brokerstate.is_resources_changed():
LOG.debug(
"Update watching due to resources changed: {0}".format(broker_name))
result = broker_connection_manager.update_watching_resources(brokerstate)
# leave the signals as it is to re-sync up in next loop in case of failure

View File

@ -26,8 +26,8 @@ from notificationclientsdk.client.notificationservice import NotificationHandler
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
log_helper.config_logger(LOG)
class NotificationHandler(NotificationHandlerBase):
@ -55,26 +55,25 @@ class NotificationHandler(NotificationHandlerBase):
if not resource_type:
raise Exception("abnormal notification@{0}".format(node_name))
if not resource_type in self.__supported_resource_types:
raise Exception("notification with unsupported resource type:{0}".format(resource_type))
raise Exception(
"notification with unsupported resource type:{0}".format(resource_type))
this_delivery_time = notification_info['EventTimestamp']
else:
source = notification_info.get('source', None)
values = notification_info.get('data', {}).get('values', [])
parent_key = list(notification_info.keys())[0]
source = notification_info[parent_key].get('source', None)
values = notification_info[parent_key].get('data', {}).get('values', [])
resource_address = values[0].get('ResourceAddress', None)
this_delivery_time = notification_info[parent_key].get('time')
if not resource_address:
raise Exception("No resource address in notification source".format(source))
_,node_name,_ = subscription_helper.parse_resource_address(resource_address)
this_delivery_time = notification_info['time']
# Change time from float to ascii format
notification_info['time'] = datetime.fromtimestamp(this_delivery_time).strftime('%Y-%m-%dT%H:%M:%S%fZ')
# notification_info['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ',
# time.gmtime(this_delivery_time))
_, node_name, _, _, _ = subscription_helper.parse_resource_address(resource_address)
entries = subscription_repo.get(Status=1)
for entry in entries:
subscriptionid = entry.SubscriptionId
if entry.ResourceAddress:
_,entry_node_name,_ = subscription_helper.parse_resource_address(entry.ResourceAddress)
_, entry_node_name, _, _, _ = subscription_helper.parse_resource_address(
entry.ResourceAddress)
subscription_dto2 = SubscriptionInfoV2(entry)
else:
ResourceQualifierJson = entry.ResourceQualifierJson or '{}'
@ -87,10 +86,12 @@ class NotificationHandler(NotificationHandlerBase):
continue
try:
last_delivery_time = self.__get_latest_delivery_timestamp(node_name, subscriptionid)
last_delivery_time = self.__get_latest_delivery_timestamp(node_name,
subscriptionid)
if last_delivery_time and last_delivery_time >= this_delivery_time:
# skip this entry since already delivered
LOG.debug("Ignore the outdated notification for: {0}".format(entry.SubscriptionId))
LOG.debug("Ignore the outdated notification for: {0}".format(
entry.SubscriptionId))
continue
subscription_helper.notify(subscription_dto2, notification_info)
@ -117,7 +118,7 @@ class NotificationHandler(NotificationHandlerBase):
del subscription_repo
def __get_latest_delivery_timestamp(self, node_name, subscriptionid):
last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{})
last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {})
last_delivery_time = last_delivery_stat.get('EventTimestamp', None)
return last_delivery_time
@ -126,18 +127,18 @@ class NotificationHandler(NotificationHandlerBase):
self.notification_stat[node_name] = {
subscriptionid: {
'EventTimestamp': this_delivery_time
}
}
}
LOG.debug("delivery time @node: {0},subscription:{1} is added".format(
node_name, subscriptionid))
elif not self.notification_stat[node_name].get(subscriptionid, None):
self.notification_stat[node_name][subscriptionid] = {
'EventTimestamp': this_delivery_time
}
}
LOG.debug("delivery time @node: {0},subscription:{1} is added".format(
node_name, subscriptionid))
else:
last_delivery_stat = self.notification_stat.get(node_name,{}).get(subscriptionid,{})
last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {})
last_delivery_time = last_delivery_stat.get('EventTimestamp', None)
if (last_delivery_time and last_delivery_time >= this_delivery_time):
return

View File

@ -10,6 +10,7 @@ import logging
import multiprocessing as mp
import threading
import sys
if sys.version > '3':
import queue as Queue
else:
@ -18,6 +19,7 @@ else:
from notificationclientsdk.common.helpers import subscription_helper
from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.common.helpers import log_helper
from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
@ -38,14 +40,13 @@ from notificationclientsdk.services.broker_connection_manager import BrokerConne
from notificationclientsdk.services.notification_handler import NotificationHandler
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
class NotificationWorker:
class NotificationWorker:
class LocationInfoHandler(LocationHandlerBase):
'''Glue code to forward location info to daemon method'''
def __init__(self, locationinfo_dispatcher):
self.locationinfo_dispatcher = locationinfo_dispatcher
super(NotificationWorker.LocationInfoHandler, self).__init__()
@ -55,7 +56,7 @@ class NotificationWorker:
return self.locationinfo_dispatcher.produce_location_event(location_info)
def __init__(
self, event, subscription_event, daemon_context):
self, event, subscription_event, daemon_context):
self.__alive = True
@ -254,7 +255,8 @@ class NotificationWorker:
elif s.ResourceAddress:
# Get nodename from resource address
LOG.info("Parse resource address {}".format(s.ResourceAddress))
_,nodename,_ = subscription_helper.parse_resource_address(s.ResourceAddress)
_, nodename, _, _, _ = subscription_helper.parse_resource_address(
s.ResourceAddress)
broker_name = nodename
else:
LOG.debug("Subscription {} does not have ResourceType or "
@ -298,7 +300,7 @@ class NotificationWorker:
try:
LOG.debug("try to sync up data for {0} brokers".format(
self.broker_state_manager.count_brokers()))
result,_,_ = self.broker_state_manager.syncup_broker_data(
result, _, _ = self.broker_state_manager.syncup_broker_data(
self.broker_connection_manager)
except Exception as ex:
result = False

View File

@ -12,6 +12,8 @@ from datetime import datetime, timezone
from notificationclientsdk.client.notificationservice import NotificationServiceClient
from notificationclientsdk.common.helpers import subscription_helper
from notificationclientsdk.common.helpers import log_helper
from notificationclientsdk.common.helpers import constants
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
@ -21,14 +23,12 @@ from notificationclientsdk.repository.node_repo import NodeRepo
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
from notificationclientsdk.services.daemon import DaemonControl
from notificationclientsdk.exception import client_exception
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
class PtpService(object):
def __init__(self, daemon_control):
@ -72,7 +72,7 @@ class PtpService(object):
finally:
del nodeinfo_repo
def query(self, broker_name, resource_address=None):
def query(self, broker_name, resource_address=None, optional=None):
default_node_name = NodeInfoHelper.default_node_name(broker_name)
broker_pod_ip, supported_resource_types = self.__get_node_info(default_node_name)
@ -85,9 +85,9 @@ class PtpService(object):
ResourceType.TypePTP, default_node_name))
raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP)
return self._query(default_node_name, broker_pod_ip, resource_address)
return self._query(default_node_name, broker_pod_ip, resource_address, optional)
def _query(self, broker_name, broker_pod_ip, resource_address=None):
def _query(self, broker_name, broker_pod_ip, resource_address=None, optional=None):
broker_host = "[{0}]".format(broker_pod_ip)
broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format(
self.daemon_control.daemon_context['NOTIFICATION_BROKER_USER'],
@ -99,7 +99,8 @@ class PtpService(object):
notificationservice_client = NotificationServiceClient(
broker_name, broker_transport_endpoint, broker_pod_ip)
resource_status = notificationservice_client.query_resource_status(
ResourceType.TypePTP, timeout=5, retry=10, resource_address=resource_address)
ResourceType.TypePTP, timeout=5, retry=10, resource_address=resource_address,
optional=optional)
return resource_status
except oslo_messaging.exceptions.MessagingTimeout as ex:
LOG.warning("ptp status is not available @node {0} due to {1}".format(
@ -117,7 +118,8 @@ class PtpService(object):
subscription_orm = SubscriptionOrm(**subscription_dto.to_orm())
resource_address = None
if hasattr(subscription_dto, 'ResourceAddress'):
_,nodename,_ = subscription_helper.parse_resource_address(subscription_dto.ResourceAddress)
_, nodename, _, _, _ = subscription_helper.parse_resource_address(subscription_dto.
ResourceAddress)
broker_name = nodename
resource_address = subscription_dto.ResourceAddress
elif hasattr(subscription_dto, 'ResourceType'):
@ -139,17 +141,22 @@ class PtpService(object):
if default_node_name:
ptpstatus = None
ptpstatus = self._query(default_node_name, broker_pod_ip, resource_address)
ptpstatus = self._query(default_node_name,
broker_pod_ip,
resource_address,
optional=None)
LOG.info("initial ptpstatus:{0}".format(ptpstatus))
# construct subscription entry
timestamp = ptpstatus.get('EventTimestamp', None)
if timestamp is None:
timestamp = ptpstatus.get('time', None)
# Change time from float to ascii format
ptpstatus['time'] = datetime.fromtimestamp(ptpstatus['time']).strftime('%Y-%m-%dT%H:%M:%S%fZ')
# ptpstatus['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ',
# time.gmtime(timestamp))
if constants.PTP_V1_KEY in ptpstatus:
timestamp = ptpstatus[constants.PTP_V1_KEY].get('EventTimestamp', None)
ptpstatus = ptpstatus[constants.PTP_V1_KEY]
else:
for item in ptpstatus:
timestamp = ptpstatus[item].get('time', None)
# Change time from float to ascii format
ptpstatus[item]['time'] = datetime.fromtimestamp(ptpstatus[item]['time']) \
.strftime('%Y-%m-%dT%H:%M:%S%fZ')
subscription_orm.InitialDeliveryTimestamp = timestamp
entry = self.subscription_repo.add(subscription_orm)
@ -179,7 +186,7 @@ class PtpService(object):
def remove_subscription(self, subscriptionid):
try:
# 1, delete entry
self.subscription_repo.delete_one(SubscriptionId = subscriptionid)
self.subscription_repo.delete_one(SubscriptionId=subscriptionid)
self.subscription_repo.commit()
# 2, refresh daemon
self.daemon_control.refresh()

View File

@ -37,7 +37,8 @@ class ResourceAddressController(object):
def CurrentState(self):
try:
# validate resource address
_, nodename, resource = subscription_helper.parse_resource_address(self.resource_address)
_, nodename, resource, optional, self.resource_address = subscription_helper.\
parse_resource_address(self.resource_address)
if nodename != THIS_NODE_NAME and nodename != '.':
LOG.warning("Node {} is not available".format(nodename))
abort(404)
@ -45,11 +46,13 @@ class ResourceAddressController(object):
LOG.warning("Resource {} is not valid".format(resource))
abort(404)
ptpservice = PtpService(notification_control)
ptpstatus = ptpservice.query(THIS_NODE_NAME, self.resource_address)
ptpstatus = ptpservice.query(THIS_NODE_NAME, self.resource_address, optional)
# Change time from float to ascii format
# ptpstatus['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ',
# time.gmtime(ptpstatus['time']))
ptpstatus['time'] = datetime.fromtimestamp(ptpstatus['time']).strftime('%Y-%m-%dT%H:%M:%S%fZ')
for item in ptpstatus:
ptpstatus[item]['time'] = datetime.fromtimestamp(ptpstatus[item]['time']).\
strftime('%Y-%m-%dT%H:%M:%S%fZ')
return ptpstatus
except client_exception.NodeNotAvailable as ex:
LOG.warning("Node is not available:{0}".format(str(ex)))

View File

@ -14,22 +14,21 @@ import logging
from wsme import types as wtypes
from wsmeext.pecan import wsexpose
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
from notificationclientsdk.services.ptp import PtpService
from notificationclientsdk.exception import client_exception
from notificationclientsdk.common.helpers import log_helper
from sidecar.repository.notification_control import notification_control
from sidecar.repository.dbcontext_default import defaults
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0')
THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME", 'controller-0')
class SubscriptionsControllerV2(rest.RestController):
@ -47,7 +46,7 @@ class SubscriptionsControllerV2(rest.RestController):
abort(400)
subscription.UriLocation = "{0}://{1}:{2}/ocloudNotifications/v2/subscriptions".format(
conf.server.get('protocol','http'),
conf.server.get('protocol', 'http'),
conf.server.get('host', '127.0.0.1'),
conf.server.get('port', '8080')
)
@ -77,13 +76,13 @@ class SubscriptionsControllerV2(rest.RestController):
LOG.error("Server side error:{0},{1}".format(type(ex), str(ex)))
abort(500)
except Exception as ex:
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex)))
LOG.error("Exception:{0}@{1}".format(type(ex), str(ex)))
abort(500)
@expose('json')
def get(self):
try:
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False)
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False)
entries = repo.get(Status=1)
response.status = 200
subs = []
@ -99,7 +98,7 @@ class SubscriptionsControllerV2(rest.RestController):
LOG.error("Server side error:{0},{1}".format(type(ex), str(ex)))
raise ex
except Exception as ex:
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex)))
LOG.error("Exception:{0}@{1}".format(type(ex), str(ex)))
abort(500)
@expose()
@ -115,6 +114,7 @@ class SubscriptionsControllerV2(rest.RestController):
except:
return False
class SubscriptionController(rest.RestController):
def __init__(self, subscription_id):
self.subscription_id = subscription_id
@ -122,7 +122,7 @@ class SubscriptionController(rest.RestController):
@expose('json')
def get(self):
try:
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False)
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False)
entry = repo.get_one(SubscriptionId=self.subscription_id, Status=1)
if not entry:
@ -139,13 +139,13 @@ class SubscriptionController(rest.RestController):
LOG.error("Server side error:{0},{1}".format(type(ex), str(ex)))
raise ex
except Exception as ex:
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex)))
LOG.error("Exception:{0}@{1}".format(type(ex), str(ex)))
abort(500)
@wsexpose(status_code=204)
def delete(self):
try:
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False)
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit=False)
entry = repo.get_one(SubscriptionId=self.subscription_id)
if entry:
if entry.SubscriptionId:
@ -164,5 +164,5 @@ class SubscriptionController(rest.RestController):
LOG.error("Server side error:{0},{1}".format(type(ex), str(ex)))
raise ex
except Exception as ex:
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex)))
LOG.error("Exception:{0}@{1}".format(type(ex), str(ex)))
abort(500)

View File

@ -17,8 +17,10 @@ import logging
LOG = logging.getLogger(__name__)
from trackingfunctionsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
class PtpEventProducer(object):
class ListenerEndpoint(object):
target = oslo_messaging.Target(namespace='notification', version='1.0')
@ -30,28 +32,28 @@ class PtpEventProducer(object):
pass
def QueryStatus(self, ctx, **rpc_kwargs):
LOG.debug("PtpEventProducer QueryStatus called %s" %rpc_kwargs)
LOG.debug("PtpEventProducer QueryStatus called %s" % rpc_kwargs)
if self.handler:
return self.handler.query_status(**rpc_kwargs)
else:
return None
def TriggerDelivery(self, ctx, **rpc_kwargs):
LOG.debug("PtpEventProducer TriggerDelivery called %s" %rpc_kwargs)
LOG.debug("PtpEventProducer TriggerDelivery called %s" % rpc_kwargs)
if self.handler:
return self.handler.trigger_delivery(**rpc_kwargs)
else:
return None
def __init__(self, node_name, local_broker_transport_endpoint,
registration_broker_transport_endpoint=None):
registration_broker_transport_endpoint=None):
self.Id = id(self)
self.node_name = node_name
self.local_broker_client = BrokerClientBase(
'LocalPtpEventProducer', local_broker_transport_endpoint)
if registration_broker_transport_endpoint:
self.registration_broker_client = BrokerClientBase(
'AllPtpEventProducer', registration_broker_transport_endpoint)
'AllPtpEventProducer', registration_broker_transport_endpoint)
else:
self.registration_broker_client = None
return
@ -67,14 +69,16 @@ class PtpEventProducer(object):
def publish_status(self, ptpstatus, retry=3):
result = False
result1 = self.publish_status_local(ptpstatus, retry) if self.local_broker_client else result
result2 = self.publish_status_all(ptpstatus, retry) if self.registration_broker_client else result
result1 = self.publish_status_local(ptpstatus,
retry) if self.local_broker_client else result
result2 = self.publish_status_all(ptpstatus,
retry) if self.registration_broker_client else result
return result1, result2
def publish_status_local(self, ptpstatus, source, retry=3):
if not self.local_broker_client:
return False
topic='{0}-Event-{1}'.format(source, self.node_name)
topic = '{0}-Event-{1}'.format(source, self.node_name)
server = None
isretrystopped = False
while not isretrystopped:
@ -97,7 +101,7 @@ class PtpEventProducer(object):
def publish_status_all(self, ptpstatus, retry=3):
if not self.registration_broker_client:
return False
topic_all='PTP-Event-*'
topic_all = 'PTP-Event-*'
server = None
isretrystopped = False
while not isretrystopped:
@ -114,13 +118,14 @@ class PtpEventProducer(object):
if isretrystopped:
LOG.error("Failed to publish ptp status:{0}@Topic:{1}".format(
ptpstatus, topic))
ptpstatus, topic_all))
return isretrystopped == False
def start_status_listener(self, handler=None):
result = False
result1 = self.start_status_listener_local(handler) if self.local_broker_client else result
result2 = self.start_status_listener_all(handler) if self.registration_broker_client else result
result2 = self.start_status_listener_all(
handler) if self.registration_broker_client else result
result = result1 and result2
return result
@ -128,8 +133,8 @@ class PtpEventProducer(object):
if not self.local_broker_client:
return False
topic='PTP-Status'
server='PTP-Tracking-{0}'.format(self.node_name)
topic = 'PTP-Status'
server = 'PTP-Tracking-{0}'.format(self.node_name)
endpoints = [PtpEventProducer.ListenerEndpoint(handler)]
self.local_broker_client.add_listener(
@ -140,8 +145,8 @@ class PtpEventProducer(object):
if not self.registration_broker_client:
return False
topic='PTP-Status'
server='PTP-Tracking-{0}'.format(self.node_name)
topic = 'PTP-Status'
server = 'PTP-Tracking-{0}'.format(self.node_name)
endpoints = [PtpEventProducer.ListenerEndpoint(handler)]
self.registration_broker_client.add_listener(
@ -159,8 +164,8 @@ class PtpEventProducer(object):
if not self.local_broker_client:
return False
topic='PTP-Status'
server="PTP-Tracking-{0}".format(self.node_name)
topic = 'PTP-Status'
server = "PTP-Tracking-{0}".format(self.node_name)
self.local_broker_client.remove_listener(
topic, server)
@ -168,8 +173,8 @@ class PtpEventProducer(object):
if not self.registration_broker_client:
return False
topic='PTP-Status'
server="PTP-Tracking-{0}".format(self.node_name)
topic = 'PTP-Status'
server = "PTP-Tracking-{0}".format(self.node_name)
self.registration_broker_client.remove_listener(
topic, server)
@ -184,15 +189,15 @@ class PtpEventProducer(object):
if not self.local_broker_client:
return False
topic='PTP-Status'
server="PTP-Tracking-{0}".format(self.node_name)
topic = 'PTP-Status'
server = "PTP-Tracking-{0}".format(self.node_name)
return self.local_broker_client.is_listening(
topic, server)
def is_listening_all(self):
if not self.registration_broker_client:
return False
topic='PTP-Status'
server="PTP-Tracking-{0}".format(self.node_name)
topic = 'PTP-Status'
server = "PTP-Tracking-{0}".format(self.node_name)
return self.registration_broker_client.is_listening(
topic, server)

View File

@ -80,7 +80,6 @@ class CguHandler:
def cgu_output_to_dict(self):
# Take raw cgu output and parse it into a dict
cgu_output = self.cgu_output_raw.splitlines()
LOG.debug("CGU output: %s" % cgu_output)
cgu_dict = {'input': {},
'EEC DPLL': {
'Current reference': '',

View File

@ -45,6 +45,8 @@ CLOCK_REALTIME = "CLOCK_REALTIME"
PHC2SYS_TOLERANCE_LOW = 36999999000
PHC2SYS_TOLERANCE_HIGH = 37000001000
PTP_V1_KEY = "ptp_notification_v1"
# testing values
CGU_PATH_VALID = "/sys/kernel/debug/ice/0000:18:00.0/cgu"

View File

@ -5,6 +5,7 @@
#
import logging
import datetime
import re
from abc import ABC, abstractmethod
@ -34,6 +35,13 @@ class GnssMonitor(Observer):
def __init__(self, config_file, nmea_serialport=None, pci_addr=None, cgu_path=None):
self.config_file = config_file
try:
pattern = '(?<=/ptp/ptpinstance/ts2phc-).*(?=.conf)'
match = re.search(pattern, self.config_file)
self.ts2phc_service_name = match.group()
except AttributeError:
LOG.warning("GnssMonitor: Unable to determine tsphc_service name from %s"
% self.config_file)
# Setup GNSS data
self.gnss_cgu_handler = CguHandler(config_file, nmea_serialport, pci_addr, cgu_path)

View File

@ -15,6 +15,8 @@ def get_logger(module_name):
def config_logger(logger):
logging.basicConfig(stream=sys.stdout)
logging.basicConfig(stream=sys.stdout,
format='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
logger.setLevel(level=os.environ.get("LOGGING_LEVEL", "INFO"))
return logger

View File

@ -50,14 +50,11 @@ class PtpMonitor:
pmc_query_results = {}
def __init__(self, ptp4l_config, holdover_time, freq, init=True):
def __init__(self, ptp4l_instance, holdover_time, freq, init=True):
if init:
self.ptp4l_config = ptp4l_config
pattern = '(?<=/ptp/ptpinstance/ptp4l-).*(?=.conf)'
match = re.search(pattern, self.ptp4l_config)
self.ptp4l_service_name = match.group()
LOG.debug(self.ptp4l_service_name)
self.ptp4l_config = "/ptp/ptpinstance/ptp4l-%s.conf" % ptp4l_instance
self.ptp4l_service_name = ptp4l_instance
self.phc2sys_service_name = os.environ.get('PHC2SYS_SERVICE_NAME', 'phc2sys')
self.holdover_time = int(holdover_time)
self.freq = int(freq)

View File

@ -8,7 +8,6 @@ import json
import logging
import multiprocessing as mp
import os
import sys
import threading
import time
from oslo_utils import uuidutils
@ -43,7 +42,6 @@ source_type = {
'/sync/synce-status/clock-quality': 'event.sync.synce-status.synce-clock-quality-change',
'/sync/synce-status/lock-state-extended': 'event.sync.synce-status.synce-state-change-extended',
'/sync/synce-status/lock-state': 'event.sync.synce-status.synce-state-change',
'/sync/synce-status/lock-state': 'event.sync.synce-status.synce-state-change',
}
'''Entry point of Default Process Worker'''
@ -77,12 +75,13 @@ class PtpWatcherDefault:
}
class PtpRequestHandlerDefault(object):
def __init__(self, watcher):
def __init__(self, watcher, daemon_context):
self.watcher = watcher
self.init_time = time.time()
self.daemon_context = daemon_context
def _build_event_response(self, resource_path, last_event_time, resource_address,
sync_state):
sync_state, value_type=constants.VALUE_TYPE_ENUMERATION):
if resource_path in [constants.SOURCE_SYNC_PTP_CLOCK_CLASS,
constants.SOURCE_SYNCE_CLOCK_QUALITY]:
data_type = constants.DATA_TYPE_METRIC
@ -100,7 +99,7 @@ class PtpWatcherDefault:
{
'data_type': data_type,
'ResourceAddress': resource_address,
'value_type': constants.VALUE_TYPE_ENUMERATION,
'value_type': value_type,
'value': sync_state
}
]
@ -111,28 +110,90 @@ class PtpWatcherDefault:
def query_status(self, **rpc_kwargs):
lastStatus = {}
resource_address = rpc_kwargs.get('ResourceAddress', None)
optional = rpc_kwargs.get('optional', None)
if resource_address:
_, nodename, resource_path = utils.parse_resource_address(resource_address)
if resource_path == constants.SOURCE_SYNC_ALL:
resource_path = constants.SOURCE_SYNC_SYNC_STATE
if resource_path == constants.SOURCE_SYNC_GNSS_SYNC_STATUS:
self.watcher.gnsstracker_context_lock.acquire()
sync_state = self.watcher.gnsstracker_context.get('sync_state',
GnssState.Freerun)
last_event_time = self.watcher.gnsstracker_context.get('last_event_time',
time.time())
if optional:
sync_state = self.watcher.gnsstracker_context[optional]. \
get('sync_state', GnssState.Freerun)
last_event_time = self.watcher.gnsstracker_context[optional].get(
'last_event_time',
time.time())
lastStatus[optional] = self._build_event_response(resource_path,
last_event_time,
resource_address,
sync_state)
else:
for config in self.daemon_context['GNSS_INSTANCES']:
sync_state = self.watcher.gnsstracker_context[config] \
.get('sync_state', GnssState.Freerun)
last_event_time = self.watcher.gnsstracker_context[config].get(
'last_event_time',
time.time())
lastStatus[config] = self._build_event_response(resource_path,
last_event_time,
resource_address,
sync_state)
self.watcher.gnsstracker_context_lock.release()
lastStatus = self._build_event_response(resource_path, last_event_time,
resource_address, sync_state)
# elif resource_path == constants.SOURCE_SYNC_PTP_CLOCK_CLASS:
elif resource_path == constants.SOURCE_SYNC_PTP_CLOCK_CLASS:
self.watcher.ptptracker_context_lock.acquire()
if optional:
clock_class = self.watcher.ptptracker_context[optional].get('clock_class',
'248')
last_clock_class_event_time = self.watcher.ptptracker_context[optional].get(
'last_clock_class_event_time',
time.time())
lastStatus[optional] = \
self._build_event_response(resource_path,
last_clock_class_event_time,
resource_address,
clock_class,
constants.VALUE_TYPE_METRIC)
else:
for config in self.daemon_context['PTP4L_INSTANCES']:
clock_class = self.watcher.ptptracker_context[config].get('clock_class',
'248')
last_clock_class_event_time = \
self.watcher.ptptracker_context[config].get(
'last_clock_class_event_time',
time.time())
lastStatus[config] = \
self._build_event_response(resource_path,
last_clock_class_event_time,
resource_address,
clock_class,
constants.VALUE_TYPE_METRIC)
self.watcher.ptptracker_context_lock.release()
elif resource_path == constants.SOURCE_SYNC_PTP_LOCK_STATE:
self.watcher.ptptracker_context_lock.acquire()
sync_state = self.watcher.ptptracker_context.get('sync_state', PtpState.Freerun)
last_event_time = self.watcher.ptptracker_context.get('last_event_time',
time.time())
if optional:
sync_state = self.watcher.ptptracker_context[optional].get('sync_state',
PtpState.Freerun)
last_event_time = self.watcher.ptptracker_context[optional].get(
'last_event_time',
time.time())
lastStatus[optional] = self._build_event_response(resource_path,
last_event_time,
resource_address,
sync_state)
else:
for config in self.daemon_context['PTP4L_INSTANCES']:
sync_state = \
self.watcher.ptptracker_context[config].get('sync_state',
PtpState.Freerun)
last_event_time = self.watcher.ptptracker_context[config].get(
'last_event_time',
time.time())
lastStatus[config] = self._build_event_response(resource_path,
last_event_time,
resource_address,
sync_state)
self.watcher.ptptracker_context_lock.release()
lastStatus = self._build_event_response(resource_path, last_event_time,
resource_address, sync_state)
elif resource_path == constants.SOURCE_SYNC_OS_CLOCK:
self.watcher.osclocktracker_context_lock.acquire()
sync_state = self.watcher.osclocktracker_context.get('sync_state',
@ -140,8 +201,10 @@ class PtpWatcherDefault:
last_event_time = self.watcher.osclocktracker_context.get('last_event_time',
time.time())
self.watcher.osclocktracker_context_lock.release()
lastStatus = self._build_event_response(resource_path, last_event_time,
resource_address, sync_state)
lastStatus['os_clock_status'] = self._build_event_response(resource_path,
last_event_time,
resource_address,
sync_state)
elif resource_path == constants.SOURCE_SYNC_SYNC_STATE:
self.watcher.overalltracker_context_lock.acquire()
sync_state = self.watcher.overalltracker_context.get('sync_state',
@ -149,16 +212,24 @@ class PtpWatcherDefault:
last_event_time = self.watcher.overalltracker_context.get('last_event_time',
time.time())
self.watcher.overalltracker_context_lock.release()
lastStatus = self._build_event_response(resource_path, last_event_time,
resource_address, sync_state)
lastStatus['overall_sync_status'] = self._build_event_response(resource_path,
last_event_time,
resource_address,
sync_state)
LOG.debug("query_status: {}".format(lastStatus))
else:
# Request is for PTP v1 notification
# PTP v1 only supports single instance ptp
instance = self.daemon_context['PTP4L_INSTANCES'][0]
if len(self.daemon_context['PTP4L_INSTANCES']) > 1:
LOG.warning(
"Multiple ptp4l instances configured, retrieving status for %s" % instance)
self.watcher.ptptracker_context_lock.acquire()
sync_state = self.watcher.ptptracker_context.get('sync_state', PtpState.Freerun)
last_event_time = self.watcher.ptptracker_context.get('last_event_time',
time.time())
self.watcher.ptptracker_context_lock.release()
lastStatus = {
sync_state = self.watcher.ptptracker_context[instance].get('sync_state',
PtpState.Freerun)
last_event_time = self.watcher.ptptracker_context[instance].get('last_event_time',
time.time())
lastStatus[constants.PTP_V1_KEY] = {
'ResourceType': ResourceType.TypePTP,
'EventData': {
'State': sync_state
@ -168,6 +239,9 @@ class PtpWatcherDefault:
},
'EventTimestamp': last_event_time
}
self.watcher.ptptracker_context_lock.release()
LOG.warning("query_status PTP v1: {}".format(lastStatus))
return lastStatus
def trigger_delivery(self, **rpc_kwargs):
@ -184,7 +258,7 @@ class PtpWatcherDefault:
# PTP Context
self.ptptracker_context = {}
for config in self.daemon_context['PTP4L_CONFIGS']:
for config in self.daemon_context['PTP4L_INSTANCES']:
self.ptptracker_context[config] = self.daemon_context.get(
'ptptracker_context', PtpWatcherDefault.DEFAULT_PTPTRACKER_CONTEXT)
self.ptptracker_context[config]['sync_state'] = PtpState.Freerun
@ -197,7 +271,7 @@ class PtpWatcherDefault:
# GNSS Context
self.gnsstracker_context = {}
for config in self.daemon_context['GNSS_CONFIGS']:
for config in self.daemon_context['GNSS_INSTANCES']:
self.gnsstracker_context[config] = self.daemon_context.get(
'gnsstracker_context', PtpWatcherDefault.DEFAULT_GNSSTRACKER_CONTEXT)
self.gnsstracker_context[config]['sync_state'] = GnssState.Freerun
@ -238,7 +312,8 @@ class PtpWatcherDefault:
self.broker_endpoint.TransportEndpoint,
self.registration_broker_endpoint.TransportEndpoint)
self.__ptprequest_handler = PtpWatcherDefault.PtpRequestHandlerDefault(self)
self.__ptprequest_handler = PtpWatcherDefault.PtpRequestHandlerDefault(self,
self.daemon_context)
# Set forced_publishing to True so that initial states are published
# Main loop in run() sets it to false after the first iteration
@ -258,7 +333,7 @@ class PtpWatcherDefault:
self.ptp_monitor_list = [
PtpMonitor(config, self.ptptracker_context[config]['holdover_seconds'],
self.ptptracker_context[config]['poll_freq_seconds']) for config in
self.daemon_context['PTP4L_CONFIGS']]
self.daemon_context['PTP4L_INSTANCES']]
def signal_ptp_event(self):
if self.event:
@ -340,7 +415,6 @@ class PtpWatcherDefault:
gnss_state = GnssState.Locked
os_clock_state = self.os_clock_monitor.get_os_clock_state()
ptp_state = self.ptptracker_context.get('sync_state')
if gnss_state is GnssState.Freerun or os_clock_state is OsClockState.Freerun or ptp_state \
@ -393,11 +467,11 @@ class PtpWatcherDefault:
freq = float(self.osclocktracker_context['poll_freq_seconds'])
sync_state = self.osclocktracker_context.get('sync_state', 'Unknown')
last_event_time = self.osclocktracker_context.get('last_event_time', time.time())
lastStatus = {}
new_event, sync_state, new_event_time = self.__get_os_clock_status(
holdover_time, freq, sync_state, last_event_time)
LOG.debug("Got os clock status.")
LOG.info("os_clock_status: state is %s, new_event is %s " % (sync_state, new_event))
if new_event or forced:
self.osclocktracker_context_lock.acquire()
self.osclocktracker_context['sync_state'] = sync_state
@ -405,20 +479,10 @@ class PtpWatcherDefault:
self.osclocktracker_context_lock.release()
LOG.debug("Publish OS Clock Status")
lastStatus = {
'ResourceType': 'OS Clock',
'EventData': {
'State': sync_state
},
'ResourceQualifier': {
'NodeName': self.node_name
},
'EventTimestamp': new_event_time
}
# publish new event in API version v2 format
resource_address = utils.format_resource_address(
self.node_name, constants.SOURCE_SYNC_OS_CLOCK)
lastStatus = {
lastStatus['os_clock_status'] = {
'id': uuidutils.generate_uuid(),
'specversion': constants.SPEC_VERSION,
'source': constants.SOURCE_SYNC_OS_CLOCK,
@ -441,6 +505,7 @@ class PtpWatcherDefault:
return
def __publish_overall_sync_status(self, forced=False):
lastStatus = {}
holdover_time = float(self.overalltracker_context['holdover_seconds'])
freq = float(self.overalltracker_context['poll_freq_seconds'])
sync_state = self.overalltracker_context.get('sync_state', 'Unknown')
@ -448,6 +513,7 @@ class PtpWatcherDefault:
new_event, sync_state, new_event_time = self.__get_overall_sync_state(
holdover_time, freq, sync_state, last_event_time)
LOG.info("overall_sync_state: state is %s, new_event is %s " % (sync_state, new_event))
if new_event or forced:
# Update context
@ -459,7 +525,7 @@ class PtpWatcherDefault:
LOG.debug("Publish overall sync status.")
resource_address = utils.format_resource_address(
self.node_name, constants.SOURCE_SYNC_SYNC_STATE)
lastStatus = {
lastStatus['overall_sync_status'] = {
'id': uuidutils.generate_uuid(),
'specversion': constants.SPEC_VERSION,
'source': constants.SOURCE_SYNC_SYNC_STATE,
@ -477,28 +543,32 @@ class PtpWatcherDefault:
]
}
}
self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_SYNC_STATE)
self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_ALL)
def __publish_gnss_status(self, forced=False):
lastStatus = {}
for gnss in self.observer_list:
holdover_time = float(self.gnsstracker_context[gnss.config_file]['holdover_seconds'])
freq = float(self.gnsstracker_context[gnss.config_file]['poll_freq_seconds'])
sync_state = self.gnsstracker_context[gnss.config_file].get('sync_state', 'Unknown')
last_event_time = self.gnsstracker_context[gnss.config_file].get('last_event_time',
time.time())
holdover_time = float(
self.gnsstracker_context[gnss.ts2phc_service_name]['holdover_seconds'])
freq = float(self.gnsstracker_context[gnss.ts2phc_service_name]['poll_freq_seconds'])
sync_state = self.gnsstracker_context[gnss.ts2phc_service_name].get('sync_state',
'Unknown')
last_event_time = self.gnsstracker_context[gnss.ts2phc_service_name].get(
'last_event_time',
time.time())
new_event, sync_state, new_event_time = self.__get_gnss_status(
holdover_time, freq, sync_state, last_event_time, gnss)
LOG.debug("GNSS sync_state %s" % sync_state)
LOG.info("%s gnss_status: state is %s, new_event is %s" % (
gnss.ts2phc_service_name, sync_state, new_event))
if new_event or forced:
# update context
self.gnsstracker_context_lock.acquire()
self.gnsstracker_context[gnss.config_file]['sync_state'] = sync_state
self.gnsstracker_context[gnss.config_file]['last_event_time'] = new_event_time
self.gnsstracker_context[gnss.ts2phc_service_name]['sync_state'] = sync_state
self.gnsstracker_context[gnss.ts2phc_service_name][
'last_event_time'] = new_event_time
self.gnsstracker_context_lock.release()
LOG.debug("Publish GNSS status.")
@ -506,7 +576,7 @@ class PtpWatcherDefault:
# publish new event in API version v2 format
resource_address = utils.format_resource_address(
self.node_name, constants.SOURCE_SYNC_GNSS_SYNC_STATUS)
lastStatus = {
lastStatus[gnss.ts2phc_service_name] = {
'id': uuidutils.generate_uuid(),
'specversion': constants.SPEC_VERSION,
'source': constants.SOURCE_SYNC_GNSS_SYNC_STATUS,
@ -530,29 +600,34 @@ class PtpWatcherDefault:
return
def __publish_ptpstatus(self, forced=False):
lastStatus = {}
lastClockClassStatus = {}
for ptp_monitor in self.ptp_monitor_list:
holdover_time = \
float(self.ptptracker_context[ptp_monitor.ptp4l_config]['holdover_seconds'])
freq = float(self.ptptracker_context[ptp_monitor.ptp4l_config]['poll_freq_seconds'])
sync_state = self.ptptracker_context[ptp_monitor.ptp4l_config]. \
float(self.ptptracker_context[ptp_monitor.ptp4l_service_name]['holdover_seconds'])
freq = float(
self.ptptracker_context[ptp_monitor.ptp4l_service_name]['poll_freq_seconds'])
sync_state = self.ptptracker_context[ptp_monitor.ptp4l_service_name]. \
get('sync_state', 'Unknown')
last_event_time = self.ptptracker_context[ptp_monitor.ptp4l_config] \
last_event_time = self.ptptracker_context[ptp_monitor.ptp4l_service_name] \
.get('last_event_time', time.time())
new_event, sync_state, new_event_time = self.__get_ptp_status(
holdover_time, freq, sync_state, last_event_time, ptp_monitor)
LOG.info("%s PTP sync state: state is %s, new_event is %s" % (
ptp_monitor.ptp4l_service_name, sync_state, new_event))
new_clock_class_event, clock_class, clock_class_event_time = \
ptp_monitor.get_ptp_clock_class()
LOG.info("%s PTP clock class: clockClass is %s, new_event is %s" % (
ptp_monitor.ptp4l_service_name, clock_class, new_clock_class_event))
if new_event or forced:
# update context
self.ptptracker_context_lock.acquire()
self.ptptracker_context[ptp_monitor.ptp4l_config]['sync_state'] = sync_state
self.ptptracker_context[ptp_monitor.ptp4l_config][
self.ptptracker_context[ptp_monitor.ptp4l_service_name]['sync_state'] = sync_state
self.ptptracker_context[ptp_monitor.ptp4l_service_name][
'last_event_time'] = new_event_time
self.ptptracker_context_lock.release()
# publish new event
LOG.debug("Publish ptp status to clients")
@ -571,7 +646,7 @@ class PtpWatcherDefault:
# publish new event in API version v2 format
resource_address = utils.format_resource_address(
self.node_name, constants.SOURCE_SYNC_PTP_LOCK_STATE)
lastStatus = {
lastStatus[ptp_monitor.ptp4l_service_name] = {
'id': uuidutils.generate_uuid(),
'specversion': constants.SPEC_VERSION,
'source': constants.SOURCE_SYNC_PTP_LOCK_STATE,
@ -589,6 +664,7 @@ class PtpWatcherDefault:
]
}
}
self.ptptracker_context_lock.release()
self.ptpeventproducer.publish_status(lastStatus,
constants.SOURCE_SYNC_PTP_LOCK_STATE)
self.ptpeventproducer.publish_status(lastStatus, constants.SOURCE_SYNC_ALL)
@ -596,15 +672,16 @@ class PtpWatcherDefault:
if new_clock_class_event or forced:
# update context
self.ptptracker_context_lock.acquire()
self.ptptracker_context[ptp_monitor.ptp4l_config]['clock_class'] = clock_class
self.ptptracker_context[ptp_monitor.ptp4l_config]['last_clock_class_event_time'] \
self.ptptracker_context[ptp_monitor.ptp4l_service_name]['clock_class'] = clock_class
self.ptptracker_context[ptp_monitor.ptp4l_service_name][
'last_clock_class_event_time'] \
= clock_class_event_time
self.ptptracker_context_lock.release()
resource_address = utils.format_resource_address(
self.node_name, constants.SOURCE_SYNC_PTP_CLOCK_CLASS)
lastClockClassStatus = {
lastClockClassStatus[ptp_monitor.ptp4l_service_name] = {
'id': uuidutils.generate_uuid(),
'specversion': constants.SPEC_VERSION,
'source': constants.SOURCE_SYNC_PTP_CLOCK_CLASS,
@ -622,6 +699,7 @@ class PtpWatcherDefault:
]
}
}
self.ptptracker_context_lock.release()
LOG.info("Publishing clockClass for %s: %s" % (ptp_monitor.ptp4l_service_name,
clock_class))
self.ptpeventproducer.publish_status(lastClockClassStatus,

View File

@ -3,28 +3,15 @@
#
# SPDX-License-Identifier: Apache-2.0
#
#!/bin/bash
# apt-get update -y
# #sleep infinity
# apt-get install -y gcc
# apt-get install -y python-dev
# apt-get install -y python3-pip
# export https_proxy=http://128.224.230.5:9090
# pip3 install oslo-config
# pip3 install oslo-messaging
cat <<EOF>/root/ptptracking-daemon.py
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import logging
import re
import os
import json
from trackingfunctionsdk.common.helpers import log_helper
from trackingfunctionsdk.common.helpers import constants
from trackingfunctionsdk.services.daemon import DaemonControl
LOG = logging.getLogger(__name__)
@ -67,10 +54,21 @@ OS_CLOCK_POLL_FREQ_SECONDS = os.environ.get("OS_CLOCK_POLL_FREQ_SECONDS", 2)
OVERALL_HOLDOVER_SECONDS = os.environ.get("OVERALL_HOLDOVER_SECONDS", 30)
OVERALL_POLL_FREQ_SECONDS = os.environ.get("OVERALL_POLL_FREQ_SECONDS", 2)
GNSS_CONFIGS = json.loads(os.environ.get("TS2PHC_CONFIGS", '["/ptp/ptpinstance/ts2phc-tc1.conf"]'))
PHC2SYS_CONFIG = os.environ.get("PHC2SYS_CONFIG", "/ptp/ptpinstance/phc2sys-phc-inst1.conf")
PTP4L_CONFIGS = json.loads(os.environ.get("PTP4L_CONFIGS", '["/ptp/ptpinstance/ptp4l-ptp-legacy.conf"]'))
PHC2SYS_CONFIG = "/ptp/ptpinstance/phc2sys-%s.conf" % os.environ.get("PHC2SYS_SERVICE_NAME", "phc2sys-legacy")
PTP4L_INSTANCES = os.environ.get("PTP4L_SERVICE_NAME", "ptp4l-legacy")
PTP4L_INSTANCES = str(PTP4L_INSTANCES).replace('[','').replace(']','')
PTP4L_INSTANCES = PTP4L_INSTANCES.split()
PTP4L_CONFIGS = []
for item in PTP4L_INSTANCES:
PTP4L_CONFIGS.append("/ptp/ptpinstance/ptp4l-%s.conf" % item)
GNSS_INSTANCES = os.environ.get("TS2PHC_SERVICE_NAME", None)
GNSS_INSTANCES = str(GNSS_INSTANCES).replace('[','').replace(']','')
GNSS_INSTANCES = GNSS_INSTANCES.split()
GNSS_CONFIGS = []
for item in GNSS_INSTANCES:
GNSS_CONFIGS.append("/ptp/ptpinstance/ts2phc-%s.conf" % item)
context = {
'THIS_NAMESPACE': THIS_NAMESPACE,
@ -81,6 +79,8 @@ context = {
'GNSS_CONFIGS': GNSS_CONFIGS,
'PHC2SYS_CONFIG': PHC2SYS_CONFIG,
'PTP4L_CONFIGS' : PTP4L_CONFIGS,
'GNSS_INSTANCES': GNSS_INSTANCES,
'PTP4L_INSTANCES': PTP4L_INSTANCES,
'ptptracker_context': {
'device_simulated': PTP_DEVICE_SIMULATED,
@ -108,7 +108,7 @@ sqlalchemy_conf = {
'pool_recycle': 3600,
'encoding': 'utf-8'
}
LOG.info("PTP tracking service startup context %s" % context)
sqlalchemy_conf_json = json.dumps(sqlalchemy_conf)
default_daemoncontrol = DaemonControl(sqlalchemy_conf_json, json.dumps(context))
@ -116,12 +116,5 @@ default_daemoncontrol.refresh()
while True:
pass
EOF
echo "done"
PYTHONPATH=/opt/ptptrackingfunction python3 /root/ptptracking-daemon.py &
sleep infinity

View File

@ -103,6 +103,8 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: PYTHONPATH
value: /opt/ptptrackingfunction
- name: THIS_NAMESPACE
value: {{ .Values.global.namespace }}
- name: PTP_DEVICE_SIMULATED
@ -145,11 +147,9 @@ spec:
value: "/ptp/ptpinstance/phc2sys-{{.Values.ptptracking.phc2sysServiceName}}.conf"
- name: TS2PHC_SERVICE_NAME
value: "{{ .Values.ptptracking.ts2phcServiceName }}"
- name: TS2PHC_CONFIGS
value: '["/ptp/ptpinstance/ts2phc-{{.Values.ptptracking.ts2phcServiceName}}.conf"]'
- name: LOGGING_LEVEL
value: "{{ .Values.ptptracking.log_level }}"
command: ["/bin/bash", "/mnt/ptptracking_start.sh"]
command: ["python3", "/mnt/ptptracking_start.py"]
securityContext:
privileged: true
capabilities:

View File

@ -67,7 +67,9 @@ location:
ptptracking:
imagePullSecrets: default-registry-key
ptp4lSocket: /var/run/ptp4l-ptp4l-legacy
ptp4lServiceName: ptp4l-legacy
ptp4lServiceName:
- ptp1
- ptp2
phc2sysServiceName: phc2sys-legacy
ts2phcServiceName: ts2phc-legacy
log_level: INFO