diff --git a/neat/common.py b/neat/common.py index f50aaf7..c37d654 100644 --- a/neat/common.py +++ b/neat/common.py @@ -235,7 +235,7 @@ def parse_parameters(params): :return: A dict of parameters. :rtype: dict(str: *) """ - return dict((str(k), v) + return dict((str(k), v) for k, v in json.loads(params).items()) @@ -251,7 +251,7 @@ def parse_compute_hosts(compute_hosts): """ return filter(None, re.split('\W+', compute_hosts)) - + @contract def calculate_migration_time(vms, bandwidth): """ Calculate the mean migration time from VM RAM usage data. @@ -286,7 +286,7 @@ def execute_on_hosts(hosts, commands): for host in hosts: print 'Host: ' + host print subprocess.Popen( - 'ssh ' + host + ' "' + commands_merged + '"', + 'ssh ' + host + ' "' + commands_merged + '"', stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True).communicate()[0] diff --git a/neat/db.py b/neat/db.py index 2fe95e8..b61f829 100644 --- a/neat/db.py +++ b/neat/db.py @@ -35,7 +35,7 @@ class Database(object): vm_migrations=Table, host_states=Table, host_overload=Table) - def __init__(self, connection, hosts, host_resource_usage, vms, + def __init__(self, connection, hosts, host_resource_usage, vms, vm_resource_usage, vm_migrations, host_states, host_overload): """ Initialize the database. @@ -282,7 +282,7 @@ class Database(object): :return: A dict of host names to IDs. :rtype: dict(str: int) """ - return dict((str(x[1]), int(x[0])) + return dict((str(x[1]), int(x[0])) for x in self.hosts.select().execute().fetchall()) @contract(datetime_threshold=datetime.datetime) @@ -316,8 +316,8 @@ class Database(object): """ host_ids = self.select_host_ids() to_insert = [{'host_id': host_ids[k], - 'state': v} - for k, v in hosts.items()] + 'state': v} + for k, v in hosts.items()] self.connection.execute( self.host_states.insert(), to_insert) @@ -352,8 +352,8 @@ class Database(object): :return: A list of host names. :rtype: list(str) """ - return [host - for host, state in self.select_host_states().items() + return [host + for host, state in self.select_host_states().items() if state == 1] @contract @@ -363,8 +363,8 @@ class Database(object): :return: A list of host names. :rtype: list(str) """ - return [host - for host, state in self.select_host_states().items() + return [host + for host, state in self.select_host_states().items() if state == 0] @contract @@ -394,4 +394,3 @@ class Database(object): self.vm_migrations.insert().execute( vm_id=self.select_vm_id(vm), host_id=self.select_host_id(hostname)) - diff --git a/neat/db_utils.py b/neat/db_utils.py index b8c9d46..6289933 100644 --- a/neat/db_utils.py +++ b/neat/db_utils.py @@ -86,7 +86,7 @@ def init_db(sql_connection): metadata.create_all() connection = engine.connect() - db = Database(connection, hosts, host_resource_usage, vms, + db = Database(connection, hosts, host_resource_usage, vms, vm_resource_usage, vm_migrations, host_states, host_overload) log.debug('Initialized a DB connection to %s', sql_connection) diff --git a/neat/globals/db_cleaner.py b/neat/globals/db_cleaner.py index c40c04a..1f27f1b 100644 --- a/neat/globals/db_cleaner.py +++ b/neat/globals/db_cleaner.py @@ -91,7 +91,7 @@ def execute(config, state): state['db'].cleanup_vm_resource_usage(datetime_threshold) state['db'].cleanup_host_resource_usage(datetime_threshold) if log.isEnabledFor(logging.INFO): - log.info('Cleaned up data older than %s', + log.info('Cleaned up data older than %s', datetime_threshold.strftime('%Y-%m-%d %H:%M:%S')) return state diff --git a/neat/globals/manager.py b/neat/globals/manager.py index 3d5432c..3862d7a 100644 --- a/neat/globals/manager.py +++ b/neat/globals/manager.py @@ -71,7 +71,6 @@ from contracts import contract from neat.contracts_extra import * import bottle -import json from hashlib import sha1 import novaclient from novaclient.v1_1 import client @@ -164,8 +163,8 @@ def start(): int(config['log_level'])) state = init_state(config) - switch_hosts_on(state['db'], - state['host_macs'], + switch_hosts_on(state['db'], + state['host_macs'], state['compute_hosts']) bottle.debug(True) @@ -216,13 +215,13 @@ def get_remote_addr(request): def service(): params = get_params(bottle.request) state = bottle.app().state - validate_params(state['state']['hashed_username'], - state['state']['hashed_password'], + validate_params(state['state']['hashed_username'], + state['state']['hashed_password'], params) - log.info('Received a request from %s: %s', + log.info('Received a request from %s: %s', get_remote_addr(bottle.request), str(params)) - try: + try: if params['reason'] == 0: log.info('Processing an underload of a host %s', params['host']) execute_underload( @@ -323,11 +322,11 @@ def execute_underload(config, state, host): if vm not in vms_last_cpu: log.info('No data yet for VM: %s - skipping host %s', vm, host) del hosts_cpu_total[host] - del hosts_ram_total[host] + del hosts_ram_total[host] if host in hosts_cpu_usage: del hosts_cpu_usage[host] if host in hosts_ram_usage: - del hosts_ram_usage[host] + del hosts_ram_usage[host] break host_cpu_mhz += vms_last_cpu[vm] else: @@ -360,7 +359,7 @@ def execute_underload(config, state, host): int(config['data_collector_data_length'])) vms_ram = vms_ram_limit(state['nova'], vms_to_migrate) - # Remove VMs that are not in vms_ram + # Remove VMs that are not in vms_ram # These instances might have been deleted for i, vm in enumerate(vms_to_migrate): if not vm in vms_ram: @@ -415,14 +414,14 @@ def execute_underload(config, state, host): else: log.info('Started underload VM migrations') migrate_vms(state['db'], - state['nova'], - config['vm_instance_directory'], + state['nova'], + config['vm_instance_directory'], placement) log.info('Completed underload VM migrations') if hosts_to_deactivate: - switch_hosts_off(state['db'], - config['sleep_command'], + switch_hosts_off(state['db'], + config['sleep_command'], hosts_to_deactivate) log.info('Completed processing an underload request') @@ -521,7 +520,7 @@ def execute_overload(config, state, host, vm_uuids): int(config['data_collector_data_length'])) vms_ram = vms_ram_limit(state['nova'], vms_to_migrate) - # Remove VMs that are not in vms_ram + # Remove VMs that are not in vms_ram # These instances might have been deleted for i, vm in enumerate(vms_to_migrate): if not vm in vms_ram: @@ -571,13 +570,13 @@ def execute_overload(config, state, host, vm_uuids): set(inactive_hosts_cpu.keys()).intersection( set(placement.values()))) if hosts_to_activate: - switch_hosts_on(state['db'], - state['host_macs'], + switch_hosts_on(state['db'], + state['host_macs'], hosts_to_activate) log.info('Started overload VM migrations') migrate_vms(state['db'], - state['nova'], - config['vm_instance_directory'], + state['nova'], + config['vm_instance_directory'], placement) log.info('Completed overload VM migrations') log.info('Completed processing an overload request') @@ -651,8 +650,8 @@ def host_mac(host): :rtype: str """ mac = subprocess.Popen( - ("ping -c 1 {0} > /dev/null;" + - "arp -a {0} | awk '{{print $4}}'").format(host), + ("ping -c 1 {0} > /dev/null;" + + "arp -a {0} | awk '{{print $4}}'").format(host), stdout=subprocess.PIPE, shell=True).communicate()[0].strip() if len(mac) != 17: @@ -732,23 +731,23 @@ def migrate_vms(db, nova, vm_instance_directory, placement): vm_pairs = [vms[x:x + 2] for x in xrange(0, len(vms), 2)] for vm_pair in vm_pairs: # To avoid problems with migration, need the following: - subprocess.call('chown -R nova:nova ' + vm_instance_directory, + subprocess.call('chown -R nova:nova ' + vm_instance_directory, shell=True) for vm in vm_pair: host = placement[vm] nova.servers.live_migrate(vm, host, False, False) if log.isEnabledFor(logging.INFO): - log.info('Started migration of VM %s to %s', vm, host) - + log.info('Started migration of VM %s to %s', vm, host) + time.sleep(10) while True: for vm_uuid in list(vm_pair): vm = nova.servers.get(vm_uuid) if log.isEnabledFor(logging.DEBUG): - log.debug('VM %s: %s, %s', - vm_uuid, - vm_hostname(vm), + log.debug('VM %s: %s, %s', + vm_uuid, + vm_hostname(vm), vm.status) if vm_hostname(vm) != placement[vm_uuid] or \ vm.status != u'ACTIVE': @@ -757,7 +756,7 @@ def migrate_vms(db, nova, vm_instance_directory, placement): vm_pair.remove(vm_uuid) db.insert_vm_migration(vm_uuid, placement[vm_uuid]) if log.isEnabledFor(logging.INFO): - log.info('Completed migration of VM %s to %s', + log.info('Completed migration of VM %s to %s', vm_uuid, placement[vm_uuid]) else: break @@ -782,10 +781,10 @@ def switch_hosts_off(db, sleep_command, hosts): command = 'ssh {0} "{1}"'. \ format(host, sleep_command) if log.isEnabledFor(logging.DEBUG): - log.debug('Calling: %s', command) + log.debug('Calling: %s', command) subprocess.call(command, shell=True) if log.isEnabledFor(logging.INFO): - log.info('Switched off hosts: %s', str(hosts)) + log.info('Switched off hosts: %s', str(hosts)) db.insert_host_states(dict((x, 0) for x in hosts)) @@ -807,8 +806,8 @@ def switch_hosts_on(db, host_macs, hosts): host_macs[host] = host_mac(host) command = 'ether-wake {0}'.format(host_macs[host]) if log.isEnabledFor(logging.DEBUG): - log.debug('Calling: %s', command) - subprocess.call(command, shell=True) + log.debug('Calling: %s', command) + subprocess.call(command, shell=True) if log.isEnabledFor(logging.INFO): log.info('Switched on hosts: %s', str(hosts)) db.insert_host_states(dict((x, 1) for x in hosts)) diff --git a/neat/globals/vm_placement/bin_packing.py b/neat/globals/vm_placement/bin_packing.py index 71c1a24..c2b0dd6 100644 --- a/neat/globals/vm_placement/bin_packing.py +++ b/neat/globals/vm_placement/bin_packing.py @@ -46,16 +46,16 @@ def best_fit_decreasing_factory(time_step, migration_time, params): params['last_n_vm_cpu'], get_available_resources( params['cpu_threshold'], - hosts_cpu_usage, + hosts_cpu_usage, hosts_cpu_total), get_available_resources( params['ram_threshold'], - hosts_ram_usage, + hosts_ram_usage, hosts_ram_total), - inactive_hosts_cpu, + inactive_hosts_cpu, inactive_hosts_ram, - vms_cpu, - vms_ram), + vms_cpu, + vms_ram), {}) @@ -112,8 +112,8 @@ def best_fit_decreasing(last_n_vm_cpu, hosts_cpu, hosts_ram, vms_tmp = [] for vm, cpu in vms_cpu.items(): last_n_cpu = cpu[-last_n_vm_cpu:] - vms_tmp.append((sum(last_n_cpu) / len(last_n_cpu), - vms_ram[vm], + vms_tmp.append((sum(last_n_cpu) / len(last_n_cpu), + vms_ram[vm], vm)) vms = sorted(vms_tmp, reverse=True) hosts = sorted(((v, hosts_ram[k], k) diff --git a/neat/locals/collector.py b/neat/locals/collector.py index b0ca2e2..2567d44 100644 --- a/neat/locals/collector.py +++ b/neat/locals/collector.py @@ -162,9 +162,9 @@ def init_state(config): host_cpu_usable_by_vms = float(config['host_cpu_usable_by_vms']) db = init_db(config['sql_connection']) - db.update_host(hostname, - int(host_cpu_mhz * host_cpu_usable_by_vms), - physical_cpus, + db.update_host(hostname, + int(host_cpu_mhz * host_cpu_usable_by_vms), + physical_cpus, host_ram) return {'previous_time': 0., @@ -174,7 +174,7 @@ def init_state(config): 'previous_overload': -1, 'vir_connection': vir_connection, 'hostname': hostname, - 'host_cpu_overload_threshold': + 'host_cpu_overload_threshold': float(config['host_cpu_overload_threshold']) * \ host_cpu_usable_by_vms, 'physical_cpus': physical_cpus, @@ -269,22 +269,22 @@ def execute(config, state): log.info('Completed VM data collection') log.info('Started host data collection') - (host_cpu_time_total, - host_cpu_time_busy, + (host_cpu_time_total, + host_cpu_time_busy, host_cpu_mhz) = get_host_cpu_mhz(state['physical_cpu_mhz'], state['previous_host_cpu_time_total'], state['previous_host_cpu_time_busy']) log.info('Completed host data collection') - + if state['previous_time'] > 0: append_vm_data_locally(vm_path, cpu_mhz, data_length) append_vm_data_remotely(state['db'], cpu_mhz) - + host_cpu_mhz_hypervisor = host_cpu_mhz - sum(cpu_mhz.values()) if host_cpu_mhz_hypervisor < 0: host_cpu_mhz_hypervisor = 0 append_host_data_locally(host_path, host_cpu_mhz_hypervisor, data_length) - append_host_data_remotely(state['db'], + append_host_data_remotely(state['db'], state['hostname'], host_cpu_mhz_hypervisor) @@ -304,7 +304,7 @@ def execute(config, state): state['previous_cpu_time'] = cpu_time state['previous_host_cpu_time_total'] = host_cpu_time_total state['previous_host_cpu_time_busy'] = host_cpu_time_busy - + log.info('Completed an iteration') return state @@ -485,7 +485,7 @@ def append_vm_data_locally(path, data, data_length): f.truncate(0) f.seek(0) f.write('\n'.join([str(x) for x in values]) + '\n') - + @contract def append_vm_data_remotely(db, data): @@ -695,7 +695,7 @@ def get_host_characteristics(vir_connection): @contract() -def log_host_overload(db, overload_threshold, hostname, +def log_host_overload(db, overload_threshold, hostname, previous_overload, host_mhz, vms_mhz): """ Log to the DB whether the host is overloaded. diff --git a/neat/locals/manager.py b/neat/locals/manager.py index 03fb40a..c377ff5 100644 --- a/neat/locals/manager.py +++ b/neat/locals/manager.py @@ -103,7 +103,6 @@ local manager performs the following steps: from contracts import contract from neat.contracts_extra import * -import itertools import requests from hashlib import sha1 import time @@ -159,7 +158,7 @@ def init_state(config): raise OSError(message) physical_cpu_mhz_total = int( - common.physical_cpu_mhz_total(vir_connection) * + common.physical_cpu_mhz_total(vir_connection) * float(config['host_cpu_usable_by_vms'])) return {'previous_time': 0., 'vir_connection': vir_connection, @@ -226,7 +225,7 @@ def execute(config, state): host_cpu_mhz = get_local_host_data(host_path) host_cpu_utilization = vm_mhz_to_percentage( - vm_cpu_mhz.values(), + vm_cpu_mhz.values(), host_cpu_mhz, state['physical_cpu_mhz_total']) if log.isEnabledFor(logging.DEBUG): @@ -244,7 +243,6 @@ def execute(config, state): if 'underload_detection' not in state: underload_detection_params = common.parse_parameters( config['algorithm_underload_detection_parameters']) - underload_detection_state = None underload_detection = common.call_function_by_name( config['algorithm_underload_detection_factory'], [time_step, @@ -255,7 +253,6 @@ def execute(config, state): overload_detection_params = common.parse_parameters( config['algorithm_overload_detection_parameters']) - overload_detection_state = None overload_detection = common.call_function_by_name( config['algorithm_overload_detection_factory'], [time_step, @@ -266,7 +263,6 @@ def execute(config, state): vm_selection_params = common.parse_parameters( config['algorithm_vm_selection_parameters']) - vm_selection_state = None vm_selection = common.call_function_by_name( config['algorithm_vm_selection_factory'], [time_step, @@ -289,14 +285,14 @@ def execute(config, state): log.info('Underload detected') try: r = requests.put('http://' + config['global_manager_host'] + \ - ':' + config['global_manager_port'], - {'username': state['hashed_username'], - 'password': state['hashed_password'], + ':' + config['global_manager_port'], + {'username': state['hashed_username'], + 'password': state['hashed_password'], 'time': time.time(), 'host': state['hostname'], 'reason': 0}) if log.isEnabledFor(logging.INFO): - log.info('Received response: [%s] %s', + log.info('Received response: [%s] %s', r.status_code, r.content) except requests.exceptions.ConnectionError: log.exception('Exception at underload request:') @@ -320,15 +316,15 @@ def execute(config, state): log.info('Selected VMs to migrate: %s', str(vm_uuids)) try: r = requests.put('http://' + config['global_manager_host'] + \ - ':' + config['global_manager_port'], - {'username': state['hashed_username'], - 'password': state['hashed_password'], + ':' + config['global_manager_port'], + {'username': state['hashed_username'], + 'password': state['hashed_password'], 'time': time.time(), 'host': state['hostname'], - 'reason': 1, + 'reason': 1, 'vm_uuids': ','.join(vm_uuids)}) if log.isEnabledFor(logging.INFO): - log.info('Received response: [%s] %s', + log.info('Received response: [%s] %s', r.status_code, r.content) except requests.exceptions.ConnectionError: log.exception('Exception at overload request:') @@ -428,7 +424,7 @@ def get_max_ram(vir_connection, uuid): :return: The maximum RAM of the VM in MB. :rtype: int|None """ - try: + try: domain = vir_connection.lookupByUUIDString(uuid) return domain.maxMemory() / 1024 except libvirt.libvirtError: @@ -446,7 +442,7 @@ def vm_mhz_to_percentage(vm_mhz_history, host_mhz_history, physical_cpu_mhz): :type host_mhz_history: list(int) :param physical_cpu_mhz: The total frequency of the physical CPU in MHz. - :type physical_cpu_mhz: int + :type physical_cpu_mhz: int,>0 :return: The history of the host's CPU utilization in percentages. :rtype: list(float) diff --git a/neat/locals/overload/mhod/core.py b/neat/locals/overload/mhod/core.py index 126873f..03d749b 100644 --- a/neat/locals/overload/mhod/core.py +++ b/neat/locals/overload/mhod/core.py @@ -54,7 +54,7 @@ def mhod_factory(time_step, migration_time, params): time_step, migration_time, utilization, - state) + state) return mhod_wrapper @@ -158,7 +158,7 @@ def mhod(state_config, otf, window_sizes, bruteforce_step, learning_steps, tmp = set(p[current_state]) if len(tmp) != 1 or 0 not in tmp: policy = bruteforce.optimize( - bruteforce_step, 1.0, otf, (migration_time / time_step), + bruteforce_step, 1.0, otf, (migration_time / time_step), ls, p, state_vector, time_in_states, time_in_state_n) log.debug('MHOD policy:' + str(policy)) command = issue_command_deterministic(policy) diff --git a/neat/locals/underload/trivial.py b/neat/locals/underload/trivial.py index 06dea2c..b2355c0 100644 --- a/neat/locals/underload/trivial.py +++ b/neat/locals/underload/trivial.py @@ -90,7 +90,7 @@ def threshold(threshold, utilization): def last_n_average_threshold(threshold, n, utilization): """ Averaging static threshold-based underload detection algorithm. - The algorithm returns True, if the average of the last n values of + The algorithm returns True, if the average of the last n values of the host's CPU utilization is lower than the specified threshold. :param threshold: The static underload CPU utilization threshold. diff --git a/neat/locals/vm_selection/algorithms.py b/neat/locals/vm_selection/algorithms.py index fe50760..42010eb 100644 --- a/neat/locals/vm_selection/algorithms.py +++ b/neat/locals/vm_selection/algorithms.py @@ -102,7 +102,7 @@ def minimum_migration_time_max_cpu_factory(time_step, migration_time, params): """ return lambda vms_cpu, vms_ram, state=None: \ ([minimum_migration_time_max_cpu(params['last_n'], - vms_cpu, + vms_cpu, vms_ram)], {})