Added subscriber and publisher modules
For each action, except for creation of a new project, two messages will be sent to rabbitmq. One with the main resource; resource_id; user_id; method and the other with the event_id; user_id; a faked method POST and a faked resource TIMELINE_EVENT Publisher creates an exchange called 'storyboard' and publishes the messages to it. Subscriber creates the queues with different binding_keys which bind themself to the storyboard exchange and start consuming the messages from the exchange. The consumed messages will be printed on the console for now which will be modified later. Do not merge until storyboard has Rabbitmq running. Change-Id: Ic4697f79aaab82dadf1fb1ae66f414a90ae28dac
This commit is contained in:
parent
cc0b5cccd9
commit
9a5a5b59d6
@ -46,6 +46,30 @@ lock_path = $state_path/lock
|
||||
# page_size_maximum = 500
|
||||
# page_size_default = 20
|
||||
|
||||
# Enable notifications. This feature drives deferred processing, reporting,
|
||||
# and subscriptions.
|
||||
# enable_notifications = True
|
||||
|
||||
[notifications]
|
||||
|
||||
# Host of the rabbitmq server.
|
||||
# rabbit_host=localhost
|
||||
|
||||
# The RabbitMQ login method
|
||||
# rabbit_login_method = AMQPLAIN
|
||||
|
||||
# The RabbitMQ userid.
|
||||
# rabbit_userid = storyboard
|
||||
|
||||
# The RabbitMQ password.
|
||||
# rabbit_password = storyboard
|
||||
|
||||
# The RabbitMQ broker port where a single node is used.
|
||||
# rabbit_port = 5672
|
||||
|
||||
# The virtual host within which our queues and exchanges live.
|
||||
# rabbit_virtual_host = /
|
||||
|
||||
[database]
|
||||
# This line MUST be changed to actually run storyboard
|
||||
# Example:
|
||||
|
@ -7,6 +7,7 @@ oauthlib>=0.6
|
||||
oslo.config>=1.2.1
|
||||
pecan>=0.4.5
|
||||
oslo.db>=0.2.0
|
||||
pika>=0.9.14
|
||||
python-openid
|
||||
PyYAML>=3.1.0
|
||||
requests>=1.1
|
||||
|
@ -32,6 +32,7 @@ data_files =
|
||||
[entry_points]
|
||||
console_scripts =
|
||||
storyboard-api = storyboard.api.app:start
|
||||
storyboard-subscriber = storyboard.notifications.subscriber:subscribe
|
||||
storyboard-db-manage = storyboard.db.migration.cli:main
|
||||
|
||||
[build_sphinx]
|
||||
|
@ -22,14 +22,17 @@ from wsgiref import simple_server
|
||||
from storyboard.api.auth.token_storage import impls as storage_impls
|
||||
from storyboard.api.auth.token_storage import storage
|
||||
from storyboard.api import config as api_config
|
||||
from storyboard.api.middleware import resource_hook
|
||||
from storyboard.api.middleware import token_middleware
|
||||
from storyboard.api.middleware import user_id_hook
|
||||
from storyboard.api.v1.search import impls as search_engine_impls
|
||||
from storyboard.api.v1.search import search_engine
|
||||
from storyboard.notifications import connection_service
|
||||
from storyboard.openstack.common.gettextutils import _ # noqa
|
||||
from storyboard.openstack.common import log
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
API_OPTS = [
|
||||
@ -38,7 +41,10 @@ API_OPTS = [
|
||||
help='API host'),
|
||||
cfg.IntOpt('bind_port',
|
||||
default=8080,
|
||||
help='API port')
|
||||
help='API port'),
|
||||
cfg.BoolOpt('enable_notifications',
|
||||
default=False,
|
||||
help='Enable Notifications')
|
||||
]
|
||||
CONF.register_opts(API_OPTS)
|
||||
|
||||
@ -62,6 +68,10 @@ def setup_app(pecan_config=None):
|
||||
])
|
||||
log.setup('storyboard')
|
||||
|
||||
hooks = [
|
||||
user_id_hook.UserIdHook()
|
||||
]
|
||||
|
||||
# Setup token storage
|
||||
token_storage_type = CONF.token_storage_type
|
||||
storage_cls = storage_impls.STORAGE_IMPLS[token_storage_type]
|
||||
@ -72,10 +82,15 @@ def setup_app(pecan_config=None):
|
||||
search_engine_cls = search_engine_impls.ENGINE_IMPLS[search_engine_name]
|
||||
search_engine.set_engine(search_engine_cls())
|
||||
|
||||
# Setup notifier
|
||||
if CONF.enable_notifications:
|
||||
connection_service.initialize()
|
||||
hooks.append(resource_hook.ResourceHook())
|
||||
|
||||
app = pecan.make_app(
|
||||
pecan_config.app.root,
|
||||
debug=CONF.debug,
|
||||
hooks=[user_id_hook.UserIdHook()],
|
||||
hooks=hooks,
|
||||
force_canonical=getattr(pecan_config.app, 'force_canonical', True),
|
||||
guess_content_type_from_ext=False
|
||||
)
|
||||
|
31
storyboard/api/middleware/resource_hook.py
Normal file
31
storyboard/api/middleware/resource_hook.py
Normal file
@ -0,0 +1,31 @@
|
||||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from pecan import hooks
|
||||
|
||||
from storyboard.notifications import publisher
|
||||
|
||||
|
||||
class ResourceHook(hooks.PecanHook):
|
||||
|
||||
def __init__(self):
|
||||
super(ResourceHook, self).__init__()
|
||||
|
||||
def after(self, state):
|
||||
# Ignore get methods, we only care about changes.
|
||||
if state.request.method == 'GET':
|
||||
return
|
||||
|
||||
publisher.publish(state)
|
@ -14,10 +14,15 @@
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
from oslo.config import cfg
|
||||
from pecan import request
|
||||
|
||||
from storyboard.common import event_types
|
||||
from storyboard.db.api import base as api_base
|
||||
from storyboard.db import models
|
||||
from storyboard.notifications import connection_service
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def event_get(event_id):
|
||||
@ -40,7 +45,30 @@ def events_get_count(**kwargs):
|
||||
|
||||
|
||||
def event_create(values):
|
||||
return api_base.entity_create(models.TimeLineEvent, values)
|
||||
new_event = api_base.entity_create(models.TimeLineEvent, values)
|
||||
|
||||
if CONF.enable_notifications:
|
||||
|
||||
payload_timeline_events = {
|
||||
"user_id": request.current_user_id,
|
||||
"method": "POST",
|
||||
"resource": "timeline_event",
|
||||
"event_id": new_event.id
|
||||
}
|
||||
payload_timeline_events = json.dumps(payload_timeline_events)
|
||||
routing_key = "timeline_events"
|
||||
|
||||
conn = connection_service.get_connection()
|
||||
channel = conn.connection.channel()
|
||||
conn.create_exchange(channel, 'storyboard', 'topic')
|
||||
|
||||
channel.basic_publish(exchange='storyboard',
|
||||
routing_key=routing_key,
|
||||
body=payload_timeline_events)
|
||||
|
||||
channel.close()
|
||||
|
||||
return new_event
|
||||
|
||||
|
||||
def story_created_event(story_id, author_id):
|
||||
|
0
storyboard/notifications/__init__.py
Normal file
0
storyboard/notifications/__init__.py
Normal file
34
storyboard/notifications/conf.py
Normal file
34
storyboard/notifications/conf.py
Normal file
@ -0,0 +1,34 @@
|
||||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
NOTIFICATION_OPTS = [
|
||||
cfg.StrOpt("rabbit_host", default="localhost",
|
||||
help="Host of the rabbitmq server."),
|
||||
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
|
||||
help="The RabbitMQ login method."),
|
||||
cfg.StrOpt("rabbit_userid", default="storyboard",
|
||||
help="The RabbitMQ userid."),
|
||||
cfg.StrOpt("rabbit_password", default="storyboard",
|
||||
help="The RabbitMQ password."),
|
||||
cfg.IntOpt("rabbit_port", default=5672,
|
||||
help="The RabbitMQ broker port where a single node is used."),
|
||||
cfg.StrOpt("rabbit_virtual_host", default="/",
|
||||
help="The virtual host within which our queues and exchanges "
|
||||
"live."),
|
||||
]
|
62
storyboard/notifications/connection_service.py
Normal file
62
storyboard/notifications/connection_service.py
Normal file
@ -0,0 +1,62 @@
|
||||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import pika
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from storyboard.notifications.conf import NOTIFICATION_OPTS
|
||||
from storyboard.openstack.common import log
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONN = None
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class ConnectionService:
|
||||
|
||||
def __init__(self, conf):
|
||||
self.credentials = pika.PlainCredentials(
|
||||
conf.rabbit_userid,
|
||||
conf.rabbit_password)
|
||||
|
||||
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
|
||||
conf.rabbit_host,
|
||||
conf.rabbit_port,
|
||||
conf.rabbit_virtual_host,
|
||||
self.credentials))
|
||||
|
||||
def create_exchange(self, channel, exchange, type):
|
||||
self.exchange = exchange
|
||||
self.type = type
|
||||
self.channel = channel
|
||||
self.channel.exchange_declare(exchange=self.exchange,
|
||||
type=self.type, durable=True)
|
||||
|
||||
def close_connection(self):
|
||||
self.connection.close()
|
||||
|
||||
|
||||
def initialize():
|
||||
# Initialize the AMQP event publisher.
|
||||
global CONN
|
||||
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
|
||||
CONN = ConnectionService(CONF.notifications)
|
||||
|
||||
|
||||
def get_connection():
|
||||
global CONN
|
||||
return CONN
|
77
storyboard/notifications/publisher.py
Normal file
77
storyboard/notifications/publisher.py
Normal file
@ -0,0 +1,77 @@
|
||||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import re
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from storyboard.notifications import connection_service
|
||||
from storyboard.openstack.common import log
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def publish(state):
|
||||
|
||||
def parse(s):
|
||||
url_pattern = re.match("^\/v1\/([a-z]+)\/?([0-9]+)?"
|
||||
"\/?([a-z]+)?$", s)
|
||||
if url_pattern and url_pattern.groups()[0] != "openid":
|
||||
return url_pattern.groups()
|
||||
else:
|
||||
return
|
||||
|
||||
request = state.request
|
||||
req_method = request.method
|
||||
req_user_id = request.current_user_id
|
||||
req_path = request.path
|
||||
req_resource_grp = parse(req_path)
|
||||
|
||||
if req_resource_grp:
|
||||
resource = req_resource_grp[0]
|
||||
resource_id = req_resource_grp[1]
|
||||
else:
|
||||
return
|
||||
|
||||
if not resource_id:
|
||||
response_str = state.response.body
|
||||
response = json.loads(response_str)
|
||||
|
||||
if response:
|
||||
resource_id = response.get('id')
|
||||
else:
|
||||
resource_id = None
|
||||
|
||||
payload = {
|
||||
"user_id": req_user_id,
|
||||
"method": req_method,
|
||||
"resource_name": resource,
|
||||
"resource_id": resource_id,
|
||||
}
|
||||
|
||||
payload = json.dumps(payload)
|
||||
routing_key = resource
|
||||
conn = connection_service.get_connection()
|
||||
channel = conn.connection.channel()
|
||||
|
||||
conn.create_exchange(channel, 'storyboard', 'topic')
|
||||
|
||||
channel.basic_publish(exchange='storyboard',
|
||||
routing_key=routing_key,
|
||||
body=payload)
|
||||
channel.close()
|
48
storyboard/notifications/subscriber.py
Normal file
48
storyboard/notifications/subscriber.py
Normal file
@ -0,0 +1,48 @@
|
||||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
from storyboard.notifications import connection_service
|
||||
from storyboard.openstack.common import log
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def subscribe():
|
||||
|
||||
CONF(project='storyboard')
|
||||
connection_service.initialize()
|
||||
conn = connection_service.get_connection()
|
||||
channel = conn.connection.channel()
|
||||
conn.create_exchange(channel, 'storyboard', 'topic')
|
||||
result = channel.queue_declare(exclusive=True)
|
||||
queue_name = result.method.queue
|
||||
binding_keys = ['projects', 'tasks', 'stories', 'timeline_events']
|
||||
|
||||
for binding_key in binding_keys:
|
||||
channel.queue_bind(exchange='storyboard',
|
||||
queue=queue_name,
|
||||
routing_key=binding_key)
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(" [x] %r %r %r %r"
|
||||
% (method.routing_key, body, ch, properties))
|
||||
|
||||
channel.basic_consume(callback,
|
||||
queue=queue_name,
|
||||
no_ack=True)
|
||||
|
||||
channel.start_consuming()
|
Loading…
x
Reference in New Issue
Block a user