Cole Walker 93778b296e Build Debian container image for notificationservice-base-v1-api
Add the required code and Dockerfile to build a Debian based container
image of notificationservice-base to support v1 API backwards
compatibility.

The notificationservice-base-v1 code added here is being ported forward
from stx.6.0. This code existed as-is in StarlingX prior to the work
done to implement the ORAN Notification API. It is being returned in
order to support user's needs for backwards compatbility. Subsequent
work will done to update the helm charts, allowing
notificationservice-base-v1-api and notificationservice-base-v2-api
(ORAN Notification) to be deployed in parallel.

Test plan:
PASS: Build container image
PASS: Manually deploy image and test with v1 client

Story: 2010538
Task: 47175

Change-Id: I2f672fe1b226cb60b62bd5fe7f8abbedc7b0c4cc
2023-01-30 15:05:15 +00:00

127 lines
4.5 KiB
Python

#
# Copyright (c) 2022-2023 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import logging
import datetime
import os.path
import re
from abc import ABC, abstractmethod
from trackingfunctionsdk.common.helpers import constants
from trackingfunctionsdk.common.helpers import log_helper
from trackingfunctionsdk.common.helpers.cgu_handler import CguHandler
from trackingfunctionsdk.model.dto.gnssstate import GnssState
LOG = logging.getLogger(__name__)
log_helper.config_logger(LOG)
class Observer(ABC):
@abstractmethod
def update(self, subject, matched_line) -> None:
"""
Receive update from subject.
"""
pass
class GnssMonitor(Observer):
gnss_eec_state = ""
gnss_pps_state = ""
_state = GnssState()
gnss_cgu_handler = None
def __init__(self, config_file, nmea_serialport=None, pci_addr=None,
cgu_path=None):
self.config_file = config_file
try:
pattern = '(?<=' + \
constants.TS2PHC_CONFIG_PATH + \
'ts2phc-).*(?=.conf)'
match = re.search(pattern, self.config_file)
self.ts2phc_service_name = match.group()
except AttributeError:
LOG.warning(
"GnssMonitor: Unable to determine tsphc_service name from %s"
% self.config_file)
# Setup GNSS data
self.gnss_cgu_handler = CguHandler(config_file, nmea_serialport,
pci_addr, cgu_path)
if self.gnss_cgu_handler.nmea_serialport is None:
self.gnss_cgu_handler.get_gnss_nmea_serialport_from_ts2phc_config()
if self.gnss_cgu_handler.pci_addr is None:
self.gnss_cgu_handler.convert_nmea_serialport_to_pci_addr()
if self.gnss_cgu_handler.cgu_path is None:
self.gnss_cgu_handler.get_cgu_path_from_pci_addr()
self.gnss_cgu_handler.read_cgu()
self.gnss_cgu_handler.cgu_output_to_dict()
self.dmesg_values_to_check = {
'pin': 'GNSS-1PPS',
'pci_addr': self.gnss_cgu_handler.pci_addr
}
# Initialize status
if self.gnss_cgu_handler.cgu_output_parsed[
'EEC DPLL']['Current reference'] == 'GNSS-1PPS':
self.gnss_eec_state = \
self.gnss_cgu_handler.cgu_output_parsed['EEC DPLL']['Status']
if self.gnss_cgu_handler.cgu_output_parsed[
'PPS DPLL']['Current reference'] == 'GNSS-1PPS':
self.gnss_pps_state = \
self.gnss_cgu_handler.cgu_output_parsed['PPS DPLL']['Status']
def update(self, subject, matched_line) -> None:
LOG.info("Kernel event detected. %s" % matched_line)
self.set_gnss_status()
def set_gnss_status(self):
# Check that ts2phc is running, else Freerun
if not os.path.isfile('/var/run/ts2phc-%s.pid'
% self.ts2phc_service_name):
LOG.warning("TS2PHC instance %s is not running, "
"reporting GNSS unlocked."
% self.ts2phc_service_name)
self._state = GnssState.Failure_Nofix
return
self.gnss_cgu_handler.read_cgu()
self.gnss_cgu_handler.cgu_output_to_dict()
self.gnss_eec_state = self.gnss_eec_state = \
self.gnss_cgu_handler.cgu_output_parsed['EEC DPLL']['Status']
self.gnss_pps_state = \
self.gnss_cgu_handler.cgu_output_parsed['PPS DPLL']['Status']
LOG.debug("GNSS EEC Status is: %s" % self.gnss_eec_state)
LOG.debug("GNSS PPS Status is: %s" % self.gnss_pps_state)
if self.gnss_pps_state in [
constants.GNSS_LOCKED_HO_ACK,
constants.GNSS_LOCKED_HO_ACQ] and \
self.gnss_eec_state in [
constants.GNSS_LOCKED_HO_ACK,
constants.GNSS_LOCKED_HO_ACQ]:
self._state = GnssState.Synchronized
else:
self._state = GnssState.Failure_Nofix
LOG.debug("Set state GNSS to %s" % self._state)
def get_gnss_status(self, holdover_time, freq, sync_state, event_time):
previous_sync_state = sync_state
self.set_gnss_status()
# determine if GNSS state has changed since the last check
if self._state != previous_sync_state:
new_event = True
event_time = datetime.datetime.utcnow().timestamp()
else:
new_event = False
return new_event, self._state, event_time