Merge "Fix off-by-one in sync counter"

This commit is contained in:
Jenkins 2016-01-15 17:19:37 +00:00 committed by Gerrit Code Review
commit 83d46e3bdd

@ -56,12 +56,17 @@ class MultiQueue(object):
for key in priorities:
self.queues[key] = collections.deque()
self.condition = threading.Condition()
self.incomplete = []
def qsize(self):
count = 0
for queue in self.queues.values():
count += len(queue)
return count
self.condition.acquire()
try:
for queue in self.queues.values():
count += len(queue)
return count + len(self.incomplete)
finally:
self.condition.release()
def put(self, item, priority):
added = False
@ -82,6 +87,7 @@ class MultiQueue(object):
for queue in self.queues.values():
try:
ret = queue.popleft()
self.incomplete.append(ret)
return ret
except IndexError:
pass
@ -100,6 +106,14 @@ class MultiQueue(object):
self.condition.release()
return results
def complete(self, item):
self.condition.acquire()
try:
if item in self.incomplete:
self.incomplete.remove(item)
finally:
self.condition.release()
class UpdateEvent(object):
def updateRelatedChanges(self, session, change):
@ -1335,6 +1349,7 @@ class Sync(object):
try:
task.run(self)
task.complete(True)
self.queue.complete(task)
except requests.ConnectionError as e:
self.log.warning("Offline due to: %s" % (e,))
if not self.offline:
@ -1347,6 +1362,7 @@ class Sync(object):
return task
except Exception:
task.complete(False)
self.queue.complete(task)
self.log.exception('Exception running task %s' % (task,))
self.app.status.update(error=True, refresh=False)
self.offline = False