From 1c53afa58e95cdf232e04adc9cb1303b71e44e8a Mon Sep 17 00:00:00 2001
From: "Jung, Gueyoung (gj6146)"
Date: Mon, 27 Feb 2017 12:54:50 -0500
Subject: [PATCH] Improve success rate

Change-Id: I59e4a8d8e23575fb0af74924dad027340517f7e7
---
 valet/api/common/ostro_helper.py              |  19 +-
 valet/engine/conf.py                          |  45 ++---
 valet/engine/listener/listener_manager.py     |  23 +--
 .../optimizer/app_manager/app_handler.py      |  92 +++++++--
 .../optimizer/app_manager/application.py      |   6 +-
 .../optimizer/db_connect/music_handler.py     |  25 +--
 valet/engine/optimizer/ostro/ostro.py         | 158 ++++-----------
 .../optimizer/ostro_server/configuration.py   |  23 +--
 .../optimizer/ostro_server/db_cleaner.py      | 182 ------------------
 .../optimizer/ostro_server/ostro_daemon.py    |   8 +-
 .../optimizer/ostro_server/ostro_sim.cfg      |  25 ---
 .../resource_manager/compute_manager.py       |  84 ++++----
 valet/engine/resource_manager/resource.py     |  83 ++------
 .../engine/resource_manager/resource_base.py  |  19 +-
 .../resource_manager/topology_manager.py      |  57 ++----
 15 files changed, 236 insertions(+), 613 deletions(-)
 mode change 100644 => 100755 valet/api/common/ostro_helper.py
 mode change 100644 => 100755 valet/engine/conf.py
 mode change 100644 => 100755 valet/engine/listener/listener_manager.py
 mode change 100644 => 100755 valet/engine/optimizer/db_connect/music_handler.py
 delete mode 100644 valet/engine/optimizer/ostro_server/db_cleaner.py
 delete mode 100644 valet/engine/optimizer/ostro_server/ostro_sim.cfg

diff --git a/valet/api/common/ostro_helper.py b/valet/api/common/ostro_helper.py
old mode 100644
new mode 100755
index 2952ba5..6e47b12
--- a/valet/api/common/ostro_helper.py
+++ b/valet/api/common/ostro_helper.py
@@ -94,7 +94,7 @@ class Ostro(object):
     def __init__(self):
         """Initializer."""
-        self.tries = conf.music.get('tries', 100)
+        self.tries = conf.music.get('tries', 1000)
         self.interval = conf.music.get('interval', 0.1)

     def _map_names_to_uuids(self, mapping, data):
@@ -127,34 +127,27 @@ class Ostro(object):
     # TODO(JD): This really belongs in valet-engine once it exists.
     def _send(self, stack_id, request):
-        """Send request."""
-        request_query = Query(PlacementRequest)
+        # Creating the placement request effectively enqueues it.
+        PlacementRequest(stack_id=stack_id, request=request)  # pylint: disable=W0612
         result_query = Query(PlacementResult)
-        requested = False
         for __ in range(self.tries, 0, -1):  # pylint: disable=W0612
             # Take a breather in between checks.
             # TODO(JD): This is a blocking operation at the moment.
             time.sleep(self.interval)
-            # First, check to see if there's already a response.
             result = result_query.filter_by(stack_id=stack_id).first()
             if result:
                 placement = result.placement
                 result.delete()
                 return placement
-            elif not requested:
-                # Next, check to see if there's already a request.
-                prior_request = request_query.filter_by(
-                    stack_id=stack_id).first()
-                if not prior_request:
-                    # No request? Make one! Creating it enqueues it.
-                    PlacementRequest(stack_id=stack_id, request=request)  # pylint: disable=W0612
-                    requested = True
         self.error_uri = '/errors/server_error'
         message = "Timed out waiting for a response."
+ + LOG.error(message + " for stack_id = " + stack_id) + response = self._build_error(message) return json.dumps(response) diff --git a/valet/engine/conf.py b/valet/engine/conf.py old mode 100644 new mode 100755 index 1297e0e..bdb67c5 --- a/valet/engine/conf.py +++ b/valet/engine/conf.py @@ -31,45 +31,24 @@ ostro_cli_opts = [ engine_group = cfg.OptGroup(name='engine', title='Valet Engine conf') engine_opts = [ cfg.StrOpt('pid', default='/var/run/valet/ostro-daemon.pid'), - cfg.StrOpt('mode', default='live', help='sim will let Ostro simulate datacenter, ' - 'while live will let it handle a real datacenter'), + cfg.StrOpt('mode', default='live', help='run as actual or simulation for test'), cfg.StrOpt('sim_cfg_loc', default='/etc/valet/engine/ostro_sim.cfg'), - cfg.BoolOpt('network_control', default=False, help='whether network controller (i.e., Tegu) has been deployed'), - cfg.StrOpt('network_control_url', default='http://network_control:29444/tegu/api'), cfg.StrOpt('ip', default='localhost'), cfg.IntOpt('health_timeout', default=10, help='health check grace period (seconds, default=10)'), cfg.IntOpt('priority', default=1, help='this instance priority (master=1)'), - cfg.StrOpt('rpc_server_ip', default='localhost', - help='Set RPC server ip and port if used. Otherwise, ignore these parameters'), - cfg.StrOpt('rpc_server_port', default='8002'), - cfg.StrOpt('datacenter_name', default='bigsite', - help='Inform the name of datacenter (region name), where Valet/Ostro is deployed.'), + cfg.StrOpt('datacenter_name', default='aic', help='The name of region'), cfg.IntOpt('num_of_region_chars', default='3', help='number of chars that indicates the region code'), cfg.StrOpt('rack_code_list', default='r', help='rack indicator.'), - cfg.ListOpt('node_code_list', default='a,c,u,f,o,p,s', - help='indicates the node type. a: network, c KVM compute, u: ESXi compute, f: ?, o: operation, ' - 'p: power, s: storage.'), - cfg.StrOpt('compute_trigger_time', default='1:00', - help='trigger time or frequency for checking compute hosting server status (i.e., call Nova)'), - cfg.IntOpt('compute_trigger_frequency', default=3600, - help='trigger time or frequency for checking compute hosting server status (i.e., call Nova)'), - cfg.StrOpt('topology_trigger_time', default='2:00', - help='Set trigger time or frequency for checking datacenter topology (i.e., call AIC Formation)'), - cfg.IntOpt('topology_trigger_frequency', default=3600, - help='Set trigger time or frequency for checking datacenter topology (i.e., call AIC Formation)'), - cfg.FloatOpt('default_cpu_allocation_ratio', default=16, help='Set default overbooking ratios. Note that ' - 'each compute node can have its own ratios'), - cfg.FloatOpt('default_ram_allocation_ratio', default=1.5, help='Set default overbooking ratios. Note that ' - 'each compute node can have its own ratios'), - cfg.FloatOpt('default_disk_allocation_ratio', default=1, help='Set default overbooking ratios. Note that ' - 'each compute node can have its own ratios'), - cfg.FloatOpt('static_cpu_standby_ratio', default=20, help='unused percentages of resources (i.e. standby) ' - 'that are set aside for applications workload spikes.'), - cfg.FloatOpt('static_mem_standby_ratio', default=20, help='unused percentages of resources (i.e. standby) ' - 'that are set aside for applications workload spikes.'), - cfg.FloatOpt('static_local_disk_standby_ratio', default=20, help='unused percentages of resources (i.e. 
standby) '
-                                                                     'that are set aside for applications workload '
-                                                                     'spikes.'),
+    cfg.ListOpt('node_code_list', default='a,c,u,f,o,p,s', help='Indicates the node type'),
+    cfg.IntOpt('compute_trigger_frequency', default=1800, help='Frequency (seconds) for checking compute host status'),
+    cfg.IntOpt('topology_trigger_frequency', default=3600, help='Frequency (seconds) for checking datacenter topology'),
+    cfg.IntOpt('update_batch_wait', default=600, help='Wait time (seconds) before starting a resource sync from Nova'),
+    cfg.FloatOpt('default_cpu_allocation_ratio', default=16, help='Default CPU overbooking ratio'),
+    cfg.FloatOpt('default_ram_allocation_ratio', default=1.5, help='Default memory overbooking ratio'),
+    cfg.FloatOpt('default_disk_allocation_ratio', default=1, help='Default disk overbooking ratio'),
+    cfg.FloatOpt('static_cpu_standby_ratio', default=20, help='Percentage of CPU resources held in standby'),
+    cfg.FloatOpt('static_mem_standby_ratio', default=20, help='Percentage of memory resources held in standby'),
+    cfg.FloatOpt('static_local_disk_standby_ratio', default=20, help='Percentage of local disk resources held in standby'),
 ] + logger_conf("engine")

 listener_group = cfg.OptGroup(name='events_listener',
diff --git a/valet/engine/listener/listener_manager.py b/valet/engine/listener/listener_manager.py
old mode 100644
new mode 100755
index 64f6e64..d9fbad3
--- a/valet/engine/listener/listener_manager.py
+++ b/valet/engine/listener/listener_manager.py
@@ -18,7 +18,6 @@ from datetime import datetime
 import json
 import pika
-import pprint
 import threading
 import traceback
 from valet.common.conf import get_logger
@@ -133,18 +132,8 @@ class ListenerManager(threading.Thread):
             else:
                 return
-            self.listener_logger.debug("\nMessage No: %s\n",
-                                       method_frame.delivery_tag)
-            message_obj = yaml.load(body)
-            if 'oslo.message' in message_obj.keys():
-                message_obj = yaml.load(message_obj['oslo.message'])
-            if self.config.events_listener.output_format == 'json':
-                self.listener_logger.debug(json.dumps(message_obj,
-                                           sort_keys=True, indent=2))
-            elif self.config.events_listener.output_format == 'yaml':
-                self.listener_logger.debug(yaml.dump(message_obj))
-            else:
-                self.listener_logger.debug(pprint.pformat(message_obj))
+            self.listener_logger.debug("\nMessage No: %s\n", method_frame.delivery_tag)
+            self.listener_logger.debug(json.dumps(message, sort_keys=True, indent=2))
             channel.basic_ack(delivery_tag=method_frame.delivery_tag)
         except Exception:
             self.listener_logger.error(traceback.format_exc())
@@ -195,7 +184,13 @@ class ListenerManager(threading.Thread):
     def is_nova_name(self, args):
         """Return True if object name is Instance."""
-        return args['objinst']['nova_object.name'] == 'Instance'
+        if args['objinst']['nova_object.data']['vm_state'] in ['deleted', 'active']:
+            return True
+        else:
+            if args['objinst']['nova_object.data']['vm_state'] == 'building':
+                return False
+            else:
+                return True

     def is_nova_state(self, args):
         """Return True if object vm_state is deleted or active."""
diff --git a/valet/engine/optimizer/app_manager/app_handler.py b/valet/engine/optimizer/app_manager/app_handler.py
index 6e11065..6521249 100755
--- a/valet/engine/optimizer/app_manager/app_handler.py
+++ b/valet/engine/optimizer/app_manager/app_handler.py
@@ -15,11 +15,23 @@
 """App Handler."""

+import operator
+import time
+
 from valet.engine.optimizer.app_manager.app_topology import AppTopology
 from valet.engine.optimizer.app_manager.app_topology_base import VM
 from valet.engine.optimizer.app_manager.application import App

+class AppHistory(object):
+
+    def __init__(self,
_key): + self.decision_key = _key + self.host = None + self.result = None + self.timestamp = None + + class AppHandler(object): """App Handler Class. @@ -38,10 +50,59 @@ class AppHandler(object): """ current app requested, a temporary copy """ self.apps = {} + self.decision_history = {} + self.max_decision_history = 5000 + self.min_decision_history = 1000 + self.last_log_index = 0 self.status = "success" + # NOTE(GJ): do not cache migration decision + def check_history(self, _app): + stack_id = _app["stack_id"] + action = _app["action"] + + if action == "create": + decision_key = stack_id + ":" + action + ":none" + if decision_key in self.decision_history.keys(): + return (decision_key, self.decision_history[decision_key].result) + else: + return (decision_key, None) + elif action == "replan": + decision_key = stack_id + ":" + action + ":" + _app["orchestration_id"] + if decision_key in self.decision_history.keys(): + return (decision_key, self.decision_history[decision_key].result) + else: + return (decision_key, None) + else: + return (None, None) + + def put_history(self, _decision_key, _result): + decision_key_list = _decision_key.split(":") + action = decision_key_list[1] + if action == "create" or action == "replan": + app_history = AppHistory(_decision_key) + app_history.result = _result + app_history.timestamp = time.time() + self.decision_history[_decision_key] = app_history + + if len(self.decision_history) > self.max_decision_history: + self._clean_decision_history() + + def _clean_decision_history(self): + count = 0 + num_of_removes = len(self.decision_history) - self.min_decision_history + remove_item_list = [] + for decision in (sorted(self.decision_history.values(), key=operator.attrgetter('timestamp'))): + remove_item_list.append(decision.decision_key) + count += 1 + if count == num_of_removes: + break + for dk in remove_item_list: + if dk in self.decision_history.keys(): + del self.decision_history[dk] + def add_app(self, _app): """Add app and set or regenerate topology, return updated topology.""" self.apps.clear() @@ -102,7 +163,7 @@ class AppHandler(object): return app_topology - def add_placement(self, _placement_map, _timestamp): + def add_placement(self, _placement_map, _app_topology, _timestamp): """Change requested apps to scheduled and place them.""" for v in _placement_map.keys(): if self.apps[v.app_uuid].status == "requested": @@ -110,7 +171,16 @@ class AppHandler(object): self.apps[v.app_uuid].timestamp_scheduled = _timestamp if isinstance(v, VM): - self.apps[v.app_uuid].add_vm(v, _placement_map[v]) + if self.apps[v.app_uuid].request_type == "replan": + if v.uuid in _app_topology.planned_vm_map.keys(): + self.apps[v.app_uuid].add_vm(v, _placement_map[v], "replanned") + else: + self.apps[v.app_uuid].add_vm(v, _placement_map[v], "scheduled") + if v.uuid == _app_topology.candidate_list_map.keys()[0]: + self.apps[v.app_uuid].add_vm(v, _placement_map[v], "replanned") + else: + self.apps[v.app_uuid].add_vm(v, _placement_map[v], "scheduled") + # NOTE(GJ): do not handle Volume in this version else: if _placement_map[v] in self.resource.hosts.keys(): host = self.resource.hosts[_placement_map[v]] @@ -125,12 +195,13 @@ class AppHandler(object): pass def _store_app_placements(self): + # NOTE(GJ): do not track application history in this version + + for appk, app in self.apps.iteritems(): + json_info = app.get_json_info() + if self.db.add_app(appk, json_info) is False: + return False - if self.db is not None: - for appk, app in self.apps.iteritems(): - json_info = 
app.get_json_info() - if self.db.add_app(appk, json_info) is False: - return False return True def remove_placement(self): @@ -204,12 +275,11 @@ class AppHandler(object): if _action == "replan": if vmk == _app["orchestration_id"]: - _app_topology.candidate_list_map[vmk] = \ - _app["locations"] - + _app_topology.candidate_list_map[vmk] = _app["locations"] elif vmk in _app["exclusions"]: _app_topology.planned_vm_map[vmk] = vm["host"] - + if vm["status"] == "replanned": + _app_topology.planned_vm_map[vmk] = vm["host"] elif _action == "migrate": if vmk == _app["orchestration_id"]: _app_topology.exclusion_list_map[vmk] = _app[ diff --git a/valet/engine/optimizer/app_manager/application.py b/valet/engine/optimizer/app_manager/application.py index 7af0f1c..0723c1f 100755 --- a/valet/engine/optimizer/app_manager/application.py +++ b/valet/engine/optimizer/app_manager/application.py @@ -28,7 +28,7 @@ class App(object): self.app_id = _app_id self.app_name = _app_name - self.request_type = _action # create, update, or ping + self.request_type = _action # create, replan, migrate, or ping self.timestamp_scheduled = 0 @@ -38,10 +38,10 @@ class App(object): self.status = 'requested' # Moved to "scheduled" (and then "placed") - def add_vm(self, _vm, _host_name): + def add_vm(self, _vm, _host_name, _status): """Add vm to app, set status to scheduled.""" self.vms[_vm.uuid] = _vm - self.vms[_vm.uuid].status = "scheduled" + self.vms[_vm.uuid].status = _status self.vms[_vm.uuid].host = _host_name def add_volume(self, _vol, _host_name): diff --git a/valet/engine/optimizer/db_connect/music_handler.py b/valet/engine/optimizer/db_connect/music_handler.py old mode 100644 new mode 100755 index 5fea74d..0b08ec1 --- a/valet/engine/optimizer/db_connect/music_handler.py +++ b/valet/engine/optimizer/db_connect/music_handler.py @@ -17,7 +17,6 @@ import json import operator -import time from valet.common.music import Music from valet.engine.optimizer.db_connect.event import Event @@ -167,6 +166,7 @@ class MusicHandler(object): return True + # TODO(GJ): evaluate the delay def get_events(self): """Get Events. @@ -176,15 +176,14 @@ class MusicHandler(object): """ event_list = [] - ts = time.time() events = {} try: events = self.music.read_all_rows(self.config.db_keyspace, self.config.db_event_table) except Exception as e: self.logger.error("DB:event: " + str(e)) - self.logger.debug("EVAL: the delay of getting events = " + str(time.time() - ts)) - return None + # FIXME(GJ): return None? 
+ return {} if len(events) > 0: for _, row in events.iteritems(): @@ -249,8 +248,10 @@ class MusicHandler(object): e.args = args event_list.append(e) else: - if self.delete_event(event_id) \ - is False: + self.logger.warn("unknown vm_state = " + change_data["vm_state"]) + if 'uuid' in change_data.keys(): + self.logger.warn(" uuid = " + change_data['uuid']) + if self.delete_event(event_id) is False: return None else: if self.delete_event(event_id) is False: @@ -285,6 +286,7 @@ class MusicHandler(object): return None continue + # NOTE(GJ): do not check the existance of scheduler_hints if 'instance' not in args.keys(): if self.delete_event(event_id) is False: return None @@ -333,7 +335,6 @@ class MusicHandler(object): if len(event_list) > 0: event_list.sort(key=operator.attrgetter('event_id')) - self.logger.debug("EVAL: the delay of getting events = " + str(time.time() - ts)) return event_list def delete_event(self, _event_id): @@ -413,15 +414,14 @@ class MusicHandler(object): """Return list of requests that consists of all rows in a db table.""" request_list = [] - ts = time.time() requests = {} try: requests = self.music.read_all_rows(self.config.db_keyspace, self.config.db_request_table) except Exception as e: self.logger.error("DB: while reading requests: " + str(e)) - self.logger.debug("EVAL: the delay of getting requests = " + str(time.time() - ts)) - return None + # FIXME(GJ): return None? + return {} if len(requests) > 0: self.logger.info("MusicHandler.get_requests: placement request " @@ -434,7 +434,6 @@ class MusicHandler(object): for r in r_list: request_list.append(r) - self.logger.debug("EVAL: the delay of getting requests = " + str(time.time() - ts)) return request_list def put_result(self, _result): @@ -625,8 +624,6 @@ class MusicHandler(object): self.logger.error("DB: while deleting app: " + str(e)) return False - # self.logger.info("DB: app deleted") - if _app_data is not None: data = { 'stack_id': _k, @@ -640,8 +637,6 @@ class MusicHandler(object): self.logger.error("DB: while inserting app: " + str(e)) return False - # self.logger.info("DB: app added") - return True def get_app_info(self, _s_uuid): diff --git a/valet/engine/optimizer/ostro/ostro.py b/valet/engine/optimizer/ostro/ostro.py index 5c8e0b6..8b31bed 100755 --- a/valet/engine/optimizer/ostro/ostro.py +++ b/valet/engine/optimizer/ostro/ostro.py @@ -74,9 +74,7 @@ class Ostro(object): self.end_of_process = False self.batch_store_trigger = 10 # sec - # self.batch_events_count = 1 - ''' def run_ostro(self): self.logger.info("start Ostro ......") @@ -106,7 +104,7 @@ class Ostro(object): break else: if self.resource.resource_updated is True and \ - (time.time()-self.resource.curr_db_timestamp) >= self.batch_store_trigger: + (time.time() - self.resource.curr_db_timestamp) >= self.batch_store_trigger: self.data_lock.acquire() if self.resource.store_topology_updates() is False: self.data_lock.release() @@ -119,58 +117,6 @@ class Ostro(object): self.topology.end_of_process = True self.compute.end_of_process = True - for t in self.thread_list: - t.join() - - self.logger.info("exit Ostro") - ''' - - def run_ostro(self): - """Start main engine process.""" - """Start topology, compute, and listener processes. Start process of - retrieving and handling events and requests from the db every 1 second. 
- """ - self.logger.info("Ostro.run_ostro: start Ostro ......") - - self.topology.start() - self.compute.start() - self.listener.start() - - self.thread_list.append(self.topology) - self.thread_list.append(self.compute) - self.thread_list.append(self.listener) - - while self.end_of_process is False: - time.sleep(0.1) - - request_list = self.db.get_requests() - if request_list is None: - break - - if len(request_list) > 0: - if self.place_app(request_list) is False: - break - else: - event_list = self.db.get_events() - if event_list is None: - break - - if len(event_list) > 0: - if self.handle_events(event_list) is False: - break - else: - if self.resource.resource_updated is True and \ - (time.time() - self.resource.curr_db_timestamp) >= self.batch_store_trigger: - self.data_lock.acquire() - if self.resource.store_topology_updates() is False: - self.data_lock.release() - break - self.resource.resource_updated = False - self.data_lock.release() - - self.topology.end_of_process = True - self.compute.end_of_process = True - for t in self.thread_list: t.join() @@ -227,7 +173,6 @@ class Ostro(object): def _set_topology(self): if not self.topology.set_topology(): - # self.status = "datacenter configuration error" self.logger.error("failed to read datacenter topology") return False @@ -236,7 +181,6 @@ class Ostro(object): def _set_hosts(self): if not self.compute.set_hosts(): - # self.status = "OpenStack (Nova) internal error" self.logger.error("failed to read hosts from OpenStack (Nova)") return False @@ -245,17 +189,14 @@ class Ostro(object): def _set_flavors(self): if not self.compute.set_flavors(): - # self.status = "OpenStack (Nova) internal error" self.logger.error("failed to read flavors from OpenStack (Nova)") return False self.logger.info("done flavors bootstrap") return True + # TODO(GJ): evaluate delay def place_app(self, _app_data): - """Place results of query and placement requests in the db.""" - start_time = time.time() - for req in _app_data: if req["action"] == "query": self.logger.info("start query") @@ -272,23 +213,24 @@ class Ostro(object): self.logger.info("start app placement") result = None - placement_map = self._place_app(req) - - if placement_map is None: - result = self._get_json_results("placement", "error", - self.status, placement_map) + (decision_key, old_decision) = self.app_handler.check_history(req) + if old_decision is None: + placement_map = self._place_app(req) + if placement_map is None: + result = self._get_json_results("placement", "error", self.status, placement_map) + else: + result = self._get_json_results("placement", "ok", "success", placement_map) + if decision_key is not None: + self.app_handler.put_history(decision_key, result) else: - result = self._get_json_results("placement", "ok", - "success", placement_map) + self.logger.warn("decision(" + decision_key + ") already made") + result = old_decision if self.db.put_result(result) is False: return False self.logger.info("done app placement") - end_time = time.time() - self.logger.debug("EVAL: total decision delay of request = " + str(end_time - start_time)) - return True def _query(self, _q): @@ -394,14 +336,12 @@ class Ostro(object): self.data_lock.release() return None - """Update resource and app information.""" + # Update resource and app information if len(placement_map) > 0: self.resource.update_topology(store=False) + self.app_handler.add_placement(placement_map, app_topology, self.resource.current_timestamp) - self.app_handler.add_placement(placement_map, - self.resource.current_timestamp) - 
if len(app_topology.exclusion_list_map) > 0 and \ - len(app_topology.planned_vm_map) > 0: + if len(app_topology.exclusion_list_map) > 0 and len(app_topology.planned_vm_map) > 0: for vk in app_topology.planned_vm_map.keys(): if vk in placement_map.keys(): del placement_map[vk] @@ -426,7 +366,7 @@ class Ostro(object): self.logger.warn("Ostro._set_vm_flavor_properties: does not exist " "flavor (" + _vm.flavor + ") and try to refetch") - """Reset flavor resource and try again.""" + # Reset flavor resource and try again if self._set_flavors() is False: return False @@ -446,6 +386,7 @@ class Ostro(object): return True + # TODO(GJ): evaluate the delay def handle_events(self, _event_list): """Handle events in the event list.""" """Update the engine's resource topology based on the properties of @@ -453,12 +394,8 @@ class Ostro(object): """ self.data_lock.acquire() - event_handler_start_time = time.time() - resource_updated = False - # events_count = 0 - # handled_event_list = [] for e in _event_list: if e.host is not None and e.host != "none": if self._check_host(e.host) is False: @@ -493,45 +430,27 @@ class Ostro(object): return False if len(vm_info) == 0: - """ - h_uuid is None or "none" because vm is not created - by stack or, stack not found because vm is created - by the other stack - """ - self.logger.warn("Ostro.handle_events: no vm_info " - "found in app placement record") - self._add_vm_to_host(e.uuid, orch_id[0], e.host, - e.vcpus, e.mem, e.local_disk) + # Stack not found because vm is created by the other stack + self.logger.warn("EVENT: no vm_info found in app placement record") + self._add_vm_to_host(e.uuid, orch_id[0], e.host, e.vcpus, e.mem, e.local_disk) else: - if "planned_host" in vm_info.keys() and \ - vm_info["planned_host"] != e.host: - """ - vm is activated in the different host - """ - self.logger.warn("Ostro.handle_events: vm " - "activated in the different " - "host") - self._add_vm_to_host( - e.uuid, orch_id[0], e.host, e.vcpus, e.mem, - e.local_disk) + if "planned_host" in vm_info.keys() and vm_info["planned_host"] != e.host: + # VM is activated in the different host + self.logger.warn("EVENT: vm activated in the different host") + self._add_vm_to_host(e.uuid, orch_id[0], e.host, e.vcpus, e.mem, e.local_disk) - self._remove_vm_from_host( - e.uuid, orch_id[0], vm_info["planned_host"], - float(vm_info["cpus"]), - float(vm_info["mem"]), - float(vm_info["local_volume"])) + self._remove_vm_from_host(e.uuid, orch_id[0], + vm_info["planned_host"], + float(vm_info["cpus"]), + float(vm_info["mem"]), + float(vm_info["local_volume"])) - self._remove_vm_from_logical_groups( - e.uuid, orch_id[0], vm_info["planned_host"]) + self._remove_vm_from_logical_groups(e.uuid, orch_id[0], vm_info["planned_host"]) else: - """ - found vm in the planned host, - possibly the vm deleted in the host while batch cleanup - """ - if self._check_h_uuid(orch_id[0], e.host) \ - is False: - self.logger.debug("Ostro.handle_events: " - "planned vm was deleted") + # Found vm in the planned host, + # Possibly the vm deleted in the host while batch cleanup + if self._check_h_uuid(orch_id[0], e.host) is False: + self.logger.warn("EVENT: planned vm was deleted") if self._check_uuid(e.uuid, e.host) is True: self._update_h_uuid_in_host(orch_id[0], e.uuid, @@ -593,11 +512,6 @@ class Ostro(object): self.logger.warn("Ostro.handle_events: unknown event " "method = " + e.method) - # events_count += 1 - # handled_event_list.append(e) - # if events_count >= self.batch_events_count: - # break - if resource_updated is True: 
self.resource.update_topology(store=False) @@ -612,8 +526,6 @@ class Ostro(object): self.data_lock.release() return False - self.logger.debug("EVAL: total delay for event handling = " + str(time.time() - event_handler_start_time)) - self.data_lock.release() return True diff --git a/valet/engine/optimizer/ostro_server/configuration.py b/valet/engine/optimizer/ostro_server/configuration.py index 9cac9e4..f247874 100755 --- a/valet/engine/optimizer/ostro_server/configuration.py +++ b/valet/engine/optimizer/ostro_server/configuration.py @@ -44,9 +44,6 @@ class Config(object): self.api_protocol = 'http://' - self.network_control = False - self.network_control_api = None - self.db_keyspace = None self.db_request_table = None self.db_response_table = None @@ -63,9 +60,6 @@ class Config(object): self.priority = 0 - self.rpc_server_ip = None - self.rpc_server_port = 0 - # Logging parameters self.logger_name = None self.logging_level = None @@ -84,10 +78,9 @@ class Config(object): self.rack_code_list = [] self.node_code_list = [] - self.topology_trigger_time = None self.topology_trigger_freq = 0 - self.compute_trigger_time = None self.compute_trigger_freq = 0 + self.update_batch_wait = 0 self.default_cpu_allocation_ratio = 1 self.default_ram_allocation_ratio = 1 @@ -169,16 +162,8 @@ class Config(object): self.process = CONF.engine.pid - self.rpc_server_ip = CONF.engine.rpc_server_ip - - self.rpc_server_port = CONF.engine.rpc_server_port - self.datacenter_name = CONF.engine.datacenter_name - self.network_control = CONF.engine.network_control - - self.network_control_url = CONF.engine.network_control_url - self.default_cpu_allocation_ratio = \ CONF.engine.default_cpu_allocation_ratio @@ -195,14 +180,12 @@ class Config(object): self.static_local_disk_standby_ratio = \ CONF.engine.static_local_disk_standby_ratio - self.topology_trigger_time = CONF.engine.topology_trigger_time - self.topology_trigger_freq = CONF.engine.topology_trigger_frequency - self.compute_trigger_time = CONF.engine.compute_trigger_time - self.compute_trigger_freq = CONF.engine.compute_trigger_frequency + self.update_batch_wait = CONF.engine.update_batch_wait + self.db_keyspace = CONF.music.keyspace self.db_request_table = CONF.music.request_table diff --git a/valet/engine/optimizer/ostro_server/db_cleaner.py b/valet/engine/optimizer/ostro_server/db_cleaner.py deleted file mode 100644 index f488618..0000000 --- a/valet/engine/optimizer/ostro_server/db_cleaner.py +++ /dev/null @@ -1,182 +0,0 @@ -# -# Copyright 2014-2017 AT&T Intellectual Property -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# - Handle user requests - -"""Database Cleaner.""" - -from configuration import Config -from valet.common.music import Music - - -class DBCleaner(object): - """Database Cleaner.""" - - def __init__(self, _config): - """Initialization.""" - self.config = _config - - self.music = Music() - - def clean_db_tables(self): - """Clean tables in Music.""" - """Clean resource, resource_index, request, response, event, - app, app_index, and uuid tables. 
- """ - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_resource_table) - if len(results) > 0: - print("resource table result = ", len(results)) - for _, row in results.iteritems(): - self.music.delete_row_eventually(self.config.db_keyspace, - self.config.db_resource_table, - 'site_name', row['site_name']) - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_request_table) - if len(results) > 0: - print("request table result = ", len(results)) - for _, row in results.iteritems(): - self.music.delete_row_eventually(self.config.db_keyspace, - self.config.db_request_table, - 'stack_id', row['stack_id']) - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_response_table) - if len(results) > 0: - print("response table result = ", len(results)) - for _, row in results.iteritems(): - self.music.delete_row_eventually(self.config.db_keyspace, - self.config.db_response_table, - 'stack_id', row['stack_id']) - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_event_table) - if len(results) > 0: - print("event table result = ", len(results)) - for _, row in results.iteritems(): - self.music.delete_row_eventually(self.config.db_keyspace, - self.config.db_event_table, - 'timestamp', row['timestamp']) - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_resource_index_table) - if len(results) > 0: - print("resource_index table result = ", len(results)) - for _, row in results.iteritems(): - self.music.delete_row_eventually( - self.config.db_keyspace, - self.config.db_resource_index_table, - 'site_name', row['site_name']) - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_app_index_table) - if len(results) > 0: - print("app_index table result = ", len(results)) - for _, row in results.iteritems(): - self.music.delete_row_eventually(self.config.db_keyspace, - self.config.db_app_index_table, - 'site_name', row['site_name']) - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_app_table) - if len(results) > 0: - print("app table result = ", len(results)) - for _, row in results.iteritems(): - self.music.delete_row_eventually(self.config.db_keyspace, - self.config.db_app_table, - 'stack_id', row['stack_id']) - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_uuid_table) - if len(results) > 0: - print("uuid table result = ", len(results)) - for _, row in results.iteritems(): - self.music.delete_row_eventually(self.config.db_keyspace, - self.config.db_uuid_table, - 'uuid', row['uuid']) - - def check_db_tables(self): - """Log whether tables in Music have been cleaned.""" - """Check resource, resource_index, request, response, event, - app, app_index, and uuid tables. 
- """ - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_resource_table) - if len(results) > 0: - print("resource table not cleaned ") - else: - print("resource table cleaned") - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_request_table) - if len(results) > 0: - print("request table not cleaned ") - else: - print("request table cleaned") - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_response_table) - if len(results) > 0: - print("response table not cleaned ") - else: - print("response table cleaned") - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_event_table) - if len(results) > 0: - print("event table not cleaned ") - else: - print("event table cleaned") - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_resource_index_table) - if len(results) > 0: - print("resource log index table not cleaned ") - else: - print("resource log index table cleaned") - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_app_index_table) - if len(results) > 0: - print("app log index table not cleaned ") - else: - print("app log index table cleaned") - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_app_table) - if len(results) > 0: - print("app log table not cleaned ") - else: - print("app log table cleaned") - - results = self.music.read_all_rows(self.config.db_keyspace, - self.config.db_uuid_table) - if len(results) > 0: - print("uuid table not cleaned ") - else: - print("uuid table cleaned") - - -if __name__ == '__main__': - config = Config() - config_status = config.configure() - if config_status != "success": - print("Error while configuring Ostro: " + config_status) - sys.exit(2) - - c = DBCleaner(config) - c.clean_db_tables() - c.check_db_tables() diff --git a/valet/engine/optimizer/ostro_server/ostro_daemon.py b/valet/engine/optimizer/ostro_server/ostro_daemon.py index 1f3c5ee..bd1c92d 100755 --- a/valet/engine/optimizer/ostro_server/ostro_daemon.py +++ b/valet/engine/optimizer/ostro_server/ostro_daemon.py @@ -39,7 +39,7 @@ class OstroDaemon(Daemon): self.logger.error("ostro bootstrap failed") sys.exit(2) - # write pidfile + # Write pidfile pid = str(os.getpid()) file(self.pidfile, 'w+').write("%s\n" % pid) @@ -58,18 +58,16 @@ def verify_dirs(list_of_dirs): if __name__ == "__main__": - """ configuration """ # Configuration try: config = Config() - ''' logger ''' logger = get_logger("ostro_daemon") config_status = config.configure() if config_status != "success": print(config_status) sys.exit(2) - """ verify directories """ + # Verify directories dirs_list = [config.logging_loc, config.resource_log_loc, config.app_log_loc, os.path.dirname(config.process)] verify_dirs(dirs_list) @@ -77,8 +75,6 @@ if __name__ == "__main__": # Start daemon process daemon = OstroDaemon(config.priority, config.process, logger) - logger.info("%s ostro ..." 
% config.command) - # switch case exit_code = { 'start': daemon.start, 'stop': daemon.stop, diff --git a/valet/engine/optimizer/ostro_server/ostro_sim.cfg b/valet/engine/optimizer/ostro_server/ostro_sim.cfg deleted file mode 100644 index d250498..0000000 --- a/valet/engine/optimizer/ostro_server/ostro_sim.cfg +++ /dev/null @@ -1,25 +0,0 @@ -# Version 2.0.2 - -# Set simulation parameters -num_of_spine_switches=0 -#num_of_racks=1 -num_of_racks=2 -#num_of_hosts_per_rack=8 -num_of_hosts_per_rack=2 - -bandwidth_of_spine=40000 -bandwidth_of_rack=40000 -bandwidth_of_host=10000 - -num_of_aggregates=1 -aggregated_ratio=5 - -cpus_per_host=16 -mem_per_host=16000 -disk_per_host=1000 - -num_of_basic_flavors=3 -base_flavor_cpus=1 -base_flavor_mem=2000 -base_flavor_disk=40 - diff --git a/valet/engine/resource_manager/compute_manager.py b/valet/engine/resource_manager/compute_manager.py index b52c8ca..bf49f69 100755 --- a/valet/engine/resource_manager/compute_manager.py +++ b/valet/engine/resource_manager/compute_manager.py @@ -49,6 +49,8 @@ class ComputeManager(threading.Thread): self.admin_token = None self.project_token = None + self.update_batch_wait = self.config.update_batch_wait + def run(self): """Start Compute Manager thread to run setup.""" self.logger.info("ComputeManager: start " + self.thread_name + @@ -59,39 +61,15 @@ class ComputeManager(threading.Thread): while self.end_of_process is False: time.sleep(60) + curr_ts = time.time() + if curr_ts > period_end: + # Give some time (batch_wait) to update resource status via message bus + # Otherwise, late update will be cleaned up + if (curr_ts - self.resource.current_timestamp) > self.update_batch_wait: + self._run() + period_end = curr_ts + self.config.compute_trigger_freq - if time.time() > period_end: - self._run() - period_end = time.time() + self.config.compute_trigger_freq - - else: - (alarm_HH, alarm_MM) = self.config.compute_trigger_time.split(':') - - now = time.localtime() - timeout = True - last_trigger_year = now.tm_year - last_trigger_mon = now.tm_mon - last_trigger_mday = now.tm_mday - - while self.end_of_process is False: - time.sleep(60) - - now = time.localtime() - if now.tm_year > last_trigger_year or \ - now.tm_mon > last_trigger_mon or \ - now.tm_mday > last_trigger_mday: - - timeout = False - - if timeout is False and \ - now.tm_hour >= int(alarm_HH) and now.tm_min >= int(alarm_MM): - self._run() - - timeout = True - last_trigger_year = now.tm_year - last_trigger_mon = now.tm_mon - last_trigger_mday = now.tm_mday - + # NOTE(GJ): do not timer based batch self.logger.info("exit compute_manager " + self.thread_name) def _run(self): @@ -124,7 +102,6 @@ class ComputeManager(threading.Thread): status = compute.set_hosts(hosts, logical_groups) if status != "success": - # self.logger.error("ComputeManager: " + status) return False self._compute_avail_host_resources(hosts) @@ -256,10 +233,14 @@ class ComputeManager(threading.Thread): def _check_host_config_update(self, _host, _rhost): topology_updated = False - topology_updated = self._check_host_status(_host, _rhost) - topology_updated = self._check_host_resources(_host, _rhost) - topology_updated = self._check_host_memberships(_host, _rhost) - topology_updated = self._check_host_vms(_host, _rhost) + if self._check_host_status(_host, _rhost) is True: + topology_updated = True + if self._check_host_resources(_host, _rhost) is True: + topology_updated = True + if self._check_host_memberships(_host, _rhost) is True: + topology_updated = True + if self._check_host_vms(_host, _rhost) is 
True: + topology_updated = True return topology_updated @@ -358,36 +339,37 @@ class ComputeManager(threading.Thread): def _check_host_vms(self, _host, _rhost): topology_updated = False - ''' clean up VMs ''' - for rvm_id in _rhost.vm_list: - if rvm_id[2] == "none": - _rhost.vm_list.remove(rvm_id) - - topology_updated = True - self.logger.warn("ComputeManager: host (" + _rhost.name + - ") updated (none vm removed)") + # Clean up VMs + blen = len(_rhost.vm_list) + _rhost.vm_list = [v for v in _rhost.vm_list if v[2] != "none"] + alen = len(_rhost.vm_list) + if alen != blen: + topology_updated = True + self.logger.warn("host (" + _rhost.name + ") " + str(blen - alen) + " none vms removed") self.resource.clean_none_vms_from_logical_groups(_rhost) for vm_id in _host.vm_list: if _rhost.exist_vm_by_uuid(vm_id[2]) is False: _rhost.vm_list.append(vm_id) - topology_updated = True self.logger.warn("ComputeManager: host (" + _rhost.name + ") updated (new vm placed)") for rvm_id in _rhost.vm_list: if _host.exist_vm_by_uuid(rvm_id[2]) is False: - _rhost.vm_list.remove(rvm_id) - - self.resource.remove_vm_by_uuid_from_logical_groups(_rhost, - rvm_id[2]) - + self.resource.remove_vm_by_uuid_from_logical_groups(_rhost, rvm_id[2]) topology_updated = True self.logger.warn("ComputeManager: host (" + _rhost.name + ") updated (vm removed)") + blen = len(_rhost.vm_list) + _rhost.vm_list = [v for v in _rhost.vm_list if _host.exist_vm_by_uuid(v[2]) is True] + alen = len(_rhost.vm_list) + if alen != blen: + topology_updated = True + self.logger.warn("host (" + _rhost.name + ") " + str(blen - alen) + " vms removed") + return topology_updated def set_flavors(self): diff --git a/valet/engine/resource_manager/resource.py b/valet/engine/resource_manager/resource.py index f4b1d4e..3815105 100755 --- a/valet/engine/resource_manager/resource.py +++ b/valet/engine/resource_manager/resource.py @@ -54,7 +54,6 @@ class Resource(object): self.current_timestamp = 0 self.curr_db_timestamp = 0 - # self.last_log_index = 0 self.resource_updated = False @@ -65,6 +64,7 @@ class Resource(object): self.disk_avail = 0 self.nw_bandwidth_avail = 0 + # FIXME(GJ): should check invalid return here? 
def bootstrap_from_db(self, _resource_status): """Return True if bootsrap resource from database successful.""" try: @@ -433,8 +433,6 @@ class Resource(object): self.nw_bandwidth_avail += min(avail_nw_bandwidth_list) def store_topology_updates(self): - store_start_time = time.time() - updated = False flavor_updates = {} logical_group_updates = {} @@ -449,56 +447,42 @@ class Resource(object): for fk, flavor in self.flavors.iteritems(): if flavor.last_update >= self.curr_db_timestamp: flavor_updates[fk] = flavor.get_json_info() - # self.logger.debug("resource flavor(" + fk + ") stored") updated = True for lgk, lg in self.logical_groups.iteritems(): if lg.last_update >= self.curr_db_timestamp: logical_group_updates[lgk] = lg.get_json_info() - # self.logger.debug("resource lg(" + lgk + ") stored") updated = True for shk, storage_host in self.storage_hosts.iteritems(): if storage_host.last_update >= self.curr_db_timestamp or \ storage_host.last_cap_update >= self.curr_db_timestamp: storage_updates[shk] = storage_host.get_json_info() - # self.logger.debug("resource storage(" + shk + ") stored") updated = True for sk, s in self.switches.iteritems(): if s.last_update >= self.curr_db_timestamp: switch_updates[sk] = s.get_json_info() - # self.logger.debug("resource switch(" + sk + ") stored") updated = True for hk, host in self.hosts.iteritems(): if host.last_update > self.current_timestamp or \ host.last_link_update > self.current_timestamp: host_updates[hk] = host.get_json_info() - # self.logger.debug("resource host(" + hk + ") stored") updated = True for hgk, host_group in self.host_groups.iteritems(): if host_group.last_update >= self.curr_db_timestamp or \ host_group.last_link_update >= self.curr_db_timestamp: host_group_updates[hgk] = host_group.get_json_info() - # self.logger.debug("resource hgroup(" + hgk + ") stored") updated = True if self.datacenter.last_update >= self.curr_db_timestamp or \ self.datacenter.last_link_update >= self.curr_db_timestamp: datacenter_update = self.datacenter.get_json_info() - # self.logger.debug("resource datacenter stored") updated = True - # (resource_logfile, last_index, mode) = util.get_last_logfile(self.config.resource_log_loc, - # self.config.max_log_size, - # self.config.max_num_of_logs, - # self.datacenter.name, - # self.last_log_index) - # self.last_log_index = last_index - - # logging = open(self.config.resource_log_loc + resource_logfile, mode) + # NOTE(GJ): do not track resource change histroy in this version if updated is True: json_logging = {} @@ -519,27 +503,11 @@ class Resource(object): if datacenter_update is not None: json_logging['datacenter'] = datacenter_update - # logged_data = json.dumps(json_logging) - - # logging.write(logged_data) - # logging.write("\n") - - # logging.close() - - # self.logger.info("log resource status in " + resource_logfile) - - # if self.db is not None: if self.db.update_resource_status(self.datacenter.name, json_logging) is False: return None self.curr_db_timestamp = time.time() - # for test - # self.show_current_logical_groups() - # self.show_current_host_status() - - self.logger.debug("EVAL: total delay for store resource status = " + str(time.time() - store_start_time)) - return True def show_current_logical_groups(self): @@ -731,32 +699,7 @@ class Resource(object): ") status changed") updated = True - if host.original_vCPUs != _vcpus or \ - host.vCPUs_used != _vcpus_used: - self.logger.debug("Resource.update_host_resources: host(" + _hn + - ") cpu changed") - host.original_vCPUs = _vcpus - host.vCPUs_used = 
_vcpus_used - updated = True - - if host.free_mem_mb != _fmem or \ - host.original_mem_cap != _mem: - self.logger.debug("Resource.update_host_resources: host(" + _hn + - ") mem changed") - host.free_mem_mb = _fmem - host.original_mem_cap = _mem - updated = True - - if host.free_disk_gb != _fldisk or \ - host.original_local_disk_cap != _ldisk or \ - host.disk_available_least != _avail_least: - self.logger.debug("Resource.update_host_resources: host(" + _hn + - ") disk changed") - - host.free_disk_gb = _fldisk - host.original_local_disk_cap = _ldisk - host.disk_available_least = _avail_least - updated = True + # FIXME(GJ): should check cpu, memm and disk here? if updated is True: self.compute_avail_resources(_hn, host) @@ -831,11 +774,11 @@ class Resource(object): lg = self.logical_groups[lgk] if isinstance(_host, Host): - # remove host from lg's membership if the host has no vms of lg + # Remove host from lg's membership if the host has no vms of lg if lg.remove_vm_by_h_uuid(_h_uuid, _host.name) is True: lg.last_update = time.time() - # remove lg from host's membership if lg does not have the host + # Remove lg from host's membership if lg does not have the host if _host.remove_membership(lg) is True: _host.last_update = time.time() @@ -870,11 +813,11 @@ class Resource(object): lg = self.logical_groups[lgk] if isinstance(_host, Host): - # remove host from lg's membership if the host has no vms of lg + # Remove host from lg's membership if the host has no vms of lg if lg.remove_vm_by_uuid(_uuid, _host.name) is True: lg.last_update = time.time() - # remove lg from host's membership if lg does not have the host + # Remove lg from host's membership if lg does not have the host if _host.remove_membership(lg) is True: _host.last_update = time.time() @@ -1006,7 +949,8 @@ class Resource(object): if self.config.default_ram_allocation_ratio > 0: ram_allocation_ratio = self.config.default_ram_allocation_ratio - static_ram_standby_ratio = 0 + if self.config.static_mem_standby_ratio > 0: + static_ram_standby_ratio = float(self.config.static_mem_standby_ratio) / float(100) host.compute_avail_mem(ram_allocation_ratio, static_ram_standby_ratio) @@ -1017,7 +961,8 @@ class Resource(object): if self.config.default_cpu_allocation_ratio > 0: cpu_allocation_ratio = self.config.default_cpu_allocation_ratio - static_cpu_standby_ratio = 0 + if self.config.static_cpu_standby_ratio > 0: + static_cpu_standby_ratio = float(self.config.static_cpu_standby_ratio) / float(100) host.compute_avail_vCPUs(cpu_allocation_ratio, static_cpu_standby_ratio) @@ -1029,10 +974,10 @@ class Resource(object): disk_allocation_ratio = \ self.config.default_disk_allocation_ratio - static_disk_standby_ratio = 0 + if self.config.static_local_disk_standby_ratio > 0: + static_disk_standby_ratio = float(self.config.static_local_disk_standby_ratio) / float(100) - host.compute_avail_disk(disk_allocation_ratio, - static_disk_standby_ratio) + host.compute_avail_disk(disk_allocation_ratio, static_disk_standby_ratio) def get_flavor(self, _id): """Return flavor according to name passed in.""" diff --git a/valet/engine/resource_manager/resource_base.py b/valet/engine/resource_manager/resource_base.py index dbf98f9..6daede8 100755 --- a/valet/engine/resource_manager/resource_base.py +++ b/valet/engine/resource_manager/resource_base.py @@ -117,7 +117,6 @@ class Datacenter(object): 'last_link_update': self.last_link_update} -# data container for rack or cluster class HostGroup(object): """Class for Host Group Object. 
@@ -668,16 +667,18 @@ class LogicalGroup(object):
         """Return True if vm's or host vm's removed with physical id none."""
         success = False
-        for vm_id in self.vm_list:
-            if vm_id[2] == "none":
-                self.vm_list.remove(vm_id)
-                success = True
+        blen = len(self.vm_list)
+        self.vm_list = [v for v in self.vm_list if v[2] != "none"]
+        alen = len(self.vm_list)
+        if alen != blen:
+            success = True

         if _host_id in self.vms_per_host.keys():
-            for vm_id in self.vms_per_host[_host_id]:
-                if vm_id[2] == "none":
-                    self.vms_per_host[_host_id].remove(vm_id)
-                    success = True
+            blen = len(self.vms_per_host[_host_id])
+            self.vms_per_host[_host_id] = [v for v in self.vms_per_host[_host_id] if v[2] != "none"]
+            alen = len(self.vms_per_host[_host_id])
+            if alen != blen:
+                success = True

         if self.group_type == "EX" or self.group_type == "AFF" or \
                 self.group_type == "DIV":
diff --git a/valet/engine/resource_manager/topology_manager.py b/valet/engine/resource_manager/topology_manager.py
index 2ab14eb..9452f77 100755
--- a/valet/engine/resource_manager/topology_manager.py
+++ b/valet/engine/resource_manager/topology_manager.py
@@ -46,6 +46,8 @@ class TopologyManager(threading.Thread):
         self.logger = _logger
+        self.update_batch_wait = self.config.update_batch_wait
+
     def run(self):
         """Function starts and tracks Topology Manager Thread."""
         self.logger.info("TopologyManager: start " +
@@ -56,38 +58,14 @@
         while self.end_of_process is False:
             time.sleep(70)
-
-            if time.time() > period_end:
-                self._run()
-                period_end = time.time() + self.config.topology_trigger_freq
-
-        else:
-            (alarm_HH, alarm_MM) = self.config.topology_trigger_time.split(':')
-            now = time.localtime()
-            timeout = True
-            last_trigger_year = now.tm_year
-            last_trigger_mon = now.tm_mon
-            last_trigger_mday = now.tm_mday
-
-            while self.end_of_process is False:
-                time.sleep(70)
-
-                now = time.localtime()
-                if now.tm_year > last_trigger_year or \
-                   now.tm_mon > last_trigger_mon or \
-                   now.tm_mday > last_trigger_mday:
-
-                    timeout = False
-
-                if timeout is False and \
-                   now.tm_hour >= int(alarm_HH) and now.tm_min >= int(alarm_MM):
-                    self._run()
-
-                    timeout = True
-                    last_trigger_year = now.tm_year
-                    last_trigger_mon = now.tm_mon
-                    last_trigger_mday = now.tm_mday
-
+            curr_ts = time.time()
+            if curr_ts > period_end:
+                # Give some time (update_batch_wait) for resource status updates to
+                # arrive via the message bus; otherwise a late update would be cleaned up.
+                if (curr_ts - self.resource.current_timestamp) > self.update_batch_wait:
+                    self._run()
+                    period_end = curr_ts + self.config.topology_trigger_freq
+        # NOTE(GJ): no longer use a timer-based batch trigger
         self.logger.info("exit topology_manager " + self.thread_name)

     def _run(self):
@@ -119,7 +97,6 @@ class TopologyManager(threading.Thread):
         status = topology.set_topology(datacenter, host_groups, hosts,
                                        self.resource.hosts, switches)
         if status != "success":
-            # self.logger.error("TopologyManager: " + status)
             return False

         self.data_lock.acquire()
@@ -505,12 +482,14 @@ class TopologyManager(threading.Thread):
                 self.logger.warn("TopologyManager: datacenter updated "
                                  "(new region code, " + rc + ")")

-        for rrc in self.resource.datacenter.region_code_list:
-            if rrc not in _datacenter.region_code_list:
-                self.resource.datacenter.region_code_list.remove(rrc)
-                updated = True
-                self.logger.warn("TopologyManager: datacenter updated "
-                                 "(region code, " + rrc + ", removed)")
+        code_list = self.resource.datacenter.region_code_list
+        blen = len(code_list)
+        code_list = [r for r in code_list if r in _datacenter.region_code_list]
+        alen = len(code_list)
+        if alen != blen:
+            updated = True
+            self.resource.datacenter.region_code_list = code_list
+            self.logger.warn("datacenter updated (region code removed)")

         for rk in _datacenter.resources.keys():
             exist = False