Revert "Add configurable notification subscriber and mqtt driver"
This reverts commit c55d63a16c0ec58d73da9f186c7a09c4120de8f0 because it depends on change Ie82e625fcce7d5b5b794a46f6456ccff1dc5ec3e which is being reverted. Change-Id: I6a3b33f81668072093ebff56991543169e91d226
This commit is contained in:
parent
ae1d2e1e6d
commit
a9d5cbb229
@ -1,94 +0,0 @@
|
||||
# Copyright 2018 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
|
||||
from oslo_config import cfg
|
||||
import paho.mqtt.client as mqtt
|
||||
from stevedore import enabled
|
||||
|
||||
from storyboard.notifications.mqtt import conf
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(conf.MQTT_OPTS, group='mqtt_notifications')
|
||||
|
||||
|
||||
def handle_event(ext, body):
|
||||
"""Handle an event from the queue.
|
||||
|
||||
:param ext: The extension that's handling this event.
|
||||
:param body: The body of the event.
|
||||
:return: The result of the handler.
|
||||
"""
|
||||
payload = json.loads(body)
|
||||
return ext.obj.event(author_id=payload['author_id'] or None,
|
||||
method=payload['method'] or None,
|
||||
url=payload['url'] or None,
|
||||
path=payload['path'] or None,
|
||||
query_string=payload['query_string'] or None,
|
||||
status=payload['status'] or None,
|
||||
resource=payload['resource'] or None,
|
||||
resource_id=payload['resource_id'] or None,
|
||||
sub_resource=payload['sub_resource'] or None,
|
||||
sub_resource_id=payload['sub_resource_id'] or None,
|
||||
resource_before=payload['resource_before'] or None,
|
||||
resource_after=payload['resource_after'] or None)
|
||||
|
||||
|
||||
def check_enabled(ext):
|
||||
"""Check to see whether an extension should be enabled.
|
||||
|
||||
:param ext: The extension instance to check.
|
||||
:return: True if it should be enabled. Otherwise false.
|
||||
"""
|
||||
return ext.obj.enabled()
|
||||
|
||||
|
||||
def subscriber(topic=None):
|
||||
if not topic:
|
||||
topic = CONF.mqtt_notifications.base_topic + '/#'
|
||||
auth = None
|
||||
if CONF.mqtt_notifications.username:
|
||||
auth = {'username': CONF.mqtt_notifications.username,
|
||||
'password': CONF.mqtt_notifications.password}
|
||||
tls = None
|
||||
if CONF.mqtt_notifications.ca_certs:
|
||||
tls = {'ca_certs': CONF.mqtt_notifications.ca_certs,
|
||||
'certfile': CONF.mqtt_notifications.certfile,
|
||||
'keyfile': CONF.mqtt_notifications.keyfile}
|
||||
client = mqtt.Client()
|
||||
if tls:
|
||||
client.tls_set(**tls)
|
||||
if auth:
|
||||
client.username_pw_set(auth['username'],
|
||||
password=auth.get('password'))
|
||||
|
||||
manager = enabled.EnabledExtensionManager(
|
||||
namespace='storyboard.plugin.worker',
|
||||
check_func=check_enabled,
|
||||
invoke_on_load=True,
|
||||
invoke_args=(CONF,))
|
||||
|
||||
def on_connect(client, userdata, flags, rc):
|
||||
# If no topic is specified subscribe to all messages on base_topic
|
||||
client.subscribe(topic, qos=CONF.mqtt_notifications.qos)
|
||||
|
||||
def on_message(client, userdata, msg):
|
||||
manager.map(handle_event, msg.payload)
|
||||
|
||||
client.on_connect = on_connect
|
||||
client.on_message = on_message
|
||||
client.connect(CONF.mqtt_notifications.hostname,
|
||||
CONF.mqtt_notifications.port)
|
||||
client.loop_forever()
|
@ -30,7 +30,7 @@ CONF = cfg.CONF
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def subscribe(topic=None):
|
||||
def subscribe():
|
||||
try:
|
||||
log.register_options(CONF)
|
||||
except cfg.ArgsAlreadyParsedError:
|
||||
@ -39,9 +39,6 @@ def subscribe(topic=None):
|
||||
log.setup(CONF, 'storyboard')
|
||||
CONF(project='storyboard')
|
||||
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
|
||||
if topic:
|
||||
LOG.warning("A subscription topic was specified, but the pika driver"
|
||||
"doesn't use topics")
|
||||
|
||||
subscriber = Subscriber(CONF.notifications)
|
||||
subscriber.start()
|
||||
|
@ -1,30 +0,0 @@
|
||||
# Copyright 2018 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import importutils
|
||||
|
||||
from storyboard.notifications import conf
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(conf.OPTS, 'notifications')
|
||||
|
||||
|
||||
def subscribe(topic=None):
|
||||
|
||||
subscriber_module = importutils.import_module(
|
||||
'storyboard.notifications.' + CONF.notifications.driver +
|
||||
'.subscriber')
|
||||
subscriber_module.subscribe(topic)
|
@ -23,7 +23,7 @@ import six
|
||||
|
||||
import storyboard.db.api.base as db_api
|
||||
from storyboard.notifications.notification_hook import class_mappings
|
||||
from storyboard.notifications import subscriber
|
||||
from storyboard.notifications.pika.subscriber import subscribe
|
||||
from storyboard._i18n import _LI, _LW
|
||||
from storyboard.plugin.base import PluginBase
|
||||
|
||||
@ -56,7 +56,7 @@ def run_daemon():
|
||||
signal.signal(signal.SIGTERM, terminate)
|
||||
signal.signal(signal.SIGINT, terminate)
|
||||
|
||||
MANAGER = DaemonManager(daemon_method=subscriber.subscribe,
|
||||
MANAGER = DaemonManager(daemon_method=subscribe,
|
||||
child_process_count=CONF.worker_count)
|
||||
MANAGER.start()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user