Refactor local manager

Change-Id: I1d17780079b2c08f3436c32b9e3bad44ddbc0eb9
This commit is contained in:
kong 2015-05-03 00:00:13 +08:00
parent 149b1ca0f7
commit b58b966f50

View File

@ -1,11 +1,11 @@
# Copyright 2012 Anton Beloglazov # Copyright 2012 Anton Beloglazov
# Copyright 2015 Huawei Technologies Co. Ltd
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
# You may obtain a copy of the License at # You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, # distributed under the License is distributed on an "AS IS" BASIS,
@ -101,36 +101,36 @@ local manager performs the following steps:
""" """
from contracts import contract from contracts import contract
from neat.contracts_primitive import *
from neat.contracts_extra import *
import requests
from hashlib import sha1 from hashlib import sha1
import libvirt
import os
import requests
import time import time
import neat.common as common from oslo_config import cfg
from neat.config import * from oslo_log import log as logging
from neat.db_utils import *
import logging from terracotta import common
from terracotta.contracts_primitive import *
from terracotta.openstack.common import service from terracotta.contracts_extra import *
from terracotta.openstack.common import periodic_task
from terracotta.openstack.common import threadgroup
from terracotta.utils import db_utils
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class LocalManager(service.Service): class LocalManager(periodic_task.PeriodicTasks):
def __init__(self): def __init__(self):
super(Service, self).__init__() super(Service, self).__init__()
self.state = self.init_state() self.state = self.init_state()
self.tg = threadgroup.ThreadGroup()
self.tg.add_dynamic_timer( self.tg.add_dynamic_timer(
self.execute, self.run_periodic_tasks,
initial_delay=initial_delay, initial_delay=None,
periodic_interval_max=self.periodic_interval_max, periodic_interval_max=1,
self.state context=None
) )
def init_state(self): def init_state(self):
@ -145,98 +145,90 @@ class LocalManager(service.Service):
vir_connection = libvirt.openReadOnly(None) vir_connection = libvirt.openReadOnly(None)
if vir_connection is None: if vir_connection is None:
message = 'Failed to open a connection to the hypervisor' message = 'Failed to open a connection to the hypervisor'
log.critical(message) LOG.critical(message)
raise OSError(message) raise OSError(message)
physical_cpu_mhz_total = int( physical_cpu_mhz_total = int(
common.physical_cpu_mhz_total(vir_connection) * common.physical_cpu_mhz_total(vir_connection) *
float(config['host_cpu_usable_by_vms'])) float(CONF.host_cpu_usable_by_vms))
return {'previous_time': 0., return {'previous_time': 0.,
'vir_connection': vir_connection, 'vir_connection': vir_connection,
'db': init_db(config['sql_connection']), 'db': db_utils.init_db(),
'physical_cpu_mhz_total': physical_cpu_mhz_total, 'physical_cpu_mhz_total': physical_cpu_mhz_total,
'hostname': vir_connection.getHostname(), 'hostname': vir_connection.getHostname(),
'hashed_username': sha1(config['os_admin_user']).hexdigest(), 'hashed_username': sha1(CONF.os_admin_user).hexdigest(),
'hashed_password': sha1(config['os_admin_password']).hexdigest()} 'hashed_password': sha1(CONF.os_admin_password).hexdigest()}
def execute(self, state): @periodic_task.periodic_task
def execute(self):
""" Execute an iteration of the local manager. """ Execute an iteration of the local manager.
1. Read the data on resource usage by the VMs running on the host from 1. Read the data on resource usage by the VMs running on the host from
the <local_data_directory>/vm directory. the <local_data_directory>/vm directory.
2. Call the function specified in the algorithm_underload_detection 2. Call the function specified in the algorithm_underload_detection
configuration option and pass the data on the resource usage by the configuration option and pass the data on the resource usage by the
VMs, as well as the frequency of the CPU as arguments. VMs, as well as the frequency of the CPU as arguments.
3. If the host is underloaded, send a request to the REST API of the 3. If the host is underloaded, send a request to the REST API of the
global manager and pass a list of the UUIDs of all the VMs global manager and pass a list of the UUIDs of all the VMs
currently running on the host in the vm_uuids parameter, as well as currently running on the host in the vm_uuids parameter, as well as
the reason for migration as being 0. the reason for migration as being 0.
4. If the host is not underloaded, call the function specified in the 4. If the host is not underloaded, call the function specified in the
algorithm_overload_detection configuration option and pass the data algorithm_overload_detection configuration option and pass the data
on the resource usage by the VMs, as well as the frequency of the on the resource usage by the VMs, as well as the frequency of the
host's CPU as arguments. host's CPU as arguments.
5. If the host is overloaded, call the function specified in the 5. If the host is overloaded, call the function specified in the
algorithm_vm_selection configuration option and pass the data on algorithm_vm_selection configuration option and pass the data on
the resource usage by the VMs, as well as the frequency of the the resource usage by the VMs, as well as the frequency of the
host's CPU as arguments host's CPU as arguments
6. If the host is overloaded, send a request to the REST API of the 6. If the host is overloaded, send a request to the REST API of the
global manager and pass a list of the UUIDs of the VMs selected by global manager and pass a list of the UUIDs of the VMs selected by
the VM selection algorithm in the vm_uuids parameter, as well as the VM selection algorithm in the vm_uuids parameter, as well as
the reason for migration as being 1. the reason for migration as being 1.
:param config: A config dictionary.
:type config: dict(str: *)
:param state: A state dictionary.
:type state: dict(str: *)
:return: The updated state dictionary.
:rtype: dict(str: *)
""" """
log.info('Started an iteration') LOG.info('Started an iteration')
vm_path = common.build_local_vm_path(config['local_data_directory']) state = self.state
vm_cpu_mhz = get_local_vm_data(vm_path) vm_path = common.build_local_vm_path(CONF.local_data_directory)
vm_ram = get_ram(state['vir_connection'], vm_cpu_mhz.keys()) vm_cpu_mhz = self.get_local_vm_data(vm_path)
vm_cpu_mhz = cleanup_vm_data(vm_cpu_mhz, vm_ram.keys()) vm_ram = self.get_ram(state['vir_connection'], vm_cpu_mhz.keys())
vm_cpu_mhz = self.cleanup_vm_data(vm_cpu_mhz, vm_ram.keys())
if not vm_cpu_mhz: if not vm_cpu_mhz:
if log.isEnabledFor(logging.INFO): LOG.info('Skipped an iteration')
log.info('The host is idle') return
log.info('Skipped an iteration')
return state
host_path = common.build_local_host_path(config['local_data_directory']) host_path = common.build_local_host_path(CONF.local_data_directory)
host_cpu_mhz = get_local_host_data(host_path) host_cpu_mhz = self.get_local_host_data(host_path)
host_cpu_utilization = vm_mhz_to_percentage( host_cpu_utilization = self.vm_mhz_to_percentage(
vm_cpu_mhz.values(), vm_cpu_mhz.values(),
host_cpu_mhz, host_cpu_mhz,
state['physical_cpu_mhz_total']) state['physical_cpu_mhz_total'])
if log.isEnabledFor(logging.DEBUG): LOG.debug('The total physical CPU Mhz: %s',
log.debug('The total physical CPU Mhz: %s', str(state['physical_cpu_mhz_total'])) str(state['physical_cpu_mhz_total']))
log.debug('VM CPU MHz: %s', str(vm_cpu_mhz)) LOG.debug('VM CPU MHz: %s', str(vm_cpu_mhz))
log.debug('Host CPU MHz: %s', str(host_cpu_mhz)) LOG.debug('Host CPU MHz: %s', str(host_cpu_mhz))
log.debug('CPU utilization: %s', str(host_cpu_utilization)) LOG.debug('CPU utilization: %s', str(host_cpu_utilization))
if not host_cpu_utilization: if not host_cpu_utilization:
log.info('Not enough data yet - skipping to the next iteration') LOG.info('Not enough data yet - skipping to the next iteration')
log.info('Skipped an iteration') LOG.info('Skipped an iteration')
return state return
time_step = int(config['data_collector_interval']) time_step = int(CONF.data_collector_interval)
migration_time = common.calculate_migration_time( migration_time = common.calculate_migration_time(
vm_ram, float(config['network_migration_bandwidth'])) vm_ram, float(CONF.network_migration_bandwidth))
if 'underload_detection' not in state: if 'underload_detection' not in state:
underload_detection_params = common.parse_parameters( underload_detection_params = common.parse_parameters(
config['algorithm_underload_detection_parameters']) CONF.algorithm_underload_detection_parameters)
underload_detection = common.call_function_by_name( underload_detection = common.call_function_by_name(
config['algorithm_underload_detection_factory'], CONF.algorithm_underload_detection_factory,
[time_step, [time_step,
migration_time, migration_time,
underload_detection_params]) underload_detection_params])
@ -244,9 +236,9 @@ class LocalManager(service.Service):
state['underload_detection_state'] = {} state['underload_detection_state'] = {}
overload_detection_params = common.parse_parameters( overload_detection_params = common.parse_parameters(
config['algorithm_overload_detection_parameters']) CONF.algorithm_overload_detection_parameters)
overload_detection = common.call_function_by_name( overload_detection = common.call_function_by_name(
config['algorithm_overload_detection_factory'], CONF.algorithm_overload_detection_factory,
[time_step, [time_step,
migration_time, migration_time,
overload_detection_params]) overload_detection_params])
@ -254,9 +246,9 @@ class LocalManager(service.Service):
state['overload_detection_state'] = {} state['overload_detection_state'] = {}
vm_selection_params = common.parse_parameters( vm_selection_params = common.parse_parameters(
config['algorithm_vm_selection_parameters']) CONF.algorithm_vm_selection_parameters)
vm_selection = common.call_function_by_name( vm_selection = common.call_function_by_name(
config['algorithm_vm_selection_factory'], CONF.algorithm_vm_selection_factory,
[time_step, [time_step,
migration_time, migration_time,
vm_selection_params]) vm_selection_params])
@ -267,71 +259,35 @@ class LocalManager(service.Service):
overload_detection = state['overload_detection'] overload_detection = state['overload_detection']
vm_selection = state['vm_selection'] vm_selection = state['vm_selection']
if log.isEnabledFor(logging.INFO): LOG.info('Started underload detection')
log.info('Started underload detection')
underload, state['underload_detection_state'] = underload_detection( underload, state['underload_detection_state'] = underload_detection(
host_cpu_utilization, state['underload_detection_state']) host_cpu_utilization, state['underload_detection_state'])
if log.isEnabledFor(logging.INFO): LOG.info('Completed underload detection')
log.info('Completed underload detection')
if log.isEnabledFor(logging.INFO): LOG.info('Started overload detection')
log.info('Started overload detection')
overload, state['overload_detection_state'] = overload_detection( overload, state['overload_detection_state'] = overload_detection(
host_cpu_utilization, state['overload_detection_state']) host_cpu_utilization, state['overload_detection_state'])
if log.isEnabledFor(logging.INFO): LOG.info('Completed overload detection')
log.info('Completed overload detection')
if underload: if underload:
if log.isEnabledFor(logging.INFO): LOG.info('Underload detected')
log.info('Underload detected') # TODO(xylan): send rpc message to global manager
try:
r = requests.put('http://' + config['global_manager_host'] +
':' + config['global_manager_port'],
{'username': state['hashed_username'],
'password': state['hashed_password'],
'time': time.time(),
'host': state['hostname'],
'reason': 0})
if log.isEnabledFor(logging.INFO):
log.info('Received response: [%s] %s',
r.status_code, r.content)
except requests.exceptions.ConnectionError:
log.exception('Exception at underload request:')
else: else:
if overload: if overload:
if log.isEnabledFor(logging.INFO): LOG.info('Overload detected')
log.info('Overload detected')
log.info('Started VM selection') LOG.info('Started VM selection')
vm_uuids, state['vm_selection_state'] = vm_selection( vm_uuids, state['vm_selection_state'] = vm_selection(
vm_cpu_mhz, vm_ram, state['vm_selection_state']) vm_cpu_mhz, vm_ram, state['vm_selection_state'])
log.info('Completed VM selection') LOG.info('Completed VM selection')
if log.isEnabledFor(logging.INFO): LOG.info('Selected VMs to migrate: %s', str(vm_uuids))
log.info('Selected VMs to migrate: %s', str(vm_uuids)) # TODO(xylan): send rpc message to global manager
try:
r = requests.put('http://' + config['global_manager_host'] +
':' + config['global_manager_port'],
{'username': state['hashed_username'],
'password': state['hashed_password'],
'time': time.time(),
'host': state['hostname'],
'reason': 1,
'vm_uuids': ','.join(vm_uuids)})
if log.isEnabledFor(logging.INFO):
log.info('Received response: [%s] %s',
r.status_code, r.content)
except requests.exceptions.ConnectionError:
log.exception('Exception at overload request:')
else: else:
if log.isEnabledFor(logging.INFO): LOG.info('No underload or overload detected')
log.info('No underload or overload detected')
if log.isEnabledFor(logging.INFO): LOG.info('Completed an iteration')
log.info('Completed an iteration') self.state = state
return state
@contract @contract
@ -388,21 +344,21 @@ class LocalManager(service.Service):
@contract @contract
def get_ram(self, vir_connection, vms): def get_ram(self, vir_connection, vm_ids):
""" Get the maximum RAM for a set of VM UUIDs. """ Get the maximum RAM for a set of VM UUIDs.
:param vir_connection: A libvirt connection object. :param vir_connection: A libvirt connection object.
:type vir_connection: virConnect :type vir_connection: virConnect
:param vms: A list of VM UUIDs. :param vm_ids: A list of VM UUIDs.
:type vms: list(str) :type vm_ids: list(str)
:return: The maximum RAM for the VM UUIDs. :return: The maximum RAM for the VM UUIDs.
:rtype: dict(str : long) :rtype: dict(str : long)
""" """
vms_ram = {} vms_ram = {}
for uuid in vms: for uuid in vm_ids:
ram = get_max_ram(vir_connection, uuid) ram = self.get_max_ram(vir_connection, uuid)
if ram: if ram:
vms_ram[uuid] = ram vms_ram[uuid] = ram
@ -430,7 +386,8 @@ class LocalManager(service.Service):
@contract @contract
def vm_mhz_to_percentage(self, vm_mhz_history, host_mhz_history, physical_cpu_mhz): def vm_mhz_to_percentage(self, vm_mhz_history, host_mhz_history,
physical_cpu_mhz):
""" Convert VM CPU utilization to the host's CPU utilization. """ Convert VM CPU utilization to the host's CPU utilization.
:param vm_mhz_history: A list of CPU utilization histories of VMs in MHz. :param vm_mhz_history: A list of CPU utilization histories of VMs in MHz.