Merge "Add new version of subscription API"

This commit is contained in:
Zuul 2022-06-30 17:42:57 +00:00 committed by Gerrit Code Review
commit a334bc2610
13 changed files with 257 additions and 64 deletions

View File

@ -14,7 +14,6 @@ from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
from notificationclientsdk.client.base import BrokerClientBase
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
import logging

View File

@ -7,6 +7,7 @@
import os
import json
import re
import requests
import logging
@ -52,3 +53,13 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3):
return result
def parse_resource_address(resource_address):
# The format of resource address is:
# /{clusterName}/{siteName}(/optional/hierarchy/..)/{nodeName}/{resource}
# Assume no optional hierarchy for now
clusterName = resource_address.split('/')[1]
nodeName = resource_address.split('/')[2]
resource_path = re.split('[/]', resource_address, 3)[3]
return clusterName, nodeName, resource_path

View File

@ -4,12 +4,13 @@
# SPDX-License-Identifier: Apache-2.0
#
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
from notificationclientsdk.model.dto.subscription import ResourceQualifierPtp
from wsme.rest.json import tojson
@tojson.when_object(SubscriptionInfo)
@tojson.when_object(SubscriptionInfoV0)
def subscriptioninfo_tojson(datatype, value):
if value is None:
return None
@ -20,3 +21,9 @@ def resourcequalifierptp_tojson(datatype, value):
if value is None:
return None
return value.to_dict()
@tojson.when_object(SubscriptionInfoV1)
def subscriptioninfo_tojson(datatype, value):
if value is None:
return None
return value.to_dict()

View File

@ -45,7 +45,7 @@ class ResourceQualifierPtp(ResourceQualifierBase):
'''
ViewModel of Subscription
'''
class SubscriptionInfo(wtypes.Base):
class SubscriptionInfoV0(wtypes.Base):
SubscriptionId = wtypes.text
UriLocation = wtypes.text
ResourceType = EnumResourceType
@ -63,11 +63,12 @@ class SubscriptionInfo(wtypes.Base):
if not self._ResourceQualifer:
if self.ResourceType == ResourceType.TypePTP:
self._ResourceQualifer = ResourceQualifierPtp(**self._ResourceQualifierJson)
else:
self._ResourceQualifer = None
return self._ResourceQualifer
ResourceQualifier = wtypes.wsproperty(wtypes.Base,
get_resource_qualifier, set_resource_qualifier, mandatory=True)
get_resource_qualifier, set_resource_qualifier)
def __init__(self, orm_entry=None):
@ -84,7 +85,7 @@ class SubscriptionInfo(wtypes.Base):
'ResourceType': self.ResourceType,
'UriLocation': self.UriLocation,
'EndpointUri': self.EndpointUri,
'ResourceQualifier': self.ResourceQualifier.to_dict()
'ResourceQualifier': self.ResourceQualifier.to_dict(),
}
return d
@ -94,6 +95,37 @@ class SubscriptionInfo(wtypes.Base):
'ResourceType': self.ResourceType or '',
'UriLocation': self.UriLocation,
'EndpointUri': self.EndpointUri or '',
'ResourceQualifierJson': json.dumps(self.ResourceQualifier.to_dict()) or ''
'ResourceQualifierJson': json.dumps(self.ResourceQualifier.to_dict()) or '',
}
return d
class SubscriptionInfoV1(wtypes.Base):
SubscriptionId = wtypes.text
UriLocation = wtypes.text
EndpointUri = wtypes.text
ResourceAddress = wtypes.text
def __init__(self, orm_entry=None):
if orm_entry:
self.SubscriptionId = orm_entry.SubscriptionId
self.UriLocation = orm_entry.UriLocation
self.EndpointUri = orm_entry.EndpointUri
self.ResourceAddress = orm_entry.ResourceAddress
def to_dict(self):
d = {
'SubscriptionId': self.SubscriptionId,
'UriLocation': self.UriLocation,
'EndpointUri': self.EndpointUri,
'ResourceAddress': self.ResourceAddress,
}
return d
def to_orm(self):
d = {
'SubscriptionId': self.SubscriptionId,
'UriLocation': self.UriLocation,
'EndpointUri': self.EndpointUri or '',
'ResourceAddress': self.ResourceAddress or ''
}
return d

View File

@ -19,6 +19,7 @@ class Subscription(OrmBase):
CreateTime = Column(Float)
LastUpdateTime = Column(Float)
ResourceQualifierJson = Column(String)
ResourceAddress = Column(String(512))
def create_tables(orm_engine):
Subscription.metadata.create_all(orm_engine)

View File

@ -8,8 +8,6 @@ import time
import oslo_messaging
import logging
from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.model.dto.broker_state import BrokerState

View File

@ -6,9 +6,11 @@
import json
import logging
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.common.helpers import subscription_helper
from notificationclientsdk.model.dto.broker_state import BrokerState
@ -94,29 +96,34 @@ class BrokerStateManager:
changed = False
broker_name = None
subscription = SubscriptionInfo(subscription_orm)
resource_type = subscription.ResourceType
LOG.debug("subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status))
if subscription_orm.Status != 1:
return False
# assume PTP and not wildcast
if resource_type == ResourceType.TypePTP:
broker_name = subscription.ResourceQualifier.NodeName
LOG.info("__refresh_by_subscription: subscription_orm={}".format(subscription_orm))
if getattr(subscription_orm, 'ResourceType') is not None:
subscription = SubscriptionInfoV0(subscription_orm)
resource = subscription.ResourceType
# assume PTP and not wildcard
if resource == ResourceType.TypePTP:
broker_name = subscription.ResourceQualifier.NodeName
else:
# ignore the subscription due to unsupported type
LOG.debug("Ignore the subscription for: {0}".format(subscription_orm.SubscriptionId))
return False
else:
# ignore the subscription due to unsupported type
LOG.debug("Ignore the subscription for: {0}".format(subinfo.SubscriptionId))
subscription = SubscriptionInfoV1(subscription_orm)
_, nodename, resource = subscription_helper.parse_resource_address(subscription.ResourceAddress)
broker_name = nodename
LOG.info("subscription:{0}, Status:{1}".format(subscription.to_dict(), subscription_orm.Status))
if subscription_orm.Status != 1:
return False
if not broker_name:
# ignore the subscription due to unsupported type
LOG.debug("Ignore the subscription for: {0}".format(subscription.SubscriptionId))
LOG.info("Ignore the subscription for: {0}".format(subscription.SubscriptionId))
return False
enumerated_broker_names = NodeInfoHelper.enumerate_nodes(broker_name)
if not enumerated_broker_names:
LOG.debug("Failed to enumerate broker names for {0}".format(broker_name))
LOG.info("Failed to enumerate broker names for {0}".format(broker_name))
return False
for expanded_broker_name in enumerated_broker_names:
@ -125,8 +132,8 @@ class BrokerStateManager:
brokerstate = self.add_broker(expanded_broker_name)
changed = True
changed = changed or (brokerstate.is_resource_subscribed(resource_type) == False)
brokerstate.try_subscribe_resource(resource_type, self.subscription_refresh_iteration)
changed = changed or (brokerstate.is_resource_subscribed(resource) == False)
brokerstate.try_subscribe_resource(resource, self.subscription_refresh_iteration)
return changed

View File

@ -11,7 +11,8 @@ import logging
import multiprocessing as mp
import threading
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
@ -47,27 +48,36 @@ class NotificationHandler(NotificationHandlerBase):
self.notification_lock.acquire()
subscription_repo = SubscriptionRepo(autocommit=True)
resource_type = notification_info.get('ResourceType', None)
node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None)
if not resource_type:
raise Exception("abnormal notification@{0}".format(node_name))
resource_address = notification_info.get('ResourceAddress', None)
# Get nodename from resource address
if resource_address:
_,node_name,_ = subscription_helper.parse_resource_address(resource_address)
else:
node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None)
if not resource_type:
raise Exception("abnormal notification@{0}".format(node_name))
if not resource_type in self.__supported_resource_types:
raise Exception("notification with unsupported resource type:{0}".format(resource_type))
if not resource_type in self.__supported_resource_types:
raise Exception("notification with unsupported resource type:{0}".format(resource_type))
this_delivery_time = notification_info['EventTimestamp']
entries = subscription_repo.get(ResourceType=resource_type, Status=1)
for entry in entries:
subscriptionid = entry.SubscriptionId
ResourceQualifierJson = entry.ResourceQualifierJson or '{}'
ResourceQualifier = json.loads(ResourceQualifierJson)
# qualify by NodeName
entry_node_name = ResourceQualifier.get('NodeName', None)
if entry.ResourceAddress:
_,entry_node_name,_ = subscription_helper.parse_resource_address(entry.ResourceAddress)
subscription_dto2 = SubscriptionInfoV1(entry)
else:
ResourceQualifierJson = entry.ResourceQualifierJson or '{}'
ResourceQualifier = json.loads(ResourceQualifierJson)
# qualify by NodeName
entry_node_name = ResourceQualifier.get('NodeName', None)
subscription_dto2 = SubscriptionInfoV0(entry)
node_name_matched = NodeInfoHelper.match_node_name(entry_node_name, node_name)
if not node_name_matched:
continue
subscription_dto2 = SubscriptionInfo(entry)
try:
last_delivery_time = self.__get_latest_delivery_timestamp(node_name, subscriptionid)
if last_delivery_time and last_delivery_time >= this_delivery_time:

View File

@ -20,7 +20,7 @@ from notificationclientsdk.common.helpers import rpc_helper, hostfile_helper
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.model.dto.rpc_endpoint import RpcEndpointInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.model.dto.location import LocationInfo
@ -241,15 +241,24 @@ class NotificationWorker:
self.broker_state_manager.refresh_by_nodeinfos(nodeinfos)
for s in subs:
subinfo = SubscriptionInfo(s)
# assume resource type being PTP and not wildcast
resource_type = s.ResourceType
if resource_type == ResourceType.TypePTP:
broker_name = subinfo.ResourceQualifier.NodeName
if s.ResourceType:
subinfo = SubscriptionInfoV0(s)
# assume resource type being PTP and not wildcard
resource_type = s.ResourceType
if resource_type == ResourceType.TypePTP:
broker_name = subinfo.ResourceQualifier.NodeName
else:
# ignore the subscription due to unsupported type
LOG.debug("Ignore the subscription for: {0}".format(subinfo.SubscriptionId))
continue
elif s.ResourceAddress:
# Get nodename from resource address
LOG.info("Parse resource address {}".format(s.ResourceAddress))
_,nodename,_ = subscription_helper.parse_resource_address(s.ResourceAddress)
broker_name = nodename
else:
# ignore the subscription due to unsupported type
LOG.debug("Ignore the subscription for: {0}".format(subinfo.SubscriptionId))
LOG.debug("Subscription {} does not have ResourceType or "
"ResourceAddress".format(s.SubscriptionId))
continue
if s.Status == 1:

View File

@ -12,7 +12,8 @@ import kombu
from notificationclientsdk.repository.node_repo import NodeRepo
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper
from notificationclientsdk.model.orm.subscription import Subscription as SubscriptionOrm
from notificationclientsdk.client.notificationservice import NotificationServiceClient
@ -112,7 +113,11 @@ class PtpService(object):
def add_subscription(self, subscription_dto):
subscription_orm = SubscriptionOrm(**subscription_dto.to_orm())
broker_name = subscription_dto.ResourceQualifier.NodeName
if hasattr(subscription_dto, 'ResourceAddress'):
_,nodename,_ = subscription_helper.parse_resource_address(subscription_dto.ResourceAddress)
broker_name = nodename
elif hasattr(subscription_dto, 'ResourceType'):
broker_name = subscription_dto.ResourceQualifier.NodeName
default_node_name = NodeInfoHelper.default_node_name(broker_name)
broker_pod_ip, supported_resource_types = self.__get_node_info(default_node_name)
@ -138,7 +143,11 @@ class PtpService(object):
entry = self.subscription_repo.add(subscription_orm)
# Delivery the initial notification of ptp status
subscription_dto2 = SubscriptionInfo(entry)
if hasattr(subscription_dto, 'ResourceType'):
subscription_dto2 = SubscriptionInfoV0(entry)
else:
subscription_dto2 = SubscriptionInfoV1(entry)
try:
subscription_helper.notify(subscription_dto2, ptpstatus)
LOG.info("initial ptpstatus is delivered successfully")

View File

@ -5,7 +5,7 @@
# SPDX-License-Identifier: Apache-2.0
#
from pecan import expose, rest, abort
from pecan import expose, rest, abort, request
from webob.exc import status_map
import os
@ -14,8 +14,13 @@ from wsmeext.pecan import wsexpose
THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0')
from sidecar.controllers.v1.subscriptions import SubscriptionsController
from sidecar.controllers.v1.subscriptions import SubscriptionsControllerV0
from sidecar.controllers.v1.subscriptions import SubscriptionsControllerV1
from sidecar.controllers.v1.resource.ptp import PtpController
import logging
LOG = logging.getLogger(__name__)
from notificationclientsdk.common.helpers import log_helper
log_helper.config_logger(LOG)
class HealthController(rest.RestController):
@ -35,11 +40,19 @@ class V1Controller(rest.RestController):
@expose("json")
def _lookup(self, primary_key, *remainder):
LOG.info("_lookup: primary_key={} remainder={}".format(primary_key, remainder))
payload = None
if request.is_body_readable:
payload = request.json_body
LOG.info("_lookup: payload={}".format(payload))
if primary_key:
if 'ptp' == primary_key.lower():
return PtpController(), remainder
elif 'subscriptions' == primary_key.lower():
return SubscriptionsController(), remainder
if payload and 'ResourceType' in payload:
return SubscriptionsControllerV0(), remainder
else:
return SubscriptionsControllerV1(), remainder
abort(404)
class ocloudDaemonController(rest.RestController):

View File

@ -16,7 +16,8 @@ from wsme import types as wtypes
from wsmeext.pecan import wsexpose
from notificationclientsdk.model.dto.resourcetype import ResourceType
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
from notificationclientsdk.repository.subscription_repo import SubscriptionRepo
from notificationclientsdk.services.ptp import PtpService
@ -32,8 +33,9 @@ log_helper.config_logger(LOG)
THIS_NODE_NAME = os.environ.get("THIS_NODE_NAME",'controller-0')
class SubscriptionsController(rest.RestController):
@wsexpose(SubscriptionInfo, body=SubscriptionInfo, status_code=201)
class SubscriptionsControllerV0(rest.RestController):
@wsexpose(SubscriptionInfoV0, body=SubscriptionInfoV0, status_code=201)
def post(self, subscription):
# decode the request body
try:
@ -47,13 +49,13 @@ class SubscriptionsController(rest.RestController):
subscription.ResourceType))
abort(404)
if not self._validate(subscription):
if not self._validateV0(subscription):
LOG.warning(' Invalid Request data:{0}'.format(subscription.to_dict()))
abort(400)
subscription.UriLocation = "{0}://{1}:{2}/ocloudNotifications/v1/subscriptions".format(
conf.server.get('protocol','http'),
conf.server.get('host', '127.0.01'),
conf.server.get('host', '127.0.0.1'),
conf.server.get('port', '8080')
)
if subscription.ResourceType == ResourceType.TypePTP:
@ -93,7 +95,7 @@ class SubscriptionsController(rest.RestController):
entries = repo.get(Status=1)
response.status = 200
return [SubscriptionInfo(x).to_dict() for x in entries if x.Status == 1]
return [SubscriptionInfoV0(x).to_dict() for x in entries if x.Status == 1]
except HTTPException as ex:
LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex)))
raise ex
@ -108,7 +110,7 @@ class SubscriptionsController(rest.RestController):
def _lookup(self, subscription_id, *remainder):
return SubscriptionController(subscription_id), remainder
def _validate(self, subscription_request):
def _validateV0(self, subscription_request):
try:
assert subscription_request.ResourceType == 'PTP'
assert subscription_request.EndpointUri
@ -117,6 +119,93 @@ class SubscriptionsController(rest.RestController):
except:
return False
class SubscriptionsControllerV1(rest.RestController):
@wsexpose(SubscriptionInfoV1, body=SubscriptionInfoV1, status_code=201)
def post(self, subscription):
# decode the request body
try:
if subscription.ResourceAddress:
LOG.info(' subscribe: ResourceAddress {0} with callback uri {1}'.format(
subscription.ResourceAddress,
subscription.EndpointUri))
if not self._validateV1(subscription):
LOG.warning(' Invalid Request data:{0}'.format(subscription.to_dict()))
abort(400)
subscription.UriLocation = "{0}://{1}:{2}/ocloudNotifications/v1/subscriptions".format(
conf.server.get('protocol','http'),
conf.server.get('host', '127.0.0.1'),
conf.server.get('port', '8080')
)
if subscription.ResourceAddress:
ptpservice = PtpService(notification_control)
entry = ptpservice.add_subscription(subscription)
del ptpservice
if not entry:
abort(404)
subscription.SubscriptionId = entry.SubscriptionId
subscription.UriLocation = entry.UriLocation
LOG.info('created subscription: {0}'.format(subscription.to_dict()))
return subscription
except client_exception.InvalidSubscription as ex:
abort(400)
except client_exception.InvalidEndpoint as ex:
abort(400)
except client_exception.NodeNotAvailable as ex:
abort(404)
except client_exception.ResourceNotAvailable as ex:
abort(404)
except HTTPException as ex:
LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex)))
abort(400)
except HTTPServerError as ex:
LOG.error("Server side error:{0},{1}".format(type(ex), str(ex)))
abort(500)
except Exception as ex:
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex)))
abort(500)
@expose('json')
def get(self):
try:
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False)
entries = repo.get(Status=1)
response.status = 200
subs = []
for x in entries:
if x.Status == 1:
if getattr(x, 'ResourceType', None):
subs.append(SubscriptionInfoV0(x).to_dict())
else:
subs.append(SubscriptionInfoV1(x).to_dict())
return subs
except HTTPException as ex:
LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex)))
raise ex
except HTTPServerError as ex:
LOG.error("Server side error:{0},{1}".format(type(ex), str(ex)))
raise ex
except Exception as ex:
LOG.error("Exception:{0}@{1}".format(type(ex),str(ex)))
abort(500)
@expose()
def _lookup(self, subscription_id, *remainder):
return SubscriptionController(subscription_id), remainder
def _validateV1(self, subscription_request):
try:
assert subscription_request.ResourceAddress
assert subscription_request.EndpointUri
return True
except:
return False
class SubscriptionController(rest.RestController):
def __init__(self, subscription_id):
self.subscription_id = subscription_id
@ -131,7 +220,10 @@ class SubscriptionController(rest.RestController):
abort(404)
else:
response.status = 200
return SubscriptionInfo(entry).to_dict()
if getattr(entry, 'ResourceType', None):
return SubscriptionInfoV0(entry).to_dict()
else:
return SubscriptionInfoV1(entry).to_dict()
except HTTPException as ex:
LOG.warning("Client side error:{0},{1}".format(type(ex), str(ex)))
raise ex
@ -148,7 +240,7 @@ class SubscriptionController(rest.RestController):
repo = SubscriptionRepo(defaults['dbcontext'].get_session(), autocommit = False)
entry = repo.get_one(SubscriptionId=self.subscription_id)
if entry:
if entry.ResourceType == ResourceType.TypePTP:
if entry.SubscriptionId:
ptpservice = PtpService(notification_control)
ptpservice.remove_subscription(entry.SubscriptionId)
del ptpservice

View File

@ -4,12 +4,13 @@
# SPDX-License-Identifier: Apache-2.0
#
from notificationclientsdk.model.dto.subscription import SubscriptionInfo
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV0
from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1
from notificationclientsdk.model.dto.subscription import ResourceQualifierPtp
from pecan.jsonify import jsonify
@jsonify.register(SubscriptionInfo)
@jsonify.register(SubscriptionInfoV0)
def jsonify_subscriptioninfo(subscriptionInfo):
return subscriptionInfo.to_dict()
@ -17,5 +18,9 @@ def jsonify_subscriptioninfo(subscriptionInfo):
def jsonify_resourcequalifierptp(resourceQualifierPtp):
return resourceQualifierPtp.to_dict()
@jsonify.register(SubscriptionInfoV1)
def jsonify_subscriptioninfo(subscriptionInfo):
return subscriptionInfo.to_dict()
def __init__():
pass