diff --git a/monasca_common/kafka/client_factory.py b/monasca_common/kafka/client_factory.py new file mode 100644 index 00000000..6347172b --- /dev/null +++ b/monasca_common/kafka/client_factory.py @@ -0,0 +1,56 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from monasca_common.confluent_kafka import consumer +from monasca_common.confluent_kafka import producer +from monasca_common.kafka import legacy_kafka_consumer +from monasca_common.kafka import producer as legacy_kafka_producer + + +def get_kafka_consumer(kafka_url, + kafka_consumer_group, + kafka_topic, + zookeeper_url=None, + zookeeper_path=None, + use_legacy_client=False, + repartition_callback=None, + commit_callback=None, + max_commit_interval=30, + client_id=""): + if use_legacy_client: + return legacy_kafka_consumer.LegacyKafkaConsumer( + kafka_url, + ','.join(zookeeper_url), + zookeeper_path, + kafka_consumer_group, + kafka_topic, + repartition_callback=repartition_callback, + commit_callback=commit_callback, + commit_timeout=max_commit_interval + ) + else: + return consumer.KafkaConsumer( + ",".join(kafka_url), + kafka_consumer_group, + kafka_topic, + client_id=client_id, + repartition_callback=repartition_callback, + commit_callback=commit_callback, + max_commit_interval=max_commit_interval + ) + + +def get_kafka_producer(kafka_url, use_legacy_client=False): + if use_legacy_client: + return legacy_kafka_producer.KafkaProducer(kafka_url) + else: + return producer.KafkaProducer(",".join(kafka_url)) diff --git a/monasca_common/kafka/legacy_kafka_consumer.py b/monasca_common/kafka/legacy_kafka_consumer.py new file mode 100644 index 00000000..96a19fac --- /dev/null +++ b/monasca_common/kafka/legacy_kafka_consumer.py @@ -0,0 +1,23 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from monasca_common.kafka import consumer, legacy_kafka_message + + +class LegacyKafkaConsumer(consumer.KafkaConsumer): + def __iter__(self): + """:return: Kafka message object compatible with Confluent Kafka client + object + """ + for raw_message in super(LegacyKafkaConsumer, self).__iter__(): + yield legacy_kafka_message.LegacyKafkaMessage(raw_message) diff --git a/monasca_common/kafka/legacy_kafka_message.py b/monasca_common/kafka/legacy_kafka_message.py new file mode 100644 index 00000000..c480eb7b --- /dev/null +++ b/monasca_common/kafka/legacy_kafka_message.py @@ -0,0 +1,32 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class LegacyKafkaMessage(object): + + def __init__(self, raw_message): + self.m_partition = raw_message[0] + self.m_offset = raw_message[1].offset + self.m_key = raw_message[1].message.key + self.m_value = raw_message[1].message.value + + def key(self): + return self.m_key + + def offset(self): + return self.m_offset + + def partition(self): + return self.m_partition + + def value(self): + return self.m_value