From afc64e7de1cc5bbb24050cca581964150ad3fb7f Mon Sep 17 00:00:00 2001 From: Sumit Jamgade Date: Fri, 5 Jul 2019 15:00:11 +0200 Subject: [PATCH] In Python 3.7 "async" and "await" will become reserved keywords This patch changes the use of "async" with the PEP8 recommended convention. As of Python 3.6, the following deprecation warning is displayed: See PEP 492 for further details.: https://www.python.org/dev/peps/pep-0492/ Change-Id: I0e3fd5b70fb34ee3b4f1eb444c27279479b3bc60 Story: 2006176 Task: 35700 --- monasca_common/kafka/producer.py | 2 +- monasca_common/kafka_lib/producer/base.py | 26 ++++++++++----------- monasca_common/kafka_lib/producer/keyed.py | 2 +- monasca_common/kafka_lib/producer/simple.py | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/monasca_common/kafka/producer.py b/monasca_common/kafka/producer.py index 425f6c17..89e644e4 100644 --- a/monasca_common/kafka/producer.py +++ b/monasca_common/kafka/producer.py @@ -38,7 +38,7 @@ class KafkaProducer(object): self._kafka = kafka_client.KafkaClient(url) self._producer = kafka_producer.KeyedProducer( self._kafka, - async=False, + is_async=False, req_acks=kafka_producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE, ack_timeout=2000) diff --git a/monasca_common/kafka_lib/producer/base.py b/monasca_common/kafka_lib/producer/base.py index b0803f74..f6549b3e 100644 --- a/monasca_common/kafka_lib/producer/base.py +++ b/monasca_common/kafka_lib/producer/base.py @@ -239,7 +239,7 @@ class Producer(object): Arguments: client (KafkaClient): instance to use for broker communications. - If async=True, the background thread will use client.copy(), + If is_async=True, the background thread will use client.copy(), which is expected to return a thread-safe object. codec (kafka.protocol.ALL_CODECS): compression codec to use. req_acks (int, optional): A value indicating the acknowledgements that @@ -250,11 +250,11 @@ class Producer(object): sync_fail_on_error (bool, optional): whether sync producer should raise exceptions (True), or just return errors (False), defaults to True. - async (bool, optional): send message using a background thread, + is_async (bool, optional): send message using a background thread, defaults to False. - batch_send_every_n (int, optional): If async is True, messages are + batch_send_every_n (int, optional): If is_async is True, messages are sent in batches of this size, defaults to 20. - batch_send_every_t (int or float, optional): If async is True, + batch_send_every_t (int or float, optional): If is_async is True, messages are sent immediately after this timeout in seconds, even if there are fewer than batch_send_every_n, defaults to 20. async_retry_limit (int, optional): number of retries for failed messages @@ -281,7 +281,7 @@ class Producer(object): Deprecated Arguments: batch_send (bool, optional): If True, messages are sent by a background - thread in batches, defaults to False. Deprecated, use 'async' + thread in batches, defaults to False. Deprecated, use 'is_async' """ ACK_NOT_REQUIRED = 0 # No ack is required ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log @@ -294,8 +294,8 @@ class Producer(object): codec=None, codec_compresslevel=None, sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT, - async=False, - batch_send=False, # deprecated, use async + is_async=False, + batch_send=False, # deprecated, use is_async batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, async_retry_limit=ASYNC_RETRY_LIMIT, @@ -306,13 +306,13 @@ class Producer(object): async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS): - if async: + if is_async: assert batch_send_every_n > 0 assert batch_send_every_t > 0 assert async_queue_maxsize >= 0 self.client = client - self.async = async + self.is_async = is_async self.req_acks = req_acks self.ack_timeout = ack_timeout self.stopped = False @@ -325,7 +325,7 @@ class Producer(object): self.codec = codec self.codec_compresslevel = codec_compresslevel - if self.async: + if self.is_async: # Messages are sent through this queue self.queue = Queue(async_queue_maxsize) self.async_queue_put_timeout = async_queue_put_timeout @@ -401,7 +401,7 @@ class Producer(object): if key is not None and not isinstance(key, six.binary_type): raise TypeError("the key must be type bytes") - if self.async: + if self.is_async: for idx, m in enumerate(msg): try: item = (TopicAndPartition(topic, partition), m, key) @@ -437,7 +437,7 @@ class Producer(object): log.warning('timeout argument to stop() is deprecated - ' 'it will be removed in future release') - if not self.async: + if not self.is_async: log.warning('producer.stop() called, but producer is not async') return @@ -445,7 +445,7 @@ class Producer(object): log.warning('producer.stop() called, but producer is already stopped') return - if self.async: + if self.is_async: self.queue.put((STOP_ASYNC_PRODUCER, None, None)) self.thread_stop_event.set() self.thread.join() diff --git a/monasca_common/kafka_lib/producer/keyed.py b/monasca_common/kafka_lib/producer/keyed.py index 78da89c9..b923594b 100644 --- a/monasca_common/kafka_lib/producer/keyed.py +++ b/monasca_common/kafka_lib/producer/keyed.py @@ -61,4 +61,4 @@ class KeyedProducer(Producer): return self.send_messages(topic, key, msg) def __repr__(self): - return '' % self.async + return '' % self.is_async diff --git a/monasca_common/kafka_lib/producer/simple.py b/monasca_common/kafka_lib/producer/simple.py index f13bad5d..85101858 100644 --- a/monasca_common/kafka_lib/producer/simple.py +++ b/monasca_common/kafka_lib/producer/simple.py @@ -67,4 +67,4 @@ class SimpleProducer(Producer): ) def __repr__(self): - return '' % self.async + return '' % self.is_async