From b5740072f769775ad7319671d210819f5dd737f7 Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Fri, 26 Jul 2013 12:29:14 +0400 Subject: [PATCH] Mq refactoring, SSL support and message acknowledgment --- muranocommon/messaging/__init__.py | 20 +++++ muranocommon/messaging/message.py | 53 +++++++++++++ muranocommon/{mq.py => messaging/mqclient.py} | 78 ++++--------------- muranocommon/messaging/subscription.py | 43 ++++++++++ requirements.txt | 5 +- 5 files changed, 132 insertions(+), 67 deletions(-) create mode 100644 muranocommon/messaging/__init__.py create mode 100644 muranocommon/messaging/message.py rename muranocommon/{mq.py => messaging/mqclient.py} (59%) create mode 100644 muranocommon/messaging/subscription.py diff --git a/muranocommon/messaging/__init__.py b/muranocommon/messaging/__init__.py new file mode 100644 index 0000000..fcb15f4 --- /dev/null +++ b/muranocommon/messaging/__init__.py @@ -0,0 +1,20 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from message import Message +from subscription import Subscription +from mqclient import MqClient + +__all__ = ['Message', 'Subscription', 'MqClient'] \ No newline at end of file diff --git a/muranocommon/messaging/message.py b/muranocommon/messaging/message.py new file mode 100644 index 0000000..519d000 --- /dev/null +++ b/muranocommon/messaging/message.py @@ -0,0 +1,53 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import anyjson +import logging + +log = logging.getLogger("murano-common.messaging") + + +class Message(object): + def __init__(self, client=None, message_handle=None): + self._client = client + self._message_handle = message_handle + self.id = None if message_handle is None else \ + message_handle['headers'].get('message_id') + try: + self.body = None if message_handle is None else \ + anyjson.loads(message_handle['body']) + except ValueError as e: + self.body = None + log.exception(e) + + + @property + def body(self): + return self._body + + @body.setter + def body(self, value): + self._body = value + + @property + def id(self): + return self._id + + @id.setter + def id(self, value): + self._id = value or '' + + def ack(self): + self._client.basic_ack(self._message_handle) diff --git a/muranocommon/mq.py b/muranocommon/messaging/mqclient.py similarity index 59% rename from muranocommon/mq.py rename to muranocommon/messaging/mqclient.py index b2a9f13..b47a391 100644 --- a/muranocommon/mq.py +++ b/muranocommon/messaging/mqclient.py @@ -16,17 +16,27 @@ from eventlet import patcher puka = patcher.import_patched('puka') import anyjson +from subscription import Subscription class MqClient(object): - def __init__(self, login, password, host, port, virtual_host): - self._client = puka.Client('amqp://{0}:{1}@{2}:{3}/{4}'.format( + def __init__(self, login, password, host, port, virtual_host, + ssl=False, ca_certs=None): + scheme = 'amqp:' if not ssl else 'amqps:' + + ssl_parameters = None + if ssl: + ssl_parameters = puka.SslConnectionParameters() + ssl_parameters.ca_certs = ca_certs + + self._client = puka.Client('{0}//{1}:{2}@{3}:{4}/{5}'.format( + scheme, login, password, host, port, virtual_host - )) + ), ssl_parameters=ssl_parameters) self._connected = False def __enter__(self): @@ -74,66 +84,8 @@ class MqClient(object): headers=headers) self._client.wait(promise, timeout=timeout) - def open(self, queue): + def open(self, queue, prefetch_count=1): if not self._connected: raise RuntimeError('Not connected to RabbitMQ') - return Subscription(self._client, queue) - - -class Subscription(object): - def __init__(self, client, queue): - self._client = client - self._queue = queue - self._promise = None - self._lastMessage = None - - def __enter__(self): - self._promise = self._client.basic_consume( - queue=self._queue, - prefetch_count=1) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self._ack_last() - promise = self._client.basic_cancel(self._promise) - self._client.wait(promise) - return False - - def _ack_last(self): - if self._lastMessage: - self._client.basic_ack(self._lastMessage) - self._lastMessage = None - - def get_message(self, timeout=None): - if not self._promise: - raise RuntimeError( - "Subscription object must be used within 'with' block") - self._ack_last() - self._lastMessage = self._client.wait(self._promise, timeout=timeout) - msg = Message() - msg.body = anyjson.loads(self._lastMessage['body']) - msg.id = self._lastMessage['headers'].get('message_id') - return msg - - -class Message(object): - def __init__(self): - self._body = {} - self._id = '' - - @property - def body(self): - return self._body - - @body.setter - def body(self, value): - self._body = value - - @property - def id(self): - return self._id - - @id.setter - def id(self, value): - self._id = value or '' + return Subscription(self._client, queue, prefetch_count) diff --git a/muranocommon/messaging/subscription.py b/muranocommon/messaging/subscription.py new file mode 100644 index 0000000..1136613 --- /dev/null +++ b/muranocommon/messaging/subscription.py @@ -0,0 +1,43 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from message import Message + + +class Subscription(object): + def __init__(self, client, queue, prefetch_count=1): + self._client = client + self._queue = queue + self._promise = None + self._prefetch_count = prefetch_count + + def __enter__(self): + self._promise = self._client.basic_consume( + queue=self._queue, + prefetch_count=self._prefetch_count) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + promise = self._client.basic_cancel(self._promise) + self._client.wait(promise) + return False + + def get_message(self, timeout=None): + if not self._promise: + raise RuntimeError( + "Subscription object must be used within 'with' block") + msg_handle = self._client.wait(self._promise, timeout=timeout) + msg = Message(self._client, msg_handle) + return msg diff --git a/requirements.txt b/requirements.txt index 369b923..f3dc3c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,4 @@ d2to1>=0.2.10,<0.3 pbr>=0.5,<0.6 anyjson eventlet>=0.9.12 -argparse -iso8601>=0.1.4 -oslo.config -https://github.com/istalker2/puka/releases/download/0.0.7a/puka-0.0.7a.tar.gz#egg=puka-0.0.7a \ No newline at end of file +https://github.com/istalker2/puka/releases/download/1.0.7b/puka-1.0.7b.tar.gz#egg=puka-1.0.7b \ No newline at end of file