diff --git a/neat/locals/collector.py b/neat/locals/collector.py index d64db63..650abd7 100644 --- a/neat/locals/collector.py +++ b/neat/locals/collector.py @@ -96,6 +96,7 @@ from neat.contracts_extra import * import os import time from collections import deque +import libvirt import neat.common as common from neat.config import * @@ -129,7 +130,7 @@ def start(): log.info('Creaned up the local VM data directory: ' + vm_path) interval = config.get('data_collector_interval') - log.info('Starting the data collector, ' + + log.info('Starting the data collector, ' + 'iterations every %s seconds', interval) return common.start( init_state, @@ -213,11 +214,19 @@ def execute(config, state): vms_previous = get_previous_vms(path) vms_current = get_current_vms(state['vir_connection']) - vms_added = get_added_vms(vms_previous, vms_current) + vms_added = get_added_vms(vms_previous, vms_current.keys()) added_vm_data = dict() if vms_added: if log.isEnabledFor(logging.DEBUG): log.debug('Added VMs: %s', str(vms_added)) + + for vm in list(vms_added): + if vms_current[vm] != libvirt.VIR_DOMAIN_RUNNING: + del vms_added[vm] + del vms_current[vm] + if log.isEnabledFor(logging.DEBUG): + log.debug('Added VM %s skipped as migrating in', vm) + added_vm_data = fetch_remote_data(state['db'], data_length, vms_added) @@ -225,7 +234,7 @@ def execute(config, state): log.debug('Fetched remote data: %s', str(added_vm_data)) write_data_locally(path, added_vm_data, data_length) - vms_removed = get_removed_vms(vms_previous, vms_current) + vms_removed = get_removed_vms(vms_previous, vms_current.keys()) if vms_removed: if log.isEnabledFor(logging.DEBUG): log.debug('Removed VMs: %s', str(vms_removed)) @@ -239,7 +248,7 @@ def execute(config, state): state['previous_cpu_time'], state['previous_time'], current_time, - vms_current, + vms_current.keys(), added_vm_data) if state['previous_time'] > 0: append_data_locally(path, cpu_mhz, data_length) @@ -266,19 +275,18 @@ def get_previous_vms(path): @contract() def get_current_vms(vir_connection): - """ Get a list of VM UUIDs from libvirt. + """ Get a dict of VM UUIDs to states from libvirt. :param vir_connection: A libvirt connection object. :type vir_connection: virConnect - :return: The list of VM UUIDs from libvirt. - :rtype: list(str) + :return: The dict of VM UUIDs to states from libvirt. + :rtype: dict(str: int) """ - # TODO: need to check the status of VMs, whether it's migrating - # If a VM is migrating in, it shouldn't be taken into account - vm_uuids = [] + vm_uuids = {} for vm_id in vir_connection.listDomainsID(): - vm_uuids.append(vir_connection.lookupByID(vm_id).UUIDString()) + vm = vir_connection.lookupByID(vm_id) + vm_uuids[vm.UUIDString()] = vm.state(0)[0] return vm_uuids @@ -472,8 +480,8 @@ def get_cpu_mhz(vir_connection, physical_core_mhz, previous_cpu_time, current_cpu_time) previous_cpu_time[uuid] = current_cpu_time if log.isEnabledFor(logging.DEBUG): - log.debug('VM %s: previous CPU time %d, ' + - 'current CPU time %d, CPU MHz %d', + log.debug('VM %s: previous CPU time %d, ' + + 'current CPU time %d, CPU MHz %d', uuid, cpu_time, current_cpu_time, cpu_mhz[uuid]) for uuid in added_vms: diff --git a/tests/locals/test_collector.py b/tests/locals/test_collector.py index 65c13a9..380bc54 100644 --- a/tests/locals/test_collector.py +++ b/tests/locals/test_collector.py @@ -91,7 +91,7 @@ class Collector(TestCase): @qc def get_current_vms( ids=dict_( - keys=int_(min=0), + keys=int_(min=0, max=1000), values=str_(of='abc123-', min_length=36, max_length=36), min_length=0, max_length=10 ) @@ -100,6 +100,7 @@ class Collector(TestCase): def init_vm(id): vm = mock('vm') expect(vm).UUIDString().and_return(ids[id]).once() + expect(vm).state(0).and_return([id * 13, id]).once() return vm connection = libvirt.virConnect() @@ -107,7 +108,8 @@ class Collector(TestCase): if ids: expect(connection).lookupByID(any_int) \ .and_call(lambda id: init_vm(id)) - assert collector.get_current_vms(connection) == ids.values() + expected = dict((v, k * 13) for k, v in ids.items()) + assert collector.get_current_vms(connection) == expected @qc def get_added_vms(