Cleaned up the code

This commit is contained in:
Anton Beloglazov 2012-11-07 17:58:17 +11:00
parent 2de83b80d9
commit 6cbc0dc0d0
11 changed files with 80 additions and 86 deletions

View File

@ -235,7 +235,7 @@ def parse_parameters(params):
:return: A dict of parameters.
:rtype: dict(str: *)
"""
return dict((str(k), v)
return dict((str(k), v)
for k, v in json.loads(params).items())
@ -251,7 +251,7 @@ def parse_compute_hosts(compute_hosts):
"""
return filter(None, re.split('\W+', compute_hosts))
@contract
def calculate_migration_time(vms, bandwidth):
""" Calculate the mean migration time from VM RAM usage data.
@ -286,7 +286,7 @@ def execute_on_hosts(hosts, commands):
for host in hosts:
print 'Host: ' + host
print subprocess.Popen(
'ssh ' + host + ' "' + commands_merged + '"',
'ssh ' + host + ' "' + commands_merged + '"',
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True).communicate()[0]

View File

@ -35,7 +35,7 @@ class Database(object):
vm_migrations=Table,
host_states=Table,
host_overload=Table)
def __init__(self, connection, hosts, host_resource_usage, vms,
def __init__(self, connection, hosts, host_resource_usage, vms,
vm_resource_usage, vm_migrations, host_states, host_overload):
""" Initialize the database.
@ -282,7 +282,7 @@ class Database(object):
:return: A dict of host names to IDs.
:rtype: dict(str: int)
"""
return dict((str(x[1]), int(x[0]))
return dict((str(x[1]), int(x[0]))
for x in self.hosts.select().execute().fetchall())
@contract(datetime_threshold=datetime.datetime)
@ -316,8 +316,8 @@ class Database(object):
"""
host_ids = self.select_host_ids()
to_insert = [{'host_id': host_ids[k],
'state': v}
for k, v in hosts.items()]
'state': v}
for k, v in hosts.items()]
self.connection.execute(
self.host_states.insert(), to_insert)
@ -352,8 +352,8 @@ class Database(object):
:return: A list of host names.
:rtype: list(str)
"""
return [host
for host, state in self.select_host_states().items()
return [host
for host, state in self.select_host_states().items()
if state == 1]
@contract
@ -363,8 +363,8 @@ class Database(object):
:return: A list of host names.
:rtype: list(str)
"""
return [host
for host, state in self.select_host_states().items()
return [host
for host, state in self.select_host_states().items()
if state == 0]
@contract
@ -394,4 +394,3 @@ class Database(object):
self.vm_migrations.insert().execute(
vm_id=self.select_vm_id(vm),
host_id=self.select_host_id(hostname))

View File

@ -86,7 +86,7 @@ def init_db(sql_connection):
metadata.create_all()
connection = engine.connect()
db = Database(connection, hosts, host_resource_usage, vms,
db = Database(connection, hosts, host_resource_usage, vms,
vm_resource_usage, vm_migrations, host_states, host_overload)
log.debug('Initialized a DB connection to %s', sql_connection)

View File

@ -91,7 +91,7 @@ def execute(config, state):
state['db'].cleanup_vm_resource_usage(datetime_threshold)
state['db'].cleanup_host_resource_usage(datetime_threshold)
if log.isEnabledFor(logging.INFO):
log.info('Cleaned up data older than %s',
log.info('Cleaned up data older than %s',
datetime_threshold.strftime('%Y-%m-%d %H:%M:%S'))
return state

View File

@ -71,7 +71,6 @@ from contracts import contract
from neat.contracts_extra import *
import bottle
import json
from hashlib import sha1
import novaclient
from novaclient.v1_1 import client
@ -164,8 +163,8 @@ def start():
int(config['log_level']))
state = init_state(config)
switch_hosts_on(state['db'],
state['host_macs'],
switch_hosts_on(state['db'],
state['host_macs'],
state['compute_hosts'])
bottle.debug(True)
@ -216,13 +215,13 @@ def get_remote_addr(request):
def service():
params = get_params(bottle.request)
state = bottle.app().state
validate_params(state['state']['hashed_username'],
state['state']['hashed_password'],
validate_params(state['state']['hashed_username'],
state['state']['hashed_password'],
params)
log.info('Received a request from %s: %s',
log.info('Received a request from %s: %s',
get_remote_addr(bottle.request),
str(params))
try:
try:
if params['reason'] == 0:
log.info('Processing an underload of a host %s', params['host'])
execute_underload(
@ -323,11 +322,11 @@ def execute_underload(config, state, host):
if vm not in vms_last_cpu:
log.info('No data yet for VM: %s - skipping host %s', vm, host)
del hosts_cpu_total[host]
del hosts_ram_total[host]
del hosts_ram_total[host]
if host in hosts_cpu_usage:
del hosts_cpu_usage[host]
if host in hosts_ram_usage:
del hosts_ram_usage[host]
del hosts_ram_usage[host]
break
host_cpu_mhz += vms_last_cpu[vm]
else:
@ -360,7 +359,7 @@ def execute_underload(config, state, host):
int(config['data_collector_data_length']))
vms_ram = vms_ram_limit(state['nova'], vms_to_migrate)
# Remove VMs that are not in vms_ram
# Remove VMs that are not in vms_ram
# These instances might have been deleted
for i, vm in enumerate(vms_to_migrate):
if not vm in vms_ram:
@ -415,14 +414,14 @@ def execute_underload(config, state, host):
else:
log.info('Started underload VM migrations')
migrate_vms(state['db'],
state['nova'],
config['vm_instance_directory'],
state['nova'],
config['vm_instance_directory'],
placement)
log.info('Completed underload VM migrations')
if hosts_to_deactivate:
switch_hosts_off(state['db'],
config['sleep_command'],
switch_hosts_off(state['db'],
config['sleep_command'],
hosts_to_deactivate)
log.info('Completed processing an underload request')
@ -521,7 +520,7 @@ def execute_overload(config, state, host, vm_uuids):
int(config['data_collector_data_length']))
vms_ram = vms_ram_limit(state['nova'], vms_to_migrate)
# Remove VMs that are not in vms_ram
# Remove VMs that are not in vms_ram
# These instances might have been deleted
for i, vm in enumerate(vms_to_migrate):
if not vm in vms_ram:
@ -571,13 +570,13 @@ def execute_overload(config, state, host, vm_uuids):
set(inactive_hosts_cpu.keys()).intersection(
set(placement.values())))
if hosts_to_activate:
switch_hosts_on(state['db'],
state['host_macs'],
switch_hosts_on(state['db'],
state['host_macs'],
hosts_to_activate)
log.info('Started overload VM migrations')
migrate_vms(state['db'],
state['nova'],
config['vm_instance_directory'],
state['nova'],
config['vm_instance_directory'],
placement)
log.info('Completed overload VM migrations')
log.info('Completed processing an overload request')
@ -651,8 +650,8 @@ def host_mac(host):
:rtype: str
"""
mac = subprocess.Popen(
("ping -c 1 {0} > /dev/null;" +
"arp -a {0} | awk '{{print $4}}'").format(host),
("ping -c 1 {0} > /dev/null;" +
"arp -a {0} | awk '{{print $4}}'").format(host),
stdout=subprocess.PIPE,
shell=True).communicate()[0].strip()
if len(mac) != 17:
@ -732,23 +731,23 @@ def migrate_vms(db, nova, vm_instance_directory, placement):
vm_pairs = [vms[x:x + 2] for x in xrange(0, len(vms), 2)]
for vm_pair in vm_pairs:
# To avoid problems with migration, need the following:
subprocess.call('chown -R nova:nova ' + vm_instance_directory,
subprocess.call('chown -R nova:nova ' + vm_instance_directory,
shell=True)
for vm in vm_pair:
host = placement[vm]
nova.servers.live_migrate(vm, host, False, False)
if log.isEnabledFor(logging.INFO):
log.info('Started migration of VM %s to %s', vm, host)
log.info('Started migration of VM %s to %s', vm, host)
time.sleep(10)
while True:
for vm_uuid in list(vm_pair):
vm = nova.servers.get(vm_uuid)
if log.isEnabledFor(logging.DEBUG):
log.debug('VM %s: %s, %s',
vm_uuid,
vm_hostname(vm),
log.debug('VM %s: %s, %s',
vm_uuid,
vm_hostname(vm),
vm.status)
if vm_hostname(vm) != placement[vm_uuid] or \
vm.status != u'ACTIVE':
@ -757,7 +756,7 @@ def migrate_vms(db, nova, vm_instance_directory, placement):
vm_pair.remove(vm_uuid)
db.insert_vm_migration(vm_uuid, placement[vm_uuid])
if log.isEnabledFor(logging.INFO):
log.info('Completed migration of VM %s to %s',
log.info('Completed migration of VM %s to %s',
vm_uuid, placement[vm_uuid])
else:
break
@ -782,10 +781,10 @@ def switch_hosts_off(db, sleep_command, hosts):
command = 'ssh {0} "{1}"'. \
format(host, sleep_command)
if log.isEnabledFor(logging.DEBUG):
log.debug('Calling: %s', command)
log.debug('Calling: %s', command)
subprocess.call(command, shell=True)
if log.isEnabledFor(logging.INFO):
log.info('Switched off hosts: %s', str(hosts))
log.info('Switched off hosts: %s', str(hosts))
db.insert_host_states(dict((x, 0) for x in hosts))
@ -807,8 +806,8 @@ def switch_hosts_on(db, host_macs, hosts):
host_macs[host] = host_mac(host)
command = 'ether-wake {0}'.format(host_macs[host])
if log.isEnabledFor(logging.DEBUG):
log.debug('Calling: %s', command)
subprocess.call(command, shell=True)
log.debug('Calling: %s', command)
subprocess.call(command, shell=True)
if log.isEnabledFor(logging.INFO):
log.info('Switched on hosts: %s', str(hosts))
db.insert_host_states(dict((x, 1) for x in hosts))

View File

@ -46,16 +46,16 @@ def best_fit_decreasing_factory(time_step, migration_time, params):
params['last_n_vm_cpu'],
get_available_resources(
params['cpu_threshold'],
hosts_cpu_usage,
hosts_cpu_usage,
hosts_cpu_total),
get_available_resources(
params['ram_threshold'],
hosts_ram_usage,
hosts_ram_usage,
hosts_ram_total),
inactive_hosts_cpu,
inactive_hosts_cpu,
inactive_hosts_ram,
vms_cpu,
vms_ram),
vms_cpu,
vms_ram),
{})
@ -112,8 +112,8 @@ def best_fit_decreasing(last_n_vm_cpu, hosts_cpu, hosts_ram,
vms_tmp = []
for vm, cpu in vms_cpu.items():
last_n_cpu = cpu[-last_n_vm_cpu:]
vms_tmp.append((sum(last_n_cpu) / len(last_n_cpu),
vms_ram[vm],
vms_tmp.append((sum(last_n_cpu) / len(last_n_cpu),
vms_ram[vm],
vm))
vms = sorted(vms_tmp, reverse=True)
hosts = sorted(((v, hosts_ram[k], k)

View File

@ -162,9 +162,9 @@ def init_state(config):
host_cpu_usable_by_vms = float(config['host_cpu_usable_by_vms'])
db = init_db(config['sql_connection'])
db.update_host(hostname,
int(host_cpu_mhz * host_cpu_usable_by_vms),
physical_cpus,
db.update_host(hostname,
int(host_cpu_mhz * host_cpu_usable_by_vms),
physical_cpus,
host_ram)
return {'previous_time': 0.,
@ -174,7 +174,7 @@ def init_state(config):
'previous_overload': -1,
'vir_connection': vir_connection,
'hostname': hostname,
'host_cpu_overload_threshold':
'host_cpu_overload_threshold':
float(config['host_cpu_overload_threshold']) * \
host_cpu_usable_by_vms,
'physical_cpus': physical_cpus,
@ -269,22 +269,22 @@ def execute(config, state):
log.info('Completed VM data collection')
log.info('Started host data collection')
(host_cpu_time_total,
host_cpu_time_busy,
(host_cpu_time_total,
host_cpu_time_busy,
host_cpu_mhz) = get_host_cpu_mhz(state['physical_cpu_mhz'],
state['previous_host_cpu_time_total'],
state['previous_host_cpu_time_busy'])
log.info('Completed host data collection')
if state['previous_time'] > 0:
append_vm_data_locally(vm_path, cpu_mhz, data_length)
append_vm_data_remotely(state['db'], cpu_mhz)
host_cpu_mhz_hypervisor = host_cpu_mhz - sum(cpu_mhz.values())
if host_cpu_mhz_hypervisor < 0:
host_cpu_mhz_hypervisor = 0
append_host_data_locally(host_path, host_cpu_mhz_hypervisor, data_length)
append_host_data_remotely(state['db'],
append_host_data_remotely(state['db'],
state['hostname'],
host_cpu_mhz_hypervisor)
@ -304,7 +304,7 @@ def execute(config, state):
state['previous_cpu_time'] = cpu_time
state['previous_host_cpu_time_total'] = host_cpu_time_total
state['previous_host_cpu_time_busy'] = host_cpu_time_busy
log.info('Completed an iteration')
return state
@ -485,7 +485,7 @@ def append_vm_data_locally(path, data, data_length):
f.truncate(0)
f.seek(0)
f.write('\n'.join([str(x) for x in values]) + '\n')
@contract
def append_vm_data_remotely(db, data):
@ -695,7 +695,7 @@ def get_host_characteristics(vir_connection):
@contract()
def log_host_overload(db, overload_threshold, hostname,
def log_host_overload(db, overload_threshold, hostname,
previous_overload, host_mhz, vms_mhz):
""" Log to the DB whether the host is overloaded.

View File

@ -103,7 +103,6 @@ local manager performs the following steps:
from contracts import contract
from neat.contracts_extra import *
import itertools
import requests
from hashlib import sha1
import time
@ -159,7 +158,7 @@ def init_state(config):
raise OSError(message)
physical_cpu_mhz_total = int(
common.physical_cpu_mhz_total(vir_connection) *
common.physical_cpu_mhz_total(vir_connection) *
float(config['host_cpu_usable_by_vms']))
return {'previous_time': 0.,
'vir_connection': vir_connection,
@ -226,7 +225,7 @@ def execute(config, state):
host_cpu_mhz = get_local_host_data(host_path)
host_cpu_utilization = vm_mhz_to_percentage(
vm_cpu_mhz.values(),
vm_cpu_mhz.values(),
host_cpu_mhz,
state['physical_cpu_mhz_total'])
if log.isEnabledFor(logging.DEBUG):
@ -244,7 +243,6 @@ def execute(config, state):
if 'underload_detection' not in state:
underload_detection_params = common.parse_parameters(
config['algorithm_underload_detection_parameters'])
underload_detection_state = None
underload_detection = common.call_function_by_name(
config['algorithm_underload_detection_factory'],
[time_step,
@ -255,7 +253,6 @@ def execute(config, state):
overload_detection_params = common.parse_parameters(
config['algorithm_overload_detection_parameters'])
overload_detection_state = None
overload_detection = common.call_function_by_name(
config['algorithm_overload_detection_factory'],
[time_step,
@ -266,7 +263,6 @@ def execute(config, state):
vm_selection_params = common.parse_parameters(
config['algorithm_vm_selection_parameters'])
vm_selection_state = None
vm_selection = common.call_function_by_name(
config['algorithm_vm_selection_factory'],
[time_step,
@ -289,14 +285,14 @@ def execute(config, state):
log.info('Underload detected')
try:
r = requests.put('http://' + config['global_manager_host'] + \
':' + config['global_manager_port'],
{'username': state['hashed_username'],
'password': state['hashed_password'],
':' + config['global_manager_port'],
{'username': state['hashed_username'],
'password': state['hashed_password'],
'time': time.time(),
'host': state['hostname'],
'reason': 0})
if log.isEnabledFor(logging.INFO):
log.info('Received response: [%s] %s',
log.info('Received response: [%s] %s',
r.status_code, r.content)
except requests.exceptions.ConnectionError:
log.exception('Exception at underload request:')
@ -320,15 +316,15 @@ def execute(config, state):
log.info('Selected VMs to migrate: %s', str(vm_uuids))
try:
r = requests.put('http://' + config['global_manager_host'] + \
':' + config['global_manager_port'],
{'username': state['hashed_username'],
'password': state['hashed_password'],
':' + config['global_manager_port'],
{'username': state['hashed_username'],
'password': state['hashed_password'],
'time': time.time(),
'host': state['hostname'],
'reason': 1,
'reason': 1,
'vm_uuids': ','.join(vm_uuids)})
if log.isEnabledFor(logging.INFO):
log.info('Received response: [%s] %s',
log.info('Received response: [%s] %s',
r.status_code, r.content)
except requests.exceptions.ConnectionError:
log.exception('Exception at overload request:')
@ -428,7 +424,7 @@ def get_max_ram(vir_connection, uuid):
:return: The maximum RAM of the VM in MB.
:rtype: int|None
"""
try:
try:
domain = vir_connection.lookupByUUIDString(uuid)
return domain.maxMemory() / 1024
except libvirt.libvirtError:
@ -446,7 +442,7 @@ def vm_mhz_to_percentage(vm_mhz_history, host_mhz_history, physical_cpu_mhz):
:type host_mhz_history: list(int)
:param physical_cpu_mhz: The total frequency of the physical CPU in MHz.
:type physical_cpu_mhz: int
:type physical_cpu_mhz: int,>0
:return: The history of the host's CPU utilization in percentages.
:rtype: list(float)

View File

@ -54,7 +54,7 @@ def mhod_factory(time_step, migration_time, params):
time_step,
migration_time,
utilization,
state)
state)
return mhod_wrapper
@ -158,7 +158,7 @@ def mhod(state_config, otf, window_sizes, bruteforce_step, learning_steps,
tmp = set(p[current_state])
if len(tmp) != 1 or 0 not in tmp:
policy = bruteforce.optimize(
bruteforce_step, 1.0, otf, (migration_time / time_step),
bruteforce_step, 1.0, otf, (migration_time / time_step),
ls, p, state_vector, time_in_states, time_in_state_n)
log.debug('MHOD policy:' + str(policy))
command = issue_command_deterministic(policy)

View File

@ -90,7 +90,7 @@ def threshold(threshold, utilization):
def last_n_average_threshold(threshold, n, utilization):
""" Averaging static threshold-based underload detection algorithm.
The algorithm returns True, if the average of the last n values of
The algorithm returns True, if the average of the last n values of
the host's CPU utilization is lower than the specified threshold.
:param threshold: The static underload CPU utilization threshold.

View File

@ -102,7 +102,7 @@ def minimum_migration_time_max_cpu_factory(time_step, migration_time, params):
"""
return lambda vms_cpu, vms_ram, state=None: \
([minimum_migration_time_max_cpu(params['last_n'],
vms_cpu,
vms_cpu,
vms_ram)], {})