hope I got it all

This commit is contained in:
Sandy Walsh 2012-05-01 09:44:58 -05:00
parent add1f6c99d
commit 6df7a45959
3 changed files with 84 additions and 33 deletions

View File

@ -47,6 +47,18 @@ class RawData(models.Model):
blank=True, db_index=True)
instance = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
value = models.FloatField(null=True, blank=True, db_index=True)
units = models.CharField(max_length=10, null=True,
blank=True, db_index=True)
# Grouping ID is a number assigned and meant to fence-post
# a block of time. <set group id = 1> <do stuff> <set group id = 2> ...
# Later there will be REST call for setting this.
grouping_id = models.IntegerField(default=0, db_index=True)
# Nested calls can be grouped by a common transaction ID if you like.
# A calls B calls C calls D. These can all be linked with a common
# transaction ID.
transaction_id = models.IntegerField(default=0, db_index=True)
class TenantForm(forms.ModelForm):

View File

@ -36,8 +36,8 @@ def _monitor_message(routing_key, body):
host = parts[1]
payload = body['payload']
request_spec = payload.get('request_spec', None)
instance = None
instance = payload.get('instance_id', instance)
instance = payload.get('instance_id', None)
instance = payload.get('instance_uuid', instance)
nova_tenant = body.get('_context_project_id', None)
nova_tenant = payload.get('tenant_id', nova_tenant)
return dict(host=host, instance=instance, publisher=publisher,
@ -56,9 +56,18 @@ def _compute_update_message(routing_key, body):
service=service, event=event, nova_tenant=nova_tenant)
def _tach_message(routing_key, body):
event = body['event_type']
value = body['value']
units = body['units']
transaction_id = body['transaction_id']
return dict(event=event, value=value, units=units, transaction_id=transaction_id)
# routing_key : handler
# Dispatch table mapping an AMQP routing key to the function that
# flattens that message body into a dict of values for storage.
# NOTE(review): the '' key presumably acts as the default/fallback
# (compute-node update messages) — confirm against the dispatch code.
HANDLERS = {'monitor.info':_monitor_message,
'monitor.error':_monitor_message,
'tach':_tach_message,
'':_compute_update_message}
@ -72,9 +81,12 @@ def _parse(tenant, args, json_args):
values['tenant'] = tenant
when = body['_context_timestamp']
when = datetime.datetime.strptime(when, "%Y-%m-%dT%H:%M:%S.%f")
try:
when = datetime.datetime.strptime(when, "%Y-%m-%dT%H:%M:%S.%f")
values['microseconds'] = when.microsecond
except Exception, e:
pass
values['when'] = when
values['microseconds'] = when.microsecond
values['routing_key'] = routing_key
values['json'] = json_args
record = models.RawData(**values)
@ -111,6 +123,7 @@ class State(object):
tenant = "'%s' - %s (%d)" % (self.tenant.project_name,
self.tenant.email, self.tenant.id)
return "[Version %s, Tenant %s]" % (self.version, tenant)
return "[Version %s, Tenant %s]" % (self.version, tenant)
def _reset_state(request):
@ -257,7 +270,7 @@ def search(request, tenant_id):
if column != None and value != None:
rows = models.RawData.objects.filter(tenant=tenant_id).\
filter(**{column:value}).\
order_by('-when', '-microseconds')[:200]
order_by('-when', '-microseconds')
_post_process_raw_data(rows, state)
c['rows'] = rows
c['allow_expansion'] = True

View File

@ -22,10 +22,20 @@ import kombu
import logging
import logging.handlers
import threading
import time
import urllib
import urllib2

import kombu.connection
import kombu.entity
import kombu.mixins
# Module-level logger for the worker. Requires 'import logging.handlers':
# 'import logging' alone does NOT make the handlers submodule available.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
# Rotate worker.log every 6 hours, keeping the 4 most recent backups.
handler = logging.handlers.TimedRotatingFileHandler('worker.log',
                                                    when='h', interval=6,
                                                    backupCount=4)
LOG.addHandler(handler)
# CHANGE THESE FOR YOUR INSTALLATION ...
DEPLOYMENTS = [
dict(
@ -43,37 +53,39 @@ try:
except ImportError:
pass
# For now we'll just grab all the fanout messages from compute to scheduler ...
scheduler_exchange = kombu.entity.Exchange("scheduler_fanout", type="fanout",
durable=False, auto_delete=True,
exclusive=True)
scheduler_queues = [
# For now we'll just grab all the fanout messages from compute to scheduler ...
#scheduler_exchange = kombu.entity.Exchange("scheduler_fanout", type="fanout",
# durable=False, auto_delete=True,
# exclusive=True)
#
#scheduler_queues = [
# The Queue name has to be unique or we'll end up with Round Robin
# behavior from Rabbit, even though it's a Fanout queue. In Nova the
# queues have UUID's tacked on the end.
kombu.Queue("scheduler.xxx", scheduler_exchange, durable=False,
auto_delete=False),
]
# kombu.Queue("scheduler.xxx", scheduler_exchange, durable=False,
# auto_delete=True),
# ]
nova_exchange = kombu.entity.Exchange("nova", type="topic",
durable=True, auto_delete=False,
exclusive=False)
nova_exchange = kombu.entity.Exchange("nova", type="topic", exclusive=False,
durable=True, auto_delete=False)
nova_queues = [
kombu.Queue("monitor", nova_exchange, durable=False, auto_delete=False,
exclusive=False, routing_key='monitor.*'),
kombu.Queue("monitor.info", nova_exchange, durable=True, auto_delete=False,
exclusive=False, routing_key='monitor.info'),
kombu.Queue("monitor.error", nova_exchange, durable=True, auto_delete=False,
exclusive=False, routing_key='monitor.error'),
]
class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
class NovaConsumer(kombu.mixins.ConsumerMixin):
def __init__(self, connection, url):
    """Remember the broker connection and the StackTach POST endpoint.

    connection: an established kombu BrokerConnection to consume from.
    url: the HTTP endpoint that raw notifications are POSTed to.
    """
    self.connection = connection
    self.url = url
def get_consumers(self, Consumer, channel):
return [Consumer(queues=scheduler_queues,
callbacks=[self.on_scheduler]),
return [#Consumer(queues=scheduler_queues,
# callbacks=[self.on_scheduler]),
Consumer(queues=nova_queues, callbacks=[self.on_nova])]
def _process(self, body, message):
@ -86,20 +98,22 @@ class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
cooked_data = urllib.urlencode(raw_data)
req = urllib2.Request(self.url, cooked_data)
response = urllib2.urlopen(req)
page = response.read()
print page
LOG.debug("Sent %s to %s", routing_key, self.url)
#page = response.read()
#print page
except urllib2.HTTPError, e:
if e.code == 401:
print "Unauthorized. Correct URL?", self.url
print e
LOG.debug("Unauthorized. Correct URL? %s" % self.url)
LOG.exception(e)
page = e.read()
print page
LOG.debug(page)
raise
def on_scheduler(self, body, message):
    """Ack (and deliberately drop) a scheduler fanout message.

    Processing is disabled by default because periodic compute-node
    status updates are noisy; re-enable the _process call to keep them.
    """
    # Uncomment if you want periodic compute node status updates.
    #self._process(body, message)
    message.ack()
    LOG.debug("SCHEDULER ACKED")
def on_nova(self, body, message):
self._process(body, message)
@ -121,8 +135,9 @@ class Monitor(threading.Thread):
password = self.deployment.get('rabbit_password', 'rabbit')
virtual_host = self.deployment.get('rabbit_virtual_host', '/')
print "StackTach", url
print "Rabbit:", host, port, user_id, virtual_host
LOG.info("StackTach %s" % url)
LOG.info("Rabbit: %s %s %s %s" %
(host, port, user_id, virtual_host))
params = dict(hostname=host,
port=port,
@ -130,17 +145,28 @@ class Monitor(threading.Thread):
password=password,
virtual_host=virtual_host)
with kombu.connection.BrokerConnection(**params) as conn:
consumer = SchedulerFanoutConsumer(conn, url)
consumer.run()
while True:
with kombu.connection.BrokerConnection(**params) as conn:
try:
consumer = NovaConsumer(conn, url)
consumer.run()
except Exception as e:
LOG.exception("url=%s, exception=%s. Reconnecting in 5s" % (url, e))
time.sleep(5)
with daemon.DaemonContext():
with daemon.DaemonContext(files_preserve=[handler.stream]):
workers = []
for deployment in DEPLOYMENTS:
LOG.info("Starting deployment: %s", deployment)
monitor = Monitor(deployment)
workers.append(monitor)
monitor.start()
try:
monitor.start()
except Exception as e:
LOG.exception("Deployment: %s, Exception: %s" % (deployment, e))
for worker in workers:
LOG.info("Attempting to join to %s" % worker.deployment)
worker.join()
LOG.info("Joined to %s" % worker.deployment)