diff --git a/monasca_persister/persister.py b/monasca_persister/persister.py
index 59ddecae..16760b66 100644
--- a/monasca_persister/persister.py
+++ b/monasca_persister/persister.py
@@ -35,10 +35,6 @@
 import sys
 import threading
 
 from influxdb import InfluxDBClient
-from kafka import KafkaClient
-from kafka import SimpleConsumer
-from kazoo.client import KazooClient
-from kazoo.recipe.partitioner import SetPartitioner
 import pytz
 from oslo.config import cfg
@@ -46,6 +42,8 @@
 from openstack.common import log
 from openstack.common import service as os_service
 
 import service
 
+from monasca_common.kafka.consumer import KafkaConsumer
+
 LOG = log.getLogger(__name__)
@@ -169,215 +167,62 @@ class AbstractPersister(threading.Thread):
 
         super(AbstractPersister, self).__init__()
 
-        self._kafka_client = KafkaClient(kafka_conf.uri)
-        self._consumer = (
-            SimpleConsumer(self._kafka_client,
-                           kafka_conf.group_id,
-                           kafka_conf.topic,
-                           # Set to true even though we actually do
-                           # the commits manually. Needed to
-                           # initialize
-                           # offsets correctly.
-                           auto_commit=True,
-                           # Make these values None so that the
-                           # manual commit will do the actual
-                           # commit.
-                           # Needed so that offsets are initialized
-                           # correctly. If not done, then restarts
-                           # will reread messages from beginning of
-                           # the queue.
-                           auto_commit_every_n=None,
-                           auto_commit_every_t=None,
-                           fetch_size_bytes=kafka_conf.fetch_size_bytes,
-                           buffer_size=kafka_conf.buffer_size,
-                           max_buffer_size=kafka_conf.max_buffer_size,
-                           iter_timeout=1))
+        self._data_points = []
 
         self._kafka_topic = kafka_conf.topic
 
+        self._database_batch_size = kafka_conf.database_batch_size
+
+        self._consumer = KafkaConsumer(kafka_conf.uri,
+                                       zookeeper_conf.uri,
+                                       kafka_conf.zookeeper_path,
+                                       kafka_conf.group_id,
+                                       kafka_conf.topic,
+                                       repartition_callback=self._flush,
+                                       periodic_callback=self._flush,
+                                       periodic_callback_rate=kafka_conf.max_wait_time_seconds)
+
         self._influxdb_client = InfluxDBClient(influxdb_conf.ip_address,
                                                influxdb_conf.port,
                                                influxdb_conf.user,
                                                influxdb_conf.password,
                                                influxdb_conf.database_name)
 
-        self._kazoo_client = KazooClient(hosts=zookeeper_conf.uri)
-        self._kazoo_client.start()
-        self._zookeeper_path = kafka_conf.zookeeper_path
-        self._partition_interval_recheck_secs = (
-            zookeeper_conf.partition_interval_recheck_seconds)
-
-        self._max_wait_time_secs = kafka_conf.max_wait_time_seconds
-        self._database_batch_size = kafka_conf.database_batch_size
-        self._kafka_topic = kafka_conf.topic
-
-        self._message_count = 0
-        self._data_points = []
-        self._last_flush = datetime.now()
-        self._last_partition_check = datetime.now()
-
     @abc.abstractmethod
     def process_message(self, message):
         pass
 
-    def _flush(self, partitions):
+    def _flush(self):
+        if not self._data_points:
+            return
 
-        if self._data_points:
-
-            try:
-
-                self._influxdb_client.write_points(self._data_points, 'ms')
-
-            except Exception:
-
-                LOG.exception("Error writing to influxdb: {}"
-                              .format(self._data_points))
-
-                raise
-
-            self._consumer.commit(partitions=partitions)
+        try:
+            self._influxdb_client.write_points(self._data_points, 'ms')
 
             LOG.info("Processed {} messages from topic '{}'".format(
-                self._message_count, self._kafka_topic))
+                len(self._data_points), self._kafka_topic))
 
             self._data_points = []
+            self._consumer.commit()
+        except Exception:
+            LOG.exception("Error writing to influxdb: {}"
+                          .format(self._data_points))
+            raise
 
-            self._message_count = 0
-
-            self._last_flush = datetime.now()
-
-    def _is_time_for_repartition_check(self):
-
-        delta_partition_check_time = (datetime.now() -
-                                      self._last_partition_check)
-
-        return delta_partition_check_time.seconds >= (
-            self._partition_interval_recheck_secs)
-
-    def _process_messages(self, partitions):
-
-        while 1:
-
-            if self._is_time_for_repartition_check():
-
-                return
-
-            delta_flush_time = datetime.now() - self._last_flush
-
-            if delta_flush_time.seconds >= self._max_wait_time_secs:
-
-                self._flush(partitions)
-
-            for message in self._consumer:
-
+    def run(self):
+        try:
+            for raw_message in self._consumer:
                 try:
-
+                    message = raw_message[1]
                     data_point = self.process_message(message)
-
                     self._data_points.append(data_point)
-
-                    self._message_count += 1
-
-                    if self._is_time_for_repartition_check():
-
-                        return
-
                 except Exception:
-
                     LOG.exception('Error processing message. Message is '
                                   'being dropped. {}'.format(message))
 
-            if self._message_count >= self._database_batch_size:
-
-                self._flush(partitions)
-
-    def _get_set_partitioner(self):
-        """Partition the set of Kafka topic partitions.
-
-        Acquire a lock on a subset of the Kafka partitions for a topic
-        to allow other instances of the persister to run without reading
-        from the same Kafka partitions for the given topic.
-        """
-
-        # Refresh the Kafka partitions and their offsets from Kafka to get
-        # a list of all available partitions. The set of available partitions
-        # configured in Kafka should not change.
-        self._consumer.fetch_last_known_offsets()
-
-        # Partition on the partitions.
-        set_partitioner = (
-            SetPartitioner(self._kazoo_client,
-                           path=self._zookeeper_path,
-                           set=self._consumer.fetch_offsets.keys(),
-                           identifier=str(datetime.now())))
-
-        return set_partitioner
-
-    def run(self):
-
-        try:
-
-            set_partitioner = self._get_set_partitioner()
-
-            partitions = []
-
-            while 1:
-
-                if set_partitioner.failed:
-
-                    raise Exception("Failed to acquire partition")
-
-                elif set_partitioner.release:
-
-                    self._flush(partitions)
-
-                    LOG.info("Releasing locks on partition set {} "
-                             "for topic {}".format(partitions,
-                                                   self._kafka_topic))
-                    set_partitioner.release_set()
-
-                    partitions = []
-
-                elif set_partitioner.acquired:
-
-                    if not partitions:
-
-                        partitions = [p for p in set_partitioner]
-
-                        LOG.info("Acquired locks on partition set {} "
-                                 "for topic {}".format(
-                                     partitions, self._kafka_topic))
-
-                        # Refresh the last known offsets again to make sure
-                        # that they are the latest after having acquired the
-                        # lock. Updates self._consumer.fetch_offsets.
-                        self._consumer.fetch_last_known_offsets()
-
-                        # Modify self._consumer.fetch_offsets to hold only the
-                        # offsets for the set of Kafka partitions acquired
-                        # by this instance of the persister.
-
-                        partitioned_fetch_offsets = {}
-
-                        for p in partitions:
-
-                            partitioned_fetch_offsets[p] = (
-                                self._consumer.fetch_offsets[p])
-
-                        self._consumer.fetch_offsets = partitioned_fetch_offsets
-
-                    self._last_partition_check = datetime.now()
-
-                    self._process_messages(partitions)
-
-                elif set_partitioner.allocating:
-
-                    LOG.info("Waiting to acquire locks on partition set")
-
-                    set_partitioner.wait_for_acquire()
-
+                if len(self._data_points) >= self._database_batch_size:
+                    self._flush()
         except:
-
             LOG.exception(
                 'Persister encountered fatal exception processing messages. '
                 'Shutting down all threads and exiting')
diff --git a/requirements.txt b/requirements.txt
index 0be77169..74d3449d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,8 +1,7 @@
+six==1.9.0
 babel
 eventlet
 influxdb==2.8.0
 iso8601
-kafka-python>=0.9.2,<0.9.3
-kazoo>=2.0
 oslo.config<2.0
-
+monasca-common
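
Not part of the patch: a minimal, self-contained sketch of the consume/flush flow that the new run() and _flush() implement. It assumes only what the diff itself shows about monasca_common.kafka.consumer.KafkaConsumer (the consumer is iterable, the second element of each yielded item is the message payload, and commit() commits the consumed offsets). FakeConsumer and SketchPersister are illustrative stand-ins, and the InfluxDB write is replaced by a print.

# Minimal sketch of the batch-and-flush loop introduced by this change.
# Assumptions are hedged above; names here are stand-ins, not the real API.


class FakeConsumer(object):
    """Stand-in for the KafkaConsumer used by the persister."""

    def __init__(self, payloads):
        self._payloads = payloads

    def __iter__(self):
        # Mimic the tuple shape consumed via raw_message[1] in run().
        for offset, payload in enumerate(self._payloads):
            yield (offset, payload)

    def commit(self):
        print("offsets committed")


class SketchPersister(object):
    def __init__(self, consumer, database_batch_size):
        self._consumer = consumer
        self._database_batch_size = database_batch_size
        self._data_points = []

    def process_message(self, message):
        # Placeholder for the per-message transformation into a data point.
        return {"value": message}

    def _flush(self):
        if not self._data_points:
            return
        # Stands in for InfluxDBClient.write_points(self._data_points, 'ms');
        # commit only after the batch has been written, as in the patch.
        print("writing {} data points".format(len(self._data_points)))
        self._data_points = []
        self._consumer.commit()

    def run(self):
        for raw_message in self._consumer:
            message = raw_message[1]
            self._data_points.append(self.process_message(message))
            if len(self._data_points) >= self._database_batch_size:
                self._flush()
        self._flush()


if __name__ == '__main__':
    SketchPersister(FakeConsumer(range(10)), database_batch_size=4).run()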