Merge "Retry mechanism improved and fixed"
This commit is contained in:
commit
b3504c00db
@ -54,6 +54,12 @@ class Server(SynergyObject):
|
||||
# Mutator: stores `state` under the "state" key through self.set().
def setState(self, state):
|
||||
self.set("state", state)
|
||||
|
||||
# Accessor: returns whatever self.get() yields for the "host" key.
def getHost(self):
|
||||
return self.get("host")
|
||||
|
||||
# Mutator: stores `host` under the "host" key through self.set().
def setHost(self, host):
|
||||
self.set("host", host)
|
||||
|
||||
# Accessor: returns whatever self.get() yields for the "flavor" key.
def getFlavor(self):
|
||||
return self.get("flavor")
|
||||
|
||||
|
@ -59,6 +59,9 @@ class Notifications(object):
|
||||
server.setDeletedAt(server_info["deleted_at"])
|
||||
server.setTerminatedAt(server_info["terminated_at"])
|
||||
|
||||
if "host" in server_info:
|
||||
server.setHost(server_info["host"])
|
||||
|
||||
if "uuid" in server_info:
|
||||
server.setId(server_info["uuid"])
|
||||
elif "instance_id" in server_info:
|
||||
@ -84,96 +87,66 @@ class Notifications(object):
|
||||
return
|
||||
|
||||
state = payload["state"]
|
||||
if state == "error":
|
||||
LOG.info("Notification INFO: event_type=%s payload=%s"
|
||||
% (event_type, payload))
|
||||
|
||||
if ((event_type == "compute.instance.delete.end" and
|
||||
(state == "deleted" or state == "error" or state == "building")) or
|
||||
(event_type == "compute.instance.update" and state == "error") or
|
||||
(event_type == "scheduler.run_instance" and state == "error")):
|
||||
server_info = None
|
||||
event_types = ["compute.instance.create.end",
|
||||
"compute.instance.delete.end",
|
||||
"compute.instance.update",
|
||||
"scheduler.run_instance"]
|
||||
|
||||
if event_type == "scheduler.run_instance":
|
||||
server_info = payload["request_spec"]["instance_type"]
|
||||
else:
|
||||
server_info = payload
|
||||
if event_type not in event_types:
|
||||
return
|
||||
|
||||
if server_info["tenant_id"] not in self.projects:
|
||||
return
|
||||
|
||||
server = self._makeServer(server_info)
|
||||
flavor = server.getFlavor()
|
||||
|
||||
message = "N/A"
|
||||
if "message" in server_info:
|
||||
message = server_info["message"]
|
||||
|
||||
LOG.debug("Notification INFO (type=%s state=%s): vcpus=%s "
|
||||
"memory=%s prj_id=%s server_id=%s (message=%s)"
|
||||
% (event_type, server.getState(), flavor.getVCPUs(),
|
||||
flavor.getMemory(), server.getProjectId(),
|
||||
server.getId(), message))
|
||||
|
||||
quota = self.projects[server.getProjectId()].getQuota()
|
||||
|
||||
try:
|
||||
quota.release(server)
|
||||
except Exception as ex:
|
||||
LOG.warn("Cannot release server id=%r: %s"
|
||||
% (server.getId(), ex))
|
||||
LOG.error("Exception has occured", exc_info=1)
|
||||
|
||||
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
state = payload["state"]
|
||||
instance_id = payload["instance_id"]
|
||||
LOG.debug("Notification WARN: event_type=%s state=%s instance_id=%s "
|
||||
"payload=%s" % (event_type, state, instance_id, payload))
|
||||
|
||||
def error(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
server = None
|
||||
message = "N\A"
|
||||
server_info = None
|
||||
|
||||
if event_type == "terminate_instance":
|
||||
server_info = payload["args"]["instance"]
|
||||
message = payload["exception"]["value"]
|
||||
|
||||
elif event_type == "compute.instance.create.error" or\
|
||||
event_type == "compute.instance.update.error":
|
||||
if event_type == "scheduler.run_instance":
|
||||
server_info = payload["request_spec"]["instance_type"]
|
||||
else:
|
||||
server_info = payload
|
||||
message = payload["message"]
|
||||
|
||||
server = self._makeServer(server_info)
|
||||
|
||||
if not server:
|
||||
LOG.info("Notification ERROR: event_type=%s payload=%s"
|
||||
% (event_type, payload))
|
||||
return
|
||||
server_id = server.getId()
|
||||
host = server.getHost()
|
||||
|
||||
if server.getProjectId() not in self.projects:
|
||||
return
|
||||
|
||||
flavor = server.getFlavor()
|
||||
if event_type == "compute.instance.create.end" and \
|
||||
state == "active":
|
||||
LOG.info("the server %s is now active on host %s"
|
||||
% (server_id, host))
|
||||
else:
|
||||
quota = self.projects[server.getProjectId()].getQuota()
|
||||
|
||||
LOG.debug("Notification ERROR (type=%s state=%s): vcpus=%s "
|
||||
"memory=%s prj_id=%s server_id=%s (error=%s)"
|
||||
% (event_type, server.getState(), flavor.getVCPUs(),
|
||||
flavor.getMemory(), server.getProjectId(),
|
||||
server.getId(), message))
|
||||
if event_type == "compute.instance.delete.end" and \
|
||||
state == "deleted":
|
||||
LOG.info("the server %s has been deleted on host %s"
|
||||
% (server_id, host))
|
||||
try:
|
||||
quota.release(server)
|
||||
except Exception as ex:
|
||||
LOG.warn("cannot release server %s "
|
||||
"(reason=%s)" % (server_id, ex))
|
||||
elif state == "error":
|
||||
LOG.info("error occurred on server %s (host %s)"
|
||||
% (server_id, host))
|
||||
|
||||
if not server.getTerminatedAt() and not server.getDeletedAt():
|
||||
try:
|
||||
self.nova_manager.deleteServer(server)
|
||||
except Exception as ex:
|
||||
LOG.error("Cannot delete server id=%r: %s"
|
||||
% (server.getId(), ex))
|
||||
if not server.getTerminatedAt() and not server.getDeletedAt():
|
||||
try:
|
||||
self.nova_manager.deleteServer(server)
|
||||
except Exception as ex:
|
||||
LOG.error("cannot delete server %s: %s"
|
||||
% (server_id, ex))
|
||||
|
||||
quota = self.projects[server.getProjectId()].getQuota()
|
||||
# Handler for WARN notifications: it only writes the event type, payload and
# metadata to the debug log. `ctxt` and `publisher_id` are accepted but unused.
# NOTE(review): presumably called by the notification listener - confirm.
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
LOG.debug("Notification WARN: event_type=%s, payload=%s metadata=%s"
|
||||
% (event_type, payload, metadata))
|
||||
|
||||
try:
|
||||
quota.release(server)
|
||||
except Exception as ex:
|
||||
LOG.warn("Cannot release server id=%r: %s"
|
||||
% (server.getId(), ex))
|
||||
LOG.error("Exception has occured", exc_info=1)
|
||||
# Handler for ERROR notifications: it only writes the event type, payload and
# metadata to the debug log. `ctxt` and `publisher_id` are accepted but unused.
# NOTE(review): presumably called by the notification listener - confirm.
def error(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
LOG.debug("Notification ERROR: event_type=%s, payload=%s metadata=%s"
|
||||
% (event_type, payload, metadata))
|
||||
|
||||
|
||||
class Worker(Thread):
|
||||
@ -190,7 +163,7 @@ class Worker(Thread):
|
||||
self.nova_manager = nova_manager
|
||||
self.keystone_manager = keystone_manager
|
||||
self.exit = False
|
||||
LOG.info("Worker %r created!" % self.name)
|
||||
LOG.info("Worker %s created!" % self.name)
|
||||
|
||||
# Accessor: returns the `name` attribute of this worker.
def getName(self):
|
||||
return self.name
|
||||
@ -205,7 +178,7 @@ class Worker(Thread):
|
||||
raise ex
|
||||
|
||||
def run(self):
|
||||
LOG.info("Worker %r running!" % self.name)
|
||||
LOG.info("Worker %s running!" % self.name)
|
||||
queue_items = []
|
||||
last_release_time = SharedQuota.getLastReleaseTime()
|
||||
|
||||
@ -234,7 +207,7 @@ class Worker(Thread):
|
||||
|
||||
try:
|
||||
request = Request.fromDict(queue_item.getData())
|
||||
|
||||
user_id = request.getUserId()
|
||||
prj_id = request.getProjectId()
|
||||
context = request.getContext()
|
||||
server = request.getServer()
|
||||
@ -243,26 +216,27 @@ class Worker(Thread):
|
||||
|
||||
try:
|
||||
s = self.nova_manager.getServer(server_id, detail=True)
|
||||
|
||||
if s.getState() != "building":
|
||||
# or server["OS-EXT-STS:task_state"] != "scheduling":
|
||||
self.queue.deleteItem(queue_item)
|
||||
continue
|
||||
except Exception as ex:
|
||||
LOG.warn("Worker %s: the server %r is not anymore availa"
|
||||
"ble ! [reason=%s]" % (self.name, server_id, ex))
|
||||
# The reworded message has two placeholders (server id, reason). Passing
# self.name as a third argument made the % operator raise
# "TypeError: not all arguments converted during string formatting" inside
# this except handler. A space is also restored between the two adjacent
# string literals, which are concatenated implicitly.
LOG.warn("the server %s is not anymore available! "
         "(reason=%s)" % (server_id, ex))
|
||||
self.queue.deleteItem(queue_item)
|
||||
|
||||
continue
|
||||
|
||||
quota = self.projects[prj_id].getQuota()
|
||||
quota = self.projects[prj_id].getQuota()
|
||||
computes = []
|
||||
blocking = False
|
||||
|
||||
if server.isEphemeral() and not SharedQuota.isEnabled():
|
||||
blocking = True
|
||||
|
||||
if quota.allocate(server, blocking=blocking):
|
||||
found = False
|
||||
|
||||
try:
|
||||
km = self.keystone_manager
|
||||
trust = km.getTrust(context["trust_id"])
|
||||
@ -271,33 +245,21 @@ class Worker(Thread):
|
||||
context["auth_token"] = token.getId()
|
||||
context["user_id"] = token.getUser().getId()
|
||||
except Exception as ex:
|
||||
LOG.error("Worker %r: error on getting the token "
|
||||
"for server (id=%r) reason=%s"
|
||||
# The reworded message has two placeholders (server id, reason). The worker
# name is no longer part of the message, so it must not be passed: with three
# arguments the % operator raises TypeError here, which pre-empts the
# `raise ex` that follows and hides the original exception.
LOG.error("error on getting the token for server "
          "%s (reason=%s)"
          % (server.getId(), ex))
|
||||
raise ex
|
||||
|
||||
try:
|
||||
computes = self.nova_manager.selectComputes(request)
|
||||
self.nova_manager.buildServer(request)
|
||||
|
||||
LOG.info("building server %s (user_id=%s prj_id=%s quo"
|
||||
"ta=shared)" % (server_id, user_id, prj_id))
|
||||
|
||||
found = True
|
||||
except Exception as ex:
|
||||
LOG.warn("Worker %s: compute not found for server %r!"
|
||||
" [reason=%s]" % (self.name,
|
||||
server.getId(), ex.message))
|
||||
|
||||
found = False
|
||||
|
||||
for compute in computes:
|
||||
try:
|
||||
self.nova_manager.buildServer(request, compute)
|
||||
|
||||
LOG.info("Worker %r: server (id=%r) "
|
||||
"builded!" % (self.name, server.getId()))
|
||||
|
||||
found = True
|
||||
break
|
||||
except Exception as ex:
|
||||
LOG.error("Worker %r: error on building the "
|
||||
"server (id=%r) reason=%s"
|
||||
% (self.name, server.getId(), ex))
|
||||
# Two placeholders, two arguments: self.name was left over from the old
# "Worker %r: ..." message and made the % operator raise TypeError inside
# this except handler.
LOG.error("error on building the server %s (reason=%s)"
          % (server.getId(), ex))
|
||||
|
||||
if found:
|
||||
self.queue.deleteItem(queue_item)
|
||||
@ -309,11 +271,11 @@ class Worker(Thread):
|
||||
|
||||
except Exception as ex:
|
||||
LOG.error("Exception has occured", exc_info=1)
|
||||
LOG.error("Worker %r: %s" % (self.name, ex))
|
||||
LOG.error("Worker %s: %s" % (self.name, ex))
|
||||
|
||||
self.queue.deleteItem(queue_item)
|
||||
|
||||
LOG.info("Worker %r destroyed!" % self.name)
|
||||
LOG.info("Worker %s destroyed!" % self.name)
|
||||
|
||||
|
||||
class SchedulerManager(Manager):
|
||||
@ -404,7 +366,7 @@ class SchedulerManager(Manager):
|
||||
project = self.projects.get(prj_id, None)
|
||||
|
||||
if not project:
|
||||
raise Exception("project (id=%r) not found!" % prj_id)
|
||||
raise Exception("project %s not found!" % prj_id)
|
||||
elif prj_name:
|
||||
for prj in self.projects.values():
|
||||
if prj_name == prj.getName():
|
||||
@ -412,7 +374,7 @@ class SchedulerManager(Manager):
|
||||
break
|
||||
|
||||
if not project:
|
||||
raise Exception("project (name=%r) not found!" % prj_name)
|
||||
raise Exception("project %r not found!" % prj_name)
|
||||
elif not all_users:
|
||||
return self.projects.values()
|
||||
|
||||
@ -438,7 +400,7 @@ class SchedulerManager(Manager):
|
||||
else:
|
||||
domain = self.keystone_manager.getDomains(name="default")
|
||||
if not domain:
|
||||
raise Exception("domain 'default' not found!")
|
||||
# Restores the space that was dropped from the error message.
raise Exception("domain 'default' not found!")
|
||||
|
||||
domain = domain[0]
|
||||
dom_id = domain.getId()
|
||||
@ -543,43 +505,48 @@ class SchedulerManager(Manager):
|
||||
|
||||
project = self.projects[request.getProjectId()]
|
||||
quota = project.getQuota()
|
||||
retry = request.getRetry()
|
||||
num_attempts = 0
|
||||
reason = None
|
||||
|
||||
if retry:
|
||||
num_attempts = retry.get("num_attempts", 0)
|
||||
reason = retry.get("exc_reason", "n/a")
|
||||
|
||||
if 0 < num_attempts < 3:
|
||||
self.nova_manager.buildServer(request)
|
||||
|
||||
LOG.info("retrying to build the server %s (user_id"
|
||||
"=%s prj_id=%s, num_attempts=%s, reason=%s)"
|
||||
% (request.getId(), request.getUserId(),
|
||||
request.getProjectId(), num_attempts, reason))
|
||||
return
|
||||
|
||||
if server.isPermanent():
|
||||
if quota.allocate(server, blocking=False):
|
||||
self.nova_manager.buildServer(request)
|
||||
|
||||
LOG.info("new request: id=%r user_id=%s prj_id=%s "
|
||||
LOG.info("new request: id=%s user_id=%s prj_id=%s "
|
||||
"quota=private" % (request.getId(),
|
||||
request.getUserId(),
|
||||
request.getProjectId()))
|
||||
|
||||
self.nova_manager.buildServer(request)
|
||||
LOG.info("building server %s (user_id=%s prj_id=%s "
|
||||
"quota=private)" % (server.getId(),
|
||||
request.getUserId(),
|
||||
request.getProjectId()))
|
||||
else:
|
||||
self.nova_manager.deleteServer(server)
|
||||
LOG.info("request rejected (quota exceeded): "
|
||||
"id=%r user_id=%s prj_id=%s "
|
||||
"id=%s user_id=%s prj_id=%s "
|
||||
"quota=private" % (request.getId(),
|
||||
request.getUserId(),
|
||||
request.getProjectId()))
|
||||
else:
|
||||
timestamp = request.getCreatedAt()
|
||||
priority = 0
|
||||
retry = request.getRetry()
|
||||
|
||||
if retry:
|
||||
num_attempts = retry["num_attempts"]
|
||||
|
||||
if num_attempts:
|
||||
quota.release(server)
|
||||
|
||||
priority = 99999999
|
||||
LOG.info("released resource uuid %s num attempts"
|
||||
"%s" % (request.getId(), num_attempts))
|
||||
|
||||
if priority == 0:
|
||||
priority = self.fairshare_manager.calculatePriority(
|
||||
user_id=request.getUserId(),
|
||||
prj_id=request.getProjectId(),
|
||||
timestamp=timestamp,
|
||||
retry=0)
|
||||
priority = self.fairshare_manager.calculatePriority(
|
||||
user_id=request.getUserId(),
|
||||
prj_id=request.getProjectId(),
|
||||
timestamp=request.getCreatedAt(),
|
||||
retry=num_attempts)
|
||||
|
||||
context = request.getContext()
|
||||
|
||||
@ -608,19 +575,13 @@ class SchedulerManager(Manager):
|
||||
priority=priority,
|
||||
data=request.toDict())
|
||||
|
||||
LOG.info("new request: id=%r user_id=%s prj_id=%s priority"
|
||||
LOG.info("new request: id=%s user_id=%s prj_id=%s priority"
|
||||
"=%s quota=shared" % (request.getId(),
|
||||
request.getUserId(),
|
||||
request.getProjectId(),
|
||||
priority))
|
||||
else:
|
||||
self.nova_manager.buildServer(request)
|
||||
|
||||
self.nova_manager.setQuotaTypeServer(server)
|
||||
LOG.info("new request: id=%r user_id=%s prj_id=%s "
|
||||
"quota=private" % (request.getId(),
|
||||
request.getUserId(),
|
||||
request.getProjectId()))
|
||||
except Exception as ex:
|
||||
LOG.error("Exception has occured", exc_info=1)
|
||||
LOG.error(ex)
|
||||
|
Loading…
x
Reference in New Issue
Block a user