Added subscriber and publisher modules
For each action, except for creation of a new project, two messages will be sent to rabbitmq. One with the main resource; resource_id; user_id; method and the other with the event_id; user_id; a faked method POST and a faked resource TIMELINE_EVENT Publisher creates an exchange called 'storyboard' and publishes the messages to it. Subscriber creates the queues with different binding_keys which bind themself to the storyboard exchange and start consuming the messages from the exchange. The consumed messages are used by CR 113016. Co-Authored-By: Nikita Konovalov <nkonovalov@mirantis.com> Change-Id: Ia573437302dc2d0b1a68d2343e83f9dd397fac04
This commit is contained in:
parent
d576442b69
commit
c8cbc9720d
@ -46,6 +46,30 @@ lock_path = $state_path/lock
|
||||
# page_size_maximum = 500
|
||||
# page_size_default = 20
|
||||
|
||||
# Enable notifications. This feature drives deferred processing, reporting,
|
||||
# and subscriptions.
|
||||
# enable_notifications = True
|
||||
|
||||
[notifications]
|
||||
|
||||
# Host of the rabbitmq server.
|
||||
# rabbit_host=localhost
|
||||
|
||||
# The RabbitMQ login method
|
||||
# rabbit_login_method = AMQPLAIN
|
||||
|
||||
# The RabbitMQ userid.
|
||||
# rabbit_userid = storyboard
|
||||
|
||||
# The RabbitMQ password.
|
||||
# rabbit_password = storyboard
|
||||
|
||||
# The RabbitMQ broker port where a single node is used.
|
||||
# rabbit_port = 5672
|
||||
|
||||
# The virtual host within which our queues and exchanges live.
|
||||
# rabbit_virtual_host = /
|
||||
|
||||
[database]
|
||||
# This line MUST be changed to actually run storyboard
|
||||
# Example:
|
||||
|
@ -7,6 +7,7 @@ oauthlib>=0.6
|
||||
oslo.config>=1.2.1
|
||||
pecan>=0.4.5
|
||||
oslo.db>=0.2.0
|
||||
pika>=0.9.14
|
||||
python-openid
|
||||
PyYAML>=3.1.0
|
||||
requests>=1.1
|
||||
|
@ -32,6 +32,7 @@ data_files =
|
||||
[entry_points]
|
||||
console_scripts =
|
||||
storyboard-api = storyboard.api.app:start
|
||||
storyboard-subscriber = storyboard.notifications.subscriber:subscribe
|
||||
storyboard-db-manage = storyboard.db.migration.cli:main
|
||||
|
||||
[build_sphinx]
|
||||
|
@ -22,14 +22,17 @@ from wsgiref import simple_server
|
||||
from storyboard.api.auth.token_storage import impls as storage_impls
|
||||
from storyboard.api.auth.token_storage import storage
|
||||
from storyboard.api import config as api_config
|
||||
from storyboard.api.middleware import resource_hook
|
||||
from storyboard.api.middleware import token_middleware
|
||||
from storyboard.api.middleware import user_id_hook
|
||||
from storyboard.api.v1.search import impls as search_engine_impls
|
||||
from storyboard.api.v1.search import search_engine
|
||||
from storyboard.notifications import connection_service
|
||||
from storyboard.openstack.common.gettextutils import _ # noqa
|
||||
from storyboard.openstack.common import log
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
API_OPTS = [
|
||||
@ -38,7 +41,10 @@ API_OPTS = [
|
||||
help='API host'),
|
||||
cfg.IntOpt('bind_port',
|
||||
default=8080,
|
||||
help='API port')
|
||||
help='API port'),
|
||||
cfg.BoolOpt('enable_notifications',
|
||||
default=False,
|
||||
help='Enable Notifications')
|
||||
]
|
||||
CONF.register_opts(API_OPTS)
|
||||
|
||||
@ -62,6 +68,10 @@ def setup_app(pecan_config=None):
|
||||
])
|
||||
log.setup('storyboard')
|
||||
|
||||
hooks = [
|
||||
user_id_hook.UserIdHook()
|
||||
]
|
||||
|
||||
# Setup token storage
|
||||
token_storage_type = CONF.token_storage_type
|
||||
storage_cls = storage_impls.STORAGE_IMPLS[token_storage_type]
|
||||
@ -72,10 +82,15 @@ def setup_app(pecan_config=None):
|
||||
search_engine_cls = search_engine_impls.ENGINE_IMPLS[search_engine_name]
|
||||
search_engine.set_engine(search_engine_cls())
|
||||
|
||||
# Setup notifier
|
||||
if CONF.enable_notifications:
|
||||
connection_service.initialize()
|
||||
hooks.append(resource_hook.ResourceHook())
|
||||
|
||||
app = pecan.make_app(
|
||||
pecan_config.app.root,
|
||||
debug=CONF.debug,
|
||||
hooks=[user_id_hook.UserIdHook()],
|
||||
hooks=hooks,
|
||||
force_canonical=getattr(pecan_config.app, 'force_canonical', True),
|
||||
guess_content_type_from_ext=False
|
||||
)
|
||||
|
31
storyboard/api/middleware/resource_hook.py
Normal file
31
storyboard/api/middleware/resource_hook.py
Normal file
@ -0,0 +1,31 @@
|
||||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from pecan import hooks
|
||||
|
||||
from storyboard.notifications import publisher
|
||||
|
||||
|
||||
class ResourceHook(hooks.PecanHook):
|
||||
|
||||
def __init__(self):
|
||||
super(ResourceHook, self).__init__()
|
||||
|
||||
def after(self, state):
|
||||
# Ignore get methods, we only care about changes.
|
||||
if state.request.method == 'GET':
|
||||
return
|
||||
|
||||
publisher.publish(state)
|
@ -14,10 +14,15 @@
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
from oslo.config import cfg
|
||||
from pecan import request
|
||||
|
||||
from storyboard.common import event_types
|
||||
from storyboard.db.api import base as api_base
|
||||
from storyboard.db import models
|
||||
from storyboard.notifications import connection_service
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def event_get(event_id):
|
||||
@ -40,7 +45,30 @@ def events_get_count(**kwargs):
|
||||
|
||||
|
||||
def event_create(values):
|
||||
return api_base.entity_create(models.TimeLineEvent, values)
|
||||
new_event = api_base.entity_create(models.TimeLineEvent, values)
|
||||
|
||||
if CONF.enable_notifications:
|
||||
|
||||
payload_timeline_events = {
|
||||
"user_id": request.current_user_id,
|
||||
"method": "POST",
|
||||
"resource": "timeline_event",
|
||||
"event_id": new_event.id
|
||||
}
|
||||
payload_timeline_events = json.dumps(payload_timeline_events)
|
||||
routing_key = "timeline_events"
|
||||
|
||||
conn = connection_service.get_connection()
|
||||
channel = conn.connection.channel()
|
||||
conn.create_exchange(channel, 'storyboard', 'topic')
|
||||
|
||||
channel.basic_publish(exchange='storyboard',
|
||||
routing_key=routing_key,
|
||||
body=payload_timeline_events)
|
||||
|
||||
channel.close()
|
||||
|
||||
return new_event
|
||||
|
||||
|
||||
def story_created_event(story_id, author_id):
|
||||
|
0
storyboard/notifications/__init__.py
Normal file
0
storyboard/notifications/__init__.py
Normal file
34
storyboard/notifications/conf.py
Normal file
34
storyboard/notifications/conf.py
Normal file
@ -0,0 +1,34 @@
|
||||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
NOTIFICATION_OPTS = [
|
||||
cfg.StrOpt("rabbit_host", default="localhost",
|
||||
help="Host of the rabbitmq server."),
|
||||
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
|
||||
help="The RabbitMQ login method."),
|
||||
cfg.StrOpt("rabbit_userid", default="storyboard",
|
||||
help="The RabbitMQ userid."),
|
||||
cfg.StrOpt("rabbit_password", default="storyboard",
|
||||
help="The RabbitMQ password."),
|
||||
cfg.IntOpt("rabbit_port", default=5672,
|
||||
help="The RabbitMQ broker port where a single node is used."),
|
||||
cfg.StrOpt("rabbit_virtual_host", default="/",
|
||||
help="The virtual host within which our queues and exchanges "
|
||||
"live."),
|
||||
]
|
62
storyboard/notifications/connection_service.py
Normal file
62
storyboard/notifications/connection_service.py
Normal file
@ -0,0 +1,62 @@
|
||||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import pika
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from storyboard.notifications.conf import NOTIFICATION_OPTS
|
||||
from storyboard.openstack.common import log
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONN = None
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class ConnectionService:
|
||||
|
||||
def __init__(self, conf):
|
||||
self.credentials = pika.PlainCredentials(
|
||||
conf.rabbit_userid,
|
||||
conf.rabbit_password)
|
||||
|
||||
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
|
||||
conf.rabbit_host,
|
||||
conf.rabbit_port,
|
||||
conf.rabbit_virtual_host,
|
||||
self.credentials))
|
||||
|
||||
def create_exchange(self, channel, exchange, type):
|
||||
self.exchange = exchange
|
||||
self.type = type
|
||||
self.channel = channel
|
||||
self.channel.exchange_declare(exchange=self.exchange,
|
||||
type=self.type, durable=True)
|
||||
|
||||
def close_connection(self):
|
||||
self.connection.close()
|
||||
|
||||
|
||||
def initialize():
|
||||
# Initialize the AMQP event publisher.
|
||||
global CONN
|
||||
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
|
||||
CONN = ConnectionService(CONF.notifications)
|
||||
|
||||
|
||||
def get_connection():
|
||||
global CONN
|
||||
return CONN
|
77
storyboard/notifications/publisher.py
Normal file
77
storyboard/notifications/publisher.py
Normal file
@ -0,0 +1,77 @@
|
||||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import re
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from storyboard.notifications import connection_service
|
||||
from storyboard.openstack.common import log
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def publish(state):
|
||||
|
||||
def parse(s):
|
||||
url_pattern = re.match("^\/v1\/([a-z]+)\/?([0-9]+)?"
|
||||
"\/?([a-z]+)?$", s)
|
||||
if url_pattern and url_pattern.groups()[0] != "openid":
|
||||
return url_pattern.groups()
|
||||
else:
|
||||
return
|
||||
|
||||
request = state.request
|
||||
req_method = request.method
|
||||
req_user_id = request.current_user_id
|
||||
req_path = request.path
|
||||
req_resource_grp = parse(req_path)
|
||||
|
||||
if req_resource_grp:
|
||||
resource = req_resource_grp[0]
|
||||
resource_id = req_resource_grp[1]
|
||||
else:
|
||||
return
|
||||
|
||||
if not resource_id:
|
||||
response_str = state.response.body
|
||||
response = json.loads(response_str)
|
||||
|
||||
if response:
|
||||
resource_id = response.get('id')
|
||||
else:
|
||||
resource_id = None
|
||||
|
||||
payload = {
|
||||
"user_id": req_user_id,
|
||||
"method": req_method,
|
||||
"resource_name": resource,
|
||||
"resource_id": resource_id,
|
||||
}
|
||||
|
||||
payload = json.dumps(payload)
|
||||
routing_key = resource
|
||||
conn = connection_service.get_connection()
|
||||
channel = conn.connection.channel()
|
||||
|
||||
conn.create_exchange(channel, 'storyboard', 'topic')
|
||||
|
||||
channel.basic_publish(exchange='storyboard',
|
||||
routing_key=routing_key,
|
||||
body=payload)
|
||||
channel.close()
|
48
storyboard/notifications/subscriber.py
Normal file
48
storyboard/notifications/subscriber.py
Normal file
@ -0,0 +1,48 @@
|
||||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
from storyboard.notifications import connection_service
|
||||
from storyboard.openstack.common import log
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def subscribe():
|
||||
|
||||
CONF(project='storyboard')
|
||||
connection_service.initialize()
|
||||
conn = connection_service.get_connection()
|
||||
channel = conn.connection.channel()
|
||||
conn.create_exchange(channel, 'storyboard', 'topic')
|
||||
result = channel.queue_declare(exclusive=True)
|
||||
queue_name = result.method.queue
|
||||
binding_keys = ['projects', 'tasks', 'stories', 'timeline_events']
|
||||
|
||||
for binding_key in binding_keys:
|
||||
channel.queue_bind(exchange='storyboard',
|
||||
queue=queue_name,
|
||||
routing_key=binding_key)
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(" [x] %r %r %r %r"
|
||||
% (method.routing_key, body, ch, properties))
|
||||
|
||||
channel.basic_consume(callback,
|
||||
queue=queue_name,
|
||||
no_ack=True)
|
||||
|
||||
channel.start_consuming()
|
Loading…
x
Reference in New Issue
Block a user