diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 6b037a55b..deb5da0f3 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -162,7 +162,8 @@ class Connection(object): self._ensure_producer() # NOTE(sileht): This returns a future, we can use get() # if we want to block like other driver - self.producer.send(topic, message) + future = self.producer.send(topic, message) + future.get() try: wrapped_with_reconnect() diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index 0d90faf66..e7dc11599 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -31,7 +31,7 @@ class TestKafkaDriverLoad(test_utils.BaseTestCase): self.messaging_conf.transport_driver = 'kafka' def test_driver_load(self): - transport = oslo_messaging.get_transport(self.conf) + transport = oslo_messaging.get_notification_transport(self.conf) self.assertIsInstance(transport._driver, kafka_driver.KafkaDriver) @@ -57,7 +57,8 @@ class TestKafkaTransportURL(test_utils.BaseTestCase): self.messaging_conf.transport_driver = 'kafka' def test_transport_url(self): - transport = oslo_messaging.get_transport(self.conf, self.url) + transport = oslo_messaging.get_notification_transport(self.conf, + self.url) self.addCleanup(transport.cleanup) driver = transport._driver @@ -72,7 +73,7 @@ class TestKafkaDriver(test_utils.BaseTestCase): def setUp(self): super(TestKafkaDriver, self).setUp() self.messaging_conf.transport_driver = 'kafka' - transport = oslo_messaging.get_transport(self.conf) + transport = oslo_messaging.get_notification_transport(self.conf) self.driver = transport._driver def test_send(self): @@ -125,7 +126,7 @@ class TestKafkaConnection(test_utils.BaseTestCase): def setUp(self): super(TestKafkaConnection, self).setUp() self.messaging_conf.transport_driver = 'kafka' - transport = oslo_messaging.get_transport(self.conf) + transport = oslo_messaging.get_notification_transport(self.conf) self.driver = transport._driver def test_notify(self): @@ -135,4 +136,4 @@ class TestKafkaConnection(test_utils.BaseTestCase): fake_producer = fake_producer_class.return_value conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"}, {"fake_text": "fake_message_1"}, 10) - self.assertEqual(1, len(fake_producer.send.mock_calls)) + self.assertEqual(2, len(fake_producer.send.mock_calls)) diff --git a/oslo_messaging/tests/functional/notify/test_logger.py b/oslo_messaging/tests/functional/notify/test_logger.py index 56c0859f8..feb1ee01d 100644 --- a/oslo_messaging/tests/functional/notify/test_logger.py +++ b/oslo_messaging/tests/functional/notify/test_logger.py @@ -13,6 +13,7 @@ # under the License. import logging +import uuid import testscenarios @@ -51,6 +52,10 @@ class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL): # NOTE(gtt): Using different topic to make tests run in parallel topic = 'test_logging_%s_driver_%s' % (self.priority, self.driver) + if self.url.startswith("kafka://"): + self.conf.set_override('consumer_group', str(uuid.uuid4()), + group='oslo_messaging_kafka') + self.config(driver=[self.driver], topics=[topic], group='oslo_messaging_notifications') @@ -67,7 +72,7 @@ class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL): log_method = getattr(log, self.priority) log_method('Test logging at priority: %s' % self.priority) - events = listener.get_events(timeout=1) + events = listener.get_events(timeout=5) self.assertEqual(1, len(events)) info_event = events[0] diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index becf067fb..42372a61f 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -292,6 +292,10 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): # to be run in parallel def test_simple(self): + if self.url.startswith("kafka://"): + self.conf.set_override('consumer_group', 'test_simple', + group='oslo_messaging_kafka') + listener = self.useFixture( utils.NotificationFixture(self.conf, self.url, ['test_simple'])) notifier = listener.notifier('abc') @@ -304,6 +308,10 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): self.assertEqual('abc', event[3]) def test_multiple_topics(self): + if self.url.startswith("kafka://"): + self.conf.set_override('consumer_group', 'test_multiple_topics', + group='oslo_messaging_kafka') + listener = self.useFixture( utils.NotificationFixture(self.conf, self.url, ['a', 'b'])) a = listener.notifier('pub-a', topics=['a']) @@ -356,8 +364,17 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): self.assertThat(len(stream), matchers.GreaterThan(0)) def test_independent_topics(self): + if self.url.startswith("kafka://"): + self.conf.set_override('consumer_group', + 'test_independent_topics_a', + group='oslo_messaging_kafka') listener_a = self.useFixture( utils.NotificationFixture(self.conf, self.url, ['1'])) + + if self.url.startswith("kafka://"): + self.conf.set_override('consumer_group', + 'test_independent_topics_b', + group='oslo_messaging_kafka') listener_b = self.useFixture( utils.NotificationFixture(self.conf, self.url, ['2'])) @@ -383,6 +400,10 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): check_received(listener_b, "pub-2", b_out) def test_all_categories(self): + if self.url.startswith("kafka://"): + self.conf.set_override('consumer_group', 'test_all_categories', + group='oslo_messaging_kafka') + listener = self.useFixture(utils.NotificationFixture( self.conf, self.url, ['test_all_categories'])) n = listener.notifier('abc') @@ -411,6 +432,9 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): # end-to-end acknowledgement with router intermediary # sender pends until batch_size or timeout reached self.skipTest("qdrouterd backend") + if self.url.startswith("kafka://"): + self.conf.set_override('consumer_group', 'test_simple_batch', + group='oslo_messaging_kafka') listener = self.useFixture( utils.BatchNotificationFixture(self.conf, self.url, diff --git a/setup-test-env-kafka.sh b/setup-test-env-kafka.sh index 3b385b7ce..2db436353 100755 --- a/setup-test-env-kafka.sh +++ b/setup-test-env-kafka.sh @@ -3,15 +3,16 @@ set -e . tools/functions.sh +SCALA_VERSION=${SCALA_VERSION:-"2.12"} +KAFKA_VERSION=${KAFKA_VERSION:-"1.0.0"} + if [[ -z "$(which kafka-server-start)" ]] && [[ -z $(which kafka-server-start.sh) ]]; then DATADIR=$(mktemp -d /tmp/OSLOMSG-KAFKA.XXXXX) trap "clean_exit $DATADIR" EXIT - SCALA_VERSION="2.11" - KAFKA_VERSION="0.10.1.0" tarball=kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz - wget http://apache.crihan.fr/dist/kafka/${KAFKA_VERSION}/$tarball -O $DATADIR/$tarball + wget http://www.apache.org/dist/kafka/${KAFKA_VERSION}/$tarball -O $DATADIR/$tarball tar -xzf $DATADIR/$tarball -C $DATADIR export PATH=$DATADIR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}/bin:$PATH fi diff --git a/tools/functions.sh b/tools/functions.sh index 658148703..0986ffc3d 100644 --- a/tools/functions.sh +++ b/tools/functions.sh @@ -11,7 +11,10 @@ wait_for_line () { function clean_exit(){ local error_code="$?" - kill -9 $(jobs -p) + for job in `jobs -p` + do + kill -9 $job + done rm -rf "$1" return $error_code } diff --git a/tox.ini b/tox.ini index 877aec68b..4609bb326 100644 --- a/tox.ini +++ b/tox.ini @@ -52,9 +52,8 @@ commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args= [testenv:py27-func-kafka] setenv = {[testenv]setenv} - TRANSPORT_DRIVER=kafka + TRANSPORT_URL=kafka://127.0.0.1:9092// OS_GROUP_REGEX=oslo_messaging.tests.functional - kafka-python>=1.3.1 commands = {toxinidir}/setup-test-env-kafka.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}' [testenv:py27-func-amqp1]