Merge "Modifying Persister to use common kafka interface"
This commit is contained in:
commit
b2479ae244
@@ -35,10 +35,6 @@ import sys
 import threading
 
 from influxdb import InfluxDBClient
-from kafka import KafkaClient
-from kafka import SimpleConsumer
-from kazoo.client import KazooClient
-from kazoo.recipe.partitioner import SetPartitioner
 import pytz
 from oslo.config import cfg
 
@@ -46,6 +42,8 @@ from openstack.common import log
 from openstack.common import service as os_service
 import service
 
+from monasca_common.kafka.consumer import KafkaConsumer
+
 
 LOG = log.getLogger(__name__)
 
@@ -169,215 +167,62 @@ class AbstractPersister(threading.Thread):
 
         super(AbstractPersister, self).__init__()
 
-        self._kafka_client = KafkaClient(kafka_conf.uri)
-        self._consumer = (
-            SimpleConsumer(self._kafka_client,
-                           kafka_conf.group_id,
-                           kafka_conf.topic,
-                           # Set to true even though we actually do
-                           # the commits manually. Needed to
-                           # initialize
-                           # offsets correctly.
-                           auto_commit=True,
-                           # Make these values None so that the
-                           # manual commit will do the actual
-                           # commit.
-                           # Needed so that offsets are initialized
-                           # correctly. If not done, then restarts
-                           # will reread messages from beginning of
-                           # the queue.
-                           auto_commit_every_n=None,
-                           auto_commit_every_t=None,
-                           fetch_size_bytes=kafka_conf.fetch_size_bytes,
-                           buffer_size=kafka_conf.buffer_size,
-                           max_buffer_size=kafka_conf.max_buffer_size,
-                           iter_timeout=1))
+        self._data_points = []
+
+        self._kafka_topic = kafka_conf.topic
+
+        self._database_batch_size = kafka_conf.database_batch_size
+
+        self._consumer = KafkaConsumer(kafka_conf.uri,
+                                       zookeeper_conf.uri,
+                                       kafka_conf.zookeeper_path,
+                                       kafka_conf.group_id,
+                                       kafka_conf.topic,
+                                       repartition_callback=self._flush,
+                                       periodic_callback=self._flush,
+                                       periodic_callback_rate=kafka_conf.max_wait_time_seconds)
 
         self._influxdb_client = InfluxDBClient(influxdb_conf.ip_address,
                                                influxdb_conf.port,
                                                influxdb_conf.user,
                                                influxdb_conf.password,
                                                influxdb_conf.database_name)
 
-        self._kazoo_client = KazooClient(hosts=zookeeper_conf.uri)
-        self._kazoo_client.start()
-        self._zookeeper_path = kafka_conf.zookeeper_path
-        self._partition_interval_recheck_secs = (
-            zookeeper_conf.partition_interval_recheck_seconds)
-
-        self._max_wait_time_secs = kafka_conf.max_wait_time_seconds
-        self._database_batch_size = kafka_conf.database_batch_size
-        self._kafka_topic = kafka_conf.topic
-
-        self._message_count = 0
-        self._data_points = []
-        self._last_flush = datetime.now()
-        self._last_partition_check = datetime.now()
-
     @abc.abstractmethod
     def process_message(self, message):
         pass
 
-    def _flush(self, partitions):
-
-        if self._data_points:
-
-            try:
-                self._influxdb_client.write_points(self._data_points, 'ms')
-            except Exception:
-                LOG.exception("Error writing to influxdb: {}"
-                              .format(self._data_points))
-                raise
-
-            self._consumer.commit(partitions=partitions)
-
-            LOG.info("Processed {} messages from topic '{}'".format(
-                self._message_count, self._kafka_topic))
-
-            self._data_points = []
-
-        self._message_count = 0
-
-        self._last_flush = datetime.now()
-
-    def _is_time_for_repartition_check(self):
-
-        delta_partition_check_time = (datetime.now() -
-                                      self._last_partition_check)
-
-        return delta_partition_check_time.seconds >= (
-            self._partition_interval_recheck_secs)
-
-    def _process_messages(self, partitions):
-
-        while 1:
-
-            if self._is_time_for_repartition_check():
-
-                return
-
-            delta_flush_time = datetime.now() - self._last_flush
-
-            if delta_flush_time.seconds >= self._max_wait_time_secs:
-
-                self._flush(partitions)
-
-            for message in self._consumer:
-
-                try:
-
-                    data_point = self.process_message(message)
-
-                    self._data_points.append(data_point)
-
-                    self._message_count += 1
-
-                    if self._is_time_for_repartition_check():
-
-                        return
-
-                except Exception:
-
-                    LOG.exception('Error processing message. Message is '
-                                  'being dropped. {}'.format(message))
-
-                if self._message_count >= self._database_batch_size:
-
-                    self._flush(partitions)
-
-    def _get_set_partitioner(self):
-        """Partition the set of Kafka topic partitions.
-
-        Acquire a lock on a subset of the Kafka partitions for a topic
-        to allow other instances of the persister to run without reading
-        from the same Kafka partitions for the given topic.
-        """
-
-        # Refresh the Kafka partitions and their offsets from Kafka to get
-        # a list of all available partitions. The set of available partitions
-        # configured in Kafka should not change.
-        self._consumer.fetch_last_known_offsets()
-
-        # Partition on the partitions.
-        set_partitioner = (
-            SetPartitioner(self._kazoo_client,
-                           path=self._zookeeper_path,
-                           set=self._consumer.fetch_offsets.keys(),
-                           identifier=str(datetime.now())))
-
-        return set_partitioner
+    def _flush(self):
+        if not self._data_points:
+            return
+
+        try:
+            self._influxdb_client.write_points(self._data_points, 'ms')
+
+            LOG.info("Processed {} messages from topic '{}'".format(
+                len(self._data_points), self._kafka_topic))
+
+            self._data_points = []
+            self._consumer.commit()
+        except Exception:
+            LOG.exception("Error writing to influxdb: {}"
+                          .format(self._data_points))
+            raise
 
     def run(self):
-
         try:
-
-            set_partitioner = self._get_set_partitioner()
-
-            partitions = []
-
-            while 1:
-
-                if set_partitioner.failed:
-
-                    raise Exception("Failed to acquire partition")
-
-                elif set_partitioner.release:
-
-                    self._flush(partitions)
-
-                    LOG.info("Releasing locks on partition set {} "
-                             "for topic {}".format(partitions,
-                                                   self._kafka_topic))
-
-                    set_partitioner.release_set()
-
-                    partitions = []
-
-                elif set_partitioner.acquired:
-
-                    if not partitions:
-
-                        partitions = [p for p in set_partitioner]
-
-                        LOG.info("Acquired locks on partition set {} "
-                                 "for topic {}".format(
-                                     partitions, self._kafka_topic))
-
-                        # Refresh the last known offsets again to make sure
-                        # that they are the latest after having acquired the
-                        # lock. Updates self._consumer.fetch_offsets.
-                        self._consumer.fetch_last_known_offsets()
-
-                        # Modify self._consumer.fetch_offsets to hold only the
-                        # offsets for the set of Kafka partitions acquired
-                        # by this instance of the persister.
-
-                        partitioned_fetch_offsets = {}
-
-                        for p in partitions:
-
-                            partitioned_fetch_offsets[p] = (
-                                self._consumer.fetch_offsets[p])
-
-                        self._consumer.fetch_offsets = partitioned_fetch_offsets
-
-                    self._last_partition_check = datetime.now()
-
-                    self._process_messages(partitions)
-
-                elif set_partitioner.allocating:
-
-                    LOG.info("Waiting to acquire locks on partition set")
-
-                    set_partitioner.wait_for_acquire()
-
+            for raw_message in self._consumer:
+                try:
+                    message = raw_message[1]
+                    data_point = self.process_message(message)
+                    self._data_points.append(data_point)
+                except Exception:
+                    LOG.exception('Error processing message. Message is '
+                                  'being dropped. {}'.format(message))
+
+                if len(self._data_points) >= self._database_batch_size:
+                    self._flush()
         except:
-
             LOG.exception(
                 'Persister encountered fatal exception processing messages. '
                 'Shutting down all threads and exiting')
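Note: the heart of this change is that batching, periodic flushing, and repartition handling now hang off two callbacks passed to the common consumer instead of hand-rolled timing checks. Below is a condensed sketch of the new pattern, using only the KafkaConsumer calls visible in this diff; the addresses, group/topic names, batch size, and the two stand-in helpers (parse, write_to_database) are illustrative assumptions, not part of the commit.

from monasca_common.kafka.consumer import KafkaConsumer


def parse(message):
    # Stand-in for AbstractPersister.process_message(), which turns a
    # kafka message into an InfluxDB data point.
    return message


def write_to_database(points):
    # Stand-in for InfluxDBClient.write_points(points, 'ms').
    print('writing {} data points'.format(len(points)))


class BatchingPersister(object):

    def __init__(self):
        self._data_points = []
        self._batch_size = 1000  # kafka_conf.database_batch_size in the diff
        # Both callbacks are the same flush method: flush on repartition so
        # committed offsets match what was written, and flush periodically
        # so a quiet topic cannot hold a partial batch forever.
        self._consumer = KafkaConsumer(
            '192.168.10.4:9092',      # kafka_conf.uri (illustrative)
            '192.168.10.4:2181',      # zookeeper_conf.uri (illustrative)
            '/persister_partitions',  # kafka_conf.zookeeper_path
            '1_metrics',              # kafka_conf.group_id
            'metrics',                # kafka_conf.topic
            repartition_callback=self._flush,
            periodic_callback=self._flush,
            periodic_callback_rate=30)  # kafka_conf.max_wait_time_seconds

    def _flush(self):
        if not self._data_points:
            return
        write_to_database(self._data_points)
        self._data_points = []
        # Commit only after the write succeeds: a crash between the write
        # and the commit rereads messages (at-least-once) rather than
        # losing them.
        self._consumer.commit()

    def run(self):
        for raw_message in self._consumer:
            try:
                # The consumer yields tuples; element 1 carries the message,
                # exactly as the new run() above unpacks it.
                self._data_points.append(parse(raw_message[1]))
            except Exception:
                continue  # drop bad messages, as the persister does
            if len(self._data_points) >= self._batch_size:
                self._flush()

The commit-after-write ordering is what the old code's SimpleConsumer comments were working around manually; with the common consumer it is just the natural place to call commit().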
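For contrast, the coordination this commit deletes (and which monasca-common now performs internally) drove kazoo's SetPartitioner state machine by hand. A self-contained sketch of that loop follows, mirroring the removed run(); the ZooKeeper address, chroot path, partition ids, and the consume() stub are illustrative.

from kazoo.client import KazooClient
from kazoo.recipe.partitioner import SetPartitioner


def consume(partitions):
    # Stand-in for _process_messages(partitions).
    print('consuming from partitions {}'.format(partitions))


client = KazooClient(hosts='192.168.10.4:2181')
client.start()

partitioner = SetPartitioner(client,
                             path='/persister_partitions',
                             set=(0, 1, 2, 3),  # kafka partition ids
                             identifier='persister-host-1')

while True:
    if partitioner.failed:
        raise Exception('Failed to acquire partition set')
    elif partitioner.release:
        # Membership changed; give up our subset so it can be rebalanced.
        partitioner.release_set()
    elif partitioner.acquired:
        consume([p for p in partitioner])  # the subset this instance owns
    elif partitioner.allocating:
        partitioner.wait_for_acquire()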
@@ -1,8 +1,7 @@
 six==1.9.0
 babel
 eventlet
 influxdb==2.8.0
 iso8601
-kafka-python>=0.9.2,<0.9.3
-kazoo>=2.0
 oslo.config<2.0
+monasca-common