Add MQTT notification publisher
This commit adds an alternate notification publisher driver for mqtt. The intent here is to use this driver with firehose.openstack.org, which is infra's unified event bus. [1] To use this driver you first need to set it as the notification driver with the driver option in the notifications config group. Then set the required config options in the mqtt_notifications section. [1] https://docs.openstack.org/infra/system-config/firehose.html Change-Id: Ie82e625fcce7d5b5b794a46f6456ccff1dc5ec3e
This commit is contained in:
parent
c7e0e458ff
commit
d84e80ec99
@ -30,3 +30,4 @@ apscheduler>=3.0.1,<3.1.0
|
||||
python_dateutil>=2.4.0
|
||||
oslo.concurrency>=3.8.0 # Apache-2.0
|
||||
oslo.i18n>=2.1.0 # Apache-2.0
|
||||
paho-mqtt>=1.3.1
|
||||
|
@ -18,6 +18,6 @@ from oslo_config import cfg
|
||||
CONF = cfg.CONF
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('driver', choices=['pika'],
|
||||
cfg.StrOpt('driver', choices=['pika', 'mqtt'],
|
||||
help='The notification driver to use', default='pika')
|
||||
]
|
||||
|
0
storyboard/notifications/mqtt/__init__.py
Normal file
0
storyboard/notifications/mqtt/__init__.py
Normal file
51
storyboard/notifications/mqtt/conf.py
Normal file
51
storyboard/notifications/mqtt/conf.py
Normal file
@ -0,0 +1,51 @@
|
||||
# Copyright 2018 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
|
||||
MQTT_OPTS = [
|
||||
cfg.StrOpt('hostname', help="MQTT broker address/name"),
|
||||
cfg.IntOpt('port', default=1883, help='MQTT broker port'),
|
||||
cfg.StrOpt('username',
|
||||
help="Username to authenticate against the broker."),
|
||||
cfg.StrOpt('password', secret=True,
|
||||
help='Password to authenticate against the broker.'),
|
||||
cfg.IntOpt('qos', default=0, min=0, max=2,
|
||||
help='Max MQTT QoS available on messages. This can be 0, 1, '
|
||||
'or 2'),
|
||||
cfg.StrOpt('client_id',
|
||||
help='MQTT client identifier, default is hostname + pid'),
|
||||
cfg.StrOpt('base_topic', default='storyboard',
|
||||
help='The base MQTT topic to publish to'),
|
||||
cfg.StrOpt('ca_certs',
|
||||
help="The path to the Certificate Authority certificate files "
|
||||
"that are to be treated as trusted. If this is the only "
|
||||
"certificate option given then the client will operate in "
|
||||
"a similar manner to a web browser. That is to say it will"
|
||||
"require the broker to have a certificate signed by the "
|
||||
"Certificate Authorities in ca_certs and will communicate "
|
||||
"using TLS v1, but will not attempt any form of TLS"
|
||||
"certificate based authentication."),
|
||||
cfg.StrOpt('certfile',
|
||||
help="The path pointing to the PEM encoded client certificate. "
|
||||
"If this is set it will be used as client information for "
|
||||
"TLS based authentication. Support for this feature is "
|
||||
"broker dependent."),
|
||||
cfg.StrOpt('keyfile',
|
||||
help="The path pointing to the PEM encoded client private key."
|
||||
"If this is set it will be used as client information for "
|
||||
"TLS based authentication. Support for this feature is "
|
||||
"broker dependent"),
|
||||
]
|
104
storyboard/notifications/mqtt/publisher.py
Normal file
104
storyboard/notifications/mqtt/publisher.py
Normal file
@ -0,0 +1,104 @@
|
||||
# Copyright 2018 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
|
||||
from oslo_config import cfg
|
||||
import paho.mqtt.publish as mqtt_publish
|
||||
|
||||
from storyboard.notifications.mqtt import conf
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(conf.MQTT_OPTS, group='mqtt_notifications')
|
||||
|
||||
|
||||
class PushMQTT(object):
|
||||
def __init__(self, hostname, port=1883, client_id=None,
|
||||
keepalive=60, will=None, auth=None, tls=None, qos=0):
|
||||
self.hostname = hostname
|
||||
self.port = port
|
||||
self.client_id = client_id
|
||||
self.keepalive = 60
|
||||
self.will = will
|
||||
self.auth = auth
|
||||
self.tls = tls
|
||||
self.qos = qos
|
||||
|
||||
def publish_single(self, topic, msg):
|
||||
mqtt_publish.single(topic, msg, hostname=self.hostname,
|
||||
port=self.port, client_id=self.client_id,
|
||||
keepalive=self.keepalive, will=self.will,
|
||||
auth=self.auth, tls=self.tls, qos=self.qos)
|
||||
|
||||
def publish_multiple(self, topic, msg):
|
||||
mqtt_publish.multiple(topic, msg, hostname=self.hostname,
|
||||
port=self.port, client_id=self.client_id,
|
||||
keepalive=self.keepalive, will=self.will,
|
||||
auth=self.auth, tls=self.tls, qos=self.qos)
|
||||
|
||||
|
||||
def config_publisher():
|
||||
auth = None
|
||||
if CONF.mqtt_notifications.username:
|
||||
auth = {'username': CONF.mqtt_notifications.username,
|
||||
'password': CONF.mqtt_notifications.password}
|
||||
tls = None
|
||||
if CONF.mqtt_notifications.ca_certs:
|
||||
tls = {'ca_certs': CONF.mqtt_notifications.ca_certs,
|
||||
'certfile': CONF.mqtt_notifications.certfile,
|
||||
'keyfile': CONF.mqtt_notifications.keyfile}
|
||||
return PushMQTT(CONF.mqtt_notifications.hostname,
|
||||
port=CONF.mqtt_notifications.port,
|
||||
client_id=CONF.mqtt_notifications.client_id,
|
||||
auth=auth, tls=tls, qos=CONF.mqtt_notifications.qos)
|
||||
|
||||
|
||||
def _generate_topic(resource, resource_id=None, author_id=None,
|
||||
sub_resource=None, sub_resource_id=None):
|
||||
topic = [CONF.mqtt_notifications.base_topic]
|
||||
if resource:
|
||||
topic.append(resource)
|
||||
if resource_id:
|
||||
topic.append(resource_id)
|
||||
if author_id:
|
||||
topic.append(author_id)
|
||||
if sub_resource:
|
||||
topic.extend(['sub_resource', sub_resource])
|
||||
if sub_resource_id:
|
||||
topic.append(sub_resource_id)
|
||||
return '/'.join(topic)
|
||||
|
||||
|
||||
def publish(resource, author_id=None, method=None, url=None, path=None,
|
||||
query_string=None, status=None, resource_id=None,
|
||||
sub_resource=None, sub_resource_id=None, resource_before=None,
|
||||
resource_after=None):
|
||||
mqtt_publish = config_publisher()
|
||||
topic = _generate_topic(resource, resource_id, author_id, sub_resource,
|
||||
sub_resource_id)
|
||||
payload = {
|
||||
"author_id": author_id,
|
||||
"method": method,
|
||||
"url": url,
|
||||
"path": path,
|
||||
"query_string": query_string,
|
||||
"status": status,
|
||||
"resource": resource,
|
||||
"resource_id": resource_id,
|
||||
"sub_resource": sub_resource,
|
||||
"sub_resource_id": sub_resource_id,
|
||||
"resource_before": resource_before,
|
||||
"resource_after": resource_after
|
||||
}
|
||||
mqtt_publish.publish_single(topic, json.dumps(payload))
|
Loading…
x
Reference in New Issue
Block a user