From fa4448174fbacd446abb9a1b944342f27adc080e Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Wed, 7 Nov 2012 10:01:57 -0400 Subject: [PATCH] better logging, stem the worker memory leaks and fix up the start script --- stacktach/views.py | 20 +++++++++------ worker/stacktach.sh | 7 +++--- worker/worker.py | 61 ++++++++++++++++++++++++++++++++++++--------- 3 files changed, 64 insertions(+), 24 deletions(-) diff --git a/stacktach/views.py b/stacktach/views.py index b8031d2..e4684e7 100644 --- a/stacktach/views.py +++ b/stacktach/views.py @@ -1,7 +1,8 @@ # Copyright 2012 - Dark Secret Software Inc. -from django.shortcuts import render_to_response +from django import db from django import http +from django.shortcuts import render_to_response from django import template from stacktach import models @@ -30,7 +31,6 @@ def _monitor_message(routing_key, body): host = ".".join(parts[1:]) else: host = None - #logging.error("publisher=%s, host=%s" % (publisher, host)) payload = body['payload'] request_spec = payload.get('request_spec', None) @@ -87,7 +87,8 @@ def aggregate(raw): # While we hope only one lifecycle ever exists it's quite # likely we get multiple due to the workers and threads. lifecycle = None - lifecycles = models.Lifecycle.objects.filter(instance=raw.instance) + lifecycles = models.Lifecycle.objects.select_related().\ + filter(instance=raw.instance) if len(lifecycles) > 0: lifecycle = lifecycles[0] if not lifecycle: @@ -115,7 +116,8 @@ def aggregate(raw): # *shouldn't* happen). start = step == 'start' timing = None - timings = models.Timing.objects.filter(name=name, lifecycle=lifecycle) + timings = models.Timing.objects.select_related().\ + filter(name=name, lifecycle=lifecycle) if not start: for t in timings: try: @@ -154,18 +156,21 @@ def aggregate(raw): def process_raw_data(deployment, args, json_args): """This is called directly by the worker to add the event to the db.""" + db.reset_queries() + routing_key, body = args + record = None handler = HANDLERS.get(routing_key, None) if handler: values = handler(routing_key, body) if not values: - return {} + return record values['deployment'] = deployment try: when = body['timestamp'] except KeyError: - when = body['_context_timestamp'] # Old way of doing it + when = body['_context_timestamp'] # Old way of doing it try: try: when = datetime.datetime.strptime(when, "%Y-%m-%d %H:%M:%S.%f") @@ -181,8 +186,7 @@ def process_raw_data(deployment, args, json_args): record.save() aggregate(record) - return record - return None + return record def _post_process_raw_data(rows, highlight=None): diff --git a/worker/stacktach.sh b/worker/stacktach.sh index 3fdf080..be51937 100755 --- a/worker/stacktach.sh +++ b/worker/stacktach.sh @@ -1,16 +1,15 @@ #!/bin/bash -WORKDIR=/srv/www/stacktach/django/stproject +WORKDIR=/srv/www/stacktach/django/stproject/ DAEMON=/usr/bin/python -ARGS=$WORKDIR/start_workers.py +ARGS=$WORKDIR/worker/start_workers.py PIDFILE=/var/run/stacktach.pid -export DJANGO_SETTINGS_MODULE=settings - case "$1" in start) echo "Starting server" cd $WORKDIR + source etc/stacktach_config.sh /sbin/start-stop-daemon --start --pidfile $PIDFILE --make-pidfile -b --exec $DAEMON $ARGS ;; stop) diff --git a/worker/worker.py b/worker/worker.py index 94d7d78..dc4becc 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -16,6 +16,7 @@ # This is the worker you run in your OpenStack environment. You need # to set TENANT_ID and URL to point to your StackTach web server. +import datetime import json import kombu import kombu.connection @@ -24,6 +25,8 @@ import kombu.mixins import logging import time +from pympler.process import ProcessMemoryInfo + from stacktach import models, views from stacktach import datetime_to_decimal as dt @@ -40,12 +43,15 @@ class NovaConsumer(kombu.mixins.ConsumerMixin): self.connection = connection self.deployment = deployment self.name = name + self.last_time = None + self.pmi = None + self.processed = 0 + self.total_processed = 0 def get_consumers(self, Consumer, channel): durable = self.deployment_config.get('durable_queue', True) nova_exchange = kombu.entity.Exchange("nova", type="topic", - exclusive=False, durable=durable, auto_delete=False) - + exclusive=False, durable=durable, auto_delete=False) nova_queues = [ kombu.Queue("monitor.info", nova_exchange, durable=durable, @@ -63,20 +69,49 @@ class NovaConsumer(kombu.mixins.ConsumerMixin): payload = (routing_key, body) jvalues = json.dumps(payload) - args = (routing_key, json.loads(message.body)) + body = str(message.body) + args = (routing_key, json.loads(body)) asJson = json.dumps(args) raw = views.process_raw_data(self.deployment, args, asJson) - if not raw: - LOG.debug("No record from %s", routing_key) - else: - LOG.debug("Recorded rec# %d from %s/%s at %s (%.6f)" % - (raw.id, self.name, routing_key, - str(dt.dt_from_decimal(raw.when)), - float(raw.when))) + if raw: + self.processed += 1 + + self._check_memory() + + def _check_memory(self): + if not self.pmi: + self.pmi = ProcessMemoryInfo() + self.last_vsz = self.pmi.vsz + self.initial_vsz = self.pmi.vsz + + utc = datetime.datetime.utcnow() + check = self.last_time is None + if self.last_time: + diff = utc - self.last_time + if diff.seconds > 30: + check = True + if check: + self.last_time = utc + self.pmi.update() + diff = (self.pmi.vsz - self.last_vsz) / 1000 + idiff = (self.pmi.vsz - self.initial_vsz) / 1000 + self.total_processed += self.processed + per_message = 0 + if self.total_processed: + per_message = idiff / self.total_processed + LOG.debug("%20s %6dk/%6dk ram, " + "%3d/%4d msgs @ %6dk/msg" % + (self.name, diff, idiff, self.processed, + self.total_processed, per_message)) + self.last_vsz = self.pmi.vsz + self.processed = 0 def on_nova(self, body, message): - self._process(body, message) + try: + self._process(body, message) + except Exception, e: + LOG.exception("Problem %s" % e) message.ack() @@ -97,9 +132,11 @@ def run(deployment_config): port=port, userid=user_id, password=password, + transport="librabbitmq", virtual_host=virtual_host) while True: + LOG.debug("Processing on '%s'" % name) with kombu.connection.BrokerConnection(**params) as conn: try: consumer = NovaConsumer(name, conn, deployment) @@ -108,4 +145,4 @@ def run(deployment_config): LOG.exception("name=%s, exception=%s. Reconnecting in 5s" % (name, e)) time.sleep(5) - + LOG.debug("Completed processing on '%s'" % name)