Historical analysis of migration times.

Break out the log parser so I can reuse it, and add a couple of tools
that analyse test logs to generate historical timings.

Fixes bug: 1263873
DocImpact: we should document the methodology here.

Change-Id: I23a66e17612daa20bcb1beda59af1e6bd4f1b276
Michael Still 2013-12-26 09:49:42 +11:00
parent ec19f69f9c
commit 3c65b637a4
8 changed files with 13009 additions and 104 deletions

results.json (new file; 11203 added lines, diff suppressed because it is too large)

setup.cfg
@@ -29,6 +29,8 @@ warnerrors = True
 [entry_points]
 console_scripts =
     turbo-hipster = turbo_hipster.worker_server:main
+    turbo-hipster-analyse = turbo_hipster.analyse_historical:main
+    turbo-hipster-report = turbo_hipster.report_historical:main
 
 [build_sphinx]
 source-dir = doc/source
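The two new entry points correspond to the scripts added below: turbo-hipster-analyse walks the test logs stored in swift and writes results.json into the working directory (its -c/--config option defaults to /etc/turbo-hipster/config.json), and turbo-hipster-report then reads that results.json and prints a recommended maximum time for each migration.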

tests/assets/logcontent (new file; 1490 added lines, diff suppressed because it is too large)

@@ -53,35 +53,40 @@ class TestHandleResults(testtools.TestCase):
     def test_check_log_for_errors(self):
         logfile = os.path.join(TESTS_DIR,
                                'assets/20131007_devstack_export.log')
 
-        with open(os.path.join(TESTS_DIR,
-                  'datasets/some_dataset_example/config.json'),
-                  'r') as config_stream:
-            dataset_config = json.load(config_stream)
-        gitpath = ''
-
-        handle_results.find_schemas = lambda x: [123]
-        result, msg = handle_results.check_log_for_errors(logfile, gitpath,
-                                                          dataset_config)
-        self.assertFalse(result)
-        self.assertEqual(msg,
-                         'FAILURE - Final schema version does not match '
-                         'expectation')
-
-        handle_results.find_schemas = lambda x: [228]
-        result, msg = handle_results.check_log_for_errors(logfile, gitpath,
-                                                          dataset_config)
-        self.assertTrue(result)
-        self.assertEqual(msg, 'SUCCESS')
-
-        dataset_config['maximum_migration_times']['152'] = 3
-        result, msg = handle_results.check_log_for_errors(logfile, gitpath,
-                                                          dataset_config)
-        self.assertFalse(result)
-        self.assertEqual(msg, ('WARNING - Migration 152 took too long, '
-                               'WARNING - Migration 152 took too long'))
-
-        dataset_config['maximum_migration_times']['152'] = 10
-        result, msg = handle_results.check_log_for_errors(logfile, gitpath,
-                                                          dataset_config)
-        self.assertTrue(result)
-        self.assertEqual(msg, 'SUCCESS')
+        def fake_find_schemas_230():
+            return [230]
+
+        lp = handle_results.LogParser(logfile, '/tmp/foo')
+        lp.find_schemas = fake_find_schemas_230
+        lp.process_log()
+        self.assertEqual(['FAILURE - Final schema version does not match '
+                          'expectation'], lp.errors)
+        self.assertEqual([], lp.warnings)
+
+        def fake_find_schemas_228():
+            return [228]
+
+        lp = handle_results.LogParser(logfile, '/tmp/foo')
+        lp.find_schemas = fake_find_schemas_228
+        lp.process_log()
+        self.assertEqual([], lp.errors)
+        self.assertEqual([], lp.warnings)
+
+    def test_parse_log(self):
+        # This is a regression test for a log which didn't used to parse.
+        logfile = os.path.join(TESTS_DIR, 'assets/logcontent')
+        lp = handle_results.LogParser(logfile, None)
+        lp.process_log()
+        self.assertEqual([], lp.errors)
+        self.assertEqual([], lp.warnings)
+
+        migrations = []
+        for migration in lp.migrations:
+            migrations.append(migration[0])
+        for migration in range(134, 229):
+            self.assertTrue(migration in migrations,
+                            'Migration %d missing from %s'
+                            % (migration, migrations))


turbo_hipster/analyse_historical.py (new file)
@@ -0,0 +1,132 @@
#!/usr/bin/python2
#
# Copyright 2013 Rackspace Australia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import datetime
import json
import logging
import os
import re
import sys

import swiftclient

from task_plugins.gate_real_db_upgrade import handle_results


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config',
                        default=
                        '/etc/turbo-hipster/config.json',
                        help='Path to json config file.')
    args = parser.parse_args()

    with open(args.config, 'r') as config_stream:
        config = json.load(config_stream)
    swift_config = config['publish_logs']

    log = logging.getLogger(__name__)
    if not os.path.isdir(os.path.dirname(config['debug_log'])):
        os.makedirs(os.path.dirname(config['debug_log']))
    logging.basicConfig(format='%(asctime)s %(name)s %(message)s',
                        filename=config['debug_log'], level=logging.INFO)

    # Open a connection to swift
    connection = swiftclient.client.Connection(
        authurl=swift_config['authurl'],
        user=swift_config['user'],
        key=swift_config['password'],
        os_options={'region_name': swift_config['region']},
        tenant_name=swift_config['tenant'],
        auth_version=2.0)
    log.info('Got connection to swift')

    a = Analyser()

    # Iterate through the logs and determine timing information. This probably
    # should be done in a "more cloudy" way, but this is good enough for now.
    total_items = 0
    items = connection.get_container(swift_config['container'], limit=1000)[1]
    while items:
        total_items += len(items)
        print ('%s Processing %d items, %d items total'
               % (datetime.datetime.now(), len(items), total_items))

        for item in items:
            log.info('Processing %s' % item['name'])
            a.process(connection, swift_config['container'], item['name'])
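        # Persist the accumulated results after each batch, so a partial
        # run still leaves a usable results.json behind.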
        a.dump()
        items = connection.get_container(swift_config['container'],
                                         marker=item['name'], limit=1000)[1]

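# Log object names are expected to follow one of two layouts:
#     .../gate-real-db-upgrade_nova_<engine>_<testname>/...
#     .../gate-real-db-upgrade_nova_<engine>/.../<testname>.log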
TEST_NAME1_RE = re.compile('.*/gate-real-db-upgrade_nova_([^_]+)_([^/]*)/.*')
TEST_NAME2_RE = re.compile('.*/gate-real-db-upgrade_nova_([^_]+)/.*/(.*).log')

class Analyser(object):
    log = logging.getLogger(__name__)

    def __init__(self):
        self.results = {}

    def dump(self):
        with open('results.json', 'w') as f:
            f.write(json.dumps(self.results, indent=4, sort_keys=True))

    def process(self, connection, container, name):
        engine_name = None
        test_name = None

        m = TEST_NAME1_RE.match(name)
        if m:
            engine_name = m.group(1)
            test_name = m.group(2)
        else:
            m = TEST_NAME2_RE.match(name)
            if m:
                engine_name = m.group(1)
                test_name = m.group(2)

        if not engine_name or not test_name:
            self.log.warn('Log name %s does not match regexp' % name)
            return

        content = connection.get_object(container, name)[1]
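        # LogParser reads from a file on disk, so stage the swift object
        # locally before parsing it.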
        with open('/tmp/logcontent', 'w') as f:
            f.write(content)

        lp = handle_results.LogParser('/tmp/logcontent', None)
        lp.process_log()
        if not lp.migrations:
            self.log.warn('Log %s contained no migrations' % name)

        for migration in lp.migrations:
            duration = migration[2] - migration[1]
            self.results.setdefault(engine_name, {})
            self.results[engine_name].setdefault(test_name, {})
            self.results[engine_name][test_name].setdefault(migration[0], {})
            self.results[engine_name][test_name][migration[0]]\
                .setdefault(duration, 0)
            self.results[engine_name][test_name][migration[0]][duration] += 1


if __name__ == '__main__':
    sys.path.insert(0, os.path.abspath(
        os.path.join(os.path.dirname(__file__), '../')))
    main()
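For reference, dump() nests the data engine -> test name -> migration number -> observed duration -> count; report_historical below treats the duration keys as seconds. A sketch of the shape, with made-up counts rather than real data (the real file is the 11203-line results.json committed above):

    {
        "mysql": {
            "user_001": {
                "152": {
                    "4": 12,
                    "5": 3
                }
            }
        }
    }

Here 12 runs of migration 152 against the mysql user_001 dataset took 4 seconds and 3 runs took 5 seconds.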


turbo_hipster/report_historical.py (new file)
@@ -0,0 +1,58 @@
#!/usr/bin/python2
#
# Copyright 2013 Rackspace Australia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import numpy
import os
import sys


def main():
    with open('results.json') as f:
        results = json.loads(f.read())

    for migration in sorted(results['mysql']['user_001']):
        times = []
        for time in results['mysql']['user_001'][migration]:
            for i in range(results['mysql']['user_001'][migration][time]):
                times.append(int(time))

        times = sorted(times)
        np_times = numpy.array(times)
        mean = np_times.mean()
        stddev = np_times.std()
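        # Recommend mean plus two standard deviations, floored at 30 seconds.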
        failed_threshold = int(max(30.0, mean + stddev * 2))

        failed = 0
        for time in times:
            if time > failed_threshold:
                failed += 1

        if failed_threshold != 30 or failed > 0:
            print ('%s: Values range from %s to %s seconds. %d values. '
                   'Mean is %.02f, stddev is %.02f.\n '
                   'Recommend max of %d. With this value %.02f%% of tests '
                   'would have failed.'
                   % (migration, np_times.min(), np_times.max(), len(times),
                      mean, stddev, failed_threshold,
                      failed * 100.0 / len(times)))


if __name__ == '__main__':
    sys.path.insert(0, os.path.abspath(
        os.path.join(os.path.dirname(__file__), '../')))
    main()
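To make the rule concrete with illustrative numbers: a migration whose historical times have a mean of 12.0s and a stddev of 4.0s keeps the 30 second floor (12 + 2 * 4 = 20, which is below 30), while one with a mean of 45.0s and a stddev of 10.0s gets a recommended maximum of 65 seconds.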


task_plugins/gate_real_db_upgrade/handle_results.py
@@ -78,73 +78,77 @@ def generate_push_results(datasets, publish_config):
     return last_link_uri
 
 
-def find_schemas(gitpath):
-    MIGRATION_NUMBER_RE = re.compile('^([0-9]+).*\.py$')
-    return [int(MIGRATION_NUMBER_RE.findall(f)[0]) for f in os.listdir(
-        os.path.join(gitpath, 'nova/db/sqlalchemy/migrate_repo/versions'))
-        if MIGRATION_NUMBER_RE.match(f)]
-
-
-MIGRATION_START_RE = re.compile('.* ([0-9]+) -\> ([0-9]+)\.\.\..*$')
-MIGRATION_END_RE = re.compile('done$')
-MIGRATION_FINAL_SCHEMA_RE = re.compile('Final schema version is ([0-9]+)')
-
-
-def check_log_for_errors(logfile, gitpath, dataset_config):
-    """ Run regex over the given logfile to find errors
-
-        :returns: success (boolean), message (string)"""
-
-    with open(logfile, 'r') as fd:
-        migration_started = False
-        warnings = []
-        for line in fd:
-            if 'ERROR 1045' in line:
-                return False, "FAILURE - Could not setup seed database."
-            elif 'ERROR 1049' in line:
-                return False, "FAILURE - Could not find seed database."
-            elif 'ImportError' in line:
-                return False, "FAILURE - Could not import required module."
-            elif MIGRATION_START_RE.search(line):
-                if migration_started:
-                    # We didn't see the last one finish,
-                    # something must have failed
-                    return False, ("FAILURE - Did not find the end of a "
-                                   "migration after a start")
-                migration_started = True
-                migration_start_time = line_to_time(line)
-                migration_number_from = MIGRATION_START_RE.findall(line)[0][0]
-                migration_number_to = MIGRATION_START_RE.findall(line)[0][1]
-            elif MIGRATION_END_RE.search(line):
-                if migration_started:
-                    # We found the end to this migration
-                    migration_started = False
-                    if migration_number_to > migration_number_from:
-                        migration_end_time = line_to_time(line)
-                        if not migration_time_passes(migration_number_to,
-                                                     migration_start_time,
-                                                     migration_end_time,
-                                                     dataset_config):
-                            warnings.append("WARNING - Migration %s took too "
-                                            "long" % migration_number_to)
-            elif 'Final schema version is' in line:
-                # Check the final version is as expected
-                final_version = MIGRATION_FINAL_SCHEMA_RE.findall(line)[0]
-                if int(final_version) != max(find_schemas(gitpath)):
-                    return False, ("FAILURE - Final schema version does not "
-                                   "match expectation")
-
-    if migration_started:
-        # We never saw the end of a migration,
-        # something must have failed
-        return False, ("FAILURE - Did not find the end of a migration "
-                       "after a start")
-    elif len(warnings) > 0:
-        return False, ', '.join(warnings)
-
-    return True, "SUCCESS"
+MIGRATION_NUMBER_RE = re.compile('^([0-9]+).*\.py$')
+MIGRATION_START_RE = re.compile('([0-9]+) -\> ([0-9]+)\.\.\. $')
+MIGRATION_END_RE = re.compile('done$')
+#MIGRATION_COMMAND_START = '***** Start DB upgrade to state of'
+#MIGRATION_COMMAND_END = '***** Finished DB upgrade to state of'
+MIGRATION_FINAL_SCHEMA_RE = re.compile('Final schema version is ([0-9]+)')
+
+
+class LogParser(object):
+    def __init__(self, logpath, gitpath):
+        self.logpath = logpath
+        self.gitpath = gitpath
+
+    def find_schemas(self):
+        """Return a list of the schema numbers present in git."""
+        return [int(MIGRATION_NUMBER_RE.findall(f)[0]) for f in os.listdir(
+            os.path.join(self.gitpath,
+                         'nova/db/sqlalchemy/migrate_repo/versions'))
+            if MIGRATION_NUMBER_RE.match(f)]
+
+    def process_log(self):
+        """Analyse a log for errors."""
+        self.errors = []
+        self.warnings = []
+        self.migrations = []
+
+        with open(self.logpath, 'r') as fd:
+            migration_started = False
+            for line in fd:
+                if 'ERROR 1045' in line:
+                    return False, "FAILURE - Could not setup seed database."
+                elif 'ERROR 1049' in line:
+                    return False, "FAILURE - Could not find seed database."
+                elif 'ImportError' in line:
+                    return False, "FAILURE - Could not import required module."
+                elif MIGRATION_START_RE.search(line):
+                    if migration_started:
+                        # We didn't see the last one finish,
+                        # something must have failed
+                        self.errors.append('FAILURE - Migration started '
+                                           'but did not end')
+
+                    migration_started = True
+                    migration_start_time = line_to_time(line)
+
+                    m = MIGRATION_START_RE.match(line)
+                    migration_number_from = int(m.group(1))
+                    migration_number_to = int(m.group(2))
+                elif MIGRATION_END_RE.search(line):
+                    if migration_started:
+                        # We found the end to this migration
+                        migration_started = False
+                        if migration_number_to > migration_number_from:
+                            migration_end_time = line_to_time(line)
+                            data = (migration_number_to,
+                                    migration_start_time,
+                                    migration_end_time)
+                            self.migrations.append(data)
+                elif 'Final schema version is' in line and self.gitpath:
+                    # Check the final version is as expected
+                    final_version = MIGRATION_FINAL_SCHEMA_RE.findall(line)[0]
+                    if int(final_version) != max(self.find_schemas()):
+                        self.errors.append('FAILURE - Final schema version '
+                                           'does not match expectation')
+
+        if migration_started:
+            # We never saw the end of a migration, something must have
+            # failed
+            self.errors.append('FAILURE - Did not find the end of a '
+                               'migration after a start')
 
 
 def line_to_time(line):
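The reuse pattern the refactor enables, as exercised by the tests above. A minimal sketch; the log path and nova checkout below are hypothetical, and the duration arithmetic assumes line_to_time() yields datetime objects:

    lp = handle_results.LogParser('/tmp/migration.log', '/opt/nova')
    lp.process_log()
    # After process_log(): lp.errors and lp.warnings hold message strings,
    # lp.migrations holds (number, start_time, end_time) tuples.
    for number, start, end in lp.migrations:
        print('migration %d ran for %s' % (number, end - start))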


@@ -128,31 +128,42 @@ class Runner(object):
             self.work_data['url'] = index_url
 
     def _check_all_dataset_logs_for_errors(self):
-        self.log.debug("Check logs for errors")
+        self.log.debug('Check logs for errors')
+
         for i, dataset in enumerate(self.job_datasets):
-            # Look for the beginning of the migration start
-            dataset_success, message = \
-                handle_results.check_log_for_errors(
-                    dataset['job_log_file_path'], self.git_path,
-                    dataset['config'])
+            lp = handle_results.LogParser(dataset['job_log_file_path'],
+                                          self.git_path)
+            lp.process_log()
+
+            if not lp.migrations:
+                self.success = False
+                self.messages.append('No migrations run')
+
+            if lp.errors:
+                self.success = False
+                for err in lp.errors:
+                    self.messages.append(err)
+
+            if lp.warnings:
+                self.success = False
+                for warn in lp.warnings:
+                    self.messages.append(warn)
+
+            for migration in lp.migrations:
+                if not handle_results.migration_time_passes(migration[0],
+                                                            migration[1],
+                                                            migration[2],
+                                                            dataset['config']):
+                    self.success = False
+                    self.messages.append('WARNING - Migration %s took too long'
+                                         % migration[0])
 
-            if dataset_success:
+            if self.success:
                 self.job_datasets[i]['result'] = 'SUCCESS'
             else:
-                self.success = False
-                self.job_datasets[i]['result'] = message
-                self.messages.append(message)
-
-            if not dataset_success:
-                self.messages.append(message)
+                self.job_datasets[i]['result'] = self.messages[0]
 
         if self.success:
-            self.work_data['result'] = "SUCCESS"
+            self.work_data['result'] = 'SUCCESS'
         else:
-            self.work_data['result'] = "\n".join(self.messages)
+            self.work_data['result'] = '\n'.join(self.messages)
 
     def _get_datasets(self):
         self.log.debug("Get configured datasets to run tests against")