61 Commits

Author SHA1 Message Date
Takashi Kajinami
34fd61bdc2 Run pyupgrade to clean up Python 2 syntaxes
Update all .py source files by
 $ pyupgrade --py3-only $(git ls-files | grep ".py$")
to modernize the code according to Python 3 syntaxes.

pep8 errors are fixed by
 $ autopep8 --select=E127,E128,E501 --max-line-length 79 -r \
    --in-place oslo_messaging
and a few manual adjustments.

Also add the pyupgrade hook to pre-commit to avoid merging additional
Python 2 syntaxes.

Change-Id: I8115b7f8c5d27ce935e4422c351add4bb72e354f
2024-10-21 19:58:14 +09:00
Takashi Kajinami
3ed01f4845 Use oslo.utils implementation to parse/escape server address
oslo.utils have provided a few useful utility methods to parse or
escape server addresses. Replace the own logic by these methods to
reduce amount of own logic.

Change-Id: I7807314e891e202e47254bf7f98aca4c99005079
2024-10-06 01:11:08 +09:00
Takashi Kajinami
b0e28a1603 kafka: Fix invalid hostaddr format for IPv6 address
When IPv6 address is used for host, the hostaddr should be formatted
in [<address>]:<port> format instead of <address>:<port> format. This
ensures the correct format is used.

Closes-Bug: 1907702
Change-Id: I6f4a453a69e942d5b2d66ffeca6960b85c8bc721
2024-02-20 19:09:56 +09:00
Guillaume Espanel
43f2224aac Remove logging from ProducerConnection._produce_message
In impl_kafka, _produce_message is run in a tpool.execute
context but it was also calling logging functions.
This could cause subsequent calls to logging functions to
deadlock.

This patch moves the logging calls out of the tpool.execute scope.

Change-Id: I81167eea0a6b1a43a88baa3bc383af684f4b1345
Closes-bug: #1981093
2022-08-03 17:35:16 +02:00
Michal Arbet
5a43d4548a Add support for kafka SSL autentication
Change-Id: Idef066a2e3b4923789a6b081d5442e931aba4507
2020-01-16 23:26:53 +01:00
John Eckersberg
d873c0d8f5 Do not use threading.Event
Waiting on a threading.Event with eventlet can cause busy looping via
epoll_wait, see related bug for more details.

Change-Id: I007613058a2d21d1712c02fa6d1602b63705c1ab
Related-bug: #1518430
2019-12-18 13:11:41 +00:00
Zuul
dfc8fe9a8f Merge "Support kafka message compression" 2019-08-05 15:53:35 +00:00
Kenneth Giusti
73c0c0071e Add the "transport_options" parameter to the amqp1 and kafka drivers.
Change-Id: I5ca6ec2cb30b8d7f18e1770f80024957bc029bf3
2019-06-20 16:12:39 -04:00
zhang-shaoman
9a752862e2 Support kafka message compression
When the message is large, in order to improve the efficiency of
kafka, we need to compress the message before send it, so we need to
support kafka message compression.

Change-Id: I9e86d43ad934c1f82dc3dcf93d317538f9d2568e
Implements: blueprint support-kafka-compression
2019-06-20 11:18:35 +08:00
Andy Smith
0953fa1759 Kafka driver deployment guide
Depends-On: Idfb9fe3700d882c8285c6dc56b0620951178eba2
Change-Id: If8370c0c83312d675bde837f768ae40ec3603972
2019-01-23 08:27:43 -05:00
Andy Smith
5a842ae155 Switch driver to confluent-kafka client library
This patch switches the kafka python client from kafka-python to
confluent-kafka due to documented threading issues with the
kafka-python consumer and the recommendation to use multiplrocessing.
The confluent-kafka client leverages the high performance librdkafka
C client and is safe for multiple thread use.

This patch:
* switches to confluent-kafka library
* revises consumer and producer message operations
* utilizes event.tpool method for confluent-kafka blocking calls
* updates unit tests
* adds kafka specific timeouts for functional tests
* adds release note

Depends-On: Ice374dca539b8ed1b1965b75379bad5140121483
Change-Id: Idfb9fe3700d882c8285c6dc56b0620951178eba2
2018-12-04 11:25:07 -05:00
Andy Smith
252844879d Don't use monotonic with Python >=3.3
A change to the global-requirements has limited use of monotonic
library to Python versions earlier than 3.3 (later versions have
built-in support for a monotonic clock), so limit it in
requirements.txt.

Note: this patch updates kafka driver (due to deprecated exception
in library) in order to pass unit tests

Change-Id: Id6b0814e05a0e548a8c2a5359daf1a6878cf6859
2018-12-03 14:52:49 -05:00
Steve Kowalik
4cc1264a18 Remove default_{host,port} deprecated options
The Kafka driver deprecated the kafka_default_host and
kafka_default_port options in 5.10.0, released in Ocata. Remove them.

Change-Id: I206e68ec1624bb6d5d6ba320572530352bbd4378
2018-09-05 14:44:54 +10:00
Dan Smith
b34ab8b1cc [rabbitmq] Implement active call monitoring
This adds an optional call_monitor_timeout parameter to the RPC client,
which if specified, will enable heartbeating of long-running calls by
the server. This enables the user to increase the regular timeout to
a much larger value, allowing calls to take a very long time, but
with heartbeating to indicate that they are still running on the server
side. If the server stops heartbeating, then the call_monitor_timeout
takes over and we fail with the usual MessagingTimeout instead of waiting
for the longer overall timeout to expire.

Change-Id: I60334aaf019f177a984583528b71d00859d31f84
2018-05-09 10:08:37 -07:00
Zuul
4d03b16334 Merge "Add heartbeat() method to RpcIncomingMessage" 2018-05-01 16:49:07 +00:00
Dan Smith
930e6189e2 Add heartbeat() method to RpcIncomingMessage
This adds a heartbeat() method to RpcIncomingMessage to be used by a
subsequent patch implementation of active-call heartbeating. This is
unimplemented in all drivers for the moment.

Change-Id: If8ab0dc16e3bef69d5a826c31c0fe35e403ac6a1
2018-04-24 07:35:56 -07:00
Andrew Smith
5f4755b92b Add kafka for python 3 functional test
Change-Id: I743cd09e3450fac215ff65db37c3fe53e2e43601
2018-04-17 14:05:20 -04:00
Andrew Smith
90f7610f9d Add kafka config options for security (ssl/sasl)
Change-Id: Ia1be1b67a7151d449185e2ad52eff1787e8b0933
2018-02-13 09:17:06 -05:00
Andrew Smith
d09bf21897 Add support for synchronous commit
This patch changes the default driver behavior to synchronously
commit messages following consumer poll. A configuration option
will enable the auto commit for asynchronous commit if desired.

Depends-On: I5b4f01c928373cac530aa6877a34c684577bc64e
Change-Id: I92a3dc95c5d424aa722138195fef5a855a66b31d
2018-01-18 14:57:54 -05:00
Andrew Smith
1ccdccddaa Add kafka driver vhost emulation
Emulate vhost support by adding the virtual host name to the
topic created on the kafka server. Also, update connection
management for producer/consumer.

This patch:
* updates target to topic generation
* add consumer and producer connection classes
* remove connection pool
* update driver test

Change-Id: Idd164444c04e9f465a43ee909af840a41bb090c0
2017-12-27 12:57:39 -05:00
Andrew Smith
3afc3a0a1d Update kafka functional test
This patch addresses a number of issues that prevented the functional
tests from running. The functional tests now execute and can complete
succesfully. At times, the test will fail (noticiably in CI) indicating
an underlying issue with consumer interaction with the kafka server.

It would be beneficial to merge this patch as it provides repeatability
and visibility for driver-kafka server integration to facilitate
additional debugging and testing.

This patch:

* removes use of deprecated get_transport
* override consumer_group for each test
* changed to synchronous send
* update to kafka 1.0.0 server

Depends-On: Ib552152e841a9fc0bffdcb7c3f7bc75613d0ed62
Change-Id: I7009a3b96ee250c177c10f5121eb73d908747a52
2017-12-16 14:41:03 -05:00
Andrew Smith
eccdea5ceb Add kafka_driver directory
Make uniform with other drivers, kafka driver files, etc.

Change-Id: I6c6e201c304a6005ef191f96e5ac39ffaf4ab8f7
2017-06-13 08:46:50 -04:00
Mehdi Abaakouk
58b026a2aa drivers: use common.ConfigOptsProxy everywhere
ConfigOptsProxy have been implemented only pika driver while
the oslo.messaging allow to pass the query string for all drivers.

This change fixes that.
Closes-Bug: #1666903
Closes-Bug: #1607889

Next step is to validate the query with ConfigOptsProxy, to
raise appropriate exception in case of mis-configuration.

Change-Id: I573334e774ccf33ecd27a85067045f3c6489ee89
2017-02-27 13:10:31 +01:00
Pierre Riteau
aab5ea2dfd Fix type of the kafka_consumer_timeout option
Change-Id: Ibe7a72cef031e84221522183521f65c238a2d8e5
2017-02-13 09:45:02 +00:00
Mehdi Abaakouk
578a186515 kafka: ensure topics are created
Change-Id: I3fb5ee1c134ca6ea220b204af8f6325eeadf9465
2017-01-26 08:00:40 +01:00
Mehdi Abaakouk
c7cdf2d9b7 kafka: fix python3 exception
Change-Id: I7244515f274719eb8e9189677554637e20917274
2017-01-26 08:00:40 +01:00
Mehdi Abaakouk
1ee3d7001a kafka: remove no really implemented feature
Change-Id: I6f2693c48d5d0ac1af68b3d4bb5ff361facef977
2017-01-02 11:46:48 +01:00
Mehdi Abaakouk
488594936a kafka: return to poller when timeout is reach
consume() must return only if user timeout is reached and not
when driver consumer_timeout is reached.

Change-Id: I6b2b2a28038a194224e79fa37285436ca6787a0a
2017-01-02 11:46:48 +01:00
Mehdi Abaakouk
a7044799ad kafka: Don't hide unpack/unserialize exception
These exception are software bugs, don't convert them into
log for deployer.

Change-Id: I10e9112b53e5c754f38247679896d1d24dd454bf
2017-01-02 11:46:48 +01:00
Mehdi Abaakouk
c8880b6f11 kafka: timeout is in milliseconds
Change-Id: I791c5dd8e7f4fb46a0aea3e825b9b392686a2abd
2017-01-02 11:46:48 +01:00
Mehdi Abaakouk
799cd6fa8f kafka: disable batch for functional tests
Change-Id: I09a3049ca5f4647d0f6b002b3732a4c0edd43986
2017-01-02 11:46:48 +01:00
Mehdi Abaakouk
a76a51a78c kafka: Remove Producer singleton
This producer singleton works only if one transport use kafka.

If multiple transports with kafka are used, their overrides each other
the producer on each send.

This change creates only one producer per connection.

A later optimisation can be one producer per driver instance.

Change-Id: I429f7c5efcb41690dd1b17f856bda2425c788e53
2017-01-02 11:46:48 +01:00
Ilya Tyaptin
f139eb258d Moving driver to new kafka-python version
Currently Kafka driver for an oslo.messaging uses kafka-python==0.9.5
and mostly broken. This package version supports only low level Kafka
producer and consumer API which are marked as deprecated now [1]. Using
of these interfaces bring a big concern to the message processing,
because current KafkaConsumer has not any consuming coordination. This
fact causes a message duplication for the several consumers of one
topic. This behavior is specific to Ceilometer and services which read
and process notifications from other services.

New version of kafka-python allows to use async thread safe message
producers and coordinated consumers [1].

[1] http://kafka-python.readthedocs.io/en/master/changelog.html#feb-15-2016

The driver is currently experimental, python-kafka<1.0.0 API have major
issue described above that can't make the oslo.messaging driver works,
so we prefer having a working driver with a non-synced dependencies, that the
reverse.

Co-Authored-By: Mehdi Abaakouk <sileht@redhat.com>
Change-Id: I29862ed7bf56b9d8878fa8e9fb1cbd9d643908a4
2017-01-02 11:46:48 +01:00
Hiroyasu.OHYAMA
204dfa7d84 [kafka] invoke TypeError exception when 'listen()' method of KafkaDriver
is called

The interface of BaseDriver which is a super-class of each underlying
transport driver has been changed.
But the interface of kafka driver doens't follow up this change. So if a
user chose it as a transport driver, an exception of TypeError would be
occurred.
This change corrects the interface in kafka's driver along with the
BaseDriver's one.

Change-Id: Iedd069b7f083e2cbf377f4148411f77ad758f979
Closes-Bug: #1616755
2016-08-29 16:02:33 +09:00
Kirill Bespalov
162f6e987b Introduce TTL for idle connections
We can reduce a workload of rabbitmq through implementation
of expiration mechanism for idle connections in the pool with
next properties:

 conn_pool_ttl (default 20 min)
 conn_pool_min_size: the pool size limit for expire() (default 2)

The problem is timeless idle connections in the pool, which can be created
via some single huge workload of RPCServer. One SEND connection is heartbeat
thread + some network activity every n second. So, we can reduce it.

Here is two ways to implement an expiration:

 [1] Create a separated thread for checking expire date of connections
 [2] Make call expire() on pool.get() or pool.put()

The [1] has some threading overhead, but probably insignificant
because the thread can sleep 99% time and wake up every 20 mins (by default).
Anyway current implementation is [2].

Change-Id: Ie8781d10549a044656824ceb78b2fe2e4f7f8b43
2016-07-22 13:03:04 +03:00
Jenkins
0e4db6531b Merge "kafka: Deprecates host, port options" 2016-05-24 21:03:03 +00:00
Jenkins
6ee6d55780 Merge "Remove logging from serialize_remote_exception" 2016-05-20 16:39:56 +00:00
Mehdi Abaakouk
a7e5a42968 kafka: Deprecates host, port options
kafka now can read configuration from TransportURL,
so we can deprecate the legacy options.

Change-Id: I87ed4c0404f323f93357dddcf7858c5d1ff16513
2016-05-20 18:38:05 +02:00
Jenkins
ad1bea351d Merge "[kafka] Add several bootstrap servers support" 2016-05-19 15:14:33 +00:00
Gevorg Davoian
39749c77a8 Remove logging from serialize_remote_exception
This patch removes log_failure argument from the function
serialize_remote_exception and from driver implementations
using it (because it is never used and always defaults to True)
and prevents error logging in this function (because these errors
are already logged by servers while processing incoming messages).

Change-Id: Ic01bb11d6c4f018a17f3219cdbd07ef4d30fa434
Closes-Bug: 1580352
2016-05-19 15:18:28 +03:00
Ildar Svetlov
99b843767d [kafka] Add several bootstrap servers support
At that moment kafka driver can use only url with one "host:port"
for the bootstrap server defining, but kafka client supports
set of host:port adresses: "host1:port1,host2:port2", ... .
This patch implement this functional in kafka driver for the better HA.

List self.hostaddrs stores strings "host:port" of Connection.
It collects from self.url.hosts

Change-Id: I5eece66ca6bd069a0df8c8629b4ac815f69a7c7d
Closes-Bug: #1572017
2016-05-19 10:51:58 +00:00
Dmitriy Ukhlov
6db00c77b0 Refactor base interfaces
1) Add MessageHandler base interface for on_incoming_callback replacement
2) Move message_handler parameter form Listener's __init__() to start()
3) Remove wait method from listener

Change-Id: Id414446817e3d2ff67b815074d042a9ce637ec24
2016-04-20 20:46:28 +00:00
Jenkins
c81aa02832 Merge "[kafka] Do not remove kafka_client during reset" 2016-04-10 23:48:36 +00:00
Dmitriy Ukhlov
5d7d7253d1 Refactor driver's listener interface
Current Listener interface has poll() method which return messages

To use it we need have poller thread which is located in MessageHandlerServer
But my investigations of existing driver's code shows that some implemetations have
its own thread inside for processing connection event loop. This event loop received
messages and store in queue object. And then our poller's thread reads this queue
This situation can be improved. we can remove poller's thread, remove queue object
and just call on_message server's callback from connection eventloop thread

This path provide posibility to do this for one of drivers and leave as is other drivers

Change-Id: I3e3d4369d8fdadcecf079d10af58b1e4f5616047
2016-04-05 18:08:08 +00:00
Ilya Tyaptin
9a065e3d8e [kafka] Do not remove kafka_client during reset
Currently we delete kafka_client, producer and consumer
from the Kafka driver connection when we reset it,
for example before returning it to pool.

It's a redundant action, because kafka_client and
kafka producer (it uses a kafka_client for
sending a message) could be used again without any
concerns.
In same time, currently we don't close a KafkaConsumer,
but it's needed because we should to close opened
sockets to the kafka.

In this patchset all action in reset are changed
to the more optimal behavior.

Change-Id: I6ff26256c933c79468f9e6cd1752181e5ace155f
Closes-bug: #1557528
2016-04-05 13:48:20 +03:00
Jenkins
0dbeba1c6a Merge "Use only unique topics for the Kafka driver" 2016-03-21 14:25:33 +00:00
Ilya Tyaptin
bec4ef4d12 Use only unique topics for the Kafka driver
Consumer in Kafka driver should use only unique topic,
otherwise a FetchDuplicate exception will be raised.

Change-Id: I569ce446eaf05dbc3a7fd0b41a2307e940ab87fb
Closes-bug: #1555081
2016-03-21 15:06:24 +03:00
Ilya Tyaptin
d3fedf8624 [Kafka] Ensure a topics before consume messages
Currently we trying ot fetch messages from the topics even
they have bot been created yet. This behaviour causes a
KafkaConfigurationError which are raised in the kafka driver.

Change-Id: I78cfd5ac24fbf37be5649232d0bc825319cf6402
Closes-bug: #1557521
2016-03-21 11:59:12 +03:00
Jenkins
cc89fc3174 Merge "Revert "Ensure the json result type is bytes on Python 3"" 2016-03-11 14:25:05 +00:00
Victor Stinner
bb4121a465 Revert "Ensure the json result type is bytes on Python 3"
This reverts commit bd81d09c02c5bc8561ad04de91802a5c1917d9e9.

I understand that the change was supposed to fix something, but instead it broke all tests on Python 3!?

It's wrong to replace blindly json.dumps() with jsonutils.dump_as_bytes(). In oslo messaging, the result is usually used as a value in a dictionary, and then the whole dictionary is passed to a second serializer which also serialize to JSON.

Sorry, I don't understand everything, but at least I see that tests passed on py3 before the change, and started to fail with the change.

Maybe json(utils).dumps() is misused in some places, but in this case, you should write a change which only fix these specific places, not replace all calls to dumps().

Change-Id: Icd54ee8e3f5c976dfd50b4b62c7f51288649e112
2016-03-11 09:00:08 +00:00