Merge "In Python 3.7 "async" and "await" will become reserved keywords"
This commit is contained in:
commit
e3ea6d3160
@ -38,7 +38,7 @@ class KafkaProducer(object):
|
|||||||
self._kafka = kafka_client.KafkaClient(url)
|
self._kafka = kafka_client.KafkaClient(url)
|
||||||
self._producer = kafka_producer.KeyedProducer(
|
self._producer = kafka_producer.KeyedProducer(
|
||||||
self._kafka,
|
self._kafka,
|
||||||
async=False,
|
is_async=False,
|
||||||
req_acks=kafka_producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE,
|
req_acks=kafka_producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE,
|
||||||
ack_timeout=2000)
|
ack_timeout=2000)
|
||||||
|
|
||||||
|
@ -239,7 +239,7 @@ class Producer(object):
|
|||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
client (KafkaClient): instance to use for broker communications.
|
client (KafkaClient): instance to use for broker communications.
|
||||||
If async=True, the background thread will use client.copy(),
|
If is_async=True, the background thread will use client.copy(),
|
||||||
which is expected to return a thread-safe object.
|
which is expected to return a thread-safe object.
|
||||||
codec (kafka.protocol.ALL_CODECS): compression codec to use.
|
codec (kafka.protocol.ALL_CODECS): compression codec to use.
|
||||||
req_acks (int, optional): A value indicating the acknowledgements that
|
req_acks (int, optional): A value indicating the acknowledgements that
|
||||||
@ -250,11 +250,11 @@ class Producer(object):
|
|||||||
sync_fail_on_error (bool, optional): whether sync producer should
|
sync_fail_on_error (bool, optional): whether sync producer should
|
||||||
raise exceptions (True), or just return errors (False),
|
raise exceptions (True), or just return errors (False),
|
||||||
defaults to True.
|
defaults to True.
|
||||||
async (bool, optional): send message using a background thread,
|
is_async (bool, optional): send message using a background thread,
|
||||||
defaults to False.
|
defaults to False.
|
||||||
batch_send_every_n (int, optional): If async is True, messages are
|
batch_send_every_n (int, optional): If is_async is True, messages are
|
||||||
sent in batches of this size, defaults to 20.
|
sent in batches of this size, defaults to 20.
|
||||||
batch_send_every_t (int or float, optional): If async is True,
|
batch_send_every_t (int or float, optional): If is_async is True,
|
||||||
messages are sent immediately after this timeout in seconds, even
|
messages are sent immediately after this timeout in seconds, even
|
||||||
if there are fewer than batch_send_every_n, defaults to 20.
|
if there are fewer than batch_send_every_n, defaults to 20.
|
||||||
async_retry_limit (int, optional): number of retries for failed messages
|
async_retry_limit (int, optional): number of retries for failed messages
|
||||||
@ -281,7 +281,7 @@ class Producer(object):
|
|||||||
|
|
||||||
Deprecated Arguments:
|
Deprecated Arguments:
|
||||||
batch_send (bool, optional): If True, messages are sent by a background
|
batch_send (bool, optional): If True, messages are sent by a background
|
||||||
thread in batches, defaults to False. Deprecated, use 'async'
|
thread in batches, defaults to False. Deprecated, use 'is_async'
|
||||||
"""
|
"""
|
||||||
ACK_NOT_REQUIRED = 0 # No ack is required
|
ACK_NOT_REQUIRED = 0 # No ack is required
|
||||||
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
|
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
|
||||||
@ -294,8 +294,8 @@ class Producer(object):
|
|||||||
codec=None,
|
codec=None,
|
||||||
codec_compresslevel=None,
|
codec_compresslevel=None,
|
||||||
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
|
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
|
||||||
async=False,
|
is_async=False,
|
||||||
batch_send=False, # deprecated, use async
|
batch_send=False, # deprecated, use is_async
|
||||||
batch_send_every_n=BATCH_SEND_MSG_COUNT,
|
batch_send_every_n=BATCH_SEND_MSG_COUNT,
|
||||||
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
|
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
|
||||||
async_retry_limit=ASYNC_RETRY_LIMIT,
|
async_retry_limit=ASYNC_RETRY_LIMIT,
|
||||||
@ -306,13 +306,13 @@ class Producer(object):
|
|||||||
async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
|
async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
|
||||||
async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
|
async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
|
||||||
|
|
||||||
if async:
|
if is_async:
|
||||||
assert batch_send_every_n > 0
|
assert batch_send_every_n > 0
|
||||||
assert batch_send_every_t > 0
|
assert batch_send_every_t > 0
|
||||||
assert async_queue_maxsize >= 0
|
assert async_queue_maxsize >= 0
|
||||||
|
|
||||||
self.client = client
|
self.client = client
|
||||||
self.async = async
|
self.is_async = is_async
|
||||||
self.req_acks = req_acks
|
self.req_acks = req_acks
|
||||||
self.ack_timeout = ack_timeout
|
self.ack_timeout = ack_timeout
|
||||||
self.stopped = False
|
self.stopped = False
|
||||||
@ -325,7 +325,7 @@ class Producer(object):
|
|||||||
self.codec = codec
|
self.codec = codec
|
||||||
self.codec_compresslevel = codec_compresslevel
|
self.codec_compresslevel = codec_compresslevel
|
||||||
|
|
||||||
if self.async:
|
if self.is_async:
|
||||||
# Messages are sent through this queue
|
# Messages are sent through this queue
|
||||||
self.queue = Queue(async_queue_maxsize)
|
self.queue = Queue(async_queue_maxsize)
|
||||||
self.async_queue_put_timeout = async_queue_put_timeout
|
self.async_queue_put_timeout = async_queue_put_timeout
|
||||||
@ -401,7 +401,7 @@ class Producer(object):
|
|||||||
if key is not None and not isinstance(key, six.binary_type):
|
if key is not None and not isinstance(key, six.binary_type):
|
||||||
raise TypeError("the key must be type bytes")
|
raise TypeError("the key must be type bytes")
|
||||||
|
|
||||||
if self.async:
|
if self.is_async:
|
||||||
for idx, m in enumerate(msg):
|
for idx, m in enumerate(msg):
|
||||||
try:
|
try:
|
||||||
item = (TopicAndPartition(topic, partition), m, key)
|
item = (TopicAndPartition(topic, partition), m, key)
|
||||||
@ -437,7 +437,7 @@ class Producer(object):
|
|||||||
log.warning('timeout argument to stop() is deprecated - '
|
log.warning('timeout argument to stop() is deprecated - '
|
||||||
'it will be removed in future release')
|
'it will be removed in future release')
|
||||||
|
|
||||||
if not self.async:
|
if not self.is_async:
|
||||||
log.warning('producer.stop() called, but producer is not async')
|
log.warning('producer.stop() called, but producer is not async')
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -445,7 +445,7 @@ class Producer(object):
|
|||||||
log.warning('producer.stop() called, but producer is already stopped')
|
log.warning('producer.stop() called, but producer is already stopped')
|
||||||
return
|
return
|
||||||
|
|
||||||
if self.async:
|
if self.is_async:
|
||||||
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
|
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
|
||||||
self.thread_stop_event.set()
|
self.thread_stop_event.set()
|
||||||
self.thread.join()
|
self.thread.join()
|
||||||
|
@ -61,4 +61,4 @@ class KeyedProducer(Producer):
|
|||||||
return self.send_messages(topic, key, msg)
|
return self.send_messages(topic, key, msg)
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<KeyedProducer batch=%s>' % self.async
|
return '<KeyedProducer batch=%s>' % self.is_async
|
||||||
|
@ -67,4 +67,4 @@ class SimpleProducer(Producer):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<SimpleProducer batch=%s>' % self.async
|
return '<SimpleProducer batch=%s>' % self.is_async
|
||||||
|
Loading…
x
Reference in New Issue
Block a user