From eb02de82233eff0a63708bf6608af22ee91d8ba8 Mon Sep 17 00:00:00 2001 From: Gevorg Davoian Date: Wed, 8 Jun 2016 13:02:22 +0300 Subject: [PATCH] [zmq] Remove redundant Envelope class Change-Id: Ia5f28fab6fa3baeb4f4c48ff6019df128cef042b Partial-Bug: #1582207 --- .../dealer/zmq_dealer_call_publisher.py | 2 - .../publishers/dealer/zmq_dealer_publisher.py | 2 - .../publishers/dealer/zmq_reply_waiter.py | 4 +- .../client/publishers/zmq_pub_publisher.py | 2 +- .../zmq_driver/client/zmq_envelope.py | 89 ------------------- .../_drivers/zmq_driver/client/zmq_request.py | 15 ---- .../server/consumers/zmq_router_consumer.py | 7 +- .../zmq_driver/server/zmq_incoming_message.py | 4 +- 8 files changed, 6 insertions(+), 119 deletions(-) delete mode 100644 oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py index a8f2d71..7d1cdf1 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py @@ -86,10 +86,8 @@ class CallSender(zmq_publisher_base.QueuedSender): self.reply_waiter = reply_waiter def _do_send_request(self, socket, request): - envelope = request.create_envelope() # DEALER socket specific envelope empty delimiter socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(envelope, zmq.SNDMORE) socket.send_pyobj(request) LOG.debug("Sent message_id %(message)s to a target %(target)s", diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py index 0cd13ff..5934519 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py @@ -30,7 +30,6 @@ class DealerPublisher(zmq_publisher_base.QueuedSender): def _send_message_data(socket, request): socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(request.create_envelope(), zmq.SNDMORE) socket.send_pyobj(request) LOG.debug("Sent message_id %(message)s to a target %(target)s", @@ -69,7 +68,6 @@ class DealerPublisherAsync(object): @staticmethod def _send_message_data(socket, request): socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(request.create_envelope(), zmq.SNDMORE) socket.send_pyobj(request) LOG.debug("Sent message_id %(message)s to a target %(target)s", diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py index 027bc7b..bb15fb2 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py @@ -47,10 +47,8 @@ class ReplyWaiter(object): def receive_method(self, socket): empty = socket.recv() assert empty == b'', "Empty expected!" - envelope = socket.recv_pyobj() - assert envelope is not None, "Invalid envelope!" reply = socket.recv_pyobj() - LOG.debug("Received reply %s", envelope) + LOG.debug("Received reply %s", reply.message_id) return reply def run_loop(self): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py index dbe995b..68b9de2 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py @@ -55,8 +55,8 @@ class PubPublisherProxy(object): assert message_type in (zmq_names.CAST_FANOUT_TYPE, zmq_names.NOTIFY_TYPE), "Fanout expected!" topic_filter = multipart_message.pop(0) - message_id = multipart_message.pop(0) reply_id = multipart_message.pop(0) + message_id = multipart_message.pop(0) assert reply_id is not None, "Reply id expected!" self.socket.send(topic_filter, zmq.SNDMORE) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py deleted file mode 100644 index d1913b4..0000000 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_messaging._drivers.zmq_driver import zmq_address -from oslo_messaging._drivers.zmq_driver import zmq_names - - -class Envelope(object): - - def __init__(self, msg_type=None, message_id=None, target=None, - routing_key=None, **kwargs): - self._msg_type = msg_type - self._message_id = message_id - self._target = target - self._reply_id = None - self._routing_key = routing_key - self._kwargs = kwargs - - @property - def reply_id(self): - return self._reply_id - - @reply_id.setter - def reply_id(self, value): - self._reply_id = value - - @property - def routing_key(self): - return self._routing_key - - @routing_key.setter - def routing_key(self, value): - self._routing_key = value - - @property - def msg_type(self): - return self._msg_type - - @msg_type.setter - def msg_type(self, value): - self._msg_type = value - - @property - def message_id(self): - return self._message_id - - @property - def target(self): - return self._target - - @property - def is_mult_send(self): - return self._msg_type in zmq_names.MULTISEND_TYPES - - @property - def topic_filter(self): - return zmq_address.target_to_subscribe_filter(self._target) - - def has(self, key): - return key in self._kwargs - - def set(self, key, value): - self._kwargs[key] = value - - def get(self, key): - self._kwargs.get(key) - - def to_dict(self): - envelope = {zmq_names.FIELD_MSG_TYPE: self._msg_type, - zmq_names.FIELD_MSG_ID: self._message_id, - zmq_names.FIELD_TARGET: self._target, - zmq_names.FIELD_ROUTING_KEY: self._routing_key} - envelope.update({k: v for k, v in self._kwargs.items() - if v is not None}) - return envelope - - def __str__(self): - return str(self.to_dict()) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index a9ba36e..b3f8aae 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -18,7 +18,6 @@ import uuid import six -from oslo_messaging._drivers.zmq_driver.client import zmq_envelope from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE @@ -70,14 +69,6 @@ class Request(object): self.message_id = str(uuid.uuid1()) - def create_envelope(self, routing_key=None, reply_id=None): - envelope = zmq_envelope.Envelope(msg_type=self.msg_type, - message_id=self.message_id, - target=self.target, - routing_key=routing_key) - envelope.reply_id = reply_id - return envelope - @abc.abstractproperty def msg_type(self): """ZMQ message type""" @@ -112,12 +103,6 @@ class CallRequest(RpcRequest): super(CallRequest, self).__init__(*args, **kwargs) - def create_envelope(self, routing_key=None, reply_id=None): - envelope = super(CallRequest, self).create_envelope( - routing_key, reply_id) - envelope.set('timeout', self.timeout) - return envelope - class CastRequest(RpcRequest): diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index da487f5..0e40d5c 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -57,13 +57,12 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): reply_id = socket.recv() empty = socket.recv() assert empty == b'', 'Bad format: empty delimiter expected' - envelope = socket.recv_pyobj() request = socket.recv_pyobj() - return request, envelope, reply_id + return request, reply_id def receive_message(self, socket): try: - request, envelope, reply_id = self._receive_request(socket) + request, reply_id = self._receive_request(socket) LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s", {"host": self.host, "type": request.msg_type, @@ -72,7 +71,7 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): if request.msg_type == zmq_names.CALL_TYPE: return zmq_incoming_message.ZmqIncomingRequest( - socket, reply_id, request, envelope, self.poller) + socket, reply_id, request, self.poller) elif request.msg_type in zmq_names.NON_BLOCKING_TYPES: return RouterIncomingMessage( request.context, request.message, socket, reply_id, diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index 2dc8ec3..51c83e2 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -29,13 +29,12 @@ zmq = zmq_async.import_zmq() class ZmqIncomingRequest(base.RpcIncomingMessage): - def __init__(self, socket, rep_id, request, envelope, poller): + def __init__(self, socket, rep_id, request, poller): super(ZmqIncomingRequest, self).__init__(request.context, request.message) self.reply_socket = socket self.reply_id = rep_id self.request = request - self.envelope = envelope self.received = None self.poller = poller @@ -53,7 +52,6 @@ class ZmqIncomingRequest(base.RpcIncomingMessage): self.received = True self.reply_socket.send(self.reply_id, zmq.SNDMORE) self.reply_socket.send(b'', zmq.SNDMORE) - self.reply_socket.send_pyobj(self.envelope, zmq.SNDMORE) self.reply_socket.send_pyobj(response) def requeue(self):