From 9a5a5b59d6d0701cb1a1cfa5f7ec777d438ec969 Mon Sep 17 00:00:00 2001 From: Nikita Konovalov Date: Thu, 26 Jun 2014 18:26:40 +0400 Subject: [PATCH] Added subscriber and publisher modules For each action, except for creation of a new project, two messages will be sent to rabbitmq. One with the main resource; resource_id; user_id; method and the other with the event_id; user_id; a faked method POST and a faked resource TIMELINE_EVENT Publisher creates an exchange called 'storyboard' and publishes the messages to it. Subscriber creates the queues with different binding_keys which bind themself to the storyboard exchange and start consuming the messages from the exchange. The consumed messages will be printed on the console for now which will be modified later. Do not merge until storyboard has Rabbitmq running. Change-Id: Ic4697f79aaab82dadf1fb1ae66f414a90ae28dac --- etc/storyboard.conf.sample | 24 ++++++ requirements.txt | 1 + setup.cfg | 1 + storyboard/api/app.py | 19 ++++- storyboard/api/middleware/resource_hook.py | 31 ++++++++ storyboard/db/api/timeline_events.py | 30 +++++++- storyboard/notifications/__init__.py | 0 storyboard/notifications/conf.py | 34 ++++++++ .../notifications/connection_service.py | 62 +++++++++++++++ storyboard/notifications/publisher.py | 77 +++++++++++++++++++ storyboard/notifications/subscriber.py | 48 ++++++++++++ 11 files changed, 324 insertions(+), 3 deletions(-) create mode 100644 storyboard/api/middleware/resource_hook.py create mode 100644 storyboard/notifications/__init__.py create mode 100644 storyboard/notifications/conf.py create mode 100644 storyboard/notifications/connection_service.py create mode 100644 storyboard/notifications/publisher.py create mode 100644 storyboard/notifications/subscriber.py diff --git a/etc/storyboard.conf.sample b/etc/storyboard.conf.sample index 6d3d57a3..a3f52772 100644 --- a/etc/storyboard.conf.sample +++ b/etc/storyboard.conf.sample @@ -46,6 +46,30 @@ lock_path = $state_path/lock # page_size_maximum = 500 # page_size_default = 20 +# Enable notifications. This feature drives deferred processing, reporting, +# and subscriptions. +# enable_notifications = True + +[notifications] + +# Host of the rabbitmq server. +# rabbit_host=localhost + +# The RabbitMQ login method +# rabbit_login_method = AMQPLAIN + +# The RabbitMQ userid. +# rabbit_userid = storyboard + +# The RabbitMQ password. +# rabbit_password = storyboard + +# The RabbitMQ broker port where a single node is used. +# rabbit_port = 5672 + +# The virtual host within which our queues and exchanges live. +# rabbit_virtual_host = / + [database] # This line MUST be changed to actually run storyboard # Example: diff --git a/requirements.txt b/requirements.txt index e5d67e15..41d1255f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ oauthlib>=0.6 oslo.config>=1.2.1 pecan>=0.4.5 oslo.db>=0.2.0 +pika>=0.9.14 python-openid PyYAML>=3.1.0 requests>=1.1 diff --git a/setup.cfg b/setup.cfg index cfc6a9f8..3a64f92b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,7 @@ data_files = [entry_points] console_scripts = storyboard-api = storyboard.api.app:start + storyboard-subscriber = storyboard.notifications.subscriber:subscribe storyboard-db-manage = storyboard.db.migration.cli:main [build_sphinx] diff --git a/storyboard/api/app.py b/storyboard/api/app.py index 58bfaefa..a2f4a0d7 100644 --- a/storyboard/api/app.py +++ b/storyboard/api/app.py @@ -22,14 +22,17 @@ from wsgiref import simple_server from storyboard.api.auth.token_storage import impls as storage_impls from storyboard.api.auth.token_storage import storage from storyboard.api import config as api_config +from storyboard.api.middleware import resource_hook from storyboard.api.middleware import token_middleware from storyboard.api.middleware import user_id_hook from storyboard.api.v1.search import impls as search_engine_impls from storyboard.api.v1.search import search_engine +from storyboard.notifications import connection_service from storyboard.openstack.common.gettextutils import _ # noqa from storyboard.openstack.common import log CONF = cfg.CONF + LOG = log.getLogger(__name__) API_OPTS = [ @@ -38,7 +41,10 @@ API_OPTS = [ help='API host'), cfg.IntOpt('bind_port', default=8080, - help='API port') + help='API port'), + cfg.BoolOpt('enable_notifications', + default=False, + help='Enable Notifications') ] CONF.register_opts(API_OPTS) @@ -62,6 +68,10 @@ def setup_app(pecan_config=None): ]) log.setup('storyboard') + hooks = [ + user_id_hook.UserIdHook() + ] + # Setup token storage token_storage_type = CONF.token_storage_type storage_cls = storage_impls.STORAGE_IMPLS[token_storage_type] @@ -72,10 +82,15 @@ def setup_app(pecan_config=None): search_engine_cls = search_engine_impls.ENGINE_IMPLS[search_engine_name] search_engine.set_engine(search_engine_cls()) + # Setup notifier + if CONF.enable_notifications: + connection_service.initialize() + hooks.append(resource_hook.ResourceHook()) + app = pecan.make_app( pecan_config.app.root, debug=CONF.debug, - hooks=[user_id_hook.UserIdHook()], + hooks=hooks, force_canonical=getattr(pecan_config.app, 'force_canonical', True), guess_content_type_from_ext=False ) diff --git a/storyboard/api/middleware/resource_hook.py b/storyboard/api/middleware/resource_hook.py new file mode 100644 index 00000000..63b52521 --- /dev/null +++ b/storyboard/api/middleware/resource_hook.py @@ -0,0 +1,31 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pecan import hooks + +from storyboard.notifications import publisher + + +class ResourceHook(hooks.PecanHook): + + def __init__(self): + super(ResourceHook, self).__init__() + + def after(self, state): + # Ignore get methods, we only care about changes. + if state.request.method == 'GET': + return + + publisher.publish(state) diff --git a/storyboard/db/api/timeline_events.py b/storyboard/db/api/timeline_events.py index 8ed2a253..d277f0ba 100644 --- a/storyboard/db/api/timeline_events.py +++ b/storyboard/db/api/timeline_events.py @@ -14,10 +14,15 @@ # limitations under the License. import json +from oslo.config import cfg +from pecan import request from storyboard.common import event_types from storyboard.db.api import base as api_base from storyboard.db import models +from storyboard.notifications import connection_service + +CONF = cfg.CONF def event_get(event_id): @@ -40,7 +45,30 @@ def events_get_count(**kwargs): def event_create(values): - return api_base.entity_create(models.TimeLineEvent, values) + new_event = api_base.entity_create(models.TimeLineEvent, values) + + if CONF.enable_notifications: + + payload_timeline_events = { + "user_id": request.current_user_id, + "method": "POST", + "resource": "timeline_event", + "event_id": new_event.id + } + payload_timeline_events = json.dumps(payload_timeline_events) + routing_key = "timeline_events" + + conn = connection_service.get_connection() + channel = conn.connection.channel() + conn.create_exchange(channel, 'storyboard', 'topic') + + channel.basic_publish(exchange='storyboard', + routing_key=routing_key, + body=payload_timeline_events) + + channel.close() + + return new_event def story_created_event(story_id, author_id): diff --git a/storyboard/notifications/__init__.py b/storyboard/notifications/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/storyboard/notifications/conf.py b/storyboard/notifications/conf.py new file mode 100644 index 00000000..9a4ffb8d --- /dev/null +++ b/storyboard/notifications/conf.py @@ -0,0 +1,34 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo.config import cfg + +CONF = cfg.CONF + +NOTIFICATION_OPTS = [ + cfg.StrOpt("rabbit_host", default="localhost", + help="Host of the rabbitmq server."), + cfg.StrOpt("rabbit_login_method", default="AMQPLAIN", + help="The RabbitMQ login method."), + cfg.StrOpt("rabbit_userid", default="storyboard", + help="The RabbitMQ userid."), + cfg.StrOpt("rabbit_password", default="storyboard", + help="The RabbitMQ password."), + cfg.IntOpt("rabbit_port", default=5672, + help="The RabbitMQ broker port where a single node is used."), + cfg.StrOpt("rabbit_virtual_host", default="/", + help="The virtual host within which our queues and exchanges " + "live."), +] diff --git a/storyboard/notifications/connection_service.py b/storyboard/notifications/connection_service.py new file mode 100644 index 00000000..be99d550 --- /dev/null +++ b/storyboard/notifications/connection_service.py @@ -0,0 +1,62 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pika + +from oslo.config import cfg + +from storyboard.notifications.conf import NOTIFICATION_OPTS +from storyboard.openstack.common import log + +CONF = cfg.CONF +CONN = None + +LOG = log.getLogger(__name__) + + +class ConnectionService: + + def __init__(self, conf): + self.credentials = pika.PlainCredentials( + conf.rabbit_userid, + conf.rabbit_password) + + self.connection = pika.BlockingConnection(pika.ConnectionParameters( + conf.rabbit_host, + conf.rabbit_port, + conf.rabbit_virtual_host, + self.credentials)) + + def create_exchange(self, channel, exchange, type): + self.exchange = exchange + self.type = type + self.channel = channel + self.channel.exchange_declare(exchange=self.exchange, + type=self.type, durable=True) + + def close_connection(self): + self.connection.close() + + +def initialize(): + # Initialize the AMQP event publisher. + global CONN + CONF.register_opts(NOTIFICATION_OPTS, "notifications") + CONN = ConnectionService(CONF.notifications) + + +def get_connection(): + global CONN + return CONN diff --git a/storyboard/notifications/publisher.py b/storyboard/notifications/publisher.py new file mode 100644 index 00000000..9ac40fe7 --- /dev/null +++ b/storyboard/notifications/publisher.py @@ -0,0 +1,77 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import re + +from oslo.config import cfg + +from storyboard.notifications import connection_service +from storyboard.openstack.common import log + +CONF = cfg.CONF + +LOG = log.getLogger(__name__) + + +def publish(state): + + def parse(s): + url_pattern = re.match("^\/v1\/([a-z]+)\/?([0-9]+)?" + "\/?([a-z]+)?$", s) + if url_pattern and url_pattern.groups()[0] != "openid": + return url_pattern.groups() + else: + return + + request = state.request + req_method = request.method + req_user_id = request.current_user_id + req_path = request.path + req_resource_grp = parse(req_path) + + if req_resource_grp: + resource = req_resource_grp[0] + resource_id = req_resource_grp[1] + else: + return + + if not resource_id: + response_str = state.response.body + response = json.loads(response_str) + + if response: + resource_id = response.get('id') + else: + resource_id = None + + payload = { + "user_id": req_user_id, + "method": req_method, + "resource_name": resource, + "resource_id": resource_id, + } + + payload = json.dumps(payload) + routing_key = resource + conn = connection_service.get_connection() + channel = conn.connection.channel() + + conn.create_exchange(channel, 'storyboard', 'topic') + + channel.basic_publish(exchange='storyboard', + routing_key=routing_key, + body=payload) + channel.close() diff --git a/storyboard/notifications/subscriber.py b/storyboard/notifications/subscriber.py new file mode 100644 index 00000000..83d70521 --- /dev/null +++ b/storyboard/notifications/subscriber.py @@ -0,0 +1,48 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo.config import cfg +from storyboard.notifications import connection_service +from storyboard.openstack.common import log + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + + +def subscribe(): + + CONF(project='storyboard') + connection_service.initialize() + conn = connection_service.get_connection() + channel = conn.connection.channel() + conn.create_exchange(channel, 'storyboard', 'topic') + result = channel.queue_declare(exclusive=True) + queue_name = result.method.queue + binding_keys = ['projects', 'tasks', 'stories', 'timeline_events'] + + for binding_key in binding_keys: + channel.queue_bind(exchange='storyboard', + queue=queue_name, + routing_key=binding_key) + + def callback(ch, method, properties, body): + print(" [x] %r %r %r %r" + % (method.routing_key, body, ch, properties)) + + channel.basic_consume(callback, + queue=queue_name, + no_ack=True) + + channel.start_consuming()