From 76cfd01074569a4d3d3fefe5438219f2b4a1601c Mon Sep 17 00:00:00 2001 From: "flavien.peyre" Date: Wed, 27 May 2015 11:43:59 -0400 Subject: [PATCH] Add rabbitmq consumer Change-Id: Iabc0cee2158fa92ed4fc5a6aaf54b55c45cfd7cb --- Dockerfile | 3 +- docker-compose-production.yml | 20 +++ etc/surveil/surveil_rabbitMQ_consumer.cfg | 13 ++ requirements.txt | 2 + setup.cfg | 1 + surveil/cmd/rabbitMQ_consumer.py | 162 ++++++++++++++++++++++ test-requirements.txt | 1 - 7 files changed, 199 insertions(+), 3 deletions(-) create mode 100644 etc/surveil/surveil_rabbitMQ_consumer.cfg create mode 100644 surveil/cmd/rabbitMQ_consumer.py diff --git a/Dockerfile b/Dockerfile index edf9396..2dfb2b6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,8 +7,6 @@ RUN apt-get update && apt-get install -y vim python-pip python3-pip python-dev l # Surveil needs alignak (as a lib) RUN useradd shinken && pip install https://github.com/Alignak-monitoring/alignak/archive/396d10105827f8c75686811991829548e6778e11.zip -# python-surveilclient (used by surveil-init) -RUN pip install python-surveilclient==0.5.1 # Download packs ENV MONITORING_TOOLS_VERSION 0.2.0 @@ -33,6 +31,7 @@ ADD surveil /opt/surveil/surveil #ADD .git /surveil/.git ENV PBR_VERSION=PROD +RUN pip install -U six # We are using develop so that the code can be mounted when in DEV. RUN pip install -U six requests diff --git a/docker-compose-production.yml b/docker-compose-production.yml index aaec359..189927a 100644 --- a/docker-compose-production.yml +++ b/docker-compose-production.yml @@ -16,6 +16,26 @@ surveil: #SURVEIL_OS_PASSWORD: "password" #SURVEIL_OS_TENANT_NAME: "admin" +rabbitconsumer: + build: . + links: + - surveil + environment: + PBR_VERSION: "PROD" + #SURVEIL_API_URL: "http://surveil:8080/v2" + #SURVEIL_AUTH_URL: "http://surveil:8080/v2/auth" + #SURVEIL_VERSION: "2_0" + #RABBIT_HOST: "192.168.49.239" + #RABBIT_PORT: 5672 + #QUEUE: "test" + #RABBIT_USER: "admin" + #RABBIT_PASSWORD: "admin" + #SURVEIL_OS_AUTH_URL: "http://keystone:5000/v2.0" + #SURVEIL_OS_USERNAME: "admin" + #SURVEIL_OS_PASSWORD: "password" + #SURVEIL_TENANT_NAME: "admin" + command: bash -c "cd /opt/surveil && ./setup.sh && python setup.py develop && surveil-rabbitMQ-consumer" + alignak: #build: tools/docker/alignak_container/ image: savoirfairelinux/surveil-alignak:0.4.4 diff --git a/etc/surveil/surveil_rabbitMQ_consumer.cfg b/etc/surveil/surveil_rabbitMQ_consumer.cfg new file mode 100644 index 0000000..e639e30 --- /dev/null +++ b/etc/surveil/surveil_rabbitMQ_consumer.cfg @@ -0,0 +1,13 @@ +[rabbitconsumer] +SURVEIL_API_URL=http://surveil:8080/v2 +SURVEIL_AUTH_URL=http://surveil:8080/v2/auth +SURVEIL_VERSION=2_0 +SURVEIL_OS_AUTH_URL=http://localhost/v2.0 +SURVEIL_OS_USERNAME=admin +SURVEIL_OS_PASSWORD=password +SURVEIL_OS_TENANT_NAME=admin +RABBIT_HOST=192.168.49.239 +RABBIT_PORT=5672 +QUEUE=test +RABBIT_USER=admin +RABBIT_PASSWORD=admin diff --git a/requirements.txt b/requirements.txt index 581bc56..340fa71 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,6 @@ oslo.policy>=0.3.0 keystonemiddleware PasteDeploy influxdb==2.0.1 +pika +python-surveilclient==0.6.0 six diff --git a/setup.cfg b/setup.cfg index e278d2c..f908a5f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -15,6 +15,7 @@ console_scripts = surveil-init = surveil.cmd.init:main surveil-pack-upload = surveil.cmd.pack_upload:main surveil-os-discovery = surveil.cmd.os_discovery:main + surveil-rabbitMQ-consumer = surveil.cmd.rabbitMQ_consumer:main [build_sphinx] source-dir = doc/source diff --git a/surveil/cmd/rabbitMQ_consumer.py b/surveil/cmd/rabbitMQ_consumer.py new file mode 100644 index 0000000..7f5c2dc --- /dev/null +++ b/surveil/cmd/rabbitMQ_consumer.py @@ -0,0 +1,162 @@ +# Copyright 2014 - Savoir-Faire Linux inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Starter script for the RabbitMQ receiver""" + +import json +import os +import threading +import time + +import pika +from six.moves import configparser +from surveilclient import client + + +def main(): + config = configparser.ConfigParser() + config.read("/etc/surveil/surveil_rabbitMQ_consumer.cfg") + + daemon_config = { + "SURVEIL_API_URL": os.environ.get( + 'SURVEIL_API_URL', + config.get("rabbitconsumer", "SURVEIL_API_URL") + ), + "SURVEIL_AUTH_URL": os.environ.get( + 'SURVEIL_AUTH_URL', + config.get("rabbitconsumer", "SURVEIL_AUTH_URL") + ), + "SURVEIL_VERSION": os.environ.get( + 'SURVEIL_VERSION', + config.get("rabbitconsumer", "SURVEIL_VERSION") + ), + "RABBIT_HOST": os.environ.get( + 'RABBIT_HOST', + config.get("rabbitconsumer", "RABBIT_HOST") + ), + "RABBIT_PORT": int(os.environ.get( + 'RABBIT_PORT', + config.get("rabbitconsumer", "RABBIT_PORT") + ) + ), + "QUEUE": os.environ.get( + 'QUEUE', + config.get("rabbitconsumer", "QUEUE") + ), + "RABBIT_USER": os.environ.get( + 'RABBIT_USER', + config.get("rabbitconsumer", "RABBIT_USER") + ), + "RABBIT_PASSWORD": os.environ.get( + 'RABBIT_PASSWORD', + config.get("rabbitconsumer", "RABBIT_PASSWORD") + ), + "SURVEIL_OS_AUTH_URL": os.environ.get( + 'SURVEIL_OS_AUTH_URL', + config.get("rabbitconsumer", "SURVEIL_OS_AUTH_URL") + ), + "SURVEIL_OS_USERNAME": os.environ.get( + 'SURVEIL_OS_USERNAME', + config.get("rabbitconsumer", "SURVEIL_OS_USERNAME") + ), + "SURVEIL_OS_PASSWORD": os.environ.get( + 'SURVEIL_OS_PASSWORD', + config.get("rabbitconsumer", "SURVEIL_OS_PASSWORD") + ), + "SURVEIL_OS_TENANT_NAME": os.environ.get( + 'SURVEIL_OS_TENANT_NAME', + config.get("rabbitconsumer", "SURVEIL_OS_TENANT_NAME") + ) + } + + if (daemon_config["RABBIT_USER"] is not None + and daemon_config["RABBIT_PASSWORD"] is not None): + id = pika.credentials.PlainCredentials(daemon_config["RABBIT_USER"], + daemon_config["RABBIT_PASSWORD"] + ) + connection = pika.BlockingConnection( + pika.ConnectionParameters(host=daemon_config["RABBIT_HOST"], + port=daemon_config["RABBIT_PORT"], + credentials=id)) + else: + connection = pika.BlockingConnection( + pika.ConnectionParameters(host=daemon_config["RABBIT_HOST"], + port=daemon_config["RABBIT_PORT"])) + + channel = connection.channel() + channel.queue_declare(queue=daemon_config["QUEUE"]) + + threads = [] + compt_thread = [] + + def reload_config_threads(): + c = client.Client(daemon_config["SURVEIL_API_URL"], + auth_url=daemon_config["SURVEIL_AUTH_URL"], + version=daemon_config["SURVEIL_VERSION"]) + while True: + time.sleep(30) + if compt_thread: + c.config.reload_config() + del (compt_thread[:]) + + def join_finished_threads(): + while True: + time.sleep(1) + todel = [] + for t in threads: + if not t.isAlive(): + t.join() + todel.append(t) + for t in todel: + threads.remove(t) + + joiner_thread = threading.Thread(target=join_finished_threads) + joiner_thread.daemon = True + joiner_thread.start() + + reload_config_thread = threading.Thread(target=reload_config_threads) + reload_config_thread.daemon = True + reload_config_thread.start() + + def process_event(body): + c = client.Client(daemon_config["SURVEIL_API_URL"], + auth_url=daemon_config["SURVEIL_AUTH_URL"], + version=daemon_config["SURVEIL_VERSION"]) + event = json.loads(body) + if event['event_type'] == 'compute.instance.create.start': + custom_fields = { + "_OS_AUTH_URL": daemon_config["SURVEIL_OS_AUTH_URL"], + "_OS_TENANT_NAME": daemon_config["SURVEIL_OS_TENANT_NAME"], + "_OS_USERNAME": daemon_config["SURVEIL_OS_USERNAME"], + "_OS_PASSWORD": daemon_config["SURVEIL_OS_PASSWORD"], + "_OS_INSTANCE_ID": event['payload']['instance_id'] + } + c.config.hosts.create( + host_name=event['payload']['hostname'], + address=event['payload']['hostname'], + use='linux-openstackceilometer', + custom_fields=custom_fields + ) + elif event['event_type'] == 'compute.instance.delete.end': + c.config.hosts.delete(event['payload']['hostname']) + + def callback(ch, method, properties, body): + t = threading.Thread(target=process_event, args=(body,)) + t.daemon = True + t.start() + threads.append(t) + compt_thread.append(t) + + channel.basic_consume(callback, queue=daemon_config["QUEUE"], no_ack=True) + channel.start_consuming() \ No newline at end of file diff --git a/test-requirements.txt b/test-requirements.txt index 6c4b66e..c8c56aa 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -9,4 +9,3 @@ mongomock httpretty==0.8.3 sphinx_rtd_theme docker-compose -python-surveilclient==0.6.0