Added kafka interaction objects to monasca_common

New consumer object intended to be used by the pieces of Monasca that want to
consume data from kafka.

New producer object that batches writes to kafka for performance.

Added kazoo requirement
Removed PyYAML requirement

Change-Id: I2eb0c5cd1ed64b83a67912109c4c6de7a1d73722
Author: Joe Keen, 2015-10-22 16:14:12 -06:00
parent 8a9d5b2af2
commit 7eec9f1ebc
4 changed files with 285 additions and 3 deletions

@@ -0,0 +1,223 @@
# Copyright (c) 2014, 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
import threading
import kafka.client
import kafka.common
import kafka.consumer
from kazoo.client import KazooClient
from kazoo.recipe.partitioner import SetPartitioner
log = logging.getLogger(__name__)
"""Kafka consumer interface
Kafka consumer class that will automatically share partitions between processes
using the same zookeeper path.
For performance it is often required that data from the kafka queue be
batched before being processed. There are two important concerns to
keep in mind when dealing with batched data.
1. Negotiating partitions takes a relatively long time, so when the
negotiation process begins a defined repartition_callback will be
called. This is a good time to process whatever has been batched.
2. If the traffic across the kafka topic is low enough, it can take a long
time to build a batch of data. A commit_callback is available that
fires when the commit_timeout duration has elapsed since the last commit.
"""


class KafkaConsumer(object):
def __init__(self, kafka_url,
zookeeper_url, zookeeper_path,
group, topic,
repartition_callback=None,
commit_callback=None,
commit_timeout=30):
"""Init
kafka_url - Kafka location
zookeeper_url - Zookeeper location
zookeeper_path - Zookeeper path used for partition
negotiation
group - Kafka consumer group
topic - Kafka topic
repartition_callback - Callback to run when the Kafka consumer
group changes. Repartitioning takes a
relatively long time so this is a good
time to flush and commit any data.
commit_callback - Callback to run when the commit_timeout
has elapsed between commits.
        commit_timeout       - Seconds allowed between commits before
                               commit_callback fires.
"""
self._kazoo_client = None
self._set_partitioner = None
self._repartition_callback = repartition_callback
self._commit_callback = commit_callback
self._commit_timeout = commit_timeout
self._last_commit = 0
self._partitions = []
self._kafka_topic = topic
self._zookeeper_url = zookeeper_url
self._zookeeper_path = zookeeper_path
self._kafka = kafka.client.KafkaClient(kafka_url)
# No auto-commit so that commits only happen after the message is processed.
self._consumer = kafka.consumer.SimpleConsumer(self._kafka,
group,
self._kafka_topic,
auto_commit=False,
iter_timeout=5,
max_buffer_size=None)
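        # provide_partition_info() makes get_messages() return
        # (partition, message) tuples so __iter__ can log partition and
        # offset; fetch_last_known_offsets() populates
        # self._consumer.fetch_offsets, which _partition() later uses as
        # the set of partitions to negotiate over.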
self._consumer.provide_partition_info()
self._consumer.fetch_last_known_offsets()

    def __iter__(self):
self._partition()
self._last_commit = datetime.datetime.now()
while 1:
if self._repartition():
if self._repartition_callback:
self._repartition_callback()
self._partition()
# When Kafka resizes the partitions it's possible that it
# will remove data at our current offset. When this
# happens the next attempt to read from Kafka will generate
# an OffsetOutOfRangeError. We trap this error and seek to
# the head of the current Kafka data. Because this error
            # only happens when Kafka removes data we're currently
            # pointing at, we're guaranteed not to read any duplicate
            # data; however, we will lose any information between our
            # current offset and the new Kafka head.
try:
messages = self._consumer.get_messages(count=1000, timeout=1)
for message in messages:
log.debug("Consuming message from kafka, "
"partition {}, offset {}".
format(message[0], message[1].offset))
yield message
if self._commit_callback:
time_now = datetime.datetime.now()
time_delta = time_now - self._last_commit
if time_delta.total_seconds() > self._commit_timeout:
self._commit_callback()
except kafka.common.OffsetOutOfRangeError:
log.error("Kafka OffsetOutOfRange. Jumping to head.")
self._consumer.seek(0, 0)

    def _repartition(self):
return not self._set_partitioner.acquired

    def _partition(self):
"""Consume messages from kafka using the Kazoo SetPartitioner to
allow multiple consumer processes to negotiate access to the kafka
partitions
"""
        # KazooClient and SetPartitioner objects need to be instantiated
        # after the consumer process has forked. Instantiating prior to
        # forking gives the appearance that things are working, but after
        # forking the connection to zookeeper is lost and no state changes
        # are visible.
if not self._kazoo_client:
self._kazoo_client = KazooClient(hosts=self._zookeeper_url)
self._kazoo_client.start()
state_change_event = threading.Event()
self._set_partitioner = (
SetPartitioner(self._kazoo_client,
path=self._zookeeper_path,
set=self._consumer.fetch_offsets.keys(),
state_change_event=state_change_event,
identifier=str(datetime.datetime.now())))
try:
while 1:
if self._set_partitioner.failed:
raise Exception("Failed to acquire partition")
elif self._set_partitioner.release:
log.info("Releasing locks on partition set {} "
"for topic {}".format(self._partitions,
self._kafka_topic))
self._set_partitioner.release_set()
self._partitions = []
elif self._set_partitioner.acquired:
if not self._partitions:
self._partitions = [p for p in self._set_partitioner]
if not self._partitions:
log.info("Not assigned any partitions on topic {},"
" waiting for a Partitioner state change"
.format(self._kafka_topic))
state_change_event.wait()
state_change_event.clear()
continue
log.info("Acquired locks on partition set {} "
"for topic {}".format(self._partitions, self._kafka_topic))
# Refresh the last known offsets again to make sure
# that they are the latest after having acquired the
# lock. Updates self._consumer.fetch_offsets.
self._consumer.fetch_last_known_offsets()
# Modify self._consumer.fetch_offsets to hold only the
# offsets for the set of Kafka partitions acquired
# by this instance.
partitioned_fetch_offsets = {}
for p in self._partitions:
partitioned_fetch_offsets[p] = (
self._consumer.fetch_offsets[p])
self._consumer.fetch_offsets = partitioned_fetch_offsets
return
elif self._set_partitioner.allocating:
log.info("Waiting to acquire locks on partition set")
self._set_partitioner.wait_for_acquire()
except Exception:
log.exception('KafkaConsumer encountered fatal exception '
'processing messages.')
raise

    def commit(self):
self._last_commit = datetime.datetime.now()
self._consumer.commit()
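
Going by the module docstring, a consuming process would wire both
callbacks to the same flush routine. A minimal sketch follows; the module
path, zookeeper path, topic, group, and the process_batch() helper are
assumptions for illustration, not part of this commit:

# Hypothetical usage sketch, not part of this commit. Assumes the new
# module lands as monasca_common.kafka.consumer.
from monasca_common.kafka.consumer import KafkaConsumer

batch = []

def flush():
    # Process whatever has been batched, then commit so the
    # commit_timeout clock restarts.
    if batch:
        process_batch(batch)  # hypothetical processing function
        del batch[:]
    consumer.commit()

consumer = KafkaConsumer('localhost:9092',
                         'localhost:2181',
                         '/consumers/example',   # hypothetical path
                         'example-group',
                         'metrics',              # hypothetical topic
                         repartition_callback=flush,
                         commit_callback=flush,
                         commit_timeout=30)

# Iteration yields (partition, OffsetAndMessage) tuples because the
# constructor calls provide_partition_info().
for partition, message in consumer:
    batch.append(message.message.value)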

@@ -0,0 +1,60 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kafka.client
import kafka.producer
import logging
import time
log = logging.getLogger(__name__)


class KafkaProducer(object):
"""Adds messages to a kafka topic
"""

    def __init__(self, url):
"""Init
url - kafka connection details
"""
self._kafka = kafka.client.KafkaClient(url)
self._producer = kafka.producer.KeyedProducer(
self._kafka,
async=False,
req_acks=kafka.producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=2000)

    def publish(self, topic, messages):
"""Takes messages and puts them on the supplied kafka topic
"""
        # Using a keyed producer to make sure we can distribute messages
        # evenly across all partitions. As of version 0.9.2, the
        # kafka-python library does not support sending message batches
        # for keyed producers. Batching writes to kafka is important for
        # performance, so we have to work around this limitation. Using
        # the _next_partition function allows us to get proper
        # distribution along with the speed of the send_messages function.
if not isinstance(messages, list):
messages = [messages]
try:
key = time.time() * 1000
partition = self._producer._next_partition(topic, key)
self._producer.send_messages(topic, partition, *messages)
except Exception:
log.exception('Error publishing to {} topic.'.format(topic))
raise
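
For the producer side, a correspondingly small sketch; the module path
and topic name are again assumptions, not part of this commit:

# Hypothetical usage sketch, not part of this commit. Assumes the new
# module lands as monasca_common.kafka.producer.
from monasca_common.kafka.producer import KafkaProducer

producer = KafkaProducer('localhost:9092')

# publish() accepts either a single message or a list; a list goes to
# kafka as one batched send_messages call against a single partition
# chosen per batch, which is the performance workaround described above.
producer.publish('metrics', ['message-one', 'message-two'])
producer.publish('metrics', 'a-single-message')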

@@ -1,8 +1,7 @@
iso8601
six>=1.7.0
kazoo>=2.0
kafka-python>=0.9.1,<0.9.3
PyYAML
MySQL-python>=1.2.3
oslo.config>=1.2.1
oslo.utils
pbr>=0.11,<2.0
six>=1.7.0