Updating migrations to work with optimized usage parsing
This commit is contained in:
parent
a9c5ce87df
commit
45ae991f66
@ -4,6 +4,14 @@ import datetime
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
try:
|
||||||
|
import ujson as json
|
||||||
|
except ImportError:
|
||||||
|
try:
|
||||||
|
import simplejson as json
|
||||||
|
except ImportError:
|
||||||
|
import json
|
||||||
|
|
||||||
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||||
os.pardir, os.pardir))
|
os.pardir, os.pardir))
|
||||||
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')):
|
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')):
|
||||||
@ -26,7 +34,8 @@ def add_past_usage(raws):
|
|||||||
print "%s events to be processed" % count
|
print "%s events to be processed" % count
|
||||||
last_update = datetime.datetime.utcnow()
|
last_update = datetime.datetime.utcnow()
|
||||||
for raw in raws:
|
for raw in raws:
|
||||||
views.aggregate_usage(raw)
|
json_dict = json.dumps(raw.json)
|
||||||
|
views.aggregate_usage(raw, json_dict[1])
|
||||||
processed += 1
|
processed += 1
|
||||||
if processed % 50 == 0:
|
if processed % 50 == 0:
|
||||||
next_update = last_update + datetime.timedelta(seconds=30)
|
next_update = last_update + datetime.timedelta(seconds=30)
|
@ -4,7 +4,13 @@ import datetime
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
import multiprocessing
|
try:
|
||||||
|
import ujson as json
|
||||||
|
except ImportError:
|
||||||
|
try:
|
||||||
|
import simplejson as json
|
||||||
|
except ImportError:
|
||||||
|
import json
|
||||||
|
|
||||||
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||||
os.pardir, os.pardir))
|
os.pardir, os.pardir))
|
||||||
@ -48,9 +54,9 @@ def usage_already_exists(raw):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def populate_usage(raw):
|
def populate_usage(raw, body):
|
||||||
if not usage_already_exists(raw):
|
if not usage_already_exists(raw):
|
||||||
views.aggregate_usage(raw)
|
views.aggregate_usage(raw, body)
|
||||||
|
|
||||||
|
|
||||||
def print_status(event, completed, errored, total):
|
def print_status(event, completed, errored, total):
|
||||||
@ -86,7 +92,8 @@ for event in events:
|
|||||||
new_loc = loc + 500
|
new_loc = loc + 500
|
||||||
for raw in raws[loc:new_loc]:
|
for raw in raws[loc:new_loc]:
|
||||||
try:
|
try:
|
||||||
populate_usage(raw)
|
json_dict = json.dumps(raw.json)
|
||||||
|
populate_usage(raw, json_dict[1])
|
||||||
completed += 1
|
completed += 1
|
||||||
except Exception:
|
except Exception:
|
||||||
errored += 1
|
errored += 1
|
@ -421,22 +421,6 @@ class StacktackUsageParsingTestCase(unittest.TestCase):
|
|||||||
self.assertEquals(usage.instance_type_id, '1')
|
self.assertEquals(usage.instance_type_id, '1')
|
||||||
self.mox.VerifyAll()
|
self.mox.VerifyAll()
|
||||||
|
|
||||||
def test_process_usage_for_new_launch(self):
|
|
||||||
when = utils.decimal_utc()
|
|
||||||
notif = utils.create_nova_notif(request_id=REQUEST_ID_1)
|
|
||||||
json_str = json.dumps(notif)
|
|
||||||
event = 'compute.instance.rebuild.start'
|
|
||||||
raw = utils.create_raw(self.mox, when, event=event, json_str=json_str)
|
|
||||||
usage = self.mox.CreateMockAnything()
|
|
||||||
views.STACKDB.get_or_create_instance_usage(instance=INSTANCE_ID_1,
|
|
||||||
request_id=REQUEST_ID_1) \
|
|
||||||
.AndReturn((usage, True))
|
|
||||||
views.STACKDB.save(usage)
|
|
||||||
self.mox.ReplayAll()
|
|
||||||
views._process_usage_for_new_launch(raw)
|
|
||||||
self.assertEquals(usage.instance_type_id, '1')
|
|
||||||
self.mox.VerifyAll()
|
|
||||||
|
|
||||||
def test_process_usage_for_updates_create_end(self):
|
def test_process_usage_for_updates_create_end(self):
|
||||||
when_time = datetime.datetime.utcnow()
|
when_time = datetime.datetime.utcnow()
|
||||||
when_str = str(when_time)
|
when_str = str(when_time)
|
||||||
@ -464,33 +448,6 @@ class StacktackUsageParsingTestCase(unittest.TestCase):
|
|||||||
self.assertEqual(usage.launched_at, when_decimal)
|
self.assertEqual(usage.launched_at, when_decimal)
|
||||||
self.mox.VerifyAll()
|
self.mox.VerifyAll()
|
||||||
|
|
||||||
def test_process_usage_for_updates_create_end(self):
|
|
||||||
when_time = datetime.datetime.utcnow()
|
|
||||||
when_str = str(when_time)
|
|
||||||
when_decimal = utils.decimal_utc(when_time)
|
|
||||||
notif = utils.create_nova_notif(request_id=REQUEST_ID_1,
|
|
||||||
launched=str(when_time))
|
|
||||||
json_str = json.dumps(notif)
|
|
||||||
event = 'compute.instance.rebuild.end'
|
|
||||||
raw = utils.create_raw(self.mox, when_decimal, event=event,
|
|
||||||
json_str=json_str)
|
|
||||||
usage = self.mox.CreateMockAnything()
|
|
||||||
usage.instance = INSTANCE_ID_1
|
|
||||||
usage.request_id = REQUEST_ID_1
|
|
||||||
usage.instance_type_id = '1'
|
|
||||||
views.STACKDB.get_or_create_instance_usage(instance=INSTANCE_ID_1,
|
|
||||||
request_id=REQUEST_ID_1) \
|
|
||||||
.AndReturn((usage, True))
|
|
||||||
views.STACKDB.save(usage)
|
|
||||||
self.mox.ReplayAll()
|
|
||||||
|
|
||||||
views._process_usage_for_updates(raw)
|
|
||||||
self.assertEqual(usage.instance, INSTANCE_ID_1)
|
|
||||||
self.assertEqual(usage.request_id, REQUEST_ID_1)
|
|
||||||
self.assertEqual(usage.instance_type_id, '1')
|
|
||||||
self.assertEqual(usage.launched_at, when_decimal)
|
|
||||||
self.mox.VerifyAll()
|
|
||||||
|
|
||||||
def test_process_usage_for_updates_revert_end(self):
|
def test_process_usage_for_updates_revert_end(self):
|
||||||
when_time = datetime.datetime.utcnow()
|
when_time = datetime.datetime.utcnow()
|
||||||
when_decimal = utils.decimal_utc(when_time)
|
when_decimal = utils.decimal_utc(when_time)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user