From 2147038a27c3ff3bb1f948fa9b2d55f20e306c3f Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Wed, 29 Feb 2012 07:46:37 -0600 Subject: [PATCH 1/5] novastats and other site templates --- stacktach/models.py | 8 +++++--- stacktach/views.py | 38 ++++++++++++++++++++++++++------------ templates/index.html | 4 ++-- templates/rows.html | 6 ++++-- urls.py | 21 +++++---------------- 5 files changed, 42 insertions(+), 35 deletions(-) diff --git a/stacktach/models.py b/stacktach/models.py index 0e403a7..0959809 100644 --- a/stacktach/models.py +++ b/stacktach/models.py @@ -20,6 +20,8 @@ from django.db import models class Tenant(models.Model): email = models.CharField(max_length=50) project_name = models.CharField(max_length=50) + nova_stats_template = models.CharField(max_length=200) + loggly_template = models.CharField(max_length=200) tenant_id = models.AutoField(primary_key=True, unique=True) @@ -35,13 +37,13 @@ class RawData(models.Model): blank=True, db_index=True) when = models.DateTimeField(db_index=True) microseconds = models.IntegerField(default=0) - publisher = models.CharField(max_length=50, null=True, + publisher = models.CharField(max_length=100, null=True, blank=True, db_index=True) event = models.CharField(max_length=50, null=True, blank=True, db_index=True) service = models.CharField(max_length=50, null=True, blank=True, db_index=True) - host = models.CharField(max_length=50, null=True, + host = models.CharField(max_length=100, null=True, blank=True, db_index=True) instance = models.CharField(max_length=50, null=True, blank=True, db_index=True) @@ -50,4 +52,4 @@ class RawData(models.Model): class TenantForm(forms.ModelForm): class Meta: model = Tenant - fields = ('email', 'project_name') + fields = ('email', 'project_name', 'nova_stats_template', 'loggly_template') diff --git a/stacktach/views.py b/stacktach/views.py index 03ff951..95907cc 100644 --- a/stacktach/views.py +++ b/stacktach/views.py @@ -83,13 +83,22 @@ def _parse(tenant, args, json_args): return {} -def _post_process_raw_data(rows, highlight=None): +def _post_process_raw_data(rows, state, highlight=None): for row in rows: if "error" in row.routing_key: row.is_error = True if highlight and row.id == int(highlight): row.highlight = True row.when += datetime.timedelta(microseconds=row.microseconds) + novastats = state.tenant.nova_stats_template + if novastats and row.instance: + novastats = novastats.replace("[instance]", row.instance) + row.novastats = novastats + loggly = state.tenant.loggly_template + if loggly and row.instance: + loggly = loggly.replace("[instance]", row.instance) + row.loggly = loggly + class State(object): def __init__(self): @@ -203,12 +212,13 @@ def details(request, tenant_id, column, row_id): if column != 'when': rows = rows.filter(**{column:value}) else: + value += datetime.timedelta(microseconds=row.microseconds) from_time = value - datetime.timedelta(minutes=1) to_time = value + datetime.timedelta(minutes=1) rows = rows.filter(when__range=(from_time, to_time)) rows = rows.order_by('-when', '-microseconds')[:200] - _post_process_raw_data(rows, highlight=row_id) + _post_process_raw_data(rows, state, highlight=row_id) c['rows'] = rows c['allow_expansion'] = True c['show_absolute_time'] = True @@ -231,21 +241,25 @@ def host_status(request, tenant_id): state = _get_state(request, tenant_id) c = _default_context(state) hosts = models.RawData.objects.filter(tenant=tenant_id).\ - filter(host__gt='').\ order_by('-when', '-microseconds')[:20] - _post_process_raw_data(hosts) + _post_process_raw_data(hosts, state) c['rows'] = hosts return render_to_response('host_status.html', c) @tenant_check -def instance_status(request, tenant_id): +def search(request, tenant_id): state = _get_state(request, tenant_id) c = _default_context(state) - instances = models.RawData.objects.filter(tenant=tenant_id).\ - exclude(instance='n/a').\ - exclude(instance__isnull=True).\ - order_by('-when', '-microseconds')[:20] - _post_process_raw_data(instances) - c['rows'] = instances - return render_to_response('instance_status.html', c) + column = request.POST.get('field', None) + value = request.POST.get('value', None) + rows = None + if column != None and value != None: + rows = models.RawData.objects.filter(tenant=tenant_id).\ + filter(**{column:value}).\ + order_by('-when', '-microseconds')[:200] + _post_process_raw_data(rows, state) + c['rows'] = rows + c['allow_expansion'] = True + c['show_absolute_time'] = True + return render_to_response('rows.html', c) diff --git a/templates/index.html b/templates/index.html index 497f375..2a9a54c 100644 --- a/templates/index.html +++ b/templates/index.html @@ -28,8 +28,8 @@ {% endblock %} {% block body %} -
{{state.tenant.email}} (TID:{{state.tenant.tenant_id}}) - {{state.tenant.project_name}} logout
-
Recent Activity
+
logout
+
Recent Activity - {{state.tenant.project_name}} (TID:{{state.tenant.tenant_id}})
{% include "host_status.html" %} diff --git a/templates/rows.html b/templates/rows.html index d2570a1..c80abcc 100644 --- a/templates/rows.html +++ b/templates/rows.html @@ -34,11 +34,13 @@ {{row.host}} {{row.event}} - {% if row.instance %} + (L) + (S) + {{row.instance}} - {% endif %} + {% endif %} {% if show_absolute_time %}{{row.when}} (+{{row.when.microsecond}}){%else%}{{row.when|timesince:utc}} ago{%endif%} diff --git a/urls.py b/urls.py index 08885f1..5ae73fa 100644 --- a/urls.py +++ b/urls.py @@ -1,20 +1,9 @@ from django.conf.urls.defaults import patterns, include, url +# Uncomment the next two lines to enable the admin: +# from django.contrib import admin +# admin.autodiscover() + urlpatterns = patterns('', - url(r'^$', 'stacktach.views.welcome', name='welcome'), - url(r'new_tenant', 'stacktach.views.new_tenant', name='new_tenant'), - url(r'logout', 'stacktach.views.logout', name='logout'), - url(r'^(?P\d+)/$', 'stacktach.views.home', name='home'), - url(r'^(?P\d+)/data/$', 'stacktach.views.data', - name='data'), - url(r'^(?P\d+)/details/(?P\w+)/(?P\d+)/$', - 'stacktach.views.details', name='details'), - url(r'^(?P\d+)/search/$', - 'stacktach.views.search', name='search'), - url(r'^(?P\d+)/expand/(?P\d+)/$', - 'stacktach.views.expand', name='expand'), - url(r'^(?P\d+)/host_status/$', - 'stacktach.views.host_status', name='host_status'), - url(r'^(?P\d+)/instance_status/$', - 'stacktach.views.instance_status', name='instance_status'), + url(r'^', include('stacktach.url')), ) From add1f6c99df13f875b523d55c2984fe6aa7916ad Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Wed, 29 Feb 2012 07:48:33 -0600 Subject: [PATCH 2/5] missed urls --- stacktach/urls.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stacktach/urls.py b/stacktach/urls.py index 03595eb..08885f1 100644 --- a/stacktach/urls.py +++ b/stacktach/urls.py @@ -9,6 +9,8 @@ urlpatterns = patterns('', name='data'), url(r'^(?P\d+)/details/(?P\w+)/(?P\d+)/$', 'stacktach.views.details', name='details'), + url(r'^(?P\d+)/search/$', + 'stacktach.views.search', name='search'), url(r'^(?P\d+)/expand/(?P\d+)/$', 'stacktach.views.expand', name='expand'), url(r'^(?P\d+)/host_status/$', From 6df7a45959b62296da7a904f8de323141897a807 Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Tue, 1 May 2012 09:44:58 -0500 Subject: [PATCH 3/5] hope I got it all --- stacktach/models.py | 12 +++++++ stacktach/views.py | 23 ++++++++++--- worker.py | 82 +++++++++++++++++++++++++++++---------------- 3 files changed, 84 insertions(+), 33 deletions(-) diff --git a/stacktach/models.py b/stacktach/models.py index 0959809..4176566 100644 --- a/stacktach/models.py +++ b/stacktach/models.py @@ -47,6 +47,18 @@ class RawData(models.Model): blank=True, db_index=True) instance = models.CharField(max_length=50, null=True, blank=True, db_index=True) + value = models.FloatField(null=True, blank=True, db_index=True) + units = models.CharField(max_length=10, null=True, + blank=True, db_index=True) + # Grouping ID is a number assigned and meant to fence-post + # a block of time. ... + # Later there will be REST call for setting this. + grouping_id = models.IntegerField(default=0, db_index=True) + + # Nested calls can be grouped by a common transaction ID if you like. + # A calls B calls C calls D. These can all be linked with a common + # transaction ID if you like. + transaction_id = models.IntegerField(default=0, db_index=True) class TenantForm(forms.ModelForm): diff --git a/stacktach/views.py b/stacktach/views.py index 95907cc..255c1ac 100644 --- a/stacktach/views.py +++ b/stacktach/views.py @@ -36,8 +36,8 @@ def _monitor_message(routing_key, body): host = parts[1] payload = body['payload'] request_spec = payload.get('request_spec', None) - instance = None - instance = payload.get('instance_id', instance) + instance = payload.get('instance_id', None) + instance = payload.get('instance_uuid', instance) nova_tenant = body.get('_context_project_id', None) nova_tenant = payload.get('tenant_id', nova_tenant) return dict(host=host, instance=instance, publisher=publisher, @@ -56,9 +56,18 @@ def _compute_update_message(routing_key, body): service=service, event=event, nova_tenant=nova_tenant) +def _tach_message(routing_key, body): + event = body['event_type'] + value = body['value'] + units = body['units'] + transaction_id = body['transaction_id'] + return dict(event=event, value=value, units=units, transaction_id=transaction_id) + + # routing_key : handler HANDLERS = {'monitor.info':_monitor_message, 'monitor.error':_monitor_message, + 'tach':_tach_message, '':_compute_update_message} @@ -72,9 +81,12 @@ def _parse(tenant, args, json_args): values['tenant'] = tenant when = body['_context_timestamp'] - when = datetime.datetime.strptime(when, "%Y-%m-%dT%H:%M:%S.%f") + try: + when = datetime.datetime.strptime(when, "%Y-%m-%dT%H:%M:%S.%f") + values['microseconds'] = when.microsecond + except Exception, e: + pass values['when'] = when - values['microseconds'] = when.microsecond values['routing_key'] = routing_key values['json'] = json_args record = models.RawData(**values) @@ -111,6 +123,7 @@ class State(object): tenant = "'%s' - %s (%d)" % (self.tenant.project_name, self.tenant.email, self.tenant.id) return "[Version %s, Tenant %s]" % (self.version, tenant) + return "[Version %s, Tenant %s]" % (self.version, tenant) def _reset_state(request): @@ -257,7 +270,7 @@ def search(request, tenant_id): if column != None and value != None: rows = models.RawData.objects.filter(tenant=tenant_id).\ filter(**{column:value}).\ - order_by('-when', '-microseconds')[:200] + order_by('-when', '-microseconds') _post_process_raw_data(rows, state) c['rows'] = rows c['allow_expansion'] = True diff --git a/worker.py b/worker.py index 93bfd06..5c9d384 100755 --- a/worker.py +++ b/worker.py @@ -22,10 +22,20 @@ import kombu import kombu.connection import kombu.entity import kombu.mixins +import logging import threading +import time import urllib import urllib2 + +LOG = logging.getLogger(__name__) +LOG.setLevel(logging.DEBUG) +handler = logging.handlers.TimedRotatingFileHandler('worker.log', + when='h', interval=6, backupCount=4) +LOG.addHandler(handler) + + # CHANGE THESE FOR YOUR INSTALLATION ... DEPLOYMENTS = [ dict( @@ -43,37 +53,39 @@ try: except ImportError: pass -# For now we'll just grab all the fanout messages from compute to scheduler ... -scheduler_exchange = kombu.entity.Exchange("scheduler_fanout", type="fanout", - durable=False, auto_delete=True, - exclusive=True) -scheduler_queues = [ +# For now we'll just grab all the fanout messages from compute to scheduler ... +#scheduler_exchange = kombu.entity.Exchange("scheduler_fanout", type="fanout", +# durable=False, auto_delete=True, +# exclusive=True) +# +#scheduler_queues = [ # The Queue name has to be unique or we we'll end up with Round Robin # behavior from Rabbit, even though it's a Fanout queue. In Nova the # queues have UUID's tacked on the end. - kombu.Queue("scheduler.xxx", scheduler_exchange, durable=False, - auto_delete=False), - ] +# kombu.Queue("scheduler.xxx", scheduler_exchange, durable=False, +# auto_delete=True), +# ] -nova_exchange = kombu.entity.Exchange("nova", type="topic", - durable=True, auto_delete=False, - exclusive=False) +nova_exchange = kombu.entity.Exchange("nova", type="topic", exclusive=False, + durable=True, auto_delete=False) nova_queues = [ - kombu.Queue("monitor", nova_exchange, durable=False, auto_delete=False, - exclusive=False, routing_key='monitor.*'), + kombu.Queue("monitor.info", nova_exchange, durable=True, auto_delete=False, + exclusive=False, routing_key='monitor.info'), + kombu.Queue("monitor.error", nova_exchange, durable=True, auto_delete=False, + exclusive=False, routing_key='monitor.error'), ] -class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin): +class NovaConsumer(kombu.mixins.ConsumerMixin): def __init__(self, connection, url): self.connection = connection self.url = url def get_consumers(self, Consumer, channel): - return [Consumer(queues=scheduler_queues, - callbacks=[self.on_scheduler]), + return [#Consumer(queues=scheduler_queues, + # callbacks=[self.on_scheduler]), Consumer(queues=nova_queues, callbacks=[self.on_nova])] def _process(self, body, message): @@ -86,20 +98,22 @@ class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin): cooked_data = urllib.urlencode(raw_data) req = urllib2.Request(self.url, cooked_data) response = urllib2.urlopen(req) - page = response.read() - print page + LOG.debug("Sent %s to %s", routing_key, self.url) + #page = response.read() + #print page except urllib2.HTTPError, e: if e.code == 401: - print "Unauthorized. Correct URL?", self.url - print e + LOG.debug("Unauthorized. Correct URL? %s" % self.url) + LOG.exception(e) page = e.read() - print page + LOG.debug(page) raise def on_scheduler(self, body, message): # Uncomment if you want periodic compute node status updates. #self._process(body, message) message.ack() + LOG.debug("SCHEDULER ACKED") def on_nova(self, body, message): self._process(body, message) @@ -121,8 +135,9 @@ class Monitor(threading.Thread): password = self.deployment.get('rabbit_password', 'rabbit') virtual_host = self.deployment.get('rabbit_virtual_host', '/') - print "StackTach", url - print "Rabbit:", host, port, user_id, virtual_host + LOG.info("StackTach %s" % url) + LOG.info("Rabbit: %s %s %s %s" % + (host, port, user_id, virtual_host)) params = dict(hostname=host, port=port, @@ -130,17 +145,28 @@ class Monitor(threading.Thread): password=password, virtual_host=virtual_host) - with kombu.connection.BrokerConnection(**params) as conn: - consumer = SchedulerFanoutConsumer(conn, url) - consumer.run() + while True: + with kombu.connection.BrokerConnection(**params) as conn: + try: + consumer = NovaConsumer(conn, url) + consumer.run() + except Exception as e: + LOG.exception("url=%s, exception=%s. Reconnecting in 5s" % (url, e)) + time.sleep(5) -with daemon.DaemonContext(): +with daemon.DaemonContext(files_preserve=[handler.stream]): workers = [] for deployment in DEPLOYMENTS: + LOG.info("Starting deployment: %s", deployment) monitor = Monitor(deployment) workers.append(monitor) - monitor.start() + try: + monitor.start() + except Exception as e: + LOG.exception("Deployment: %s, Exception: %s" % (deployment, e)) for worker in workers: + LOG.info("Attempting to join to %s" % worker.deployment) worker.join() + LOG.info("Joined to %s" % worker.deployment) From 8d8b5bd1eb0a1c8248a4dd94103e53317e5c8f89 Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Tue, 1 May 2012 09:51:20 -0500 Subject: [PATCH 4/5] temp --- worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/worker.py b/worker.py index 5c9d384..15a4ed8 100755 --- a/worker.py +++ b/worker.py @@ -155,7 +155,8 @@ class Monitor(threading.Thread): time.sleep(5) -with daemon.DaemonContext(files_preserve=[handler.stream]): +#with daemon.DaemonContext(files_preserve=[handler.stream]): +if True: workers = [] for deployment in DEPLOYMENTS: LOG.info("Starting deployment: %s", deployment) From ced4be801990e201cf2b190a115a1dc5bb28d9f5 Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Tue, 1 May 2012 10:16:19 -0500 Subject: [PATCH 5/5] reactivate daemon --- worker.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/worker.py b/worker.py index 15a4ed8..5c9d384 100755 --- a/worker.py +++ b/worker.py @@ -155,8 +155,7 @@ class Monitor(threading.Thread): time.sleep(5) -#with daemon.DaemonContext(files_preserve=[handler.stream]): -if True: +with daemon.DaemonContext(files_preserve=[handler.stream]): workers = [] for deployment in DEPLOYMENTS: LOG.info("Starting deployment: %s", deployment)