diff --git a/monasca_persister/persister.conf b/monasca_persister/persister.conf index 1967421e..ed2670f2 100644 --- a/monasca_persister/persister.conf +++ b/monasca_persister/persister.conf @@ -35,6 +35,7 @@ buffer_size = 4096 max_buffer_size = 32768 # Path in zookeeper for kafka consumer group partitioning algo zookeeper_path = /persister_partitions/alarm-state-transitions +num_processors = 1 [kafka_metrics] # Comma separated list of Kafka broker host:port @@ -52,6 +53,7 @@ buffer_size = 4096 max_buffer_size = 32768 # Path in zookeeper for kafka consumer group partitioning algo zookeeper_path = /persister_partitions/metrics +num_processors = 1 [influxdb] database_name = mon diff --git a/monasca_persister/persister.py b/monasca_persister/persister.py index 207827a5..c5653cfd 100644 --- a/monasca_persister/persister.py +++ b/monasca_persister/persister.py @@ -21,8 +21,11 @@ Start the perister as stand-alone process by running 'persister.py --config-file ' """ - +import multiprocessing +import os +import signal import sys +import time import simport from oslo_config import cfg @@ -49,7 +52,8 @@ kafka_common_opts = [cfg.StrOpt('uri'), cfg.IntOpt('fetch_size_bytes'), cfg.IntOpt('buffer_size'), cfg.IntOpt('max_buffer_size'), - cfg.StrOpt('zookeeper_path')] + cfg.StrOpt('zookeeper_path'), + cfg.IntOpt('num_processors', default=1)] kafka_metrics_opts = kafka_common_opts kafka_alarm_history_opts = kafka_common_opts @@ -71,6 +75,57 @@ repositories_group = cfg.OptGroup(name='repositories', title='repositories') cfg.CONF.register_group(repositories_group) cfg.CONF.register_opts(repositories_opts, repositories_group) +processors = [] # global list to facilitate clean signal handling +exiting = False + + +def clean_exit(signum, frame=None): + """Exit all processes attempting to finish uncommitted active work before exit. + Can be called on an os signal or on zookeeper losing connection. 
+ """ + global exiting + if exiting: + # Since this is set up as a handler for SIGCHLD when this kills one + # child it gets another signal, the global exiting avoids this running + # multiple times. + LOG.debug('Exit in progress clean_exit received additional signal %s' % signum) + return + + LOG.info('Received signal %s, beginning graceful shutdown.' % signum) + exiting = True + wait_for_exit = False + + for process in processors: + try: + if process.is_alive(): + process.terminate() # Sends sigterm which any processes after a notification is sent attempt to handle + wait_for_exit = True + except Exception: + LOG.debug('Failed to terminate process %s', process.pid, exc_info=True) + + # wait for a couple seconds to give the subprocesses a chance to shut down correctly. + if wait_for_exit: + time.sleep(2) + + # Kill everything, that didn't already die + for child in multiprocessing.active_children(): + LOG.debug('Killing pid %s' % child.pid) + try: + os.kill(child.pid, signal.SIGKILL) + except Exception: + LOG.debug('Failed to kill pid %s', child.pid, exc_info=True) + + if signum == signal.SIGTERM: + sys.exit(0) + + sys.exit(signum) + + +def start_process(respository, kafka_config): + LOG.info("start process: {}".format(respository)) + persister = Persister(kafka_config, cfg.CONF.zookeeper, respository) + persister.run() + def main(): log.register_options(cfg.CONF) @@ -78,26 +133,24 @@ def main(): cfg.CONF(sys.argv[1:], project='monasca', prog='persister') log.setup(cfg.CONF, "monasca-persister") - """Start persister. - - Start metric persister and alarm persister in separate threads. 
- """ + # Start persister. metric_repository = simport.load(cfg.CONF.repositories.metrics_driver) alarm_state_history_repository = simport.load(cfg.CONF.repositories.alarm_state_history_driver) - metric_persister = Persister(cfg.CONF.kafka_metrics, - cfg.CONF.zookeeper, - metric_repository) + # Add processors for metrics topic + for proc in range(0, cfg.CONF.kafka_metrics.num_processors): + processors.append(multiprocessing.Process( + target=start_process, args=(metric_repository, cfg.CONF.kafka_metrics))) - alarm_persister = Persister(cfg.CONF.kafka_alarm_history, - cfg.CONF.zookeeper, - alarm_state_history_repository) + # Add processors for alarm history topic + for proc in range(0, cfg.CONF.kafka_alarm_history.num_processors): + processors.append(multiprocessing.Process( + target=start_process, args=(alarm_state_history_repository, cfg.CONF.kafka_alarm_history))) - metric_persister.start() - alarm_persister.start() - - LOG.info(''' + # Start + try: + LOG.info(''' _____ / \ ____ ____ _____ ______ ____ _____ @@ -113,8 +166,21 @@ def main(): \/ \/ \/ \/ ''') + for process in processors: + process.start() - LOG.info('Monasca Persister has started successfully!') + # The signal handlers must be added after the processes start otherwise + # they run on all processes + signal.signal(signal.SIGCHLD, clean_exit) + signal.signal(signal.SIGINT, clean_exit) + signal.signal(signal.SIGTERM, clean_exit) + + while True: + time.sleep(10) + + except Exception: + LOG.exception('Error! Exiting.') + clean_exit(signal.SIGKILL) if __name__ == "__main__": sys.exit(main()) diff --git a/monasca_persister/repositories/persister.py b/monasca_persister/repositories/persister.py index cd6b09e4..36051cb4 100644 --- a/monasca_persister/repositories/persister.py +++ b/monasca_persister/repositories/persister.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import os -import threading from oslo_log import log @@ -22,12 +21,10 @@ from monasca_common.kafka.consumer import KafkaConsumer LOG = log.getLogger(__name__) -class Persister(threading.Thread): +class Persister(object): def __init__(self, kafka_conf, zookeeper_conf, repository): - super(Persister, self).__init__() - self._data_points = [] self._kafka_topic = kafka_conf.topic