Add rabbitmq consumer
Change-Id: Iabc0cee2158fa92ed4fc5a6aaf54b55c45cfd7cb
This commit is contained in:
parent
4cdf46c7f3
commit
eb96bf0f37
@ -7,8 +7,6 @@ RUN apt-get update && apt-get install -y vim python-pip python3-pip python-dev l
|
|||||||
# Surveil needs alignak (as a lib)
|
# Surveil needs alignak (as a lib)
|
||||||
RUN useradd shinken && pip install https://github.com/Alignak-monitoring/alignak/archive/396d10105827f8c75686811991829548e6778e11.zip
|
RUN useradd shinken && pip install https://github.com/Alignak-monitoring/alignak/archive/396d10105827f8c75686811991829548e6778e11.zip
|
||||||
|
|
||||||
# python-surveilclient (used by surveil-init)
|
|
||||||
RUN pip install python-surveilclient==0.5.1
|
|
||||||
|
|
||||||
# Download packs
|
# Download packs
|
||||||
ENV MONITORING_TOOLS_VERSION 0.2.0
|
ENV MONITORING_TOOLS_VERSION 0.2.0
|
||||||
@ -33,6 +31,7 @@ ADD surveil /opt/surveil/surveil
|
|||||||
|
|
||||||
#ADD .git /surveil/.git
|
#ADD .git /surveil/.git
|
||||||
ENV PBR_VERSION=PROD
|
ENV PBR_VERSION=PROD
|
||||||
|
RUN pip install -U six
|
||||||
|
|
||||||
# We are using develop so that the code can be mounted when in DEV.
|
# We are using develop so that the code can be mounted when in DEV.
|
||||||
RUN pip install -U six requests
|
RUN pip install -U six requests
|
||||||
|
@ -16,6 +16,26 @@ surveil:
|
|||||||
#SURVEIL_OS_PASSWORD: "password"
|
#SURVEIL_OS_PASSWORD: "password"
|
||||||
#SURVEIL_OS_TENANT_NAME: "admin"
|
#SURVEIL_OS_TENANT_NAME: "admin"
|
||||||
|
|
||||||
|
rabbitconsumer:
|
||||||
|
build: .
|
||||||
|
links:
|
||||||
|
- surveil
|
||||||
|
environment:
|
||||||
|
PBR_VERSION: "PROD"
|
||||||
|
#SURVEIL_API_URL: "http://surveil:8080/v2"
|
||||||
|
#SURVEIL_AUTH_URL: "http://surveil:8080/v2/auth"
|
||||||
|
#SURVEIL_VERSION: "2_0"
|
||||||
|
#RABBIT_HOST: "192.168.49.239"
|
||||||
|
#RABBIT_PORT: 5672
|
||||||
|
#QUEUE: "test"
|
||||||
|
#RABBIT_USER: "admin"
|
||||||
|
#RABBIT_PASSWORD: "admin"
|
||||||
|
#SURVEIL_OS_AUTH_URL: "http://keystone:5000/v2.0"
|
||||||
|
#SURVEIL_OS_USERNAME: "admin"
|
||||||
|
#SURVEIL_OS_PASSWORD: "password"
|
||||||
|
#SURVEIL_TENANT_NAME: "admin"
|
||||||
|
command: bash -c "cd /opt/surveil && ./setup.sh && python setup.py develop && surveil-rabbitMQ-consumer"
|
||||||
|
|
||||||
alignak:
|
alignak:
|
||||||
#build: tools/docker/alignak_container/
|
#build: tools/docker/alignak_container/
|
||||||
image: savoirfairelinux/surveil-alignak:0.4.4
|
image: savoirfairelinux/surveil-alignak:0.4.4
|
||||||
|
13
etc/surveil/surveil_rabbitMQ_consumer.cfg
Normal file
13
etc/surveil/surveil_rabbitMQ_consumer.cfg
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
[rabbitconsumer]
|
||||||
|
SURVEIL_API_URL=http://surveil:8080/v2
|
||||||
|
SURVEIL_AUTH_URL=http://surveil:8080/v2/auth
|
||||||
|
SURVEIL_VERSION=2_0
|
||||||
|
SURVEIL_OS_AUTH_URL=http://localhost/v2.0
|
||||||
|
SURVEIL_OS_USERNAME=admin
|
||||||
|
SURVEIL_OS_PASSWORD=password
|
||||||
|
SURVEIL_OS_TENANT_NAME=admin
|
||||||
|
RABBIT_HOST=192.168.49.239
|
||||||
|
RABBIT_PORT=5672
|
||||||
|
QUEUE=test
|
||||||
|
RABBIT_USER=admin
|
||||||
|
RABBIT_PASSWORD=admin
|
@ -9,4 +9,6 @@ oslo.policy>=0.3.0
|
|||||||
keystonemiddleware
|
keystonemiddleware
|
||||||
PasteDeploy
|
PasteDeploy
|
||||||
influxdb==2.0.1
|
influxdb==2.0.1
|
||||||
|
pika
|
||||||
|
python-surveilclient==0.5.1
|
||||||
six
|
six
|
||||||
|
@ -15,6 +15,7 @@ console_scripts =
|
|||||||
surveil-init = surveil.cmd.init:main
|
surveil-init = surveil.cmd.init:main
|
||||||
surveil-pack-upload = surveil.cmd.pack_upload:main
|
surveil-pack-upload = surveil.cmd.pack_upload:main
|
||||||
surveil-os-discovery = surveil.cmd.os_discovery:main
|
surveil-os-discovery = surveil.cmd.os_discovery:main
|
||||||
|
surveil-rabbitMQ-consumer = surveil.cmd.rabbitMQ_consumer:main
|
||||||
|
|
||||||
[build_sphinx]
|
[build_sphinx]
|
||||||
source-dir = doc/source
|
source-dir = doc/source
|
||||||
|
162
surveil/cmd/rabbitMQ_consumer.py
Normal file
162
surveil/cmd/rabbitMQ_consumer.py
Normal file
@ -0,0 +1,162 @@
|
|||||||
|
# Copyright 2014 - Savoir-Faire Linux inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""Starter script for the RabbitMQ receiver"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pika
|
||||||
|
from six.moves import configparser
|
||||||
|
from surveilclient import client
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
config = configparser.ConfigParser()
|
||||||
|
config.read("/etc/surveil/surveil_rabbitMQ_consumer.cfg")
|
||||||
|
|
||||||
|
daemon_config = {
|
||||||
|
"SURVEIL_API_URL": os.environ.get(
|
||||||
|
'SURVEIL_API_URL',
|
||||||
|
config.get("rabbitconsumer", "SURVEIL_API_URL")
|
||||||
|
),
|
||||||
|
"SURVEIL_AUTH_URL": os.environ.get(
|
||||||
|
'SURVEIL_AUTH_URL',
|
||||||
|
config.get("rabbitconsumer", "SURVEIL_AUTH_URL")
|
||||||
|
),
|
||||||
|
"SURVEIL_VERSION": os.environ.get(
|
||||||
|
'SURVEIL_VERSION',
|
||||||
|
config.get("rabbitconsumer", "SURVEIL_VERSION")
|
||||||
|
),
|
||||||
|
"RABBIT_HOST": os.environ.get(
|
||||||
|
'RABBIT_HOST',
|
||||||
|
config.get("rabbitconsumer", "RABBIT_HOST")
|
||||||
|
),
|
||||||
|
"RABBIT_PORT": int(os.environ.get(
|
||||||
|
'RABBIT_PORT',
|
||||||
|
config.get("rabbitconsumer", "RABBIT_PORT")
|
||||||
|
)
|
||||||
|
),
|
||||||
|
"QUEUE": os.environ.get(
|
||||||
|
'QUEUE',
|
||||||
|
config.get("rabbitconsumer", "QUEUE")
|
||||||
|
),
|
||||||
|
"RABBIT_USER": os.environ.get(
|
||||||
|
'RABBIT_USER',
|
||||||
|
config.get("rabbitconsumer", "RABBIT_USER")
|
||||||
|
),
|
||||||
|
"RABBIT_PASSWORD": os.environ.get(
|
||||||
|
'RABBIT_PASSWORD',
|
||||||
|
config.get("rabbitconsumer", "RABBIT_PASSWORD")
|
||||||
|
),
|
||||||
|
"SURVEIL_OS_AUTH_URL": os.environ.get(
|
||||||
|
'SURVEIL_OS_AUTH_URL',
|
||||||
|
config.get("rabbitconsumer", "SURVEIL_OS_AUTH_URL")
|
||||||
|
),
|
||||||
|
"SURVEIL_OS_USERNAME": os.environ.get(
|
||||||
|
'SURVEIL_OS_USERNAME',
|
||||||
|
config.get("rabbitconsumer", "SURVEIL_OS_USERNAME")
|
||||||
|
),
|
||||||
|
"SURVEIL_OS_PASSWORD": os.environ.get(
|
||||||
|
'SURVEIL_OS_PASSWORD',
|
||||||
|
config.get("rabbitconsumer", "SURVEIL_OS_PASSWORD")
|
||||||
|
),
|
||||||
|
"SURVEIL_OS_TENANT_NAME": os.environ.get(
|
||||||
|
'SURVEIL_OS_TENANT_NAME',
|
||||||
|
config.get("rabbitconsumer", "SURVEIL_OS_TENANT_NAME")
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (daemon_config["RABBIT_USER"] is not None
|
||||||
|
and daemon_config["RABBIT_PASSWORD"] is not None):
|
||||||
|
id = pika.credentials.PlainCredentials(daemon_config["RABBIT_USER"],
|
||||||
|
daemon_config["RABBIT_PASSWORD"]
|
||||||
|
)
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host=daemon_config["RABBIT_HOST"],
|
||||||
|
port=daemon_config["RABBIT_PORT"],
|
||||||
|
credentials=id))
|
||||||
|
else:
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host=daemon_config["RABBIT_HOST"],
|
||||||
|
port=daemon_config["RABBIT_PORT"]))
|
||||||
|
|
||||||
|
channel = connection.channel()
|
||||||
|
channel.queue_declare(queue=daemon_config["QUEUE"])
|
||||||
|
|
||||||
|
threads = []
|
||||||
|
compt_thread = []
|
||||||
|
|
||||||
|
def reload_config_threads():
|
||||||
|
c = client.Client(daemon_config["SURVEIL_API_URL"],
|
||||||
|
auth_url=daemon_config["SURVEIL_AUTH_URL"],
|
||||||
|
version=daemon_config["SURVEIL_VERSION"])
|
||||||
|
while True:
|
||||||
|
time.sleep(30)
|
||||||
|
if compt_thread:
|
||||||
|
c.config.reload_config()
|
||||||
|
del (compt_thread[:])
|
||||||
|
|
||||||
|
def join_finished_threads():
|
||||||
|
while True:
|
||||||
|
time.sleep(1)
|
||||||
|
todel = []
|
||||||
|
for t in threads:
|
||||||
|
if not t.isAlive():
|
||||||
|
t.join()
|
||||||
|
todel.append(t)
|
||||||
|
for t in todel:
|
||||||
|
threads.remove(t)
|
||||||
|
|
||||||
|
joiner_thread = threading.Thread(target=join_finished_threads)
|
||||||
|
joiner_thread.daemon = True
|
||||||
|
joiner_thread.start()
|
||||||
|
|
||||||
|
reload_config_thread = threading.Thread(target=reload_config_threads)
|
||||||
|
reload_config_thread.daemon = True
|
||||||
|
reload_config_thread.start()
|
||||||
|
|
||||||
|
def process_event(body):
|
||||||
|
c = client.Client(daemon_config["SURVEIL_API_URL"],
|
||||||
|
auth_url=daemon_config["SURVEIL_AUTH_URL"],
|
||||||
|
version=daemon_config["SURVEIL_VERSION"])
|
||||||
|
event = json.loads(body)
|
||||||
|
if event['event_type'] == 'compute.instance.create.start':
|
||||||
|
custom_fields = {
|
||||||
|
"_OS_AUTH_URL": daemon_config["SURVEIL_OS_AUTH_URL"],
|
||||||
|
"_OS_TENANT_NAME": daemon_config["SURVEIL_OS_TENANT_NAME"],
|
||||||
|
"_OS_USERNAME": daemon_config["SURVEIL_OS_USERNAME"],
|
||||||
|
"_OS_PASSWORD": daemon_config["SURVEIL_OS_PASSWORD"],
|
||||||
|
"_OS_INSTANCE_ID": event['payload']['instance_id']
|
||||||
|
}
|
||||||
|
c.config.hosts.create(
|
||||||
|
host_name=event['payload']['hostname'],
|
||||||
|
address=event['payload']['hostname'],
|
||||||
|
use='linux-openstackceilometer',
|
||||||
|
custom_fields=custom_fields
|
||||||
|
)
|
||||||
|
elif event['event_type'] == 'compute.instance.delete.end':
|
||||||
|
c.config.hosts.delete(event['payload']['hostname'])
|
||||||
|
|
||||||
|
def callback(ch, method, properties, body):
|
||||||
|
t = threading.Thread(target=process_event, args=(body,))
|
||||||
|
t.daemon = True
|
||||||
|
t.start()
|
||||||
|
threads.append(t)
|
||||||
|
compt_thread.append(t)
|
||||||
|
|
||||||
|
channel.basic_consume(callback, queue=daemon_config["QUEUE"], no_ack=True)
|
||||||
|
channel.start_consuming()
|
Loading…
x
Reference in New Issue
Block a user