Improve success rate

Change-Id: I59e4a8d8e23575fb0af74924dad027340517f7e7
Jung, Gueyoung (gj6146) 2017-02-27 12:54:50 -05:00 committed by Omar Rivera
parent 46bd04cb24
commit 1c53afa58e
15 changed files with 236 additions and 613 deletions

valet/api/common/ostro_helper.py Normal file → Executable file

@ -94,7 +94,7 @@ class Ostro(object):
def __init__(self):
"""Initializer."""
self.tries = conf.music.get('tries', 100)
self.tries = conf.music.get('tries', 1000)
self.interval = conf.music.get('interval', 0.1)
def _map_names_to_uuids(self, mapping, data):
@ -127,34 +127,27 @@ class Ostro(object):
# TODO(JD): This really belongs in valet-engine once it exists.
def _send(self, stack_id, request):
"""Send request."""
request_query = Query(PlacementRequest)
# Creating the placement request effectively enqueues it.
PlacementRequest(stack_id=stack_id, request=request) # pylint: disable=W0612
result_query = Query(PlacementResult)
requested = False
for __ in range(self.tries, 0, -1): # pylint: disable=W0612
# Take a breather in between checks.
# TODO(JD): This is a blocking operation at the moment.
time.sleep(self.interval)
# First, check to see if there's already a response.
result = result_query.filter_by(stack_id=stack_id).first()
if result:
placement = result.placement
result.delete()
return placement
elif not requested:
# Next, check to see if there's already a request.
prior_request = request_query.filter_by(
stack_id=stack_id).first()
if not prior_request:
# No request? Make one! Creating it enqueues it.
PlacementRequest(stack_id=stack_id, request=request) # pylint: disable=W0612
requested = True
self.error_uri = '/errors/server_error'
message = "Timed out waiting for a response."
LOG.error(message + " for stack_id = " + stack_id)
response = self._build_error(message)
return json.dumps(response)

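For context, a minimal sketch of the enqueue-then-poll pattern the revised _send() uses: the placement request row is created (and thereby enqueued) up front, then the caller polls for a result up to `tries` times with `interval` seconds between checks. The enqueue_request and fetch_result callables below are hypothetical stand-ins for the Music-backed PlacementRequest/PlacementResult queries, not Valet APIs.

import json
import time


def send_placement_request(stack_id, request, enqueue_request, fetch_result,
                           tries=1000, interval=0.1):
    """Enqueue a placement request, then poll for its result.

    enqueue_request and fetch_result are injected callables standing in
    for the Music-backed PlacementRequest/PlacementResult queries.
    """
    enqueue_request(stack_id, request)   # creating the request row enqueues it
    for _ in range(tries):
        time.sleep(interval)             # blocking wait between checks
        result = fetch_result(stack_id)
        if result is not None:
            return result                # placement (or error) response found
    # timed out: return an error document, mirroring _build_error()
    message = "Timed out waiting for a response."
    return json.dumps({"status": {"type": "error", "message": message}})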
valet/engine/conf.py Normal file → Executable file

@ -31,45 +31,24 @@ ostro_cli_opts = [
engine_group = cfg.OptGroup(name='engine', title='Valet Engine conf')
engine_opts = [
cfg.StrOpt('pid', default='/var/run/valet/ostro-daemon.pid'),
cfg.StrOpt('mode', default='live', help='sim will let Ostro simulate datacenter, '
'while live will let it handle a real datacenter'),
cfg.StrOpt('mode', default='live', help='run in live mode against a real datacenter, or in sim mode for testing'),
cfg.StrOpt('sim_cfg_loc', default='/etc/valet/engine/ostro_sim.cfg'),
cfg.BoolOpt('network_control', default=False, help='whether network controller (i.e., Tegu) has been deployed'),
cfg.StrOpt('network_control_url', default='http://network_control:29444/tegu/api'),
cfg.StrOpt('ip', default='localhost'),
cfg.IntOpt('health_timeout', default=10, help='health check grace period (seconds, default=10)'),
cfg.IntOpt('priority', default=1, help='this instance priority (master=1)'),
cfg.StrOpt('rpc_server_ip', default='localhost',
help='Set RPC server ip and port if used. Otherwise, ignore these parameters'),
cfg.StrOpt('rpc_server_port', default='8002'),
cfg.StrOpt('datacenter_name', default='bigsite',
help='Inform the name of datacenter (region name), where Valet/Ostro is deployed.'),
cfg.StrOpt('datacenter_name', default='aic', help='The name of the region'),
cfg.IntOpt('num_of_region_chars', default='3', help='number of chars that indicates the region code'),
cfg.StrOpt('rack_code_list', default='r', help='rack indicator.'),
cfg.ListOpt('node_code_list', default='a,c,u,f,o,p,s',
help='indicates the node type. a: network, c KVM compute, u: ESXi compute, f: ?, o: operation, '
'p: power, s: storage.'),
cfg.StrOpt('compute_trigger_time', default='1:00',
help='trigger time or frequency for checking compute hosting server status (i.e., call Nova)'),
cfg.IntOpt('compute_trigger_frequency', default=3600,
help='trigger time or frequency for checking compute hosting server status (i.e., call Nova)'),
cfg.StrOpt('topology_trigger_time', default='2:00',
help='Set trigger time or frequency for checking datacenter topology (i.e., call AIC Formation)'),
cfg.IntOpt('topology_trigger_frequency', default=3600,
help='Set trigger time or frequency for checking datacenter topology (i.e., call AIC Formation)'),
cfg.FloatOpt('default_cpu_allocation_ratio', default=16, help='Set default overbooking ratios. Note that '
'each compute node can have its own ratios'),
cfg.FloatOpt('default_ram_allocation_ratio', default=1.5, help='Set default overbooking ratios. Note that '
'each compute node can have its own ratios'),
cfg.FloatOpt('default_disk_allocation_ratio', default=1, help='Set default overbooking ratios. Note that '
'each compute node can have its own ratios'),
cfg.FloatOpt('static_cpu_standby_ratio', default=20, help='unused percentages of resources (i.e. standby) '
'that are set aside for applications workload spikes.'),
cfg.FloatOpt('static_mem_standby_ratio', default=20, help='unused percentages of resources (i.e. standby) '
'that are set aside for applications workload spikes.'),
cfg.FloatOpt('static_local_disk_standby_ratio', default=20, help='unused percentages of resources (i.e. standby) '
'that are set aside for applications workload '
'spikes.'),
cfg.ListOpt('node_code_list', default='a,c,u,f,o,p,s', help='Indicates the node type'),
cfg.IntOpt('compute_trigger_frequency', default=1800, help='Frequency for checking compute hosting status'),
cfg.IntOpt('topology_trigger_frequency', default=3600, help='Frequency for checking datacenter topology'),
cfg.IntOpt('update_batch_wait', default=600, help='Wait time before starting resource sync from Nova'),
cfg.FloatOpt('default_cpu_allocation_ratio', default=16, help='Default CPU overbooking ratios'),
cfg.FloatOpt('default_ram_allocation_ratio', default=1.5, help='Default mem overbooking ratios'),
cfg.FloatOpt('default_disk_allocation_ratio', default=1, help='Default disk overbooking ratios'),
cfg.FloatOpt('static_cpu_standby_ratio', default=20, help='Percentages of standby cpu resources'),
cfg.FloatOpt('static_mem_standby_ratio', default=20, help='Percentages of standby mem resources'),
cfg.FloatOpt('static_local_disk_standby_ratio', default=20, help='Percentages of standby disk resources'),
] + logger_conf("engine")
listener_group = cfg.OptGroup(name='events_listener',

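For readers unfamiliar with oslo.config, a small self-contained sketch of how options like the simplified ones above are registered and read back. The option names mirror this diff; the standalone usage, group title, and the fact that no config file is loaded are illustrative only.

from oslo_config import cfg

CONF = cfg.CONF

engine_group = cfg.OptGroup(name='engine', title='Valet Engine conf')
engine_opts = [
    cfg.IntOpt('compute_trigger_frequency', default=1800,
               help='Frequency for checking compute hosting status'),
    cfg.IntOpt('update_batch_wait', default=600,
               help='Wait time before starting resource sync from Nova'),
    cfg.FloatOpt('default_cpu_allocation_ratio', default=16.0,
                 help='Default CPU overbooking ratio'),
]

CONF.register_group(engine_group)
CONF.register_opts(engine_opts, group=engine_group)

if __name__ == '__main__':
    CONF([])  # parse an empty command line; config files could be listed here
    print(CONF.engine.compute_trigger_frequency)      # -> 1800
    print(CONF.engine.default_cpu_allocation_ratio)   # -> 16.0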
valet/engine/listener/listener_manager.py Normal file → Executable file

@ -18,7 +18,6 @@
from datetime import datetime
import json
import pika
import pprint
import threading
import traceback
from valet.common.conf import get_logger
@ -133,18 +132,8 @@ class ListenerManager(threading.Thread):
else:
return
self.listener_logger.debug("\nMessage No: %s\n",
method_frame.delivery_tag)
message_obj = yaml.load(body)
if 'oslo.message' in message_obj.keys():
message_obj = yaml.load(message_obj['oslo.message'])
if self.config.events_listener.output_format == 'json':
self.listener_logger.debug(json.dumps(message_obj,
sort_keys=True, indent=2))
elif self.config.events_listener.output_format == 'yaml':
self.listener_logger.debug(yaml.dump(message_obj))
else:
self.listener_logger.debug(pprint.pformat(message_obj))
self.listener_logger.debug("\nMessage No: %s\n", method_frame.delivery_tag)
self.listener_logger.debug(json.dumps(message, sort_keys=True, indent=2))
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
except Exception:
self.listener_logger.error(traceback.format_exc())
@ -195,7 +184,13 @@ class ListenerManager(threading.Thread):
def is_nova_name(self, args):
"""Return True if object name is Instance."""
return args['objinst']['nova_object.name'] == 'Instance'
def is_nova_state(self, args):
"""Return True if object vm_state is deleted or active."""
if args['objinst']['nova_object.data']['vm_state'] in ['deleted', 'active']:
return True
else:
if args['objinst']['nova_object.data']['vm_state'] == 'building':
return False
else:
return True

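A minimal sketch of the vm_state filter the listener applies after this change, assuming the same notification layout as above: 'deleted' and 'active' events are handled, 'building' is skipped, and anything else passes through. The function name and sample payload are hypothetical.

def is_interesting_vm_state(notification):
    """Mirror of the listener's vm_state filter (illustrative only)."""
    vm_state = notification['objinst']['nova_object.data']['vm_state']
    if vm_state in ('deleted', 'active'):
        return True
    return vm_state != 'building'


sample = {'objinst': {'nova_object.data': {'vm_state': 'building'}}}
assert is_interesting_vm_state(sample) is False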

@ -15,11 +15,23 @@
"""App Handler."""
import operator
import time
from valet.engine.optimizer.app_manager.app_topology import AppTopology
from valet.engine.optimizer.app_manager.app_topology_base import VM
from valet.engine.optimizer.app_manager.application import App
class AppHistory(object):
def __init__(self, _key):
self.decision_key = _key
self.host = None
self.result = None
self.timestamp = None
class AppHandler(object):
"""App Handler Class.
@ -38,10 +50,59 @@ class AppHandler(object):
""" current app requested, a temporary copy """
self.apps = {}
self.decision_history = {}
self.max_decision_history = 5000
self.min_decision_history = 1000
self.last_log_index = 0
self.status = "success"
# NOTE(GJ): do not cache migration decision
def check_history(self, _app):
stack_id = _app["stack_id"]
action = _app["action"]
if action == "create":
decision_key = stack_id + ":" + action + ":none"
if decision_key in self.decision_history.keys():
return (decision_key, self.decision_history[decision_key].result)
else:
return (decision_key, None)
elif action == "replan":
decision_key = stack_id + ":" + action + ":" + _app["orchestration_id"]
if decision_key in self.decision_history.keys():
return (decision_key, self.decision_history[decision_key].result)
else:
return (decision_key, None)
else:
return (None, None)
def put_history(self, _decision_key, _result):
decision_key_list = _decision_key.split(":")
action = decision_key_list[1]
if action == "create" or action == "replan":
app_history = AppHistory(_decision_key)
app_history.result = _result
app_history.timestamp = time.time()
self.decision_history[_decision_key] = app_history
if len(self.decision_history) > self.max_decision_history:
self._clean_decision_history()
def _clean_decision_history(self):
count = 0
num_of_removes = len(self.decision_history) - self.min_decision_history
remove_item_list = []
for decision in (sorted(self.decision_history.values(), key=operator.attrgetter('timestamp'))):
remove_item_list.append(decision.decision_key)
count += 1
if count == num_of_removes:
break
for dk in remove_item_list:
if dk in self.decision_history.keys():
del self.decision_history[dk]
def add_app(self, _app):
"""Add app and set or regenerate topology, return updated topology."""
self.apps.clear()
@ -102,7 +163,7 @@ class AppHandler(object):
return app_topology
def add_placement(self, _placement_map, _timestamp):
def add_placement(self, _placement_map, _app_topology, _timestamp):
"""Change requested apps to scheduled and place them."""
for v in _placement_map.keys():
if self.apps[v.app_uuid].status == "requested":
@ -110,7 +171,16 @@ class AppHandler(object):
self.apps[v.app_uuid].timestamp_scheduled = _timestamp
if isinstance(v, VM):
self.apps[v.app_uuid].add_vm(v, _placement_map[v])
if self.apps[v.app_uuid].request_type == "replan":
if v.uuid in _app_topology.planned_vm_map.keys():
self.apps[v.app_uuid].add_vm(v, _placement_map[v], "replanned")
else:
self.apps[v.app_uuid].add_vm(v, _placement_map[v], "scheduled")
if v.uuid == _app_topology.candidate_list_map.keys()[0]:
self.apps[v.app_uuid].add_vm(v, _placement_map[v], "replanned")
else:
self.apps[v.app_uuid].add_vm(v, _placement_map[v], "scheduled")
# NOTE(GJ): do not handle Volume in this version
else:
if _placement_map[v] in self.resource.hosts.keys():
host = self.resource.hosts[_placement_map[v]]
@ -125,12 +195,13 @@ class AppHandler(object):
pass
def _store_app_placements(self):
# NOTE(GJ): do not track application history in this version
for appk, app in self.apps.iteritems():
json_info = app.get_json_info()
if self.db.add_app(appk, json_info) is False:
return False
if self.db is not None:
for appk, app in self.apps.iteritems():
json_info = app.get_json_info()
if self.db.add_app(appk, json_info) is False:
return False
return True
def remove_placement(self):
@ -204,12 +275,11 @@ class AppHandler(object):
if _action == "replan":
if vmk == _app["orchestration_id"]:
_app_topology.candidate_list_map[vmk] = \
_app["locations"]
_app_topology.candidate_list_map[vmk] = _app["locations"]
elif vmk in _app["exclusions"]:
_app_topology.planned_vm_map[vmk] = vm["host"]
if vm["status"] == "replanned":
_app_topology.planned_vm_map[vmk] = vm["host"]
elif _action == "migrate":
if vmk == _app["orchestration_id"]:
_app_topology.exclusion_list_map[vmk] = _app[

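A standalone sketch of the decision-history cache introduced above, assuming the same key scheme (stack_id:action:orchestration_id) and the same 5000/1000 high/low water marks; the DecisionCache class and its method names are illustrative, not the AppHandler API.

import time


class DecisionCache(object):
    """Simplified stand-in for AppHandler's decision_history bookkeeping."""

    def __init__(self, max_entries=5000, min_entries=1000):
        self.max_entries = max_entries
        self.min_entries = min_entries
        self.history = {}   # decision_key -> (timestamp, result)

    @staticmethod
    def key_for(app):
        action = app["action"]
        if action == "create":
            return app["stack_id"] + ":create:none"
        if action == "replan":
            return app["stack_id"] + ":replan:" + app["orchestration_id"]
        return None   # migration decisions are not cached

    def check(self, app):
        key = self.key_for(app)
        if key is None:
            return None, None
        entry = self.history.get(key)
        return key, (entry[1] if entry else None)

    def put(self, key, result):
        self.history[key] = (time.time(), result)
        if len(self.history) > self.max_entries:
            self._trim()

    def _trim(self):
        # drop the oldest entries until only min_entries remain
        ordered = sorted(self.history.items(), key=lambda kv: kv[1][0])
        for key, _ in ordered[:len(self.history) - self.min_entries]:
            del self.history[key]

With a cache like this, a retried "create" or "replan" request for the same stack returns the previously computed result instead of triggering a fresh placement, which appears to be how this change helps the success rate for repeated requests.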

@ -28,7 +28,7 @@ class App(object):
self.app_id = _app_id
self.app_name = _app_name
self.request_type = _action # create, update, or ping
self.request_type = _action # create, replan, migrate, or ping
self.timestamp_scheduled = 0
@ -38,10 +38,10 @@ class App(object):
self.status = 'requested' # Moved to "scheduled" (and then "placed")
def add_vm(self, _vm, _host_name):
def add_vm(self, _vm, _host_name, _status):
"""Add vm to app, set status to scheduled."""
self.vms[_vm.uuid] = _vm
self.vms[_vm.uuid].status = "scheduled"
self.vms[_vm.uuid].status = _status
self.vms[_vm.uuid].host = _host_name
def add_volume(self, _vol, _host_name):

valet/engine/optimizer/db_connect/music_handler.py Normal file → Executable file

@ -17,7 +17,6 @@
import json
import operator
import time
from valet.common.music import Music
from valet.engine.optimizer.db_connect.event import Event
@ -167,6 +166,7 @@ class MusicHandler(object):
return True
# TODO(GJ): evaluate the delay
def get_events(self):
"""Get Events.
@ -176,15 +176,14 @@ class MusicHandler(object):
"""
event_list = []
ts = time.time()
events = {}
try:
events = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_event_table)
except Exception as e:
self.logger.error("DB:event: " + str(e))
self.logger.debug("EVAL: the delay of getting events = " + str(time.time() - ts))
return None
# FIXME(GJ): return None?
return {}
if len(events) > 0:
for _, row in events.iteritems():
@ -249,8 +248,10 @@ class MusicHandler(object):
e.args = args
event_list.append(e)
else:
if self.delete_event(event_id) \
is False:
self.logger.warn("unknown vm_state = " + change_data["vm_state"])
if 'uuid' in change_data.keys():
self.logger.warn(" uuid = " + change_data['uuid'])
if self.delete_event(event_id) is False:
return None
else:
if self.delete_event(event_id) is False:
@ -285,6 +286,7 @@ class MusicHandler(object):
return None
continue
# NOTE(GJ): do not check the existence of scheduler_hints
if 'instance' not in args.keys():
if self.delete_event(event_id) is False:
return None
@ -333,7 +335,6 @@ class MusicHandler(object):
if len(event_list) > 0:
event_list.sort(key=operator.attrgetter('event_id'))
self.logger.debug("EVAL: the delay of getting events = " + str(time.time() - ts))
return event_list
def delete_event(self, _event_id):
@ -413,15 +414,14 @@ class MusicHandler(object):
"""Return list of requests that consists of all rows in a db table."""
request_list = []
ts = time.time()
requests = {}
try:
requests = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_request_table)
except Exception as e:
self.logger.error("DB: while reading requests: " + str(e))
self.logger.debug("EVAL: the delay of getting requests = " + str(time.time() - ts))
return None
# FIXME(GJ): return None?
return {}
if len(requests) > 0:
self.logger.info("MusicHandler.get_requests: placement request "
@ -434,7 +434,6 @@ class MusicHandler(object):
for r in r_list:
request_list.append(r)
self.logger.debug("EVAL: the delay of getting requests = " + str(time.time() - ts))
return request_list
def put_result(self, _result):
@ -625,8 +624,6 @@ class MusicHandler(object):
self.logger.error("DB: while deleting app: " + str(e))
return False
# self.logger.info("DB: app deleted")
if _app_data is not None:
data = {
'stack_id': _k,
@ -640,8 +637,6 @@ class MusicHandler(object):
self.logger.error("DB: while inserting app: " + str(e))
return False
# self.logger.info("DB: app added")
return True
def get_app_info(self, _s_uuid):

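A short sketch of the error-handling choice flagged by the FIXME(GJ) comments above: the readers now map a DB exception to an empty result rather than None, because callers in the main loop treat None as fatal and an empty dict as simply "nothing to process". read_all_rows and logger here are injected placeholders, not the Music client.

def get_rows_safely(read_all_rows, keyspace, table, logger):
    """Read all rows, returning {} (not None) on a DB error.

    The engine's main loop stops when it sees None, so mapping an
    exception to an empty result keeps the loop alive; whether that is
    the right trade-off is exactly what the FIXME(GJ) comments question.
    """
    try:
        return read_all_rows(keyspace, table)
    except Exception as exc:   # deliberately broad, as in the handler above
        logger.error("DB read failed: %s", exc)
        return {}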

@ -74,9 +74,7 @@ class Ostro(object):
self.end_of_process = False
self.batch_store_trigger = 10 # sec
# self.batch_events_count = 1
'''
def run_ostro(self):
self.logger.info("start Ostro ......")
@ -106,7 +104,7 @@ class Ostro(object):
break
else:
if self.resource.resource_updated is True and \
(time.time()-self.resource.curr_db_timestamp) >= self.batch_store_trigger:
(time.time() - self.resource.curr_db_timestamp) >= self.batch_store_trigger:
self.data_lock.acquire()
if self.resource.store_topology_updates() is False:
self.data_lock.release()
@ -119,58 +117,6 @@ class Ostro(object):
self.topology.end_of_process = True
self.compute.end_of_process = True
for t in self.thread_list:
t.join()
self.logger.info("exit Ostro")
'''
def run_ostro(self):
"""Start main engine process."""
"""Start topology, compute, and listener processes. Start process of
retrieving and handling events and requests from the db every 1 second.
"""
self.logger.info("Ostro.run_ostro: start Ostro ......")
self.topology.start()
self.compute.start()
self.listener.start()
self.thread_list.append(self.topology)
self.thread_list.append(self.compute)
self.thread_list.append(self.listener)
while self.end_of_process is False:
time.sleep(0.1)
request_list = self.db.get_requests()
if request_list is None:
break
if len(request_list) > 0:
if self.place_app(request_list) is False:
break
else:
event_list = self.db.get_events()
if event_list is None:
break
if len(event_list) > 0:
if self.handle_events(event_list) is False:
break
else:
if self.resource.resource_updated is True and \
(time.time() - self.resource.curr_db_timestamp) >= self.batch_store_trigger:
self.data_lock.acquire()
if self.resource.store_topology_updates() is False:
self.data_lock.release()
break
self.resource.resource_updated = False
self.data_lock.release()
self.topology.end_of_process = True
self.compute.end_of_process = True
for t in self.thread_list:
t.join()
@ -227,7 +173,6 @@ class Ostro(object):
def _set_topology(self):
if not self.topology.set_topology():
# self.status = "datacenter configuration error"
self.logger.error("failed to read datacenter topology")
return False
@ -236,7 +181,6 @@ class Ostro(object):
def _set_hosts(self):
if not self.compute.set_hosts():
# self.status = "OpenStack (Nova) internal error"
self.logger.error("failed to read hosts from OpenStack (Nova)")
return False
@ -245,17 +189,14 @@ class Ostro(object):
def _set_flavors(self):
if not self.compute.set_flavors():
# self.status = "OpenStack (Nova) internal error"
self.logger.error("failed to read flavors from OpenStack (Nova)")
return False
self.logger.info("done flavors bootstrap")
return True
# TODO(GJ): evaluate delay
def place_app(self, _app_data):
"""Place results of query and placement requests in the db."""
start_time = time.time()
for req in _app_data:
if req["action"] == "query":
self.logger.info("start query")
@ -272,23 +213,24 @@ class Ostro(object):
self.logger.info("start app placement")
result = None
placement_map = self._place_app(req)
if placement_map is None:
result = self._get_json_results("placement", "error",
self.status, placement_map)
(decision_key, old_decision) = self.app_handler.check_history(req)
if old_decision is None:
placement_map = self._place_app(req)
if placement_map is None:
result = self._get_json_results("placement", "error", self.status, placement_map)
else:
result = self._get_json_results("placement", "ok", "success", placement_map)
if decision_key is not None:
self.app_handler.put_history(decision_key, result)
else:
result = self._get_json_results("placement", "ok",
"success", placement_map)
self.logger.warn("decision(" + decision_key + ") already made")
result = old_decision
if self.db.put_result(result) is False:
return False
self.logger.info("done app placement")
end_time = time.time()
self.logger.debug("EVAL: total decision delay of request = " + str(end_time - start_time))
return True
def _query(self, _q):
@ -394,14 +336,12 @@ class Ostro(object):
self.data_lock.release()
return None
"""Update resource and app information."""
# Update resource and app information
if len(placement_map) > 0:
self.resource.update_topology(store=False)
self.app_handler.add_placement(placement_map, app_topology, self.resource.current_timestamp)
self.app_handler.add_placement(placement_map,
self.resource.current_timestamp)
if len(app_topology.exclusion_list_map) > 0 and \
len(app_topology.planned_vm_map) > 0:
if len(app_topology.exclusion_list_map) > 0 and len(app_topology.planned_vm_map) > 0:
for vk in app_topology.planned_vm_map.keys():
if vk in placement_map.keys():
del placement_map[vk]
@ -426,7 +366,7 @@ class Ostro(object):
self.logger.warn("Ostro._set_vm_flavor_properties: does not exist "
"flavor (" + _vm.flavor + ") and try to refetch")
"""Reset flavor resource and try again."""
# Reset flavor resource and try again
if self._set_flavors() is False:
return False
@ -446,6 +386,7 @@ class Ostro(object):
return True
# TODO(GJ): evaluate the delay
def handle_events(self, _event_list):
"""Handle events in the event list."""
"""Update the engine's resource topology based on the properties of
@ -453,12 +394,8 @@ class Ostro(object):
"""
self.data_lock.acquire()
event_handler_start_time = time.time()
resource_updated = False
# events_count = 0
# handled_event_list = []
for e in _event_list:
if e.host is not None and e.host != "none":
if self._check_host(e.host) is False:
@ -493,45 +430,27 @@ class Ostro(object):
return False
if len(vm_info) == 0:
"""
h_uuid is None or "none" because vm is not created
by stack or, stack not found because vm is created
by the other stack
"""
self.logger.warn("Ostro.handle_events: no vm_info "
"found in app placement record")
self._add_vm_to_host(e.uuid, orch_id[0], e.host,
e.vcpus, e.mem, e.local_disk)
# Stack not found because vm is created by the other stack
self.logger.warn("EVENT: no vm_info found in app placement record")
self._add_vm_to_host(e.uuid, orch_id[0], e.host, e.vcpus, e.mem, e.local_disk)
else:
if "planned_host" in vm_info.keys() and \
vm_info["planned_host"] != e.host:
"""
vm is activated in the different host
"""
self.logger.warn("Ostro.handle_events: vm "
"activated in the different "
"host")
self._add_vm_to_host(
e.uuid, orch_id[0], e.host, e.vcpus, e.mem,
e.local_disk)
if "planned_host" in vm_info.keys() and vm_info["planned_host"] != e.host:
# VM is activated in the different host
self.logger.warn("EVENT: vm activated in the different host")
self._add_vm_to_host(e.uuid, orch_id[0], e.host, e.vcpus, e.mem, e.local_disk)
self._remove_vm_from_host(
e.uuid, orch_id[0], vm_info["planned_host"],
float(vm_info["cpus"]),
float(vm_info["mem"]),
float(vm_info["local_volume"]))
self._remove_vm_from_host(e.uuid, orch_id[0],
vm_info["planned_host"],
float(vm_info["cpus"]),
float(vm_info["mem"]),
float(vm_info["local_volume"]))
self._remove_vm_from_logical_groups(
e.uuid, orch_id[0], vm_info["planned_host"])
self._remove_vm_from_logical_groups(e.uuid, orch_id[0], vm_info["planned_host"])
else:
"""
found vm in the planned host,
possibly the vm deleted in the host while batch cleanup
"""
if self._check_h_uuid(orch_id[0], e.host) \
is False:
self.logger.debug("Ostro.handle_events: "
"planned vm was deleted")
# Found vm in the planned host,
# Possibly the vm deleted in the host while batch cleanup
if self._check_h_uuid(orch_id[0], e.host) is False:
self.logger.warn("EVENT: planned vm was deleted")
if self._check_uuid(e.uuid, e.host) is True:
self._update_h_uuid_in_host(orch_id[0],
e.uuid,
@ -593,11 +512,6 @@ class Ostro(object):
self.logger.warn("Ostro.handle_events: unknown event "
"method = " + e.method)
# events_count += 1
# handled_event_list.append(e)
# if events_count >= self.batch_events_count:
# break
if resource_updated is True:
self.resource.update_topology(store=False)
@ -612,8 +526,6 @@ class Ostro(object):
self.data_lock.release()
return False
self.logger.debug("EVAL: total delay for event handling = " + str(time.time() - event_handler_start_time))
self.data_lock.release()
return True

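A condensed sketch of the revised main-loop ordering shown above: placement requests first, then events, and a batched resource store only once the engine has been idle for batch_store_trigger seconds. The engine object and its attributes (get_requests, get_events, place_app, handle_events, store_topology_updates, resource_updated, last_store_time) are hypothetical mirrors of the Ostro members, not the actual class.

import time


def run_engine_loop(engine, poll_interval=0.1, batch_store_trigger=10):
    """Illustrative version of the engine's main loop after this change."""
    while not engine.end_of_process:
        time.sleep(poll_interval)
        requests = engine.get_requests()
        if requests is None:                 # fatal DB error: stop the loop
            break
        if requests:
            if not engine.place_app(requests):
                break
            continue
        events = engine.get_events()
        if events is None:
            break
        if events:
            if not engine.handle_events(events):
                break
            continue
        # idle: flush accumulated resource updates at most every N seconds
        if engine.resource_updated and \
                time.time() - engine.last_store_time >= batch_store_trigger:
            engine.store_topology_updates()
            engine.resource_updated = False
            engine.last_store_time = time.time()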

@ -44,9 +44,6 @@ class Config(object):
self.api_protocol = 'http://'
self.network_control = False
self.network_control_api = None
self.db_keyspace = None
self.db_request_table = None
self.db_response_table = None
@ -63,9 +60,6 @@ class Config(object):
self.priority = 0
self.rpc_server_ip = None
self.rpc_server_port = 0
# Logging parameters
self.logger_name = None
self.logging_level = None
@ -84,10 +78,9 @@ class Config(object):
self.rack_code_list = []
self.node_code_list = []
self.topology_trigger_time = None
self.topology_trigger_freq = 0
self.compute_trigger_time = None
self.compute_trigger_freq = 0
self.update_batch_wait = 0
self.default_cpu_allocation_ratio = 1
self.default_ram_allocation_ratio = 1
@ -169,16 +162,8 @@ class Config(object):
self.process = CONF.engine.pid
self.rpc_server_ip = CONF.engine.rpc_server_ip
self.rpc_server_port = CONF.engine.rpc_server_port
self.datacenter_name = CONF.engine.datacenter_name
self.network_control = CONF.engine.network_control
self.network_control_url = CONF.engine.network_control_url
self.default_cpu_allocation_ratio = \
CONF.engine.default_cpu_allocation_ratio
@ -195,14 +180,12 @@ class Config(object):
self.static_local_disk_standby_ratio = \
CONF.engine.static_local_disk_standby_ratio
self.topology_trigger_time = CONF.engine.topology_trigger_time
self.topology_trigger_freq = CONF.engine.topology_trigger_frequency
self.compute_trigger_time = CONF.engine.compute_trigger_time
self.compute_trigger_freq = CONF.engine.compute_trigger_frequency
self.update_batch_wait = CONF.engine.update_batch_wait
self.db_keyspace = CONF.music.keyspace
self.db_request_table = CONF.music.request_table


@ -1,182 +0,0 @@
#
# Copyright 2014-2017 AT&T Intellectual Property
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# - Handle user requests
"""Database Cleaner."""
from configuration import Config
from valet.common.music import Music
class DBCleaner(object):
"""Database Cleaner."""
def __init__(self, _config):
"""Initialization."""
self.config = _config
self.music = Music()
def clean_db_tables(self):
"""Clean tables in Music."""
"""Clean resource, resource_index, request, response, event,
app, app_index, and uuid tables.
"""
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_resource_table)
if len(results) > 0:
print("resource table result = ", len(results))
for _, row in results.iteritems():
self.music.delete_row_eventually(self.config.db_keyspace,
self.config.db_resource_table,
'site_name', row['site_name'])
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_request_table)
if len(results) > 0:
print("request table result = ", len(results))
for _, row in results.iteritems():
self.music.delete_row_eventually(self.config.db_keyspace,
self.config.db_request_table,
'stack_id', row['stack_id'])
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_response_table)
if len(results) > 0:
print("response table result = ", len(results))
for _, row in results.iteritems():
self.music.delete_row_eventually(self.config.db_keyspace,
self.config.db_response_table,
'stack_id', row['stack_id'])
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_event_table)
if len(results) > 0:
print("event table result = ", len(results))
for _, row in results.iteritems():
self.music.delete_row_eventually(self.config.db_keyspace,
self.config.db_event_table,
'timestamp', row['timestamp'])
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_resource_index_table)
if len(results) > 0:
print("resource_index table result = ", len(results))
for _, row in results.iteritems():
self.music.delete_row_eventually(
self.config.db_keyspace,
self.config.db_resource_index_table,
'site_name', row['site_name'])
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_app_index_table)
if len(results) > 0:
print("app_index table result = ", len(results))
for _, row in results.iteritems():
self.music.delete_row_eventually(self.config.db_keyspace,
self.config.db_app_index_table,
'site_name', row['site_name'])
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_app_table)
if len(results) > 0:
print("app table result = ", len(results))
for _, row in results.iteritems():
self.music.delete_row_eventually(self.config.db_keyspace,
self.config.db_app_table,
'stack_id', row['stack_id'])
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_uuid_table)
if len(results) > 0:
print("uuid table result = ", len(results))
for _, row in results.iteritems():
self.music.delete_row_eventually(self.config.db_keyspace,
self.config.db_uuid_table,
'uuid', row['uuid'])
def check_db_tables(self):
"""Log whether tables in Music have been cleaned."""
"""Check resource, resource_index, request, response, event,
app, app_index, and uuid tables.
"""
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_resource_table)
if len(results) > 0:
print("resource table not cleaned ")
else:
print("resource table cleaned")
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_request_table)
if len(results) > 0:
print("request table not cleaned ")
else:
print("request table cleaned")
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_response_table)
if len(results) > 0:
print("response table not cleaned ")
else:
print("response table cleaned")
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_event_table)
if len(results) > 0:
print("event table not cleaned ")
else:
print("event table cleaned")
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_resource_index_table)
if len(results) > 0:
print("resource log index table not cleaned ")
else:
print("resource log index table cleaned")
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_app_index_table)
if len(results) > 0:
print("app log index table not cleaned ")
else:
print("app log index table cleaned")
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_app_table)
if len(results) > 0:
print("app log table not cleaned ")
else:
print("app log table cleaned")
results = self.music.read_all_rows(self.config.db_keyspace,
self.config.db_uuid_table)
if len(results) > 0:
print("uuid table not cleaned ")
else:
print("uuid table cleaned")
if __name__ == '__main__':
config = Config()
config_status = config.configure()
if config_status != "success":
print("Error while configuring Ostro: " + config_status)
sys.exit(2)
c = DBCleaner(config)
c.clean_db_tables()
c.check_db_tables()


@ -39,7 +39,7 @@ class OstroDaemon(Daemon):
self.logger.error("ostro bootstrap failed")
sys.exit(2)
# write pidfile
# Write pidfile
pid = str(os.getpid())
file(self.pidfile, 'w+').write("%s\n" % pid)
@ -58,18 +58,16 @@ def verify_dirs(list_of_dirs):
if __name__ == "__main__":
""" configuration """
# Configuration
try:
config = Config()
''' logger '''
logger = get_logger("ostro_daemon")
config_status = config.configure()
if config_status != "success":
print(config_status)
sys.exit(2)
""" verify directories """
# Verify directories
dirs_list = [config.logging_loc, config.resource_log_loc,
config.app_log_loc, os.path.dirname(config.process)]
verify_dirs(dirs_list)
@ -77,8 +75,6 @@ if __name__ == "__main__":
# Start daemon process
daemon = OstroDaemon(config.priority, config.process, logger)
logger.info("%s ostro ..." % config.command)
# switch case
exit_code = {
'start': daemon.start,
'stop': daemon.stop,


@ -1,25 +0,0 @@
# Version 2.0.2
# Set simulation parameters
num_of_spine_switches=0
#num_of_racks=1
num_of_racks=2
#num_of_hosts_per_rack=8
num_of_hosts_per_rack=2
bandwidth_of_spine=40000
bandwidth_of_rack=40000
bandwidth_of_host=10000
num_of_aggregates=1
aggregated_ratio=5
cpus_per_host=16
mem_per_host=16000
disk_per_host=1000
num_of_basic_flavors=3
base_flavor_cpus=1
base_flavor_mem=2000
base_flavor_disk=40


@ -49,6 +49,8 @@ class ComputeManager(threading.Thread):
self.admin_token = None
self.project_token = None
self.update_batch_wait = self.config.update_batch_wait
def run(self):
"""Start Compute Manager thread to run setup."""
self.logger.info("ComputeManager: start " + self.thread_name +
@ -59,39 +61,15 @@ class ComputeManager(threading.Thread):
while self.end_of_process is False:
time.sleep(60)
curr_ts = time.time()
if curr_ts > period_end:
# Give some time (batch_wait) to update resource status via message bus
# Otherwise, late update will be cleaned up
if (curr_ts - self.resource.current_timestamp) > self.update_batch_wait:
self._run()
period_end = curr_ts + self.config.compute_trigger_freq
if time.time() > period_end:
self._run()
period_end = time.time() + self.config.compute_trigger_freq
else:
(alarm_HH, alarm_MM) = self.config.compute_trigger_time.split(':')
now = time.localtime()
timeout = True
last_trigger_year = now.tm_year
last_trigger_mon = now.tm_mon
last_trigger_mday = now.tm_mday
while self.end_of_process is False:
time.sleep(60)
now = time.localtime()
if now.tm_year > last_trigger_year or \
now.tm_mon > last_trigger_mon or \
now.tm_mday > last_trigger_mday:
timeout = False
if timeout is False and \
now.tm_hour >= int(alarm_HH) and now.tm_min >= int(alarm_MM):
self._run()
timeout = True
last_trigger_year = now.tm_year
last_trigger_mon = now.tm_mon
last_trigger_mday = now.tm_mday
# NOTE(GJ): do not use timer-based batch updates
self.logger.info("exit compute_manager " + self.thread_name)
def _run(self):
@ -124,7 +102,6 @@ class ComputeManager(threading.Thread):
status = compute.set_hosts(hosts, logical_groups)
if status != "success":
# self.logger.error("ComputeManager: " + status)
return False
self._compute_avail_host_resources(hosts)
@ -256,10 +233,14 @@ class ComputeManager(threading.Thread):
def _check_host_config_update(self, _host, _rhost):
topology_updated = False
topology_updated = self._check_host_status(_host, _rhost)
topology_updated = self._check_host_resources(_host, _rhost)
topology_updated = self._check_host_memberships(_host, _rhost)
topology_updated = self._check_host_vms(_host, _rhost)
if self._check_host_status(_host, _rhost) is True:
topology_updated = True
if self._check_host_resources(_host, _rhost) is True:
topology_updated = True
if self._check_host_memberships(_host, _rhost) is True:
topology_updated = True
if self._check_host_vms(_host, _rhost) is True:
topology_updated = True
return topology_updated
@ -358,36 +339,37 @@ class ComputeManager(threading.Thread):
def _check_host_vms(self, _host, _rhost):
topology_updated = False
''' clean up VMs '''
for rvm_id in _rhost.vm_list:
if rvm_id[2] == "none":
_rhost.vm_list.remove(rvm_id)
topology_updated = True
self.logger.warn("ComputeManager: host (" + _rhost.name +
") updated (none vm removed)")
# Clean up VMs
blen = len(_rhost.vm_list)
_rhost.vm_list = [v for v in _rhost.vm_list if v[2] != "none"]
alen = len(_rhost.vm_list)
if alen != blen:
topology_updated = True
self.logger.warn("host (" + _rhost.name + ") " + str(blen - alen) + " none vms removed")
self.resource.clean_none_vms_from_logical_groups(_rhost)
for vm_id in _host.vm_list:
if _rhost.exist_vm_by_uuid(vm_id[2]) is False:
_rhost.vm_list.append(vm_id)
topology_updated = True
self.logger.warn("ComputeManager: host (" + _rhost.name +
") updated (new vm placed)")
for rvm_id in _rhost.vm_list:
if _host.exist_vm_by_uuid(rvm_id[2]) is False:
_rhost.vm_list.remove(rvm_id)
self.resource.remove_vm_by_uuid_from_logical_groups(_rhost,
rvm_id[2])
self.resource.remove_vm_by_uuid_from_logical_groups(_rhost, rvm_id[2])
topology_updated = True
self.logger.warn("ComputeManager: host (" + _rhost.name +
") updated (vm removed)")
blen = len(_rhost.vm_list)
_rhost.vm_list = [v for v in _rhost.vm_list if _host.exist_vm_by_uuid(v[2]) is True]
alen = len(_rhost.vm_list)
if alen != blen:
topology_updated = True
self.logger.warn("host (" + _rhost.name + ") " + str(blen - alen) + " vms removed")
return topology_updated
def set_flavors(self):

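A tiny sketch of the boolean-aggregation fix in _check_host_config_update: the old code reassigned topology_updated after every check, so only the last check's result survived; the corrected pattern runs every check and ORs the results. The helper below is illustrative only.

def any_update(checks):
    """Run every check and OR the results.

    Unlike the old pattern (updated = check() on each line), a later
    False can no longer hide an earlier True; unlike any(check() for
    check in checks), no check is skipped by short-circuiting, so the
    per-check side effects (logging, in-place host updates) still run.
    """
    updated = False
    for check in checks:
        if check():
            updated = True
    return updated


# usage sketch: any_update([lambda: status_changed, lambda: resources_changed])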

@ -54,7 +54,6 @@ class Resource(object):
self.current_timestamp = 0
self.curr_db_timestamp = 0
# self.last_log_index = 0
self.resource_updated = False
@ -65,6 +64,7 @@ class Resource(object):
self.disk_avail = 0
self.nw_bandwidth_avail = 0
# FIXME(GJ): should check invalid return here?
def bootstrap_from_db(self, _resource_status):
"""Return True if bootsrap resource from database successful."""
try:
@ -433,8 +433,6 @@ class Resource(object):
self.nw_bandwidth_avail += min(avail_nw_bandwidth_list)
def store_topology_updates(self):
store_start_time = time.time()
updated = False
flavor_updates = {}
logical_group_updates = {}
@ -449,56 +447,42 @@ class Resource(object):
for fk, flavor in self.flavors.iteritems():
if flavor.last_update >= self.curr_db_timestamp:
flavor_updates[fk] = flavor.get_json_info()
# self.logger.debug("resource flavor(" + fk + ") stored")
updated = True
for lgk, lg in self.logical_groups.iteritems():
if lg.last_update >= self.curr_db_timestamp:
logical_group_updates[lgk] = lg.get_json_info()
# self.logger.debug("resource lg(" + lgk + ") stored")
updated = True
for shk, storage_host in self.storage_hosts.iteritems():
if storage_host.last_update >= self.curr_db_timestamp or \
storage_host.last_cap_update >= self.curr_db_timestamp:
storage_updates[shk] = storage_host.get_json_info()
# self.logger.debug("resource storage(" + shk + ") stored")
updated = True
for sk, s in self.switches.iteritems():
if s.last_update >= self.curr_db_timestamp:
switch_updates[sk] = s.get_json_info()
# self.logger.debug("resource switch(" + sk + ") stored")
updated = True
for hk, host in self.hosts.iteritems():
if host.last_update > self.current_timestamp or \
host.last_link_update > self.current_timestamp:
host_updates[hk] = host.get_json_info()
# self.logger.debug("resource host(" + hk + ") stored")
updated = True
for hgk, host_group in self.host_groups.iteritems():
if host_group.last_update >= self.curr_db_timestamp or \
host_group.last_link_update >= self.curr_db_timestamp:
host_group_updates[hgk] = host_group.get_json_info()
# self.logger.debug("resource hgroup(" + hgk + ") stored")
updated = True
if self.datacenter.last_update >= self.curr_db_timestamp or \
self.datacenter.last_link_update >= self.curr_db_timestamp:
datacenter_update = self.datacenter.get_json_info()
# self.logger.debug("resource datacenter stored")
updated = True
# (resource_logfile, last_index, mode) = util.get_last_logfile(self.config.resource_log_loc,
# self.config.max_log_size,
# self.config.max_num_of_logs,
# self.datacenter.name,
# self.last_log_index)
# self.last_log_index = last_index
# logging = open(self.config.resource_log_loc + resource_logfile, mode)
# NOTE(GJ): do not track resource change history in this version
if updated is True:
json_logging = {}
@ -519,27 +503,11 @@ class Resource(object):
if datacenter_update is not None:
json_logging['datacenter'] = datacenter_update
# logged_data = json.dumps(json_logging)
# logging.write(logged_data)
# logging.write("\n")
# logging.close()
# self.logger.info("log resource status in " + resource_logfile)
# if self.db is not None:
if self.db.update_resource_status(self.datacenter.name, json_logging) is False:
return None
self.curr_db_timestamp = time.time()
# for test
# self.show_current_logical_groups()
# self.show_current_host_status()
self.logger.debug("EVAL: total delay for store resource status = " + str(time.time() - store_start_time))
return True
def show_current_logical_groups(self):
@ -731,32 +699,7 @@ class Resource(object):
") status changed")
updated = True
if host.original_vCPUs != _vcpus or \
host.vCPUs_used != _vcpus_used:
self.logger.debug("Resource.update_host_resources: host(" + _hn +
") cpu changed")
host.original_vCPUs = _vcpus
host.vCPUs_used = _vcpus_used
updated = True
if host.free_mem_mb != _fmem or \
host.original_mem_cap != _mem:
self.logger.debug("Resource.update_host_resources: host(" + _hn +
") mem changed")
host.free_mem_mb = _fmem
host.original_mem_cap = _mem
updated = True
if host.free_disk_gb != _fldisk or \
host.original_local_disk_cap != _ldisk or \
host.disk_available_least != _avail_least:
self.logger.debug("Resource.update_host_resources: host(" + _hn +
") disk changed")
host.free_disk_gb = _fldisk
host.original_local_disk_cap = _ldisk
host.disk_available_least = _avail_least
updated = True
# FIXME(GJ): should check cpu, mem and disk here?
if updated is True:
self.compute_avail_resources(_hn, host)
@ -831,11 +774,11 @@ class Resource(object):
lg = self.logical_groups[lgk]
if isinstance(_host, Host):
# remove host from lg's membership if the host has no vms of lg
# Remove host from lg's membership if the host has no vms of lg
if lg.remove_vm_by_h_uuid(_h_uuid, _host.name) is True:
lg.last_update = time.time()
# remove lg from host's membership if lg does not have the host
# Remove lg from host's membership if lg does not have the host
if _host.remove_membership(lg) is True:
_host.last_update = time.time()
@ -870,11 +813,11 @@ class Resource(object):
lg = self.logical_groups[lgk]
if isinstance(_host, Host):
# remove host from lg's membership if the host has no vms of lg
# Remove host from lg's membership if the host has no vms of lg
if lg.remove_vm_by_uuid(_uuid, _host.name) is True:
lg.last_update = time.time()
# remove lg from host's membership if lg does not have the host
# Remove lg from host's membership if lg does not have the host
if _host.remove_membership(lg) is True:
_host.last_update = time.time()
@ -1006,7 +949,8 @@ class Resource(object):
if self.config.default_ram_allocation_ratio > 0:
ram_allocation_ratio = self.config.default_ram_allocation_ratio
static_ram_standby_ratio = 0
if self.config.static_mem_standby_ratio > 0:
static_ram_standby_ratio = float(self.config.static_mem_standby_ratio) / float(100)
host.compute_avail_mem(ram_allocation_ratio, static_ram_standby_ratio)
@ -1017,7 +961,8 @@ class Resource(object):
if self.config.default_cpu_allocation_ratio > 0:
cpu_allocation_ratio = self.config.default_cpu_allocation_ratio
static_cpu_standby_ratio = 0
if self.config.static_cpu_standby_ratio > 0:
static_cpu_standby_ratio = float(self.config.static_cpu_standby_ratio) / float(100)
host.compute_avail_vCPUs(cpu_allocation_ratio, static_cpu_standby_ratio)
@ -1029,10 +974,10 @@ class Resource(object):
disk_allocation_ratio = \
self.config.default_disk_allocation_ratio
static_disk_standby_ratio = 0
if self.config.static_local_disk_standby_ratio > 0:
static_disk_standby_ratio = float(self.config.static_local_disk_standby_ratio) / float(100)
host.compute_avail_disk(disk_allocation_ratio,
static_disk_standby_ratio)
host.compute_avail_disk(disk_allocation_ratio, static_disk_standby_ratio)
def get_flavor(self, _id):
"""Return flavor according to name passed in."""

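As a worked example of the ratios used in this file, assuming the conventional interpretation that available capacity is physical capacity scaled by the overbooking ratio with a standby percentage held back (the actual host methods compute_avail_vCPUs/mem/disk are not shown in this diff):

def avail_capacity(physical, allocation_ratio, standby_ratio_pct):
    """Hypothetical illustration of overbooking plus standby head-room.

    Physical capacity is first scaled by the overbooking ratio, then a
    standby percentage is held back for workload spikes; this mirrors the
    intent of the ratios configured above, not Valet's exact host methods.
    """
    oversubscribed = physical * allocation_ratio
    return oversubscribed * (1.0 - standby_ratio_pct / 100.0)


# 16 physical cores, 16x CPU overbooking, 20% standby -> 204.8 schedulable vCPUs
print(avail_capacity(16, 16, 20))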

@ -117,7 +117,6 @@ class Datacenter(object):
'last_link_update': self.last_link_update}
# data container for rack or cluster
class HostGroup(object):
"""Class for Host Group Object.
@ -668,16 +667,18 @@ class LogicalGroup(object):
"""Return True if vm's or host vm's removed with physical id none."""
success = False
for vm_id in self.vm_list:
if vm_id[2] == "none":
self.vm_list.remove(vm_id)
success = True
blen = len(self.vm_list)
self.vm_list = [v for v in self.vm_list if v[2] != "none"]
alen = len(self.vm_list)
if alen != blen:
success = True
if _host_id in self.vms_per_host.keys():
for vm_id in self.vms_per_host[_host_id]:
if vm_id[2] == "none":
self.vms_per_host[_host_id].remove(vm_id)
success = True
blen = len(self.vms_per_host[_host_id])
self.vms_per_host[_host_id] = [v for v in self.vms_per_host[_host_id] if v[2] != "none"]
alen = len(self.vms_per_host[_host_id])
if alen != blen:
success = True
if self.group_type == "EX" or self.group_type == "AFF" or \
self.group_type == "DIV":

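A small sketch of the clean-up idiom adopted above: rebuild the list with a comprehension instead of calling remove() while iterating (which can skip elements), and compare lengths to detect whether anything changed. The helper and sample data are illustrative.

def remove_none_placeholders(vm_list):
    """Drop entries whose physical uuid is the placeholder "none".

    Rebuilding the list avoids the remove-while-iterating pitfall of the
    old loop, and the before/after length comparison reports whether
    anything changed.
    """
    before = len(vm_list)
    cleaned = [v for v in vm_list if v[2] != "none"]
    return cleaned, len(cleaned) != before


vms = [("orch-1", "vm-1", "uuid-1"), ("orch-2", "vm-2", "none")]
vms, changed = remove_none_placeholders(vms)
assert changed and len(vms) == 1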

@ -46,6 +46,8 @@ class TopologyManager(threading.Thread):
self.logger = _logger
self.update_batch_wait = self.config.update_batch_wait
def run(self):
"""Function starts and tracks Topology Manager Thread."""
self.logger.info("TopologyManager: start " +
@ -56,38 +58,14 @@ class TopologyManager(threading.Thread):
while self.end_of_process is False:
time.sleep(70)
if time.time() > period_end:
self._run()
period_end = time.time() + self.config.topology_trigger_freq
else:
(alarm_HH, alarm_MM) = self.config.topology_trigger_time.split(':')
now = time.localtime()
timeout = True
last_trigger_year = now.tm_year
last_trigger_mon = now.tm_mon
last_trigger_mday = now.tm_mday
while self.end_of_process is False:
time.sleep(70)
now = time.localtime()
if now.tm_year > last_trigger_year or \
now.tm_mon > last_trigger_mon or \
now.tm_mday > last_trigger_mday:
timeout = False
if timeout is False and \
now.tm_hour >= int(alarm_HH) and now.tm_min >= int(alarm_MM):
self._run()
timeout = True
last_trigger_year = now.tm_year
last_trigger_mon = now.tm_mon
last_trigger_mday = now.tm_mday
curr_ts = time.time()
if curr_ts > period_end:
# Give some time (batch_wait) to update resource status via message bus
# Otherwise, late update will be cleaned up
if (curr_ts - self.resource.current_timestamp) > self.update_batch_wait:
self._run()
period_end = curr_ts + self.config.topology_trigger_freq
# NOTE(GJ): do not use timer-based batch updates
self.logger.info("exit topology_manager " + self.thread_name)
def _run(self):
@ -119,7 +97,6 @@ class TopologyManager(threading.Thread):
status = topology.set_topology(datacenter, host_groups, hosts,
self.resource.hosts, switches)
if status != "success":
# self.logger.error("TopologyManager: " + status)
return False
self.data_lock.acquire()
@ -505,12 +482,14 @@ class TopologyManager(threading.Thread):
self.logger.warn("TopologyManager: datacenter updated "
"(new region code, " + rc + ")")
for rrc in self.resource.datacenter.region_code_list:
if rrc not in _datacenter.region_code_list:
self.resource.datacenter.region_code_list.remove(rrc)
updated = True
self.logger.warn("TopologyManager: datacenter updated "
"(region code, " + rrc + ", removed)")
code_list = self.resource.datacenter.region_code_list
blen = len(code_list)
code_list = [r for r in code_list if r in _datacenter.region_code_list]
alen = len(code_list)
if alen != blen:
updated = True
self.resource.datacenter.region_code_list = code_list
self.logger.warn("datacenter updated (region code removed)")
for rk in _datacenter.resources.keys():
exist = False
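Finally, a sketch of the frequency-plus-quiet-period trigger that replaces the old wall-clock alarm in both TopologyManager and ComputeManager: the batch runs only after the trigger frequency has elapsed and the resource status has been quiet for update_batch_wait seconds, so late updates arriving over the message bus are not wiped out. The manager object below is a hypothetical stand-in for the thread classes above.

import time


def periodic_trigger(manager, check_interval=70, trigger_freq=3600,
                     update_batch_wait=600):
    """Illustrative trigger loop mirroring the revised run() methods.

    `manager` is assumed to expose end_of_process, _run(), and
    resource.current_timestamp; these mirror, but are not, the real
    TopologyManager/ComputeManager members.
    """
    period_end = time.time() + trigger_freq
    while not manager.end_of_process:
        time.sleep(check_interval)
        now = time.time()
        if now > period_end and \
                (now - manager.resource.current_timestamp) > update_batch_wait:
            manager._run()
            period_end = now + trigger_freq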