stacktach/tests/unit/utils.py
2013-01-25 16:04:21 -05:00

106 lines
3.1 KiB
Python

import datetime
import os
import sys
import unittest
TENANT_ID_1 = 'testtenantid1'
def setup_sys_path():
sys.path = [os.path.abspath(os.path.dirname('stacktach'))] + sys.path
def setup_environment():
os.environ['DJANGO_SETTINGS_MODULE'] = 'settings'
os.environ['STACKTACH_DB_ENGINE'] = 'django.db.backends.sqlite3'
when = str(datetime.datetime.utcnow())
os.environ['STACKTACH_DB_NAME'] = '/tmp/stacktach.%s.sqlite' % when
os.environ['STACKTACH_DB_HOST'] = ''
os.environ['STACKTACH_DB_USERNAME'] = ''
os.environ['STACKTACH_DB_PASSWORD'] = ''
install_dir = os.path.abspath(os.path.dirname('stacktach'))
os.environ['STACKTACH_INSTALL_DIR'] = install_dir
setup_sys_path()
setup_environment()
from stacktach import datetime_to_decimal as dt
INSTANCE_ID_1 = 'testinstanceid1'
INSTANCE_ID_2 = 'testinstanceid2'
MESSAGE_ID_1 = 'testmessageid1'
MESSAGE_ID_2 = 'testmessageid2'
REQUEST_ID_1 = 'testrequestid1'
REQUEST_ID_2 = 'testrequestid2'
REQUEST_ID_3 = 'testrequestid3'
def decimal_utc(t = datetime.datetime.utcnow()):
return dt.dt_to_decimal(t)
def create_nova_notif(request_id=None, instance=INSTANCE_ID_1, type_id='1',
launched=None, deleted = None, new_type_id=None,
message_id=MESSAGE_ID_1):
notif = ['', {
'message_id': message_id,
'payload': {
'instance_id': instance,
'instance_type_id': type_id,
}
}]
if request_id:
notif[1]['_context_request_id'] = request_id
if launched:
notif[1]['payload']['launched_at'] = launched
if deleted:
notif[1]['payload']['deleted_at'] = deleted
if new_type_id:
notif[1]['payload']['new_instance_type_id'] = new_type_id
return notif
def create_raw(mox, when, event, instance=INSTANCE_ID_1,
request_id=REQUEST_ID_1, state='active', old_task='',
host='compute', json_str=''):
raw = mox.CreateMockAnything()
raw.host = host
raw.instance = instance
raw.event = event
raw.when = when
raw.state = state
raw.old_task = old_task
raw.request_id = request_id
raw.json = json_str
return raw
def create_lifecycle(mox, instance, last_state, last_task_state, last_raw):
lifecycle = mox.CreateMockAnything()
lifecycle.instance = instance
lifecycle.last_state = last_state
lifecycle.last_task_state = last_task_state
lifecycle.last_raw = last_raw
return lifecycle
def create_timing(mox, name, lifecycle, start_raw=None, start_when=None,
end_raw=None, end_when=None, diff=None):
timing = mox.CreateMockAnything()
timing.name = name
timing.lifecycle = lifecycle
timing.start_raw = start_raw
timing.start_when = start_when
timing.end_raw = end_raw
timing.end_when = end_when
timing.diff = diff
return timing
def create_tracker(mox, request_id, lifecycle, start, last_timing=None,
duration=str(0.0)):
tracker = mox.CreateMockAnything()
tracker.request_id=request_id
tracker.lifecycle=lifecycle
tracker.start=start
tracker.last_timing=last_timing
tracker.duration=duration
return tracker