diff --git a/requirements.txt b/requirements.txt index 41d1255f..b68c12b3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,4 @@ WSME>=0.6 sqlalchemy-migrate>=0.8.2,!=0.8.4 SQLAlchemy-FullText-Search eventlet>=0.13.0 +stevedore>=1.0.0 \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 01538726..19c3656e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -36,6 +36,8 @@ console_scripts = storyboard-worker-daemon = storyboard.worker.daemon:run storyboard-db-manage = storyboard.db.migration.cli:main storyboard-migrate = storyboard.migrate.cli:main +storyboard.worker.task = + subscription = storyboard.worker.task.subscription:Subscription [build_sphinx] source-dir = doc/source diff --git a/storyboard/notifications/subscriber.py b/storyboard/notifications/subscriber.py index 9de2e989..a5e3a452 100644 --- a/storyboard/notifications/subscriber.py +++ b/storyboard/notifications/subscriber.py @@ -13,19 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ast import time from oslo.config import cfg from pika.exceptions import ConnectionClosed +from stevedore import enabled -from storyboard.db.api import timeline_events from storyboard.notifications.conf import NOTIFICATION_OPTS from storyboard.notifications.connection_service import ConnectionService -from storyboard.notifications.subscriptions_handler import handle_deletions -from storyboard.notifications.subscriptions_handler import handle_resources -from storyboard.notifications.subscriptions_handler import \ - handle_timeline_events from storyboard.openstack.common import log @@ -41,6 +36,13 @@ def subscribe(): subscriber = Subscriber(CONF.notifications) subscriber.start() + manager = enabled.EnabledExtensionManager( + namespace='storyboard.worker.task', + check_func=check_enabled, + invoke_on_load=True, + invoke_args=(CONF,) + ) + while subscriber.started: (method, properties, body) = subscriber.get() @@ -49,35 +51,31 @@ def subscribe(): time.sleep(5) continue - body_dict = ast.literal_eval(body) - if 'event_id' in body_dict: - event_id = body_dict['event_id'] - event = timeline_events.event_get(event_id) - handle_timeline_events(event, body_dict['author_id']) + manager.map(handle_event, body) - else: - if body_dict['resource'] == 'project_groups': - if 'sub_resource_id' in body_dict: - handle_resources(method=body_dict['method'], - resource_id=body_dict['resource_id'], - sub_resource_id=body_dict[ - 'sub_resource_id'], - author_id=body_dict['author_id']) - else: - handle_resources(method=body_dict['method'], - resource_id=body_dict['resource_id'], - author_id=body_dict['author_id']) - - if body_dict['method'] == 'DELETE': - resource_name = body_dict['resource'] - resource_id = body_dict['resource_id'] - if 'sub_resource_id' not in body_dict: - handle_deletions(resource_name, resource_id) - - # Handle the message + # Ack the message subscriber.ack(method.delivery_tag) +def handle_event(ext, body): + """Handle an event from the queue. + + :param ext: The extension that's handling this event. + :param body: The body of the event. + :return: The result of the handler. + """ + return ext.obj.handle(body) + + +def check_enabled(ext): + """Check to see whether an extension should be enabled. + + :param ext: The extension instance to check. + :return: True if it should be enabled. Otherwise false. + """ + return ext.obj.enabled() + + class Subscriber(ConnectionService): def __init__(self, conf): """Setup the subscriber instance based on our configuration. diff --git a/storyboard/worker/task/__init__.py b/storyboard/worker/task/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/storyboard/worker/task/base.py b/storyboard/worker/task/base.py new file mode 100644 index 00000000..a5cd8162 --- /dev/null +++ b/storyboard/worker/task/base.py @@ -0,0 +1,37 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing permissions and +# limitations under the License. + +import abc + + +class WorkerTaskBase(object): + """Base class for a worker that listens to events that occur within the + API. + """ + + __metaclass__ = abc.ABCMeta + + def __init__(self, config): + self.config = config + + @abc.abstractmethod + def enabled(self): + """A method which indicates whether this worker task is properly + configured and should be enabled. If it's ready to go, return True. + Otherwise, return False. + """ + + @abc.abstractmethod + def handle(self, body): + """Handle an event.""" diff --git a/storyboard/worker/task/subscription.py b/storyboard/worker/task/subscription.py new file mode 100644 index 00000000..56583344 --- /dev/null +++ b/storyboard/worker/task/subscription.py @@ -0,0 +1,59 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing permissions and +# limitations under the License. + +import json + +from storyboard.db.api import timeline_events +from storyboard.notifications.subscriptions_handler import handle_deletions +from storyboard.notifications.subscriptions_handler import handle_resources +from storyboard.notifications.subscriptions_handler import \ + handle_timeline_events +from storyboard.worker.task.base import WorkerTaskBase + + +class Subscription(WorkerTaskBase): + def handle(self, body): + """This worker handles API events and attempts to determine whether + they correspond to user subscriptions. + + :param body: The event message body. + :return: + """ + body_dict = json.loads(body) + if 'event_id' in body_dict: + event_id = body_dict['event_id'] + event = timeline_events.event_get(event_id) + handle_timeline_events(event, body_dict['author_id']) + + else: + if body_dict['resource'] == 'project_groups': + if 'sub_resource_id' in body_dict: + handle_resources(method=body_dict['method'], + resource_id=body_dict['resource_id'], + sub_resource_id=body_dict[ + 'sub_resource_id'], + author_id=body_dict['author_id']) + else: + handle_resources(method=body_dict['method'], + resource_id=body_dict['resource_id'], + author_id=body_dict['author_id']) + + if body_dict['method'] == 'DELETE': + resource_name = body_dict['resource'] + resource_id = body_dict['resource_id'] + if 'sub_resource_id' not in body_dict: + handle_deletions(resource_name, resource_id) + + def enabled(self): + return True