
This producer singleton works only if a single transport uses Kafka. If multiple transports use Kafka, they override each other's producer on each send. This change creates only one producer per connection. A later optimisation could be one producer per driver instance. Change-Id: I429f7c5efcb41690dd1b17f856bda2425c788e53
400 lines
14 KiB
Python
400 lines
14 KiB
Python
# Copyright (C) 2015 Cisco Systems, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
# The following code fixes issues with kafka-python and eventlet.
# The current release of eventlet (0.19.0) does not actually remove
# select.poll [1]. Because of this, kafka-python.selectors34 selects
# PollSelector instead of SelectSelector [2]. PollSelector relies on
# select.poll, which does not work when eventlet/greenlet is used. This
# bug in eventlet is fixed in the master branch [3], but there's no
# release of eventlet that includes this fix at this point.
|
|
|
|
import json
|
|
import threading
|
|
|
|
import kafka
|
|
from kafka.client_async import selectors
|
|
import kafka.errors
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
from oslo_utils import eventletutils
|
|
import tenacity
|
|
|
|
from oslo_messaging._drivers import base
|
|
from oslo_messaging._drivers import common as driver_common
|
|
from oslo_messaging._drivers import kafka_options
|
|
from oslo_messaging._drivers import pool as driver_pool
|
|
from oslo_messaging._i18n import _LE
|
|
from oslo_messaging._i18n import _LW
|
|
from oslo_serialization import jsonutils
|
|
|
|
# Choose the selector class that is handed to the kafka-python clients
# created in this module.
if not eventletutils.is_monkey_patched('select'):
    # 'select' is not monkey patched: keep kafka-python's default selector.
    KAFKA_SELECTOR = selectors.DefaultSelector
else:
    # monkeypatch the vendored SelectSelector._select like eventlet does
    # https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32
    from eventlet.green import select
    selectors.SelectSelector._select = staticmethod(select.select)

    # Force to use the select selectors
    KAFKA_SELECTOR = selectors.SelectSelector

# Module-level logger shared by every function and class below.
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
def unpack_message(msg):
    """Decode a consumed Kafka record value into (message, context).

    :param msg: JSON encoded payload of the record; a falsy value is
                treated as "nothing to decode"
    :returns: tuple ``(message, context)``. ``message`` is the result of
              ``driver_common.deserialize_msg`` with its ``'context'``
              entry removed, or ``None`` when nothing could be decoded.
              ``context`` is the removed entry, or an empty dict.

    Decoding errors are never propagated to the caller: they are logged
    and whatever was decoded so far is returned.
    """
    context = {}
    message = None
    try:
        if msg:
            decoded = json.loads(msg)
            message = driver_common.deserialize_msg(decoded)
            if 'context' in message:
                context = message['context']
                del message['context']
    except ValueError as e:
        # Pass the exception as a logging argument so that the string is
        # only built when the record is actually emitted.
        LOG.info("Invalid format of consumed message: %s", e)
    except Exception:
        # Keep the best-effort behaviour, but attach the traceback so the
        # failure can be diagnosed from the logs.
        LOG.warning(_LW("Exception during message unpacking"),
                    exc_info=True)
    return message, context
|
|
|
|
|
|
def pack_message(ctxt, msg):
    """Store the request context in ``msg`` and serialize the result.

    :param ctxt: context as a dict, or an object providing ``to_dict()``
    :param msg: message dict; it is modified in place, the context is
                stored under its ``'context'`` key
    :returns: the value produced by ``driver_common.serialize_msg(msg)``
    """
    msg['context'] = ctxt if isinstance(ctxt, dict) else ctxt.to_dict()
    return driver_common.serialize_msg(msg)
|
|
|
|
|
|
def target_to_topic(target, priority=None):
    """Build the Kafka topic name for a target.

    :param target: Message destination target
    :type target: oslo_messaging.Target
    :param priority: Notification priority; when empty or None the bare
                     target topic is returned
    :type priority: string
    :returns: ``target.topic`` or ``target.topic + '.' + priority``
    """
    return (target.topic + '.' + priority) if priority else target.topic
|
|
|
|
|
|
def retry_on_retriable_kafka_error(exc):
    """Tell whether ``exc`` is a Kafka error flagged as retriable.

    :param exc: the exception raised by the wrapped call
    :returns: False for anything that is not a ``kafka.errors.KafkaError``,
              otherwise the value of the error's ``retriable`` attribute
    """
    if not isinstance(exc, kafka.errors.KafkaError):
        return False
    return exc.retriable
|
|
|
|
|
|
def with_reconnect(retries=None):
    """Build a decorator retrying a call on retriable Kafka errors.

    The decorated callable is retried, with a fixed wait of one second
    between attempts, for as long as it raises an exception accepted by
    ``retry_on_retriable_kafka_error``. Once retrying stops, the last
    exception is re-raised as is.

    :param retries: value given to ``tenacity.stop_after_attempt``;
                    None means that no stop condition is applied
    :returns: a decorator
    """
    # stop_after_attempt(None) would compare the attempt number with None,
    # which raises TypeError on Python 3 and is always true on Python 2,
    # so None has to be mapped explicitly to "never stop".
    if retries is None:
        stop = tenacity.stop_never
    else:
        stop = tenacity.stop_after_attempt(retries)

    def decorator(func):
        @tenacity.retry(
            retry=tenacity.retry_if_exception(retry_on_retriable_kafka_error),
            wait=tenacity.wait_fixed(1),
            stop=stop,
            reraise=True
        )
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper
    return decorator
|
|
|
|
|
|
class Connection(object):
    """Connection to the Kafka brokers named by a transport URL.

    Each instance owns its own lazily created ``kafka.KafkaProducer``
    (creation and closing are guarded by ``producer_lock``) and at most one
    ``kafka.KafkaConsumer``, created by ``declare_topic_consumer``.
    """

    def __init__(self, conf, url, purpose):
        """Read the driver options and compute the broker addresses.

        :param conf: configuration object exposing the
                     ``oslo_messaging_kafka`` option group
        :param url: transport URL; its ``hosts`` are the Kafka brokers
        :param purpose: not used by this class
        """
        self.client = None
        driver_conf = conf.oslo_messaging_kafka
        self.batch_size = driver_conf.producer_batch_size
        # producer_batch_timeout is scaled by 1000 to get the linger_ms
        # value given to the producer.
        self.linger_ms = driver_conf.producer_batch_timeout * 1000
        self.conf = conf
        self.producer = None
        self.producer_lock = threading.Lock()
        self.consumer = None
        self.consumer_timeout = float(driver_conf.kafka_consumer_timeout)
        self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
        self.group_id = driver_conf.consumer_group
        self.url = url
        self._parse_url()
        # TODO(Support for manual/auto_commit functionality)
        # When auto_commit is False, consumer can manually notify
        # the completion of the subscription.
        # Currently we don't support for non auto commit option
        self.auto_commit = True
        self._consume_loop_stopped = False

    def _parse_url(self):
        """Fill ``self.hostaddrs`` with the "host:port" of every broker.

        Hosts without a hostname are skipped and a missing port is replaced
        by ``kafka_default_port``. When the URL yields no address at all,
        the configured default host and port are used.
        """
        driver_conf = self.conf.oslo_messaging_kafka
        self.hostaddrs = []

        for host in self.url.hosts:
            if host.hostname:
                self.hostaddrs.append("%s:%s" % (
                    host.hostname,
                    host.port or driver_conf.kafka_default_port))

        if not self.hostaddrs:
            self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host,
                                             driver_conf.kafka_default_port))

    def notify_send(self, topic, ctxt, msg, retry):
        """Send messages to Kafka broker.

        :param topic: String of the topic
        :param ctxt: context for the messages
        :param msg: messages for publishing
        :param retry: retry setting handed to ``with_reconnect``; None or
                      a negative value is handed over as None
        :raises: any exception raised while sending; the producer is
                 closed before the exception is re-raised
        """
        # Comparing None with an int raises TypeError on Python 3, so None
        # has to be tested before the sign of the value.
        if retry is not None and retry < 0:
            retry = None
        message = pack_message(ctxt, msg)
        message = jsonutils.dumps(message)
        # No value serializer is configured on the producer, so
        # kafka-python needs the payload as bytes.
        if not isinstance(message, bytes):
            message = message.encode('utf-8')

        @with_reconnect(retries=retry)
        def wrapped_with_reconnect():
            self._ensure_producer()
            # NOTE(sileht): This returns a future, we can use get()
            # if we want to block like other driver
            self.producer.send(topic, message)

        try:
            wrapped_with_reconnect()
        except Exception:
            # NOTE(sileht): if something goes wrong close the producer
            # connection
            self._close_producer()
            raise

    @with_reconnect()
    def _poll_messages(self, timeout):
        """Poll the consumer once.

        :param timeout: poll timeout in seconds
        :returns: the value returned by ``KafkaConsumer.poll``
        """
        # KafkaConsumer.poll() takes its timeout in milliseconds.
        return self.consumer.poll(timeout_ms=timeout * 1000.0)

    def consume(self, timeout=None):
        """Poll the consumer once and return what was fetched.

        :param timeout: poll timeout in seconds; None or a negative value
                        selects the configured ``consumer_timeout``
        :returns: the polled messages, or None when consuming has been
                  stopped or when polling failed
        :raises driver_common.Timeout: on ``kafka.errors.ConsumerTimeout``
        """
        if self._consume_loop_stopped:
            return None

        # None cannot be compared with an int on Python 3: test it first.
        if timeout is None or timeout < 0:
            timeout = self.consumer_timeout
        try:
            messages = self._poll_messages(timeout)
        except kafka.errors.ConsumerTimeout as e:
            # Exceptions have no 'message' attribute on Python 3.
            raise driver_common.Timeout(str(e))
        except Exception:
            LOG.exception(_LE("Failed to consume messages"))
            messages = None
        return messages

    def stop_consuming(self):
        """Make every later call to ``consume`` return None."""
        self._consume_loop_stopped = True

    def reset(self):
        """Reset a connection so it can be used again."""
        pass

    def close(self):
        """Close the producer and the consumer, when they exist."""
        self._close_producer()
        if self.consumer:
            self.consumer.close()
        self.consumer = None

    def commit(self):
        """Commit is used by subscribers belonging to the same group.

        After subscribing messages, commit is called to prevent
        the other subscribers which belong to the same group
        from re-subscribing the same messages.

        Currently self.auto_commit option is always True,
        so we don't need to call this function.
        """
        self.consumer.commit()

    def _close_producer(self):
        """Close and forget the producer while holding the producer lock."""
        with self.producer_lock:
            if self.producer:
                self.producer.close()
                self.producer = None

    def _ensure_producer(self):
        """Create the producer of this connection if it does not exist."""
        if self.producer:
            return
        with self.producer_lock:
            # Checked again under the lock: another thread may have created
            # the producer while this one was waiting for the lock.
            if self.producer:
                return
            self.producer = kafka.KafkaProducer(
                bootstrap_servers=self.hostaddrs,
                linger_ms=self.linger_ms,
                batch_size=self.batch_size,
                selector=KAFKA_SELECTOR)

    @with_reconnect()
    def declare_topic_consumer(self, topics, group=None):
        """Create the consumer subscribed to ``topics``.

        :param topics: iterable of topic names
        :param group: consumer group id; the configured ``consumer_group``
                      is used when it is empty or None
        """
        self.consumer = kafka.KafkaConsumer(
            *topics, group_id=(group or self.group_id),
            bootstrap_servers=self.hostaddrs,
            max_partition_fetch_bytes=self.max_fetch_bytes,
            selector=KAFKA_SELECTOR
        )
|
|
|
|
|
|
class OsloKafkaMessage(base.RpcIncomingMessage):
    """Incoming message built from a record consumed from Kafka.

    Neither ``requeue`` nor ``reply`` does anything besides logging a
    warning.
    """

    def __init__(self, ctxt, message):
        """Hand the context and the message over to the base class."""
        super(OsloKafkaMessage, self).__init__(ctxt, message)

    def requeue(self):
        """Log a warning: requeue is not supported by this driver."""
        LOG.warning(_LW("requeue is not supported"))

    def reply(self, reply=None, failure=None):
        """Log a warning: reply is not supported by this driver.

        Both arguments are ignored.
        """
        LOG.warning(_LW("reply is not supported"))
|
|
|
|
|
|
class KafkaListener(base.PollStyleListener):
    """Poll style listener delivering the messages read by a connection.

    Messages fetched from the connection are buffered in
    ``incoming_queue`` and handed out one at a time by ``poll``.
    """

    def __init__(self, conn):
        """Wrap ``conn``, the connection the messages are consumed from."""
        super(KafkaListener, self).__init__()
        self._stopped = threading.Event()
        self.conn = conn
        self.incoming_queue = []

    @base.batch_poll_helper
    def poll(self, timeout=None):
        """Return the next buffered message, consuming more when needed.

        :param timeout: timeout forwarded to ``conn.consume``
        :returns: an ``OsloKafkaMessage``, or None when the listener has
                  been stopped or when consuming timed out
        """
        # TODO(sileht): use batch capability of kafka
        while not self._stopped.is_set():
            if self.incoming_queue:
                return self.incoming_queue.pop(0)
            try:
                fetched = self.conn.consume(timeout=timeout)
                if fetched:
                    self._put_messages_to_queue(fetched)
            except driver_common.Timeout:
                return None
        return None

    def _put_messages_to_queue(self, messages):
        """Unpack every record of ``messages`` into ``incoming_queue``.

        :param messages: mapping whose values are iterables of records;
                         records that unpack to an empty message are
                         dropped
        """
        for records in messages.values():
            for record in records or ():
                message, context = unpack_message(record.value)
                if not message:
                    continue
                self.incoming_queue.append(
                    OsloKafkaMessage(ctxt=context, message=message))

    def stop(self):
        """Stop the poll loop and tell the connection to stop consuming."""
        self._stopped.set()
        self.conn.stop_consuming()

    def cleanup(self):
        """Close the underlying connection."""
        self.conn.close()

    def commit(self):
        """Commit through the underlying connection."""
        # TODO(Support for manually/auto commit functionality)
        # It's better to allow users to commit manually and support for
        # self.auto_commit = False option. For now, this commit function
        # is meaningless since user couldn't call this function and
        # auto_commit option is always True.
        self.conn.commit()
|
|
|
|
|
|
class KafkaDriver(base.BaseDriver):
    """Note: Current implementation of this driver is experimental.
    We will have functional and/or integrated testing enabled for this driver.

    Only notifications are implemented; the RPC entry points raise
    NotImplementedError.
    """

    def __init__(self, conf, url, default_exchange=None,
                 allowed_remote_exmods=None):
        """Register the Kafka options and create the connection pool.

        :param conf: configuration object the options are registered on
        :param url: transport URL
        :param default_exchange: passed through to the base driver
        :param allowed_remote_exmods: passed through to the base driver
        """
        kafka_group = cfg.OptGroup(name='oslo_messaging_kafka',
                                   title='Kafka driver options')
        conf.register_group(kafka_group)
        conf.register_opts(kafka_options.KAFKA_OPTS, group=kafka_group)

        super(KafkaDriver, self).__init__(
            conf, url, default_exchange, allowed_remote_exmods)

        # the pool configuration properties
        kafka_conf = self.conf.oslo_messaging_kafka
        pool_max = kafka_conf.pool_size
        pool_min = kafka_conf.conn_pool_min_size
        pool_ttl = kafka_conf.conn_pool_ttl

        self.connection_pool = driver_pool.ConnectionPool(
            self.conf, pool_max, pool_min, pool_ttl,
            self._url, Connection)
        self.listeners = []

    def cleanup(self):
        """Close every object held in ``self.listeners`` and empty it."""
        # NOTE(review): nothing visible in this class adds to
        # self.listeners - confirm whether listeners should be tracked.
        for listener in self.listeners:
            listener.close()
        self.listeners = []

    def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
             retry=None):
        """RPC sending is not available with this driver.

        :raises NotImplementedError: always
        """
        raise NotImplementedError(
            'The RPC implementation for Kafka is not implemented')

    def send_notification(self, target, ctxt, message, version, retry=None):
        """Send notification to Kafka brokers

        :param target: Message destination target
        :type target: oslo_messaging.Target
        :param ctxt: Message context
        :type ctxt: dict
        :param message: Message payload to pass
        :type message: dict
        :param version: Messaging API version (currently not used)
        :type version: str
        :param retry: retry setting, handed unchanged to
                      ``Connection.notify_send``
        :type retry: int
        """
        topic = target_to_topic(target)
        with self._get_connection(purpose=driver_common.PURPOSE_SEND) as conn:
            conn.notify_send(topic, ctxt, message, retry)

    def listen(self, target, batch_size, batch_timeout):
        """RPC listening is not available with this driver.

        :raises NotImplementedError: always
        """
        raise NotImplementedError(
            'The RPC implementation for Kafka is not implemented')

    def listen_for_notifications(self, targets_and_priorities, pool,
                                 batch_size, batch_timeout):
        """Listen to a specified list of targets on Kafka brokers

        :param targets_and_priorities: List of pairs (target, priority);
                                       each pair is turned into a topic
                                       name by ``target_to_topic``
        :type targets_and_priorities: list
        :param pool: consumer group of Kafka consumers
        :type pool: string
        :param batch_size: handed to ``base.PollStyleListenerAdapter``
        :param batch_timeout: handed to ``base.PollStyleListenerAdapter``
        :returns: a ``base.PollStyleListenerAdapter`` wrapping a
                  ``KafkaListener``
        """
        conn = self._get_connection(purpose=driver_common.PURPOSE_LISTEN)
        # A set is used, so a topic named several times is kept only once.
        topics = {target_to_topic(target, priority)
                  for target, priority in targets_and_priorities}

        conn.declare_topic_consumer(topics, pool)

        return base.PollStyleListenerAdapter(
            KafkaListener(conn), batch_size, batch_timeout)

    def _get_connection(self, purpose):
        """Return a connection context on the driver's pool for ``purpose``."""
        return driver_common.ConnectionContext(self.connection_pool, purpose)
|