diff --git a/monasca_common/kafka/producer.py b/monasca_common/kafka/producer.py index 425f6c17..89e644e4 100644 --- a/monasca_common/kafka/producer.py +++ b/monasca_common/kafka/producer.py @@ -38,7 +38,7 @@ class KafkaProducer(object): self._kafka = kafka_client.KafkaClient(url) self._producer = kafka_producer.KeyedProducer( self._kafka, - async=False, + is_async=False, req_acks=kafka_producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE, ack_timeout=2000) diff --git a/monasca_common/kafka_lib/producer/base.py b/monasca_common/kafka_lib/producer/base.py index b0803f74..f6549b3e 100644 --- a/monasca_common/kafka_lib/producer/base.py +++ b/monasca_common/kafka_lib/producer/base.py @@ -239,7 +239,7 @@ class Producer(object): Arguments: client (KafkaClient): instance to use for broker communications. - If async=True, the background thread will use client.copy(), + If is_async=True, the background thread will use client.copy(), which is expected to return a thread-safe object. codec (kafka.protocol.ALL_CODECS): compression codec to use. req_acks (int, optional): A value indicating the acknowledgements that @@ -250,11 +250,11 @@ class Producer(object): sync_fail_on_error (bool, optional): whether sync producer should raise exceptions (True), or just return errors (False), defaults to True. - async (bool, optional): send message using a background thread, + is_async (bool, optional): send message using a background thread, defaults to False. - batch_send_every_n (int, optional): If async is True, messages are + batch_send_every_n (int, optional): If is_async is True, messages are sent in batches of this size, defaults to 20. - batch_send_every_t (int or float, optional): If async is True, + batch_send_every_t (int or float, optional): If is_async is True, messages are sent immediately after this timeout in seconds, even if there are fewer than batch_send_every_n, defaults to 20. async_retry_limit (int, optional): number of retries for failed messages @@ -281,7 +281,7 @@ class Producer(object): Deprecated Arguments: batch_send (bool, optional): If True, messages are sent by a background - thread in batches, defaults to False. Deprecated, use 'async' + thread in batches, defaults to False. Deprecated, use 'is_async' """ ACK_NOT_REQUIRED = 0 # No ack is required ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log @@ -294,8 +294,8 @@ class Producer(object): codec=None, codec_compresslevel=None, sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT, - async=False, - batch_send=False, # deprecated, use async + is_async=False, + batch_send=False, # deprecated, use is_async batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, async_retry_limit=ASYNC_RETRY_LIMIT, @@ -306,13 +306,13 @@ class Producer(object): async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS): - if async: + if is_async: assert batch_send_every_n > 0 assert batch_send_every_t > 0 assert async_queue_maxsize >= 0 self.client = client - self.async = async + self.is_async = is_async self.req_acks = req_acks self.ack_timeout = ack_timeout self.stopped = False @@ -325,7 +325,7 @@ class Producer(object): self.codec = codec self.codec_compresslevel = codec_compresslevel - if self.async: + if self.is_async: # Messages are sent through this queue self.queue = Queue(async_queue_maxsize) self.async_queue_put_timeout = async_queue_put_timeout @@ -401,7 +401,7 @@ class Producer(object): if key is not None and not isinstance(key, six.binary_type): raise TypeError("the key must be type bytes") - if self.async: + if self.is_async: for idx, m in enumerate(msg): try: item = (TopicAndPartition(topic, partition), m, key) @@ -437,7 +437,7 @@ class Producer(object): log.warning('timeout argument to stop() is deprecated - ' 'it will be removed in future release') - if not self.async: + if not self.is_async: log.warning('producer.stop() called, but producer is not async') return @@ -445,7 +445,7 @@ class Producer(object): log.warning('producer.stop() called, but producer is already stopped') return - if self.async: + if self.is_async: self.queue.put((STOP_ASYNC_PRODUCER, None, None)) self.thread_stop_event.set() self.thread.join() diff --git a/monasca_common/kafka_lib/producer/keyed.py b/monasca_common/kafka_lib/producer/keyed.py index 78da89c9..b923594b 100644 --- a/monasca_common/kafka_lib/producer/keyed.py +++ b/monasca_common/kafka_lib/producer/keyed.py @@ -61,4 +61,4 @@ class KeyedProducer(Producer): return self.send_messages(topic, key, msg) def __repr__(self): - return '' % self.async + return '' % self.is_async diff --git a/monasca_common/kafka_lib/producer/simple.py b/monasca_common/kafka_lib/producer/simple.py index f13bad5d..85101858 100644 --- a/monasca_common/kafka_lib/producer/simple.py +++ b/monasca_common/kafka_lib/producer/simple.py @@ -67,4 +67,4 @@ class SimpleProducer(Producer): ) def __repr__(self): - return '' % self.async + return '' % self.is_async