#!/usr/bin/env python # coding=utf-8 # (C) Copyright 2017 Hewlett Packard Enterprise Development LP # (C) Copyright 2018 FUJITSU LIMITED # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """Wait for specific Kafka topics. For using this script you need to set two environment variables: * `KAFKA_URI` for connection string to Kafka together with port. Example: `kafka:9092`, `192.168.10.6:9092`. * `KAFKA_WAIT_FOR_TOPICS` that contain topics that should exist in Kafka to consider it's working. Many topics should be separated with comma. Example: `retry-notifications,alarm-state-transitions`. After making sure that this environment variables are set you can simply execute this script in the following way: `python3 kafka_wait_for_topics.py && ./start_service.sh` `python3 kafka_wait_for_topics.py || exit 1` Additional environment variables available are: * `LOG_LEVEL` - default to `INFO` * `KAFKA_WAIT_RETRIES` - number of retries, default to `24` * `KAFKA_WAIT_INTERVAL` - in seconds, default to `5` """ import logging import os import sys import time from pykafka.exceptions import NoBrokersAvailableError from pykafka import KafkaClient # Run this script only with Python 3 if sys.version_info.major != 3: sys.stdout.write("Sorry, requires Python 3.x\n") sys.exit(1) LOG_LEVEL = logging.getLevelName(os.environ.get('LOG_LEVEL', 'INFO')) logging.basicConfig(level=LOG_LEVEL) logger = logging.getLogger(__name__) KAFKA_HOSTS = os.environ.get('KAFKA_URI', 'kafka:9092') REQUIRED_TOPICS = os.environ.get('KAFKA_WAIT_FOR_TOPICS', '') \ .encode('utf-8').split(b',') KAFKA_WAIT_RETRIES = int(os.environ.get('KAFKA_WAIT_RETRIES', '24')) KAFKA_WAIT_INTERVAL = int(os.environ.get('KAFKA_WAIT_INTERVAL', '5')) class TopicNoPartition(Exception): """Raise when topic has no partitions.""" class TopicNotFound(Exception): """Raise when topic was not found.""" def retry(retries=KAFKA_WAIT_RETRIES, delay=KAFKA_WAIT_INTERVAL, check_exceptions=()): """Retry decorator.""" def decorator(func): """Decorator.""" def f_retry(*args, **kwargs): """Retry running function on exception after delay.""" for i in range(1, retries + 1): try: return func(*args, **kwargs) # pylint: disable=W0703 # We want to catch all exceptions here to retry. except check_exceptions + (Exception,) as exc: if i < retries: logger.info('Connection attempt %d of %d failed', i, retries) if isinstance(exc, check_exceptions): logger.debug('Caught known exception, retrying...', exc_info=True) else: logger.warn( 'Caught unknown exception, retrying...', exc_info=True) else: logger.exception('Failed after %d attempts', retries) raise # No exception so wait before retrying time.sleep(delay) return f_retry return decorator @retry(check_exceptions=(TopicNoPartition, TopicNotFound)) def check_topics(client, req_topics): """Check for existence of provided topics in Kafka.""" client.update_cluster() logger.debug('Found topics: %r', client.topics.keys()) for req_topic in req_topics: if req_topic not in client.topics.keys(): err_topic_not_found = 'Topic not found: {}'.format(req_topic) logger.warning(err_topic_not_found) raise TopicNotFound(err_topic_not_found) topic = client.topics[req_topic] if not topic.partitions: err_topic_no_part = 'Topic has no partitions: {}'.format(req_topic) logger.warning(err_topic_no_part) raise TopicNoPartition(err_topic_no_part) logger.info('Topic is ready: %s', req_topic) @retry(check_exceptions=(NoBrokersAvailableError,)) def connect_kafka(hosts): """Connect to Kafka with retries.""" return KafkaClient(hosts=hosts) def main(): """Start main part of the wait script.""" logger.info('Checking for available topics: %r', repr(REQUIRED_TOPICS)) client = connect_kafka(hosts=KAFKA_HOSTS) check_topics(client, REQUIRED_TOPICS) if __name__ == '__main__': main()