lean and mean
This commit is contained in:
parent
5a504b2f61
commit
2b2dbb3928
137
worker.py
137
worker.py
@ -1,13 +1,15 @@
|
||||
# Copyright 2012 - Dark Secret Software Inc.
|
||||
|
||||
import json
|
||||
import kombu
|
||||
import kombu.connection
|
||||
import kombu.entity
|
||||
import kombu.mixins
|
||||
import pprint
|
||||
import readline
|
||||
import rlcompleter
|
||||
import threading
|
||||
import time
|
||||
import urllib
|
||||
import urllib2
|
||||
|
||||
url = 'http://darksecretsoftware.com/stacktach/data/'
|
||||
|
||||
# For now we'll just grab all the fanout messages from compute to scheduler ...
|
||||
scheduler_exchange = kombu.entity.Exchange("scheduler_fanout", type="fanout",
|
||||
@ -40,113 +42,48 @@ RABBIT_VIRTUAL_HOST = "/"
|
||||
class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
|
||||
def __init__(self, connection):
|
||||
self.connection = connection
|
||||
self.hosts = {}
|
||||
self.timeline = []
|
||||
self.instances = {}
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def get_status(self):
|
||||
with self.lock:
|
||||
return "%s/%s" % (len(self.hosts), len(self.instances))
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
return [Consumer(queues=scheduler_queues, callbacks=[self.on_scheduler]),
|
||||
Consumer(queues=nova_queues, callbacks=[self.on_nova])]
|
||||
|
||||
def _process(self, body, message, host=None, instance_id=None):
|
||||
routing_key = message.delivery_info['routing_key']
|
||||
def _process(self, body, message):
|
||||
routing_key = message.delivery_info['routing_key']
|
||||
payload = (routing_key, body)
|
||||
jvalues = json.dumps(payload)
|
||||
|
||||
payload = (routing_key, body)
|
||||
|
||||
self.timeline.append(payload)
|
||||
|
||||
if host:
|
||||
host_rec = self.hosts.get(host, [])
|
||||
host_rec.append(payload)
|
||||
self.hosts[host] = host_rec
|
||||
|
||||
if instance_id:
|
||||
instance_rec = self.instances.get(instance_id, [])
|
||||
instance_rec.append(payload)
|
||||
self.instances[instance_id]=instance_rec
|
||||
|
||||
def get_host(self, host):
|
||||
# There is still a lock issue here since we're returning
|
||||
# a list which could get modified in this thread.
|
||||
with self.lock:
|
||||
return self.hosts.get(host)
|
||||
|
||||
def get_instance(self, instance):
|
||||
# See warning above.
|
||||
with self.lock:
|
||||
return self.instances.get(instance)
|
||||
try:
|
||||
raw_data = dict(args=jvalues)
|
||||
cooked_data = urllib.urlencode(raw_data)
|
||||
req = urllib2.Request(url, cooked_data)
|
||||
response = urllib2.urlopen(req)
|
||||
page = response.read()
|
||||
print "sent", page
|
||||
except urllib2.HTTPError, e:
|
||||
print e
|
||||
page = e.read()
|
||||
print page
|
||||
raise
|
||||
|
||||
def on_scheduler(self, body, message):
|
||||
with self.lock:
|
||||
method = body['method']
|
||||
args = body['args']
|
||||
if 'host' not in args:
|
||||
print "SCHEDULER? ", body
|
||||
else:
|
||||
host = args['host']
|
||||
print "on_scheduler(%s)" % routing_key, method, host
|
||||
message.ack()
|
||||
self._process(body, message)
|
||||
message.ack()
|
||||
|
||||
def on_nova(self, body, message):
|
||||
with self.lock:
|
||||
# print "on_nova(%s)" % routing_key,
|
||||
event_type = body['event_type']
|
||||
publisher = body['publisher_id']
|
||||
host = publisher.split('.')[1]
|
||||
payload = body['payload']
|
||||
instance_id = payload.get('instance_id',
|
||||
payload.get('instance_uuid', None))
|
||||
state = payload.get('state', '<unknown state>')
|
||||
print event_type, publisher, state, instance_id
|
||||
message.ack()
|
||||
|
||||
self._process(body, message, host=host, instance_id=instance_id)
|
||||
|
||||
|
||||
class Monitor(threading.Thread):
|
||||
def run(self):
|
||||
params = dict(hostname=RABBIT_HOST,
|
||||
port=RABBIT_PORT,
|
||||
userid=RABBIT_USERID,
|
||||
password=RABBIT_PASSWORD,
|
||||
virtual_host=RABBIT_VIRTUAL_HOST)
|
||||
|
||||
with kombu.connection.BrokerConnection(**params) as conn:
|
||||
self.consumer = SchedulerFanoutConsumer(conn)
|
||||
try:
|
||||
self.consumer.run()
|
||||
except KeyboardInterrupt:
|
||||
print("bye bye")
|
||||
self._process(body, message)
|
||||
message.ack()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
monitor = Monitor()
|
||||
monitor.start()
|
||||
time.sleep(5) #hack
|
||||
consumer = monitor.consumer
|
||||
pp = pprint.PrettyPrinter(indent=2)
|
||||
params = dict(hostname=RABBIT_HOST,
|
||||
port=RABBIT_PORT,
|
||||
userid=RABBIT_USERID,
|
||||
password=RABBIT_PASSWORD,
|
||||
virtual_host=RABBIT_VIRTUAL_HOST)
|
||||
|
||||
readline.parse_and_bind('tab: complete')
|
||||
more = True
|
||||
while more:
|
||||
print "%s>" % consumer.get_status(),
|
||||
line = raw_input()
|
||||
parts = line.split(' ')
|
||||
|
||||
if line == 'quit':
|
||||
more = False
|
||||
|
||||
result = None
|
||||
if parts[0]=='i':
|
||||
result = consumer.get_instance(parts[1])
|
||||
|
||||
if parts[0]=='h':
|
||||
result = consumer.get_host(parts[1])
|
||||
|
||||
if result:
|
||||
pp.pprint(result)
|
||||
with kombu.connection.BrokerConnection(**params) as conn:
|
||||
consumer = SchedulerFanoutConsumer(conn)
|
||||
try:
|
||||
consumer.run()
|
||||
except KeyboardInterrupt:
|
||||
print("bye bye")
|
||||
|
Loading…
x
Reference in New Issue
Block a user