Merging in newer version of worker code and small changes for the django app

This commit is contained in:
Matthew Sherborne 2012-09-06 05:20:39 +10:00
parent fcd9a0172d
commit 5ca2df150c
14 changed files with 230 additions and 18 deletions

65
README
View File

@ -1,3 +1,66 @@
StackTach is a debugging tool for OpenStack Nova.
It takes events from AMQP and sends them to the StackTach server for web display.
--------------------
It takes events from AMQP and inserts them in a database for the StackTach django project server for web display.
The stacktach workers consume the monitor.info and monitor.error rabbit message queues.
It's important that they keep running; otherwise the queues fill up and slow down the
whole nova environment.
--------------------
There's a django app in the 'stacktach' directory that lets you see what's
going on in your nova install, and there are two flavors of worker processes
that collect the amqp messages for display in the stacktach django app.
The app was originally designed as a service so that different customers could
monitor their nova deployments, so it has 'tenants'.
After installing the django app like most django apps, if you navigate to it,
it'll prompt you to make a new tenant. It'll be given the number 1. You can
view it in the browser by appending /1 onto the end of the url.
That tenant ID then needs to be passed to one of the worker setups.
--------------------
The original worker setup uses the single script 'worker.py'. It depends on
the 'kombu' lib to talk to your amqp server and uses eventlet green threads,
one for each nova deploy.
--------------------
The newer version consists of the scripts:
* start_workers.py - Starts up sub-processes of worker_new.py's
* worker_conf.py - Holds the configs for your nova deploys
* worker_new.py - The actual worker code
This version was written to address stability issues in the original.
It depends on 'amqplib' for talking to the amqp broker.
It uses subprocess instead of threading to fire up the workers, one sub-proc
for each nova deploy.
The newer version seems to have a memory leak, so it needs restarting
occasionally, but it seems to be more stable overall than the original.
--------------------
Before use, put your nova install details into 'worker_conf.py'.
If you need to start them manually:
sudo -i
cd /path/to/stacktach/install
export DJANGO_SETTINGS_MODULE=stacktach.settings
python start_workers.py
The start_workers.py imports the list of deployments to consume from worker_conf.py,
then it starts a sub process of worker.py for each deploy.
Nice way to see the logs in a screen session:
ls *.log | grep "global\|cell" | xargs watch --differences tail -n2

View File

@ -10,12 +10,10 @@ from stacktach import models
import datetime
import json
import logging
import pprint
import random
import sys
logger = logging.getLogger(__name__)
VERSION = 4
@ -33,7 +31,11 @@ def _monitor_message(routing_key, body):
publisher = body['publisher_id']
parts = publisher.split('.')
service = parts[0]
host = parts[1]
if len(parts) > 1:
host = ".".join(parts[1:])
else:
host = None
#logging.error("publisher=%s, host=%s" % (publisher, host))
payload = body['payload']
request_spec = payload.get('request_spec', None)
instance = payload.get('instance_id', None)
@ -80,9 +82,15 @@ def _parse(tenant, args, json_args):
return {}
values['tenant'] = tenant
when = body['_context_timestamp']
try:
when = datetime.datetime.strptime(when, "%Y-%m-%dT%H:%M:%S.%f")
when = body['timestamp']
except KeyError:
when = body['_context_timestamp'] # Old way of doing it
try:
try:
when = datetime.datetime.strptime(when, "%Y-%m-%d %H:%M:%S.%f")
except ValueError:
when = datetime.datetime.strptime(when, "%Y-%m-%dT%H:%M:%S.%f") # Old way of doing it
values['microseconds'] = when.microsecond
except Exception, e:
pass
@ -169,21 +177,21 @@ def _default_context(state):
context = dict(utc=datetime.datetime.utcnow(), state=state)
return context
def welcome(request):
state = _reset_state(request)
return render_to_response('stacktach/welcome.html', _default_context(state))
return render_to_response('welcome.html', _default_context(state))
@tenant_check
def home(request, tenant_id):
state = _get_state(request, tenant_id)
return render_to_response('stacktach/index.html', _default_context(state))
return render_to_response('index.html', _default_context(state))
def logout(request):
del request.session['state']
return render_to_response('stacktach/welcome.html', _default_context(None))
return render_to_response('welcome.html', _default_context(None))
@csrf_protect
@ -200,7 +208,7 @@ def new_tenant(request):
else:
form = models.TenantForm()
context['form'] = form
return render_to_response('stacktach/new_tenant.html', context,
return render_to_response('new_tenant.html', context,
context_instance=template.RequestContext(request))
@ -212,7 +220,7 @@ def data(request, tenant_id):
c = _default_context(state)
fields = _parse(state.tenant, args, raw_args)
c['cooked_args'] = fields
return render_to_response('stacktach/data.html', c)
return render_to_response('data.html', c)
@tenant_check
@ -229,13 +237,13 @@ def details(request, tenant_id, column, row_id):
from_time = value - datetime.timedelta(minutes=1)
to_time = value + datetime.timedelta(minutes=1)
rows = rows.filter(when__range=(from_time, to_time))
rows = rows.order_by('-when', '-microseconds')[:200]
_post_process_raw_data(rows, state, highlight=row_id)
c['rows'] = rows
c['allow_expansion'] = True
c['show_absolute_time'] = True
return render_to_response('stacktach/rows.html', c)
return render_to_response('rows.html', c)
@tenant_check
@ -246,7 +254,7 @@ def expand(request, tenant_id, row_id):
payload = json.loads(row.json)
pp = pprint.PrettyPrinter()
c['payload'] = pp.pformat(payload)
return render_to_response('stacktach/expand.html', c)
return render_to_response('expand.html', c)
@tenant_check
@ -257,7 +265,7 @@ def host_status(request, tenant_id):
order_by('-when', '-microseconds')[:20]
_post_process_raw_data(hosts, state)
c['rows'] = hosts
return render_to_response('stacktach/host_status.html', c)
return render_to_response('host_status.html', c)
@tenant_check
@ -270,9 +278,9 @@ def search(request, tenant_id):
if column != None and value != None:
rows = models.RawData.objects.filter(tenant=tenant_id).\
filter(**{column:value}).\
order_by('-when', '-microseconds')
order_by('-when', '-microseconds')[:22]
_post_process_raw_data(rows, state)
c['rows'] = rows
c['allow_expansion'] = True
c['show_absolute_time'] = True
return render_to_response('stacktach/rows.html', c)
return render_to_response('rows.html', c)

30
start_workers.py Normal file
View File

@ -0,0 +1,30 @@
# Copyright 2012 Openstack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from multiprocessing import Process
from worker import run
from worker_conf import DEPLOYMENTS
if __name__ == '__main__':
    # Launch one daemonised worker process for every deployment that is not
    # explicitly disabled ('enabled' defaults to True when the key is absent).
    # NOTE(review): `run` is invoked with the deployment dict as its only
    # argument -- confirm that this matches the signature exported by the
    # `worker` module.
    workers = []
    for conf in DEPLOYMENTS:
        if not conf.get('enabled', True):
            continue
        worker = Process(target=run, args=(conf,))
        worker.daemon = True
        worker.start()
        workers.append(worker)

    # Block until every started worker has exited.
    for worker in workers:
        worker.join()

29
worker_conf.py Normal file
View File

@ -0,0 +1,29 @@
# Copyright 2012 Openstack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# This is a sample conf file. Use it as a guide to make your own
# One dict per nova deployment to consume from; each entry carries the
# StackTach tenant it reports to and the AMQP connection settings.
DEPLOYMENTS = [
    # My fun conf
    {
        # This is the stacktach tenant, not an openstack tenant
        'tenant_id': 1,
        'name': "my-fun-nova-deploy",
        # The url for the base of the django app
        'url': 'http://stacktach.my-fun-nova-deploy.com',
        # ip/host name of the amqp server to listen to
        'rabbit_host': "1.2.3.4",
        'rabbit_port': 5672,
        'rabbit_userid': "amqp-user-1",
        'rabbit_password': "some secret password",
        'rabbit_virtual_host': "amqp-vhost",
    },
]

82
worker_new.py Executable file
View File

@ -0,0 +1,82 @@
# Copyright 2012 Openstack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import amqplib.client_0_8 as amqp
import json
import socket
import time
from stacktach import models, views
class NovaConsumer(object):
    """Records messages from the nova monitor queues for one tenant.

    Constructing an instance looks up the Tenant row for ``tenant_id`` and
    registers ``onMessage`` as the consumer callback on the 'monitor.info'
    and 'monitor.error' queues of ``channel``.
    """

    def __init__(self, channel, tenant_id, logger):
        self.logger = logger
        self.channel = channel
        self.tenant = models.Tenant.objects.get(tenant_id=tenant_id)

        # Both queues are handled by the same callback.
        for queue in ('monitor.info', 'monitor.error'):
            channel.basic_consume(queue, callback=self.onMessage)

    def onMessage(self, message):
        """Hand one delivered message to views._parse, then acknowledge it."""
        key = message.delivery_info['routing_key']

        # views._parse receives the (routing_key, decoded body) pair together
        # with that same pair serialised back to JSON.
        event = (key, json.loads(message.body))
        views._parse(self.tenant, event, json.dumps(event))
        self.logger.debug("Recorded %s ", key)

        # The ack is sent only after _parse has returned.
        self.channel.basic_ack(message.delivery_tag)
def run(deployment, logger):
    """Consume the nova monitor queues of one deployment.

    Connects to the deployment's AMQP broker, declares and binds the
    'monitor.info' and 'monitor.error' queues on the 'nova' topic exchange,
    attaches a NovaConsumer and then waits on the channel for as long as it
    has callbacks registered. A socket error triggers a reconnect after a
    five second pause.

    deployment -- dict of settings; the keys read are 'tenant_id',
                  'rabbit_host', 'rabbit_port', 'rabbit_userid',
                  'rabbit_password' and 'rabbit_virtual_host', each with a
                  default when missing.
    logger     -- logger used for progress and error messages.
    """
    tenant_id = deployment.get('tenant_id', 1)
    host = deployment.get('rabbit_host', 'localhost')
    port = deployment.get('rabbit_port', 5672)
    user_id = deployment.get('rabbit_userid', 'rabbit')
    password = deployment.get('rabbit_password', 'rabbit')
    virtual_host = deployment.get('rabbit_virtual_host', '/')

    logger.info("Rabbit: %s %s %s %s" %
                (host, port, user_id, virtual_host))

    # amqplib has no separate port argument: the port travels inside the
    # host string as "host:port". Previously the configured port was read
    # and logged but never used, so any non-default port was ignored.
    # A host that already carries a port is left untouched.
    if port is None or ':' in host:
        address = host
    else:
        address = "%s:%s" % (host, port)

    while 1:
        try:
            conn = amqp.Connection(address, userid=user_id,
                                   password=password,
                                   virtual_host=virtual_host)

            ch = conn.channel()
            ch.access_request(virtual_host, active=True, read=True)

            ch.exchange_declare('nova', type='topic', durable=True,
                                auto_delete=False)
            ch.queue_declare('monitor.info', durable=True,
                             auto_delete=False, exclusive=False)
            ch.queue_declare('monitor.error', durable=True,
                             auto_delete=False, exclusive=False)
            ch.queue_bind('monitor.info', 'nova')
            ch.queue_bind('monitor.error', 'nova')

            # Constructing the consumer registers its callbacks on the
            # channel; the instance itself is not needed afterwards.
            NovaConsumer(ch, tenant_id, logger)

            #
            # Loop as long as the channel has callbacks registered
            #
            while ch.callbacks:
                ch.wait()
            break
        except socket.error as e:
            # Connection lost or refused: pause, then rebuild the whole
            # connection from scratch.
            logger.warning("Socket error: %s" % e)
            time.sleep(5)
            continue

    ch.close()
    conn.close()
    logger.info("Finished")