diff --git a/monasca_common/kafka/producer.py b/monasca_common/kafka/producer.py index b454f950..236ced95 100644 --- a/monasca_common/kafka/producer.py +++ b/monasca_common/kafka/producer.py @@ -1,4 +1,4 @@ -# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. +# (C) Copyright 2015, 2017 Hewlett Packard Enterprise Development LP # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -47,10 +47,29 @@ class KafkaProducer(object): if not isinstance(messages, list): messages = [messages] - try: - if key is None: - key = int(time.time() * 1000) - self._producer.send_messages(topic, str(key), *messages) - except Exception: - log.exception('Error publishing to {} topic.'.format(topic)) - raise + first = True + success = False + while not success: + try: + if key is None: + key = int(time.time() * 1000) + self._producer.send_messages(topic, str(key), *messages) + success = True + except Exception: + if first: + # This is a warning because of all the other warning and + # error messages that are logged in this case. This way + # someone looking at the log file can see the retry + log.warn("Failed send on topic {}, clear metadata and retry" + .format(topic)) + + # If Kafka is running in Kubernetes, the cached metadata + # contains the IP Address of the Kafka pod. If the Kafka + # pod has restarted, the IP Address will have changed + # which would have caused the first publish to fail. So, + # clear the cached metadata and retry the publish + self._kafka.reset_topic_metadata(topic) + first = False + continue + log.exception('Error publishing to {} topic.'.format(topic)) + raise