Merge pull request #5 from SandyWalsh/attempt

Capture latest staging changes
This commit is contained in:
Sandy Walsh 2012-05-01 08:23:12 -07:00
commit 804ed80e62
7 changed files with 127 additions and 67 deletions

View File

@ -20,6 +20,8 @@ from django.db import models
# A customer account plus the per-tenant URL templates used to build
# external links for a raw-data row ("[instance]" is substituted with the
# row's instance id — see _post_process_raw_data in views.py).
# NOTE(review): this is a rendered diff; original indentation was stripped.
class Tenant(models.Model):
email = models.CharField(max_length=50)
project_name = models.CharField(max_length=50)
# URL template for the nova-stats link shown per row.
nova_stats_template = models.CharField(max_length=200)
# URL template for the Loggly search link shown per row.
loggly_template = models.CharField(max_length=200)
tenant_id = models.AutoField(primary_key=True, unique=True)
@ -35,19 +37,31 @@ class RawData(models.Model):
blank=True, db_index=True)
when = models.DateTimeField(db_index=True)
microseconds = models.IntegerField(default=0)
publisher = models.CharField(max_length=50, null=True,
publisher = models.CharField(max_length=100, null=True,
blank=True, db_index=True)
event = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
service = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
host = models.CharField(max_length=50, null=True,
host = models.CharField(max_length=100, null=True,
blank=True, db_index=True)
instance = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
value = models.FloatField(null=True, blank=True, db_index=True)
units = models.CharField(max_length=10, null=True,
blank=True, db_index=True)
# Grouping ID is a number assigned and meant to fence-post
# a block of time. <set group id = 1> <do stuff> <set group id = 2> ...
# Later there will be REST call for setting this.
grouping_id = models.IntegerField(default=0, db_index=True)
# Nested calls can be grouped by a common transaction ID if you like.
# A calls B calls C calls D. These can all be linked with a common
# transaction ID if you like.
transaction_id = models.IntegerField(default=0, db_index=True)
class TenantForm(forms.ModelForm):
    """Edit form for a Tenant: contact info plus the external URL templates."""

    class Meta:
        model = Tenant
        # Post-image field list; the stale pre-image assignment
        # ('email', 'project_name') was diff residue and is dropped.
        fields = ('email', 'project_name', 'nova_stats_template',
                  'loggly_template')

View File

@ -9,6 +9,8 @@ urlpatterns = patterns('',
name='data'),
url(r'^(?P<tenant_id>\d+)/details/(?P<column>\w+)/(?P<row_id>\d+)/$',
'stacktach.views.details', name='details'),
url(r'^(?P<tenant_id>\d+)/search/$',
'stacktach.views.search', name='search'),
url(r'^(?P<tenant_id>\d+)/expand/(?P<row_id>\d+)/$',
'stacktach.views.expand', name='expand'),
url(r'^(?P<tenant_id>\d+)/host_status/$',

View File

@ -36,8 +36,8 @@ def _monitor_message(routing_key, body):
host = parts[1]
payload = body['payload']
request_spec = payload.get('request_spec', None)
instance = None
instance = payload.get('instance_id', instance)
instance = payload.get('instance_id', None)
instance = payload.get('instance_uuid', instance)
nova_tenant = body.get('_context_project_id', None)
nova_tenant = payload.get('tenant_id', nova_tenant)
return dict(host=host, instance=instance, publisher=publisher,
@ -56,9 +56,18 @@ def _compute_update_message(routing_key, body):
service=service, event=event, nova_tenant=nova_tenant)
def _tach_message(routing_key, body):
event = body['event_type']
value = body['value']
units = body['units']
transaction_id = body['transaction_id']
return dict(event=event, value=value, units=units, transaction_id=transaction_id)
# Dispatch table: message routing key -> parser that extracts its fields.
# The empty-string key is the catch-all for compute-node status updates.
HANDLERS = {
    'monitor.info': _monitor_message,
    'monitor.error': _monitor_message,
    'tach': _tach_message,
    '': _compute_update_message,
}
@ -72,9 +81,12 @@ def _parse(tenant, args, json_args):
values['tenant'] = tenant
when = body['_context_timestamp']
when = datetime.datetime.strptime(when, "%Y-%m-%dT%H:%M:%S.%f")
try:
when = datetime.datetime.strptime(when, "%Y-%m-%dT%H:%M:%S.%f")
values['microseconds'] = when.microsecond
except Exception, e:
pass
values['when'] = when
values['microseconds'] = when.microsecond
values['routing_key'] = routing_key
values['json'] = json_args
record = models.RawData(**values)
@ -83,13 +95,22 @@ def _parse(tenant, args, json_args):
return {}
def _post_process_raw_data(rows, highlight=None):
def _post_process_raw_data(rows, state, highlight=None):
for row in rows:
if "error" in row.routing_key:
row.is_error = True
if highlight and row.id == int(highlight):
row.highlight = True
row.when += datetime.timedelta(microseconds=row.microseconds)
novastats = state.tenant.nova_stats_template
if novastats and row.instance:
novastats = novastats.replace("[instance]", row.instance)
row.novastats = novastats
loggly = state.tenant.loggly_template
if loggly and row.instance:
loggly = loggly.replace("[instance]", row.instance)
row.loggly = loggly
class State(object):
def __init__(self):
@ -102,6 +123,7 @@ class State(object):
tenant = "'%s' - %s (%d)" % (self.tenant.project_name,
self.tenant.email, self.tenant.id)
return "[Version %s, Tenant %s]" % (self.version, tenant)
return "[Version %s, Tenant %s]" % (self.version, tenant)
def _reset_state(request):
@ -203,12 +225,13 @@ def details(request, tenant_id, column, row_id):
if column != 'when':
rows = rows.filter(**{column:value})
else:
value += datetime.timedelta(microseconds=row.microseconds)
from_time = value - datetime.timedelta(minutes=1)
to_time = value + datetime.timedelta(minutes=1)
rows = rows.filter(when__range=(from_time, to_time))
rows = rows.order_by('-when', '-microseconds')[:200]
_post_process_raw_data(rows, highlight=row_id)
_post_process_raw_data(rows, state, highlight=row_id)
c['rows'] = rows
c['allow_expansion'] = True
c['show_absolute_time'] = True
@ -231,21 +254,25 @@ def host_status(request, tenant_id):
state = _get_state(request, tenant_id)
c = _default_context(state)
hosts = models.RawData.objects.filter(tenant=tenant_id).\
filter(host__gt='').\
order_by('-when', '-microseconds')[:20]
_post_process_raw_data(hosts)
_post_process_raw_data(hosts, state)
c['rows'] = hosts
return render_to_response('host_status.html', c)
@tenant_check
def search(request, tenant_id):
    """Exact-match search over a tenant's raw data.

    Reads `field` and `value` from the POST body and returns the matching
    rows (newest first) rendered with rows.html.  The pre-image
    `instance_status` view that was interleaved here (diff residue) is
    dropped; the post-image of this commit replaces it with `search`.
    """
    state = _get_state(request, tenant_id)
    c = _default_context(state)
    column = request.POST.get('field', None)
    value = request.POST.get('value', None)
    rows = None
    if column is not None and value is not None:
        rows = models.RawData.objects.filter(tenant=tenant_id).\
                           filter(**{column: value}).\
                           order_by('-when', '-microseconds')
        _post_process_raw_data(rows, state)
    c['rows'] = rows
    c['allow_expansion'] = True
    c['show_absolute_time'] = True
    return render_to_response('rows.html', c)

View File

@ -28,8 +28,8 @@
{% endblock %}
{% block body %}
<div style='float:right;'>{{state.tenant.email}} (TID:{{state.tenant.tenant_id}}) - {{state.tenant.project_name}} <a href='/logout'>logout</a></div>
<div class='status-title'>Recent Activity</div>
<div style='float:right;'><a href='/logout'>logout</a></div>
<div class='status-title'>Recent Activity - {{state.tenant.project_name}} (TID:{{state.tenant.tenant_id}})</div>
<div id='host-box' class='status std-height'>
<div id='host_activity' class='status-inner'>
{% include "host_status.html" %}

View File

@ -34,11 +34,13 @@
<td class='cell-border'><a href='#' onclick='details({{state.tenant.tenant_id}}, "host", {{row.id}});'>{{row.host}}</a></td>
<td class='cell-border'><b><a href='#' onclick='details({{state.tenant.tenant_id}}, "event", {{row.id}});'>{{row.event}}</a></b></td>
<td class='cell-border'>
<a href='#' onclick='details({{state.tenant.tenant_id}}, "instance", {{row.id}});'>
{% if row.instance %}
<a href='{{row.loggly}}' target="_blank">(L)</a>
<a href='{{row.novastats}}' target="_blank">(S)</a>
<a href='#' onclick='details({{state.tenant.tenant_id}}, "instance", {{row.id}});'>
{{row.instance}}
{% endif %}
</a>
{% endif %}
</td>
<td class='cell-border'><a href='#' onclick='details({{state.tenant.tenant_id}}, "when", {{row.id}});'>{% if show_absolute_time %}{{row.when}} (+{{row.when.microsecond}}){%else%}{{row.when|timesince:utc}} ago{%endif%}</a></td>
</tr>

21
urls.py
View File

@ -1,20 +1,9 @@
from django.conf.urls.defaults import patterns, include, url

# Uncomment the next two lines to enable the admin:
# from django.contrib import admin
# admin.autodiscover()

# Post-image of this commit: the project-level URLconf delegates every
# route to the stacktach app (the old per-view url() entries listed here
# were pre-image diff residue and moved into the app's URLconf).
urlpatterns = patterns('',
    url(r'^', include('stacktach.url')),
)

View File

import logging
import logging.handlers
import threading
import time
import urllib
import urllib2

import kombu
import kombu.connection
import kombu.entity
import kombu.mixins
# Worker-wide logger.  NOTE: TimedRotatingFileHandler lives in
# logging.handlers, which a bare "import logging" does NOT pull in --
# the import block above must include "import logging.handlers" or this
# raises AttributeError at startup.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
# Rotate worker.log every 6 hours, keeping the 4 most recent backups.
handler = logging.handlers.TimedRotatingFileHandler('worker.log',
                                when='h', interval=6, backupCount=4)
LOG.addHandler(handler)
# CHANGE THESE FOR YOUR INSTALLATION ...
DEPLOYMENTS = [
dict(
@ -43,37 +53,39 @@ try:
except ImportError:
pass
# Post-image of this commit: the scheduler-fanout plumbing is disabled
# (left commented for reference) and the single "monitor" queue is split
# into durable monitor.info / monitor.error queues.  The pre-image lines
# that were interleaved here (active scheduler_exchange/scheduler_queues,
# the old nova_exchange, and the old "monitor" queue) were diff residue.
#scheduler_exchange = kombu.entity.Exchange("scheduler_fanout", type="fanout",
#                                           durable=False, auto_delete=True,
#                                           exclusive=True)
#
#scheduler_queues = [
#    # The Queue name has to be unique or we'll end up with Round Robin
#    # behavior from Rabbit, even though it's a Fanout queue. In Nova the
#    # queues have UUID's tacked on the end.
#    kombu.Queue("scheduler.xxx", scheduler_exchange, durable=False,
#                auto_delete=True),
#    ]

nova_exchange = kombu.entity.Exchange("nova", type="topic", exclusive=False,
                                      durable=True, auto_delete=False)

nova_queues = [
    kombu.Queue("monitor.info", nova_exchange, durable=True,
                auto_delete=False, exclusive=False,
                routing_key='monitor.info'),
    kombu.Queue("monitor.error", nova_exchange, durable=True,
                auto_delete=False, exclusive=False,
                routing_key='monitor.error'),
    ]
class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
class NovaConsumer(kombu.mixins.ConsumerMixin):
def __init__(self, connection, url):
self.connection = connection
self.url = url
def get_consumers(self, Consumer, channel):
return [Consumer(queues=scheduler_queues,
callbacks=[self.on_scheduler]),
return [#Consumer(queues=scheduler_queues,
# callbacks=[self.on_scheduler]),
Consumer(queues=nova_queues, callbacks=[self.on_nova])]
def _process(self, body, message):
@ -86,20 +98,22 @@ class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
cooked_data = urllib.urlencode(raw_data)
req = urllib2.Request(self.url, cooked_data)
response = urllib2.urlopen(req)
page = response.read()
print page
LOG.debug("Sent %s to %s", routing_key, self.url)
#page = response.read()
#print page
except urllib2.HTTPError, e:
if e.code == 401:
print "Unauthorized. Correct URL?", self.url
print e
LOG.debug("Unauthorized. Correct URL? %s" % self.url)
LOG.exception(e)
page = e.read()
print page
LOG.debug(page)
raise
def on_scheduler(self, body, message):
# Uncomment if you want periodic compute node status updates.
#self._process(body, message)
message.ack()
LOG.debug("SCHEDULER ACKED")
def on_nova(self, body, message):
self._process(body, message)
@ -121,8 +135,9 @@ class Monitor(threading.Thread):
password = self.deployment.get('rabbit_password', 'rabbit')
virtual_host = self.deployment.get('rabbit_virtual_host', '/')
print "StackTach", url
print "Rabbit:", host, port, user_id, virtual_host
LOG.info("StackTach %s" % url)
LOG.info("Rabbit: %s %s %s %s" %
(host, port, user_id, virtual_host))
params = dict(hostname=host,
port=port,
@ -130,17 +145,28 @@ class Monitor(threading.Thread):
password=password,
virtual_host=virtual_host)
with kombu.connection.BrokerConnection(**params) as conn:
consumer = SchedulerFanoutConsumer(conn, url)
consumer.run()
while True:
with kombu.connection.BrokerConnection(**params) as conn:
try:
consumer = NovaConsumer(conn, url)
consumer.run()
except Exception as e:
LOG.exception("url=%s, exception=%s. Reconnecting in 5s" % (url, e))
time.sleep(5)
# Daemonize and run one Monitor thread per configured deployment.
# files_preserve keeps the rotating log's file descriptor open across
# daemonization (DaemonContext closes everything else).  The pre-image
# bare "with daemon.DaemonContext():" and unguarded "monitor.start()"
# lines interleaved here were diff residue and are dropped.
with daemon.DaemonContext(files_preserve=[handler.stream]):
    workers = []
    for deployment in DEPLOYMENTS:
        LOG.info("Starting deployment: %s", deployment)
        monitor = Monitor(deployment)
        workers.append(monitor)
        try:
            monitor.start()
        except Exception as e:
            LOG.exception("Deployment: %s, Exception: %s" % (deployment, e))

    for worker in workers:
        LOG.info("Attempting to join to %s" % worker.deployment)
        worker.join()
        LOG.info("Joined to %s" % worker.deployment)