diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py index 629b9aa..af73074 100644 --- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/daemon.py @@ -46,6 +46,12 @@ class DaemonControl(object): self.refresh() pass + def __del__(self): + if self.locationservice_client: + self.locationservice_client.cleanup() + self.locationservice_client = None + return + def refresh(self): self.subscription_event.set() self.event.set() diff --git a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py index d0f21bd..76345b7 100644 --- a/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py +++ b/notificationclient-base/centos/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py @@ -35,27 +35,56 @@ class PtpService(object): def __del__(self): del self.subscription_repo - self.locationservice_client.cleanup() - del self.locationservice_client + self.locationservice_client = None return + def __query_locationinfo(self, broker_name, timeout=5, retry=2): + try: + location_info = self.locationservice_client.query_location(broker_name, timeout, retry) + LOG.debug("Pulled location info@{0}:{1}".format(broker_name, location_info)) + return location_info + except Exception as ex: + LOG.warning("Failed to query location of node:{0} due to: {1}".format( + broker_name, str(ex))) + raise client_exception.NodeNotAvailable(broker_name) + + def __get_node_info(self, default_node_name, timeout=5, retry=2): + try: + nodeinfo_repo = NodeRepo(autocommit=False) + nodeinfo = nodeinfo_repo.get_one(Status=1, NodeName=default_node_name) + broker_pod_ip = None + supported_resource_types = [] + + if nodeinfo: + broker_pod_ip = nodeinfo.PodIP + supported_resource_types = json.loads(nodeinfo.ResourceTypes or '[]') + + if not broker_pod_ip: + # try to query the broker ip + location_info = self.__query_locationinfo(default_node_name, timeout, retry) + broker_pod_ip = location_info.get("PodIP", None) + supported_resource_types = location_info.get("ResourceTypes", []) + + return broker_pod_ip, supported_resource_types + finally: + del nodeinfo_repo + def query(self, broker_name): default_node_name = NodeInfoHelper.default_node_name(broker_name) - nodeinfo_repo = NodeRepo(autocommit=False) - nodeinfo = nodeinfo_repo.get_one(Status=1, NodeName=default_node_name) - # check node availability from DB - if not nodeinfo: + broker_pod_ip, supported_resource_types = self.__get_node_info(default_node_name) + + if not broker_pod_ip: + LOG.warning("Node {0} is not available yet".format(default_node_name)) raise client_exception.NodeNotAvailable(broker_name) - else: - broker_pod_ip = nodeinfo.PodIP - supported_resource_types = json.loads(nodeinfo.ResourceTypes or '[]') - if ResourceType.TypePTP in supported_resource_types: - return self._query(default_node_name, broker_pod_ip) - else: - raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP) + + if not ResourceType.TypePTP in supported_resource_types: + LOG.warning("Resource {0}@{1} is not available yet".format( + ResourceType.TypePTP, default_node_name)) + raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP) + + return self._query(default_node_name, broker_pod_ip) def _query(self, broker_name, broker_pod_ip): - # broker_host = "notificationservice-{0}".format(broker_name) broker_host = "[{0}]".format(broker_pod_ip) broker_transport_endpoint = "rabbit://{0}:{1}@{2}:{3}".format( self.daemon_control.daemon_context['NOTIFICATION_BROKER_USER'], @@ -85,17 +114,21 @@ class PtpService(object): subscription_orm = SubscriptionOrm(**subscription_dto.to_orm()) broker_name = subscription_dto.ResourceQualifier.NodeName default_node_name = NodeInfoHelper.default_node_name(broker_name) - nodeinfos = NodeInfoHelper.enumerate_nodes(broker_name) - # check node availability from DB - if not nodeinfos or not default_node_name in nodeinfos: + + broker_pod_ip, supported_resource_types = self.__get_node_info(default_node_name) + + if not broker_pod_ip: LOG.warning("Node {0} is not available yet".format(default_node_name)) raise client_exception.NodeNotAvailable(broker_name) + if not ResourceType.TypePTP in supported_resource_types: + LOG.warning("Resource {0}@{1} is not available yet".format( + ResourceType.TypePTP, default_node_name)) + raise client_exception.ResourceNotAvailable(broker_name, ResourceType.TypePTP) + # get initial resource status if default_node_name: - nodeinfo_repo = NodeRepo(autocommit=False) - nodeinfo = nodeinfo_repo.get_one(Status=1, NodeName=default_node_name) - broker_pod_ip = nodeinfo.PodIP + ptpstatus = None ptpstatus = self._query(default_node_name, broker_pod_ip) LOG.info("initial ptpstatus:{0}".format(ptpstatus))