
To let other OpenStack projects move forward with new versions of kafka-python, we're forking kafka-python and embedding it in monasca-common. This allows us to migrate to the new async interfaces provided by more recent Kafka clients over time without blocking other projects. Requiring pykafka will allow us to get ~4x more throughput once we write to its async interfaces.

Change-Id: Ifb6ab67ce1335a5ec4ed7dd8b0027dc9d46a6dda
Depends-On: I26f9c588f2818059ab6ba24f9fad8e213798a39c
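Downstream code uses the embedded client through the monasca_common.kafka_lib package instead of importing kafka directly. A minimal usage sketch, assuming the fork keeps kafka-python's KafkaClient and SimpleConsumer classes under the embedded package paths (broker address, group, and topic names are placeholders):

    from monasca_common.kafka_lib.client import KafkaClient
    from monasca_common.kafka_lib.consumer import SimpleConsumer

    # Placeholder broker/group/topic; adjust for a real deployment.
    client = KafkaClient('localhost:9092')
    consumer = SimpleConsumer(client, 'my-group', 'my-topic')
    for message in consumer:
        print(message.offset, message.message.value)
    consumer.stop()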
from __future__ import absolute_import

import atexit
import logging
import numbers
from threading import Lock

import monasca_common.kafka_lib.common as kafka_common
from monasca_common.kafka_lib.common import (
    OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
    UnknownTopicOrPartitionError, check_error, KafkaError
)

from monasca_common.kafka_lib.util import kafka_bytestring, ReentrantTimer


log = logging.getLogger('kafka.consumer')

AUTO_COMMIT_MSG_COUNT = 100
AUTO_COMMIT_INTERVAL = 5000

FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
FETCH_BUFFER_SIZE_BYTES = 4096
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8

ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
FULL_QUEUE_WAIT_TIME_SECONDS = 0.1

MAX_BACKOFF_SECONDS = 60


class Consumer(object):
    """
    Base class to be used by other consumers. Not to be used directly.

    This base class provides logic for

    * initialization and fetching metadata of partitions
    * auto-commit logic
    * APIs for fetching the pending message count
    """
    def __init__(self, client, group, topic, partitions=None, auto_commit=True,
                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                 auto_commit_every_t=AUTO_COMMIT_INTERVAL):

        self.client = client
        self.topic = kafka_bytestring(topic)
        self.group = None if group is None else kafka_bytestring(group)
        self.client.load_metadata_for_topics(topic)
        self.offsets = {}

        if partitions is None:
            partitions = self.client.get_partition_ids_for_topic(topic)
        else:
            assert all(isinstance(x, numbers.Integral) for x in partitions)

        # Variables for handling offset commits
        self.commit_lock = Lock()
        self.commit_timer = None
        self.count_since_commit = 0
        self.auto_commit = auto_commit
        self.auto_commit_every_n = auto_commit_every_n
        self.auto_commit_every_t = auto_commit_every_t

        # Set up the auto-commit timer
        if auto_commit is True and auto_commit_every_t is not None:
            self.commit_timer = ReentrantTimer(auto_commit_every_t,
                                               self.commit)
            self.commit_timer.start()

        # Set initial offsets
        if self.group is not None:
            self.fetch_last_known_offsets(partitions)
        else:
            for partition in partitions:
                self.offsets[partition] = 0

        # Register a cleanup handler
        def cleanup(obj):
            obj.stop()
        self._cleanup_func = cleanup
        atexit.register(cleanup, self)

        self.partition_info = False  # Do not return partition info in msgs

    def provide_partition_info(self):
        """
        Indicates that partition info must be returned by the consumer
        """
        self.partition_info = True
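    # Illustrative note (based on kafka-python's SimpleConsumer, an assumption
    # for this fork): once provide_partition_info() has been called, fetching
    # subclasses yield (partition, message) tuples instead of bare messages:
    #
    #     consumer.provide_partition_info()
    #     for partition, message in consumer:
    #         ...
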
    def fetch_last_known_offsets(self, partitions=None):
        if self.group is None:
            raise ValueError('KafkaClient.group must not be None')

        if partitions is None:
            partitions = self.client.get_partition_ids_for_topic(self.topic)

        responses = self.client.send_offset_fetch_request(
            self.group,
            [OffsetFetchRequest(self.topic, p) for p in partitions],
            fail_on_error=False
        )

        for resp in responses:
            try:
                check_error(resp)
            # API spec says the server won't set an error here,
            # but 0.8.1.1 does actually...
            except UnknownTopicOrPartitionError:
                pass

            # -1 offset signals no commit is currently stored
            if resp.offset == -1:
                self.offsets[resp.partition] = 0

            # Otherwise we committed the stored offset
            # and need to fetch the next one
            else:
                self.offsets[resp.partition] = resp.offset
    def commit(self, partitions=None):
        """Commit stored offsets to Kafka via OffsetCommitRequest (v0)

        Keyword Arguments:
            partitions (list): list of partitions to commit, default is to
                commit all of them

        Returns: True on success, False on failure
        """

        # Short-circuit if nothing happened. This check is kept outside
        # the lock to avoid unnecessarily acquiring it just to check state
        if self.count_since_commit == 0:
            return

        with self.commit_lock:
            # Do this check again, just in case the state has changed
            # while we were waiting to acquire the lock
            if self.count_since_commit == 0:
                return

            reqs = []
            if partitions is None:  # commit all partitions
                partitions = list(self.offsets.keys())

            log.debug('Committing new offsets for %s, partitions %s',
                      self.topic, partitions)
            for partition in partitions:
                offset = self.offsets[partition]
                log.debug('Commit offset %d in SimpleConsumer: '
                          'group=%s, topic=%s, partition=%s',
                          offset, self.group, self.topic, partition)

                reqs.append(OffsetCommitRequest(self.topic, partition,
                                                offset, None))

            try:
                self.client.send_offset_commit_request(self.group, reqs)
            except KafkaError as e:
                log.error('%s saving offsets: %s', e.__class__.__name__, e)
                return False
            else:
                self.count_since_commit = 0
                return True
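    # Manual-commit sketch (illustrative; SimpleConsumer and process() are
    # assumptions, only commit() and stop() are defined in this file): with
    # auto_commit=False the caller drives commits itself, e.g.
    #
    #     consumer = SimpleConsumer(client, 'my-group', 'my-topic',
    #                               auto_commit=False)
    #     for message in consumer:
    #         process(message)             # hypothetical handler
    #         if not consumer.commit():    # True on success, False on failure
    #             log.warning('offset commit failed')
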
    def _auto_commit(self):
        """
        Check whether we have to commit based on the number of messages
        consumed and, if so, commit
        """

        # Check if we are supposed to do an auto-commit
        if not self.auto_commit or self.auto_commit_every_n is None:
            return

        if self.count_since_commit >= self.auto_commit_every_n:
            self.commit()

    def stop(self):
        if self.commit_timer is not None:
            self.commit_timer.stop()
            self.commit()

        if hasattr(self, '_cleanup_func'):
            # Remove cleanup handler now that we've stopped

            # py3 supports unregistering
            if hasattr(atexit, 'unregister'):
                atexit.unregister(self._cleanup_func)  # pylint: disable=no-member

            # py2 requires removing from private attribute...
            else:
                # ValueError on list.remove() if the exithandler no longer
                # exists is fine here
                try:
                    atexit._exithandlers.remove((self._cleanup_func, (self,), {}))
                except ValueError:
                    pass

            del self._cleanup_func
    def pending(self, partitions=None):
        """
        Gets the pending message count

        Keyword Arguments:
            partitions (list): list of partitions to check for, default is to
                check all
        """
        if partitions is None:
            partitions = self.offsets.keys()

        total = 0
        reqs = []

        for partition in partitions:
            reqs.append(OffsetRequest(self.topic, partition, -1, 1))

        resps = self.client.send_offset_request(reqs)
        for resp in resps:
            partition = resp.partition
            pending = resp.offsets[0]
            offset = self.offsets[partition]
            total += pending - offset

        return total
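
# Usage sketch for pending() (illustrative; KafkaClient and SimpleConsumer are
# assumed to be available from the embedded kafka-python fork):
#
#     from monasca_common.kafka_lib.client import KafkaClient
#     from monasca_common.kafka_lib.consumer import SimpleConsumer
#
#     client = KafkaClient('localhost:9092')   # placeholder broker address
#     consumer = SimpleConsumer(client, 'my-group', 'my-topic')
#     lag = consumer.pending()                 # total not-yet-consumed messages
#
# pending() asks the brokers for the latest offset of each partition
# (OffsetRequest with time -1) and sums the difference against the consumer's
# current offsets, i.e. the total consumer lag across partitions.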