In Python 3.7 "async" and "await" will become reserved keywords
This patch changes the use of "async" with the PEP8 recommended convention. As of Python 3.6, the following deprecation warning is displayed: See PEP 492 for further details.: https://www.python.org/dev/peps/pep-0492/ Change-Id: I0e3fd5b70fb34ee3b4f1eb444c27279479b3bc60 Story: 2006176 Task: 35700
This commit is contained in:
parent
14e9401201
commit
afc64e7de1
@ -38,7 +38,7 @@ class KafkaProducer(object):
|
||||
self._kafka = kafka_client.KafkaClient(url)
|
||||
self._producer = kafka_producer.KeyedProducer(
|
||||
self._kafka,
|
||||
async=False,
|
||||
is_async=False,
|
||||
req_acks=kafka_producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE,
|
||||
ack_timeout=2000)
|
||||
|
||||
|
@ -239,7 +239,7 @@ class Producer(object):
|
||||
|
||||
Arguments:
|
||||
client (KafkaClient): instance to use for broker communications.
|
||||
If async=True, the background thread will use client.copy(),
|
||||
If is_async=True, the background thread will use client.copy(),
|
||||
which is expected to return a thread-safe object.
|
||||
codec (kafka.protocol.ALL_CODECS): compression codec to use.
|
||||
req_acks (int, optional): A value indicating the acknowledgements that
|
||||
@ -250,11 +250,11 @@ class Producer(object):
|
||||
sync_fail_on_error (bool, optional): whether sync producer should
|
||||
raise exceptions (True), or just return errors (False),
|
||||
defaults to True.
|
||||
async (bool, optional): send message using a background thread,
|
||||
is_async (bool, optional): send message using a background thread,
|
||||
defaults to False.
|
||||
batch_send_every_n (int, optional): If async is True, messages are
|
||||
batch_send_every_n (int, optional): If is_async is True, messages are
|
||||
sent in batches of this size, defaults to 20.
|
||||
batch_send_every_t (int or float, optional): If async is True,
|
||||
batch_send_every_t (int or float, optional): If is_async is True,
|
||||
messages are sent immediately after this timeout in seconds, even
|
||||
if there are fewer than batch_send_every_n, defaults to 20.
|
||||
async_retry_limit (int, optional): number of retries for failed messages
|
||||
@ -281,7 +281,7 @@ class Producer(object):
|
||||
|
||||
Deprecated Arguments:
|
||||
batch_send (bool, optional): If True, messages are sent by a background
|
||||
thread in batches, defaults to False. Deprecated, use 'async'
|
||||
thread in batches, defaults to False. Deprecated, use 'is_async'
|
||||
"""
|
||||
ACK_NOT_REQUIRED = 0 # No ack is required
|
||||
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
|
||||
@ -294,8 +294,8 @@ class Producer(object):
|
||||
codec=None,
|
||||
codec_compresslevel=None,
|
||||
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
|
||||
async=False,
|
||||
batch_send=False, # deprecated, use async
|
||||
is_async=False,
|
||||
batch_send=False, # deprecated, use is_async
|
||||
batch_send_every_n=BATCH_SEND_MSG_COUNT,
|
||||
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
|
||||
async_retry_limit=ASYNC_RETRY_LIMIT,
|
||||
@ -306,13 +306,13 @@ class Producer(object):
|
||||
async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
|
||||
async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
|
||||
|
||||
if async:
|
||||
if is_async:
|
||||
assert batch_send_every_n > 0
|
||||
assert batch_send_every_t > 0
|
||||
assert async_queue_maxsize >= 0
|
||||
|
||||
self.client = client
|
||||
self.async = async
|
||||
self.is_async = is_async
|
||||
self.req_acks = req_acks
|
||||
self.ack_timeout = ack_timeout
|
||||
self.stopped = False
|
||||
@ -325,7 +325,7 @@ class Producer(object):
|
||||
self.codec = codec
|
||||
self.codec_compresslevel = codec_compresslevel
|
||||
|
||||
if self.async:
|
||||
if self.is_async:
|
||||
# Messages are sent through this queue
|
||||
self.queue = Queue(async_queue_maxsize)
|
||||
self.async_queue_put_timeout = async_queue_put_timeout
|
||||
@ -401,7 +401,7 @@ class Producer(object):
|
||||
if key is not None and not isinstance(key, six.binary_type):
|
||||
raise TypeError("the key must be type bytes")
|
||||
|
||||
if self.async:
|
||||
if self.is_async:
|
||||
for idx, m in enumerate(msg):
|
||||
try:
|
||||
item = (TopicAndPartition(topic, partition), m, key)
|
||||
@ -437,7 +437,7 @@ class Producer(object):
|
||||
log.warning('timeout argument to stop() is deprecated - '
|
||||
'it will be removed in future release')
|
||||
|
||||
if not self.async:
|
||||
if not self.is_async:
|
||||
log.warning('producer.stop() called, but producer is not async')
|
||||
return
|
||||
|
||||
@ -445,7 +445,7 @@ class Producer(object):
|
||||
log.warning('producer.stop() called, but producer is already stopped')
|
||||
return
|
||||
|
||||
if self.async:
|
||||
if self.is_async:
|
||||
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
|
||||
self.thread_stop_event.set()
|
||||
self.thread.join()
|
||||
|
@ -61,4 +61,4 @@ class KeyedProducer(Producer):
|
||||
return self.send_messages(topic, key, msg)
|
||||
|
||||
def __repr__(self):
|
||||
return '<KeyedProducer batch=%s>' % self.async
|
||||
return '<KeyedProducer batch=%s>' % self.is_async
|
||||
|
@ -67,4 +67,4 @@ class SimpleProducer(Producer):
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return '<SimpleProducer batch=%s>' % self.async
|
||||
return '<SimpleProducer batch=%s>' % self.is_async
|
||||
|
Loading…
x
Reference in New Issue
Block a user